当前位置: 首页 > news >正文

万网速成网站凡科网

万网速成网站,凡科网,东莞疾控中心最新通知,重庆大型网站建设重庆网站制作Flink SQL 语法篇#xff08;四#xff09;#xff1a;Group 聚合、Over 聚合 1.Group 聚合1.1 基础概念1.2 窗口聚合和 Group 聚合1.3 SQL 语义1.4 Group 聚合支持 Grouping sets、Rollup、Cube 2.Over 聚合2.1 时间区间聚合2.2 行数聚合 1.Group 聚合 1.1 基础概念 Grou… Flink SQL 语法篇四Group 聚合、Over 聚合 1.Group 聚合1.1 基础概念1.2 窗口聚合和 Group 聚合1.3 SQL 语义1.4 Group 聚合支持 Grouping sets、Rollup、Cube 2.Over 聚合2.1 时间区间聚合2.2 行数聚合 1.Group 聚合 1.1 基础概念 Group 聚合定义支持 Batch / Streaming 任务Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处就在于 Group 聚合是按照数据的类别进行分组比如年龄、性别是横向的而窗口聚合是在时间粒度上对数据进行分组是纵向的。如下图所示就展示出了其区别。其中 按颜色分 key横向就是 Group 聚合按窗口划分纵向就是 窗口聚合。 1.2 窗口聚合和 Group 聚合 应用场景一般用于对数据进行分组然后后续使用聚合函数进行 count、sum 等聚合操作。 那么这时候小伙伴萌就会问到我其实可以把窗口聚合的写法也转换为 Group 聚合只需要把 Group 聚合的 Group By key 换成时间就行那这两个聚合的区别到底在哪 首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合如下 滚动窗口 SQL -- 数据源表 CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND ) WITH (connector datagen,rows-per-second 10,fields.dim.length 1,fields.user_id.min 1,fields.user_id.max 100000,fields.price.min 1,fields.price.max 100000 )-- 数据汇表 CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint ) WITH (connector print )-- 数据处理逻辑 insert into sink_table select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval 1 minute) AS STRING)) * 1000 as window_start from source_table group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval 1 minute)转换为 Group 聚合 的写法如下 -- 数据源表 CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND ) WITH (connector datagen,rows-per-second 10,fields.dim.length 1,fields.user_id.min 1,fields.user_id.max 100000,fields.price.min 1,fields.price.max 100000 );-- 数据汇表 CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint ) WITH (connector print );-- 数据处理逻辑 insert into sink_table select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start from source_table group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)确实没错上面这个转换是一点问题都没有的。 但是窗口聚合和 Group by 聚合的差异在于 本质区别窗口聚合是具有时间语义的其本质是想实现窗口结束输出结果之后后续有迟到的数据也不会对原有的结果发生更改了即输出结果值是定值不考虑 allowLateness。而 Group by 聚合是没有时间语义的不管数据迟到多长时间只要数据来了就把上一次的输出的结果数据撤回然后把计算好的新的结果数据发出。运行层面窗口聚合是和 时间 绑定的窗口聚合其中窗口的计算结果触发都是由 时间Watermark推动的。Group by 聚合完全由 数据 推动触发计算新来一条数据去根据这条数据进行计算出结果发出由此可见两者的实现方式也大为不同。 1.3 SQL 语义 SQL 语义这里也拿离线和实时做对比Order 为 Kafkatarget_table 为 Kafka这个 SQL 生成的实时任务在执行时会生成三个算子。 数据源算子From Order数据源算子一直运行实时的从 Order Kafka 中一条一条的读取数据然后一条一条发送给下游的 Group 聚合算子向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送相同的 key 发到同一个 SubTask并发 中。Group 聚合算子group by key sum / count / max / min接收到上游算子发的一条一条的数据去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult并将新结果 [key, newResult] 更新到 state 中在向下游发送新计算的结果之前先发一条撤回上次结果的消息 -[key, oldResult]然后再将新结果发往下游 [key, newResult]如果 state 中没有当前 key 的结果则直接使用当前这条数据计算 sum / max / min 结果 newResult并将新结果 [key, newResult] 更新到 state 中当前是第一次往下游发则不需要先发回撤消息直接发送 [key, newResult]。数据汇算子INSERT INTO target_table接收到上游发的一条一条的数据写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的所有的算子在同一时刻都是处于 running 状态的。 1.4 Group 聚合支持 Grouping sets、Rollup、Cube Group 聚合也支持 Grouping sets、Rollup、Cube。举一个 Grouping sets 的案例 SELECT supplier_id, rating, product_id, COUNT(*) FROM (VALUES(supplier1, product1, 4),(supplier1, product2, 3),(supplier2, product3, 3),(supplier2, product4, 4)) AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id ),( supplier_id, rating ),( supplier_id ),( product_id, rating ),( product_id ),( rating ),( ) )2.Over 聚合 Over 聚合定义支持 Batch / Streaming可以理解为是一种特殊的滑动窗口聚合函数。 那这里我们拿 Over 聚合 与 窗口聚合 做一个对比其之间的最大不同之处在于 窗口聚合不在 group by 中的字段不能直接在 select 中拿到。Over 聚合能够保留原始字段。 注意其实在生产环境中Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合但是小伙伴萌可以想想你在离线数仓经常使用嘛 应用场景计算最近一段滑动窗口的聚合结果数据。实际案例查询每个产品最近一小时订单的金额总和。 SELECT order_id, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum FROM OrdersOver 聚合的语法总结如下 SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),... FROM ...ORDER BY必须是时间戳列事件时间、处理时间。PARTITION BY标识了聚合窗口的聚合粒度如上述案例是按照 product 进行聚合。range_definition这个标识聚合窗口的聚合数据范围在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合第二种为 按照时间区间聚合。如下案例所示。 2.1 时间区间聚合 按照时间区间聚合就是时间区间的一个滑动窗口比如下面案例 1 1 1 小时的区间最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。 CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL 0.001 SECOND ) WITH (connector datagen,rows-per-second 1,fields.order_id.min 1,fields.order_id.max 2,fields.amount.min 1,fields.amount.max 10,fields.product.min 1,fields.product.max 2 );CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT ) WITH (connector print );INSERT INTO sink_table SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 1 小时的数据RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum FROM source_table2.2 行数聚合 按照行数聚合就是数据行数的一个滑动窗口比如下面案例最新输出的一条数据的 sum 聚合结果就是最近 5 5 5 行数据的 amount 之和。 CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL 0.001 SECOND ) WITH (connector datagen,rows-per-second 1,fields.order_id.min 1,fields.order_id.max 2,fields.amount.min 1,fields.amount.max 2,fields.product.min 1,fields.product.max 2 );CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT ) WITH (connector print );INSERT INTO sink_table SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 5 行数据ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum FROM source_table预跑结果如下 I[2, 2021-12-24T22:18:19.147, 1, 9] I[1, 2021-12-24T22:18:20.147, 2, 11] I[1, 2021-12-24T22:18:21.147, 2, 12] I[1, 2021-12-24T22:18:22.147, 2, 12] I[1, 2021-12-24T22:18:23.148, 2, 12] I[1, 2021-12-24T22:18:24.147, 1, 11] I[1, 2021-12-24T22:18:25.146, 1, 10] I[1, 2021-12-24T22:18:26.147, 1, 9] I[2, 2021-12-24T22:18:27.145, 2, 11] I[2, 2021-12-24T22:18:28.148, 1, 10] I[2, 2021-12-24T22:18:29.145, 2, 10]当然如果你在一个 SELECT 中有多个聚合窗口的聚合方式Flink SQL 支持了一种简化写法如下案例 SELECT order_id, order_time, amount,SUM(amount) OVER w AS sum_amount,AVG(amount) OVER w AS avg_amount FROM Orders -- 使用下面子句定义 Over Window WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW)
http://www.pierceye.com/news/134668/

