当前位置: 首页 > news >正文

网站制作 东莞公司网站网页设计

网站制作 东莞,公司网站网页设计,网站空间购买官方,卡盟网站模板在数据处理中#xff0c;我们经常需要构建ETL的任务#xff0c;对数据进行加载#xff0c;转换处理后再写入到数据存储中。Google的云平台提供了多种方案来构建ETL任务#xff0c;我也研究了一下这些方案#xff0c;比较方案之间的优缺点#xff0c;从而找到一个最适合我…在数据处理中我们经常需要构建ETL的任务对数据进行加载转换处理后再写入到数据存储中。Google的云平台提供了多种方案来构建ETL任务我也研究了一下这些方案比较方案之间的优缺点从而找到一个最适合我业务场景的方案。 假设我们的业务场景需要定期从Kafka中获取数据经过一些数据清洗数据关联数据Enrich操作之后把数据写入到Bigquery数据仓库从而方便以后生成统计分析报表。 Google云平台提供了几个方案来完成这个任务 1. Datafusion通过在UI界面设计ETL pipeline然后把Pipeline转换为Spark应用部署在Dataproc上运行。 2. 编写Spark应用代码然后在Dataproc上运行或者在K8S集群上通过Spark operator来调度执行。 3. 编写Apache Beam代码通过Dataflow runner在VM上执行任务。 方案一的优点是基本不需要编写代码在图形界面上即可完成Pipeline的设计。缺点是如果有一些额外的需求可能不太方便实现另外最主要是太贵。Datafusion需要单独部署在一个Instance上24小时运行这个Instance企业版的收费大概一小时要几美元。另外Pipeline运行的时候会调度Dataproc的Instance这里会产生额外的费用。 方案二的优点是可以灵活的通过Spark代码来完成各种需求。缺点也是比较贵因为Dataproc是基于Hadoop集群的需要有Zookeeper, driver和executor这几个VM。如果采用K8S集群则Spark operator也是需要单独24小时运行在一个pod上另外还有额外的driver, executor的Pod需要调度执行。 方案三是综合考虑最优的方案因为Beam的代码是提供了一个通用的流批处理框架可以运行在Spark,Flink,Dataflow等引擎上而Dataflow是Google提供的一个优秀的引擎在运行任务时Dataflow按需调度VM来运行只收取运行时的费用。 因此对于我的这个业务场景使用方案三是最合适的。下面我将介绍一下整个实现的过程。 Beam批处理任务的实现 在Dataflow的官方Template里面有一个消费Kafka数据写入到Bigquery的例子但是这个是流处理方式实现的对于我的业务场景来说并不需要这么实时的处理数据只需要定期消费即可因此用批处理的方式更合适这样也能大幅节约费用。 Beam的Kafka I/O connector是默认处理的数据是无边界的即流式数据。要以批处理的方式来处理需要调用withStartReadTime和withStopReadTime两个方法获取要读取的Kafka topic的start和end offset这样就可以把数据转换为有边界数据。调用这两个方法需要注意的是如果Kafka没有任何一条消息的时间戳是大于等于这个时间戳的话那么会报错因此我们需要确定一下具体的时间戳。 以下的代码是检查Kafka消息的所有分区是否存在消息的时间戳是大于我们指定的时间戳如果不存在的话那么我们需要找出这些分区里面的最晚时间戳里面的最早的一个。例如Topic有3个分区要指定的时间戳是1697289783000但是3个分区里面的所有消息都小于这个时间戳因此我们需要分别找出每个分区里面的消息的最晚的时间戳然后取这3个分区的最晚时间戳里面最早的那个作为我们的指定时间戳。 public class CheckKafkaMsgTimestamp {private static final Logger LOG LoggerFactory.getLogger(CheckKafkaMsgTimestamp.class);public static KafkaResult getTimestamp(String bootstrapServer, String topic, long startTimestamp, long stopTimestamp) {long max_timestamp stopTimestamp;long max_records 5L;Properties props new Properties();props.setProperty(bootstrap.servers, bootstrapServer);props.setProperty(group.id, test);props.setProperty(enable.auto.commit, true);props.setProperty(auto.commit.interval.ms, 1000);props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);// Get all the partitions of the topicint partition_num consumer.partitionsFor(topic).size();HashMapTopicPartition, Long search_map new HashMap();ArrayListTopicPartition tp new ArrayList();for (int i0;ipartition_num;i) {search_map.put(new TopicPartition(topic, i), stopTimestamp);tp.add(new TopicPartition(topic, i));}// Check if message exist with timestamp greater than search timestampBoolean flag true;ArrayListTopicPartition selected_tp new ArrayList();//LOG.info(Start to check the timestamp {}, stopTimestamp);MapTopicPartition, OffsetAndTimestamp results consumer.offsetsForTimes(search_map);for (Map.EntryTopicPartition, OffsetAndTimestamp entry : results.entrySet()) {OffsetAndTimestamp value entry.getValue();if (valuenull) { //there is at least one partition dont have timestamp greater or equal to the stopTimeflag false;break;}}// Get the latest timestamp of all partitions if the above check result is false// Note the timestamp is the earliest of all the partitions. if (!flag) {max_timestamp 0L;consumer.assign(tp);MapTopicPartition, Long endoffsets consumer.endOffsets(tp);for (Map.EntryTopicPartition, Long entry : endoffsets.entrySet()) {Long temp_timestamp 0L;int record_count 0;TopicPartition t entry.getKey();long offset entry.getValue();if (offset 1) {LOG.warn(Can not get max_timestamp as partition has no record!);continue;}consumer.assign(Arrays.asList(t));consumer.seek(t, offsetmax_records?offset-5:0);IteratorConsumerRecordString, String records consumer.poll(Duration.ofSeconds(2)).iterator();while (records.hasNext()) {record_count;ConsumerRecordString, String record records.next();LOG.info(Topic: {}, Record Timestamp: {}, recordcount: {}, t, record.timestamp(), record_count);if (temp_timestamp 0L || record.timestamp() temp_timestamp) {temp_timestamp record.timestamp();}}//LOG.info(Record count: {}, record_count);if (temp_timestamp 0L temp_timestamp startTimestamp) {if (max_timestamp 0L || max_timestamp temp_timestamp) {max_timestamp temp_timestamp;}selected_tp.add(t);LOG.info(Temp_timestamp {}, temp_timestamp);LOG.info(Selected topic partition {}, t);LOG.info(Partition offset {}, consumer.position(t));//consumer.seek(t, -1L);}}} else {selected_tp tp;}consumer.close();LOG.info(Max Timestamp: {}, max_timestamp);return new KafkaResult(max_timestamp, selected_tp);} } 调用以上代码我们可以获取要选择的分区以及对应的时间戳。利用这两个信息我们就可以把指定时间范围内的Kafka数据转换为有边界数据了。以下是Beam建立Pipeline并处理数据然后写入到Bigquery的代码 KafkaResult checkResult CheckKafkaMsgTimestamp.getTimestamp(options.getBootstrapServer(), options.getInputTopic(), start_read_time, stop_read_time); stop_read_time checkResult.max_timestamp; ArrayListTopicPartition selected_tp checkResult.selected_tp;PCollectionString input pipeline.apply(Read messages from Kafka,KafkaIO.String, Stringread().withBootstrapServers(options.getBootstrapServer()).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class).withConsumerConfigUpdates(ImmutableMap.of(group.id, telematics_statistic.app, enable.auto.commit, true)).withStartReadTime(Instant.ofEpochMilli(start_read_time)).withStopReadTime(Instant.ofEpochMilli(stop_read_time)).withTopicPartitions(selected_tp).withoutMetadata()).apply(Get message contents, Values.Stringcreate());PCollectionTuple msgTuple input.apply(Filter message, ParDo.of(new DoFnString, TelematicsStatisticsMsg() {ProcessElementpublic void processElement(Element String element, MultiOutputReceiver out) {TelematicsStatisticsMsg msg GSON.fromJson(element, TelematicsStatisticsMsg.class);if (msg.timestamp0 || msg.vinnull) {out.get(otherMsgTag).output(element);} else {if (msg.timestampstart_process_time || msg.timestampstop_process_time) {out.get(otherMsgTag).output(element);} else {out.get(statisticsMsgTag).output(msg);}}}}).withOutputTags(statisticsMsgTag, TupleTagList.of(otherMsgTag))); // Get the filter out msg PCollectionTelematicsStatisticsMsg statisticsMsg msgTuple.get(statisticsMsgTag); // Save the raw records to Bigquery statisticsMsg.apply(Convert raw records to BigQuery TableRow, MapElements.into(TypeDescriptor.of(TableRow.class)).via(TelematicsStatisticsMsg - new TableRow().set(timestamp, Instant.ofEpochMilli(TelematicsStatisticsMsg.timestamp).toString()).set(vin, TelematicsStatisticsMsg.vin).set(service, TelematicsStatisticsMsg.service).set(type, TelematicsStatisticsMsg.messageType))).apply(Save raw records to BigQuery, BigQueryIO.writeTableRows().to(options.getStatisticsOutputTable()).withSchema(new TableSchema().setFields(Arrays.asList(new TableFieldSchema().setName(timestamp).setType(TIMESTAMP),new TableFieldSchema().setName(vin).setType(STRING),new TableFieldSchema().setName(service).setType(STRING),new TableFieldSchema().setName(type).setType(STRING)))).withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(WriteDisposition.WRITE_APPEND));PipelineResult result pipeline.run(); try {result.getState();result.waitUntilFinish(); } catch (UnsupportedOperationException e) {// do nothing } catch (Exception e) {e.printStackTrace(); } 需要注意的是每次处理任务完成后我们需要把当前的stopReadTime记录下来下次任务运行的时候把这个时间戳作为startReadTime这样可以避免某些情况下的数据缺失读取的问题。这个时间戳我们可以把其记录在GCS的bucket里面。这里略过这部分代码。 提交Dataflow任务 之后我们就可以调用Google的Cloud Build功能来把代码打包为Flex Template 首先在Java项目中运行mvn clean package打包jar文件 然后在命令行中设置以下环境变量 export TEMPLATE_PATHgs://[your project ID]/dataflow/templates/telematics-pipeline.json export TEMPLATE_IMAGEgcr.io/[your project ID]/telematics-pipeline:latest export REGIONus-west1 之后运行gcloud build的命令来构建镜像 gcloud dataflow flex-template build $TEMPLATE_PATH --image-gcr-path $TEMPLATE_IMAGE --sdk-language JAVA --flex-template-base-image gcr.io/dataflow-templates-base/java17-template-launcher-base:20230308_RC00 --jar target/telematics-pipeline-1.0-SNAPSHOT.jar --env FLEX_TEMPLATE_JAVA_MAIN_CLASScom.example.TelematicsBatch 最后就可以调用命令来提交任务执行了 gcloud dataflow flex-template run analytics-pipeline-date %Y%m%d-%H%M%S --template-file-gcs-location $TEMPLATE_PATH --region us-west1 --parameters ^~^bootstrapServerkafka-1:9094,kafka-2:9094~statisticsOutputTableyouprojectid:dataset.tablename~serviceAccountxxxprojectid.iam.gserviceaccount.com~regionus-west1~usePublicIpsfalse~runnerDataflowRunner~subnetworkXXXX~tempLocationgs://bucketname/temp/~startTime1693530000000~stopTime1697216400000~processStartTime1693530000000~processStopTime1697216400000 如果我们需要任务自动定期执行还可以在dataflow里面import一个Pipeline用之前指定的Template_path来导入。然后设置任务的定期周期和启动时间即可非常方便。
http://www.pierceye.com/news/7021/

