当前位置: 首页 > news >正文

长沙网站设计服务建筑公司注册资金最低多少

长沙网站设计服务,建筑公司注册资金最低多少,wordpress播放代码,wordpress 数据库缓存插件Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上 22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 42、Flink 的table api与sql之Hive Catalog 文章目录 Flink 系列文章一、Hive Catalog二、Set up HiveCatalog1、Dependencies2、Configuration 三、How to use HiveCatalog四、示例-Flink 集成 Hive1、修改hive的配置文件非常重要 2、配置Flink集群和SQL cli1、将所有 Hive 依赖项添加到 Flink 发行版中的 /lib 文件夹中2、修改 SQL CLI 的 yaml 配置文件 sql-cli-defaults.yaml 3、验证kafka集群生产-消费功能1、kafka发送消息2、kafka接收消息 4、在Flink Cli创建kafka表5、通过kafka发送消息同时在flink中查询1、kafka发送消息2、Flink sql cli查询数据 五、支持的数据类型六、Scala Shell 本文以一个详细的示例介绍了Flink与hive的集成其中涉及的版本在示例部分有消息的说明。 本文依赖有hadoop、hive、kafka、mysql、flink等所有环境可用。 本分分为6个部分即hivecatalog介绍、依赖、怎么使用和详细示例、flink与hive的胡数据类型映射等。 一、Hive Catalog 多年来Hive Metastore已经发展成为Hadoop生态系统中事实上的元数据中心。许多公司在其生产中只有一个 Hive Metastore service实例来管理其所有元数据Hive 元数据或非 Hive 元数据。 对于同时拥有 Hive 和 Flink 部署的用户HiveCatalog 使他们能够使用 Hive Metastore 来管理 Flink 的元数据。 对于刚刚部署 Flink 的用户HiveCatalog 是 Flink 提供的唯一开箱即用的持久目录。如果没有持久目录使用 Flink SQL CREATE DDL 的用户必须在每个会话中重复创建像 Kafka 表这样的元对象这会浪费大量时间。HiveCatalog 填补了这一空白使用户能够只创建一次表和其他元对象并在以后跨会话方便地引用和管理它们。 二、Set up HiveCatalog 1、Dependencies 在 Flink 中设置 HiveCatalog 需要与整个 Flink-Hive 集成相同的依赖关系。具体参考16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 2、Configuration 在 Flink 中设置 HiveCatalog 需要与整个 Flink-Hive 集成相同的配置。 具体参考16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 三、How to use HiveCatalog 正确配置后HiveCatalog 应该可以开箱即用。用户可以用 DDL 创建 Flink 元对象之后应该会立即看到它们。 HiveCatalog 可用于处理两种类型的表与 Hive 兼容的表和泛型表Hive-compatible tables and generic tables。与 Hive 兼容的表是以 Hive 兼容方式存储的表就存储层中的元数据和数据而言。因此通过 Flink 创建的 Hive 兼容表可以从 Hive 端查询。 另一方面Generic tables是特定于 Flink 的。使用 HiveCatalog 创建Generic tables时我们只是使用 HMS 来保存元数据。虽然这些表对 Hive 可见但 Hive 不太可能理解元数据。因此在 Hive 中使用此类表会导致未定义的行为。 Flink 使用属性 ‘is_generic’ 来判断表是与 Hive 兼容还是泛型。使用 HiveCatalog 创建表时默认情况下将其视为泛型表。如果要创建与 Hive 兼容的表请确保在表属性中将 is_generic 设置为 false。 如上所述不应从 Hive 使用泛型表。在 Hive CLI 中可以为表调用描述格式并通过检查 is_generic 属性来确定它是否为泛型。泛型表将具有 is_generictrue。 四、示例-Flink 集成 Hive 本示例是介绍flink集成hive的内容主要体现的是flink和hive共用hive的元数据。 本示例中hive的版本是3.1.2 flink的版本是1.13.6 hadoop的版本是3.1.4 kafka的版本是2.12-3.0.0 mysql的版本是5非8 1、修改hive的配置文件 下面的配置在我们部署hive的时候已经配置过可能内容不一样详见1、apache-hive-3.1.2简介及部署三种部署方式-内嵌模式、本地模式和远程模式及验证详解 非常重要 网络上很多都是介绍jar文件的没有该种情况介绍 如果hive的其他机器上如果hive-site.xml文件中配置的如下远程访问hiveserver2的则需要按照如下方式配置否则flink不能访问hive的metadata服务。 网络上很多都是介绍jar文件的没有该种情况介绍 ?xml version1.0 encodingUTF-8 standaloneno? ?xml-stylesheet typetext/xsl hrefconfiguration.xsl?configuration!-- 远程模式部署metastore 服务地址 --propertynamehive.metastore.uris/namevaluethrift://server4:9083/value/property/configuration不能访问元数据的错误提示 Flink SQL show tables; ---------------------- | table name | ---------------------- | alanchan_kafka_table | | source_table | ---------------------- 2 rows in setFlink SQL desc source_table; [ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.hive.metastore.api.MetaException: Your client does not appear to support Hive tests. To skip capability checks, please set metastore.client.capability.check to false. This setting can be set globally, or on the client for the current metastore session. Note that this may lead to incorrect results, data loss, undefined behavior, etc. if your client is actually incompatible. You can also specify custom client capabilities via get_table_req API. 我的环境配置文件本地路径是 /usr/local/bigdata/apache-hive-3.1.2-bin/conf/hive-site.xml 配置如下所示 configurationpropertynamejavax.jdo.option.ConnectionURL/namevaluejdbc:mysql://localhost/metastore?createDatabaseIfNotExisttrue/valuedescriptionmetadata is stored in a MySQL server/description/propertypropertynamejavax.jdo.option.ConnectionDriverName/namevaluecom.mysql.jdbc.Driver/valuedescriptionMySQL JDBC driver class/description/propertypropertynamejavax.jdo.option.ConnectionUserName/namevalueroot/valuedescriptionuser name for connecting to mysql server/description/propertypropertynamejavax.jdo.option.ConnectionPassword/namevalue123456/valuedescriptionpassword for connecting to mysql server/description/propertypropertynamehive.metastore.uris/namevaluethrift://server4:9083/valuedescriptionIP address (or fully-qualified domain name) and port of the metastore host/description/propertypropertynamehive.metastore.schema.verification/namevaluetrue/value/property/configuration将hive的配置文件配置好后进行启动然后验证hive运行情况。 启动命令 nohup /usr/local/bigdata/apache-hive-3.1.2-bin/bin/hive --service metastore /usr/local/bigdata/apache-hive-3.1.2-bin/logs/metastore.log --hiveconf hive.root.loggerWARN,console 21 nohup /usr/local/bigdata/apache-hive-3.1.2-bin/bin/hive --service hiveserver2 /usr/local/bigdata/apache-hive-3.1.2-bin/logs/hiveserver2.log --hiveconf hive.root.loggerWARN,console 21 或hive-metastore service hive-metastore starthive-server2 service hive-server2 start! connect jdbc:hive2://server4:10000验证hive [alanchanserver4 apache-hive-3.1.2-bin]$ beeline Beeline version 3.1.2 by Apache Hive beeline ! connect jdbc:hive2://server4:10000 Connecting to jdbc:hive2://server4:10000 Enter username for jdbc:hive2://server4:10000: alanchan(根据自己当初配置的用户名和密码输入) Enter password for jdbc:hive2://server4:10000: ********(根据自己当初配置的用户名和密码输入) Connected to: Apache Hive (version 3.1.2) Driver: Hive JDBC (version 3.1.2) Transaction isolation: TRANSACTION_REPEATABLE_READ 0: jdbc:hive2://server4:10000 show databases; -------------------------- | database_name | -------------------------- | alan_hivecatalog_hivedb | | default | | test | | testhive | -------------------------- 4 rows selected (0.14 seconds) 0: jdbc:hive2://server4:10000 use test; No rows affected (0.025 seconds) 0: jdbc:hive2://server4:10000 show tables; ---------------- | tab_name | ---------------- | dim_address | | dim_channel | | dim_date | | dim_product | | dim_region | | dim_user | | dms_content_t | | dw_sales | | fact_order | | fact_order2 | ---------------- 10 rows selected (0.031 seconds) 0: jdbc:hive2://server4:10000 2、配置Flink集群和SQL cli 1、将所有 Hive 依赖项添加到 Flink 发行版中的 /lib 文件夹中 注意版本和名称flink不同的版本要求不同。我的环境下总计如下jar包 antlr-runtime-3.5.2.jar flink-connector-hive_2.12-1.13.6.jar flink-connector-jdbc_2.11-1.13.6.jar flink-csv-1.13.5.jar flink-dist_2.11-1.13.5.jar flink-json-1.13.5.jar flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar flink-shaded-zookeeper-3.4.14.jar flink-sql-connector-elasticsearch7_2.11-1.13.6.jar flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar flink-sql-connector-kafka_2.11-1.13.5.jar flink-table_2.11-1.13.5.jar flink-table-blink_2.11-1.13.5.jar guava-27.0-jre.jar hive-exec-3.1.2.jar libfb303-0.9.3.jar mysql-connector-java-6.0.6.jar 而集成hive的jar如下 antlr-runtime-3.5.2.jar flink-connector-hive_2.12-1.13.6.jar flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar guava-27.0-jre.jar hive-exec-3.1.2.jar libfb303-0.9.3.jar mysql-connector-java-6.0.6.jar2、修改 SQL CLI 的 yaml 配置文件 sql-cli-defaults.yaml 配置 如下所示仅示例部分 execution:planner: blinktype: streaming...current-catalog: myhive # set the HiveCatalog as the current catalog of the sessioncurrent-database: mydatabasecatalogs:- name: myhivetype: hivehive-conf-dir: /opt/hive-conf # contains hive-site.xml-------------------具体示例--------------------------- # 定义 catalogs catalogs:- name: alan_hivecatalogtype: hiveproperty-version: 1hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf # 须包含 hive-site.xml# 改变表程序基本的执行行为属性。 execution:planner: blink # 可选 blink 默认或 oldtype: streaming # 必选执行模式为 batch 或 streamingresult-mode: table # 必选table 或 changelogmax-table-result-rows: 1000000 # 可选table 模式下可维护的最大行数默认为 1000000小于 1 则表示无限制time-characteristic: event-time # 可选 processing-time 或 event-time 默认parallelism: 1 # 可选Flink 的并行数量默认为 1periodic-watermarks-interval: 200 # 可选周期性 watermarks 的间隔时间默认 200 msmax-parallelism: 16 # 可选Flink 的最大并行数量默认 128min-idle-state-retention: 0 # 可选表程序的最小空闲状态时间max-idle-state-retention: 0 # 可选表程序的最大空闲状态时间current-catalog: alan_hivecatalog # 可选当前会话 catalog 的名称默认为 default_catalogcurrent-database: alan_hivecatalog_hivedb # 可选当前 catalog 的当前数据库名称默认为当前 catalog 的默认数据库restart-strategy: # 可选重启策略restart-strategytype: fallback # 默认情况下“回退”到全局重启策略# 用于调整和调优表程序的配置选项。 # 在专用的”配置”页面上可以找到完整的选项列表及其默认值。 configuration:table.optimizer.join-reorder-enabled: truetable.exec.spill-compression.enabled: truetable.exec.spill-compression.block-size: 128kb# 描述表程序提交集群的属性。 deployment:response-timeout: 5000验证配置内容 -- 1、启动flink sql如果之前的sql cli启动中则需要重启本示例使用的是yarn-session模式 [alanchanserver1 bin]$ sql-client.sh -s yarn-session 2023-08-30 00:22:54,215 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan. 2023-08-30 00:22:54,215 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan. No default environment specified. Searching for /usr/local/bigdata/flink-1.13.5/conf/sql-client-defaults.yaml...found. Reading default environment from: file:/usr/local/bigdata/flink-1.13.5/conf/sql-client-defaults.yaml Command history file path: /home/alanchan/.flink-sql-history -- 2、验证配置的catalog和database Flink SQL SHOW CURRENT CATALOG; ---------------------- | current catalog name | ---------------------- | alan_hivecatalog | ---------------------- 1 row in setFlink SQL SHOW CURRENT DATABASE; ------------------------- | current database name | ------------------------- | alan_hivecatalog_hivedb | ------------------------- 1 row in setFlink SQL SHOW TABLES; Empty set 3、验证kafka集群生产-消费功能 前提是kafka集群功能正常以下仅仅是简单的验证指定的主题test_kafka_hive收发消息功能。 1、kafka发送消息 [alanchanserver3 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_kafka_hive --partitions 1 --replication-factor 1 Created topic test_kafka_hive. [alanchanserver3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_kafka_hive hello alan hello alanchan good morning2、kafka接收消息 [alanchanserver3 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic test_kafka_hive --from-beginning hello alan hello alanchan good morning4、在Flink Cli创建kafka表 CREATE TABLE alanchan_kafka_table (id INT,name STRING,age BIGINT,t_insert_time TIMESTAMP(3) METADATA FROM timestamp,WATERMARK FOR t_insert_time as t_insert_time - INTERVAL 5 SECOND ) WITH (connector kafka,topic test_kafka_hive,scan.startup.mode earliest-offset,properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,format csv );Flink SQL CREATE TABLE alanchan_kafka_table (id INT,name STRING,age BIGINT,t_insert_time TIMESTAMP(3) METADATA FROM timestamp,WATERMARK FOR t_insert_time as t_insert_time - INTERVAL 5 SECOND) WITH (connector kafka,topic test_kafka_hive,scan.startup.mode earliest-offset,properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,format csv); [INFO] Execute statement succeed.Flink SQL show tables; ---------------------- | table name | ---------------------- | alanchan_kafka_table | | source_table | ---------------------- 2 rows in setFlink SQL desc alanchan_kafka_table; -------------------------------------------------------------------------------------------------------------------- | name | type | null | key | extras | watermark | -------------------------------------------------------------------------------------------------------------------- | id | INT | true | | | | | name | STRING | true | | | | | age | BIGINT | true | | | | | t_insert_time | TIMESTAMP(3) *ROWTIME* | true | | METADATA FROM timestamp | t_insert_time - INTERVAL 5 SECOND | -------------------------------------------------------------------------------------------------------------------- 4 rows in set0: jdbc:hive2://server4:10000 use alan_hivecatalog_hivedb; No rows affected (0.049 seconds) 0: jdbc:hive2://server4:10000 show tables; ----------------------- | tab_name | ----------------------- | alanchan_kafka_table | | source_table | ----------------------- 2 rows selected (0.041 seconds) 0: jdbc:hive2://server4:10000 describe formatted alanchan_kafka_table; --------------------------------------------------------------------------------------------------------------------------------------- | col_name | data_type | comment | --------------------------------------------------------------------------------------------------------------------------------------- | # col_name | data_type | comment | | | NULL | NULL | | # Detailed Table Information | NULL | NULL | | Database: | alan_hivecatalog_hivedb | NULL | | OwnerType: | USER | NULL | | Owner: | null | NULL | | CreateTime: | Wed Aug 30 08:28:11 CST 2023 | NULL | | LastAccessTime: | UNKNOWN | NULL | | Retention: | 0 | NULL | | Location: | hdfs://HadoopHAcluster/user/hive/warehouse/alan_hivecatalog_hivedb.db/alanchan_kafka_table | NULL | | Table Type: | MANAGED_TABLE | NULL | | Table Parameters: | NULL | NULL | | | flink.connector | kafka | | | flink.format | csv | | | flink.properties.bootstrap.servers | 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092 | | | flink.scan.startup.mode | earliest-offset | | | flink.schema.0.data-type | INT | | | flink.schema.0.name | id | | | flink.schema.1.data-type | VARCHAR(2147483647) | | | flink.schema.1.name | name | | | flink.schema.2.data-type | BIGINT | | | flink.schema.2.name | age | | | flink.schema.3.data-type | TIMESTAMP(3) | | | flink.schema.3.metadata | timestamp | | | flink.schema.3.name | t_insert_time | | | flink.schema.3.virtual | false | | | flink.schema.watermark.0.rowtime | t_insert_time | | | flink.schema.watermark.0.strategy.data-type | TIMESTAMP(3) | | | flink.schema.watermark.0.strategy.expr | t_insert_time - INTERVAL 5 SECOND | | | flink.topic | test_kafka_hive | | | transient_lastDdlTime | 1693355291 | | | NULL | NULL | | # Storage Information | NULL | NULL | | SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL | | InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL | | OutputFormat: | org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL | | Compressed: | No | NULL | | Num Buckets: | -1 | NULL | | Bucket Columns: | [] | NULL | | Sort Columns: | [] | NULL | | Storage Desc Params: | NULL | NULL | | | serialization.format | 1 | --------------------------------------------------------------------------------------------------------------------------------------- 42 rows selected (0.279 seconds) 5、通过kafka发送消息同时在flink中查询 1、kafka发送消息 [alanchanserver3 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic test_kafka_hive --partitions 1 --replication-factor 1 WARNING: Due to limitations in metric names, topics with a period (.) or underscore (_) could collide. To avoid issues it is best to use either, but not both. Created topic test_kafka_hive. [alanchanserver3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic test_kafka_hive 1,alan,15 2,alanchan,20 3,alanchanchn,25 4,alan_chan,30 5,alan_chan_chn,452、Flink sql cli查询数据 Flink SQL SET sql-client.execution.result-mode tableau; [INFO] Session property has been set.Flink SQL select * from alanchan_kafka_table;------------------------------------------------------------------------------------------------ | op | id | name | age | t_insert_time | ------------------------------------------------------------------------------------------------ | I | 1 | alan | 15 | 2023-08-30 09:29:33.993 | | I | 2 | alanchan | 20 | 2023-08-30 09:29:48.793 | | I | 3 | alanchanchn | 25 | 2023-08-30 09:29:54.795 | | I | 4 | alan_chan | 30 | 2023-08-30 09:30:01.480 | | I | 5 | alan_chan_chn | 45 | 2023-08-30 09:30:07.161 | 五、支持的数据类型 对于与 Hive 兼容的表HiveCatalog 需要将 Flink 数据类型映射到相应的 Hive 类型如下表所述 关于类型映射需要注意的事项 Hive CHAR § 类型的最大长度为255Hive VARCHAR§类型的最大长度为65535Hive MAP类型的key仅支持基本类型而Flink’s MAP 类型的key执行任意类型Hive不支持联合数据类型比如STRUCTHive TIMESTAMP 的精度是 9 Hive UDFs函数只能处理 precision 9的 TIMESTAMP 值Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型Flink INTERVAL 类型与 Hive INTERVAL 类型不一样 六、Scala Shell 注意由于 Scala Shell 目前不支持 blink planner因此不建议在 Scala Shell 中使用 Hive 连接器。 以上以一个详细的示例介绍了Flink与hive的集成其中有很多的坑都解决了。
http://www.pierceye.com/news/624680/

