中职网站建设教学计划,申请企业邮箱需要准备什么材料,国内网站是cn还是com,wordpress个人网线前一篇文章KnowStreaming系列教程第二篇——项目整体架构分析_诸葛子房_的博客-CSDN博客
讲述了KS的整体项目目录#xff0c;这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步
一、调度模块代码主要在km-task里面
public class TaskClusterAddedListener impleme…前一篇文章KnowStreaming系列教程第二篇——项目整体架构分析_诸葛子房_的博客-CSDN博客
讲述了KS的整体项目目录这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步
一、调度模块代码主要在km-task里面
public class TaskClusterAddedListener implements ApplicationListenerClusterPhyAddedEvent {private static final ILog LOGGER LogFactory.getLog(TaskClusterAddedListener.class);Overridepublic void onApplicationEvent(ClusterPhyAddedEvent event) {LOGGER.info(methodonApplicationEvent||clusterPhyId{}||msglistened new cluster, event.getClusterPhyId());Long now System.currentTimeMillis();// 交由KS自定义的线程池异步执行任务FutureUtil.quickStartupFutureUtil.submitTask(() - triggerAllTask(event.getClusterPhyId(), now));}private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {ClusterPhy tempClusterPhy null;// 120秒内无加载进来则直接返回退出while (System.currentTimeMillis() - startTimeUnitMs 120L * 1000L) {tempClusterPhy LoadedClusterPhyCache.getByPhyId(clusterPhyId);if (tempClusterPhy ! null) {break;}BackoffUtils.backoff(1000);}if (tempClusterPhy null) {return;}// 获取到之后再延迟5秒保证相关的集群都被正常加载进来这里的5秒不固定BackoffUtils.backoff(5000);final ClusterPhy clusterPhy tempClusterPhy;// 集群执行集群元信息同步ListAbstractAsyncMetadataDispatchTask metadataServiceList new ArrayList(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {try {dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);} catch (Exception e) {// ignore}}// 再延迟5秒保证集群元信息都已被正常同步至DB这里的5秒不固定BackoffUtils.backoff(5000);// 集群集群指标采集ListAbstractAsyncMetricsDispatchTask metricsServiceList new ArrayList(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {try {dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);} catch (Exception e) {// ignore}}}
}
通过监听集群添加事件触发元数据同步和指标采集调度任务
具体实现可参考
spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674 二、调度任务分布式系统如何做到单节点运行避免多台机器调度
AbstractDispatchTask 里面的execute 方法通过实现任务分配
public TaskResult execute(JobContext jobContext) {try {long triggerTimeUnitMs System.currentTimeMillis();// 获取所有的任务ListE allTaskList this.listAllTasks();if (ValidateUtils.isEmptyList(allTaskList)) {LOGGER.debug(all-task is empty, finish process, taskName:{} jobContext:{}, taskName, jobContext);return TaskResult.SUCCESS;}// 计算当前机器需要执行的任务ListE subTaskList this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());if (ValidateUtils.isEmptyList(allTaskList)) {LOGGER.debug(sub-task is empty, finish process, taskName:{} jobContext:{}, taskName, jobContext);return TaskResult.SUCCESS;}// 进行任务处理TaskResult ret this.processTask(subTaskList, triggerTimeUnitMs);//组装信息TaskResult taskResult new TaskResult();taskResult.setCode(ret.getCode());taskResult.setMessage(ConvertUtil.list2String(subTaskList, ,));return taskResult;} catch (Exception e) {LOGGER.error(process task failed, taskName:{}, taskName, e);return new TaskResult(TaskResult.FAIL_CODE, e.toString());}}
对应代码解释如下 参考
https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md