当前位置: 首页 > news >正文

哪里能找到网站网页版梦幻西游五色石攻略

哪里能找到网站,网页版梦幻西游五色石攻略,免费模板网站都有什么,wordpress分享到微信在数据库中的静态表上做 OLAP 分析时#xff0c;两表 join 是非常常见的操作。同理#xff0c;在流式处理作业中#xff0c;有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join#xff0c;分别是#xff1a; join…在数据库中的静态表上做 OLAP 分析时两表 join 是非常常见的操作。同理在流式处理作业中有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join分别是 join()coGroup()intervalJoin()本文举例说明它们的使用方法顺便聊聊比较特殊的 interval join 的原理。 准备数据 从 Kafka 分别接入点击流和订单流并转化为 POJO。 DataStreamString clickSourceStream env.addSource(new FlinkKafkaConsumer011(ods_analytics_access_log,new SimpleStringSchema(),kafkaProps).setStartFromLatest()); DataStreamString orderSourceStream env.addSource(new FlinkKafkaConsumer011(ods_ms_order_done,new SimpleStringSchema(),kafkaProps).setStartFromLatest());DataStreamAnalyticsAccessLogRecord clickRecordStream clickSourceStream.map(message - JSON.parseObject(message, AnalyticsAccessLogRecord.class)); DataStreamOrderDoneLogRecord orderRecordStream orderSourceStream.map(message - JSON.parseObject(message, OrderDoneLogRecord.class)); join() join() 算子提供的语义为Window join即按照指定字段和滚动/滑动/会话窗口进行 inner join支持处理时间和事件时间两种时间特征。以下示例以10秒滚动窗口将两个流通过商品 ID 关联取得订单流中的售价相关字段。 clickRecordStream.join(orderRecordStream).where(record - record.getMerchandiseId()).equalTo(record - record.getMerchandiseId()).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new JoinFunctionAnalyticsAccessLogRecord, OrderDoneLogRecord, String() {Overridepublic String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {return StringUtils.join(Arrays.asList(accessRecord.getMerchandiseId(),orderRecord.getPrice(),orderRecord.getCouponMoney(),orderRecord.getRebateAmount()), \t);}}).print().setParallelism(1); 简单易用。 coGroup() 只有 inner join 肯定还不够如何实现 left/right outer join 呢答案就是利用 coGroup() 算子。它的调用方式类似于 join() 算子也需要开窗但是 CoGroupFunction 比 JoinFunction 更加灵活可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。 以下的例子就实现了点击流 left join 订单流的功能是很朴素的 nested loop join 思想二重循环。 clickRecordStream.coGroup(orderRecordStream).where(record - record.getMerchandiseId()).equalTo(record - record.getMerchandiseId()).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new CoGroupFunctionAnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2String, Long() {Overridepublic void coGroup(IterableAnalyticsAccessLogRecord accessRecords, IterableOrderDoneLogRecord orderRecords, CollectorTuple2String, Long collector) throws Exception {for (AnalyticsAccessLogRecord accessRecord : accessRecords) {boolean isMatched false;for (OrderDoneLogRecord orderRecord : orderRecords) {// 右流中有对应的记录collector.collect(new Tuple2(accessRecord.getMerchandiseName(), orderRecord.getPrice()));isMatched true;}if (!isMatched) {// 右流中没有对应的记录collector.collect(new Tuple2(accessRecord.getMerchandiseName(), null));}}}}).print().setParallelism(1); intervalJoin() join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下两条流的数据步调未必一致。例如订单流的数据有可能在点击流的购买动作发生之后很久才被写入如果用窗口来圈定很容易 join 不上。所以 Flink 又提供了Interval join的语义按照指定字段以及右流相对左流偏移的时间区间进行关联即 right.timestamp ∈ [left.timestamp lowerBound; left.timestamp upperBound]interval join 也是 inner join虽然不需要开窗但是需要用户指定偏移区间的上下界并且只支持事件时间。 示例代码如下。注意在运行之前需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。 clickRecordStream.keyBy(record - record.getMerchandiseId()).intervalJoin(orderRecordStream.keyBy(record - record.getMerchandiseId())).between(Time.seconds(-30), Time.seconds(30)).process(new ProcessJoinFunctionAnalyticsAccessLogRecord, OrderDoneLogRecord, String() {Overridepublic void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, CollectorString collector) throws Exception {collector.collect(StringUtils.join(Arrays.asList(accessRecord.getMerchandiseId(),orderRecord.getPrice(),orderRecord.getCouponMoney(),orderRecord.getRebateAmount()), \t));}}).print().setParallelism(1); 由上可见interval join 与 window join 不同是两个 KeyedStream 之上的操作并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。 interval join 的实现原理 以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。 public OUT SingleOutputStreamOperatorOUT process(ProcessJoinFunctionIN1, IN2, OUT processJoinFunction,TypeInformationOUT outputType) {Preconditions.checkNotNull(processJoinFunction);Preconditions.checkNotNull(outputType);final ProcessJoinFunctionIN1, IN2, OUT cleanedUdf left.getExecutionEnvironment().clean(processJoinFunction);final IntervalJoinOperatorKEY, IN1, IN2, OUT operator new IntervalJoinOperator(lowerBound,upperBound,lowerBoundInclusive,upperBoundInclusive,left.getType().createSerializer(left.getExecutionConfig()),right.getType().createSerializer(right.getExecutionConfig()),cleanedUdf);return left.connect(right).keyBy(keySelector1, keySelector2).transform(Interval Join, outputType, operator); } 可见是先对两条流执行 connect() 和 keyBy() 操作然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中会利用两个 MapState 分别缓存左流和右流的数据。 private transient MapStateLong, ListBufferEntryT1 leftBuffer; private transient MapStateLong, ListBufferEntryT2 rightBuffer;Override public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);this.leftBuffer context.getKeyedStateStore().getMapState(new MapStateDescriptor(LEFT_BUFFER,LongSerializer.INSTANCE,new ListSerializer(new BufferEntrySerializer(leftTypeSerializer))));this.rightBuffer context.getKeyedStateStore().getMapState(new MapStateDescriptor(RIGHT_BUFFER,LongSerializer.INSTANCE,new ListSerializer(new BufferEntrySerializer(rightTypeSerializer)))); } 其中 Long 表示事件时间戳List 表示该时刻到来的数据记录。当左流和右流有数据到达时会分别调用 processElement1() 和 processElement2() 方法它们都调用了 processElement() 方法代码如下。 Override public void processElement1(StreamRecordT1 record) throws Exception {processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true); }Override public void processElement2(StreamRecordT2 record) throws Exception {processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false); }SuppressWarnings(unchecked) private THIS, OTHER void processElement(final StreamRecordTHIS record,final MapStateLong, ListIntervalJoinOperator.BufferEntryTHIS ourBuffer,final MapStateLong, ListIntervalJoinOperator.BufferEntryOTHER otherBuffer,final long relativeLowerBound,final long relativeUpperBound,final boolean isLeft) throws Exception {final THIS ourValue record.getValue();final long ourTimestamp record.getTimestamp();if (ourTimestamp Long.MIN_VALUE) {throw new FlinkException(Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.);}if (isLate(ourTimestamp)) {return;}addToBuffer(ourBuffer, ourValue, ourTimestamp);for (Map.EntryLong, ListBufferEntryOTHER bucket: otherBuffer.entries()) {final long timestamp bucket.getKey();if (timestamp ourTimestamp relativeLowerBound ||timestamp ourTimestamp relativeUpperBound) {continue;}for (BufferEntryOTHER entry: bucket.getValue()) {if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}}long cleanupTime (relativeUpperBound 0L) ? ourTimestamp relativeUpperBound : ourTimestamp;if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);} } 这段代码的思路是 取得当前流 StreamRecord 的时间戳调用 isLate() 方法判断它是否是迟到数据即时间戳小于当前水印值如是则丢弃。调用 addToBuffer() 方法将时间戳和数据一起插入当前流对应的 MapState。遍历另外一个流的 MapState如果数据满足前述的时间区间条件则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下注意结果对应的时间戳是左右流时间戳里较大的那个。private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {final long resultTimestamp Math.max(leftTimestamp, rightTimestamp);collector.setAbsoluteTimestamp(resultTimestamp);context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);userFunction.processElement(left, right, context, collector); } 调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp relativeUpperBound 的定时器该定时器负责在水印超过区间的上界时执行状态的清理逻辑防止数据堆积。注意左右流的定时器所属的 namespace 是不同的具体逻辑则位于 onEventTime() 方法中。Override public void onEventTime(InternalTimerK, String timer) throws Exception {long timerTimestamp timer.getTimestamp();String namespace timer.getNamespace();logger.trace(onEventTime {}, timerTimestamp);switch (namespace) {case CLEANUP_NAMESPACE_LEFT: {long timestamp (upperBound 0L) ? timerTimestamp : timerTimestamp - upperBound;logger.trace(Removing from left buffer {}, timestamp);leftBuffer.remove(timestamp);break;}case CLEANUP_NAMESPACE_RIGHT: {long timestamp (lowerBound 0L) ? timerTimestamp lowerBound : timerTimestamp;logger.trace(Removing from right buffer {}, timestamp);rightBuffer.remove(timestamp);break;}default:throw new RuntimeException(Invalid namespace namespace);} } 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.pierceye.com/news/733069/

