当前位置: 首页 > news >正文

网站的动态新闻数据库怎么做哪些网站可以做视频直播

网站的动态新闻数据库怎么做,哪些网站可以做视频直播,沈阳网站建设公司电话,旅行网站系统DorisWriterManager 的类#xff0c;用于将数据写入到 Doris 中。以下是代码的具体作用和功能解释#xff1a; 导入必要的包和类#xff1a; 代码开头导入了所需的包和类#xff0c;包括日志记录、线程池、字符编码和其他相关工具类。类成员变量定义#xff1a; 下面是一…DorisWriterManager 的类用于将数据写入到 Doris 中。以下是代码的具体作用和功能解释 导入必要的包和类 代码开头导入了所需的包和类包括日志记录、线程池、字符编码和其他相关工具类。类成员变量定义 下面是一些类的成员变量定义这些变量在类的不同方法中使用 LOG: 用于记录日志的 Logger 对象。visitor: DorisStreamLoadObserver 类的实例用于处理数据写入 Doris 的观察者。options: Keys 类的实例包含了一些配置选项。buffer: 存储待写入 Doris 的数据。batchCount: 当前批次中的记录数量。batchSize: 当前批次中的数据大小。closed: 标志位表示是否已关闭写入。flushException: 异步刷新数据时可能发生的异常。flushQueue: 用于异步刷新数据的队列。scheduler: 用于定期刷新数据的调度器。scheduledFuture: 用于取消定时任务的句柄。构造函数 DorisWriterManager 构造函数接受一个 Keys 对象作为参数设置了初始化的配置信息并初始化了 visitor 和 flushQueue。接着它调用 startScheduler() 启动定期刷新任务以及 startAsyncFlushing() 启动异步刷新线程。startScheduler() 方法 此方法负责启动定时刷新任务。它首先调用 stopScheduler() 停止之前的定时任务。然后创建一个单线程的调度器scheduler并设置一个定时任务定期触发数据刷新操作。在定时任务内部它会检查是否关闭了写入操作然后根据配置信息进行数据刷新。如果当前批次为空重新启动定时任务确保数据持续刷新。stopScheduler() 方法 此方法用于停止定时任务。它会取消之前的定时任务并关闭调度器。writeRecord(String record) 方法 该方法用于将记录写入缓冲区。它首先调用 checkFlushException() 方法检查是否存在刷新异常。然后将记录转换成字节数组并添加到缓冲区中同时更新批次计数和数据大小。如果当前批次的记录数量或数据大小超过了阈值就会触发数据刷新。flush(String label, boolean waitUntilDone) 方法 此方法用于手动触发数据刷新操作。它首先检查是否存在刷新异常然后根据当前批次的情况决定是否执行刷新。如果当前批次为空且 waitUntilDone 为真它会等待之前的异步刷新操作完成。否则它将当前批次的数据放入刷新队列并根据 waitUntilDone 参数决定是否等待刷新操作完成。close() 方法 此方法用于关闭 DorisWriterManager。它首先检查是否已经关闭然后触发一次最终的数据刷新操作。如果当前批次有数据会记录相应日志。最后它检查是否有刷新异常并抛出相应异常。createBatchLabel() 方法 此方法用于创建批次标签用于标识一批数据。它根据配置的前缀和随机 UUID 生成标签。startAsyncFlushing() 方法 此方法启动一个异步刷新线程。线程会循环调用 asyncFlush() 方法将数据异步刷新到 Doris 中。waitAsyncFlushingDone() 方法 该方法用于等待之前的异步刷新操作完成。它向刷新队列添加空的 WriterTuple以确保之前的刷新操作完成。然后它检查是否存在刷新异常。asyncFlush() 方法 此方法用于异步刷新数据到 Doris。它从刷新队列中取出 WriterTuple然后根据批次的标签执行数据刷新操作。如果发生异常它会尝试多次直到达到最大重试次数。如果需要重新创建批次标签则生成新的标签。重试之间会休眠一段时间。成功后重新启动定时任务。checkFlushException() 方法 此方法用于检查是否存在刷新异常如果存在则抛出异常。 这个 DorisWriterManager 类的目的是管理数据写入到 Doris 数据库的操作。它通过定时任务和异步刷新线程来控制数据的批量写入同时处理异常情况确保数据的稳定写入。 添加详细注释代码如下: import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class DorisWriterManager {private static final Logger LOG LoggerFactory.getLogger(DorisWriterManager.class);private final DorisStreamLoadObserver visitor;private final Keys options;private final Listbyte[] buffer new ArrayList(); // 缓冲区用于存储待写入 Doris 的数据private int batchCount 0; // 当前批次中的记录数量private long batchSize 0; // 当前批次中的数据大小private volatile boolean closed false; // 标志位表示是否已关闭private volatile Exception flushException; // 异步刷新数据时可能发生的异常private final LinkedBlockingDequeWriterTuple flushQueue; // 用于异步刷新数据的队列private ScheduledExecutorService scheduler; // 用于定期刷新数据的调度器private ScheduledFuture? scheduledFuture;public DorisWriterManager(Keys options) {this.options options;this.visitor new DorisStreamLoadObserver(options);flushQueue new LinkedBlockingDeque(options.getFlushQueueLength());this.startScheduler(); // 启动定期刷新调度器this.startAsyncFlushing(); // 启动异步刷新线程}// 启动定期刷新调度器public void startScheduler() {stopScheduler(); // 停止之前的调度器this.scheduler Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern(Doris-interval-flush).daemon(true).build());this.scheduledFuture this.scheduler.schedule(() - {synchronized (DorisWriterManager.this) {if (!closed) {try {String label createBatchLabel();LOG.info(String.format(Doris interval Sinking triggered: label[%s]., label));if (batchCount 0) {startScheduler(); // 如果当前批次为空重新启动定时任务}flush(label, false);} catch (Exception e) {flushException e;}}}}, options.getFlushInterval(), TimeUnit.MILLISECONDS);}// 停止定期刷新调度器public void stopScheduler() {if (this.scheduledFuture ! null) {scheduledFuture.cancel(false);this.scheduler.shutdown();}}// 写入一条记录到缓冲区public final synchronized void writeRecord(String record) throws IOException {checkFlushException(); // 检查是否有刷新异常try {byte[] bts record.getBytes(StandardCharsets.UTF_8);buffer.add(bts);batchCount;batchSize bts.length;if (batchCount options.getBatchRows() || batchSize options.getBatchSize()) {String label createBatchLabel();LOG.debug(String.format(Doris buffer Sinking triggered: rows[%d] label[%s]., batchCount, label));flush(label, false); // 当记录数量或数据大小超过阈值时触发刷新}} catch (Exception e) {throw new IOException(Writing records to Doris failed., e);}}// 手动触发刷新缓冲区的数据public synchronized void flush(String label, boolean waitUntilDone) throws Exception {checkFlushException(); // 检查是否有刷新异常if (batchCount 0) {if (waitUntilDone) {waitAsyncFlushingDone(); // 如果当前批次为空等待之前的刷新操作完成}return;}flushQueue.put(new WriterTuple(label, batchSize, new ArrayList(buffer))); // 将数据放入刷新队列if (waitUntilDone) {waitAsyncFlushingDone(); // 等待刷新操作完成}buffer.clear();batchCount 0;batchSize 0;}// 关闭 DorisWriterManager触发最后一次刷新操作public synchronized void close() {if (!closed) {closed true;try {String label createBatchLabel();if (batchCount 0) LOG.debug(String.format(Doris Sink is about to close: label[%s]., label));flush(label, true); // 关闭时触发刷新操作} catch (Exception e) {throw new RuntimeException(Writing records to Doris failed., e);}}checkFlushException();}// 创建批次标签通常用于标识一批数据public String createBatchLabel() {StringBuilder sb new StringBuilder();if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {sb.append(options.getLabelPrefix());}return sb.append(UUID.randomUUID().toString()).toString();}// 启动异步刷新线程private void startAsyncFlushing() {Thread flushThread new Thread(new Runnable() {public void run() {while (true) {try {asyncFlush(); // 异步刷新数据} catch (Exception e) {flushException e;}}}});flushThread.setDaemon(true);flushThread.start();}// 等待之前的刷新操作完成private void waitAsyncFlushingDone() throws InterruptedException {for (int i 0; i options.getFlushQueueLength(); i) {flushQueue.put(new WriterTuple(, 0L, null));}checkFlushException();}// 异步刷新数据到 Dorisprivate void asyncFlush() throws Exception {WriterTuple flushData flushQueue.take();if (Strings.isNullOrEmpty(flushData.getLabel())) {return;}stopScheduler(); // 停止定时任务LOG.debug(String.format(Async stream load: rows[%d] bytes[%d] label[%s]., flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));for (int i 0; i options.getMaxRetries(); i) {try {// 利用 DorisStreamLoadObserver 进行数据刷新visitor.streamLoad(flushData);LOG.info(String.format(Async stream load finished: label[%s]., flushData.getLabel()));startScheduler(); // break;} catch (Exception e) {LOG.warn(Failed to flush batch data to Doris, retry times {}, i, e);if (i options.getMaxRetries()) {throw new IOException(e);}if (e instanceof DorisWriterExcetion (( DorisWriterExcetion )e).needReCreateLabel()) {String newLabel createBatchLabel();LOG.warn(String.format(Batch label changed from [%s] to [%s], flushData.getLabel(), newLabel));flushData.setLabel(newLabel);}try {Thread.sleep(1000l * Math.min(i 1, 10));} catch (InterruptedException ex) {Thread.currentThread().interrupt();throw new IOException(Unable to flush, interrupted while doing another attempt, e);}}}}private void checkFlushException() {if (flushException ! null) {throw new RuntimeException(Writing records to Doris failed., flushException);}} }
http://www.pierceye.com/news/402181/