相关文章:

  • 化肥厂的网站摸板网站制作案例市场
  • 湖南建设银行官网网站首页空间网站购买
  • 免费个人网站注册方法吉林建设工程信息网站
  • 广州设计网站公司网页设计与制作html
  • 免费贴图素材网站如何做adsense网站
  • 个人网站有什么外国广告做官网定制
  • 绍兴网站建设做网站wordpress 课程
  • 深圳网站建设公司大全网站链接怎么做二维码
  • 免费营销型网站单页网站怎么优化
  • wordpress建站seo好做吗网页制作大概需要多少钱
  • 域名除了做网站还能做什么化妆品网站建设推广方案
  • php高性能网站建设深圳网站建设 联雅
  • 我自己的网站jqueryui做的网站
  • 格兰仕网站开发方案泰安网络优化公司
  • wordpress企业网站模版阿里巴巴外贸平台中文
  • 网站开发用微网站开发微网站建设
  • 网站后台管理怎么做东莞广告公司电话
  • 哪里有做枪网站的网站首页广告图片伸缩代码又关闭
  • 企业宣传网站建设模板dw做网站步骤
  • 网站建设总结材料做网站的软件工程师
  • 网站图片上传却不显示不出来个人网页在线制作
  • 建立网站的步骤是什么营销型网站内容
  • 临沂大企业网站新媒体营销概念
  • 随州网站设计开发制作宁波网站制作优化服务公司
  • html5网站有点淘宝网站怎么做网站
  • wordpress 社区插件图片网站seo
  • 做o2o平台网站需要多少钱平和县建设局网站
  • weui做购物网站的案例重庆首页工程设计咨询有限责任公司
  • 个人网站模板源码福田做商城网站建设找哪家公司好
  • 校友会网站建设方案wordpress+andriod