当前位置: 首页 > news >正文

湘潭网站建设有名磐石网络wordpress strip_tags

湘潭网站建设有名磐石网络,wordpress strip_tags,摄影创意网站,公司做网站广告语Flink学习笔记 前言#xff1a;今天是学习 flink 的第 17 天啦#xff01;学习了 flinkSQL 的客户端工具 flinkSQL-client#xff0c;主要是解决大数据领域数据计算避免频繁提交jar包#xff0c;而是简单编写sql即可测试数据#xff0c;文章中主要结合 hive#xff0c;即…Flink学习笔记 前言今天是学习 flink 的第 17 天啦学习了 flinkSQL 的客户端工具 flinkSQL-client主要是解决大数据领域数据计算避免频繁提交jar包而是简单编写sql即可测试数据文章中主要结合 hive即编写 flinksql 可以操作 hive 中的数据表以及流批一体化kafak 将数据写入到 hive中结合自己实验猜想和代码实践总结了很多自己的理解和想法希望和大家多多交流 Tips分享是快乐的源泉在我的博客里不仅有知识的海洋还有满满的正能量加持快来和我一起分享这份快乐吧 喜欢我的博客的话记得点个红心❤️和小关小注哦您的支持是我创作的动力 文章目录 Flink学习笔记三、FlinkSQL Client1. 启动 Hive 组件2. Flink-Client 准备操作3. Flink-Client 三种显示格式4. 与 hive 结合的初始化5. 两种方式创建 Hive 相关库表5.1 SQL 方式5.2 Table-API 方式 6. 基于 Catalog 的数据库操作7. kafka 通过 FlinkSQL Client 写入 Hive7.1 flink 准备 jar 包7.2 案例演示 三、FlinkSQL Client 1. 启动 Hive 组件 ############################################################### # 记得先启动 hadoop 不然连接 beeline 时报错 Error: Could not open client transport with JDBC Uri: jdbc:hive2://node1:10000: java.net.ConnectException: 拒绝连接 (Connection refused) (state08S01,code0)##hive:##nohup hive --service metastore nohup.outnohup hive --service hiveserver2 hiveserver2.outbeeline!connect jdbc:hive2://node1:10000关闭ps -ef | grep hivekill -9 pid ###############################################################2. Flink-Client 准备操作 可以在建表或者建库的时候可以直接编写 SQL 语句提交web 页面也能显示 Running省去打包操作 flink 的 lib 目录下 jar 包 基于 flink 1.3.1 hive 2.1.1jar 包放在我的资源区了大家有需要可以下载一下 flink-connector-hive_2.11-1.13.1.jar hive-exec-2.1.1.jar antlr-runtime-3.4.jar需要提前启动 flink 集群模式 ./bin/start-cluster.sh进入 Flink Client 操作 ./bin/sql-client.sh embedded退出操作 quit;3. Flink-Client 三种显示格式 1- 表格模式 SET sql-client.execution.result-modetable;2- 变更日志模式 SET sql-client.execution.result-modechangelog;3- Tableau 模式 SET sql-client.execution.result-modetableau;4. 与 hive 结合的初始化 创建 init.sql # 显示打印错误日志信息 SET sql-client.verbose true; CREATE CATALOG myhive WITH (type hive,hive-conf-dir/export/server/flink-1.13.1/conf);USE CATALOG myhive;CREATE DATABASE if not exists itcast_flinksql;USE itcast_flinksql;初始化启动 ./sql-client.sh -i init.sql5. 两种方式创建 Hive 相关库表 现将 hive 的 hive-site.xml 文件保存到 src/main/resource 目录下 5.1 SQL 方式 package cn.itcast.day01.catalog;/*** author lql* time 2024-03-14 09:38:43* description TODO*/import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog;import java.util.stream.Stream;/*** 用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表*/ public class SqlDDLDemo {public static void main(String[] args) {//todo 0设置当前hadoop操作的用户名System.setProperty(HADOOP_USER_NAME, root);//todo 1初始化flink流处理的运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bbSetting EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tabEnv StreamTableEnvironment.create(env, bbSetting);//todo 2创建HiveCatalogString catalogName myHive;String databaseName itcast_flinksql;HiveCatalog catalog new HiveCatalog(catalogName, //指定catalog的名字default, //默认数据库的名字src/main/resources, //指定hive-site.xml文件的路径2.1.1 //指定hive的版本);//todo 3注册目录System.out.println(注册目录);tabEnv.registerCatalog(catalogName, catalog);//todo 4切换目录System.out.println(切换目录);tabEnv.useCatalog(catalogName);//todo 5创建数据库System.out.println(创建数据库);String createDBSql CREATE DATABASE IF NOT EXISTS catalogName.databaseName;tabEnv.executeSql(createDBSql);//todo 6切换数据库System.out.println(切换数据库);tabEnv.useDatabase(databaseName);//todo 7创建表System.out.println(创建表);String createTableSql CREATE TABLE IF NOT EXISTS mytable(name string, age int);tabEnv.executeSql(createTableSql);//todo 8查询所有的表System.out.println(创建表);tabEnv.listTables();} } 结果成功注册了目录数据库表 总结 1- 需要设置 hadoop 权限2- new 一个 HiveCatalog 对象 5.2 Table-API 方式 package cn.itcast.day01.catalog;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.hive.HiveCatalog; import java.util.HashMap; import java.util.Map;/*** author lql* time 2023-03-14 09:10:58* description TODO 用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。*/ public class JavaClientDemo {public static void main(String[] args) throws DatabaseAlreadyExistException, TableAlreadyExistException, DatabaseNotExistException {// 设置当前hadoop操作的用户名System.setProperty(HADOOP_USER_NAME,root);// 创建流处理环境设置并行度为 5StreamExecutionEnvironment streamEnv StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(5);// 设置 table 环境使用 blink planner并开启流式查询模式EnvironmentSettings tableEnvSettings EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv StreamTableEnvironment.create(streamEnv, tableEnvSettings);// 定义 Hive Catalog 名称、数据库名称、表格名称等信息String catalogName myhive;String databaseName itcast_flinksql;String tableName test;// 创建一个 HiveCatalog 实例用于访问 Hive 中的表格和数据库HiveCatalog catalog new HiveCatalog(catalogName, // Catalog 名称default, // 默认使用的数据库src/main/resources, // Hive 配置文件的目录路径hive-site.xml2.1.1 // Hive 版本号);// 注册 HiveCatalog// 在注册之前需要保证在指定目录下有 metadata 目录并且 metadata 目录下没有 myhive 目录否则会失败tableEnv.registerCatalog(catalogName, catalog);tableEnv.useCatalog(catalogName);// 创建数据库System.out.println(---------------创建数据库------------------------);catalog.createDatabase(databaseName,new CatalogDatabaseImpl(new HashMap(),my_comments),true);// 创建表格System.out.println(--------------创建表table-----------------------------);TableSchema schema TableSchema.builder().field(name, DataTypes.STRING()).field(age, DataTypes.INT()).build();MapString,String properties new HashMap();/*** flink 通用表将具有is_generictrue.*/properties.put(is_generic,String.valueOf(true));catalog.createTable(new ObjectPath(databaseName,tableName),new CatalogTableImpl(schema,properties,my_comments),true);System.out.println(tableName);} }结果成功注册了目录数据库表 总结 1- 建立数据库时需要 new 一个 CatalogDatabaseImpl2- 创建表格式需要先定义数据结构schema3- 设置 hadoop 操作的用户名 6. 基于 Catalog 的数据库操作 例子对 hive 数据库进行创建、删除、判断等操作的代码 java 实现 package cn.itcast.day01.catalog;/*** author lql* time 2024-03-14 09:37:16* description TODO*/ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.hive.HiveCatalog;import java.util.HashMap;/*** 数据库操作*/ public class DatabaseOperater {public static void main(String[] args) throws DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException {//设置当前hadoop操作的用户名System.setProperty(HADOOP_USER_NAME, root);StreamExecutionEnvironment streamEnv StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(5);EnvironmentSettings tableEnvSettings EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv StreamTableEnvironment.create(streamEnv, tableEnvSettings);String catalogName myhive;String databaseName itcast_flinksql;String tableName test;HiveCatalog catalog new HiveCatalog(catalogName, // catalog namedefault, // default databasesrc/main/resources, // Hive config (hive-site.xml) directory2.1.1 // Hive version);//注册目录tableEnv.registerCatalog(catalogName, catalog);tableEnv.useCatalog(catalogName);System.out.println(---------------创建数据库------------------------);catalog.createDatabase(databaseName,new CatalogDatabaseImpl(new HashMap(), my comment), true);System.out.println(---------------删除数据库------------------------);catalog.dropDatabase(databaseName, true, true);System.out.println(---------------验证数据库是否存在------------------------);boolean result catalog.databaseExists(databaseName);System.out.println(---------------result------------------------);System.out.println(---------------在目录中列出数据库------------------------);catalog.listDatabases();} }总结需要 new 一个 HashMap 7. kafka 通过 FlinkSQL Client 写入 Hive 7.1 flink 准备 jar 包 flink-sql-connector-kafka_2.12-1.13.1.jar flink-connector-kafka_2.12-1.13.1.jar flink-shaded-hadoop-2-uber-2.7.5-10.0.jar# 如果是集群模式需要将 lib 包分发到各台机器7.2 案例演示 准备工作 # 启动 zookeeper # 启动 hadoop # 启动 hive # 启动 flink cluster 模式 # 进入 flinksql client 客户端注意要用前面的 init.sql 脚本初始化逻辑 sql CREATE TABLE IF NOT EXISTS order(id INT,category STRING,areaName STRING,money INT,timestamp BIGINT,eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(timestamp / 1000, yyyy-MM-dd HH:mm:ss)), -- 事件时间WATERMARK FOR eventTime AS eventTime - INTERVAL 10 SECOND -- 水印 ) WITH (connector kafka,topic order, -- 指定消费的topicscan.startup.mode latest-offset, -- 指定起始offset位置properties.zookeeper.connect node1:2181,properties.bootstrap.servers node1:9092,properties.group.id order_01,format json,json.ignore-parse-errors true );启动kafka生产者的数据 {id:1,timestamp:1588870980000,category:电脑,areaName:石家庄,money:1450} {id:2,timestamp:1588870860000,category:手机,areaName:北京,money:1450} {id:3,timestamp:1588870980000,category:手机,areaName:北京,money:8412} {id:4,timestamp:1588885260000,category:电脑,areaName:上海,money:1513} {id:5,timestamp:1588870980000,category:家电,areaName:北京,money:1550} {id:6,timestamp:1588870860000,category:电脑,areaName:深圳,money:1550}结果kafka 数据源源不断写入到 hive 表中 总结 1- 在 hive 中可以看到表但是不能查询数据报错 Error因为这个表是 flink 通用表2- 如果想要建 hive 兼容表需要通用表将具有 is_generic true改为 is_generic False。
http://www.pierceye.com/news/92261/

