湘潭网站建设有名磐石网络,wordpress strip_tags,摄影创意网站,公司做网站广告语Flink学习笔记 前言#xff1a;今天是学习 flink 的第 17 天啦#xff01;学习了 flinkSQL 的客户端工具 flinkSQL-client#xff0c;主要是解决大数据领域数据计算避免频繁提交jar包#xff0c;而是简单编写sql即可测试数据#xff0c;文章中主要结合 hive#xff0c;即…Flink学习笔记 前言今天是学习 flink 的第 17 天啦学习了 flinkSQL 的客户端工具 flinkSQL-client主要是解决大数据领域数据计算避免频繁提交jar包而是简单编写sql即可测试数据文章中主要结合 hive即编写 flinksql 可以操作 hive 中的数据表以及流批一体化kafak 将数据写入到 hive中结合自己实验猜想和代码实践总结了很多自己的理解和想法希望和大家多多交流 Tips分享是快乐的源泉在我的博客里不仅有知识的海洋还有满满的正能量加持快来和我一起分享这份快乐吧 喜欢我的博客的话记得点个红心❤️和小关小注哦您的支持是我创作的动力 文章目录 Flink学习笔记三、FlinkSQL Client1. 启动 Hive 组件2. Flink-Client 准备操作3. Flink-Client 三种显示格式4. 与 hive 结合的初始化5. 两种方式创建 Hive 相关库表5.1 SQL 方式5.2 Table-API 方式 6. 基于 Catalog 的数据库操作7. kafka 通过 FlinkSQL Client 写入 Hive7.1 flink 准备 jar 包7.2 案例演示 三、FlinkSQL Client
1. 启动 Hive 组件
###############################################################
# 记得先启动 hadoop 不然连接 beeline 时报错
Error: Could not open client transport with JDBC Uri: jdbc:hive2://node1:10000: java.net.ConnectException: 拒绝连接 (Connection refused) (state08S01,code0)##hive:##nohup hive --service metastore nohup.outnohup hive --service hiveserver2 hiveserver2.outbeeline!connect jdbc:hive2://node1:10000关闭ps -ef | grep hivekill -9 pid
###############################################################2. Flink-Client 准备操作
可以在建表或者建库的时候可以直接编写 SQL 语句提交web 页面也能显示 Running省去打包操作
flink 的 lib 目录下 jar 包 基于 flink 1.3.1 hive 2.1.1jar 包放在我的资源区了大家有需要可以下载一下 flink-connector-hive_2.11-1.13.1.jar
hive-exec-2.1.1.jar
antlr-runtime-3.4.jar需要提前启动 flink 集群模式
./bin/start-cluster.sh进入 Flink Client 操作
./bin/sql-client.sh embedded退出操作
quit;3. Flink-Client 三种显示格式
1- 表格模式
SET sql-client.execution.result-modetable;2- 变更日志模式
SET sql-client.execution.result-modechangelog;3- Tableau 模式
SET sql-client.execution.result-modetableau;4. 与 hive 结合的初始化
创建 init.sql
# 显示打印错误日志信息
SET sql-client.verbose true;
CREATE CATALOG myhive WITH (type hive,hive-conf-dir/export/server/flink-1.13.1/conf);USE CATALOG myhive;CREATE DATABASE if not exists itcast_flinksql;USE itcast_flinksql;初始化启动
./sql-client.sh -i init.sql5. 两种方式创建 Hive 相关库表
现将 hive 的 hive-site.xml 文件保存到 src/main/resource 目录下
5.1 SQL 方式
package cn.itcast.day01.catalog;/*** author lql* time 2024-03-14 09:38:43* description TODO*/import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;import java.util.stream.Stream;/*** 用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表*/
public class SqlDDLDemo {public static void main(String[] args) {//todo 0设置当前hadoop操作的用户名System.setProperty(HADOOP_USER_NAME, root);//todo 1初始化flink流处理的运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bbSetting EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tabEnv StreamTableEnvironment.create(env, bbSetting);//todo 2创建HiveCatalogString catalogName myHive;String databaseName itcast_flinksql;HiveCatalog catalog new HiveCatalog(catalogName, //指定catalog的名字default, //默认数据库的名字src/main/resources, //指定hive-site.xml文件的路径2.1.1 //指定hive的版本);//todo 3注册目录System.out.println(注册目录);tabEnv.registerCatalog(catalogName, catalog);//todo 4切换目录System.out.println(切换目录);tabEnv.useCatalog(catalogName);//todo 5创建数据库System.out.println(创建数据库);String createDBSql CREATE DATABASE IF NOT EXISTS catalogName.databaseName;tabEnv.executeSql(createDBSql);//todo 6切换数据库System.out.println(切换数据库);tabEnv.useDatabase(databaseName);//todo 7创建表System.out.println(创建表);String createTableSql CREATE TABLE IF NOT EXISTS mytable(name string, age int);tabEnv.executeSql(createTableSql);//todo 8查询所有的表System.out.println(创建表);tabEnv.listTables();}
}
结果成功注册了目录数据库表
总结
1- 需要设置 hadoop 权限2- new 一个 HiveCatalog 对象 5.2 Table-API 方式
package cn.itcast.day01.catalog;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.util.HashMap;
import java.util.Map;/*** author lql* time 2023-03-14 09:10:58* description TODO 用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。*/
public class JavaClientDemo {public static void main(String[] args) throws DatabaseAlreadyExistException, TableAlreadyExistException, DatabaseNotExistException {// 设置当前hadoop操作的用户名System.setProperty(HADOOP_USER_NAME,root);// 创建流处理环境设置并行度为 5StreamExecutionEnvironment streamEnv StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(5);// 设置 table 环境使用 blink planner并开启流式查询模式EnvironmentSettings tableEnvSettings EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv StreamTableEnvironment.create(streamEnv, tableEnvSettings);// 定义 Hive Catalog 名称、数据库名称、表格名称等信息String catalogName myhive;String databaseName itcast_flinksql;String tableName test;// 创建一个 HiveCatalog 实例用于访问 Hive 中的表格和数据库HiveCatalog catalog new HiveCatalog(catalogName, // Catalog 名称default, // 默认使用的数据库src/main/resources, // Hive 配置文件的目录路径hive-site.xml2.1.1 // Hive 版本号);// 注册 HiveCatalog// 在注册之前需要保证在指定目录下有 metadata 目录并且 metadata 目录下没有 myhive 目录否则会失败tableEnv.registerCatalog(catalogName, catalog);tableEnv.useCatalog(catalogName);// 创建数据库System.out.println(---------------创建数据库------------------------);catalog.createDatabase(databaseName,new CatalogDatabaseImpl(new HashMap(),my_comments),true);// 创建表格System.out.println(--------------创建表table-----------------------------);TableSchema schema TableSchema.builder().field(name, DataTypes.STRING()).field(age, DataTypes.INT()).build();MapString,String properties new HashMap();/*** flink 通用表将具有is_generictrue.*/properties.put(is_generic,String.valueOf(true));catalog.createTable(new ObjectPath(databaseName,tableName),new CatalogTableImpl(schema,properties,my_comments),true);System.out.println(tableName);}
}结果成功注册了目录数据库表
总结
1- 建立数据库时需要 new 一个 CatalogDatabaseImpl2- 创建表格式需要先定义数据结构schema3- 设置 hadoop 操作的用户名 6. 基于 Catalog 的数据库操作
例子对 hive 数据库进行创建、删除、判断等操作的代码 java 实现
package cn.itcast.day01.catalog;/*** author lql* time 2024-03-14 09:37:16* description TODO*/
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;import java.util.HashMap;/*** 数据库操作*/
public class DatabaseOperater {public static void main(String[] args) throws DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException {//设置当前hadoop操作的用户名System.setProperty(HADOOP_USER_NAME, root);StreamExecutionEnvironment streamEnv StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(5);EnvironmentSettings tableEnvSettings EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv StreamTableEnvironment.create(streamEnv, tableEnvSettings);String catalogName myhive;String databaseName itcast_flinksql;String tableName test;HiveCatalog catalog new HiveCatalog(catalogName, // catalog namedefault, // default databasesrc/main/resources, // Hive config (hive-site.xml) directory2.1.1 // Hive version);//注册目录tableEnv.registerCatalog(catalogName, catalog);tableEnv.useCatalog(catalogName);System.out.println(---------------创建数据库------------------------);catalog.createDatabase(databaseName,new CatalogDatabaseImpl(new HashMap(), my comment), true);System.out.println(---------------删除数据库------------------------);catalog.dropDatabase(databaseName, true, true);System.out.println(---------------验证数据库是否存在------------------------);boolean result catalog.databaseExists(databaseName);System.out.println(---------------result------------------------);System.out.println(---------------在目录中列出数据库------------------------);catalog.listDatabases();}
}总结需要 new 一个 HashMap 7. kafka 通过 FlinkSQL Client 写入 Hive
7.1 flink 准备 jar 包
flink-sql-connector-kafka_2.12-1.13.1.jar
flink-connector-kafka_2.12-1.13.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar# 如果是集群模式需要将 lib 包分发到各台机器7.2 案例演示
准备工作
# 启动 zookeeper
# 启动 hadoop
# 启动 hive
# 启动 flink cluster 模式
# 进入 flinksql client 客户端注意要用前面的 init.sql 脚本初始化逻辑 sql
CREATE TABLE IF NOT EXISTS order(id INT,category STRING,areaName STRING,money INT,timestamp BIGINT,eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(timestamp / 1000, yyyy-MM-dd HH:mm:ss)), -- 事件时间WATERMARK FOR eventTime AS eventTime - INTERVAL 10 SECOND -- 水印
) WITH (connector kafka,topic order, -- 指定消费的topicscan.startup.mode latest-offset, -- 指定起始offset位置properties.zookeeper.connect node1:2181,properties.bootstrap.servers node1:9092,properties.group.id order_01,format json,json.ignore-parse-errors true
);启动kafka生产者的数据
{id:1,timestamp:1588870980000,category:电脑,areaName:石家庄,money:1450}
{id:2,timestamp:1588870860000,category:手机,areaName:北京,money:1450}
{id:3,timestamp:1588870980000,category:手机,areaName:北京,money:8412}
{id:4,timestamp:1588885260000,category:电脑,areaName:上海,money:1513}
{id:5,timestamp:1588870980000,category:家电,areaName:北京,money:1550}
{id:6,timestamp:1588870860000,category:电脑,areaName:深圳,money:1550}结果kafka 数据源源不断写入到 hive 表中
总结
1- 在 hive 中可以看到表但是不能查询数据报错 Error因为这个表是 flink 通用表2- 如果想要建 hive 兼容表需要通用表将具有 is_generic true改为 is_generic False。