当前位置: 首页 > news >正文

专业营销网站建设四川盼之网络科技官网

专业营销网站建设,四川盼之网络科技官网,linux网站建设技术指南,aso.net 网站开发1.设计 执行任务找一个落地场景#xff1a;连接设备采集参数。设备有不同的协议#xff0c;如#xff1a;modbus rtu、modbus tcp、opc ua、simens s7等。协议多种多样#xff0c;需要的参数也不同#xff0c;连接及任务执行参数存放在t_job表的link_spec中#xff0c;任…1.设计 执行任务找一个落地场景连接设备采集参数。设备有不同的协议如modbus rtu、modbus tcp、opc ua、simens s7等。协议多种多样需要的参数也不同连接及任务执行参数存放在t_job表的link_spec中任务的配置存储在job_spec中存储格式都是json。 1.1 任务的流转 1.2 类设计 1.2.1 Job类设计 JobAllotStrategy是接口目前只定义了allotJob(分配任务)业务方法后面会根据情况增加下面AbstractAllotStrategy是抽象类实现了JobAllotStrategy接口将底层的具体实现的公共方法放在这里。 AverageJobAllotStrategy平均分配任务策略实现类 WeightJobAllotStrategy权重分配任务策略实现类 ... 后面可以有很多 1.2.2 辅助类设计 ExecuteDttaskJobContext任务相关参数的封装 JobAllotManager对不同任务分配策略的集中管理 1.2.3 协议相关类的设计 IProtocol顶层接口定义协议的类型、开始方法和处理配置方法 AbstractProtocol抽象类对下面实际协议的实现的方式进行再细化以及共用逻辑的统一 ModbusRTUProtocol针对Modbus RTU协议的处理 ModbusTCPProtocol针对Modbus TCP协议的处理 VirtualProtocol:针对测试环境虚拟化的协议处理 .... 可以自定义扩充 1.2.4 协议相关辅助类 CollectDataService采集数据的最上层服务类 ICollectDataWorker采集数据的接口定义此处提供接口方便后续不同采集数据实现的扩充 CommonCollectDataWorker一般采集数据接口实现使用java原生的ScheduledExecutorService实现周期性采集任务 2. 实现 2.1 Job相关类实现 2.1.1 JobAllotStrategy 目前此类很简单只有分配job的类型和分配job两个方法后面再去实现rebalanceJob的功能 public interface JobAllotStrategy {JobAllotStrategyType getType();void allotJob();} 2.1.2 AbstractJobAllotStrategy JobAllotStrategy的抽象实现后续所有Strategy的通用实现以及一些复杂逻辑的组合可以放在这里 public abstract class AbstractJobAllotStrategy implements JobAllotStrategy {public ListJob getAllJob() {return BeanUseHelper.entityHelpService().getAllJob();}public ListDttaskJob getAllCrawlerJob() {return BeanUseHelper.entityHelpService().getAllDttaskJob();}public ListDttaskJob getByCrawlerId(long dttaskId) {return BeanUseHelper.entityHelpService().queryDttaskJob(dttaskId);}} 2.1.3 AverageJobAllotStrategy 平均分配job的策略实现 WeightJobAllotStrategy可以模仿此类实现主要逻辑是从每个Dttask节点解析配置的不同weight指标默认是1:1:1 可以配置成2:1:1,这样如果有8个任务那么1号节点分配4个2号和3号节点各分配2个 Component Slf4j public class AverageJobAllotStrategy extends AbstractJobAllotStrategy {Autowiredprivate CollectDataService collectDataService;Overridepublic JobAllotStrategyType getType() {return JobAllotStrategyType.AVERAGE;}Overridepublic void allotJob() {EntityHelpService entityHelpService BeanUseHelper.entityHelpService();MapLong, ListJob allotJobMap getAllotJobMap();log.info(allotJobMap{}, allotJobMap);entityHelpService.invalidAllDttaskJob();entityHelpService.saveAllDttaskJob(allotJobMap);MapLong, ListDttaskJob dttaskJobMap entityHelpService.queryDttaskJob();executeDttaskJob(new ExecuteDttaskJobContext(dttaskJobMap, true));}private void executeDttaskJob(ExecuteDttaskJobContext executeDttaskJobContext) {MapLong, ListDttaskJob dttaskJobMap executeDttaskJobContext.getDttaskJobMap();boolean startFlag executeDttaskJobContext.getStartFlag();dttaskJobMap.forEach((dttaskId, dttaskJobList) - {if (!Objects.equals(ServerInfo.getServerId(), dttaskId)) {// 向其它节点发送 任务控制 命令SetLong dttaskJobIdList dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());DttaskMessage controlCollectMessage DttaskMessage.buildControlCollectMessage(dttaskJobIdList, startFlag, dttaskId);log.info(向nodeId{}发送采集控制指令{}, controlCollectMessage);ServerInfo.getChannelByServerId(dttaskId).writeAndFlush(controlCollectMessage);} else {log.info({}分配给自己的采集任务{}, startFlag ? 执行 : 停止, dttaskJobList);SetLong dttaskJobIds dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());if (startFlag) {collectDataService.startCollectData(dttaskJobIds);} else {collectDataService.stopCollectData(dttaskJobIds);}}});}private MapLong, ListJob getAllotJobMap() {ListJob allJob getAllJob();return average(allJob);}private T MapLong, ListT average(ListT list) {ListNodeInfo nodeInfoList ServerInfo.getNodeInfoList();int nodeCount nodeInfoList.size();MapLong, ListT allotJobMap new HashMap();int averageJobCount list.size() / nodeCount;int remainingJobCount list.size() % nodeCount;int currentIndex 0;for (NodeInfo nodeInfo : nodeInfoList) {allotJobMap.put(nodeInfo.getServerId(), list.subList(currentIndex, currentIndex averageJobCount));currentIndex averageJobCount;}while (remainingJobCount ! 0) {for (Map.EntryLong, ListT entry : allotJobMap.entrySet()) {entry.getValue().addAll(list.subList(currentIndex, currentIndex 1));currentIndex;remainingJobCount--;}}return allotJobMap;}} 2.1.4 ExecuteDttaskJobContext 执行任务传递的上下文信息这里还比较简单只是一个Map和执行的是启动还是停止后续可以根据业务扩展它也可以自带逻辑 Data AllArgsConstructor NoArgsConstructor public class ExecuteDttaskJobContext {private MapLong, ListDttaskJob dttaskJobMap;private Boolean startFlag;} 2.1.5 JobAllotStrategyType 分配job的策略枚举类 public enum JobAllotStrategyType {AVERAGE(0), WEIGHT(1), SPECIFIC(2);int code;JobAllotStrategyType(int code) {this.code code;}public static JobAllotStrategyType from(int code) {for (JobAllotStrategyType value : values()) {if (value.code code) {return value;}}throw new BusinessException(CharSequenceUtil.format(code{}不在JobAllotStrategyType中, code));} } 2.1.6 JobAllotManager 和前面的netty任务处理策略一样这里我也借助Spring容器管理所有JobAllotStrategy的策略并根据配置选择当前任务的策略 Component Slf4j public class JobAllotManager {Autowiredprivate ListJobAllotStrategy jobAllotStrategies;private static final MapJobAllotStrategyType, JobAllotStrategy map new EnumMap(JobAllotStrategyType.class);PostConstructpublic void init() {if (jobAllotStrategies ! null) {for (JobAllotStrategy jobAllotStrategy : jobAllotStrategies) {map.put(jobAllotStrategy.getType(), jobAllotStrategy);}}}public static JobAllotStrategy getStrategy() {JobAllotStrategyType allotJobStrategyType JobAllotStrategyType.from(BeanUseHelper.dttaskServerConfig().getAllotJobStrategyType());if (map.containsKey(allotJobStrategyType)) {return map.get(allotJobStrategyType);}throw new BusinessException(CharSequenceUtil.format(allotJobStrategyType{} 配置有误, allotJobStrategyType));}} 2.2 Protocol相关类实现 Protocol后面的变化会很多不同协议需要的参数处理的逻辑都不相同目前仅进行较简化的封装 2.2.1 IProtocol 协议接口主要是启动协议处理我暂时将协议关闭的逻辑放到了Job这一侧去完成也可以在这里提供一个stop接口自己协议完成 public interface IProtocol {int getType();void start(ProtocolContext protocolContext);void parseConfig(ProtocolContext protocolContext);}2.2.2 AbstractProtocol Slf4j public abstract class AbstractProtocol implements IProtocol {protected Long dttaskId;protected Long dttaskJobId;protected Long deviceId;protected ProtocolContext protocolContext;public void parseConfig(ProtocolContext protocolContext) {this.dttaskId protocolContext.getDttaskId();this.dttaskJobId protocolContext.getDttaskJobId();this.deviceId protocolContext.getDeviceId();this.protocolContext protocolContext;doParseConfig(protocolContext);}protected abstract void doParseConfig(ProtocolContext protocolContext);public abstract void doStart();public void start(ProtocolContext protocolContext) {log.info(进入 AbstractProtocol.start, protocolContext{}, protocolContext);parseConfig(protocolContext);doStart();}} 2.2.3 Modbus RTU相关类实现 2.2.3.1 ModbusRTUProtocol Data Slf4j Component public class ModbusRTUProtocol extends AbstractProtocol {private ModbusRTUSpecModel modbusRTUSpecModel;Autowiredprivate ICollectDataWorker collectDataWorker;Overridepublic void doParseConfig(ProtocolContext protocolContext) {JSONObject jsonObject protocolContext.getParam();this.modbusRTUSpecModel ModbusRTUSpecModel.getFromParam(jsonObject);}Overridepublic int getType() {return 0;}Overridepublic void doStart() {try {SerialParameters serialParameters new SerialParameters();serialParameters.setPortName(modbusRTUSpecModel.getPortName());serialParameters.setBaudRate(modbusRTUSpecModel.getBaudRate());serialParameters.setDatabits(modbusRTUSpecModel.getDatabits());serialParameters.setStopbits(modbusRTUSpecModel.getStopbits());serialParameters.setParity(modbusRTUSpecModel.getParity());serialParameters.setEncoding(modbusRTUSpecModel.getEncoding());serialParameters.setEcho(modbusRTUSpecModel.getEcho());serialParameters.setPortName(modbusRTUSpecModel.getPortName());ModbusSerialMaster modbusSerialMaster new ModbusSerialMaster(serialParameters);modbusSerialMaster.connect();ModbusRTUCollectWorker modbusRTUCollectWorker new ModbusRTUCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusSerialMaster, modbusRTUSpecModel);collectDataWorker.addCollectTask(dttaskJobId, modbusRTUCollectWorker, 5, 1, TimeUnit.SECONDS);} catch (Exception e) {log.error(DttaskId{}采集config{}出现异常, this.dttaskId, protocolContext, e);}} } 2.2.3.2 ModbusRTUCollectWorker Slf4j public class ModbusRTUCollectWorker implements Runnable {private Long dttaskId;private Long dttaskJobId;private Long deviceId;private ModbusSerialMaster master;private ModbusRTUSpecModel modbusRTUSpecModel;private MapString, Long lastReadMap new HashMap();public ModbusRTUCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusSerialMaster master, ModbusRTUSpecModel modbusRTUSpecModel) {this.dttaskId dttaskId;this.dttaskJobId dttaskJobId;this.deviceId deviceId;this.master master;this.modbusRTUSpecModel modbusRTUSpecModel;}Overridepublic void run() {long current new Date().getTime();for (ModbusRTUSpecModel.PointDetail pointDetail : modbusRTUSpecModel.getPointDetailList()) {String key pointDetail.getKey();if (lastReadMap.containsKey(key) (current - lastReadMap.get(key)) % pointDetail.getSamplingInterval() 1) {try {Register[] registers master.readMultipleRegisters(modbusRTUSpecModel.getUnitId(),pointDetail.getOffset(),pointDetail.getNumOfRegisters());for (Register register : registers) {log.info(Register value:{}, register.getValue());}} catch (Exception e) {log.error(DttaskId{}采集registerConfig{}出现异常, this.dttaskId, pointDetail, e);}}}} }2.2.3.3 ModbusRTUSpecModel Data public class ModbusRTUSpecModel {private Integer mode;private Integer unitId;private String portName;private Integer baudRate;private Integer databits;private String parity;private Integer stopbits;private String encoding RTU;private Boolean echo false;private ListPointDetail pointDetailList;Datapublic static class PointDetail {private String key;private Integer offset;private Integer numOfRegisters;private Integer samplingInterval;}public static ModbusRTUSpecModel getFromParam(JSONObject param) {JSONObject linkSpec param.getJSONObject(linkSpec);ModbusRTUSpecModel modbusRTUSpecModel new ModbusRTUSpecModel();modbusRTUSpecModel.setMode(linkSpec.getInteger(mode));modbusRTUSpecModel.setUnitId(linkSpec.getInteger(unitId));modbusRTUSpecModel.setPortName(linkSpec.getString(portName));modbusRTUSpecModel.setBaudRate(linkSpec.getInteger(baudRate));modbusRTUSpecModel.setDatabits(linkSpec.getInteger(databits));modbusRTUSpecModel.setParity(linkSpec.getString(parity));modbusRTUSpecModel.setStopbits(linkSpec.getInteger(stopbits));JSONArray pointDetailJsonArray linkSpec.getJSONArray(pointDetailList);ListPointDetail rtuPointDetailList new ArrayList();for (Object pointDetailObject : pointDetailJsonArray) {JSONObject pointDetail (JSONObject)pointDetailObject;PointDetail rtuPointDetail new PointDetail();rtuPointDetail.setKey(pointDetail.getString(key));rtuPointDetail.setOffset(pointDetail.getInteger(offset));rtuPointDetail.setNumOfRegisters(pointDetail.getInteger(numOfRegisters));rtuPointDetail.setSamplingInterval(pointDetail.getInteger(samplingInterval));rtuPointDetailList.add(rtuPointDetail);}modbusRTUSpecModel.setPointDetailList(rtuPointDetailList);return modbusRTUSpecModel;} } 2.2.4 Modbus TCP相关类实现 2.2.4.1 ModbusTCPProtocol Component Slf4j public class ModbusTCPProtocol extends AbstractProtocol {private ModbusTCPSpecModel modbusTCPSpecModel;Autowiredprivate ICollectDataWorker collectDataWorker;Overridepublic int getType() {return Constant.EntityConstants.LINK_TYPE_MODBUSTCP;}Overrideprotected void doParseConfig(ProtocolContext protocolContext) {JSONObject param protocolContext.getParam();modbusTCPSpecModel ModbusTCPSpecModel.getFromParam(param);}Overridepublic void doStart() {try {ModbusTCPMaster modbusTCPMaster new ModbusTCPMaster(modbusTCPSpecModel.getIp(), modbusTCPSpecModel.getPort(), 5, true);modbusTCPMaster.connect();ModbusTCPCollectWorker worker new ModbusTCPCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusTCPSpecModel, modbusTCPMaster);collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);} catch (Exception e) {log.error(DttaskId{}采集config{}出现异常, dttaskId, modbusTCPSpecModel, e);}} } 2.2.4.2 ModbusTCPCollectWorker Slf4j public class ModbusTCPCollectWorker implements Runnable {private Long dttaskId;private Long dttaskJobId;private Long deviceId;private ModbusTCPSpecModel modbusTCPSpecModel;private ModbusTCPMaster master;private MapString, Long lastReadMap new HashMap();public ModbusTCPCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusTCPSpecModel modbusTCPSpecModel, ModbusTCPMaster master) {this.dttaskId dttaskId;this.dttaskJobId dttaskJobId;this.deviceId deviceId;this.modbusTCPSpecModel modbusTCPSpecModel;this.master master;}Overridepublic void run() {long current new Date().getTime();for (ModbusTCPSpecModel.PointDetail pointDetail : modbusTCPSpecModel.getPointDetailList()) {String key pointDetail.getKey();if (lastReadMap.containsKey(key) (current - lastReadMap.get(key)) % pointDetail.getSamplingInterval() 1) {try {Register[] registers master.readMultipleRegisters(modbusTCPSpecModel.getSlaveId(),pointDetail.getOffset(),pointDetail.getNumOfRegisters());for (Register register : registers) {log.info(Register value:{}, register.getValue());}} catch (Exception e) {log.error(DttaskId{}采集pointDetail{}出现异常, dttaskId, pointDetail, e);}}}} } 2.2.4.3 ModbusTCPSpecModel Data public class ModbusTCPSpecModel {private Integer mode;private Long deviceId;private String ip;private Integer port;private Integer slaveId;private ListPointDetail pointDetailList;Datapublic static class PointDetail {private String key;private Integer offset;private Integer numOfRegisters;private Integer samplingInterval;}public static ModbusTCPSpecModel getFromParam(JSONObject param) {JSONObject linkSpec param.getJSONObject(linkSpec);ModbusTCPSpecModel modbusTCPSpecModel new ModbusTCPSpecModel();modbusTCPSpecModel.setMode(linkSpec.getInteger(mode));modbusTCPSpecModel.setDeviceId(linkSpec.getLong(deviceId));modbusTCPSpecModel.setIp(linkSpec.getString(ip));modbusTCPSpecModel.setPort(linkSpec.getInteger(port));modbusTCPSpecModel.setSlaveId(linkSpec.getInteger(slaveId));JSONArray pointDetailJsonArray linkSpec.getJSONArray(pointDetailList);ListPointDetail tcpPointDetailList new ArrayList();for (Object pointDetailObject : pointDetailJsonArray) {JSONObject pointDetail (JSONObject)pointDetailObject; PointDetail tcpPointDetail new PointDetail();tcpPointDetail.setKey(pointDetail.getString(key));tcpPointDetail.setOffset(pointDetail.getInteger(offset));tcpPointDetail.setNumOfRegisters(pointDetail.getInteger(numOfRegisters));tcpPointDetail.setSamplingInterval(pointDetail.getInteger(samplingInterval));tcpPointDetailList.add(tcpPointDetail);}modbusTCPSpecModel.setPointDetailList(tcpPointDetailList);return modbusTCPSpecModel;} } 2.2.5 Virtual相关类实现 2.2.5.1 VirtualProtocol Data Slf4j Component public class VirtualProtocol extends AbstractProtocol {private VirtualSpecModel virtualSpecModel;Autowiredprivate ICollectDataWorker collectDataWorker;Overridepublic int getType() {return -1;}Overridepublic void doParseConfig(ProtocolContext protocolContext) {JSONObject jsonObject protocolContext.getParam();this.virtualSpecModel VirtualSpecModel.getFromParam(jsonObject);}Overridepublic void doStart() {try {VirtualCollectWorker worker new VirtualCollectWorker(dttaskId, dttaskJobId, deviceId, virtualSpecModel);collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);} catch (Exception e) {log.error(DttaskId{}采集config{}出现异常, dttaskId, protocolContext, e);}} } 2.2.5.2 VirtualCollectWorker Slf4j public class VirtualCollectWorker implements Runnable {private Long dttaskId;private Long dttaskJobId;private Long deviceId;private VirtualSpecModel virtualSpecModel;public VirtualCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, VirtualSpecModel virtualSpecModel) {this.dttaskId dttaskId;this.dttaskJobId dttaskJobId;this.deviceId deviceId;this.virtualSpecModel virtualSpecModel;}Overridepublic void run() {log.info(deviceId{},dttaskId{},dttaskJobId{},virtualSpecModel{},deviceId, dttaskId, dttaskJobId, virtualSpecModel);} } 2.2.5.3 VirtualSpecModel Data Slf4j public class VirtualSpecModel {private VirtualSpecModel() {}public static VirtualSpecModel getFromParam(JSONObject jsonObject) {log.debug(VirtualSpecModel.getFromParam param{}, jsonObject);return new VirtualSpecModel();}} 2.3 将Job和Protocol串起来的类设计 2.3.1 ICollectDataWorker public interface ICollectDataWorker {void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit);void removeCollectMonitor(long dttaskJobId);ScheduledFutureVoid getCollectMonitorScheduledFuture(long dttaskJobId);void doCollect(SetLong dttaskJobId);void stopCollect(SetLong dttaskJobId); } 2.3.2 CommonCollectDataWorker Component Slf4j public class CommonCollectDataWorker implements ICollectDataWorker {private ScheduledExecutorService collectDataExecutor new InfiniteScheduledThreadPoolExecutor(10, new CustomThreadFactory(CommonCollectDataWorker-THREAD-));private MapLong, ScheduledFutureVoid monitorDttaskJobStateMap new ConcurrentHashMap();public void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit) {ScheduledFutureVoid scheduledFuture (ScheduledFutureVoid) collectDataExecutor.scheduleAtFixedRate(runnable, delay, period, timeUnit);monitorDttaskJobStateMap.put(dttaskJobId, scheduledFuture);}public synchronized void removeCollectMonitor(long dttaskJobId) {monitorDttaskJobStateMap.remove(dttaskJobId);}public ScheduledFutureVoid getCollectMonitorScheduledFuture(long dttaskJobId) {return monitorDttaskJobStateMap.get(dttaskJobId);}Overridepublic void doCollect(SetLong dttaskJobIds) {log.info(进入 CommonCollectDataWorker.doCollect, dttaskJobIds{}, dttaskJobIds);ListDttaskJob dttaskJobs BeanUseHelper.entityHelpService().queryDttaskJob(dttaskJobIds);for (DttaskJob dttaskJob : dttaskJobs) {ProtocolContext protocolContext new ProtocolContext();protocolContext.setDttaskId(dttaskJob.getDttaskId());protocolContext.setDeviceId(dttaskJob.getDeviceId());protocolContext.setDttaskJobId(dttaskJob.getId());protocolContext.setLinkType(dttaskJob.getLinkType());JSONObject param new JSONObject();param.put(linkSpec, JSON.parseObject(JSON.toJSONString(dttaskJob.getLinkSpec())));param.put(jobSpec, JSON.parseObject(JSON.toJSONString(dttaskJob.getJobSpec())));protocolContext.setParam(param);IProtocol protocol ProtocolManager.getProtocol(protocolContext.getLinkType());protocol.start(protocolContext);}}Overridepublic void stopCollect(SetLong dttaskJobIds) {log.info(进入 CommonCollectDataWorker.stopCollect, dttaskJobIds{}, dttaskJobIds);for (Long dttaskJobId : dttaskJobIds) {ScheduledFuture? scheduledFuture getCollectMonitorScheduledFuture(dttaskJobId);scheduledFuture.cancel(true);removeCollectMonitor(dttaskJobId);}} } 2.3.2 CollectDataService Component Slf4j public class CollectDataService {Autowiredprivate ICollectDataWorker collectDataWorker;private CollectDataService() {}/*** 开始采集数据*/public void startCollectData(SetLong dttaskJobId) {BeanUseHelper.entityHelpService().runDttaskJob(dttaskJobId, ServerInfo.getServerId());collectDataWorker.doCollect(dttaskJobId);}public void stopCollectData(SetLong dttaskJobId) {collectDataWorker.stopCollect(dttaskJobId);BeanUseHelper.entityHelpService().stopDttaskJob(dttaskJobId, ServerInfo.getServerId());} } 根据业务需要使用CollectDataService它作为采集数据的业务管理类承接上层业务目的然后组织实现逻辑后交给ICollectDataWorker的实现去完成。 ICollectDataWorker的实现对接Protocol的实现组合Protocol需要的参数调用具体的Protocol完成特定协议采集数据的功能 3. 验证 代码地址在GitHub - swsm/dttask: 分布式插件化任务执行框架 欢迎Star❤️ 3.1 准备数据 数据库脚本 这里job使用的protocol都是Virtual方便大家测试 INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (1, 1, 1, -1, {\mode\: \0\, \parity\: \None\, \unitId\: 1, \baudRate\: 9600, \databits\: 8, \deviceId\: 1, \portName\: \COM4\, \stopbits\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, {\mode\: \0\, \parity\: \None\, \unitId\: 1, \baudRate\: 9600, \databits\: 8, \deviceId\: 1, \portName\: \COM4\, \stopbits\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, NULL, 0, 2023-12-12 16:35:37, 2023-12-12 16:35:37); INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (2, 2, 2, -1, {\ip\: \127.0.0.1\, \mode\: \1\, \port\: 1234, \slaveId\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, {\ip\: \127.0.0.1\, \mode\: \1\, \port\: 1234, \slaveId\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, NULL, 0, 2023-12-12 16:35:40, 2023-12-12 16:35:40); INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (3, 3, 3, -1, {\mode\: \0\, \parity\: \None\, \unitId\: 1, \baudRate\: 9600, \databits\: 8, \deviceId\: 1, \portName\: \COM4\, \stopbits\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, {\mode\: \0\, \parity\: \None\, \unitId\: 1, \baudRate\: 9600, \databits\: 8, \deviceId\: 1, \portName\: \COM4\, \stopbits\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, NULL, 0, 2023-12-12 16:35:44, 2023-12-12 16:35:44); INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (4, 4, 4, -1, {\ip\: \127.0.0.1\, \mode\: \1\, \port\: 1234, \slaveId\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, {\ip\: \127.0.0.1\, \mode\: \1\, \port\: 1234, \slaveId\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, NULL, 0, 2023-12-12 16:35:45, 2023-12-12 16:35:45); INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (5, 5, 5, -1, {\mode\: \0\, \parity\: \None\, \unitId\: 1, \baudRate\: 9600, \databits\: 8, \deviceId\: 1, \portName\: \COM4\, \stopbits\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, {\mode\: \0\, \parity\: \None\, \unitId\: 1, \baudRate\: 9600, \databits\: 8, \deviceId\: 1, \portName\: \COM4\, \stopbits\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, NULL, 0, 2023-12-12 16:35:45, 2023-12-12 16:35:45); INSERT INTO t_job (id, device_id, device_link_id, link_type, link_spec, job_spec, remark, delete_flag, created_at, updated_at) VALUES (6, 6, 6, -1, {\ip\: \127.0.0.1\, \mode\: \1\, \port\: 1234, \slaveId\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, {\ip\: \127.0.0.1\, \mode\: \1\, \port\: 1234, \slaveId\: 1, \pointDetail\: [{\key\: \1\, \offset\: 1, \numOfRegisters\: 2, \samplingInterval\: 1000}, {\key\: \2\, \offset\: 2, \numOfRegisters\: 1, \samplingInterval\: 2000}, {\key\: \3\, \offset\: 3, \numOfRegisters\: 3, \samplingInterval\: 1000}, {\key\: \4\, \offset\: 4, \numOfRegisters\: 5, \samplingInterval\: 3000}]}, NULL, 0, 2023-12-12 16:35:47, 2023-12-12 16:35:47);3.2 启动三个节点 这里会看到3个节点启动完成1号节点为Controller2 3号节点为Follower这是前面的逻辑不清楚的可以看前面的文章。 然后Controller会将任务按照任务分配策略分配给所有节点(包括自己)然后每个节点执行对应的任务。 数据库t_dttask_job表里有了对应每个节点的任务 各节点每秒执行一次任务 因为CollectManager类已经封装了doCollect 和 stopCollect的方法大家自行创建Controller调用方法就可以实现对某个任务的停止和启动了
http://www.pierceye.com/news/295044/

