当前位置: 首页 > news >正文

河南省建设工会网站重庆平台网站建设设计

河南省建设工会网站,重庆平台网站建设设计,深圳建工集团股份有限公司待遇,wordpress genesis3.8.基于Flink将数据写入到ClickHouse 编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作 3.8.1.ClickHouse基本介绍 ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库#xff08;DBMS#xff09;#xff0c;使用C语言编写#xff0c;主要用…3.8.基于Flink将数据写入到ClickHouse 编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作 3.8.1.ClickHouse基本介绍 ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库DBMS使用C语言编写主要用于在线分析处理查询OLAP能够使用SQL查询实时生成分析数据报告。 结论: ClickHouse像很多OLAP数据库一样单表查询速度由于关联查询而且ClickHouse的两者差距更为明显。 3.8.2.ClickHouse安装步骤 本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本 1-设置yum源 sudo yum install yum-utils sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_642- 直接基于yum安装即可 sudo yum install clickhouse-server clickhouse-client3-修改配置文件 vim /etc/clickhouse-server/config.xml 修改178行: 打开这一行的注释 listen_host::/listen_host4-启动clickhouse的server systemctl start clickhouse-server 停止: systemctl stop clickhouse-server 重启 systemctl restart clickhouse-server5-进入客户端 3.8.3.在ClickHouse中创建目标表 create database itcast_ck; use itcast_ck; create table itcast_ck.itcast_ck_ems( id int, sid varchar(128), ip varchar(128), create_time varchar(128), session_id varchar(128), yearInfo varchar(128), monthInfo varchar(128), dayInfo varchar(128), hourInfo varchar(128), seo_source varchar(128), area varchar(128), origin_channel varchar(128), msg_count int(128), from_url varchar(128), PRIMARY KEY (id) ) ENGINEReplacingMergeTree();3.8.4.编写Flink代码完成写入到CK操作 import com.itheima.pojo.PulsarTopicPojo; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource; import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser; import org.apache.flink.types.Row;import java.sql.Types; import java.util.Properties;// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中 public class ItcastFlinkToClickHouse {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//2. 添加Source组件, 从Pulsar中读取消息数据Properties props new Properties();props.setProperty(topic,persistent://public/default/itcast_ems_tab);props.setProperty(partition.discovery.interval-millis,5000);FlinkPulsarSourcePulsarTopicPojo pulsarSource new FlinkPulsarSourcePulsarTopicPojo(pulsar://node1:6650,node2:6650,node3:6650,http://node1:8080,node2:8080,node3:8080,JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSourcePulsarTopicPojo dataStreamSource env.addSource(pulsarSource);//2.2 转换数据操作: 将 PulsarTopicPojo 转换为ROW对象SingleOutputStreamOperatorRow rowDataSteam dataStreamSource.map(new MapFunctionPulsarTopicPojo, Row() {Overridepublic Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());}});//2.3: 设置sink操作写入到CK操作String insertSql insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?);JDBCAppendTableSink tableSink JDBCAppendTableSink.builder().setDrivername(ru.yandex.clickhouse.ClickHouseDriver).setDBUrl(jdbc:clickhouse://node2:8123/itcast_ck).setQuery(insertSql).setBatchSize(1).setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR).build();tableSink.emitDataStream(rowDataSteam);//3. 提交执行env.execute(itcast_to_ck);} }3.9.HBase对接Phoenix实现即席查询 3.9.1.Phoenix安装操作 Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL), 同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase 整个安装操作, 大家可以参考资料中安装手册, 进行安装即可 3.9.2.在Phoenix中创建表 create view itcast_h_ems ( id integer primary key, f1.sid varchar, f1.ip varchar, f1.create_time varchar, f1.session_id varchar, f1.yearInfo varchar, f1.monthInfo varchar, f1.dayInfo varchar, f1.hourInfo varchar, f1.seo_source varchar, f1.area varchar, f1.origin_channel varchar, f1.msg_count integer, f1.from_url varchar );3.9.3.在Phoenix中类型说明
http://www.pierceye.com/news/809235/

相关文章:

  • 做兼职的网站策划书大连中山网站建设
  • 中国摄影网站深圳网站建设龙华
  • 个人网站怎么建立深圳网站建站费用
  • 笔趣阁建站教程网页设计 网站建设啥意思
  • 海门网站开发西安响应式网站建设服务提供商
  • 自适应网站建站哈尔滨市建设安全监察网站
  • nas服务器可以做网站吗电商类网站开发方案
  • 免费的个人的网站网站建设 考虑
  • 医院网站建设的目的高端网站有哪些优势
  • 佛山网站建设首选如何备份wordpress
  • 优化稳定网站排名网站建设需要学什么语言
  • 可以做设计私单的网站硬件开发工程师面试
  • 竞价网站单页网页设计师中级证书有用吗
  • 做网站 简单外包wordpress 插件api
  • 白城网站seo新手怎么建立自己网站
  • 建立用模板建立网站wordpress feed
  • 株洲品牌网站建设优质的杭州网站优化
  • 网站开发在哪个科目核算网站平台怎么做的好处
  • 网站底部模板代码江苏建站系统
  • 写出网站开发的基本流程品牌建设网站
  • 河北省建设机械协会网站双减之下托管班合法吗
  • 江门市城乡建设局网站阿里云万网域名购买
  • 网站推广技术哪家好专业网站开发建设
  • 义乌营销型网站建设淘宝做动图网站
  • dedecms能做什么网站素材网站怎么做
  • 一流导航设计网站wordpress 七牛 插件
  • 新开元电销系统济南网站优化技术厂家
  • 有名的网站建设wordpress安装到主机
  • 网站建设的指导思想p2p金融网站建设
  • 可在哪些网站做链接郑州展厅设计公司