网站采集被降权,免费ppt资源网站,郑州网站制作网,外贸云一、Flink与其他组件的协同
Flink 是一个分布式、高性能、始终可用、准确一次#xff08;Exactly-Once#xff09;语义的流处理引擎#xff0c;广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成#xff0c;形成完整的大数据处理链路。下面我们从…一、Flink与其他组件的协同
Flink 是一个分布式、高性能、始终可用、准确一次Exactly-Once语义的流处理引擎广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成形成完整的大数据处理链路。下面我们从 Flink 的 核心架构 出发结合与 Hadoop 组件协同方式详细剖析 Flink 的作用。 1. Flink 核心架构详解
1架构组件图概览
-------------------------
| Client |
-------------------------|v
-------------------------
| JobManager (JM) | -- Master 负责调度
-------------------------|v
-------------------------
| TaskManagers (TM) | -- Worker 执行算子任务
-------------------------|v
-------------------------
| Slot | -- 执行资源单位
-------------------------2核心组件职责
组件描述Client提交作业到 Flink 集群触发作业执行。JobManager (JM)管理作业生命周期负责调度任务、故障恢复、协调检查点Checkpoint等。TaskManager (TM)具体执行作业的物理任务算子负责数据交换、状态管理等。SlotTaskManager 内部的资源单位用于任务部署。每个 TaskManager 有多个 Slot。
3状态管理与容错 Checkpoint/Savepoint可恢复一致性状态Exactly Once State Backend保存状态如 RocksDB、FsStateBackend Recovery通过重放 Checkpoint 恢复任务 2. Flink 与 Hadoop 各组件的协同关系
Flink 虽然是独立系统但能与 Hadoop 生态的多个关键组件协同工作构建完整的大数据平台。
1与 HDFSHadoop Distributed File System
协同方式描述输入源Flink 可直接读取 HDFS 中的批量数据如 ORC、Parquet、Text 等格式状态后端Flink Checkpoint/Savepoint 可存储到 HDFS 上保证高可用与容灾输出目标Flink 作业可以将计算结果输出到 HDFS作为后续离线处理的数据
fs.defaultFS: hdfs://namenode:8020
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints/2与 Hive
协同方式描述读取表数据Flink 可通过 Hive Catalog 与 Hive 元数据打通直接读取 Hive 表写入表Flink SQL 可将流式数据写入 Hive使用 INSERT INTO统一元数据Flink Hive Catalog 支持表结构共享便于湖仓一体实践
CREATE CATALOG my_hive WITH (type hive,hive-conf-dir /etc/hive/conf
);3与 Kafka实时采集
协同方式描述实时数据源Flink 通过 Kafka Source 接收实时数据流如日志、订单等下游结果写入Flink 可将流式计算结果写入 Kafka供下游消费Exactly Once 语义Flink Kafka Checkpoint 可实现端到端的精确一次语义
FlinkKafkaConsumerString consumer new FlinkKafkaConsumer(topic, new SimpleStringSchema(), properties);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);4与 HBase实时查询
协同方式描述维表关联Flink 可使用 HBase 作为维表进行流批 Join实时补充维度数据实时写入计算结果可实时写入 HBase支持下游查询系统使用如用户画像等
tableEnv.executeSql(CREATE TABLE hbase_dim (...) WITH (connector hbase-2.2, ...));5与 YARN
协同方式描述资源调度Flink 可部署在 YARN 上利用 Hadoop 的资源调度管理能力Session / Per-Job 模式支持多租户资源隔离或每个作业独立资源隔离部署
flink run -m yarn-cluster -ynm my-flink-job myjob.jar6与 Zookeeper
协同方式描述高可用 JobManager使用 Zookeeper 实现 JobManager 的 leader electionCheckpoint HA 元数据存储配合 HDFS 存储 Checkpoint 元数据路径
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs://namenode:8020/flink/ha/3. Flink 的作用总结
模块Flink 的角色实时数据处理核心组件进行低延迟、高吞吐流处理计算数据清洗与 ETL提供强大 SQL / DataStream API 进行多源数据处理与聚合实时指标计算支持实时 KPI、UV/PV、订单流等分析数据湖构建可作为流式数据入湖的计算引擎结合 Hudi/Iceberg实时监控预警搭配 Kafka Prometheus构建告警与监控系统实时数仓建设联合 Kafka Hive HDFS HBase 构建流批一体数仓体系 4. Flink 架构在 Hadoop 平台的实际部署图 -------------| Flume/Nginx|------------|Kafka集群|---------------------------------------| |---v--- ----v----| Flink |-- 清洗 → 维表 Join → 计算 | Spark |------ --------| |
-------v--------- --------v--------
| HBase/Redis | | HDFS / Hive |
----------------- -----------------二、Flink DataStream API的使用
现在以 Flink DataStream API 为核心深入剖析一个真实生产场景的 从 Kafka 到 Kafka 的流式处理全流程包括 项目结构与依赖 数据模型与清洗 水位线与乱序处理 异步维表查询HBase/MySQL/Redis 窗口聚合逻辑 数据下发Kafka Sink 容错机制与 Checkpoint 配置 1. 项目结构与依赖
1Maven 依赖pom.xml
dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.17.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion1.17.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion3.0.1-1.17/version/dependencydependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactIdversion2.14.2/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hbase-2.2/artifactIdversion1.17.1/version/dependency
/dependencies2. 数据模型定义
1订单数据结构OrderEvent
public class OrderEvent {public String orderId;public String userId;public String productId;public double price;public int quantity;public long orderTime; // epoch millis
}2 商品维度ProductInfo
public class ProductInfo {public String productId;public String categoryId;public String productName;
}3聚合结果结构OrderStat
public class OrderStat {public String categoryId;public long windowStart;public long windowEnd;public double totalAmount;
}3. Kafka Source JSON 反序列化
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(kafka:9092).setTopics(order_events).setGroupId(flink-consumer).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamOrderEvent orderStream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), KafkaSource).map(json - new ObjectMapper().readValue(json, OrderEvent.class)).returns(OrderEvent.class);4. 水位线处理乱序数据支持
WatermarkStrategyOrderEvent watermarkStrategy WatermarkStrategy.OrderEventforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) - event.orderTime);DataStreamOrderEvent orderStreamWithWM orderStream.assignTimestampsAndWatermarks(watermarkStrategy);5. 异步维表关联以 HBase 为例
使用 AsyncFunction 实现异步查询支持 Redis/HBase/MySQL
示例实现AsyncProductEnrichmentFunction
public class AsyncProductEnrichmentFunction extends RichAsyncFunctionOrderEvent, Tuple2OrderEvent, ProductInfo {private transient HBaseClient hBaseClient;Overridepublic void open(Configuration parameters) throws Exception {hBaseClient new HBaseClient(hbase.zookeeper.quorum);}Overridepublic void asyncInvoke(OrderEvent input, ResultFutureTuple2OrderEvent, ProductInfo resultFuture) {CompletableFuture.supplyAsync(() - hBaseClient.queryProductInfo(input.productId)).thenAccept(productInfo - resultFuture.complete(Collections.singletonList(Tuple2.of(input, productInfo))));}Overridepublic void close() throws Exception {hBaseClient.close();}
}应用异步函数
DataStreamTuple2OrderEvent, ProductInfo enrichedStream AsyncDataStream.unorderedWait(orderStreamWithWM,new AsyncProductEnrichmentFunction(),5, TimeUnit.SECONDS, 100
);6. 按类目 ID 滚动窗口聚合
DataStreamOrderStat resultStream enrichedStream.map(tuple - new Tuple3(tuple.f1.categoryId, tuple.f0.orderTime, tuple.f0.price * tuple.f0.quantity)).returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Long, DoubleforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((t, ts) - t.f1)).keyBy(t - t.f0).window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new AggregateFunctionTuple3String, Long, Double, Double, OrderStat() {private long windowStart, windowEnd;private String categoryId;public Double createAccumulator() { return 0.0; }public Double add(Tuple3String, Long, Double value, Double acc) {categoryId value.f0;return acc value.f2;}public OrderStat getResult(Double acc) {return new OrderStat(categoryId, windowStart, windowEnd, acc);}public Double merge(Double acc1, Double acc2) {return acc1 acc2;}}, new ProcessWindowFunctionOrderStat, OrderStat, String, TimeWindow() {Overridepublic void process(String key, Context context, IterableOrderStat elements, CollectorOrderStat out) {OrderStat stat elements.iterator().next();stat.windowStart context.window().getStart();stat.windowEnd context.window().getEnd();out.collect(stat);}});7. 写入 Kafka Sink
KafkaSinkOrderStat kafkaSink KafkaSink.OrderStatbuilder().setBootstrapServers(kafka:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(order_stats).setValueSerializationSchema(stat - {ObjectMapper mapper new ObjectMapper();return mapper.writeValueAsBytes(stat);}).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).build();resultStream.sinkTo(kafkaSink);8. 容错与 HA 配置关键
1Checkpoint 配置
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.setStateBackend(new RocksDBStateBackend(hdfs://namenode/flink/checkpoints));2高可用配置flink-conf.yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
state.checkpoints.dir: hdfs://namenode/flink/checkpoints
state.savepoints.dir: hdfs://namenode/flink/savepoints9. 运行命令on YARN
flink run -m yarn-cluster -c com.company.OrderRealtimeJob your-job.jar10. 监控与排障建议
工具功能Flink Web UI监控 Task、Checkpoint、WatermarkPrometheus指标采集Grafana可视化AlertManager告警配置Savepoint容错恢复点 三、FlinkCDC实时采集数据入湖
解析Flink CDCChange Data Capture在大数据体系中的使用方法并结合 Kafka、Hudi、Iceberg、Hive、HDFS 等大数据组件提供一套 可落地、可执行、可扩展的完整集成方案。 1. Flink CDC 简介
Flink CDC 是 Apache Flink Debezium 的组合用于实时采集 MySQL/PostgreSQL 等数据库的变更数据INSERT/UPDATE/DELETE并以 流式方式传递到下游系统Kafka、Hudi、Iceberg、HBase 等。 2. 典型架构场景Flink CDC Hudi Hive 实时数据湖方案 ------------- ---------------------| MySQL/Postgres | || Source DB -------- | Flink CDC Connector |------------- | |--------------------|| Row-level ChangeLogv--------------------| Flink Job || (数据清洗/处理) |--------------------|v--------------------| Hudi Sink (Flink) |--------------------|v--------------------------| Hive / Presto / Trino || 实时查询支持 ACID |---------------------------3. 方案目标 实时采集 MySQL 数据基于 Binlog 支持变更Insert/Update/Delete语义 数据存入 Hudi 表支持 MOR/COW 格式 Hive/Presto 端可直接查询 4. 组件版本建议
组件版本建议Flink1.17.x 或 1.18.xFlink CDC2.4.1Debezium内嵌于 Flink CDCHudi0.13.1Hive2.3.x / 3.1.xHadoop/HDFS3.x 5. 部署准备
1安装 Kafka可选
用于做 CDC 中转可选支持 Flink 直接接 Hudi
2安装 Hive Metastore Hadoop HDFS
用于管理 Hudi 表元数据和 HDFS 存储
3准备 MySQL 源数据库
配置 binlog设置 binlog_format ROW并开启 server_id、binlog_row_image full 6. 关键配置代码与步骤
1添加 Maven 依赖
dependencies!-- Flink CDC --dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.1/version/dependency!-- Hudi Sink --dependencygroupIdorg.apache.hudi/groupIdartifactIdhudi-flink-bundle_2.12/artifactIdversion0.13.1/version/dependency
/dependencies2Flink SQL 示例CDC → Hudi
-- 1. 源表MySQL CDC 表
CREATE TABLE ods_orders (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH (connector mysql-cdc,hostname mysql-host,port 3306,username flink,password flink123,database-name srm,table-name orders,scan.startup.mode initial
);-- 2. 目标表Hudi 表MOR 模式
CREATE TABLE dwd_orders (id STRING PRIMARY KEY NOT ENFORCED,user_id STRING,amount DOUBLE,ts TIMESTAMP(3)
) PARTITIONED BY (user_id)
WITH (connector hudi,path hdfs://namenode/data/hudi/dwd_orders,table.type MERGE_ON_READ,hoodie.datasource.write.recordkey.field id,write.tasks 4,compaction.async.enabled true,hive_sync.enabled true,hive_sync.mode hms,hive_sync.metastore.uris thrift://hive-metastore:9083,hive_sync.db ods,hive_sync.table dwd_orders
);-- 3. 实时写入
INSERT INTO dwd_orders
SELECT * FROM ods_orders;7. 关键功能说明
功能配置字段说明主键变更支持PRIMARY KEY ... NOT ENFORCED支持 upsert增量采集模式scan.startup.mode initial首次全量 后续增量实时 compactioncompaction.async.enabled trueMOR 表性能保障Hive 数据同步hive_sync.enabled trueHudi 自动注册 Hive 元数据 8. 整合优化建议
1多表 CDC 同步统一处理
使用 Flink CDC 的 schema-name.table-name通配符
database-name srm,
table-name .*,配合 Flink SQL Catalog Dynamic Table Factory可实现一拖 N 的多表处理逻辑。 2增加清洗逻辑如空值过滤、转换
SELECTid,user_id,amount * 1.13 AS amount_tax,ts
FROM ods_orders
WHERE amount IS NOT NULL;3写入 Kafka替代 Hudi → 用于事件总线或下游消费
CREATE TABLE kafka_sink (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3)
) WITH (connector kafka,topic ods.orders,properties.bootstrap.servers kafka:9092,format json,scan.startup.mode latest-offset
);9. Flink CDC 整合场景汇总
场景描述推荐组件实时数据入湖MySQL → HudiFlink CDC Hudi数据仓库加速Oracle → IcebergFlink CDC Iceberg数据中台构建MySQL → Kafka → 多下游Flink CDC Kafka数据回流校验Kafka → Flink → MySQLFlink SQL JDBC SinkDWD建模ODS → DWD/DWM → ADSFlink SQL 维表 JOIN 10. 可视化监控
工具功能Flink UICheckpoint、Watermark、吞吐Prometheus指标采集Grafana监控仪表盘HiveSQL 查询验证 四、自定义 Flink CDC Job 的完整实现
自定义 Flink CDC Job 的完整实现采用 Java DataStream API 编写支持 多表接入MySQL 为例 自定义清洗、转换逻辑 支持写入 Kafka、Hudi、Iceberg 等下游系统 可部署为标准 Flink 应用flink run 执行 1. 自定义 Flink CDC Job 场景说明
目标 从 MySQL 采集订单表 srm.orders 做清洗如金额换算、字段过滤 输出到 Hudi 表或 Kafka/Console 2. 依赖配置Maven
dependencies!-- Flink CDC --dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.1/version/dependency!-- Flink 通用 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion1.17.2/version/dependency!-- 可选Sink 依赖如 Kafka、Hudi、Iceberg --
/dependencies3. 完整代码示例CustomCdcJob.java
public class CustomCdcJob {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 CDC 源MySQLMySqlSourceOrder mysqlSource MySqlSource.Orderbuilder().hostname(mysql-host).port(3306).databaseList(srm).tableList(srm.orders).username(flink).password(flink123).deserializer(new OrderDeserializationSchema()) // 自定义反序列化.build();// 3. 接入 SourceDataStreamSourceOrder orderStream env.fromSource(mysqlSource,WatermarkStrategy.noWatermarks(),MySQL CDC Source);// 4. 数据清洗/转换SingleOutputStreamOperatorOrder cleaned orderStream.filter(order - order.amount 0).map(order - {order.amount order.amount * 1.13; // 加税return order;});// 5. Sink控制台 / Kafka / Hudicleaned.print();env.execute(Custom Flink CDC Job);}
}4. 自定义反序列化器OrderDeserializationSchema
public class OrderDeserializationSchema implements DebeziumDeserializationSchemaOrder {Overridepublic void deserialize(SourceRecord sourceRecord, CollectorOrder collector) {Struct value (Struct) sourceRecord.value();if (value null) return;Struct after value.getStruct(after);if (after ! null) {Order order new Order();order.id after.getString(id);order.userId after.getString(user_id);order.amount after.getFloat64(amount);order.ts Instant.ofEpochMilli(after.getInt64(ts)).atZone(ZoneId.of(UTC)).toLocalDateTime();collector.collect(order);}}Overridepublic TypeInformationOrder getProducedType() {return TypeInformation.of(Order.class);}
}5. 定义 POJO 类Order.java
public class Order implements Serializable {public String id;public String userId;public Double amount;public LocalDateTime ts;Overridepublic String toString() {return String.format([Order] id%s, user%s, amt%.2f, ts%s,id, userId, amount, ts.toString());}
}6. Sink 可选方案
1控制台输出开发调试
cleaned.print();2Kafka Sink事件总线
KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(kafka:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(srm.orders.cdc).setValueSerializationSchema(new SimpleStringSchema()).build()).build();cleaned.map(order - JSON.toJSONString(order)).sinkTo(kafkaSink);3写入 Hudi 表通过 Flink Hudi Sink
cleaned.addSink(HudiSinkUtil.getSink());自定义 Hudi Sink 工具类可基于 HoodieSink 封装。 七、打包部署方式
1使用 maven-shade-plugin 打 fat-jar
mvn clean package -DskipTests输出custom-cdc-job-1.0-SNAPSHOT.jar 2提交到 Flink 集群
flink run -m yarn-cluster -c com.example.CustomCdcJob custom-cdc-job.jar8. 扩展功能可选
功能实现方式多表同步.tableList(srm.orders,srm.invoice)动态 schema 推导使用 JsonDebeziumDeserializationSchema维表 joinFlink SQL / Broadcast Join自定义状态存储Flink KeyedStateexactly-once 写入 Kafka/Hudi使用 checkpoint 支持