网站新闻对百度优化有用吗,网站的主机地址,wordpress中文商城,WordPress 百度 主动1.作用 
之前介绍的都是我们向网管NCE发起请求获取数据#xff0c;消息订阅则反过来#xff0c;是网管NCE系统给我们推送信息。其原理和MQ#xff0c;JMS这些差不多#xff0c;这里不过多累述。 
2.场景 
所支持订阅的场景有如下#xff0c;以告警通知为例#xff0c;当我…1.作用 
之前介绍的都是我们向网管NCE发起请求获取数据消息订阅则反过来是网管NCE系统给我们推送信息。其原理和MQJMS这些差不多这里不过多累述。 
2.场景 
所支持订阅的场景有如下以告警通知为例当我们订阅告警通知以后如果NCE网管有告警通知产生以后就会给订阅的人发送一个通知也就是实时告警推送。那么我们就可以接收到如下的通知。 
2024-06-06 00:09:30c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_ALARM告警通知通知参数{X.733::ProposedRepairActions, rcaiIndicatorfalse, probableCauseQualifier0-2, serviceAffectingSA_NON_SERVICE_AFFECTING, additionalTextHuawei/NCE;167772242, X.733::CorrelatedNotifications21, neTime20240605160140.0Z, X.733::EventTypesecurityAlarm, emsTime20240605160142.0Z, objectTypeOT_MANAGED_ELEMENT, objectTypeQualifier, probableCauseUNIDENTIFIED, perceivedSeverityPS_CRITICAL, nativeEMSNameHuawei/NCE;土默特, nativeProbableCauseNE_NOT_LOGIN, layerRate1, additionalInfo21, objectName21, notificationId11191929201784751, isClearabletrue, affectedTPList21}
2024-06-06 00:09:36c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_ALARM告警通知通知参数{X.733::ProposedRepairActions, rcaiIndicatorfalse, probableCauseQualifier0-2, serviceAffectingSA_NON_SERVICE_AFFECTING, additionalTextHuawei/NCE;167772242, X.733::CorrelatedNotifications21, neTime20240605160147.0Z, X.733::EventTypesecurityAlarm, emsTime20240605160149.0Z, objectTypeOT_MANAGED_ELEMENT, objectTypeQualifier, probableCauseUNIDENTIFIED, perceivedSeverityPS_CLEARED, nativeEMSNameHuawei/NCE;土默特, nativeProbableCauseNE_NOT_LOGIN, layerRate1, additionalInfo21, objectName21, notificationId11191929201784752, isClearabletrue, affectedTPList21}
2024-06-06 00:09:43c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_ALARM告警通知通知参数{X.733::ProposedRepairActions, rcaiIndicatorfalse, probableCauseQualifier0-2, serviceAffectingSA_NON_SERVICE_AFFECTING, additionalTextHuawei/NCE;167772242, X.733::CorrelatedNotifications21, neTime20240605160155.0Z, X.733::EventTypesecurityAlarm, emsTime20240605160156.0Z, objectTypeOT_MANAGED_ELEMENT, objectTypeQualifier, probableCauseUNIDENTIFIED, perceivedSeverityPS_CRITICAL, nativeEMSNameHuawei/NCE;土默特, nativeProbableCauseNE_NOT_LOGIN, layerRate1, additionalInfo21, objectName21, notificationId11191929201784753, isClearabletrue, affectedTPList21}
2024-06-06 00:09:50c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_ALARM告警通知通知参数{X.733::ProposedRepairActions, rcaiIndicatorfalse, probableCauseQualifier0-2, serviceAffectingSA_NON_SERVICE_AFFECTING, additionalTextHuawei/NCE;167772242, X.733::CorrelatedNotifications21, neTime20240605160202.0Z, X.733::EventTypesecurityAlarm, emsTime20240605160203.0Z, objectTypeOT_MANAGED_ELEMENT, objectTypeQualifier, probableCauseUNIDENTIFIED, perceivedSeverityPS_CLEARED, nativeEMSNameHuawei/NCE;土默特, nativeProbableCauseNE_NOT_LOGIN, layerRate1, additionalInfo21, objectName21, notificationId11191929201784755, isClearabletrue, affectedTPList21}
2024-06-06 00:10:01c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_ALARM告警通知通知参数{X.733::ProposedRepairActions, rcaiIndicatorfalse, probableCauseQualifier0-2, serviceAffectingSA_NON_SERVICE_AFFECTING, additionalTextHuawei/NCE;167772242, X.733::CorrelatedNotifications21, neTime20240605160213.0Z, X.733::EventTypesecurityAlarm, emsTime20240605160214.0Z, objectTypeOT_MANAGED_ELEMENT, objectTypeQualifier, probableCauseUNIDENTIFIED, perceivedSeverityPS_CRITICAL, nativeEMSNameHuawei/NCE;土默特, nativeProbableCauseNE_NOT_LOGIN, layerRate1, additionalInfo21, objectName21, notificationId11191929201784756, isClearabletrue, affectedTPList21}同理如果我们订阅了文件传输状态通知当存在文件传输完成的时候会收到如下通知通知信息中包含了文件传输完成后文件的存储地址。 
2024-06-06 10:15:26c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_FILE_TRANSFER_STATUS文件传输状态通知通知参数{notificationId11191929201786334, fileNamepm/sdh/0605-0606/3145740.txt, transferStatusFT_COMPLETED, percentComplete100, failureReason}
2024-06-06 10:15:39c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_FILE_TRANSFER_STATUS文件传输状态通知通知参数{notificationId11191929201786335, fileNamepm/sdh/0605-0606/3145734.txt, transferStatusFT_COMPLETED, percentComplete100, failureReason}
2024-06-06 10:15:42c.c.s.m.c.n.ConsumerNotice - 收到事件通知NT_FILE_TRANSFER_STATUS文件传输状态通知通知参数{notificationId11191929201786336, fileNamepm/sdh/0605-0606/3145739.txt, transferStatusFT_COMPLETED, percentComplete100, failureReason}通知类型说明NT_ALARM告警通知NT_ALARM_UPDATED告警更新通知NT_TCA性能越限告警通知NT_OBJECT_CREATION对象创建通知NT_OBJECT_DELETION对象删除通知NT_ATTRIBUTE_VALUE_CHANGE属性改变通知NT_STATE_CHANGE状态改变通知NT_ROUTE_CHANGE路由改变通知NT_PROTECTION_SWITCH保护倒换通知NT_FILE_TRANSFER_STATUS文件传输状态通知NT_EPROTECTION_SWITCH设备保护倒换通知事件NT_ASON_RESOURCE_CHANGE智能资源改变通知NT_PRBSTEST_STATUS伪随机码测试状态通知NT_WDMPROTECTION_SWITCH波分保护倒换通知NT_ATMPROTECTION_SWITCH ATM保护倒换通知NT_RPRPROTECTION_SWITCH RPR保护组倒换通知事件格式NT_IPPROTECTION_SWITCH Tunnel保护组倒换通知事件格式 
3.如何开订阅SpringBoot为例 
3.1登录NCE 
3.1.1CorbaLoginReq 
配置文件的登录参数如下 
huawei: nce: login: corba:host: 127.0.0.1port: 12001userName: 111111passWord: 111111配置文件参数注入Spring Bean 
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;import lombok.Data;Data
SpringBootConfiguration
ConfigurationProperties(prefix  huawei.nce.login.corba)
public class CorbaLoginReq {private String host;private String port;private String userName;private String passWord;
}3.1.2CorbaLoginRes 
登录返回参数 
import org.omg.DynamicAny.DynAnyFactory;import lombok.Data;
import mtnm.tmforum.org.emsSession.EmsSession_I;Data
public class CorbaLoginRes {private org.omg.CORBA.ORB orb;private org.omg.PortableServer.POA rootPOA ;private EmsSession_I emsSession;private DynAnyFactory dynAnyFactory;
}3.1.3TANmsSession_IImpl 
import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.session.Session_I;
/*** NmsSession_IPOA for EMS(NCE) invoking. * author**/
public class TANmsSession_IImpl extends NmsSession_IPOA {public void eventLossCleared(String endTime) {log(TANmsSession_IImpl.eventLossCleared(String endTime) is invoked by EMS(NCE).);log(endTime:endTime);}public void eventLossOccurred(String startTime, String notificationId) {log(TANmsSession_IImpl.eventLossOccurred(String startTime, String notificationId) is invoked by EMS.);log(startTime:startTime, notificationId:notificationId);}public Session_I associatedSession() {log(TANmsSession_IImpl.associatedSession() is invoked by EMS(NCE).);return null;}public void endSession() {log(TANmsSession_IImpl.endSession() is invoked by EMS(NCE).);}public void ping() {log(TANmsSession_IImpl.ping() is invoked by EMS(NCE).);}private static void log(String str){System.out.println(str);}
}3.1.4BaseCorbaService 
public interface BaseCorbaService {/*** description:登录华为nce-corba* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年3月1日 下午4:19:59*/CorbaLoginRes login();/*** description:清空登录* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年6月7日 下午3:24:02*/void clearLogin();
}import java.util.Arrays;
import java.util.List;import org.omg.CosNaming.NameComponent;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynAnyFactoryHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import com.collect.sdh.module.corba.entity.CorbaLoginReq;
import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.entity.TANmsSession_IImpl;
import com.collect.sdh.module.corba.service.BaseCorbaService;import mtnm.tmforum.org.common.Common_IHolder;
import mtnm.tmforum.org.emsMgr.EMSMgr_I;
import mtnm.tmforum.org.emsMgr.EMSMgr_IHelper;
import mtnm.tmforum.org.emsSession.EmsSession_I;
import mtnm.tmforum.org.emsSession.EmsSession_IHolder;
import mtnm.tmforum.org.emsSession.EmsSession_IPackage.managerNames_THolder;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_I;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_IHelper;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_I;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_IHelper;
import mtnm.tmforum.org.equipment.EquipmentOrHolderIterator_IHolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolderList_THolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolder_T;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfoList_THolder;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfo_T;
import mtnm.tmforum.org.equipment.PhysicalLocationInfoList_THolder;
import mtnm.tmforum.org.equipment.PhysicalLocationInfo_T;
import mtnm.tmforum.org.globaldefs.NameAndStringValue_T;
import mtnm.tmforum.org.globaldefs.NamingAttributesIterator_IHolder;
import mtnm.tmforum.org.globaldefs.NamingAttributesList_THolder;
import mtnm.tmforum.org.globaldefs.ProcessingFailureException;
import mtnm.tmforum.org.managedElement.ManagedElementIterator_IHolder;
import mtnm.tmforum.org.managedElement.ManagedElementList_THolder;
import mtnm.tmforum.org.managedElement.ManagedElement_T;
import mtnm.tmforum.org.managedElement.ManagedElement_THolder;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_I;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_I;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetwork_T;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkIterator_IHolder;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkList_THolder;
import mtnm.tmforum.org.nmsSession.NmsSession_I;
import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.subnetworkConnection.CCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnectList_THolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnect_T;
import mtnm.tmforum.org.subnetworkConnection.Route_THolder;
import mtnm.tmforum.org.subnetworkConnection.SNCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnectionList_THolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_T;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointIterator_IHolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointList_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPoint_T;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkIterator_IHolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkList_THolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLink_T;Service
public class BaseCorbaServiceImpl implements BaseCorbaService {Autowiredprivate CorbaLoginReq loginReq;private CorbaLoginRes login;/*** description:清空登录* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年6月7日 下午3:24:02*/Overridepublic void clearLogin() {login  null;}/*** description:登录华为nce-corba* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年3月1日 下午4:19:59*/Overridepublic CorbaLoginRes login() {if(login ! null) {/*本应该检测登录是否可用如果可用则返回登录信息不可用则重新登录(不知道是否可以使用emsSession.ping()来判断)但是没找到华为有这个接口因此如果出现不可抗力因素导致登录无效例如网络中断则通过com.collect.sdh.module.test.TestCorbaController.cleanLogin()清空登录*/	return login;}try {login  new CorbaLoginRes();String[] argv  new String[2];argv[0]  -ORBInitRef;argv[1]  NameServicecorbaloc::  loginReq.getHost()  :  loginReq.getPort()  /NameService;org.omg.CORBA.ORB orb  org.omg.CORBA.ORB.init(argv, null);org.omg.PortableServer.POA rootPOA  org.omg.PortableServer.POAHelper.narrow(orb.resolve_initial_references(RootPOA));rootPOA.the_POAManager().activate();DynAnyFactory dynAnyFactory  DynAnyFactoryHelper.narrow(orb.resolve_initial_references(DynAnyFactory));org.omg.CosNaming.NamingContextExt nc  org.omg.CosNaming.NamingContextExtHelper.narrow(orb.resolve_initial_references(NameService));org.omg.CosNaming.NameComponent[] name;name  new NameComponent[5];name[0]  new NameComponent(TMF_MTNM, Class);name[1]  new NameComponent(HUAWEI, Vendor);name[2]  new NameComponent(Huawei/NCE, EmsInstance);name[3]  new NameComponent(2.0, Version);name[4]  new NameComponent(Huawei/NCE, EmsSessionFactory_I);EmsSessionFactory_I emsSessionFactory  EmsSessionFactory_IHelper.narrow(nc.resolve(name));NmsSession_IPOA pNmsSessionServant  new TANmsSession_IImpl();NmsSession_I nmsSession  pNmsSessionServant._this(orb);EmsSession_IHolder emsSessionInterfaceHolder  new EmsSession_IHolder();emsSessionFactory.getEmsSession(loginReq.getUserName(), loginReq.getPassWord(), nmsSession, emsSessionInterfaceHolder);EmsSession_I emsSession  emsSessionInterfaceHolder.value;login.setDynAnyFactory(dynAnyFactory);login.setOrb(orb);login.setRootPOA(rootPOA);login.setEmsSession(emsSession);return login;} catch (Exception e) {e.printStackTrace();return null;}}
}3.2定制通知 
3.2.1ConsumerNotice 
需要实现接口org.omg.CosNotifyComm.StructuredPushConsumerPOA 
import java.util.HashMap;
import java.util.Map;import org.omg.CosEventComm.Disconnected;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPushConsumerPOA;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.AnyUtil;import lombok.extern.log4j.Log4j2;/*** description:消费通知* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年5月7日 上午10:57:26*/
Log4j2
public class ConsumerNotice extends StructuredPushConsumerPOA{private CorbaLoginRes loginRes;public ConsumerNotice(CorbaLoginRes loginRes) {super();this.loginRes  loginRes;}private static MapString, String noticeTypes  new HashMap();static {noticeTypes.put(NT_ALARM, 告警通知);noticeTypes.put(NT_ALARM_UPDATED, 告警更新通知);noticeTypes.put(NT_TCA, 性能越限告警通知);noticeTypes.put(NT_OBJECT_CREATION, 对象创建通知);noticeTypes.put(NT_OBJECT_DELETION, 对象删除通知);noticeTypes.put(NT_ATTRIBUTE_VALUE_CHANGE, 属性改变通知);noticeTypes.put(NT_STATE_CHANGE, 状态改变通知);noticeTypes.put(NT_ROUTE_CHANGE, 路由改变通知);noticeTypes.put(NT_PROTECTION_SWITCH, 保护倒换通知);noticeTypes.put(NT_FILE_TRANSFER_STATUS, 文件传输状态通知);noticeTypes.put(NT_EPROTECTION_SWITCH, 设备保护倒换通知事件);noticeTypes.put(NT_ASON_RESOURCE_CHANGE, 智能资源改变通知);noticeTypes.put(NT_PRBSTEST_STATUS, 伪随机码测试状态通知);noticeTypes.put(NT_WDMPROTECTION_SWITCH, 波分保护倒换通知);noticeTypes.put(NT_ATMPROTECTION_SWITCH, ATM保护倒换通知);noticeTypes.put(NT_RPRPROTECTION_SWITCH, RPR保护组倒换通知事件格式);noticeTypes.put(NT_IPPROTECTION_SWITCH, Tunnel保护组倒换通知事件格式);}Overridepublic void disconnect_structured_push_consumer() {log.info(Consumer disconnect_structured_push_consumer);}Overridepublic void push_structured_event(StructuredEvent event) throws Disconnected {String eventType  event.header.fixed_header.event_type.type_name;MapString, Object eventData  new HashMap(event.filterable_data.length);for (int i  0; i  event.filterable_data.length; i) {if (!ObjectUtils.isEmpty(event.filterable_data[i])) {eventData.put(event.filterable_data[i].name, AnyUtil.parseAny( event.filterable_data[i].value, loginRes.getDynAnyFactory()));}}log.info(收到事件通知{}{}通知参数{},eventType, noticeTypes.get(eventType), eventData);}Overridepublic void offer_change(EventType[] arg0, EventType[] arg1) throws InvalidEventType {}}3.2.2AnyUtil 
用于解析返回的信息。 
import org.omg.CORBA.Any;
import org.omg.CORBA.TCKind;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynArray;
import org.omg.DynamicAny.DynEnum;
import org.omg.DynamicAny.DynSequence;
import org.omg.DynamicAny.DynStruct;
import org.omg.DynamicAny.DynUnion;/*** description:org.omg.DynamicAny格式化工具* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年5月7日 上午11:33:17*/
public class AnyUtil {/*** description:格式化数据* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年5月7日 上午11:34:17*/public static String parseAny(Any any, DynAnyFactory factory){if( nullany ){return null;}StringBuilder result  new StringBuilder();try {switch (any.type().kind().value()) {case TCKind._tk_char:result.append(any.extract_char());break;case TCKind._tk_null:break;case TCKind._tk_boolean:result.append(any.extract_boolean());break;case TCKind._tk_short:result.append(any.extract_short());break;case TCKind._tk_long:result.append(any.extract_long());break;case TCKind._tk_double:result.append(any.extract_double());break;case TCKind._tk_float:result.append(any.extract_float());break;case TCKind._tk_octet:result.append(any.extract_octet());break;case TCKind._tk_ulong:result.append(any.extract_ulong());break;case TCKind._tk_string:result.append(any.extract_string());break;case TCKind._tk_enum:{DynEnum dynEnum  (DynEnum) factory.create_dyn_any(any);result.append(dynEnum.get_as_string());break;}case TCKind._tk_any:{anyfactory.create_dyn_any(any).get_any();result.append(any);break;}case TCKind._tk_objref:{result.append(any.extract_Object());break;}case TCKind._tk_struct:case TCKind._tk_except:{DynStruct dynstruct  (DynStruct) factory.create_dyn_any(any);org.omg.DynamicAny.NameValuePair[] members  dynstruct.get_members();result.append({);for (int i  0; i  members.length; i) {if(i0){result.append( );}result.append(members[i].id).append( ).append(parseAny(members[i].value, factory));}result.append(});break;}case TCKind._tk_union:DynUnion dynunion  (DynUnion) factory.create_dyn_any(any);result.append(dynunion.member_name()).append( );result.append(parseAny(dynunion.member().to_any(), factory));break;case TCKind._tk_sequence:DynSequence dynseq  (DynSequence) factory.create_dyn_any(any);Any[] contents  dynseq.get_elements();result.append({);for (int i  0; i  contents.length; i){result.append(parseAny(contents[i], factory));}result.append(});break;case TCKind._tk_array:DynArray dynarray  (DynArray) factory.create_dyn_any(any);Any[] arrayContents  dynarray.get_elements();result.append({);for (int i  0; i  arrayContents.length; i){result.append(parseAny(arrayContents[i], factory)).append();}result.append(});break;default:result.append(any.type().kind().value());}} catch (Exception ex) {ex.printStackTrace();}return new String(result.toString().getBytes(StandardCharsets.ISO_8859_1));}
}3.3订阅通知 
SubscribeNotice 实现 Runnable即订阅的时候另起一个线程来订阅。该线程负责订阅。 
3.3.1SubscribeNotice 
import org.omg.CORBA.IntHolder;
import org.omg.CORBA.Object;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannelHolder;
import org.omg.CosNotifyChannelAdmin.ProxySupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
import org.omg.CosNotifyComm.StructuredPushConsumerHelper;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.JsonUtils;import lombok.extern.log4j.Log4j2;/*** description:订阅消费通知* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年5月7日 上午9:33:18*/
Log4j2
public class SubscribeNotice implements Runnable{/*** 登录corba成功后的参数*/private CorbaLoginRes loginRes;/*** 记录订阅通知的通道ID的存储文件地址*/private String poxyIdPath;public SubscribeNotice(CorbaLoginRes loginRes, String poxyIdPath) {super();this.loginRes  loginRes;this.poxyIdPath  poxyIdPath;}Overridepublic void run() {try {//获取通道IntHolder poxyId  new IntHolder();poxyId.value  getPoxyId(poxyIdPath);EventChannelHolder eventChannel  new EventChannelHolder();loginRes.getEmsSession().getEventChannel(eventChannel);//ConsumerNotice extends StructuredPushConsumerPOA 为消费者ConsumerNotice consumerNotice  new ConsumerNotice(loginRes);ConsumerAdmin defaultConsumerAdmin  eventChannel.value.default_consumer_admin();//连接通道如果发现通道已经打开则先关闭之前的通道已经打开的通道即使不可以北向接口并未释放该接口的资源但是会限制连接通道数量  3try {if (poxyId.value  0){log.info(释放旧的消费通道{}, poxyId.value);ProxySupplier oldSupplier  defaultConsumerAdmin.get_proxy_supplier(poxyId.value);assert (oldSupplier ! null);StructuredProxyPushSupplier myOldPoxy  StructuredProxyPushSupplierHelper.narrow(oldSupplier);myOldPoxy.disconnect_structured_push_supplier();}}catch (Exception e) {e.printStackTrace();}ProxySupplier tmpSupplier  defaultConsumerAdmin.obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, poxyId);StructuredProxyPushSupplier proxyPushSupplier  StructuredProxyPushSupplierHelper.narrow(tmpSupplier);Object servant  loginRes.getRootPOA().servant_to_reference(consumerNotice);proxyPushSupplier.connect_structured_push_consumer(StructuredPushConsumerHelper.narrow(servant));savePoxyId(poxyIdPath, poxyId.value);log.info(保存此次的消费通道{}, poxyId.value);loginRes.getOrb().run();} catch (Exception e) {e.printStackTrace();}}/*** description:获取已经连接的消费通道ID* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年5月7日 上午10:40:57*/public int getPoxyId(String path) {int poxyId  -1;//备注这里没有提供JsonUtils这里你可以改为存储到数据库或者其他地方这里我是将记录的poxyId 存储到文件中因为我采集的程序不需要连接数据库String str  JsonUtils.readStringFromSystemPath(path);if(!ObjectUtils.isEmpty(str)) {poxyId  Integer.parseInt(str);}return poxyId;}/*** description:保存已经连接的消费通道ID* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年5月7日 上午10:41:33*/public void savePoxyId(String path, int poxyId) {//备注这里没有提供JsonUtils这里你可以改为存储到数据库或者其他地方这里我是将记录的poxyId 存储到文件中因为我采集的程序不需要连接数据库JsonUtils.writeStringToSystemPath(path, String.valueOf(poxyId));}
}3.3.2JsonUtils 
为了保证代码完整性如果你完全抄上面的代码这里提供了代码需要的两个文件操作示例 
public static String readStringFromSystemPath(String path) {String data  ;try {InputStream inputStream  new FileInputStream(path);byte[] bdata  FileCopyUtils.copyToByteArray(inputStream);data  new String(bdata, StandardCharsets.UTF_8);} catch (FileNotFoundException e) {log.info(文件不存在文件地址{}, path);} catch (Exception e) {log.info(读取文件失败文件地址{}失败原因{}, path,e.getMessage());} return data;}public static void writeStringToSystemPath(String filePath, String str) {Writer write  null;try {File file  new File(filePath);if(file.exists()) {file.delete();}if (!file.getParentFile().exists()) {file.getParentFile().mkdirs();}if(file.createNewFile()) {write  new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8);write.write(str);write.flush();}} catch (Exception e) {e.printStackTrace();} finally {if(write !null ) {try {write.close();} catch (IOException e) {e.printStackTrace();}}}}3.4启动订阅 
这里我们使用SpringBoot启动的时候启动订阅即实现ApplicationRunner然后使用线程池的单线程来启动上面我们编写的线程。 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.service.BaseCorbaService;/*** description:启动订阅corba的消费* author:hutao* mail:hutao1epri.sgcc.com.cn* date:2024年5月7日 下午4:18:16*/
Component
public class SubscribeRunner implements ApplicationRunner  {Value(value  ${file-save-path})private String poxyIdPath;Autowiredprivate BaseCorbaService baseCorbaService;Overridepublic void run(ApplicationArguments args) throws Exception {poxyIdPath  poxyIdPath  poxyId;CorbaLoginRes login  baseCorbaService.login();ExecutorService executor  Executors.newSingleThreadExecutor();executor.submit(new SubscribeNotice(login, poxyIdPath));}
}4.效果展示