相关文章:

  • 网站建设(信奈辉煌电商)陕西富通建设工程有限公司网站
  • 南昌县住房和城乡建设局网站外海网站如何做网站的推广
  • 重庆网站推广报价wordpress全景图
  • 做那个的网站谁有建立什么指标体系和评价程序规范
  • 新旧网站对比全国建设厅网站
  • 有经验的番禺网站建设做球服的网站有哪些
  • 临泉建设网站互联网行业都有哪些工作
  • 甘肃省嘉峪关建设局网站做游戏交易网站
  • 校园网站做自己的广告惠州网络问政平台官网
  • 网站建设使用的什么和国外做贸易用什么网站
  • 苏州自助建站模板宁波seo快速优化
  • 做网站的可行性分析网络推广渠道公司
  • 企业网站优化兴田德润优惠汕头网络营销推广该怎么做
  • 安徽省住房和建设厅网站企业网站建设硬件
  • 网站怎样运营网站文章收录
  • 怎么制作微网站wordpress 自动安装
  • 建设软件网站七台河新闻直播
  • 深圳 公司网站设计网站建设得花多少钱
  • 社交型网站首页面设计分析宁夏电建网站
  • 网站开发需要掌握的知识中国建设网官方网站
  • 制作网站素材网页设计代码步骤
  • 做微信头图的网站网页编程语言有哪几种
  • 医生工作室网站建设男女激烈做羞羞事网站网站韩剧
  • 网站里面网友点评怎么做网站开发需要资质吗
  • gta 买房网站建设中软件下载免费大全网站
  • 中国移动网站专门拍短视频的公司
  • 网站制作网站建设报价南通优化网站怎么收费
  • 网站的连接二维码怎么做wordpress.org账号
  • 优秀的网站有哪些内容wordpress重新安装删除哪个文件
  • 网站建设与发布需要什么手机端开发app