怎么做优惠券网站,企业oa办公系统大概多少钱一套,东莞南城外贸网站建设,wordpress 页面内菜单原理#xff1a;
同步原理#xff1a;其实就是伪装成一个mysql 的从库会拉取主库的binlog日志读取数据#xff0c;相当于mysql 的主从复制。然而flink的数据处理方式是流处理#xff0c;实时收集清洗数据。相关联的checkpoint#xff0c;其实就是一个容错恢复快照#x…原理
同步原理其实就是伪装成一个mysql 的从库会拉取主库的binlog日志读取数据相当于mysql 的主从复制。然而flink的数据处理方式是流处理实时收集清洗数据。相关联的checkpoint其实就是一个容错恢复快照没执行后会保存一个当前处理数据的offset如果有job异常停止或者checkpoint失败那么下次checkpoint将从上次失败的地方继续处理数据。容错恢复的算法是异步屏障算法。
1.自定函数 利用flink-table的TableFunction表函数 flink-core包的Tuple函数Api实现代码
public class ASI_UDTF extends TableFunctionTuple1String[] {public void eval(String str1) {if (Strings.isNullOrEmpty(str1)) {collect(null);} else {String[] split1 str1.split(,);Tuple1String[] of1 Tuple1.of(split1);collect(of1);}}
} 在flink控制台里把自定义函数的jar包生成一个对应的函数 注意
函数的入参判空用lateral table派生表关联时也要注意如果是null值的情况所以要外连接例如
LEFT JOIN lateral table (trans_to_array(gss.goods_specification_values)) as F(gss_array_values) ON TRUE 2.oracle-cdc同步到mysql-jdbc的场景 oracle-cdc同步到mysql-jdbc需要驱动jar三个包 flink-connector-jdbc-3.0.0-1.16.jar flink-sql-connector-oracle-cdc-2.3.0.jar mysql-connector-java-5.1.49.jar使用flinksql的方式
oracle-cdccreate source table的参考格式
Flink SQL CREATE TABLE products (ID INT NOT NULL,NAME STRING,DESCRIPTION STRING,WEIGHT DECIMAL(10, 3),PRIMARY KEY(id) NOT ENFORCED) WITH (connector oracle-cdc,hostname localhost,port 1521,username flinkuser,password flinkpw,database-name XE,schema-name inventory,table-name products);mysql-jdbccreate sink table参考格式
Flink SQLCREATE TABLE vehicle_info (id BIGINT,company_name STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH (connector jdbc,url jdbc:mysql://192.168.0.33:3306/mayi_user,table-name vehicle_info,username mayi_admin,password 1q2w3e4r
);
相关使用链接https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html