做桂林网站的图片大全,网站运营与公司,阿里云已备案域名购买,网站开发网站建设制作费用用户域登录各窗口汇总表
主要任务#xff1a;从kafka页面日志主题读取数据#xff0c;统计 七日回流用户#xff1a;之前活跃的用户#xff0c;有一段时间不活跃了#xff0c;之后又开始活跃#xff0c;称为回流用户当日独立用户数#xff1a;同一个用户当天重复登录从kafka页面日志主题读取数据统计 七日回流用户之前活跃的用户有一段时间不活跃了之后又开始活跃称为回流用户当日独立用户数同一个用户当天重复登录只算作一个独立用户。
思路分析
读取kafka页面主题数据转换数据结构String - JSONObject过滤数据uid不为null 登录的两种情况 用户打开应用后自动登录用户打印应用后没有登录浏览后跳转到登录页面 过滤条件 uid不为null且last_page_id is nulllast_page_id login 设置水位线按照uid分组统计回流用户数和独立用户数开窗聚合写入doris
具体实现
设置端口、并行度、消费者组、kafka主题读取dwd页面主题数据 - stream.print()对数据进行清洗过滤uid不为空 stream.flatMap()使用flatMap过滤new FlatMapFunction(){}在该方法内部转换为JSONObject, 并且获取uid和lastPageId, try-catch这段代码判断是否满足思路分析中的条件如果中途发生异常直接catch后打印到控制台清理掉即可。 先注册水位线 jsonObjStream.assignTimestampAndWatermarknew SerializableTimestampAssigner, 提取数据中的ts 按照uid分组 stream.keyby()按照uid进行分组 判断独立用户和回流用户 创建UserLoginBean, 使用状态保存用户的登录信息在open方法中getRuntimeContext().getState(new ValueStateDescriptor(last_login_dt,String.class))创建状态记录用户上一次的登录时间在processElement方法中比较当前登录的日期和状态存储的日期 如果lastLoginDtnull是新用户如果不为空判断上次登录时间和当前时间的差值是否大于7天如果大于7天说明是回流用户。如果小于7天还需要判断上次登录时间是否是今天如果不是今天则说明该用户本次是独立用户。 开窗聚合 使用滚动窗口开窗聚合在reduce算子中写聚合逻辑在process算子中获取窗口信息 写入doris 创建doris sink写出到doris
核心代码
public static void main(String[] args) {new DwsUserUserLoginWindow().start(10024,4,dws_user_user_login_window, Constant.TOPIC_DWD_TRAFFIC_PAGE);}Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSourceString stream) {//1.读取dwd页面数据//stream.print();//2. 对数据进行清洗过滤SingleOutputStreamOperatorJSONObject jsonObjStream etl(stream);//3. 注册水位线SingleOutputStreamOperatorJSONObject withWatermarkStream addWatermark(jsonObjStream);//4. 按照uid分组KeyedStreamJSONObject, String keyedStream getKeyedStream(withWatermarkStream);//5. 判断独立用户和回流用户SingleOutputStreamOperatorUserLoginBean processedStream getUserLoginBeanStream(keyedStream);//processedStream.print();//开窗聚合SingleOutputStreamOperatorUserLoginBean reducedStream getReducedStream(processedStream);//reducedStream.print();//写入DorisreducedStream.map(new DorisMapFunction()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));}[gitee仓库地址(https://gitee.com/langpaian/gmall2023-realtime)