相关文章:

  • 网站建设托管推广海报中文域名做的网站
  • 临沂专业网站建设公司哪家好网站建设的网页
  • 当牛做吗网站源代码分享百度云帝国怎么做网站
  • 简约网站欣赏做美食网站赚钱吗
  • 一叶子网站建设目标教育平台oss做视频网站
  • 购物网站开发流程图wordpress 批量注册
  • 如何做网站优化的内容google网站推广
  • 网站模版亮点北京电商网站开发费用
  • 南昌专业的企业网站建设公司wordpress源码在哪
  • 农家院做宣传应该在哪个网站营销代码查询
  • 大型企业网站设计案例晋江做网站的公司哪家好
  • 海外模板网站有哪些全国网页设计大赛
  • 网站设计常州注册公司没有地址怎么弄
  • 注销建设工程规划许可证在哪个网站wordpress+Apache升级
  • 视频网站如何做盗链青岛商城网站开发
  • 网站主色调googleapis wordpress
  • 作网站番禺区网络推广渠道
  • app开发网站排行app制作平台排行
  • 盐城网站建设找哪家好个人如何做短视频网站
  • 域名进行网站备案吗2023年重启核酸
  • 为什么几年前做的网站视频看不了wordpress图片标签
  • 做照片用的视频模板下载网站好网站源代码购买
  • 网站rss生成上海网页网络技术有限公司
  • 白山北京网站建设遂宁网站优化
  • 青岛网站建站公司银川网站建站公司
  • 做海报哪个网站的素材多成都私人放款联系方式电话
  • 黑河市网站建设公司广州好的网站建设
  • 番禺网站建设培训班做网站需要具备的基础条件
  • seo网站排名后退网站效果检测
  • 郑州做网站加密的公司免费logo设计生成器在线制作