当前位置: 首页 > news >正文

生鲜电商网站建设策划书云空间布置网站

生鲜电商网站建设策划书,云空间布置网站,视频直播sdk,ppt模板下载素材网站前言 今天是我写博客的第 200 篇#xff0c;恍惚间两年过去了#xff0c;现在已经是大三的学长了。仍然记得两年前第一次写博客的时候#xff0c;当时学的应该是 Java 语言#xff0c;菜的一批#xff0c;写了就删#xff0c;怕被人看到丢脸。当时就想着自己一年之后恍惚间两年过去了现在已经是大三的学长了。仍然记得两年前第一次写博客的时候当时学的应该是 Java 语言菜的一批写了就删怕被人看到丢脸。当时就想着自己一年之后两年之后能学到什么水平什么是 JDBC、什么是 MVC、SSM在当时都是特别好奇的东西不过都在后来的学习中慢慢接触到并且好多已经烂熟于心了。 那今天我在畅想一下一年后的今天我又学到了什么水平能否达到三花聚顶、草木山石皆可为码的超凡入圣的境界拿没拿到心仪的 offer和那个心动过的女孩相处怎么样了哈哈哈哈哈 输出算子Sink 学完了 Flink 在不同执行环境本地测试环境和集群环境下的多种读取多种数据源和转换操作多种转换算子最后就是输出操作了。 1、连接到外部系统 Flink 1.12 之前Sink 算子是通过调用 DataStream 的 addSink 方法来实现的 stream.addSink(new SinkFunction(...)); 从 Flink 1.12 开始Flink 重构了 Sink 架构 stream.sinkTo(...) 查看 Flink 支持的连接器 需要我们自己导入依赖比如上面的 Kfaka 和 DataGen 我们之前使用的时候都导入过相关依赖需要知道有的是只支持source有的只支持sink有的全都支持。 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version/dependency 2、输出到文件 Flink 专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的 Sink它可以将分区文件写入 Flink支持的文件系统。         它的主要操作是将数据写入桶buckets每个桶中的数据都可以分割成一个个大小有限的分区文件这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作默认的分桶方式是基于时间的我们每小时写入一个新的桶。换句话说每个桶内保存的文件记录的都是 1 小时的输出数据。         FileSink 支持行编码Row-encoded和批量编码Bulk-encoded比如 Parquet格式。这两种不同的方式都有各自的构建器builder调用方法也非常简单可以直接调用 FileSink 的静态方法 行编码FileSink.forRowFormatbasePathrowEncoder。批量编码FileSink.forBulkFormatbasePathbulkWriterFactory。 在创建行或批量编码 Sink 时我们需要传入两个参数用来指定存储桶的基本路径basePath和数据的编码逻辑rowEncoder 或 bulkWriterFactory。 package com.lyh.sink;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration; import java.time.ZoneId;/*** author 刘xx* version 1.0* date 2023-11-18 9:51*/ public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 必须开启 检查点 不然一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSourceString(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10), // 每s 10条Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generate);// todo 输出到文件系统FileSinkString fileSink FileSink.// 泛型方法 需要和输出结果的泛型保持一致StringforRowFormat(new Path(D:/Desktop), // 指定输出路径 可以是 hdfs:// 路径new SimpleStringEncoder(UTF-8)) // 指定编码.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(lyh).withPartSuffix(.log).build())// 按照目录分桶 一个小时一个目录(这里的时间格式别改为分钟 会报错: flink Relative path in absolute URI:).withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 设置文件滚动策略-时间或者大小 10s 或 1KB 或 5min内没有新数据写入 滚动一次// 滚动的时候 文件就会更名为我们设定的格式(前缀)不再写入.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(10L)) // 10s.withMaxPartSize(new MemorySize(1024)) // 1KB.withInactivityInterval(Duration.ofMinutes(5)) // 5min.build()).build();dataGen.sinkTo(fileSink);env.execute();} }这里我们创建了一个简单的文件 Sink通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到因为文件会有内容持续不断地写入所以我们应该给一个标准到什么时候就开启新的文件将之前的内容归档保存。也就是说上面的代码设置了在以下 3 种情况下我们就会滚动分区文件 ⚫ 至少包含 10 秒的数据 ⚫ 最近 5 分钟没有收到新的数据 ⚫ 文件大小已达到 1 KB 通过 withOutputFileConfig()方法指定了输出的文件名前缀和后缀。 需要特别注意的就是一定要开启检查点否则我们的数据一直都是正在写入的状态具体原因后面学习到检查点的时候会详细说。 运行结果 3、输出到 Kafka 需要添加 Kafka 依赖之前导入过了启动 Kafka编写示例代码 package com.lyh.sink;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.clients.producer.ProducerConfig;/*** author 刘xx* version 1.0* date 2023-11-18 11:20*/ public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是 精准一次 必须开启 checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(localhost, 9999);KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器 我们是发送方 所以我们是生产者.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(like).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次 / 至少一次// 如果是精准一次// 1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)// 2.必须设置事务的前缀// 3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(lyh-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();} }启动 kafka 并开启一个消费者 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic like运行结果 需要特别注意的三点 如果是精准一次1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)2.必须设置事务的前缀3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟 自定义序列化器 我们上面用的自带的序列化器但是如果我们有 key 的话就需要自定义序列化器了替换上面的代码 .setRecordSerializer(/*** 如果要指定写入 kafka 的key 就需要自定义序列化器* 实现一个接口 重写序列化方法* 指定key 转为 bytes[]* 指定value 转为 bytes[]* 返回一个 ProducerRecord(topic名,key,value)对象*/new KafkaRecordSerializationSchemaString() {NullableOverride// ProducerRecordbyte[], byte[] 返回一个生产者消息,key,value 分别对应两个字节数组public ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(like,key,value);}} ) 运行结果  4、输出到 MySQL 添加依赖1.17版本的依赖需要指定仓库才能找到因为阿里云和默认的maven仓库是没有的 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion1.17-SNAPSHOT/version/dependency dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.31/version/dependency....repositoriesrepositoryidapache-snapshots/idnameapache snapshots/nameurlhttps://repository.apache.org/content/repositories/snapshots//url/repository/repositories 创建表格  编写代码将输入的数据行分隔为对象参数每行数据生成一个对象进行处理。  package com.lyh.sink;import com.lyh.bean.WaterSensor; import function.WaterSensorFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement; import java.sql.SQLException;/*** author 刘xx* version 1.0* date 2023-11-18 12:32*/ public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction()); //输入进来的数据自动转为 WaterSensor类型/*** todo 写入 mysql* 1.这里需要用旧的sink写法addSink* 2.JDBC的4个参数* (1) 执行的sql语句* (2) 对占位符进行填充* (3) 执行选项 - 攒批,重试* (4) 连接选项 - driver,username,password,url*/SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into flink.ws values(?,?,?),// 指定 sql 中占位符的值new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement stmt, WaterSensor sensor) throws SQLException {// 占位符从 1 开始stmt.setString(1, sensor.getId());stmt.setLong(2, sensor.getTs());stmt.setInt(3, sensor.getVc());}}, JdbcExecutionOptions.builder().withMaxRetries(3) //最多重试3次(不包括第一次,共4次).withBatchSize(100) //每收集100条记录进行一次写入.withBatchIntervalMs(3000) // 批次3s(即使没有达到100条记录,只要过了3s JDBCSink也会进行记录的写入),这有助于确保数据及时写入而不是无限期地等待批处理大小达到。.build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/flink?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withDriverName(com.mysql.cj.jdbc.Driver).withUsername(root).withPassword(Yan1029.)// mysql 默认8小时不使用连接就主动断开连接.withConnectionCheckTimeoutSeconds(60) // 重试连接直接的间隔,上面我们设置最多重试3次,每次间隔60s.build());sensorDS.addSink(jdbcSink);env.execute();} }查询结果 5、自定义 Sink 输出 与 Source 类似Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类只要实现它通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。 这里我们自定义实现一个向 HBase 中插入数据的 Sink。 注意这里只是做一个简单的 Demo下面的代码不难发现我们只是对 nosq:student 表下的 info:name 进行了两次的覆盖。如果要实现复杂的处理功能需要对数据类型进行定义因为 HBase 的数据是按列存储的所以对于复杂的 Hbase 表我们难以通过 Java bean 来插入数据。而且一般经常用的连接器Flink 大部分已经提供了开发中我们一般也很少自定义 Sink 输出。 package com.lyh.sink;import com.lyh.utils.HBaseConnection; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table;import java.nio.charset.StandardCharsets;/*** author 刘xx* version 1.0* date 2023-11-18 15:59*/ public class SinkCustomHBase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.fromElements(tom,bob).addSink(new RichSinkFunctionString() {public Connection con;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);con HBaseConnection.getConnection(hadoop102:2181);}Overridepublic void invoke(String value, Context context) throws Exception {super.invoke(value, context);Table table con.getTable(TableName.valueOf(nosql,student));Put put new Put(1001.getBytes(StandardCharsets.UTF_8));put.addColumn(info.getBytes(StandardCharsets.UTF_8),name.getBytes(StandardCharsets.UTF_8),value.getBytes(StandardCharsets.UTF_8));table.put(put);table.close();}Overridepublic void close() throws Exception {super.close();HBaseConnection.close();}});env.execute();} }这里用到一个简单的连接 HBase 的工具类   package com.lyh.utils;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;/*** author 刘xx* version 1.0* date 2023-11-18 16:04*/ public class HBaseConnection {private static Connection connection;public static Connection getConnection(String hosts) throws IOException {Configuration conf new Configuration();conf.set(hbase.zookeeper.quorum, hosts);conf.setInt(hbase.rpc.timeout, 10000); // 设置最大超时 10 sconnection ConnectionFactory.createConnection(conf);return connection;}public static void close() throws IOException {if (connection!null)connection.close();} }
http://www.pierceye.com/news/535124/

