重庆市网站编辑,如何查询一个网站的注册信息,网站关键词google优化怎么做,服务器有了怎么做网站一、需求描述
每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。
二、需求分析
1.统计每个商品的点击量, 开窗2.分组窗口分组3.over窗口
三、需求实现
3.1、创建数据源示例
input/UserBehavior.csv
543462,1715,1464116,pv,1511658000
662867,22…一、需求描述
每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。
二、需求分析
1.统计每个商品的点击量, 开窗2.分组窗口分组3.over窗口
三、需求实现
3.1、创建数据源示例
input/UserBehavior.csv
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,15116580003.2、创建目标表
CREATE DATABASE flink_sql; //创建flink_sql库
USE flink_sql;
DROP TABLE IF EXISTS hot_item;
CREATE TABLE hot_item (w_end timestamp NOT NULL,item_id bigint(20) NOT NULL,item_count bigint(20) NOT NULL,rk bigint(20) NOT NULL,PRIMARY KEY (w_end,rk)
) ENGINEInnoDB DEFAULT CHARSETutf8;3.3、导入JDBC Connector依赖
!-- 导入JDBC Connector依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_${scala.binary.version}/artifactIdversion${flink.version}/version/dependency3.4、代码实现
package com.atguigu.flink.java.chapter_12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** Author lizhenchaoatguigu.cn* Date 2021/1/31 9:11*/
public class Flink01_HotItem_TopN {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 使用sql从文件读取数据tenv.executeSql(create table user_behavior( user_id bigint, item_id bigint, category_id int, behavior string, ts bigint, event_time as to_timestamp(from_unixtime(ts, yyyy-MM-dd HH:mm:ss)), watermark for event_time as event_time - interval 5 second )with( connectorfilesystem, pathinput/UserBehavior.csv, formatcsv));// 每隔 10m 统计一次最近 1h 的热门商品 top// 1. 计算每每个窗口内每个商品的点击量Table t1 tenv.sqlQuery(select item_id, hop_end(event_time, interval 10 minute, interval 1 hour) w_end, count(*) item_count from user_behavior where behaviorpv group by hop(event_time, interval 10 minute, interval 1 hour), item_id);tenv.createTemporaryView(t1, t1);// 2. 按照窗口开窗, 对商品点击量进行排名Table t2 tenv.sqlQuery(select *, row_number() over(partition by w_end order by item_count desc) rk from t1);tenv.createTemporaryView(t2, t2);// 3. 取 top3Table t3 tenv.sqlQuery(select item_id, w_end, item_count, rk from t2 where rk3);// 4. 数据写入到mysql// 4.1 创建输出表tenv.executeSql(create table hot_item( item_id bigint, w_end timestamp(3), item_count bigint, rk bigint, PRIMARY KEY (w_end, rk) NOT ENFORCED) with( connector jdbc, url jdbc:mysql://hadoop162:3306/flink_sql?useSSLfalse, table-name hot_item, username root, password aaaaaa ));// 4.2 写入到输出表t3.executeInsert(hot_item);}
}四、总结
Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能Flink 还支持逐组 Top-N 。 例如每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。 流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序若 top N 的记录发生了变化变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外若 top N 记录需要存储到外部存储则结果表需要拥有与 Top-N 查询相同的唯一键。