当前位置: 首页 > news >正文

广告vi设计是什么公司网站seo公司

广告vi设计是什么,公司网站seo公司,文山网站建设代理,平台网站 备案吗前言#xff1a;原本想讲如何基于Flink实现定制化计算引擎的开发#xff0c;并以FlinkCDC为例介绍#xff1b;发现这两个在表达上不知以谁为主#xff0c;所以先分析FlinkCDC的应用场景和技术实现原理#xff0c;下一篇再去分析Flink能在哪些方面#xff0c;做定制化计算…前言原本想讲如何基于Flink实现定制化计算引擎的开发并以FlinkCDC为例介绍发现这两个在表达上不知以谁为主所以先分析FlinkCDC的应用场景和技术实现原理下一篇再去分析Flink能在哪些方面做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始然后讲述其基于Flink的实现原理和代码应用为下一篇介绍基于Flink开发定制化引擎做铺垫。 一、FlinkCDC应用场景 经常有同事或朋友问Flink和FlinkCDC有什么区别 Flink是一个流数据处理计算框架FlinkCDC是数据采集工具 Flink应用场景对比的是Storm、Spark FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等 FlinkCDC是Flink社区伙伴对数据采集需求开发的一个SDK工具让Flink在数据捕捉场景使用起来更方便一些。 1.1 CDC的应用场景分析 CDC的英文名是Change Data Capture (变化数据获取)解决的应用场景是对存储中间件中数据的采集比如Mysql、Orcle、PGSql、MongoDB等中间件 采集的方式分为基于查询和基于BinLog两种 以mysql的数据采集为例可以通过jdbc批次查询也可以通过Binlog解析增量数据采集 两者的一些特性对比如下 基于查询的CDC直接获取数据基于binlog的采集需要开启binlog服务。 1.2 FlinkCDC的应用分析 FlinkCDC和Canal实现的应用场景需求是差不多的都是通过binlog采集增量数据 但是可用性上的不同是   对于cannal类似的采集服务需要三步: 1.开启mysql的binlog2.将数据写到kafka3.用flink订阅kafka中的数据进行业务需求处理 对于FlinkCDC只需要在binlog开启后直接在一个Flink任务内做业务处理可以写到kafka处理也行 所有如cannal的采集功能服务都需要单独维护一套服务增加了运维负担FlinkCDC可以当作任务部署到集群大幅减轻了数据采集的应用难度 用一个任务就完成这个应用功能 二、FlinkCDC技术分析与本地操作 2.1 FlinkCDC的技术架构分析 与Canal这些提供服务能力的服务不同FlinkCdc只是一个任务可以简单的开发和部署。 Flink是借用了Debezium的功能Debezium是一个可轻量级嵌入代码逻辑的服务将Debezium的采集功能用Flink的sourceFunction包装然后打包成SDK提供给Flink开发使用 借用Flink自己的算子和sink能力可以将采集到的数据以Flink的特性加工数据并将数据写入Flink内置的connect组件sink到服务里如Kafka、Pulser、ES、RabbitMQ、MongoDB等。 2.2 本地操作 2.2.1准备mysql数据库表和数据 use flink_test; #检测binlog是否开启 show variables like %log_bin%#构建测试表 CREATE TABLE event_info (id int NOT NULL AUTO_INCREMENT,name varchar(255) NOT NULL,category varchar(512) DEFAULT NULL,pv int DEFAULT 0,uv int DEFAULT 0,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci;#写入数据 insert into event_info(id,name,category,pv,uv) values (1,aaa,nfh,20,8), (2,bbb,dfgf,30,2), (3,ccc,fsd,40,4), (4,ddd,afs,50,7), (5,eee,asfa,60,3) (6,aaa,nfh,20,8), (7,bbb,dfgf,30,2), (8,ccc,fsd,40,4), (9,ddd,afs,50,7), (10,eee,asfa,60,3); 2.2.2 pom文件 注意Flink和FlinkCDC的版本映射很多显示的可以关联的版本之间是冲突的这是一个很繁琐的工作我调试各个版本之间的映射花了一天左右的时间[求赞求收藏] 下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdchangedateDoris/artifactIdversion1.0-SNAPSHOT/versionpropertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingscala.version2.12/scala.versionjava.version1.8/java.versionflink.version1.14.5/flink.versionfastjson.version1.2.62/fastjson.versionhadoop.version2.8.3/hadoop.versionscope.modecompile/scope.modeslf4j.version1.7.30/slf4j.version/propertiesdependencies!-- springboot 依赖--!-- flink --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion${fastjson.version}/version/dependency!-- Add log dependencies when debugging locally --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/version/dependency!-- mysql-connector --dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.12/version/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-sql-connector-mysql-cdc/artifactIdversion2.2.1 /versionexclusionsexclusionartifactIdflink-shaded-guava/artifactIdgroupIdorg.apache.flink/groupId/exclusion/exclusions/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web_${scala.version}/artifactIdversion${flink.version}/version/dependency/dependenciesbuild !-- filters-- !-- filter${project.basedir}/src/main/resources/env/application-${profileActive}.properties/filter-- !-- /filters--pluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion2.1/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filtersartifactSetincludesinclude*:*/include/includesexcludesexcludeorg.slf4j:slf4j-api:jar:/exclude/excludes/artifactSettransformerstransformerimplementationorg.apache.maven.plugins.shade.resource.AppendingTransformerresourceMETA-INF/spring.handlers/resource/transformertransformerimplementationorg.apache.maven.plugins.shade.resource.AppendingTransformerresourceMETA-INF/spring.schemas/resource/transformertransformerimplementationorg.apache.maven.plugins.shade.resource.AppendingTransformerresourceMETA-INF/spring.factories/resource/transformer/transformers/configuration/execution/executions/pluginpluginartifactIdmaven-resources-plugin/artifactIdconfigurationencodingutf-8/encodinguseDefaultDelimiterstrue/useDefaultDelimitersdelimitersdelimiter$[*]/delimiter/delimiters/configuration/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdexecutionsexecutionphasepackage/phasegoalsgoalsingle/goal/goalsconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configuration/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins/build /project 2.2.3 java代码 package yto.com.net.demo;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class mysqlBinlogRead {private static final Logger log LoggerFactory.getLogger(mysqlBinlogRead.class);public static void main(String[] args) throws Exception {MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(ip).port(3306).databaseList(flink_test).tableList(flink_test.event_info).username(mysqlUser).password(password).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String // .startupOptions(StartupOptions.earliest()).build();Configuration configuration new Configuration();configuration.setInteger(RestOptions.PORT, 8083);StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); // StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// enable checkpointenv.enableCheckpointing(10000);DataStreamSourceString cdcSource env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL CDC Source);SingleOutputStreamOperatorString process cdcSource.process(new ProcessFunctionString, String() {Overridepublic void processElement(String row, ProcessFunctionString, String.Context context, CollectorString collector) throws Exception {JSONObject rowJson JSON.parseObject(row);String op rowJson.getString(op);JSONObject source rowJson.getJSONObject(source);String table source.getString(table);}});cdcSource.print(message:);env.execute(flinkCdc Read message);}}2.2.4 运行结果 如图所示已经写入库的结果通过connect获取增量数据通过binlog获取 注意表中的历史数据过多全量读取的时候将会内存溢出。
http://www.pierceye.com/news/665757/

