当前位置: 首页 > news >正文

网站采集被降权免费ppt资源网站

网站采集被降权,免费ppt资源网站,郑州网站制作网,外贸云一、Flink与其他组件的协同 Flink 是一个分布式、高性能、始终可用、准确一次#xff08;Exactly-Once#xff09;语义的流处理引擎#xff0c;广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成#xff0c;形成完整的大数据处理链路。下面我们从…一、Flink与其他组件的协同 Flink 是一个分布式、高性能、始终可用、准确一次Exactly-Once语义的流处理引擎广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成形成完整的大数据处理链路。下面我们从 Flink 的 核心架构 出发结合与 Hadoop 组件协同方式详细剖析 Flink 的作用。 1. Flink 核心架构详解 1架构组件图概览 ------------------------- | Client | -------------------------|v ------------------------- | JobManager (JM) | -- Master 负责调度 -------------------------|v ------------------------- | TaskManagers (TM) | -- Worker 执行算子任务 -------------------------|v ------------------------- | Slot | -- 执行资源单位 -------------------------2核心组件职责 组件描述Client提交作业到 Flink 集群触发作业执行。JobManager (JM)管理作业生命周期负责调度任务、故障恢复、协调检查点Checkpoint等。TaskManager (TM)具体执行作业的物理任务算子负责数据交换、状态管理等。SlotTaskManager 内部的资源单位用于任务部署。每个 TaskManager 有多个 Slot。 3状态管理与容错 Checkpoint/Savepoint可恢复一致性状态Exactly Once State Backend保存状态如 RocksDB、FsStateBackend Recovery通过重放 Checkpoint 恢复任务 2. Flink 与 Hadoop 各组件的协同关系 Flink 虽然是独立系统但能与 Hadoop 生态的多个关键组件协同工作构建完整的大数据平台。 1与 HDFSHadoop Distributed File System 协同方式描述输入源Flink 可直接读取 HDFS 中的批量数据如 ORC、Parquet、Text 等格式状态后端Flink Checkpoint/Savepoint 可存储到 HDFS 上保证高可用与容灾输出目标Flink 作业可以将计算结果输出到 HDFS作为后续离线处理的数据 fs.defaultFS: hdfs://namenode:8020 state.backend: filesystem state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints/2与 Hive 协同方式描述读取表数据Flink 可通过 Hive Catalog 与 Hive 元数据打通直接读取 Hive 表写入表Flink SQL 可将流式数据写入 Hive使用 INSERT INTO统一元数据Flink Hive Catalog 支持表结构共享便于湖仓一体实践 CREATE CATALOG my_hive WITH (type hive,hive-conf-dir /etc/hive/conf );3与 Kafka实时采集 协同方式描述实时数据源Flink 通过 Kafka Source 接收实时数据流如日志、订单等下游结果写入Flink 可将流式计算结果写入 Kafka供下游消费Exactly Once 语义Flink Kafka Checkpoint 可实现端到端的精确一次语义 FlinkKafkaConsumerString consumer new FlinkKafkaConsumer(topic, new SimpleStringSchema(), properties); consumer.setStartFromGroupOffsets(); consumer.setCommitOffsetsOnCheckpoints(true);4与 HBase实时查询 协同方式描述维表关联Flink 可使用 HBase 作为维表进行流批 Join实时补充维度数据实时写入计算结果可实时写入 HBase支持下游查询系统使用如用户画像等 tableEnv.executeSql(CREATE TABLE hbase_dim (...) WITH (connector hbase-2.2, ...));5与 YARN 协同方式描述资源调度Flink 可部署在 YARN 上利用 Hadoop 的资源调度管理能力Session / Per-Job 模式支持多租户资源隔离或每个作业独立资源隔离部署 flink run -m yarn-cluster -ynm my-flink-job myjob.jar6与 Zookeeper 协同方式描述高可用 JobManager使用 Zookeeper 实现 JobManager 的 leader electionCheckpoint HA 元数据存储配合 HDFS 存储 Checkpoint 元数据路径 high-availability: zookeeper high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181 high-availability.storageDir: hdfs://namenode:8020/flink/ha/3. Flink 的作用总结 模块Flink 的角色实时数据处理核心组件进行低延迟、高吞吐流处理计算数据清洗与 ETL提供强大 SQL / DataStream API 进行多源数据处理与聚合实时指标计算支持实时 KPI、UV/PV、订单流等分析数据湖构建可作为流式数据入湖的计算引擎结合 Hudi/Iceberg实时监控预警搭配 Kafka Prometheus构建告警与监控系统实时数仓建设联合 Kafka Hive HDFS HBase 构建流批一体数仓体系 4. Flink 架构在 Hadoop 平台的实际部署图 -------------| Flume/Nginx|------------|Kafka集群|---------------------------------------| |---v--- ----v----| Flink |-- 清洗 → 维表 Join → 计算 | Spark |------ --------| | -------v--------- --------v-------- | HBase/Redis | | HDFS / Hive | ----------------- -----------------二、Flink DataStream API的使用 现在以 Flink DataStream API 为核心深入剖析一个真实生产场景的 从 Kafka 到 Kafka 的流式处理全流程包括 项目结构与依赖 数据模型与清洗 水位线与乱序处理 异步维表查询HBase/MySQL/Redis 窗口聚合逻辑 数据下发Kafka Sink 容错机制与 Checkpoint 配置 1. 项目结构与依赖 1Maven 依赖pom.xml dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.17.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion1.17.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion3.0.1-1.17/version/dependencydependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactIdversion2.14.2/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hbase-2.2/artifactIdversion1.17.1/version/dependency /dependencies2. 数据模型定义 1订单数据结构OrderEvent public class OrderEvent {public String orderId;public String userId;public String productId;public double price;public int quantity;public long orderTime; // epoch millis }2 商品维度ProductInfo public class ProductInfo {public String productId;public String categoryId;public String productName; }3聚合结果结构OrderStat public class OrderStat {public String categoryId;public long windowStart;public long windowEnd;public double totalAmount; }3. Kafka Source JSON 反序列化 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(kafka:9092).setTopics(order_events).setGroupId(flink-consumer).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamOrderEvent orderStream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), KafkaSource).map(json - new ObjectMapper().readValue(json, OrderEvent.class)).returns(OrderEvent.class);4. 水位线处理乱序数据支持 WatermarkStrategyOrderEvent watermarkStrategy WatermarkStrategy.OrderEventforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) - event.orderTime);DataStreamOrderEvent orderStreamWithWM orderStream.assignTimestampsAndWatermarks(watermarkStrategy);5. 异步维表关联以 HBase 为例 使用 AsyncFunction 实现异步查询支持 Redis/HBase/MySQL 示例实现AsyncProductEnrichmentFunction public class AsyncProductEnrichmentFunction extends RichAsyncFunctionOrderEvent, Tuple2OrderEvent, ProductInfo {private transient HBaseClient hBaseClient;Overridepublic void open(Configuration parameters) throws Exception {hBaseClient new HBaseClient(hbase.zookeeper.quorum);}Overridepublic void asyncInvoke(OrderEvent input, ResultFutureTuple2OrderEvent, ProductInfo resultFuture) {CompletableFuture.supplyAsync(() - hBaseClient.queryProductInfo(input.productId)).thenAccept(productInfo - resultFuture.complete(Collections.singletonList(Tuple2.of(input, productInfo))));}Overridepublic void close() throws Exception {hBaseClient.close();} }应用异步函数 DataStreamTuple2OrderEvent, ProductInfo enrichedStream AsyncDataStream.unorderedWait(orderStreamWithWM,new AsyncProductEnrichmentFunction(),5, TimeUnit.SECONDS, 100 );6. 按类目 ID 滚动窗口聚合 DataStreamOrderStat resultStream enrichedStream.map(tuple - new Tuple3(tuple.f1.categoryId, tuple.f0.orderTime, tuple.f0.price * tuple.f0.quantity)).returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Long, DoubleforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((t, ts) - t.f1)).keyBy(t - t.f0).window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new AggregateFunctionTuple3String, Long, Double, Double, OrderStat() {private long windowStart, windowEnd;private String categoryId;public Double createAccumulator() { return 0.0; }public Double add(Tuple3String, Long, Double value, Double acc) {categoryId value.f0;return acc value.f2;}public OrderStat getResult(Double acc) {return new OrderStat(categoryId, windowStart, windowEnd, acc);}public Double merge(Double acc1, Double acc2) {return acc1 acc2;}}, new ProcessWindowFunctionOrderStat, OrderStat, String, TimeWindow() {Overridepublic void process(String key, Context context, IterableOrderStat elements, CollectorOrderStat out) {OrderStat stat elements.iterator().next();stat.windowStart context.window().getStart();stat.windowEnd context.window().getEnd();out.collect(stat);}});7. 写入 Kafka Sink KafkaSinkOrderStat kafkaSink KafkaSink.OrderStatbuilder().setBootstrapServers(kafka:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(order_stats).setValueSerializationSchema(stat - {ObjectMapper mapper new ObjectMapper();return mapper.writeValueAsBytes(stat);}).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).build();resultStream.sinkTo(kafkaSink);8.  容错与 HA 配置关键 1Checkpoint 配置 env.enableCheckpointing(60000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); env.setStateBackend(new RocksDBStateBackend(hdfs://namenode/flink/checkpoints));2高可用配置flink-conf.yaml high-availability: zookeeper high-availability.zookeeper.quorum: zk1:2181,zk2:2181 state.checkpoints.dir: hdfs://namenode/flink/checkpoints state.savepoints.dir: hdfs://namenode/flink/savepoints9. 运行命令on YARN flink run -m yarn-cluster -c com.company.OrderRealtimeJob your-job.jar10. 监控与排障建议 工具功能Flink Web UI监控 Task、Checkpoint、WatermarkPrometheus指标采集Grafana可视化AlertManager告警配置Savepoint容错恢复点 三、FlinkCDC实时采集数据入湖 解析Flink CDCChange Data Capture在大数据体系中的使用方法并结合 Kafka、Hudi、Iceberg、Hive、HDFS 等大数据组件提供一套 可落地、可执行、可扩展的完整集成方案。 1. Flink CDC 简介 Flink CDC 是 Apache Flink Debezium 的组合用于实时采集 MySQL/PostgreSQL 等数据库的变更数据INSERT/UPDATE/DELETE并以 流式方式传递到下游系统Kafka、Hudi、Iceberg、HBase 等。 2. 典型架构场景Flink CDC Hudi Hive 实时数据湖方案 ------------- ---------------------| MySQL/Postgres | || Source DB -------- | Flink CDC Connector |------------- | |--------------------|| Row-level ChangeLogv--------------------| Flink Job || (数据清洗/处理) |--------------------|v--------------------| Hudi Sink (Flink) |--------------------|v--------------------------| Hive / Presto / Trino || 实时查询支持 ACID |---------------------------3. 方案目标 实时采集 MySQL 数据基于 Binlog 支持变更Insert/Update/Delete语义 数据存入 Hudi 表支持 MOR/COW 格式 Hive/Presto 端可直接查询 4. 组件版本建议 组件版本建议Flink1.17.x 或 1.18.xFlink CDC2.4.1Debezium内嵌于 Flink CDCHudi0.13.1Hive2.3.x / 3.1.xHadoop/HDFS3.x 5. 部署准备 1安装 Kafka可选 用于做 CDC 中转可选支持 Flink 直接接 Hudi 2安装 Hive Metastore Hadoop HDFS 用于管理 Hudi 表元数据和 HDFS 存储 3准备 MySQL 源数据库 配置 binlog设置 binlog_format ROW并开启 server_id、binlog_row_image full 6. 关键配置代码与步骤 1添加 Maven 依赖 dependencies!-- Flink CDC --dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.1/version/dependency!-- Hudi Sink --dependencygroupIdorg.apache.hudi/groupIdartifactIdhudi-flink-bundle_2.12/artifactIdversion0.13.1/version/dependency /dependencies2Flink SQL 示例CDC → Hudi -- 1. 源表MySQL CDC 表 CREATE TABLE ods_orders (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED ) WITH (connector mysql-cdc,hostname mysql-host,port 3306,username flink,password flink123,database-name srm,table-name orders,scan.startup.mode initial );-- 2. 目标表Hudi 表MOR 模式 CREATE TABLE dwd_orders (id STRING PRIMARY KEY NOT ENFORCED,user_id STRING,amount DOUBLE,ts TIMESTAMP(3) ) PARTITIONED BY (user_id) WITH (connector hudi,path hdfs://namenode/data/hudi/dwd_orders,table.type MERGE_ON_READ,hoodie.datasource.write.recordkey.field id,write.tasks 4,compaction.async.enabled true,hive_sync.enabled true,hive_sync.mode hms,hive_sync.metastore.uris thrift://hive-metastore:9083,hive_sync.db ods,hive_sync.table dwd_orders );-- 3. 实时写入 INSERT INTO dwd_orders SELECT * FROM ods_orders;7. 关键功能说明 功能配置字段说明主键变更支持PRIMARY KEY ... NOT ENFORCED支持 upsert增量采集模式scan.startup.mode initial首次全量 后续增量实时 compactioncompaction.async.enabled trueMOR 表性能保障Hive 数据同步hive_sync.enabled trueHudi 自动注册 Hive 元数据 8. 整合优化建议 1多表 CDC 同步统一处理 使用 Flink CDC 的 schema-name.table-name通配符 database-name srm, table-name .*,配合 Flink SQL Catalog Dynamic Table Factory可实现一拖 N 的多表处理逻辑。 2增加清洗逻辑如空值过滤、转换 SELECTid,user_id,amount * 1.13 AS amount_tax,ts FROM ods_orders WHERE amount IS NOT NULL;3写入 Kafka替代 Hudi → 用于事件总线或下游消费 CREATE TABLE kafka_sink (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3) ) WITH (connector kafka,topic ods.orders,properties.bootstrap.servers kafka:9092,format json,scan.startup.mode latest-offset );9. Flink CDC 整合场景汇总 场景描述推荐组件实时数据入湖MySQL → HudiFlink CDC Hudi数据仓库加速Oracle → IcebergFlink CDC Iceberg数据中台构建MySQL → Kafka → 多下游Flink CDC Kafka数据回流校验Kafka → Flink → MySQLFlink SQL JDBC SinkDWD建模ODS → DWD/DWM → ADSFlink SQL 维表 JOIN 10. 可视化监控 工具功能Flink UICheckpoint、Watermark、吞吐Prometheus指标采集Grafana监控仪表盘HiveSQL 查询验证 四、自定义 Flink CDC Job 的完整实现 自定义 Flink CDC Job 的完整实现采用 Java DataStream API 编写支持 多表接入MySQL 为例 自定义清洗、转换逻辑 支持写入 Kafka、Hudi、Iceberg 等下游系统 可部署为标准 Flink 应用flink run 执行 1. 自定义 Flink CDC Job 场景说明 目标 从 MySQL 采集订单表 srm.orders 做清洗如金额换算、字段过滤 输出到 Hudi 表或 Kafka/Console 2. 依赖配置Maven dependencies!-- Flink CDC --dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.1/version/dependency!-- Flink 通用 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion1.17.2/version/dependency!-- 可选Sink 依赖如 Kafka、Hudi、Iceberg -- /dependencies3. 完整代码示例CustomCdcJob.java public class CustomCdcJob {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 CDC 源MySQLMySqlSourceOrder mysqlSource MySqlSource.Orderbuilder().hostname(mysql-host).port(3306).databaseList(srm).tableList(srm.orders).username(flink).password(flink123).deserializer(new OrderDeserializationSchema()) // 自定义反序列化.build();// 3. 接入 SourceDataStreamSourceOrder orderStream env.fromSource(mysqlSource,WatermarkStrategy.noWatermarks(),MySQL CDC Source);// 4. 数据清洗/转换SingleOutputStreamOperatorOrder cleaned orderStream.filter(order - order.amount 0).map(order - {order.amount order.amount * 1.13; // 加税return order;});// 5. Sink控制台 / Kafka / Hudicleaned.print();env.execute(Custom Flink CDC Job);} }4. 自定义反序列化器OrderDeserializationSchema public class OrderDeserializationSchema implements DebeziumDeserializationSchemaOrder {Overridepublic void deserialize(SourceRecord sourceRecord, CollectorOrder collector) {Struct value (Struct) sourceRecord.value();if (value null) return;Struct after value.getStruct(after);if (after ! null) {Order order new Order();order.id after.getString(id);order.userId after.getString(user_id);order.amount after.getFloat64(amount);order.ts Instant.ofEpochMilli(after.getInt64(ts)).atZone(ZoneId.of(UTC)).toLocalDateTime();collector.collect(order);}}Overridepublic TypeInformationOrder getProducedType() {return TypeInformation.of(Order.class);} }5. 定义 POJO 类Order.java public class Order implements Serializable {public String id;public String userId;public Double amount;public LocalDateTime ts;Overridepublic String toString() {return String.format([Order] id%s, user%s, amt%.2f, ts%s,id, userId, amount, ts.toString());} }6. Sink 可选方案 1控制台输出开发调试 cleaned.print();2Kafka Sink事件总线 KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(kafka:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(srm.orders.cdc).setValueSerializationSchema(new SimpleStringSchema()).build()).build();cleaned.map(order - JSON.toJSONString(order)).sinkTo(kafkaSink);3写入 Hudi 表通过 Flink Hudi Sink cleaned.addSink(HudiSinkUtil.getSink());自定义 Hudi Sink 工具类可基于 HoodieSink 封装。 七、打包部署方式 1使用 maven-shade-plugin 打 fat-jar mvn clean package -DskipTests输出custom-cdc-job-1.0-SNAPSHOT.jar 2提交到 Flink 集群 flink run -m yarn-cluster -c com.example.CustomCdcJob custom-cdc-job.jar8. 扩展功能可选 功能实现方式多表同步.tableList(srm.orders,srm.invoice)动态 schema 推导使用 JsonDebeziumDeserializationSchema维表 joinFlink SQL / Broadcast Join自定义状态存储Flink KeyedStateexactly-once 写入 Kafka/Hudi使用 checkpoint 支持
http://www.pierceye.com/news/412312/

