电商网站的银行支付接入该怎么做,网页设计与网站架设,北京手机网站设计费用,网站域名绑定破解作者#xff1a;刘大龙唯品会
随着互联网的发展进入下半场#xff0c;数据的时效性对企业的精细化运营越来越重要#xff0c; 商场如战场#xff0c;在每天产生的海量数据中#xff0c;如何能实时有效的挖掘出有价值的信息#xff0c; 对企业的决策运营策略调整有很大帮…作者刘大龙唯品会
随着互联网的发展进入下半场数据的时效性对企业的精细化运营越来越重要 商场如战场在每天产生的海量数据中如何能实时有效的挖掘出有价值的信息 对企业的决策运营策略调整有很大帮助。此外随着 5G 技术的成熟、广泛应用 对于工业互联网、物联网等数据时效性要求非常高的行业企业就更需要一套完整成熟的实时数据体系来提高自身的行业竞争力。
本文从上述现状及实时数据需求出发结合工业界案例、笔者的实时数据开发经验 梳理总结了实时数据体系建设的总体方案本文主要分为三个部分 第一部分主要介绍了当下在工业界比较火热的实时计算引擎 Flink 在实时数据体系建设过程中主要的应用场景及对应解决方案 第二部分从实时数据体系架构、实时数据模型分层、实时数据体系建设方式、流批一体实时数据架构发展等四个方面思考了实时数据体系的建设方案 第三部分则以一个具体案例介绍如何使用 Flink SQL 完成实时数据统计类需求。 一、Flink 实时应用场景
目前看来Flink 在实时计算领域内的主要应用场景主要可分为四类场景 分别是实时数据同步、流式 ETL、实时数据分析和复杂事件处理具体的业务场景和对应的解决方案可详细研究下图 文字层面不再详述。 二、实时数据体系架构
实时数据体系大致分为三类场景流量类、业务类和特征类这三种场景各有不同。
在数据模型上流量类是扁平化的宽表业务数仓更多是基于范式的建模特征数据是 KV 存储从数据来源区分流量数仓的数据来源一般是日志数据业务数仓的数据来源是业务 binlog 数据特征数仓的数据来源则多种多样;从数据量而言流量和特征数仓都是海量数据每天十亿级以上而业务数仓的数据量一般每天百万到千万级从数据更新频率而言流量数据极少更新则业务和特征数据更新较多流量数据一般关注时序和趋势业务数据和特征数据关注状态变更在数据准确性上流量数据要求较低而业务数据和特征数据要求较高。
2.1 实时数据体系整体架构 整个实时数据体系架构分为五层分别是接入层存储层计算层、平台层和应用层上图只是整体架构的概要图每一层具体要做的事情接下来通过文字来详述。
接入层该层利用各种数据接入工具收集各个系统的数据包括 binlog 日志、埋点日志、以及后端服务日志数据会被收集到 Kafka 中这些数据不只是参与实时计算也会参与离线计算保证实时和离线的原始数据是统一的存储层该层对原始数据、清洗关联后的明细数据进行存储基于统一的实时数据模型分层理念将不同应用场景的数据分别存储在 Kafka、HDFS、Kudu、 Clickhouse、Hbase、Redis、Mysql 等存储引擎中各种存储引擎存放的具体的数据类型在实时数据模型分层部分会详细介绍计算层计算层主要使用 Flink、Spark、Presto 以及 ClickHouse 自带的计算能力等四种计算引擎Flink 计算引擎主要用于实时数据同步、 流式 ETL、关键系统秒级实时指标计算场景Spark SQL 主要用于复杂多维分析的准实时指标计算需求场景Presto 和 ClickHouse 主要满足多维自助分析、对查询响应时间要求不太高的场景平台层在平台层主要做三个方面的工作分别是对外提供统一查询服务、元数据及指标管理、数据质量及血缘应用层以统一查询服务对各个业务线数据场景进行支持业务主要包括实时大屏、实时数据产品、实时 OLAP、实时特征等。
其中平台层详细工作如下
统一查询服务支持从底层明细数据到聚合层数据的查询支持以SQL化方式查询Redis、Hbase等KV存储中的数据元数据及指标管理主要对实时的Kafka表、Kudu表、Clickhouse表、Hive表等进行统一管理以数仓模型中表的命名方式规范表的命名明确每张表的字段含义、使用方指标管理则是尽量通过指标管理系统将所有的实时指标统一管理起来明确计算口径提供给不同的业务方使用数据质量及血缘分析数据质量分为平台监控和数据监控两个部分血缘分析则主要是对实时数据依赖关系、实时任务的依赖关系进行分析。
平台监控部分一是对任务运行状态进行监控对异常的任务进行报警并根据设定的参数对任务进行自动拉起与恢复二是针对 Flink 任务要对 Kafka 消费处理延迟进行监控并实时报警。
数据据监控则分为两个部分首先流式 ETL 是整个实时数据流转过程中重要的一环ETL 的过程中会关联各种维表实时关联时定时对没有关联上的记录上报异常日志到监控平台当数量达到一定阈值时触发报警 其次部分关键实时指标采用了 lambda 架构因此需要对历史的实时指标与离线 hive 计算的数据定时做对比提供实时数据的数据质量监控对超过阈值的指标数据进行报警。
为了配合数据监控需要做实时数据血缘主要是梳理实时数据体系中数据依赖关系以及实时任务的依赖关系从底层ODS 到 DW 再到 DM以及 DM 层被哪些模型用到 将整个链条串联起来这样做在数据/任务主动调整时可以通知关联的下游指标异常时借助血缘定位问题同时基于血缘关系的分析我们也能评估数据的应用价值核算数据的计算成本。
2.2 实时数据模型分层 离线数仓考虑到效率问题一般会采取空间换时间的方式层级划分会比较多实时数仓考虑到实时性问题分层则越少越好另外也减少了中间流程出错的可能性因此将其分为四层。
■ ODS 层
操作数据层保存原始数据对非结构化的数据进行结构化处理轻度清洗几乎不删除原始数据该层的数据主要来自业务数据库的 binlog 日志、埋点日志和应用程序日志对于 binlog 日志通过 canal 监听写到消息队列 Kafka 中对应于埋点和应用程序日志则通过 Filebeat 采集 nginx 和 tomcat 日志上报到Kafka 中除了存储在 Kafka 中同时也会对业务数据库的 binlog 日志通过 Flink 写入 HDFS、Kudu 等存储引擎落地到 5min Hive 表供查询明细数据同时也提供给离线数仓做为其原始数据另外对于埋点日志数据由于 ODS 层是非结构化的则没有必要落地。
■ DWD 层
实时明细数据层以业务过程作为建模驱动基于每个具体的业务过程特点构建最细粒度的明细层事实表可以结合企业的数据使用特点将明细事实表的某些重要维度属性字段做适当冗余也即宽表化处理该层的数据来源于 ODS 层通过简单的 Streaming ETL 后得到对于 binlog 日志的处理主要进行简单的数据清洗、处理数据漂移以及可能对多个 ODS 层的表进行 Streaming Join对流量日志主要是做一些通用ETL 处理将非结构化的数据结构化关联通用的维度字段该层的数据存储在消息队列 Kafka 中同时也会用 Flink 实时写入 Hive 5min 表供查询明细数据同时要提供给离线数仓做为其原始数据。
■ DIM 层
公共维度层基于维度建模理念思想建立整个业务过程的一致性维度降低数据计算口径和算法不统一风险DIM 层数据来源于两部分一部分是Flink程序实时处理ODS层数据得到另外一部分是通过离线任务出仓得到DIM 层维度数据主要使用 MySQL、Hbase、Redis 三种存储引擎对于维表数据比较少的情况可以使用 MySQL对于单条数据大小比较小查询 QPS 比较高的情况可以使用 Redis 存储降低机器内存资源占用对于数据量比较大对维表数据变化不是特别敏感的场景可以使用HBase 存储。
■ DM 层
1数据集市层
以数据域业务域的理念建设公共汇总层对于DM层比较复杂需要综合考虑对于数据落地的要求以及具体的查询引擎来选择不同的存储方式分为轻度汇总层和高度汇总层同时产出高度汇总层数据用于前端比较简单的KV查询 提升查询性能比如实时大屏实时报表等数据的时效性要求为秒级轻度汇总层Kafka中宽表实时写入OLAP存储引擎用于前端产品复杂的OLAP查询场景满足自助分析和产出复杂报表的需求对数据的时效性要求可容忍到分钟级
2轻度汇总层
轻度汇总层由明细层通过Streaming ETL得到主要以宽表的形式存在业务明细汇总是由业务事实明细表和维度表join得到流量明细汇总是由流量日志按业务线拆分和维度表join得到轻度汇总层数据存储比较多样化首先利用Flink实时消费DWD层Kafka中明细数据join业务过程需要的维表实时打宽后写入该层的Kafka中以Json或PB格式存储同时对多维业务明细汇总数据通过Flink实时写入Kudu用于查询明细数据和更复杂的多维数据分析需求对于流量数据通过Flink分别写入HDFS和ClickHouse用于复杂的多维数据分析 实时特征数据则通过Flink join维表后实时写入HDFS用于下游的离线ETL消费对于落地Kudu和HDFS的宽表数据可用Spark SQL做分钟级的预计算满足业务方复杂数据分析需求提供分钟级延迟的数据从而加速离线ETL过程的延迟 另外随着Flink SQL与Hive生态集成的不断完善可尝试用Flink SQL做离线ETL和OLAP计算任务(Flink流计算基于内存计算的特性和presto非常类似这使其也可以成为一个OLAP计算引擎)用一套计算引擎解决实时离线需求从而实现批流统一对于Kudu中的业务明细数据、ClickHouse中的流量明细数据也可以满足业务方的个性化数据分析需求利用强大的OLAP计算引擎实时查询明细数据在10s量级的响应时间内给出结果这类需求也即是实时OLAP需求灵活性比较高。
3高度汇总层
高度汇总层由明细数据层或轻度汇总层通过聚合计算后写入到存储引擎中产出一部分实时数据指标需求灵活性比较差计算引擎使用Flink Datastream API和Flink SQL指标存储引擎根据不同的需求对于常见的简单指标汇总模型可直接放在MySQL里面维度比较多的、写入更新比较大的模型会放在HBase里面 还有一种是需要做排序、对查询QPS、响应时间要求非常高、且不需要持久化存储如大促活动期间在线TopN商品等直接存储在Redis里面在秒级指标需求中需要混用Lambda和Kappa架构大部分实时指标使用Kappa架构完成计算少量关键指标如金额相关使用Lambda架构用批处理重新处理计算增加一次校对过程。
总体来说 DM 层对外提供三种时效性的数据
首先是 Flink 等实时计算引擎预计算好的秒级实时指标这种需求对数据的时效性要求非常高用于实时大屏、计算维度不复杂的实时报表需求。
其次是 Spark SQL 预计算的延迟在分钟级的准实时指标 该类指标满足一些比较复杂但对数据时效性要求不太高的数据分析场景可能会涉及到多个事实表的join如销售归因等需求。
最后一种则是不需要预计算ad-hoc查询的复杂多维数据分析场景此类需求比较个性化灵活性比较高如果 OLAP 计算引擎性能足够强大也可完全满足秒级计算需求的场景; 对外提供的秒级实时数据和另外两种准实时数据的比例大致为 37绝大多数的业务需求都优先考虑准实时计算或 ad-hoc 方式可以降低资源使用、提升数据准确性以更灵活的方式满足复杂的业务场景。
2.3 实时数据体系建设方式
整个实时数据体系分为两种建设方式即实时和准实时(它们的实现方式分别是基于流计算引擎和 ETL、OLAP 引擎数据时效性则分别是秒级和分钟级。
在调度开销方面准实时数据是批处理过程因此仍然需要调度系统支持调度频率较高而实时数据却没有调度开销在业务灵活性方面因为准实时数据是基于 ETL 或 OLAP 引擎实现灵活性优于基于流计算的方式在对数据晚到的容忍度方面因为准实时数据可以基于一个周期内的数据进行全量计算因此对于数据晚到的容忍度也是比较高的而实时数据使用的是增量计算对于数据晚到的容忍度更低一些在适用场景方面准实时数据主要用于有实时性要求但不太高、涉及多表关联和业务变更频繁的场景如交易类型的实时分析实时数据则更适用于实时性要求高、数据量大的场景如实时特征、流量类型实时分析等场景。
2.4 流批一体实时数据架构发展
从1990年 Inmon 提出数据仓库概念到今天大数据架构经历了从最初的离线大数据架构、Lambda 架构、Kappa 架构以及 Flink 的火热带出的流批一体架构数据架构技术不断演进本质是在往流批一体的方向发展让用户能以最自然、最小的成本完成实时计算。
离线大数据架构数据源通过离线的方式导入到离线数仓中下游应用根据业务需求选择直接读取 DM 或加一层数据服务比如 MySQL 或 Redis数据存储引擎是 HDFS/HiveETL 工具可以是 MapReduce 脚本或 HiveSQL。数据仓库从模型层面分为操作数据层 ODS、数据仓库明细层 DWD、数据集市层 DMLambda 架构随着大数据应用的发展人们逐渐对系统的实时性提出了要求为了计算一些实时指标就在原来离线数仓的基础上增加了一个实时计算的链路并对数据源做流式改造即把数据发送到消息队列实时计算去订阅消息队列直接完成指标增量的计算推送到下游的数据服务中去由数据服务层完成离线实时结果的合并Kappa 架构Lambda 架构虽然满足了实时的需求但带来了更多的开发与运维工作其架构背景是流处理引擎还不完善流处理的结果只作为临时的、近似的值提供参考。后来随着 Flink 等流处理引擎的出现流处理技术成熟起来这时为了解决两套代码的问题LickedIn 的 Jay Kreps 提出了 Kappa 架构流批一体架构流批一体架构比较完美的实现方式是采用流计算 交互式分析双引擎架构在这个架构中流计算负责的是基础数据而交互式分析引擎是中心流计算引擎对数据进行实时 ETL 工作与离线相比降低了 ETL 过程的 latency交互式分析引擎则自带存储通过计算存储的协同优化 实现高写入 TPS、高查询 QPS 和低查询 latency 从而做到全链路的实时化和 SQL 化这样就可以用批的方式实现实时分析和按需分析并能快速的响应业务的变化两者配合实现 1 1 2 的效果 该架构对交互式分析引擎的要求非常高也许是未来大数据库技术发展的一个重点和方向。
为了应对业务方更复杂的多维实时数据分析需求笔者目前在数据开发中引入 Kudu这个 OLAP 存储引擎对订单等业务数据使用 Presto Kudu 的计算方案也是在探索流批一体架构在实时数据分析领域的可行性。此外目前比较热的数据湖技术如 Delta lake、Hudi 等支持在 HDFS 上进行 upsert 更新随着其流式写入、SQL 引擎支持的成熟未来可以用一套存储引擎解决实时、离线数据需求从而减少多引擎运维开发成本。
三、Flink SQL 实时计算 UV 指标
上一部分从宏观层面介绍了如何建设实时数据体系非常不接地气可能大家需要的只是一个具体的 case 来了解一下该怎么做那么接下来用一个接地气的案例来介绍如何实时计算 UV 数据。大家都知道在 ToC 的互联网公司UV 是一个很重要的指标对于老板、商务、运营的及时决策会产生很大的影响笔者在电商公司目前主要的工作就是计算 UV、销售等各类实时数据体验就特别深刻 因此就用一个简单demo 演示如何用 Flink SQL 消费 Kafka 中的 PV 数据实时计算出 UV 指标后写入 Hbase。
3.1 Kafka 源数据解析
PV 数据来源于埋点数据经 FileBeat 上报清洗后以 ProtoBuffer 格式写入下游 Kafka消费时第一步要先反序列化 PB 格式的数据为 Flink 能识别的 Row 类型因此也就需要自定义实现 DeserializationSchema 接口具体如下代码 这里只抽取计算用到的 PV 的 mid、事件时间 time_local并从其解析得到 log_date 字段
public class PageViewDeserializationSchema implements DeserializationSchemaRow {public static final Logger LOG LoggerFactory.getLogger(PageViewDeserializationSchema.class);protected SimpleDateFormat dayFormatter;private final RowTypeInfo rowTypeInfo;public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){dayFormatter new SimpleDateFormat(yyyyMMdd, Locale.UK);this.rowTypeInfo rowTypeInfo;}Overridepublic Row deserialize(byte[] message) throws IOException {Row row new Row(rowTypeInfo.getArity());MobilePage mobilePage null;try {mobilePage MobilePage.parseFrom(message);String mid mobilePage.getMid();row.setField(0, mid);Long timeLocal mobilePage.getTimeLocal();String logDate dayFormatter.format(timeLocal);row.setField(1, logDate);row.setField(2, timeLocal);}catch (Exception e){String mobilePageError (mobilePage ! null) ? mobilePage.toString() : ;LOG.error(error parse bytes payload is {}, pageview error is {}, message.toString(), mobilePageError, e);}return null;}
3.2 编写 Flink Job 主程序
将 PV 数据解析为 Flink 的 Row 类型后接下来就很简单了编写主函数写 SQL 就能统计 UV 指标了代码如下
public class RealtimeUV {public static void main(String[] args) throws Exception {//step1 从properties配置文件中解析出需要的Kakfa、Hbase配置信息、checkpoint参数信息MapString, String config PropertiesUtil.loadConfFromFile(args[0]);String topic config.get(source.kafka.topic);String groupId config.get(source.group.id);String sourceBootStrapServers config.get(source.bootstrap.servers);String hbaseTable config.get(hbase.table.name);String hbaseZkQuorum config.get(hbase.zk.quorum);String hbaseZkParent config.get(hbase.zk.parent);int checkPointPeriod Integer.parseInt(config.get(checkpoint.period));int checkPointTimeout Integer.parseInt(config.get(checkpoint.timeout));StreamExecutionEnvironment sEnv StreamExecutionEnvironment.getExecutionEnvironment();//step2 设置Checkpoint相关参数用于Failover容错sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class,ProtobufSerializer.class);sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);sEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//step3 使用Blink planner、创建TableEnvironment,并且设置状态过期时间避免Job OOMEnvironmentSettings environmentSettings EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv StreamTableEnvironment.create(sEnv, environmentSettings);tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));Properties sourceProperties new Properties();sourceProperties.setProperty(bootstrap.servers, sourceBootStrapServers);sourceProperties.setProperty(auto.commit.interval.ms, 3000);sourceProperties.setProperty(group.id, groupId);//step4 初始化KafkaTableSource的Schema信息笔者这里使用register TableSource的方式将源表注册到Flink中而没有用register DataStream方式也是因为想熟悉一下如何注册KafkaTableSource到Flink中TableSchema schema TableSchemaUtil.getAppPageViewTableSchema();OptionalString proctimeAttribute Optional.empty();ListRowtimeAttributeDescriptor rowtimeAttributeDescriptors Collections.emptyList();MapString, String fieldMapping new HashMap();ListString columnNames new ArrayList();RowTypeInfo rowTypeInfo new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());columnNames.addAll(Arrays.asList(schema.getFieldNames()));columnNames.forEach(name - fieldMapping.put(name, name));PageViewDeserializationSchema deserializationSchema new PageViewDeserializationSchema(rowTypeInfo);MapKafkaTopicPartition, Long specificOffsets new HashMap();Kafka011TableSource kafkaTableSource new Kafka011TableSource(schema,proctimeAttribute,rowtimeAttributeDescriptors,Optional.of(fieldMapping),topic,sourceProperties,deserializationSchema,StartupMode.EARLIEST,specificOffsets);tEnv.registerTableSource(pageview, kafkaTableSource);//step5 初始化Hbase TableSchema、写入参数并将其注册到Flink中HBaseTableSchema hBaseTableSchema new HBaseTableSchema();hBaseTableSchema.setRowKey(log_date, String.class);hBaseTableSchema.addColumn(f, UV, Long.class);HBaseOptions hBaseOptions HBaseOptions.builder().setTableName(hbaseTable).setZkQuorum(hbaseZkQuorum).setZkNodeParent(hbaseZkParent).build();HBaseWriteOptions hBaseWriteOptions HBaseWriteOptions.builder().setBufferFlushMaxRows(1000).setBufferFlushIntervalMillis(1000).build();HBaseUpsertTableSink hBaseSink new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);tEnv.registerTableSink(uv_index, hBaseSink);//step6 实时计算当天UV指标sql, 这里使用最简单的group by agg没有使用minibatch或窗口在大数据量优化时最好使用后两种方式String uvQuery insert into uv_index select log_date,\n ROW(count(distinct mid) as UV)\n from pageview\n group by log_date;tEnv.sqlUpdate(uvQuery);//step7 执行JobsEnv.execute(UV Job);}
}
以上就是一个简单的使用 Flink SQL 统计 UV 的 case, 代码非常简单只需要理清楚如何解析 Kafka 中数据如何初始化 Table Schema以及如何将表注册到 Flink中即可使用 Flink SQL 完成各种复杂的实时数据统计类的业务需求学习成本比API 的方式低很多。说明一下笔者这个 demo 是基于目前业务场景而开发的在生产环境中可以真实运行起来可能不能拆箱即用你需要结合自己的业务场景自定义相应的 kafka 数据解析类。
参考资料
Flink Use Cases基于Flink的严选实时数仓实践如果你也想做实时数仓菜鸟供应链实时数仓的架构演进及应用场景美团点评基于 Flink 的实时数仓平台实践知乎实时数仓架构实践及演进OPPO数据中台之基石基于Flink SQL构建实数据仓库友信金服基于Flink构建实时用户画像系统的实践实时数据中台如何能做得更好
原文链接 本文为云栖社区原创内容未经允许不得转载。