当前位置: 首页 > news >正文

聚震网站开发唐河县住房和城乡建设局网站

聚震网站开发,唐河县住房和城乡建设局网站,ai室内设计生成软件,关于网站备案前置审批的相关说明 吉林【背景】 flink有几种聚合#xff0c;使用上是有一些不同#xff0c;需要加以区分#xff1a; 分组聚合#xff1a;group agg over聚合#xff1a;over agg 窗口聚合#xff1a;window agg 省流版#xff1a; 触发计算时机 结果流类型 状态大小 分组聚合group ag…【背景】 flink有几种聚合使用上是有一些不同需要加以区分 分组聚合group agg over聚合over agg 窗口聚合window agg 省流版 触发计算时机 结果流类型 状态大小 分组聚合group agg 每当有新行就输出更新的结果 update流 保持中间结果所以状态可能无限膨胀 over agg 每当有新行就输出更新的结果,类似一个滑动窗口 append流 保持中间结果所以状态可能无限膨胀 window agg 窗口结束产生一个总的聚合结果 append流 不生成中间结果自动清除状态 下面是详细对比和具体的例子主要讨论的是流处理下的情况。 over聚合over agg OVER 聚合通过排序后的范围数据为每行输入计算出聚合值。和 GROUP BY 聚合不同 OVER 聚合不会把结果通过分组减少到一行它会为每行输入增加一个聚合值结果是一个append流。  OVER 窗口的语法。 SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),... FROM ... over聚合很少用到所以本地自己做了一个测试 测试sql如下 create table test_window_tab(region String,qa_id String,count_qa_id Bigint) COMMENT with(properties.bootstrap.servers ,json.fail-on-missing-field false,connector kafka,format json,topic test_window_tab);create table dwm_qa_score(,qa_id String   ,agent_id String,region String,saas_id String,version_timestamp bigint, ts as to_timestamp(from_unixtime(version_timestamp, yyyy-MM-dd HH:mm:ss)),event_time TIMESTAMP(3) METADATA FROM timestamp VIRTUAL,WATERMARK FOR ts AS ts - INTERVAL 10 SECOND) COMMENT with(properties.bootstrap.servers ,json.fail-on-missing-field false,connector kafka,format json,scan.startup.mode earliest-offset,topic dwm_qa_score);insert into test_window_tab(region,qa_id,count_qa_id)select region,qa_id,count(1)  over w as count_qa_idfrom dwm_qa_scorewindow w as(partition by region,qa_idorder by tsrows between 2 preceding and current row) dwm_qa_score这个topic现有数据 { qa_id: 123, agent_id: 497235295815123, region: TH, version_timestamp: 1709807228 } { qa_id: 123, agent_id: 497235295815123, region: TH, version_timestamp: 1709807228 } { qa_id: 123, agent_id: 497235295815123, region: TH, version_timestamp: 1709807228 } { qa_id: 123, agent_id: 497235295815123, region: TH, version_timestamp: 1709807228 } { qa_id: 123, agent_id: 497235295815123, region: TH, version_timestamp: 1709807228 } { qa_id: 1234, agent_id: 497235295815123, region: TH, version_timestamp: 1709807228 } 当读数据选择了offsetealiest-offset则运行程序会得到结果如下 {region:TH,qa_id:123,count_qa_id:1} {region:TH,qa_id:123,count_qa_id:2} {region:TH,qa_id:123,count_qa_id:3} {region:TH,qa_id:123,count_qa_id:3} {region:TH,qa_id:123,count_qa_id:3} {region:TH,qa_id:1234,count_qa_id:1} 这里注意 对每条数据都会返回一个聚合值由于我们是“rows between 2 preceding and current row“所以count_qa_id最多是3 如果此时往dwm_qa_score这个topic插入新数据 { qa_id: 1234, agent_id: 497235295815123, region: TH } 或者 { qa_id: 1234, agent_id: 497235295815123, region: TH,version_timestamp: null } 或者 { qa_id: 1234, agent_id: 497235295815123, region: TH,version_timestamp: 0 } 会发现flink作业中输出的record多了一条 但是在目标kafkatest_window_tab中没有新增结果 原因是我们插入的新数据中没有version_timestamp这一列为空或为0 如果往dwm_qa_score这个topic插入新数据 { qa_id: 1234, region: TH, version_timestamp: 1710145110 } 则可以看到对应目标kafkatest_window_tab中会新增结果数据 {region:TH,qa_id:1234,count_qa_id:2} 如果等一分钟后再次往dwm_qa_score这个topic插入新数据 { qa_id: 1234, region: TH, version_timestamp: 1710145110 } 则在目标kafkatest_window_tab中没有新增结果原因应该是数据过期被丢弃了watermark) 你可以在一个 SELECT 子句中定义多个 OVER 窗口聚合。然而对于流式查询由于目前的限制所有聚合的 OVER 窗口必须是相同的。 ORDER BY OVER 窗口需要数据是有序的。因为表没有固定的排序所以 ORDER BY 子句是强制的。对于流式查询Flink 目前只支持 OVER 窗口定义在升序asc的 时间属性 上。其他的排序不支持。 PARTITION BY OVER 窗口可以定义在一个分区表上。PARTITION BY 子句代表着每行数据只在其所属的数据分区进行聚合。 范围RANGE定义 范围RANGE定义指定了聚合中包含了多少行数据。范围通过 BETWEEN 子句定义上下边界其内的所有行都会聚合。Flink 只支持 CURRENT ROW 作为上边界。 有两种方法可以定义范围ROWS 间隔 和 RANGE 间隔 RANGE 间隔 RANGE 间隔是定义在排序列值上的在 Flink 里排序列总是一个时间属性。下面的 RANG 间隔定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。 RANGE BETWEEN INTERVAL 30 MINUTE PRECEDING AND CURRENT ROWROW 间隔 ROWS 间隔基于计数。它定义了聚合操作包含的精确行数。下面的 ROWS 间隔定义了当前行  之前的 10 行也就是11行都会被聚合。 ROWS BETWEEN 10 PRECEDING AND CURRENT ROW常见错误 OVER windows ordering in stream mode must be defined on a time attribute.  这个报错是建表的时候需要指定时间语义的字段WATERMARK 是必须的而且WATERMARK所用字段必须是order by的时间字段例如下面用的是 order by load_date那么WATERMARK就要用load_date生成即WATERMARK FOR load_date AS load_date - INTERVAL 1 MINUTE object SqlOverRows02 {def main(args: Array[String]): Unit  {val settings  EnvironmentSettings.newInstance().inStreamingMode().build()val tEnv  TableEnvironment.create(settings)    tEnv.executeSql(|create table projects(|id int,|name string,|score double,|load_date timestamp(3),|WATERMARK FOR load_date AS load_date - INTERVAL 1 MINUTE|)with(|connector  kafka,|topic  test-topic,|properties.bootstrap.servers  server120:9092,|properties.group.id  testGroup,|scan.startup.mode  latest-offset,|format  csv|)|.stripMargin)tEnv.executeSql(|select| name,| max(score)|   over(partition by name|     order by load_date|     RANGE BETWEEN INTERVAL 10 SECOND PRECEDING AND CURRENT ROW )max_score,| min(score)|   over(partition by name|     order by load_date|     RANGE BETWEEN INTERVAL 10 SECOND PRECEDING AND CURRENT ROW )min_score,| current_time| from| projects|.stripMargin).print()} }分组聚合group agg Apache Flink 支持标准的 GROUP BY 子句来聚合数据。 SELECT COUNT(*) FROM Orders GROUP BY order_id 特点 1、聚合函数把多行输入数据计算为一行结果。例如有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”、“MAX”和 “MIN”。 2、对于流式查询重要的是要理解 Flink 运行的是连续查询永远不会终止会根据其输入表的更新来更新其结果表。对于上述查询每当有新行插入 Orders 表时Flink 都会实时计算并输出更新后的结果。  3、对于流式查询用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如MIN/MAX 的状态是重量级的COUNT 是轻量级的因为COUNT只需要保存计数值。 因此可以设置table-exec-state-ttl但是可能会影响查询结果的正确性因为状态超时会被丢弃。 注意 Flink 对于分组聚合提供了一系列性能优化的方法。更多参见性能优化包括MiniBatch 聚合、Local-Global 聚合、拆分 distinct 聚合、在 distinct 聚合上使用 FILTER 修饰符 、MiniBatch Regular Joins 窗口聚合window agg 窗口聚合是通过 GROUP BY 子句定义的其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列必须包含否则就变成分组聚合等了。和普通的 GROUP BY 子句一样窗口聚合对于每个组会计算出一行数据。 SELECT ... FROM windowed_table -- relation applied windowing TVF GROUP BY window_start, window_end, ... 窗口聚合不产生中间结果只在窗口结束产生一个总的聚合结果另外窗口聚合会清除不需要的中间状态(watermark超过窗口endallowlateness,就会销毁窗口。 具体例子: SELECT window_start, window_end, SUM(price) AS total_price FROM TABLE(     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL 10 MINUTES)) GROUP BY window_start, window_end; ------------------------------------------------- |     window_start |       window_end | total_price | ------------------------------------------------- | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | -------------------------------------------------
http://www.pierceye.com/news/282449/

