Requirements Analysis
Keywords: count how often each search keyword appears.
IK Tokenization
Tokenization is done with the IK analyzer, which must be pulled in as a dependency. It splits search text the way people naturally write it; for example, "苹果iphone 手机" is split into "苹果", "iphone", and "手机".
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
</dependency>
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
</dependency>

The test code is as follows:
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

public class IkUtil {
    public static void main(String[] args) throws IOException {
        String s = "Apple 苹果15 5G手机";
        StringReader stringReader = new StringReader(s);
        // The second argument is the smart-mode flag: true means the resulting
        // words are not split any further; false emits the finest-grained tokens.
        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
        Lexeme next = ikSegmenter.next();
        while (next != null) {
            System.out.println(next.getLexemeText());
            next = ikSegmenter.next();
        }
    }
}

Overall Flow
1. Create the custom tokenizer utility class IKUtil (IK is a tokenization library pulled in as a dependency).
2. Create a custom function class and register the function (see the KwSplit sketch below).
3. Consume the Kafka DWD page topic and set a watermark.
4. Filter search behavior from the main stream: page['item'] is not null, item_type = "keyword", last_page_id = "search".
5. Use the tokenizer function to split keyword.
6. Group by keyword, open a window, and aggregate.
7. Write out to Doris: create a Doris sink. Flink must have checkpointing enabled for data to be written out to Doris.

Implementation
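The custom split function KwSplit is imported and registered by the job below, but its source is not included in the post. A minimal sketch of what it might look like, assuming a Flink UDTF that wraps the same IK segmenter as IkUtil above (the row type, annotations, and null handling are my assumptions, not the original code):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

// Hypothetical sketch of KwSplit: a table function emitting one row per token.
@FunctionHint(output = @DataTypeHint("ROW<keyword STRING>"))
public class KwSplit extends TableFunction<Row> {
    public void eval(String keywords) {
        if (keywords == null) {
            return;
        }
        try {
            // Smart mode (true): keep the coarsest-grained tokens.
            IKSegmenter segmenter = new IKSegmenter(new StringReader(keywords), true);
            Lexeme lexeme = segmenter.next();
            while (lexeme != null) {
                collect(Row.of(lexeme.getLexemeText()));
                lexeme = segmenter.next();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Once registered, each input row can be expanded into one row per token via left join lateral table(kwSplit(keywords)) on true, which is exactly how the job below consumes it.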
import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Author: 浪拍岸
 * Created: 28/12/2023, 11:06 AM
 * Version: 1.0
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {
    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow().start(10021, 4,
                "dws_traffic_source_keyword_page_view_window");
    }

    @Override
    public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        // 1. Read the DWD page topic as the main stream
        tableEnv.executeSql("create table page_info(\n" +
                "  `common` map<string,string>,\n" +
                "  `page` map<string,string>,\n" +
                "  `ts` bigint,\n" +
                "  row_time as to_timestamp_ltz(ts, 3),\n" +
                "  WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));
        // Sanity check that data is arriving:
        //tableEnv.executeSql("select * from page_info").print();

        // 2. Filter out the search keywords
        Table keywordTable = tableEnv.sqlQuery("select\n" +
                "  page['item'] keywords,\n" +
                "  row_time,\n" +
                "  ts\n" +
                "from page_info\n" +
                "where page['last_page_id'] = 'search'\n" +
                "  and page['item_type'] = 'keyword'\n" +
                "  and page['item'] is not null");
        tableEnv.createTemporaryView("keywords_table", keywordTable);
        //tableEnv.executeSql("select * from keywords_table").print();

        // 3. Register the custom tokenizer function
        tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class);

        // 4. Call the tokenizer function to split keywords
        Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, row_time\n" +
                "from keywords_table\n" +
                "left join lateral table(kwSplit(keywords)) on true");
        tableEnv.createTemporaryView("split_kw_table", splitKwTable);
        //tableEnv.executeSql("select * from split_kw_table").print();

        // 5. Group by keyword, then window and aggregate
        Table windowAggTable = tableEnv.sqlQuery("select\n" +
                "  keyword,\n" +
                "  cast(tumble_start(row_time, interval '10' second) as string) wStart,\n" +
                "  cast(tumble_end(row_time, interval '10' second) as string) wEnd,\n" +
                "  cast(current_date as string) cur_date,\n" +
                "  count(*) keyword_count\n" +
                "from split_kw_table\n" +
                "group by tumble(row_time, interval '10' second), keyword");

        // 6. Write out to Doris
        tableEnv.executeSql("create table doris_sink(\n" +
                "  keyword STRING,\n" +
                "  wStart STRING,\n" +
                "  wEnd STRING,\n" +
                "  cur_date STRING,\n" +
                "  keyword_count BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));
        windowAggTable.insertInto("doris_sink").execute();
    }
}
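BaseSQLApp, Constant, and SQLUtil belong to the course project and are not shown in the post. For orientation only, here is a plausible sketch of the two SQLUtil helpers, assuming standard Kafka and Doris connector options; the host names, credentials, and database name are placeholders, not taken from the source:

public class SQLUtil {

    // Hypothetical sketch: renders the WITH clause for a Kafka source table.
    public static String getKafkaSourceSQL(String topic, String groupId) {
        return " WITH (\n" +
               "  'connector' = 'kafka',\n" +
               "  'topic' = '" + topic + "',\n" +
               "  'properties.bootstrap.servers' = 'hadoop102:9092',\n" + // placeholder broker
               "  'properties.group.id' = '" + groupId + "',\n" +
               "  'scan.startup.mode' = 'earliest-offset',\n" +
               "  'format' = 'json'\n" +
               ")";
    }

    // Hypothetical sketch: renders the WITH clause for a Doris sink table.
    public static String getDorisSinkSQL(String tableName) {
        return " WITH (\n" +
               "  'connector' = 'doris',\n" +
               "  'fenodes' = 'hadoop102:8030',\n" + // placeholder FE address
               "  'table.identifier' = 'gmall_realtime." + tableName + "',\n" +
               "  'username' = 'root',\n" +
               "  'password' = '000000',\n" +
               "  'sink.label-prefix' = 'doris-" + System.currentTimeMillis() + "'\n" +
               ")";
    }
}

One caveat carried over from the flow section above: the Doris connector commits data when a Flink checkpoint completes, so the job must run with checkpointing enabled (presumably configured inside BaseSQLApp.start), or nothing will land in Doris.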