当前位置: 首页 > news >正文

中联网站建设muse怎么做网站

中联网站建设,muse怎么做网站,温州建设小学的网站,前程无忧网深圳网站建设类岗位1.Flink数据源 Flink可以从各种数据源获取数据#xff0c;然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。 数据集合数据文件Socket数据kafka数据自定义Source 2.案例 2.1.从集合中获取数据 创建 FlinkSource_List 类#xff0c;再创建个 Student 类…1.Flink数据源 Flink可以从各种数据源获取数据然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。 数据集合数据文件Socket数据kafka数据自定义Source 2.案例 2.1.从集合中获取数据 创建 FlinkSource_List 类再创建个 Student 类姓名、年龄、性别三个属性就行反正测试用 package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** author MR.Liu* version 1.0* data 2023-10-18 16:13*/ public class FlinkSource_List {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);ArrayListStudent clicks new ArrayList();clicks.add(new Student(Mary,25,1));clicks.add(new Student(Bob,26,2));DataStreamStudent stream env.fromCollection(clicks);stream.print();env.execute();} }运行结果 Student{nameMary, age25, sex1} Student{nameBob, age26, sex2} 2.2.从文件中读取数据 文件数据 spark hello world kafka spark hadoop spark package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author MR.Liu* version 1.0* data 2023-10-18 16:31*/ public class FlinkSource_File {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamString stream env.readTextFile(input/words.txt);stream.print();env.execute();} }运行结果没做任何处理 spark hello world kafka spark hadoop spark 2.3.从Socket中读取数据 package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author MR.Liu* version 1.0* data 2023-10-18 17:41*/ public class FlinkSource_Socket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流DataStreamSourceString lineDSS env.socketTextStream(192.168.220.130,7777);lineDSS.print();env.execute();} }运行结果 服务器上执行 nc -lk 7777 疯狂输出 控制台打印结果  6 hello 7 world 2.4.从Kafka中读取数据 pom.xml 添加Kafka连接依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_${scala.binary.version}/artifactIdversion${flink.version}/version/dependency package com.qiyu;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;/*** author MR.Liu* version 1.0* data 2023-10-19 10:01*/ public class FlinkSource_Kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties new Properties();properties.setProperty(bootstrap.servers, hadoop102:9092);properties.setProperty(group.id, consumer-group);properties.setProperty(key.deserializer,org.apache.kafka.common.serialization.StringDeserializer);properties.setProperty(value.deserializer,org.apache.kafka.common.serialization.StringDeserializer);properties.setProperty(auto.offset.reset, latest);DataStreamSourceString stream env.addSource(new FlinkKafkaConsumerString(clicks, new SimpleStringSchema(), properties));stream.print(Kafka);env.execute();} }启动 zk 和kafka 创建topic bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 1 --partitions 1 --topic clicks 生产者、消费者命令 bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic clicks bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic clicks --from-beginning 启动生产者命令后疯狂输入  运行java类运行结果和生产者输入的是一样的 Kafka flinks Kafka hadoop Kafka hello Kafka nihaop 2.5.从自定义Source中读取数据 大多数情况下前面几个数据源已经满足需求了。但是遇到特殊情况我们需要自定义的数据源。实现方式如下 1.编辑自定义源Source package com.qiyu;import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar; import java.util.Random;/*** author MR.Liu* version 1.0* data 2023-10-19 10:37*//**** 主要实现2个方法 run() 和 cancel()*/ public class FlinkSource_Custom implements SourceFunctionStudent {// 声明一个布尔变量作为控制数据生成的标识位private Boolean running true;Overridepublic void run(SourceContextStudent sourceContext) throws Exception {Random random new Random(); // 在指定的数据集中随机选取数据String[] name {Mary, Alice, Bob, Cary};int[] sex {1,2};int age 0;while (running) {sourceContext.collect(new Student(name[random.nextInt(name.length)],sex[random.nextInt(sex.length)],random.nextInt(100)));// 隔 1 秒生成一个点击事件方便观测Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }2.编写主程序 package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author MR.Liu* version 1.0* data 2023-10-19 10:46*/ public class FlinkSource_Custom2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); //有了自定义的 source function调用 addSource 方法DataStreamSourceStudent stream env.addSource(new FlinkSource_Custom());stream.print(SourceCustom);env.execute();} }运行主程序运行结果 SourceCustom Student{nameMary, age1, sex46} SourceCustom Student{nameCary, age2, sex52} SourceCustom Student{nameBob, age1, sex14} SourceCustom Student{nameAlice, age1, sex84} SourceCustom Student{nameAlice, age2, sex82} SourceCustom Student{nameCary, age1, sex28} .............
http://www.pierceye.com/news/352649/

相关文章:

  • 公选课网页制作与网站建设网页游戏平台十大排名
  • 无锡锡牛网站建设网站倒计时
  • 南通做外贸网站网站建设培训四川
  • 微小店适合卖做分类网站吗手机开发者网站
  • 广州建企业网站网页设计是啥意思
  • wap手机网站建设刀模 东莞网站建设
  • 怎样做网站的外链做推广优化的网站有哪些内容
  • 永嘉规划建设局网站备案个人网站做淘宝客
  • 枣庄网站建设电话网站怎么做 凡科
  • 视频网站点击链接怎么做的宁波网站建设接单
  • 网站报价表怎么做wordpress 横向扩展
  • 溧阳网站建设哪家好网站建设的教程
  • 360怎么做网站做pop网站
  • 网站建设方案书2000字中国正国级名单
  • 企业网站的布局类型网站移动页面怎么做的
  • 人是用什么做的视频网站吗wordpress如何设水印图片
  • 蛋糕店的网站建设咋写深圳市宝安区邮政编码
  • 东莞横沥网站建设杭州网站制作排名
  • 百合怎么做网站网站开发语
  • 网站搭建哪里找最好天津市建设工程信息网站
  • 有免费注册网站吗做教育网站还挣钱吗
  • 网站做百度推广需要哪些条件店铺推广软文范例
  • 台州企业网站搭建特点迅美网站建设
  • 做营销网站推广官方网站建设方法
  • 网页设计精选网站网站查询功能怎么做
  • 重庆专业网站推广流程建立平台的步骤
  • 舟山市普陀区建设局网站net网站开发 兼职
  • 网站备案流程阿里云南宁网站建设官网
  • h5网站制作介绍简单的静态 新闻 asp 网站源码
  • 济南seo网站推广公司帮别人做彩票网站吗