网站制作 东莞,公司网站网页设计,网站空间购买官方,卡盟网站模板在数据处理中#xff0c;我们经常需要构建ETL的任务#xff0c;对数据进行加载#xff0c;转换处理后再写入到数据存储中。Google的云平台提供了多种方案来构建ETL任务#xff0c;我也研究了一下这些方案#xff0c;比较方案之间的优缺点#xff0c;从而找到一个最适合我…在数据处理中我们经常需要构建ETL的任务对数据进行加载转换处理后再写入到数据存储中。Google的云平台提供了多种方案来构建ETL任务我也研究了一下这些方案比较方案之间的优缺点从而找到一个最适合我业务场景的方案。
假设我们的业务场景需要定期从Kafka中获取数据经过一些数据清洗数据关联数据Enrich操作之后把数据写入到Bigquery数据仓库从而方便以后生成统计分析报表。
Google云平台提供了几个方案来完成这个任务
1. Datafusion通过在UI界面设计ETL pipeline然后把Pipeline转换为Spark应用部署在Dataproc上运行。
2. 编写Spark应用代码然后在Dataproc上运行或者在K8S集群上通过Spark operator来调度执行。
3. 编写Apache Beam代码通过Dataflow runner在VM上执行任务。
方案一的优点是基本不需要编写代码在图形界面上即可完成Pipeline的设计。缺点是如果有一些额外的需求可能不太方便实现另外最主要是太贵。Datafusion需要单独部署在一个Instance上24小时运行这个Instance企业版的收费大概一小时要几美元。另外Pipeline运行的时候会调度Dataproc的Instance这里会产生额外的费用。
方案二的优点是可以灵活的通过Spark代码来完成各种需求。缺点也是比较贵因为Dataproc是基于Hadoop集群的需要有Zookeeper, driver和executor这几个VM。如果采用K8S集群则Spark operator也是需要单独24小时运行在一个pod上另外还有额外的driver, executor的Pod需要调度执行。
方案三是综合考虑最优的方案因为Beam的代码是提供了一个通用的流批处理框架可以运行在Spark,Flink,Dataflow等引擎上而Dataflow是Google提供的一个优秀的引擎在运行任务时Dataflow按需调度VM来运行只收取运行时的费用。
因此对于我的这个业务场景使用方案三是最合适的。下面我将介绍一下整个实现的过程。
Beam批处理任务的实现
在Dataflow的官方Template里面有一个消费Kafka数据写入到Bigquery的例子但是这个是流处理方式实现的对于我的业务场景来说并不需要这么实时的处理数据只需要定期消费即可因此用批处理的方式更合适这样也能大幅节约费用。
Beam的Kafka I/O connector是默认处理的数据是无边界的即流式数据。要以批处理的方式来处理需要调用withStartReadTime和withStopReadTime两个方法获取要读取的Kafka topic的start和end offset这样就可以把数据转换为有边界数据。调用这两个方法需要注意的是如果Kafka没有任何一条消息的时间戳是大于等于这个时间戳的话那么会报错因此我们需要确定一下具体的时间戳。
以下的代码是检查Kafka消息的所有分区是否存在消息的时间戳是大于我们指定的时间戳如果不存在的话那么我们需要找出这些分区里面的最晚时间戳里面的最早的一个。例如Topic有3个分区要指定的时间戳是1697289783000但是3个分区里面的所有消息都小于这个时间戳因此我们需要分别找出每个分区里面的消息的最晚的时间戳然后取这3个分区的最晚时间戳里面最早的那个作为我们的指定时间戳。
public class CheckKafkaMsgTimestamp {private static final Logger LOG LoggerFactory.getLogger(CheckKafkaMsgTimestamp.class);public static KafkaResult getTimestamp(String bootstrapServer, String topic, long startTimestamp, long stopTimestamp) {long max_timestamp stopTimestamp;long max_records 5L;Properties props new Properties();props.setProperty(bootstrap.servers, bootstrapServer);props.setProperty(group.id, test);props.setProperty(enable.auto.commit, true);props.setProperty(auto.commit.interval.ms, 1000);props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);// Get all the partitions of the topicint partition_num consumer.partitionsFor(topic).size();HashMapTopicPartition, Long search_map new HashMap();ArrayListTopicPartition tp new ArrayList();for (int i0;ipartition_num;i) {search_map.put(new TopicPartition(topic, i), stopTimestamp);tp.add(new TopicPartition(topic, i));}// Check if message exist with timestamp greater than search timestampBoolean flag true;ArrayListTopicPartition selected_tp new ArrayList();//LOG.info(Start to check the timestamp {}, stopTimestamp);MapTopicPartition, OffsetAndTimestamp results consumer.offsetsForTimes(search_map);for (Map.EntryTopicPartition, OffsetAndTimestamp entry : results.entrySet()) {OffsetAndTimestamp value entry.getValue();if (valuenull) { //there is at least one partition dont have timestamp greater or equal to the stopTimeflag false;break;}}// Get the latest timestamp of all partitions if the above check result is false// Note the timestamp is the earliest of all the partitions. if (!flag) {max_timestamp 0L;consumer.assign(tp);MapTopicPartition, Long endoffsets consumer.endOffsets(tp);for (Map.EntryTopicPartition, Long entry : endoffsets.entrySet()) {Long temp_timestamp 0L;int record_count 0;TopicPartition t entry.getKey();long offset entry.getValue();if (offset 1) {LOG.warn(Can not get max_timestamp as partition has no record!);continue;}consumer.assign(Arrays.asList(t));consumer.seek(t, offsetmax_records?offset-5:0);IteratorConsumerRecordString, String records consumer.poll(Duration.ofSeconds(2)).iterator();while (records.hasNext()) {record_count;ConsumerRecordString, String record records.next();LOG.info(Topic: {}, Record Timestamp: {}, recordcount: {}, t, record.timestamp(), record_count);if (temp_timestamp 0L || record.timestamp() temp_timestamp) {temp_timestamp record.timestamp();}}//LOG.info(Record count: {}, record_count);if (temp_timestamp 0L temp_timestamp startTimestamp) {if (max_timestamp 0L || max_timestamp temp_timestamp) {max_timestamp temp_timestamp;}selected_tp.add(t);LOG.info(Temp_timestamp {}, temp_timestamp);LOG.info(Selected topic partition {}, t);LOG.info(Partition offset {}, consumer.position(t));//consumer.seek(t, -1L);}}} else {selected_tp tp;}consumer.close();LOG.info(Max Timestamp: {}, max_timestamp);return new KafkaResult(max_timestamp, selected_tp);}
}
调用以上代码我们可以获取要选择的分区以及对应的时间戳。利用这两个信息我们就可以把指定时间范围内的Kafka数据转换为有边界数据了。以下是Beam建立Pipeline并处理数据然后写入到Bigquery的代码
KafkaResult checkResult CheckKafkaMsgTimestamp.getTimestamp(options.getBootstrapServer(), options.getInputTopic(), start_read_time, stop_read_time);
stop_read_time checkResult.max_timestamp;
ArrayListTopicPartition selected_tp checkResult.selected_tp;PCollectionString input pipeline.apply(Read messages from Kafka,KafkaIO.String, Stringread().withBootstrapServers(options.getBootstrapServer()).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class).withConsumerConfigUpdates(ImmutableMap.of(group.id, telematics_statistic.app, enable.auto.commit, true)).withStartReadTime(Instant.ofEpochMilli(start_read_time)).withStopReadTime(Instant.ofEpochMilli(stop_read_time)).withTopicPartitions(selected_tp).withoutMetadata()).apply(Get message contents, Values.Stringcreate());PCollectionTuple msgTuple input.apply(Filter message, ParDo.of(new DoFnString, TelematicsStatisticsMsg() {ProcessElementpublic void processElement(Element String element, MultiOutputReceiver out) {TelematicsStatisticsMsg msg GSON.fromJson(element, TelematicsStatisticsMsg.class);if (msg.timestamp0 || msg.vinnull) {out.get(otherMsgTag).output(element);} else {if (msg.timestampstart_process_time || msg.timestampstop_process_time) {out.get(otherMsgTag).output(element);} else {out.get(statisticsMsgTag).output(msg);}}}}).withOutputTags(statisticsMsgTag, TupleTagList.of(otherMsgTag))); // Get the filter out msg
PCollectionTelematicsStatisticsMsg statisticsMsg msgTuple.get(statisticsMsgTag);
// Save the raw records to Bigquery
statisticsMsg.apply(Convert raw records to BigQuery TableRow, MapElements.into(TypeDescriptor.of(TableRow.class)).via(TelematicsStatisticsMsg - new TableRow().set(timestamp, Instant.ofEpochMilli(TelematicsStatisticsMsg.timestamp).toString()).set(vin, TelematicsStatisticsMsg.vin).set(service, TelematicsStatisticsMsg.service).set(type, TelematicsStatisticsMsg.messageType))).apply(Save raw records to BigQuery, BigQueryIO.writeTableRows().to(options.getStatisticsOutputTable()).withSchema(new TableSchema().setFields(Arrays.asList(new TableFieldSchema().setName(timestamp).setType(TIMESTAMP),new TableFieldSchema().setName(vin).setType(STRING),new TableFieldSchema().setName(service).setType(STRING),new TableFieldSchema().setName(type).setType(STRING)))).withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(WriteDisposition.WRITE_APPEND));PipelineResult result pipeline.run();
try {result.getState();result.waitUntilFinish();
} catch (UnsupportedOperationException e) {// do nothing
} catch (Exception e) {e.printStackTrace();
}
需要注意的是每次处理任务完成后我们需要把当前的stopReadTime记录下来下次任务运行的时候把这个时间戳作为startReadTime这样可以避免某些情况下的数据缺失读取的问题。这个时间戳我们可以把其记录在GCS的bucket里面。这里略过这部分代码。
提交Dataflow任务
之后我们就可以调用Google的Cloud Build功能来把代码打包为Flex Template
首先在Java项目中运行mvn clean package打包jar文件
然后在命令行中设置以下环境变量
export TEMPLATE_PATHgs://[your project ID]/dataflow/templates/telematics-pipeline.json
export TEMPLATE_IMAGEgcr.io/[your project ID]/telematics-pipeline:latest
export REGIONus-west1
之后运行gcloud build的命令来构建镜像
gcloud dataflow flex-template build $TEMPLATE_PATH --image-gcr-path $TEMPLATE_IMAGE --sdk-language JAVA --flex-template-base-image gcr.io/dataflow-templates-base/java17-template-launcher-base:20230308_RC00 --jar target/telematics-pipeline-1.0-SNAPSHOT.jar --env FLEX_TEMPLATE_JAVA_MAIN_CLASScom.example.TelematicsBatch
最后就可以调用命令来提交任务执行了
gcloud dataflow flex-template run analytics-pipeline-date %Y%m%d-%H%M%S --template-file-gcs-location $TEMPLATE_PATH --region us-west1 --parameters ^~^bootstrapServerkafka-1:9094,kafka-2:9094~statisticsOutputTableyouprojectid:dataset.tablename~serviceAccountxxxprojectid.iam.gserviceaccount.com~regionus-west1~usePublicIpsfalse~runnerDataflowRunner~subnetworkXXXX~tempLocationgs://bucketname/temp/~startTime1693530000000~stopTime1697216400000~processStartTime1693530000000~processStopTime1697216400000
如果我们需要任务自动定期执行还可以在dataflow里面import一个Pipeline用之前指定的Template_path来导入。然后设置任务的定期周期和启动时间即可非常方便。