相关文章:

  • 网站内容管理校园网站建设报价
  • 哪个网站系统做的好网站设计原则的第三要素
  • 老区建设网站亚马逊市场营销案例分析
  • 固原建设厅官方网站智慧树网站的章节题做不了
  • 网站建设人才logo设计在线
  • 在网站上做广告教育网站平面设计
  • 中文html网站模板下载做健康类网站怎么备案
  • 何为响应式网站建设公司网站的步骤
  • 网站有哪些分类网游开发公司
  • 织梦网站做瀑布流方便建设网站平台的建议
  • 网站建设实验报告阿里云搭建个人博客wordpress
  • 深圳市福田建设局网站文创产品设计就业前景
  • 龙岗建设网站制作做网站的目的是什么
  • 网站公司做的比较好网站建设业务饱和了吗
  • 做网站做电脑版还是手机版好可以访问国外网站的dns
  • 网站制作素材图片英文站 wordpress seo优化
  • 现在ui做的比较好的网站徐州经济技术开发区
  • 网站设计公司网帐号售卖网站建设
  • 信阳建设网站哪家好wordpress 文章评价插件
  • 网校网站模板东莞网站关键字
  • 做游戏的php网站2019做seo网站
  • 做外贸那个网站好免费asp主机网站
  • 网站设计服务要不要交文化事业建设费在线解压zip网站
  • 沈阳关键词自然排名西安百度seo排名软件
  • 徐州网站建设市场分析手工灯笼简单又好看
  • 网站开发学什么语音提供设计的的网站
  • 微站和网站数据同步icp备案查询
  • 诸城网站制作wordpress圆角插件汉化
  • 杨家平网站建设小程序开发网站
  • 校园文化建设图片网站浅析图书馆门户网站建设