相关文章:

  • 建网站需要多大的宽带wordpress 分享后可见
  • 自建营销型企业网站阿里网 网站备案流程
  • 与网站建设相关的论文题目wordpress图片上文字
  • 怎样搭建网站视频教程58企业网站如何做
  • 比较有名的网站建设公司wordpress 字数
  • 网站内容资源建设渭南市建设项目
  • 网站设置的参数wordpress弹窗登录注册
  • 网课系统软件网站建设费用网站做vr的收费
  • 海宁做网站的公司seo怎么学在哪里学
  • 佛山做网站多少钱服务器学生
  • 自己建网站卖东西怎么进入wordpress修改界面
  • 网站建设与制作报价wordpress菜单怎么设置目录册
  • 学生免费建设网站建设网站是否等于开展网络营销
  • 旅游网站结构图网站编程图
  • 达内网站开发培训价格安装百度到手机桌面
  • 网站服务器慢建站设计网站
  • wordpress 多站点 插件怎么做网站主页设计
  • 网站建设初稿wordpress删除自豪的
  • 某网站突然不能浏览了网站不备案能用吗
  • 厦门做个网站多少钱360建筑网官网下载
  • 镇江外贸网站建设电子工程王粟
  • 申请网站建设经费wordpress做商城网站
  • google下载app西安分类信息seo公司
  • 淘宝是什么语言做的网站手机网站开发+手机模拟器
  • 视频网站开发框架小说类网站功能建设
  • 网站规划与设计案例网站建设方案设计
  • 漯河网站建设费用成都网站建设重庆最加科技
  • 莱芜 网站wordpress 关闭警告
  • 深圳做棋牌网站建设哪家技术好建设一个网站的规划
  • 网站开发流程比较合理网站已经申请了域名 接下来怎么