相关文章:

  • 江苏商城网站制作公司网站备案时间
  • 网站开发用到什么技术公司做影视网站侵权
  • 自己做网站大概多少钱唐山丰南建设局网站
  • 建设法律法规文本查询网站什么是建设型的网站
  • 如何设计一个购物网站如何免费网络营销推广
  • 网站制作服务好的商家做网站送的企业邮箱能用吗
  • 免费行情软件app网站排行企业内部网站如何建设
  • 沧州网络运营中心在哪里新的seo网站优化排名 网站
  • 米拓建站免费模板wordpress那个主题收录好
  • 网站后台中小型网站建设的基本流程
  • 一键做网站的软件爱互融网站开发合同
  • 平顶山市哪里有做网站的高端的扬中网站建设
  • 网站定制电话如何自己开公众号
  • app开发网站建设及开发专业济南网站建设价格
  • 网站建设新闻分享免费制作网站app
  • 海口网站建设高端wordpress 论坛那
  • 谁能帮我做网站百度推广登录平台怎么收费
  • 有关于网站建设的论文如何开发一个微信公众号
  • 深圳网站建制作网上写文章用什么软件
  • 网站模版自适应网站建设全包方案
  • 广州网站建设鞍山家电网站首页制作
  • 西安注册公司网站网站建设找a金手指
  • 浙江省住房和城乡建设厅网站打不开设计书籍频道开放说明
  • 阿里巴巴 网站建设遵义网警
  • 宁夏建设厅网站官网如何做DJ网站
  • 龙岩做网站公司哪家好erp系统与网站对接长沙
  • 做二手房需要用到哪些网站搜集房源找人做设计的网站
  • 建设银行河北分行招聘网站可以下载新闻视频的网站
  • 凡客官网旗舰店襄阳seo关键词优化公司
  • 区域门户网站源码健身网站建设