相关文章:

  • 在服务器网站上做跳转网页设计代码动漫
  • 科协网站建设的意见合肥哪里有做网页的地方
  • 为企业做网站策划案永康网站推广
  • 做个企业网网站怎么做linux建网站
  • 专业建站公司主要做什么wordpress加入下载标签
  • 韩都衣舍网站建设方案美食网站怎么做dw
  • 电商网站开发 参考文献wordpress验证码注册
  • ic外贸网站建设wordpress和shopex
  • 网站技术制作流程图国内顶尖小程序开发公司
  • 免费网站建设下载优化关键词规则
  • 网站浮动窗口如何做自己怎么做淘宝客网站
  • 石材外贸在哪个网站做网页版 微信
  • 网站开发属于程序员吗sem 优化软件
  • 公司做网站是管理费用小程序官方文档
  • 公司网站推广技巧响水网站设计
  • 徐州本地网站wap页面是什么
  • 网站开发应用价值做套网站多少钱
  • asp.net网站模板免费下载怎么才能访问自己做的网站
  • 长沙企业网站制作宝安公司网站建设
  • 做网站需要拉多大的宽带dw做的网站怎么做后台
  • 公司网站建设设计公司哪家好wordpress自动封ip
  • 郫县网站制作wordpress搜索打钩
  • 哪些网站可以做招商广告语wordpress发文章的id怎么不连续
  • 家私网站栏目和功能需求策划网页样式库
  • 什么是网站网页主页企业电子邮箱格式
  • 金属建材企业网站建设方案用pycharm做网站
  • 重庆网站空间黄骅港一期码头潮汐表
  • 推广网站如何做做酒店网站所用到的算法
  • 最好的网站建设组织wordpress 删除google
  • 生物科技 网站模板下载在线室内设计