网站没有做301定向,提供户型图免费设计,行业猎头网,软件公司门户网站模板一. 背景
FLINK 任务从一个数据源读取数据, 写入多个sink端.
二. 官方实例
写入多个Sink语句时#xff0c;需要以BEGIN STATEMENT SET;开头#xff0c;以END;结尾。--源表
CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT
) WITH (connector datagen
…一. 背景
FLINK 任务从一个数据源读取数据, 写入多个sink端.
二. 官方实例
写入多个Sink语句时需要以BEGIN STATEMENT SET;开头以END;结尾。--源表
CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT
) WITH (connector datagen
);--结果表A
CREATE TEMPORARY TABLE blackhole_sinkA(name VARCHAR,score BIGINT
) WITH (connector blackhole
);--结果表B
CREATE TEMPORARY TABLE blackhole_sinkB(name VARCHAR,score BIGINT
) WITH (connector blackhole
);--DML
BEGIN STATEMENT SET; --写入多个Sink时必填。
INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END; --写入多个Sink时必填。三. 实操
3.1. 启动Standlone集群
进入到flink引擎包目录, 启动Standlone模式. ./bin/start-cluster.sh 3.2. 启动flink sql-client. ./bin/sql-client.sh embedded 3.3. 执行sql
Flink SQL CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT) WITH (connector datagen);
[INFO] Execute statement succeed.Flink SQL CREATE TEMPORARY TABLE blackhole_sinkA(name VARCHAR,score BIGINT) WITH (connector blackhole);[INFO] Execute statement succeed.Flink SQL CREATE TEMPORARY TABLE blackhole_sinkB(name VARCHAR,score BIGINT) WITH (connector blackhole);[INFO] Execute statement succeed.Flink SQL BEGIN STATEMENT SET;
[INFO] Begin a statement set.Flink SQL INSERT INTO blackhole_sinkASELECT UPPER(name), sum(score)FROM datagen_sourceGROUP BY UPPER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL INSERT INTO blackhole_sinkBSELECT LOWER(name), max(score)FROM datagen_sourceGROUP BY LOWER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL END;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 37a1390129c356374601a267cb8080b63.4. 查看flink ui
查看flink ui页面,验证结论. http://master01:8081/#/job/37a1390129c356374601a267cb8080b6/overview