Flink series articles
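1. Flink deployment, concepts, source/transformation/sink usage examples, the four cornerstones with examples: index of the whole series
13. Flink Table API and SQL: basic concepts, common API and introductory example
14. Flink Table API and SQL data types: built-in data types and their properties
15. Flink Table API and SQL streaming concepts: dynamic tables, time attributes, handling updating results, temporal tables, joins on streams, determinism on streams and query configuration
16. Flink Table API and SQL, connecting to external systems: connectors and formats, FileSystem example (1)
16. Flink Table API and SQL, connecting to external systems: connectors and formats, Elasticsearch example (2)
16. Flink Table API and SQL, connecting to external systems: connectors and formats, Apache Kafka example (3)
16. Flink Table API and SQL, connecting to external systems: connectors and formats, JDBC example (4)
16. Flink Table API and SQL, connecting to external systems: connectors and formats, Apache Hive example (6)
17. Flink Table API: supported operations (1)
17. Flink Table API: supported operations (2)
18. Operations and syntax supported by Flink SQL
19. Built-in functions in the Flink Table API and SQL, with examples (1)
19. User-defined functions in the Flink Table API and SQL, with examples (2)
19. User-defined functions in the Flink Table API and SQL, with examples (3)
19. User-defined functions in the Flink Table API and SQL, with examples (4)
20. Flink SQL Client: try Flink SQL without writing code and submit SQL jobs directly to a cluster
21. Flink Table API and DataStream API integration (1): introduction, introductory example, integration notes
21. Flink Table API and DataStream API integration (2): batch mode and insert-only stream processing
21. Flink Table API and DataStream API integration (3): changelog stream processing, pipeline example, type conversion and legacy conversion examples
21. Flink Table API and DataStream API integration: full version
22. Flink Table API and SQL: DDL for creating tables
24. Flink Table API and SQL Catalogs (1): introduction, types, DDL via the Java API and SQL, catalog operations via the Java API and SQL
24. Flink Table API and SQL Catalogs (2): databases and tables via the Java API
24. Flink Table API and SQL Catalogs (3): views via the Java API
24. Flink Table API and SQL Catalogs (4): partitions and functions via the Java API
25. Flink Table API and SQL functions (user-defined function example)
26. Flink SQL: overview and introductory example
27. Flink SQL SELECT (1): select, where, distinct, order by, limit, set operations and deduplication
27. Flink SQL SELECT (2): SQL hints and joins
27. Flink SQL SELECT (3): window functions
27. Flink SQL SELECT (4): window aggregation
27. Flink SQL SELECT (5): group aggregation, over aggregation and window join
27. Flink SQL SELECT (6): Top-N, Window Top-N and window deduplication
27. Flink SQL SELECT (7): pattern recognition
28. Flink SQL: DROP, ALTER, INSERT and ANALYZE statements
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB statements, UPDATE, DELETE (1)
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB statements, UPDATE, DELETE (2)
30. Flink SQL Client: using configuration files (tables, views, etc.), illustrated with Kafka and filesystem examples
31. Flink SQL Gateway: introduction and examples
32. Flink Table API and SQL: implementing user-defined sources and sinks, with detailed examples
33. Time zones in the Flink Table API and SQL
35. Flink formats: CSV and JSON
36. Flink formats: Parquet and Orc
41. Flink Hive dialect: introduction and detailed examples
40. Flink Apache Kafka connector (1): Kafka source, introduction and usage examples
40. Flink Apache Kafka connector (2): Kafka sink, introduction and usage examples
40. Flink Apache Kafka connector: Kafka source and sink, full version
42. Flink Table API and SQL: Hive Catalog
43. Flink and Hive: reading and writing, with detailed verification examples
44. Flink modules: introduction and usage, plus detailed examples of using Hive built-in and user-defined functions from Flink SQL (some claims found online appear to be wrong)
45. Flink metric system, introduction and verification (1): metric types and implementation examples
45. Flink metric system, introduction and verification (2): scope, reporters, system metrics, tracking, API integration examples and dashboard integration
45. Flink metric system, introduction and verification (3): full version
46. Flink Table API and SQL: list of configuration options, with examples

Contents
I. The Flink metric system
1. Registering metrics: metric types, Counter, Gauge, Histogram, Meter
2. Scope: user scope, system scope, list of all variables, user variables
3. Reporter
4. System metrics: CPU, Memory, Threads, GarbageCollection, ClassLoader, Network, Default shuffle service, Cluster, Availability, Checkpointing, State Access Latency, RocksDB, State Changelog, IO, Connectors, System resources, speculative execution
5. End-to-end latency tracking
6. State access latency tracking
7. REST API integration: /jobmanager/metrics, /taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2, /taskmanagers/metrics?get=metric1,metric2, /taskmanagers/metrics?get=metric1,metric2&agg=min,max
8. Dashboard integration

This article gives a short introduction to the Flink metric system: the four metric types with a code example for each, scope, system metrics, reporters, tracking, and REST API and dashboard integration.
The topic is split into three articles:
45. Flink metric system, introduction and verification (1): metric types and implementation examples
45. Flink metric system, introduction and verification (2): scope, reporters, system metrics, tracking, API integration examples and dashboard integration
45. Flink metric system, introduction and verification (3): full version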
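This article assumes that nc and Flink are installed and working.
It has 8 parts, covering metric registration, scope, reporters, system metrics, latency tracking, and REST API and dashboard integration.
The examples were run on Flink 1.17.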
I. The Flink metric system
Flink exposes a metric system that allows metrics to be gathered and exposed to external systems.
The Maven dependencies used in this article:
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink connectors -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

1. Registering metrics
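You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object on which you can create and register new metrics.
1) Metric types
Flink supports four metric types: Counters, Gauges, Histograms and Meters.
2) Counter
A Counter is used to count something. Its current value can be incremented or decremented with inc()/inc(long n) or dec()/dec(long n). You can create and register a Counter by calling counter(String name) on a MetricGroup.
The example below shows the registration pattern and a complete job that uses it.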
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsDemo {

    // Minimal pattern: register the counter in open(), increment it once per record.
    // public class LineMapper extends RichMapFunction<String, String> {
    //     private transient Counter counter;
    //
    //     @Override
    //     public void open(Configuration config) {
    //         this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter");
    //     }
    //
    //     @Override
    //     public String map(String value) throws Exception {
    //         this.counter.inc();
    //         return value;
    //     }
    // }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: plain word count
        DataStream<Tuple2<String, Integer>> result = lines
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    for (String word : value.split(",")) {
                        out.collect(word);
                    }
                }
            })
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(1);

        // transformation: Tuple3<word, number of input lines seen so far, word count>
        DataStream<Tuple3<String, Long, Integer>> result2 = lines
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {
                // Keep the Counter itself: copying getCount() into a long field
                // would leave the registered metric at 0.
                private transient Counter counter;

                @Override
                public void open(Configuration config) {
                    this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter:");
                }

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                    this.counter.inc();
                    long lineCount = this.counter.getCount();
                    System.out.println("line count: " + lineCount);
                    for (String word : value.split(",")) {
                        out.collect(Tuple2.of(word, lineCount));
                    }
                }
            })
            .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {
                @Override
                public Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {
                    return Tuple3.of(value.f0, value.f1, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(2);

        // sink
        result.print("result:");
        result2.print("result2:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}

// ---- verification ----
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count: 1
result:> (hello,1)
result2:> (hello,1,1)
result:> (123,1)
result2:> (123,1,1)
line count: 2
result2:> (alan,2,1)
result:> (alan,1)
result2:> (flink,2,1)
result:> (flink,1)
result2:> (good,2,1)
result:> (good,1)
line count: 3
result:> (alan_chan,1)
result2:> (alan_chan,3,1)
result:> (hi,1)
result2:> (hi,3,1)
result:> (flink,2)
result2:> (flink,2,2)

Alternatively, you can use your own Counter implementation:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsDemo {

    public static void test2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: Tuple3<word, counter value when the line was read, word count>
        DataStream<Tuple3<String, Long, Integer>> result = lines
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {
                private transient Counter counter;

                @Override
                public void open(Configuration config) {
                    // register a custom Counter implementation under the given name
                    this.counter = getRuntimeContext().getMetricGroup()
                            .counter("result2LineCounter", new AlanCustomCounter());
                }

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                    this.counter.inc();
                    System.out.println("line count: " + this.counter.getCount());
                    for (String word : value.split(",")) {
                        out.collect(Tuple2.of(word, this.counter.getCount()));
                    }
                }
            })
            .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {
                @Override
                public Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {
                    return Tuple3.of(value.f0, value.f1, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    /** Demo counter that moves in steps of 2 so the custom implementation is visible in the output. */
    public static class AlanCustomCounter implements Counter {
        private long count;

        @Override
        public void inc() {
            count += 2;
        }

        @Override
        public void inc(long n) {
            count += n;
        }

        @Override
        public void dec() {
            count -= 2;
        }

        @Override
        public void dec(long n) {
            count -= n;
        }

        @Override
        public long getCount() {
            return count;
        }
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}

// ---- verification ----
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count: 2
result:> (hello,2,1)
result:> (123,2,1)
line count: 4
result:> (alan,4,1)
result:> (flink,4,1)
result:> (good,4,1)
line count: 6
result:> (alan_chan,6,1)
result:> (hi,6,1)
result:> (flink,4,2)

3) Gauge
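A Gauge provides a value of any type on demand. To use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface. There is no restriction on the type of the returned value. You can register a gauge by calling gauge(String name, Gauge gauge) on a MetricGroup.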
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsGaugeDemo {

    // Minimal pattern: the gauge reads a field that the function updates.
    // public class MyMapper extends RichMapFunction<String, String> {
    //     private transient int valueToExpose = 0;
    //
    //     @Override
    //     public void open(Configuration config) {
    //         getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
    //             @Override
    //             public Integer getValue() {
    //                 return valueToExpose;
    //             }
    //         });
    //     }
    //
    //     @Override
    //     public String map(String value) throws Exception {
    //         valueToExpose++;
    //         return value;
    //     }
    // }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: Tuple3<word, "alan lines[line count]", word count>
        DataStream<Tuple3<String, String, Integer>> result = lines
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                private long result2LineCounter = 0;
                private transient Gauge<String> gauge = null;

                @Override
                public void open(Configuration config) {
                    // Only the initial value (0) of the counter is copied here; from now on
                    // the line count lives in the local field that the gauge exposes.
                    result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                    gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                        @Override
                        public String getValue() {
                            return "alan lines[" + result2LineCounter + "]";
                        }
                    });
                }

                @Override
                public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                    result2LineCounter++;
                    System.out.println("line count: " + result2LineCounter);
                    for (String word : value.split(",")) {
                        out.collect(Tuple2.of(word, gauge.getValue()));
                    }
                }
            })
            .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                    return Tuple3.of(value.f0, value.f1, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}

// ---- verification ----
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count: 1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count: 2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count: 3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
Note that reporters turn the exposed object into a String, so the value type needs a meaningful toString() implementation.
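The effect of toString() on a gauge value can be shown without Flink. The sketch below is only an illustration: QueueStats and report are hypothetical names, and java.util.function.Supplier stands in for the Gauge interface. It mimics the single step the note describes, a reporter converting the value to a String.

```java
import java.util.function.Supplier;

public class GaugeToStringSketch {

    /** Hypothetical value type that a gauge might expose. */
    static final class QueueStats {
        private final int pending;
        private final int capacity;

        QueueStats(int pending, int capacity) {
            this.pending = pending;
            this.capacity = capacity;
        }

        // Without this override the reported text would be the default
        // Object.toString() form (class name plus hash code).
        @Override
        public String toString() {
            return pending + "/" + capacity;
        }
    }

    /** Stand-in for a reporter: it only sees the String form of the gauge value. */
    static String report(Supplier<?> gauge) {
        return String.valueOf(gauge.get());
    }

    public static void main(String[] args) {
        System.out.println(report(() -> new QueueStats(3, 10))); // prints 3/10
    }
}
```

Numbers and Strings already have a usable String form, which is why the gauge in the example above needs no extra work.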
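4) Histogram
A Histogram measures the distribution of long values. You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.
The example below implements the Histogram interface by hand, purely to demonstrate what an implementation involves.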
import java.io.Serializable;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsHistogramDemo {

    // Minimal pattern: register the histogram in open(), update it once per record.
    // public class MyMapper extends RichMapFunction<Long, Long> {
    //     private transient Histogram histogram;
    //
    //     @Override
    //     public void open(Configuration config) {
    //         this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());
    //     }
    //
    //     @Override
    //     public Long map(Long value) throws Exception {
    //         this.histogram.update(value);
    //         return value;
    //     }
    // }

    public static class AlanHistogram implements Histogram {
        private CircularDoubleArray descriptiveStatistics = new CircularDoubleArray(10);

        public AlanHistogram() {
        }

        public AlanHistogram(int windowSize) {
            this.descriptiveStatistics = new CircularDoubleArray(windowSize);
        }

        @Override
        public void update(long value) {
            this.descriptiveStatistics.addValue(value);
        }

        @Override
        public long getCount() {
            return this.descriptiveStatistics.getElementsSeen();
        }

        @Override
        public HistogramStatistics getStatistics() {
            // Demo only: no statistics are computed, so a reporter that calls this would fail.
            // return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
            return null;
        }

        /** Fixed-size ring buffer that holds the most recent values. */
        static class CircularDoubleArray implements Serializable {
            private static final long serialVersionUID = 1L;
            private final double[] backingArray;
            private int nextPos = 0;
            private boolean fullSize = false;
            private long elementsSeen = 0;

            CircularDoubleArray(int windowSize) {
                this.backingArray = new double[windowSize];
            }

            synchronized void addValue(double value) {
                backingArray[nextPos] = value;
                ++elementsSeen;
                ++nextPos;
                if (nextPos == backingArray.length) {
                    nextPos = 0;
                    fullSize = true;
                }
            }

            synchronized double[] toUnsortedArray() {
                final int size = getSize();
                double[] result = new double[size];
                System.arraycopy(backingArray, 0, result, 0, result.length);
                return result;
            }

            private synchronized int getSize() {
                return fullSize ? backingArray.length : nextPos;
            }

            private synchronized long getElementsSeen() {
                return elementsSeen;
            }
        }
    }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: Tuple3<word, "alan lines[line count]", word count>
        DataStream<Tuple3<String, String, Integer>> result = lines
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                private long result2LineCounter = 0;
                private transient Gauge<String> gauge = null;
                private transient Histogram histogram = null;

                @Override
                public void open(Configuration config) {
                    result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                    gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                        @Override
                        public String getValue() {
                            return "alan lines[" + result2LineCounter + "]";
                        }
                    });
                    this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());
                }

                @Override
                public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                    result2LineCounter++;
                    this.histogram.update(result2LineCounter * 3);
                    // histogram.getCount() is printed only for demonstration; the value has no real meaning here
                    System.out.println("line count: " + result2LineCounter + ", histogram: " + this.histogram.getCount());
                    for (String word : value.split(",")) {
                        out.collect(Tuple2.of(word, gauge.getValue()));
                    }
                }
            })
            .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                    return Tuple3.of(value.f0, value.f1, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}

// ---- verification ----
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count: 1, histogram: 1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count: 2, histogram: 2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count: 3, histogram: 3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
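The hand-written histogram above returns null from getStatistics(), so it never reports a distribution. As a rough idea of what such statistics involve, here is a plain-Java sketch over a window of values. It is not Flink's implementation: the class WindowStatisticsSketch is a made-up name, the nearest-rank quantile is just one possible definition, and a real implementation would extend org.apache.flink.metrics.HistogramStatistics rather than stand alone.

```java
import java.util.Arrays;

public class WindowStatisticsSketch {
    private final long[] sorted;

    WindowStatisticsSketch(long[] windowValues) {
        // Work on a sorted copy so the caller's window is left untouched.
        this.sorted = windowValues.clone();
        Arrays.sort(this.sorted);
    }

    int size() {
        return sorted.length;
    }

    long getMin() {
        return sorted.length == 0 ? 0 : sorted[0];
    }

    long getMax() {
        return sorted.length == 0 ? 0 : sorted[sorted.length - 1];
    }

    double getMean() {
        return Arrays.stream(sorted).average().orElse(0.0);
    }

    /** Nearest-rank quantile for q in [0, 1]; returns 0.0 for an empty window. */
    double getQuantile(double q) {
        if (sorted.length == 0) {
            return 0.0;
        }
        int rank = (int) Math.ceil(q * sorted.length);
        rank = Math.min(Math.max(rank, 1), sorted.length);
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        // The three values the example job feeds into its histogram: 3, 6 and 9.
        WindowStatisticsSketch stats = new WindowStatisticsSketch(new long[] {3, 6, 9});
        System.out.println(stats.getMin() + " " + stats.getMax() + " "
                + stats.getMean() + " " + stats.getQuantile(0.5)); // prints 3 9 6.0 6.0
    }
}
```

Sorting a copy on every snapshot is fine for small windows like the 10-element default above; larger windows would call for a cheaper summary.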
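Flink does not provide a default implementation of Histogram, but it offers a wrapper that allows Codahale/DropWizard histograms to be used. To use this wrapper, add the following dependency to your pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>1.17.1</version>
</dependency>
The following example uses a Codahale/DropWizard histogram: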
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.codahale.metrics.SlidingWindowReservoir;

/**
 * @author alanchan
 */
public class TestMetricsHistogramDemo {

    public static void test2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: Tuple3<word, "alan lines[line count]", word count>
        DataStream<Tuple3<String, String, Integer>> result = lines
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                private long result2LineCounter = 0;
                private transient Gauge<String> gauge = null;
                private transient Histogram histogram = null;

                @Override
                public void open(Configuration config) {
                    result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                    gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                        @Override
                        public String getValue() {
                            return "alan lines[" + result2LineCounter + "]";
                        }
                    });
                    // wrap a DropWizard histogram instead of the hand-written AlanHistogram
                    com.codahale.metrics.Histogram dropwizardHistogram =
                            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
                    this.histogram = getRuntimeContext().getMetricGroup()
                            .histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
                }

                @Override
                public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                    result2LineCounter++;
                    this.histogram.update(result2LineCounter * 3);
                    // histogram.getCount() is printed only for demonstration; the value has no real meaning here
                    System.out.println("line count: " + result2LineCounter + ", histogram: " + this.histogram.getCount());
                    for (String word : value.split(",")) {
                        out.collect(Tuple2.of(word, gauge.getValue()));
                    }
                }
            })
            .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                    return Tuple3.of(value.f0, value.f1, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}

// ---- verification ----
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count: 1, histogram: 1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count: 2, histogram: 2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count: 3, histogram: 3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
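5) Meter
A Meter measures an average throughput. A single event is registered with markEvent(), and several events occurring at the same time with markEvent(long n). You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.
The example below shows a hand-written Meter implementation, which is far from rigorous; in practice the second example in this part is the more common choice.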
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.codahale.metrics.SlidingWindowReservoir;

/**
 * @author alanchan
 */
public class TestMetricsMeterDemo {

    /** Minimal pattern: register the meter in open(), mark one event per record. */
    public static class MyMapper extends RichMapFunction<Long, Long> {
        private transient Meter meter;

        @Override
        public void open(Configuration config) {
            this.meter = getRuntimeContext().getMetricGroup().meter("myMeter", new AlanMeter());
        }

        @Override
        public Long map(Long value) throws Exception {
            this.meter.markEvent();
            return value;
        }
    }

    public static class AlanMeter implements Meter {
        /** The underlying counter maintaining the count. */
        private final Counter counter = new SimpleCounter();
        /** The time-span over which the average is calculated. */
        private final int timeSpanInSeconds = 60;
        /** Circular array containing the history of values, assuming one sample every 5 seconds. */
        private final long[] values = new long[timeSpanInSeconds / 5 + 1];
        /** The index in the array for the current time. */
        private int time = 0;
        /** The last rate we computed. */
        private double currentRate = 0;

        @Override
        public void markEvent() {
            this.counter.inc();
        }

        @Override
        public void markEvent(long n) {
            this.counter.inc(n);
        }

        @Override
        public long getCount() {
            return counter.getCount();
        }

        @Override
        public double getRate() {
            return currentRate;
        }

        // Nothing in this demo calls update(), so getRate() keeps returning 0.0.
        public void update() {
            time = (time + 1) % values.length;
            values[time] = counter.getCount();
            currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
        }
    }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: Tuple3<word, "alan lines[line count]", word count>
        DataStream<Tuple3<String, String, Integer>> result = lines
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                private long result2LineCounter = 0;
                private transient Gauge<String> gauge = null;
                private transient Histogram histogram = null;
                private transient Meter meter;

                @Override
                public void open(Configuration config) {
                    result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                    gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                        @Override
                        public String getValue() {
                            return "alan lines[" + result2LineCounter + "]";
                        }
                    });
                    com.codahale.metrics.Histogram dropwizardHistogram =
                            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
                    this.histogram = getRuntimeContext().getMetricGroup()
                            .histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
                    this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new AlanMeter());
                }

                @Override
                public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                    result2LineCounter++;
                    this.histogram.update(result2LineCounter * 3);
                    this.meter.markEvent();
                    // histogram.getCount() and meter.getRate() are printed only for demonstration;
                    // what they should mean depends on the actual use case
                    System.out.println("line count: " + result2LineCounter + ", histogram: " + this.histogram.getCount()
                            + ", meter.getRate: " + this.meter.getRate());
                    for (String word : value.split(",")) {
                        out.collect(Tuple2.of(word, gauge.getValue()));
                    }
                }
            })
            .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                    return Tuple3.of(value.f0, value.f1, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}

// ---- verification ----
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count: 1, histogram: 1, meter.getRate: 0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count: 2, histogram: 2, meter.getRate: 0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count: 3, histogram: 3, meter.getRate: 0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
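The rate stays at 0.0 in the output above because the hand-written meter only recomputes its rate inside update(), and nothing in the job calls it. How a windowed rate comes out of periodic updates can be sketched in plain Java. This is a simplified illustration, not Flink's code: RateWindowSketch is a made-up name, the 5-second update interval is an assumption of the sketch, and the caller is assumed to invoke update() once per interval.

```java
public class RateWindowSketch {
    // Assumption of this sketch: update() is called once every 5 seconds.
    private static final int UPDATE_INTERVAL_SECONDS = 5;

    private final int timeSpanInSeconds;
    private final long[] values;      // circular history of the running count
    private int time = 0;             // index of the newest sample
    private long count = 0;           // total number of events seen
    private double currentRate = 0.0; // events per second over the time span

    /** timeSpanInSeconds is expected to be a positive multiple of the update interval. */
    RateWindowSketch(int timeSpanInSeconds) {
        this.timeSpanInSeconds = timeSpanInSeconds;
        this.values = new long[timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
    }

    void markEvent() {
        count++;
    }

    void markEvent(long n) {
        count += n;
    }

    long getCount() {
        return count;
    }

    double getRate() {
        return currentRate;
    }

    /** Stores the current count and recomputes the rate from the oldest sample still in the window. */
    void update() {
        time = (time + 1) % values.length;
        values[time] = count;
        // The slot right after the newest one holds the oldest sample.
        long oldest = values[(time + 1) % values.length];
        currentRate = (double) (values[time] - oldest) / timeSpanInSeconds;
    }

    public static void main(String[] args) {
        RateWindowSketch meter = new RateWindowSketch(10);
        System.out.println(meter.getRate()); // prints 0.0, no update yet
        meter.markEvent(20);
        meter.update();
        System.out.println(meter.getRate()); // prints 2.0
        meter.markEvent(30);
        meter.update();
        System.out.println(meter.getRate()); // prints 5.0
        meter.markEvent(10);
        meter.update();
        System.out.println(meter.getRate()); // prints 4.0
    }
}
```

With a 10-second span the array has three slots, so the third update compares the count of 60 with the sample of 20 taken two updates earlier, giving 40 / 10 = 4.0.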
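Flink offers a wrapper that allows Codahale/DropWizard meters to be used. To use this wrapper, add the following dependency to your pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>1.17.1</version>
</dependency>
The following example registers a Codahale/DropWizard meter: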
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.codahale.metrics.SlidingWindowReservoir;

/**
 * @author alanchan
 */
public class TestMetricsMeterDemo {

    public static void test2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: Tuple3<word, "alan lines[line count]", word count>
        DataStream<Tuple3<String, String, Integer>> result = lines
            .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                private long result2LineCounter = 0;
                private transient Gauge<String> gauge = null;
                private transient Histogram histogram = null;
                private transient Meter meter;

                @Override
                public void open(Configuration config) {
                    result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                    gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                        @Override
                        public String getValue() {
                            return "alan lines[" + result2LineCounter + "]";
                        }
                    });
                    com.codahale.metrics.Histogram dropwizardHistogram =
                            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
                    this.histogram = getRuntimeContext().getMetricGroup()
                            .histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
                    // wrap a DropWizard meter instead of the hand-written AlanMeter
                    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
                    this.meter = getRuntimeContext().getMetricGroup()
                            .meter("alanMeter", new DropwizardMeterWrapper(dropwizardMeter));
                }

                @Override
                public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                    result2LineCounter++;
                    this.histogram.update(result2LineCounter * 3);
                    this.meter.markEvent();
                    // histogram.getCount() and meter.getRate() are printed only for demonstration;
                    // what they should mean depends on the actual use case
                    System.out.println("line count: " + result2LineCounter + ", histogram: " + this.histogram.getCount()
                            + ", meter.getRate: " + this.meter.getRate());
                    for (String word : value.split(",")) {
                        out.collect(Tuple2.of(word, gauge.getValue()));
                    }
                }
            })
            .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                    return Tuple3.of(value.f0, value.f1, 1);
                }
            })
            .keyBy(t -> t.f0)
            .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}

// console output
line count: 1, histogram: 1, meter.getRate: 0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count: 2, histogram: 2, meter.getRate: 0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count: 3, histogram: 3, meter.getRate: 0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
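2. Scope
The examples in this part are simple, so no separate verification is provided.
Every metric is assigned an identifier and a set of key-value pairs under which the metric is reported.
The identifier is built from three components: the user-defined name given when the metric is registered, an optional user-defined scope, and a system-provided scope. For example, if A.B is the system scope, C.D the user scope, and E the name, the metric identifier is A.B.C.D.E.
You can configure the delimiter used in the identifier (default: .) by setting the metrics.scope.delimiter key in conf/flink-conf.yaml.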
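As a rough illustration of how the three components combine, the sketch below joins a system scope, a user scope, and a metric name with a configurable delimiter. The class and method names (MetricIdentifierSketch, identifier) are made up for this example and are not part of Flink's API; Flink assembles the identifier internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetricIdentifierSketch {

    /** Joins system scope, optional user scope, and metric name with the delimiter. */
    public static String identifier(List<String> systemScope, List<String> userScope,
                                    String name, String delimiter) {
        List<String> parts = new ArrayList<>(systemScope);
        parts.addAll(userScope); // may be empty: the user scope is optional
        parts.add(name);
        return String.join(delimiter, parts);
    }

    public static void main(String[] args) {
        // System scope A.B, user scope C.D, name E, default delimiter "."
        System.out.println(identifier(Arrays.asList("A", "B"), Arrays.asList("C", "D"), "E", "."));
        // No user scope and a custom delimiter
        System.out.println(identifier(Arrays.asList("A", "B"), Collections.<String>emptyList(), "E", "_"));
    }
}
```

The first call prints A.B.C.D.E, matching the example in the text; the second shows the effect of changing the delimiter.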
1. User scope
You can define a user scope by calling MetricGroup#addGroup(String name), MetricGroup#addGroup(int name), or MetricGroup#addGroup(String key, String value). These methods affect what MetricGroup#getMetricIdentifier and MetricGroup#getScopeComponents return.
counter = getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");
counter = getRuntimeContext().getMetricGroup().addGroup("MyMetricsKey", "MyMetricsValue").counter("myCounter");
2. System scope
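The system scope contains context information about the metric, for example in which task it was registered or which job that task belongs to.
Which context information is included can be configured by setting the following keys in conf/flink-conf.yaml. Each of these keys expects a format string that may contain constants (e.g. "taskmanager") and variables (e.g. "<task_id>"), which are replaced at runtime.
metrics.scope.jm (default: <host>.jobmanager): applied to all metrics scoped to a job manager.
metrics.scope.jm-job (default: <host>.jobmanager.<job_name>): applied to all metrics scoped to a job manager and job.
metrics.scope.tm (default: <host>.taskmanager.<tm_id>): applied to all metrics scoped to a task manager.
metrics.scope.tm-job (default: <host>.taskmanager.<tm_id>.<job_name>): applied to all metrics scoped to a task manager and job.
metrics.scope.task (default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>): applied to all metrics scoped to a task.
metrics.scope.operator (default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>): applied to all metrics scoped to an operator.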
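There is no restriction on the number or order of variables. Variables are case sensitive. The default scope for operator metrics produces an identifier like localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric.
If you also want to include the task name but omit the task manager information, you can specify the following format: metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.
With this format string, identifiers can clash if the same job runs several times concurrently, which leads to inconsistent metric data. It is therefore advisable to use format strings that provide some degree of uniqueness, either by including IDs (e.g. <job_id>) or by assigning unique names to jobs and operators.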
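The substitution of variables in a scope format can be pictured with the following minimal sketch. It is not Flink's implementation: the class ScopeFormatSketch, its methods, and the sample values (host localhost, task manager ID 1234, and so on) are assumptions chosen to reproduce the identifiers quoted in the text.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ScopeFormatSketch {

    /** Replaces every <variable> in the format string; constants are left untouched. */
    public static String resolve(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }

    /** Hypothetical runtime values, used only for this illustration. */
    public static Map<String, String> sampleVariables() {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("host", "localhost");
        vars.put("tm_id", "1234");
        vars.put("job_name", "MyJob");
        vars.put("task_name", "MySource_->_MyOperator");
        vars.put("operator_name", "MyOperator");
        vars.put("subtask_index", "0");
        return vars;
    }

    public static void main(String[] args) {
        Map<String, String> vars = sampleVariables();
        String defaultFormat = "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>";
        String customFormat = "<host>.<job_name>.<task_name>.<operator_name>.<subtask_index>";
        // The metric name is appended to the resolved system scope.
        System.out.println(resolve(defaultFormat, vars) + ".MyMetric");
        System.out.println(resolve(customFormat, vars) + ".MyMetric");
    }
}
```

Neither format contains <job_id>, so two concurrent runs of MyJob would resolve to the same strings, which is the collision the text warns about.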
3. List of all variables
JobManager: <host>
TaskManager: <host>, <tm_id>
Job: <job_id>, <job_name>
Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
Operator: <operator_id>, <operator_name>, <subtask_index>
For the Batch API, <operator_id> is always equal to <task_id>.
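4. User variables
You can define a user variable by calling MetricGroup#addGroup(String key, String value). This method affects what MetricGroup#getMetricIdentifier, MetricGroup#getScopeComponents, and MetricGroup#getAllVariables() return.
User variables cannot be used in scope formats.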
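3. Reporter
Flink lets users send its runtime metrics to external systems. For information on how to set up Flink's metric reporters, see article 47 of this series, "Flink metric reporters: introduction and examples".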
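4. System metrics
By default Flink gathers several metrics that provide deep insight into the current state. This section is a reference of all these metrics. The tables generally have 5 columns:
The "Scope" column describes which scope format is used to generate the system scope. For example, if the cell contains "Operator", the scope format for "metrics.scope.operator" is used. If the cell contains multiple values separated by a slash, the metrics are reported multiple times for different entities, such as job managers and task managers.
The (optional) "Infix" column describes which infix is appended to the system scope.
The "Metrics" column lists the names of all metrics registered for the given scope and infix.
The "Description" column provides information about what the given metric measures.
The "Type" column describes which metric type is used for the measurement.
Note that all dots in the infix and metric name columns are still subject to the delimiter setting (metrics.scope.delimiter).
Thus, to infer the metric identifier:
1. Take the scope format based on the "Scope" column.
2. Append the value in the "Infix" column, if present, taking the delimiter setting into account.
3. Append the metric name.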
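1. CPU
2. Memory
The memory-related metrics require Oracle's memory management (also included in OpenJDK's Hotspot implementation) to be in place. Some metrics might not be exposed when using other JVM implementations (e.g. IBM's J9).
3. Threads
4. GarbageCollection
5. ClassLoader
6. Network
Deprecated: use the Default shuffle service metrics instead.
7. Default shuffle service
Metrics related to data exchange between task executors using netty network communication.
8. Cluster
9. Availability
The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING. Whether these metrics are reported depends on the metrics.job.status.enable setting. The semantics of these metrics may change in later releases.
Experimental feature: while the job is in the RUNNING state, the metrics in this table provide additional details on what the job is currently doing. Whether these metrics are reported depends on the metrics.job.status.enable setting.
A job is considered to be deploying tasks when:
for streaming jobs, any task is in the DEPLOYING state;
for batch jobs, at least one task is in the DEPLOYING state and there are no INITIALIZING/RUNNING tasks.
10. Checkpointing
For failed checkpoints, metrics are updated on a best-effort basis and may not be accurate.
11. State Access Latency
12. RocksDB
Certain RocksDB native metrics are available but disabled by default; the full list is in the RocksDB native metrics section of the Flink configuration documentation.
13. State Changelog
These metrics are only available via reporters.
14. IO
15. Connectors
Kafka Connectors
Kinesis Source
Kinesis Sink
HBase Connectors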
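16. System resources
System resources reporting is disabled by default. When metrics.system-resource is enabled, the additional metrics listed below become available on the JobManager and TaskManager. System resources metrics are updated periodically and present average values over a configured interval (metrics.system-resource-probing-interval).
System resources reporting requires an optional dependency to be present on the classpath (for example, placed in Flink's lib directory):
com.github.oshi:oshi-core:6.1.5 (licensed under the MIT license)
including its transitive dependencies:
net.java.dev.jna:jna-platform:jar:5.10.0
net.java.dev.jna:jna:jar:5.10.0
Failures in this regard are reported as warning messages, such as a NoClassDefFoundError logged by SystemResourcesMetricsInitializer during startup.
System CPU
System memory
System network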
17. Speculative execution
The following metrics can be used to measure the effectiveness of speculative execution.
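5. End-to-End latency tracking
Flink allows tracking the latency of records travelling through the system. This feature is disabled by default. To enable latency tracking, set latencyTrackingInterval to a positive number in either the Flink configuration or the ExecutionConfig.
At every latencyTrackingInterval, the sources periodically emit a special record called a LatencyMarker. The marker contains a timestamp from the time the record was emitted at the source. Latency markers cannot overtake regular user records, so if records are queuing up in front of an operator, this adds to the latency tracked by the marker.
Latency markers do not account for the time user records spend inside operators, because the markers bypass them. In particular, the markers do not account for the time records spend in window buffers. Only when operators cannot accept new records, so that records queue up, does the latency measured with the markers reflect that.
LatencyMarkers are used to derive a distribution of the latency between the sources of the topology and each downstream operator. These distributions are reported as histogram metrics. Their granularity can be controlled in the Flink configuration. At the highest granularity, subtask, Flink derives the latency distribution between every source subtask and every downstream subtask, which results in a number of histograms that is quadratic in the parallelism.
Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (such as NTP) to avoid false latency results.
Warning: enabling latency metrics can significantly impact cluster performance, in particular at subtask granularity. It is highly recommended to use them only for debugging purposes.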
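To get a feel for the quadratic growth at subtask granularity, here is a small back-of-the-envelope sketch. It assumes a hypothetical topology with one source and three downstream operators that all run with the same parallelism, and it simply counts (source subtask, downstream subtask) pairs; the class name and the topology are illustrative only.

```java
public class LatencyHistogramCount {

    /** Number of (source subtask, downstream subtask) pairs at subtask granularity. */
    public static long subtaskGranularity(int sourceSubtasks, int downstreamSubtasks) {
        return (long) sourceSubtasks * downstreamSubtasks;
    }

    public static void main(String[] args) {
        int downstreamOperators = 3; // assumed topology: 1 source, 3 downstream operators
        for (int parallelism : new int[] {1, 10, 100}) {
            long histograms = subtaskGranularity(parallelism, downstreamOperators * parallelism);
            System.out.println("parallelism " + parallelism + " -> " + histograms + " histograms");
        }
    }
}
```

Under these assumptions the count grows from 3 to 300 to 30000 as the parallelism goes from 1 to 10 to 100, which is why the subtask granularity is best reserved for debugging.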
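6. State access latency tracking
Flink also allows tracking the keyed state access latency for the standard Flink state backends or for custom state backends that extend AbstractStateBackend. This feature is disabled by default. To enable it, set state.backend.latency-track.keyed-state-enabled to true in the Flink configuration.
Once keyed state access latency tracking is enabled, Flink samples the state access latency every N accesses, where N is defined by state.backend.latency-track.sample-interval. The default value of this option is 100. A smaller value gives more accurate results but has a higher performance impact because sampling happens more frequently.
Because this latency metric is a histogram, state.backend.latency-track.history-size controls the maximum number of recorded values kept in the history; the default is 128. A larger value requires more memory but provides more accurate results.
Warning: enabling state access latency metrics can impact performance. It is recommended to use them only for debugging purposes.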
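The interplay of the sample interval and the history size can be sketched as follows. This is a simplified model, not Flink's implementation: the class SampledLatencyTracker and its methods are hypothetical, and a plain HashMap stands in for keyed state.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SampledLatencyTracker {
    private final int sampleInterval; // plays the role of state.backend.latency-track.sample-interval
    private final int historySize;    // plays the role of state.backend.latency-track.history-size
    private final Deque<Long> history = new ArrayDeque<>();
    private long accessCount = 0;

    public SampledLatencyTracker(int sampleInterval, int historySize) {
        this.sampleInterval = sampleInterval;
        this.historySize = historySize;
    }

    /** Runs the access and measures its latency only on every sampleInterval-th call. */
    public <T> T track(Supplier<T> access) {
        accessCount++;
        if (accessCount % sampleInterval != 0) {
            return access.get(); // not sampled: no timing overhead
        }
        long start = System.nanoTime();
        T value = access.get();
        record(System.nanoTime() - start);
        return value;
    }

    /** Keeps at most historySize samples by dropping the oldest one. */
    private void record(long latencyNanos) {
        if (history.size() == historySize) {
            history.removeFirst();
        }
        history.addLast(latencyNanos);
    }

    public int sampleCount() {
        return history.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("key", 1);
        SampledLatencyTracker tracker = new SampledLatencyTracker(100, 128);
        for (int i = 0; i < 1000; i++) {
            tracker.track(() -> state.get("key"));
        }
        System.out.println("samples kept: " + tracker.sampleCount());
    }
}
```

With the default-like values used in main (interval 100, history 128), 1000 accesses yield 10 samples. Lowering the interval produces more samples and more timing overhead, while the history size caps how many of them are retained.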
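7. REST API integration
Metrics can be queried through the monitoring REST API.
Below is a list of available endpoints, with a sample JSON response. All endpoints are of the sample form http://hostname:8081/jobmanager/metrics; below we list only the path part of the URLs.
Values in angle brackets are variables. For example, http://hostname:8081/jobs/<jobid>/metrics has to be requested as, for example, http://192.168.10.49:8081/jobs/cb4443fd87ed97873b55be1bdefede30/metrics.
Request metrics for a specific entity:
/jobmanager/metrics
/taskmanagers/<taskmanagerid>/metrics
/jobs/<jobid>/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
Example
The jobid is easy to find in the logs or in the web UI.
The parameters used in this example are:
jobid=bb741e7e46d97541a83a492c948e000d
taskmanagerid=192.168.10.42:42933-a2a682
subtaskindex=0
vertexid=cbc357ccb763df2852fee8c4fc7d55f2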
## 1. /jobmanager/metrics
http://192.168.10.41:9081/jobmanager/metrics
[{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Time"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"taskSlotsAvailable"},{"id":"taskSlotsTotal"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Time"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Count"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"numRegisteredTaskManagers"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"numRunningJobs"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"}]

## 2. /taskmanagers/<taskmanagerid>/metrics
http://192.168.10.41:9081/taskmanagers/192.168.10.42:42933-a2a682/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]

## 3. /jobs/<jobid>/metrics
http://192.168.10.41:9081/jobs/bb741e7e46d97541a83a492c948e000d/metrics
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]

## 4. /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
## On the job details page, find and click the Task Managers tab.
## The Task Managers page shows the details of each Task Manager, including the tasks (i.e. vertices) assigned to it and their IDs.
## Alternatively, look up the vertexid with right-click > Inspect in Chrome. The response to the request below is too long to reproduce here.
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics

Request metrics aggregated across all entities of the respective type:
/taskmanagers/metrics
/jobs/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
Example
The jobid is easy to find in the logs or in the web UI.
The parameters used in this example are:
jobid=bb741e7e46d97541a83a492c948e000d
vertexid=cbc357ccb763df2852fee8c4fc7d55f2

## 1. /taskmanagers/metrics
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]

## 2. /jobs/metrics
http://192.168.10.41:9081/jobs/metrics
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]

## 3. /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
## Find the vertexid on the Task Managers tab of the job details page, or with right-click > Inspect in Chrome, as described earlier.
## The response to the request below is too long to reproduce here.
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics

Request metrics aggregated over a subset of all entities of the respective type:
/taskmanagers/metrics?taskmanagers=A,B,C
/jobs/metrics?jobs=D,E,F
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
Example
The parameters used in this example are:
jobid=bb741e7e46d97541a83a492c948e000d
taskmanagerid1=192.168.10.42:42933-a2a682
taskmanagerid2=192.168.10.43:38542-8d626d
taskmanagerid3=192.168.10.44:43904-9a6f04
vertexid=cbc357ccb763df2852fee8c4fc7d55f2

## 1. /taskmanagers/metrics?taskmanagers=A,B,C
http://192.168.10.41:9081/taskmanagers/metrics?taskmanagers=192.168.10.42:42933-a2a682,192.168.10.43:38542-8d626d,192.168.10.44:43904-9a6f04
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]

## 2. /jobs/metrics?jobs=D,E,F
http://192.168.10.41:9081/jobs/metrics?jobs=bb741e7e46d97541a83a492c948e000d
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]

## 3. /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
## Find the vertexid on the Task Managers tab of the job details page, or with right-click > Inspect in Chrome, as described earlier.
## The following steps show the subtask metrics.
### 1. Query all subtask metrics (the response is too long to reproduce here)
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics
### 2. The response to the previous request contains ids such as Source__TableSourceScan(table[[default_catalog__default_database__alanchan_kafk.KafkaConsumer.bytes-consumed-total and Source__TableSourceScan(table[[default_catalog__default_database__alanchan_kafk.failed-reauthentication-rate. This example queries these two.
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics?subtask=Source__TableSourceScan(table[[default_catalog__default_database__alanchan_kafk.KafkaConsumer.bytes-consumed-total,Source__TableSourceScan(table[[default_catalog__default_database__alanchan_kafk.failed-reauthentication-rate
### The response is too long to reproduce here.

Warning: metric names can contain special characters that must be escaped when querying metrics. For example, "a_+_b" is escaped to "a_%2B_b".
List of characters that should be escaped:
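One way to escape a metric name before placing it in a query string is the JDK's URLEncoder, as in the sketch below. The class name MetricNameEscaper is made up for this example. Note that URLEncoder applies form encoding, so a space becomes "+" rather than "%20"; names containing spaces may need extra care.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class MetricNameEscaper {

    /** Percent-encodes special characters such as '+' so the name is safe in a query string. */
    public static String escape(String metricName) {
        try {
            return URLEncoder.encode(metricName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform must support
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(escape("a_+_b"));
    }
}
```

Running main prints a_%2B_b: the plus sign is percent-encoded while letters and underscores pass through unchanged.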
1) /jobmanager/metrics example
GET /jobmanager/metrics
http://192.168.10.41:9081/jobmanager/metrics
The response is the same metric list as shown for /jobmanager/metrics earlier in this section.
2) taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2 example
Request the (unaggregated) values of specific metrics for a specific task manager:
GET taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2
The parameters used in this example are:
taskmanagerid1=192.168.10.42:42933-a2a682
taskmanagerid2=192.168.10.43:38542-8d626d
taskmanagerid3=192.168.10.44:43904-9a6f04

## 1. List the task manager metrics
http://192.168.10.41:9081/taskmanagers/metrics
(The response is the same metric list as shown for /taskmanagers/metrics earlier in this section.)

## 2. Get the values of Status.JVM.Memory.Mapped.TotalCapacity and Status.JVM.CPU.Load for one task manager
http://192.168.10.41:9081/taskmanagers/192.168.10.42:42933-a2a682/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load
[{"id": "Status.JVM.Memory.Mapped.TotalCapacity", "value": "0"}, {"id": "Status.JVM.CPU.Load", "value": "0.001329512482145905"}]

3) /taskmanagers/metrics?get=metric1,metric2 example
Request the aggregated values of specific metrics:
GET /taskmanagers/metrics?get=metric1,metric2

## 1. List the task manager metrics
http://192.168.10.41:9081/taskmanagers/metrics
(The response is the same metric list as shown for /taskmanagers/metrics earlier in this section.)

## 2. Get the aggregated values of Status.JVM.Memory.Mapped.TotalCapacity and Status.JVM.CPU.Load across task managers
http://192.168.10.41:9081/taskmanagers/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load
[{"id": "Status.JVM.Memory.Mapped.TotalCapacity", "min": 0.0, "max": 0.0, "avg": 0.0, "sum": 0.0}, {"id": "Status.JVM.CPU.Load", "min": 5.440145745299967E-4, "max": 0.0015120478111207314, "avg": 9.553803717257513E-4, "sum": 0.002866141115177254}]
4) /taskmanagers/metrics?get=metric1,metric2&agg=min,max example
Request specific aggregated values of specific metrics:
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

## 1. List the task manager metrics
http://192.168.10.41:9081/taskmanagers/metrics
(The response is the same metric list as shown for /taskmanagers/metrics earlier in this section.)

## 2. Get the min and max of Status.JVM.Memory.Mapped.TotalCapacity and Status.JVM.CPU.Load across task managers
http://192.168.10.41:9081/taskmanagers/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load&agg=min,max
[{"id": "Status.JVM.Memory.Mapped.TotalCapacity", "min": 0.0, "max": 0.0}, {"id": "Status.JVM.CPU.Load", "min": 3.784653231147696E-4, "max": 0.001422205366454916}]
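The query strings used in these examples follow a simple pattern: an optional get parameter with comma-separated metric names and an optional agg parameter with comma-separated aggregations. The sketch below assembles such a URL; the class MetricsQueryBuilder and its build method are hypothetical helpers, not part of Flink, and metric names are assumed to be already escaped where necessary.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetricsQueryBuilder {

    /** Builds base + path, then appends get=... and agg=... only when the lists are non-empty. */
    public static String build(String baseUrl, String path,
                               List<String> metrics, List<String> aggregations) {
        StringBuilder url = new StringBuilder(baseUrl).append(path);
        char separator = '?';
        if (!metrics.isEmpty()) {
            url.append(separator).append("get=").append(String.join(",", metrics));
            separator = '&';
        }
        if (!aggregations.isEmpty()) {
            url.append(separator).append("agg=").append(String.join(",", aggregations));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        String base = "http://192.168.10.41:9081";
        List<String> metrics = Arrays.asList("Status.JVM.Memory.Mapped.TotalCapacity", "Status.JVM.CPU.Load");
        // List all metric ids
        System.out.println(build(base, "/taskmanagers/metrics", Collections.<String>emptyList(), Collections.<String>emptyList()));
        // All aggregations of two metrics
        System.out.println(build(base, "/taskmanagers/metrics", metrics, Collections.<String>emptyList()));
        // Only min and max of two metrics
        System.out.println(build(base, "/taskmanagers/metrics", metrics, Arrays.asList("min", "max")));
    }
}
```

The resulting strings can be pasted into a browser or passed to any HTTP client; the host and port are the ones used in this article's test cluster.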
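8. Dashboard integration
Metrics gathered for each task or operator can also be visualized in the Dashboard. On the main page of a job, select the Metrics tab. After selecting one of the tasks in the top graph, you can choose the metrics to display from the Add Metric drop-down menu.
Task metrics are listed as <subtask_index>.<metric_name>. Operator metrics are listed as <subtask_index>.<operator_name>.<metric_name>.
Each metric is visualized as a separate graph, with the x-axis representing time and the y-axis the measured value. All graphs are updated automatically every 10 seconds and continue to update when you navigate to another page. There is no limit on the number of visualized metrics; however, only numeric metrics can be visualized.
This article gave a brief introduction to Flink's metric system: the metric types with code examples for all four types, scope, system metrics, reporters, latency tracking, and the REST API and dashboard integration.
This topic is split into three parts:
45. Flink metric system introduction and verification (1): metric types and metric implementation examples
45. Flink metric system introduction and verification (2): metric scope, reporters, system metrics, tracking, REST API integration examples, and dashboard integration
45. Flink metric system introduction and verification (3): complete version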