当前位置: 首页 > news >正文

网站建设中添加图片链接cad线下培训班

网站建设中添加图片链接,cad线下培训班,磁县信息港,wordpress widget 模板01 Changelog相关优化规则 0101 运行upsert-kafka作业 登录sql-client#xff0c;创建一个upsert-kafka的sql作业#xff08;注意#xff0c;这里发送给kafka的消息必须带key#xff0c;普通只有value的消息无法解析#xff0c;这里的key即是主键的值#xff09; CREA…01 Changelog相关优化规则 0101 运行upsert-kafka作业 登录sql-client创建一个upsert-kafka的sql作业注意这里发送给kafka的消息必须带key普通只有value的消息无法解析这里的key即是主键的值 CREATE TABLE pageviews_per_region (user_region STRING,pv STRING,PRIMARY KEY (user_region) NOT ENFORCED -- 设置主键 ) WITH (connector upsert-kafka,topic pageviews_per_region,properties.bootstrap.servers xxxxxx:9092,key.format csv,value.format csv );select * from pageviews_per_region;发送消息带key和消费消息显示key方式如下 kafka-console-producer.sh --broker-list xxxxxx:9092 --topic pageviews_per_region --property parse.keytrue --property key.separator: key1:value1,value1 key2:value2,value2kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic pageviews_per_region --from-beginning --property print.keytrue作业的DAG图如下 0102 StreamPhysicalChangelogNormalize DAG图中有一个ChangelogNormalize代码中搜索到对应的类是StreamPhysicalChangelogNormalize这是一个对changelog数据做规范化的类注释如下 /*** Stream physical RelNode which normalizes a changelog stream which maybe an upsert stream or a* changelog stream containing duplicate events. This node normalize such stream into a regular* changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records without* duplication.*/ class StreamPhysicalChangelogNormalize(功能就是转成对应的exec节点 override def translateToExecNode(): ExecNode[_] {val generateUpdateBefore ChangelogPlanUtils.generateUpdateBefore(this)new StreamExecChangelogNormalize(unwrapTableConfig(this),uniqueKeys,generateUpdateBefore,InputProperty.DEFAULT,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription) }0103 StreamPhysicalTableSourceScanRule StreamPhysicalChangelogNormalize是在优化规则StreamPhysicalTableSourceScanRule当中创建的如下流式的FlinkLogicalTableSourceScan会应用该规则 class StreamPhysicalTableSourceScanRuleextends ConverterRule(classOf[FlinkLogicalTableSourceScan],FlinkConventions.LOGICAL,FlinkConventions.STREAM_PHYSICAL,StreamPhysicalTableSourceScanRule) {创建StreamPhysicalChangelogNormalize也就是转为changelog的条件如下 if (isUpsertSource(resolvedSchema, table.tableSource) ||isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, config) ) {isUpsertSource判断是否为upsert流判断逻辑如下 public static boolean isUpsertSource(ResolvedSchema resolvedSchema, DynamicTableSource tableSource) {if (!(tableSource instanceof ScanTableSource)) {return false;}ChangelogMode mode ((ScanTableSource) tableSource).getChangelogMode();boolean isUpsertMode mode.contains(RowKind.UPDATE_AFTER) !mode.contains(RowKind.UPDATE_BEFORE);boolean hasPrimaryKey resolvedSchema.getPrimaryKey().isPresent();return isUpsertMode hasPrimaryKey; }其中ChangelogMode在各自数据源实现类的getChangelogMode接口中定义如JDBC只支持insert Override public ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly(); }isSourceChangeEventsDuplicate判断不是upsert的更改流判断逻辑如下 public static boolean isSourceChangeEventsDuplicate(ResolvedSchema resolvedSchema,DynamicTableSource tableSource,TableConfig tableConfig) {if (!(tableSource instanceof ScanTableSource)) {return false;}ChangelogMode mode ((ScanTableSource) tableSource).getChangelogMode();boolean isCDCSource !mode.containsOnly(RowKind.INSERT) !isUpsertSource(resolvedSchema, tableSource);boolean changeEventsDuplicate tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);boolean hasPrimaryKey resolvedSchema.getPrimaryKey().isPresent();return isCDCSource changeEventsDuplicate hasPrimaryKey; }综合来说要走StreamPhysicalChangelogNormalize这一条调用链就不能是insertOnly的数据源但目前大部分Flink实现的数据源包括Iceberg都是insertOnly的 0104 更新模式 Flink相关的更新模式类有如下几个RowKind、ChangelogMode、UpdateKind RowKind RowKind是定义更新流每条数据的类型其中对于更新有两条数据一条删除旧数据一条插入新数据 /** Insertion operation. */ INSERT(I, (byte) 0),/*** Update operation with the previous content of the updated row.** pThis kind SHOULD occur together with {link #UPDATE_AFTER} for modelling an update that* needs to retract the previous row first. It is useful in cases of a non-idempotent update,* i.e., an update of a row that is not uniquely identifiable by a key.*/ UPDATE_BEFORE(-U, (byte) 1),/*** Update operation with new content of the updated row.** pThis kind CAN occur together with {link #UPDATE_BEFORE} for modelling an update that* needs to retract the previous row first. OR it describes an idempotent update, i.e., an* update of a row that is uniquely identifiable by a key.*/ UPDATE_AFTER(U, (byte) 2),/** Deletion operation. */ DELETE(-D, (byte) 3);ChangelogMode ChangelogMode定义数据源的更新模式主要三种就是包含不同的RowKind的类型 private static final ChangelogMode INSERT_ONLY ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();private static final ChangelogMode UPSERT ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();private static final ChangelogMode ALL ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_BEFORE).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();UpdateKind UpdateKind是针对update这种更新类型细分 /** NONE doesnt represent any kind of update operation. */ NONE,/*** This kind indicates that operators should emit update changes just as a row of {code* RowKind#UPDATE_AFTER}.*/ ONLY_UPDATE_AFTER,/*** This kind indicates that operators should emit update changes in the way that a row of {code* RowKind#UPDATE_BEFORE} and a row of {code RowKind#UPDATE_AFTER} together.*/ BEFORE_AND_AFTER02 StreamExecChangelogNormalize StreamExecChangelogNormalize的处理流程中根据是否启用table.exec.mini-batch.enabled分为微批处理和单数据的流处理 微批处理使用ProcTimeMiniBatchDeduplicateKeepLastRowFunction流式使用ProcTimeDeduplicateKeepLastRowFunction两者的核心差别就是微批会缓存数据使用一个for循环处理 这两个函数除了StreamPhysicalChangelogNormalize这一条链路外还有StreamExecDeduplicate这一条链路对应StreamPhysicalRankRule是一个排序的东西 for (Map.EntryRowData, RowData entry : buffer.entrySet()) {RowData currentKey entry.getKey();RowData currentRow entry.getValue();ctx.setCurrentKey(currentKey);if (inputInsertOnly) {processLastRowOnProcTime(currentRow,generateUpdateBefore,generateInsert,state,out,isStateTtlEnabled,equaliser);} else {processLastRowOnChangelog(currentRow, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);} }processLastRowOnProcTime 对数据按照时间语义进行去重将当前数据作为最新这个函数只针对insert only的数据 static void checkInsertOnly(RowData currentRow) {Preconditions.checkArgument(currentRow.getRowKind() RowKind.INSERT); }整套处理逻辑就是对数据根据场景修改数据的RowKind类型 } else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);out.collect(preRow);}currentRow.setRowKind(RowKind.UPDATE_AFTER);out.collect(currentRow); }processLastRowOnChangelog 这个函数就是按Key去重本质上也是针对数据修改RowKind 核心的一块功能就是更新的时候要将前一个数据修改为UPDATE_BEFORE } else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);out.collect(preRow);}currentRow.setRowKind(RowKind.UPDATE_AFTER);out.collect(currentRow); }函数整体借用的是Flink的state功能从状态中获取前面的数据所以对状态缓存由要求另外针对非删除型的数据如果TTL没有开的话就不会更新前面的数据 if (!isStateTtlEnabled equaliser.equals(preRow, currentRow)) {// currentRow is the same as preRow and state cleaning is not enabled.// We do not emit retraction and update message.// If state cleaning is enabled, we have to emit messages to prevent too early// state eviction of downstream operators.return; }03 初始RowKind来源 前面的流程里在进行changelog转换的时候数据是已经存在一个RowKind的值了这一章追踪初始RowKind的来源 基于Flink-27的设计Kafka数据源处理任务有一个KafkaRecordEmitteremitRecord当中做数据的反序列化 deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);最终走到DeserializationSchema.deserialize完成最终的反序列化 default void deserialize(byte[] message, CollectorT out) throws IOException {T deserialize deserialize(message);if (deserialize ! null) {out.collect(deserialize);} }这里message是一个二进制数组实际是Kafka的数据类型ConsumerRecord。根据SQL当中的配置value反序列化使用的是csv所以走到CsvRowDataDeserializationSchema当中处理 final JsonNode root objectReader.readValue(message); return (RowData) runtimeConverter.convert(root);这里读出来的root是数据的keyconvert的转化的实现类是CsvToRowDataConverters其createRowConverter接口当中创建了转化函数函数中将数据转化为了Flink的数据类型GenericRowData GenericRowData row new GenericRowData(arity);GenericRowData的定义当中有初始化RowKind就是insert public GenericRowData(int arity) {this.fields new Object[arity];this.kind RowKind.INSERT; // INSERT as default }04 补充 0401 delete 按照官方说法发送一个空消息就会产生delete Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.使用kafka producer控制台发送空消息无法解析 [ERROR] Could not execute SQL statement. Reason: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-inputat [Source: UNKNOWN; byte offset: #UNKNOWN]官方说法是kafka的控制台版本对 null的支持问题需要3.2以上版本 https://issues.apache.org/jira/browse/FLINK-27663?jqlproject%20%3D%20FLINK%20AND%20text%20~%20%22upsert-kafka%22 空值处理逻辑在DynamicKafkaDeserializationSchema.deserialize当中 这里根据输入的数据是否空值进行分支处理非空值时走的就是前三章的逻辑也就是这里是前三章逻辑的入口 if (record.value() null upsertMode) {// collect tombstone messages in upsert mode by handoutputCollector.collect(null); } else {valueDeserialization.deserialize(record.value(), outputCollector); }空值时走到OutputProjectionCollector.emitRow这里会设置初始类型为DELETE if (physicalValueRow null) {if (upsertMode) {rowKind RowKind.DELETE;} else {throw new DeserializationException(Invalid null value received in non-upsert mode. Could not to set row kind for output record.);} } else {rowKind physicalValueRow.getRowKind(); }
http://www.pierceye.com/news/834233/