相关文章:

  • 大连专业企业建站找哪家wordpress 保护wp-login.php
  • 微网站建设哪家便宜想要找个网站做环评公示
  • 建设银行网银网站激活个人简历模板电子版可填写
  • 肃州区建设局网站宁夏百度seo
  • 关于做电影的网站设计西安网站建设制作 熊掌号
  • idc网站建设怎么自己做一个网页链接
  • 网站开发安全模块方案个人网站搭建软件
  • 重庆建设招标造价信息网站个人网站建设与维护
  • 网站备案用户名忘了怎么办嘉兴做外贸网站的公司
  • 1688网站的特点网站制作器手机版下载
  • 兖州网站开发做一个中英文网站多少钱
  • wordpress怎么做网盘站好看的页面图片
  • 建设网站深圳罗湖安徽合肥做网站
  • 一级a做爰片免费网站下载网站快慢由什么决定
  • 网页设计与网站建设 郑州大学网络购物网站备案
  • 美观网站建设哪家好优化大师最新版下载
  • 外贸品牌网站制作wordpress 微信主题
  • 旅游网站开发需求分析网站的根目录的路径
  • easyUI网站开发docker wordpress mysql
  • dede手机网站模板下载黄冈做网站
  • 诸城网站建设葛小燕现在搜索引擎哪个比百度好用
  • 嘉兴做微网站多少钱注册网页需要多少钱
  • 论坛类网站设计大型网站系统解决方案
  • 网站建设中页面设计广告策划书籍
  • 云南省建设工程投标中心网站网页的制作步骤是什么
  • 保定网站设计概述更换动易网站模板的方法
  • 新手如何注册网站域名做 理财网站有哪些
  • 南宁快速建站模板企业网站的开发与应用
  • 网站运营适合什么样的人做企业宣传及介绍ppt
  • 怎么样网站开源小升初在线做试卷的网站