相关文章:

  • 通信管理局网站备案cms网站建设的实训总结
  • 西安知名网站建设公司百度网页版微信
  • 单纯python能完成网站开发吗门户网站衰落的原因
  • 唐山微网站建设价格宁波外贸网站推广优化
  • 如何能把网站做的更大赤峰网站建设赤峰
  • 织梦大气绿色大气农业能源化工机械产品企业网站源码模版网站设计是用ps做图吗
  • 长沙建设网站公司浙江网站建设上市公司
  • 成都艾邦视觉专业网站建设公司有内涵大气的公司名字
  • 制作学校网站编程基础知识大全
  • 建设银行网站买手机阿里云已备案域名购买
  • 12个优秀的平面设计素材网站wordpress 标题 拼音
  • 瑶海区网站建设公司上海app开发定制公司
  • 北海建设厅网站局域网的电脑怎么做网站服务器
  • 莱芜网站建设价格域名注册成功后怎么使用网站
  • 衡阳县建设局网站wordpress 图片缓存
  • 浙江门户网站建设公司新闻稿发布
  • 温州网站建设排名wordpress 汉化失败
  • 做数据可视化的网站推广类软文案例
  • 外包做网站的要求怎么写做网站 360
  • 温州网站建设价格技术微信公众号免费开通
  • 做网站推广销售怎么样辽宁省网站备案系统
  • html公司网站模板源码企业信息填报系统
  • 有口碑的赣州网站建设微信开放社区
  • 外贸网站做SEO电脑浏览器打不开网页是什么原因
  • 做网站需要下载啥google建站推广
  • 沈阳哪里有教做网站的会做网站怎么赚钱
  • iis如何做同时运行两个网站80端口做汽车网站费用
  • 网站规划与设计一千字网红营销模式
  • 西安 域名空间网站制作淘宝客网站主题下载
  • 网页制作与网站建设pdf网站开发前端和后端工作