相关文章:

  • 网站网上商城建设网站上线详细步骤
  • 有那些网站可以做推广长沙公司排名
  • 怎样申请做p2p融资网站北京做网站一般多少钱
  • 建筑公司网站作用ASP.NET实用网站开发答案
  • 网站建设如何获取客户东莞网络营销销售
  • 郑州平台类网站自建房平面图设计软件
  • 昆明网站运营公司有哪些网页版微信登录二维码
  • 中国最权威的网站排名网上商店系统设计与开发
  • 自己做装修网站需要多少钱太仓建设网站
  • 湘潭做网站电话磐石网络龙岩kk社区
  • 重庆孝爱之家网站建设哪里可以做期货网站平台
  • 建设网站价位视频网站开发防止盗链
  • 制作网站的难度如何提网站建设需求
  • 做网上竞彩网站合法吗找工作用什么平台最好
  • 石家庄模板网站建网站要钱吗 优帮云
  • wap 网站 源码制作手游需要学什么软件
  • 自己做网站怎样挣钱个人网站模板 php
  • 新加坡建设局网站网站建设资料清单
  • 做网站用什么语言制作最安全?网站设计酷站
  • 河南省做网站的公司个人网站可以做电商吗
  • 专门做家教的网站网站开发大学
  • 资源专业网站优化排名wordpress 调用 置顶
  • 网站的建设维护网站换空间有影响吗
  • 兰州网站建设公南昌做网站的
  • 网站菜单样式襄樊公司网站建设
  • 学校网站建设平台wordpress 4.9.2
  • 开o2o网站需要什么手续企业微信开放平台
  • 网站开发 外文文献移动网站制作价格
  • 如何做网站的版块规划舆情监测
  • 怎么给公司注册网站二级域名的网站备案