相关文章:

  • 如何做挂qq的网站2017网站建设
  • wordpress语言切换网站保定广告设计公司
  • 做网站需要走公司吗运行一个网站要多少钱
  • 怎样可以免费做网站wap网站软件
  • 织梦手机网站免费模板漳州城乡建设局网站
  • 厦门建设网站的公司php除了写网站吗
  • 如何做全网影视网站居然之家装修公司怎么样
  • 佛山网站建设公司哪家最好万能软文范例800字
  • 网站排名优化如何做wordpress 免费版广告
  • 拓客网站建设建易网官网
  • 网站目录链接怎么做的建网站pc版 (报价)
  • 北京网站制作业务如何开展做网站海报
  • 网站的设计方法有哪些互动网络游戏公司网站建设
  • 公司网站开发制作公司国内重大新闻2022
  • 搜索引擎排名网站北京到广州快递要几天
  • 制作网站怎么制作html网站 下载
  • 深圳网络营销网站设计做个网站哪里可以做
  • 九牛科技网站开发微信营销小型网站建设步骤
  • 分类信息系统网站模板口碑好的网站建设多少钱
  • 米粒网站建设网站开发项目费用预算
  • 12380网站建设的意见建议公司网站维护和更新属于哪个部门
  • 公众号做微网站吗做国外网站的站长
  • 现在网站优化app程序开发定制
  • 德阳网站怎么做seowordpress app 插件
  • 水文化建设网站网站排名优化公司哪家好
  • 网站图片的暗纹是怎么做的做家教中介 不建网站怎么做
  • 学校网站建设价格明细表淮安网站网站建设
  • 怎样做代刷网站长电子商务网站开发费用入账
  • 网站健设推广产品多少钱商业网站开发的实训小结怎么写
  • 优秀的网站建设推荐做百度推广是网站好还是阿里好