相关文章:

  • 网站加上视频对seo影响wordpress打开xml-rpc
  • 个人网站建设分几个步走单页面网站多少钱
  • 自己做网站详细步骤保定网站建设方案优化
  • 传奇手游网站大全9377公司网站建设安全的风险
  • 昆明建设厅网站企业管理咨询上班好吗
  • 福州做网站销售公司用vs2010做网站的好处
  • 深圳企业建站平台网站备案费一般是多少
  • 郑州哪里有做网站郑州货拉拉
  • 汽车网页制作素材滕州网站搜索引擎优化
  • 网站备案地点郓城做网站
  • 专业的外贸网站建设公司价格网站如何制作浙江
  • 东莞运营推广网站建设费用微信小程序开发需要多少钱?
  • 福州专业网站搭建排名沈阳教做网站
  • 公益网站建设方案代码需求网站
  • php网站开发步骤苏州知名网站制作开发
  • 万网免费建企业网站长春搜索引擎优化
  • 网站如何建设数据库网站制作自己接单
  • 为什么有的网站点不开免费的png素材网
  • 百度多久收录网站整体vi设计公司
  • 卡盟网站怎么做图片大全wordpress企业主题餐饮
  • 网站建设培训公司网站跳出率高
  • 电脑网站手机版怎么做网站建设平台哪个公司好
  • 常州网站制作报价wordpress 主页不显示图片
  • 如何在淘宝上做自己的网站东莞通网上营业厅
  • 北京专业响应式网站建设龙岗品牌网站建设
  • 网站qq联系怎么做莲都区建设分局网站
  • 河南旅游集团 网站建设网络运营与推广
  • 搭建网站要多少钱龙岩融胤网络科技有限公司
  • 网站建设实训报告命名规范深圳外贸网站开发
  • 深圳好看的公司网站做网站 网络科技公司