1. Design

For task execution, let's pick a concrete scenario: connecting to devices to collect parameters. Devices speak different protocols, such as Modbus RTU, Modbus TCP, OPC UA, Siemens S7, and so on. The protocols vary widely and need different parameters. The connection and task-execution parameters are stored in the link_spec column of the t_job table, and the task configuration is stored in job_spec; both are stored as JSON.
1.1 Task Flow
1.2 Class Design
1.2.1 Job Class Design
JobAllotStrategy is an interface; for now it only defines the allotJob (allot tasks) business method, and more will be added as needed. Below it, AbstractJobAllotStrategy is an abstract class that implements the JobAllotStrategy interface and holds the common methods shared by the concrete implementations.
AverageJobAllotStrategy: the even (average) task-allocation strategy implementation
WeightJobAllotStrategy: the weighted task-allocation strategy implementation
... many more can follow
1.2.2 Helper Class Design
ExecuteDttaskJobContext: wraps the parameters related to task execution
JobAllotManager: centralized management of the different task-allocation strategies
1.2.3 Protocol Class Design
IProtocol: the top-level interface, defining the protocol type, the start method, and the config-parsing method
AbstractProtocol: an abstract class that refines how the concrete protocols below are implemented and unifies the shared logic
ModbusRTUProtocol: handles the Modbus RTU protocol
ModbusTCPProtocol: handles the Modbus TCP protocol
VirtualProtocol: a virtualized protocol for test environments
... custom protocols can be added
1.2.4 Protocol Helper Classes
CollectDataService: the top-level service class for data collection
ICollectDataWorker: the data-collection interface; an interface is defined here so that different collection implementations can be added later
CommonCollectDataWorker: the general implementation of the collection interface, using Java's native ScheduledExecutorService to run collection tasks periodically
2. Implementation
2.1 Job-Related Classes
2.1.1 JobAllotStrategy
This interface is still very simple: it only has two methods, one returning the job-allocation strategy type and one that allots jobs. The rebalanceJob capability will be implemented later.
public interface JobAllotStrategy {

    JobAllotStrategyType getType();

    void allotJob();
}
2.1.2 AbstractJobAllotStrategy
The abstract implementation of JobAllotStrategy. Logic common to all strategies, as well as compositions of more complex logic, can live here.
public abstract class AbstractJobAllotStrategy implements JobAllotStrategy {

    public List<Job> getAllJob() {
        return BeanUseHelper.entityHelpService().getAllJob();
    }

    public List<DttaskJob> getAllCrawlerJob() {
        return BeanUseHelper.entityHelpService().getAllDttaskJob();
    }

    public List<DttaskJob> getByCrawlerId(long dttaskId) {
        return BeanUseHelper.entityHelpService().queryDttaskJob(dttaskId);
    }
}
2.1.3 AverageJobAllotStrategy
The strategy implementation that allots jobs evenly. WeightJobAllotStrategy can be written by imitating this class; its main logic is to read a different weight from each Dttask node's configuration, with a default of 1:1:1.
It could be configured as 2:1:1, so that with 8 tasks node 1 is allotted 4 while nodes 2 and 3 get 2 each (see the sketch after the code below).
@Component
@Slf4j
public class AverageJobAllotStrategy extends AbstractJobAllotStrategy {

    @Autowired
    private CollectDataService collectDataService;

    @Override
    public JobAllotStrategyType getType() {
        return JobAllotStrategyType.AVERAGE;
    }

    @Override
    public void allotJob() {
        EntityHelpService entityHelpService = BeanUseHelper.entityHelpService();
        Map<Long, List<Job>> allotJobMap = getAllotJobMap();
        log.info("allotJobMap={}", allotJobMap);
        entityHelpService.invalidAllDttaskJob();
        entityHelpService.saveAllDttaskJob(allotJobMap);
        Map<Long, List<DttaskJob>> dttaskJobMap = entityHelpService.queryDttaskJob();
        executeDttaskJob(new ExecuteDttaskJobContext(dttaskJobMap, true));
    }

    private void executeDttaskJob(ExecuteDttaskJobContext executeDttaskJobContext) {
        Map<Long, List<DttaskJob>> dttaskJobMap = executeDttaskJobContext.getDttaskJobMap();
        boolean startFlag = executeDttaskJobContext.getStartFlag();
        dttaskJobMap.forEach((dttaskId, dttaskJobList) -> {
            if (!Objects.equals(ServerInfo.getServerId(), dttaskId)) {
                // send a task-control command to the other node
                Set<Long> dttaskJobIdList = dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());
                DttaskMessage controlCollectMessage = DttaskMessage.buildControlCollectMessage(dttaskJobIdList, startFlag, dttaskId);
                log.info("sending collect-control command to nodeId={}: {}", dttaskId, controlCollectMessage);
                ServerInfo.getChannelByServerId(dttaskId).writeAndFlush(controlCollectMessage);
            } else {
                log.info("{} collect tasks allotted to this node: {}", startFlag ? "starting" : "stopping", dttaskJobList);
                Set<Long> dttaskJobIds = dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());
                if (startFlag) {
                    collectDataService.startCollectData(dttaskJobIds);
                } else {
                    collectDataService.stopCollectData(dttaskJobIds);
                }
            }
        });
    }

    private Map<Long, List<Job>> getAllotJobMap() {
        List<Job> allJob = getAllJob();
        return average(allJob);
    }

    private <T> Map<Long, List<T>> average(List<T> list) {
        List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();
        int nodeCount = nodeInfoList.size();
        Map<Long, List<T>> allotJobMap = new HashMap<>();
        int averageJobCount = list.size() / nodeCount;
        int remainingJobCount = list.size() % nodeCount;
        int currentIndex = 0;
        for (NodeInfo nodeInfo : nodeInfoList) {
            // wrap the subList view in a new ArrayList so the remainder can be appended safely below
            allotJobMap.put(nodeInfo.getServerId(), new ArrayList<>(list.subList(currentIndex, currentIndex + averageJobCount)));
            currentIndex += averageJobCount;
        }
        // hand out the remainder one job per node until nothing is left
        while (remainingJobCount != 0) {
            for (Map.Entry<Long, List<T>> entry : allotJobMap.entrySet()) {
                if (remainingJobCount == 0) {
                    break;
                }
                entry.getValue().addAll(list.subList(currentIndex, currentIndex + 1));
                currentIndex++;
                remainingJobCount--;
            }
        }
        return allotJobMap;
    }
}
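The weighted strategy is only described above, so here is a minimal sketch of how a WeightJobAllotStrategy might split jobs by node weight, sitting next to average(...) inside such a strategy class. The getWeightByServerId(...) helper and the choice to hand the remainder out in plain node order are assumptions of this sketch, not code from the repository:

    // Hedged sketch of a weighted split; not part of the repository.
    private <T> Map<Long, List<T>> weighted(List<T> list) {
        List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();
        int totalWeight = nodeInfoList.stream().mapToInt(n -> getWeightByServerId(n.getServerId())).sum();
        Map<Long, List<T>> allotJobMap = new HashMap<>();
        int currentIndex = 0;
        for (NodeInfo nodeInfo : nodeInfoList) {
            // each node first gets floor(size * weight / totalWeight) jobs ...
            int count = list.size() * getWeightByServerId(nodeInfo.getServerId()) / totalWeight;
            allotJobMap.put(nodeInfo.getServerId(), new ArrayList<>(list.subList(currentIndex, currentIndex + count)));
            currentIndex += count;
        }
        // ... and any remainder is handed out one by one, here simply in node order
        for (NodeInfo nodeInfo : nodeInfoList) {
            if (currentIndex >= list.size()) {
                break;
            }
            allotJobMap.get(nodeInfo.getServerId()).add(list.get(currentIndex++));
        }
        return allotJobMap;
    }

    // assumed helper: reads the node's configured weight, defaulting to 1 if none is set
    private int getWeightByServerId(Long serverId) {
        return 1;
    }

With weights 2:1:1 and 8 jobs this yields the 4/2/2 split described above.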
2.1.4 ExecuteDttaskJobContext
The context passed along when executing tasks. It is still simple, holding only a map and a flag for whether this is a start or a stop. It can be extended as the business grows, and it could also carry logic of its own.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ExecuteDttaskJobContext {

    private Map<Long, List<DttaskJob>> dttaskJobMap;

    private Boolean startFlag;
}
2.1.5 JobAllotStrategyType
The enum of job-allocation strategy types.
public enum JobAllotStrategyType {

    AVERAGE(0), WEIGHT(1), SPECIFIC(2);

    int code;

    JobAllotStrategyType(int code) {
        this.code = code;
    }

    public static JobAllotStrategyType from(int code) {
        for (JobAllotStrategyType value : values()) {
            if (value.code == code) {
                return value;
            }
        }
        throw new BusinessException(CharSequenceUtil.format("code={} is not a valid JobAllotStrategyType", code));
    }
}
2.1.6 JobAllotManager
As with the Netty task-handling strategies earlier, I let the Spring container manage all JobAllotStrategy beans here and pick the current strategy based on configuration.
@Component
@Slf4j
public class JobAllotManager {

    @Autowired
    private List<JobAllotStrategy> jobAllotStrategies;

    private static final Map<JobAllotStrategyType, JobAllotStrategy> map = new EnumMap<>(JobAllotStrategyType.class);

    @PostConstruct
    public void init() {
        if (jobAllotStrategies != null) {
            for (JobAllotStrategy jobAllotStrategy : jobAllotStrategies) {
                map.put(jobAllotStrategy.getType(), jobAllotStrategy);
            }
        }
    }

    public static JobAllotStrategy getStrategy() {
        JobAllotStrategyType allotJobStrategyType = JobAllotStrategyType.from(BeanUseHelper.dttaskServerConfig().getAllotJobStrategyType());
        if (map.containsKey(allotJobStrategyType)) {
            return map.get(allotJobStrategyType);
        }
        throw new BusinessException(CharSequenceUtil.format("allotJobStrategyType={} is misconfigured", allotJobStrategyType));
    }
}
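For reference, a call site, for example on the Controller node once the cluster is up, might resolve and run the configured strategy like this; when and where this is triggered is an assumption of the sketch:

    // Illustrative call site: look up the configured strategy and let it allot jobs.
    JobAllotStrategy strategy = JobAllotManager.getStrategy();
    strategy.allotJob();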
2.2 Protocol-Related Classes
The protocol layer will keep changing: each protocol needs different parameters and different handling logic, so for now the encapsulation is kept fairly simple.
2.2.1 IProtocol
The protocol interface mainly starts protocol handling. For the moment I have put the shutdown logic on the Job side; a stop method could also be added here so that each protocol closes itself.
public interface IProtocol {

    int getType();

    void start(ProtocolContext protocolContext);

    void parseConfig(ProtocolContext protocolContext);
}

2.2.2 AbstractProtocol
@Slf4j
public abstract class AbstractProtocol implements IProtocol {

    protected Long dttaskId;

    protected Long dttaskJobId;

    protected Long deviceId;

    protected ProtocolContext protocolContext;

    public void parseConfig(ProtocolContext protocolContext) {
        this.dttaskId = protocolContext.getDttaskId();
        this.dttaskJobId = protocolContext.getDttaskJobId();
        this.deviceId = protocolContext.getDeviceId();
        this.protocolContext = protocolContext;
        doParseConfig(protocolContext);
    }

    protected abstract void doParseConfig(ProtocolContext protocolContext);

    public abstract void doStart();

    public void start(ProtocolContext protocolContext) {
        log.info("entering AbstractProtocol.start, protocolContext={}", protocolContext);
        parseConfig(protocolContext);
        doStart();
    }
}
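As the design section notes, new protocols are added by extending AbstractProtocol. Purely as a hedged illustration of that extension point, the class below is hypothetical and not part of the repository; its type code and the way it reads linkSpec are assumptions:

    // Hypothetical example only: shows the extension pattern, not an actual class in the project.
    @Slf4j
    @Component
    public class OpcUaProtocol extends AbstractProtocol {

        // assumed spec holder; a real implementation would parse endpoint/node settings from linkSpec
        private JSONObject opcUaSpec;

        @Override
        public int getType() {
            return 2;   // assumed type code; it must match the link_type stored in t_job
        }

        @Override
        protected void doParseConfig(ProtocolContext protocolContext) {
            this.opcUaSpec = protocolContext.getParam().getJSONObject("linkSpec");
        }

        @Override
        public void doStart() {
            // a real implementation would build an OPC UA client here and register a
            // collect worker via ICollectDataWorker.addCollectTask(...), as the Modbus classes below do
            log.info("OpcUaProtocol started for dttaskJobId={}, spec={}", dttaskJobId, opcUaSpec);
        }
    }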
2.2.3 Modbus RTU Classes
2.2.3.1 ModbusRTUProtocol
@Data
@Slf4j
@Component
public class ModbusRTUProtocol extends AbstractProtocol {

    private ModbusRTUSpecModel modbusRTUSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public void doParseConfig(ProtocolContext protocolContext) {
        JSONObject jsonObject = protocolContext.getParam();
        this.modbusRTUSpecModel = ModbusRTUSpecModel.getFromParam(jsonObject);
    }

    @Override
    public int getType() {
        return 0;
    }

    @Override
    public void doStart() {
        try {
            SerialParameters serialParameters = new SerialParameters();
            serialParameters.setPortName(modbusRTUSpecModel.getPortName());
            serialParameters.setBaudRate(modbusRTUSpecModel.getBaudRate());
            serialParameters.setDatabits(modbusRTUSpecModel.getDatabits());
            serialParameters.setStopbits(modbusRTUSpecModel.getStopbits());
            serialParameters.setParity(modbusRTUSpecModel.getParity());
            serialParameters.setEncoding(modbusRTUSpecModel.getEncoding());
            serialParameters.setEcho(modbusRTUSpecModel.getEcho());
            ModbusSerialMaster modbusSerialMaster = new ModbusSerialMaster(serialParameters);
            modbusSerialMaster.connect();
            ModbusRTUCollectWorker modbusRTUCollectWorker = new ModbusRTUCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusSerialMaster, modbusRTUSpecModel);
            collectDataWorker.addCollectTask(dttaskJobId, modbusRTUCollectWorker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={} collect failed, config={}", this.dttaskId, protocolContext, e);
        }
    }
}
2.2.3.2 ModbusRTUCollectWorker
@Slf4j
public class ModbusRTUCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private ModbusSerialMaster master;
    private ModbusRTUSpecModel modbusRTUSpecModel;
    private Map<String, Long> lastReadMap = new HashMap<>();

    public ModbusRTUCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusSerialMaster master, ModbusRTUSpecModel modbusRTUSpecModel) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.master = master;
        this.modbusRTUSpecModel = modbusRTUSpecModel;
    }

    @Override
    public void run() {
        long current = new Date().getTime();
        for (ModbusRTUSpecModel.PointDetail pointDetail : modbusRTUSpecModel.getPointDetailList()) {
            String key = pointDetail.getKey();
            // only read a point when its sampling interval has elapsed since the last read
            if (!lastReadMap.containsKey(key) || current - lastReadMap.get(key) >= pointDetail.getSamplingInterval()) {
                try {
                    Register[] registers = master.readMultipleRegisters(modbusRTUSpecModel.getUnitId(),
                            pointDetail.getOffset(),
                            pointDetail.getNumOfRegisters());
                    for (Register register : registers) {
                        log.info("Register value:{}", register.getValue());
                    }
                    lastReadMap.put(key, current);
                } catch (Exception e) {
                    log.error("DttaskId={} collect failed, registerConfig={}", this.dttaskId, pointDetail, e);
                }
            }
        }
    }
}

2.2.3.3 ModbusRTUSpecModel
@Data
public class ModbusRTUSpecModel {

    private Integer mode;
    private Integer unitId;
    private String portName;
    private Integer baudRate;
    private Integer databits;
    private String parity;
    private Integer stopbits;
    private String encoding = "RTU";
    private Boolean echo = false;
    private List<PointDetail> pointDetailList;

    @Data
    public static class PointDetail {
        private String key;
        private Integer offset;
        private Integer numOfRegisters;
        private Integer samplingInterval;
    }

    public static ModbusRTUSpecModel getFromParam(JSONObject param) {
        JSONObject linkSpec = param.getJSONObject("linkSpec");
        ModbusRTUSpecModel modbusRTUSpecModel = new ModbusRTUSpecModel();
        modbusRTUSpecModel.setMode(linkSpec.getInteger("mode"));
        modbusRTUSpecModel.setUnitId(linkSpec.getInteger("unitId"));
        modbusRTUSpecModel.setPortName(linkSpec.getString("portName"));
        modbusRTUSpecModel.setBaudRate(linkSpec.getInteger("baudRate"));
        modbusRTUSpecModel.setDatabits(linkSpec.getInteger("databits"));
        modbusRTUSpecModel.setParity(linkSpec.getString("parity"));
        modbusRTUSpecModel.setStopbits(linkSpec.getInteger("stopbits"));
        JSONArray pointDetailJsonArray = linkSpec.getJSONArray("pointDetailList");
        List<PointDetail> rtuPointDetailList = new ArrayList<>();
        for (Object pointDetailObject : pointDetailJsonArray) {
            JSONObject pointDetail = (JSONObject) pointDetailObject;
            PointDetail rtuPointDetail = new PointDetail();
            rtuPointDetail.setKey(pointDetail.getString("key"));
            rtuPointDetail.setOffset(pointDetail.getInteger("offset"));
            rtuPointDetail.setNumOfRegisters(pointDetail.getInteger("numOfRegisters"));
            rtuPointDetail.setSamplingInterval(pointDetail.getInteger("samplingInterval"));
            rtuPointDetailList.add(rtuPointDetail);
        }
        modbusRTUSpecModel.setPointDetailList(rtuPointDetailList);
        return modbusRTUSpecModel;
    }
}
2.2.4 Modbus TCP Classes
2.2.4.1 ModbusTCPProtocol
@Component
@Slf4j
public class ModbusTCPProtocol extends AbstractProtocol {

    private ModbusTCPSpecModel modbusTCPSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public int getType() {
        return Constant.EntityConstants.LINK_TYPE_MODBUSTCP;
    }

    @Override
    protected void doParseConfig(ProtocolContext protocolContext) {
        JSONObject param = protocolContext.getParam();
        modbusTCPSpecModel = ModbusTCPSpecModel.getFromParam(param);
    }

    @Override
    public void doStart() {
        try {
            ModbusTCPMaster modbusTCPMaster = new ModbusTCPMaster(modbusTCPSpecModel.getIp(), modbusTCPSpecModel.getPort(), 5, true);
            modbusTCPMaster.connect();
            ModbusTCPCollectWorker worker = new ModbusTCPCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusTCPSpecModel, modbusTCPMaster);
            collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={} collect failed, config={}", dttaskId, modbusTCPSpecModel, e);
        }
    }
}
2.2.4.2 ModbusTCPCollectWorker
@Slf4j
public class ModbusTCPCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private ModbusTCPSpecModel modbusTCPSpecModel;
    private ModbusTCPMaster master;
    private Map<String, Long> lastReadMap = new HashMap<>();

    public ModbusTCPCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusTCPSpecModel modbusTCPSpecModel, ModbusTCPMaster master) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.modbusTCPSpecModel = modbusTCPSpecModel;
        this.master = master;
    }

    @Override
    public void run() {
        long current = new Date().getTime();
        for (ModbusTCPSpecModel.PointDetail pointDetail : modbusTCPSpecModel.getPointDetailList()) {
            String key = pointDetail.getKey();
            // only read a point when its sampling interval has elapsed since the last read
            if (!lastReadMap.containsKey(key) || current - lastReadMap.get(key) >= pointDetail.getSamplingInterval()) {
                try {
                    Register[] registers = master.readMultipleRegisters(modbusTCPSpecModel.getSlaveId(),
                            pointDetail.getOffset(),
                            pointDetail.getNumOfRegisters());
                    for (Register register : registers) {
                        log.info("Register value:{}", register.getValue());
                    }
                    lastReadMap.put(key, current);
                } catch (Exception e) {
                    log.error("DttaskId={} collect failed, pointDetail={}", dttaskId, pointDetail, e);
                }
            }
        }
    }
}
2.2.4.3 ModbusTCPSpecModel
@Data
public class ModbusTCPSpecModel {

    private Integer mode;
    private Long deviceId;
    private String ip;
    private Integer port;
    private Integer slaveId;
    private List<PointDetail> pointDetailList;

    @Data
    public static class PointDetail {
        private String key;
        private Integer offset;
        private Integer numOfRegisters;
        private Integer samplingInterval;
    }

    public static ModbusTCPSpecModel getFromParam(JSONObject param) {
        JSONObject linkSpec = param.getJSONObject("linkSpec");
        ModbusTCPSpecModel modbusTCPSpecModel = new ModbusTCPSpecModel();
        modbusTCPSpecModel.setMode(linkSpec.getInteger("mode"));
        modbusTCPSpecModel.setDeviceId(linkSpec.getLong("deviceId"));
        modbusTCPSpecModel.setIp(linkSpec.getString("ip"));
        modbusTCPSpecModel.setPort(linkSpec.getInteger("port"));
        modbusTCPSpecModel.setSlaveId(linkSpec.getInteger("slaveId"));
        JSONArray pointDetailJsonArray = linkSpec.getJSONArray("pointDetailList");
        List<PointDetail> tcpPointDetailList = new ArrayList<>();
        for (Object pointDetailObject : pointDetailJsonArray) {
            JSONObject pointDetail = (JSONObject) pointDetailObject;
            PointDetail tcpPointDetail = new PointDetail();
            tcpPointDetail.setKey(pointDetail.getString("key"));
            tcpPointDetail.setOffset(pointDetail.getInteger("offset"));
            tcpPointDetail.setNumOfRegisters(pointDetail.getInteger("numOfRegisters"));
            tcpPointDetail.setSamplingInterval(pointDetail.getInteger("samplingInterval"));
            tcpPointDetailList.add(tcpPointDetail);
        }
        modbusTCPSpecModel.setPointDetailList(tcpPointDetailList);
        return modbusTCPSpecModel;
    }
}
2.2.5 Virtual Classes
2.2.5.1 VirtualProtocol
@Data
@Slf4j
@Component
public class VirtualProtocol extends AbstractProtocol {

    private VirtualSpecModel virtualSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public int getType() {
        return -1;
    }

    @Override
    public void doParseConfig(ProtocolContext protocolContext) {
        JSONObject jsonObject = protocolContext.getParam();
        this.virtualSpecModel = VirtualSpecModel.getFromParam(jsonObject);
    }

    @Override
    public void doStart() {
        try {
            VirtualCollectWorker worker = new VirtualCollectWorker(dttaskId, dttaskJobId, deviceId, virtualSpecModel);
            collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={} collect failed, config={}", dttaskId, protocolContext, e);
        }
    }
}
2.2.5.2 VirtualCollectWorker
@Slf4j
public class VirtualCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private VirtualSpecModel virtualSpecModel;

    public VirtualCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, VirtualSpecModel virtualSpecModel) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.virtualSpecModel = virtualSpecModel;
    }

    @Override
    public void run() {
        log.info("deviceId={}, dttaskId={}, dttaskJobId={}, virtualSpecModel={}", deviceId, dttaskId, dttaskJobId, virtualSpecModel);
    }
}
2.2.5.3 VirtualSpecModel
@Data
@Slf4j
public class VirtualSpecModel {

    private VirtualSpecModel() {
    }

    public static VirtualSpecModel getFromParam(JSONObject jsonObject) {
        log.debug("VirtualSpecModel.getFromParam, param={}", jsonObject);
        return new VirtualSpecModel();
    }
}
2.3 Classes That Tie Job and Protocol Together
2.3.1 ICollectDataWorker
public interface ICollectDataWorker {

    void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit);

    void removeCollectMonitor(long dttaskJobId);

    ScheduledFuture<Void> getCollectMonitorScheduledFuture(long dttaskJobId);

    void doCollect(Set<Long> dttaskJobId);

    void stopCollect(Set<Long> dttaskJobId);
}
2.3.2 CommonCollectDataWorker
@Component
@Slf4j
public class CommonCollectDataWorker implements ICollectDataWorker {

    private ScheduledExecutorService collectDataExecutor = new InfiniteScheduledThreadPoolExecutor(10, new CustomThreadFactory("CommonCollectDataWorker-THREAD-"));

    private Map<Long, ScheduledFuture<Void>> monitorDttaskJobStateMap = new ConcurrentHashMap<>();

    public void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit) {
        ScheduledFuture<Void> scheduledFuture = (ScheduledFuture<Void>) collectDataExecutor.scheduleAtFixedRate(runnable, delay, period, timeUnit);
        monitorDttaskJobStateMap.put(dttaskJobId, scheduledFuture);
    }

    public synchronized void removeCollectMonitor(long dttaskJobId) {
        monitorDttaskJobStateMap.remove(dttaskJobId);
    }

    public ScheduledFuture<Void> getCollectMonitorScheduledFuture(long dttaskJobId) {
        return monitorDttaskJobStateMap.get(dttaskJobId);
    }

    @Override
    public void doCollect(Set<Long> dttaskJobIds) {
        log.info("entering CommonCollectDataWorker.doCollect, dttaskJobIds={}", dttaskJobIds);
        List<DttaskJob> dttaskJobs = BeanUseHelper.entityHelpService().queryDttaskJob(dttaskJobIds);
        for (DttaskJob dttaskJob : dttaskJobs) {
            ProtocolContext protocolContext = new ProtocolContext();
            protocolContext.setDttaskId(dttaskJob.getDttaskId());
            protocolContext.setDeviceId(dttaskJob.getDeviceId());
            protocolContext.setDttaskJobId(dttaskJob.getId());
            protocolContext.setLinkType(dttaskJob.getLinkType());
            JSONObject param = new JSONObject();
            param.put("linkSpec", JSON.parseObject(JSON.toJSONString(dttaskJob.getLinkSpec())));
            param.put("jobSpec", JSON.parseObject(JSON.toJSONString(dttaskJob.getJobSpec())));
            protocolContext.setParam(param);
            IProtocol protocol = ProtocolManager.getProtocol(protocolContext.getLinkType());
            protocol.start(protocolContext);
        }
    }

    @Override
    public void stopCollect(Set<Long> dttaskJobIds) {
        log.info("entering CommonCollectDataWorker.stopCollect, dttaskJobIds={}", dttaskJobIds);
        for (Long dttaskJobId : dttaskJobIds) {
            ScheduledFuture<?> scheduledFuture = getCollectMonitorScheduledFuture(dttaskJobId);
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            removeCollectMonitor(dttaskJobId);
        }
    }
}
2.3.3 CollectDataService
@Component
@Slf4j
public class CollectDataService {

    @Autowired
    private ICollectDataWorker collectDataWorker;

    private CollectDataService() {
    }

    /**
     * start collecting data
     */
    public void startCollectData(Set<Long> dttaskJobId) {
        BeanUseHelper.entityHelpService().runDttaskJob(dttaskJobId, ServerInfo.getServerId());
        collectDataWorker.doCollect(dttaskJobId);
    }

    /**
     * stop collecting data
     */
    public void stopCollectData(Set<Long> dttaskJobId) {
        collectDataWorker.stopCollect(dttaskJobId);
        BeanUseHelper.entityHelpService().stopDttaskJob(dttaskJobId, ServerInfo.getServerId());
    }
}
CollectDataService is used according to business needs: it is the business-level entry point for data collection, taking the upstream intent, organizing the logic, and handing the work over to an ICollectDataWorker implementation.
The ICollectDataWorker implementation in turn bridges to the Protocol implementations: it assembles the parameters a protocol needs and calls the concrete Protocol to collect data for that specific protocol.
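As a minimal usage sketch of this chain (the surrounding bean wiring and the job ids are illustrative), a caller on the current node would start and stop a set of jobs like this:

    // Illustrative usage of CollectDataService; the wiring around it is assumed.
    @Autowired
    private CollectDataService collectDataService;

    public void demo() {
        Set<Long> dttaskJobIds = new HashSet<>(Arrays.asList(1L, 2L));

        // marks the jobs as running for this node, then starts the per-protocol collect workers
        collectDataService.startCollectData(dttaskJobIds);

        // cancels the scheduled workers and marks the jobs as stopped
        collectDataService.stopCollectData(dttaskJobIds);
    }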
3. Verification
The code is at GitHub - swsm/dttask: a distributed, plugin-based task execution framework. Stars are welcome ❤️
3.1 Prepare Data
Database script. All of the jobs here use the Virtual protocol (link_type = -1) to make testing easy.
INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (1, 1, 1, -1, '{"mode": "0", "parity": "None", "unitId": 1, "baudRate": 9600, "databits": 8, "deviceId": 1, "portName": "COM4", "stopbits": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', '{"mode": "0", "parity": "None", "unitId": 1, "baudRate": 9600, "databits": 8, "deviceId": 1, "portName": "COM4", "stopbits": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', NULL, 0, '2023-12-12 16:35:37', '2023-12-12 16:35:37');
INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (2, 2, 2, -1, '{"ip": "127.0.0.1", "mode": "1", "port": 1234, "slaveId": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', '{"ip": "127.0.0.1", "mode": "1", "port": 1234, "slaveId": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', NULL, 0, '2023-12-12 16:35:40', '2023-12-12 16:35:40');
INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (3, 3, 3, -1, '{"mode": "0", "parity": "None", "unitId": 1, "baudRate": 9600, "databits": 8, "deviceId": 1, "portName": "COM4", "stopbits": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', '{"mode": "0", "parity": "None", "unitId": 1, "baudRate": 9600, "databits": 8, "deviceId": 1, "portName": "COM4", "stopbits": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', NULL, 0, '2023-12-12 16:35:44', '2023-12-12 16:35:44');
INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (4, 4, 4, -1, '{"ip": "127.0.0.1", "mode": "1", "port": 1234, "slaveId": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', '{"ip": "127.0.0.1", "mode": "1", "port": 1234, "slaveId": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', NULL, 0, '2023-12-12 16:35:45', '2023-12-12 16:35:45');
INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (5, 5, 5, -1, '{"mode": "0", "parity": "None", "unitId": 1, "baudRate": 9600, "databits": 8, "deviceId": 1, "portName": "COM4", "stopbits": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', '{"mode": "0", "parity": "None", "unitId": 1, "baudRate": 9600, "databits": 8, "deviceId": 1, "portName": "COM4", "stopbits": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', NULL, 0, '2023-12-12 16:35:45', '2023-12-12 16:35:45');
INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (6, 6, 6, -1, '{"ip": "127.0.0.1", "mode": "1", "port": 1234, "slaveId": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', '{"ip": "127.0.0.1", "mode": "1", "port": 1234, "slaveId": 1, "pointDetail": [{"key": "1", "offset": 1, "numOfRegisters": 2, "samplingInterval": 1000}, {"key": "2", "offset": 2, "numOfRegisters": 1, "samplingInterval": 2000}, {"key": "3", "offset": 3, "numOfRegisters": 3, "samplingInterval": 1000}, {"key": "4", "offset": 4, "numOfRegisters": 5, "samplingInterval": 3000}]}', NULL, 0, '2023-12-12 16:35:47', '2023-12-12 16:35:47');

3.2 Start Three Nodes
Here you can see the three nodes finish starting: node 1 becomes the Controller and nodes 2 and 3 become Followers. If the logic behind this is unclear, see the earlier articles in this series.
The Controller then allots the tasks to all nodes (including itself) according to the configured allocation strategy, and each node executes the tasks assigned to it.
The t_dttask_job table now contains the tasks assigned to each node, and each node executes its tasks once per second. Since CollectDataService and ICollectDataWorker already wrap the doCollect and stopCollect logic, you can create your own controller that calls these methods to stop and start a given task, for example as sketched below.
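A minimal sketch of such a controller, assuming Spring Web is available; the class name and endpoint paths are hypothetical and not part of the repository:

    // Hypothetical REST controller; paths and class name are illustrative only.
    @RestController
    @RequestMapping("/collect")
    public class CollectJobController {

        @Autowired
        private CollectDataService collectDataService;

        // start collection for the given dttask job ids on this node
        @PostMapping("/start")
        public String start(@RequestBody Set<Long> dttaskJobIds) {
            collectDataService.startCollectData(dttaskJobIds);
            return "started";
        }

        // stop collection for the given dttask job ids on this node
        @PostMapping("/stop")
        public String stop(@RequestBody Set<Long> dttaskJobIds) {
            collectDataService.stopCollectData(dttaskJobIds);
            return "stopped";
        }
    }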