广西城市建设学校手机官方网站,国土局网站建设情况,宝盈集团直营网站怎么做,西安网站开发公司定制一、架构和技术介绍 1、简介 ActiveMQ 是Apache出品#xff0c;最流行的#xff0c;能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现 2、activemq的特性 1. 多种语言和协议编写客户端。语言: Java, C, C, C#, Ruby, Perl, Python, PHP。应用协议: … 一、架构和技术介绍 1、简介 ActiveMQ 是Apache出品最流行的能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现 2、activemq的特性 1. 多种语言和协议编写客户端。语言: Java, C, C, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) 3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上 5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA 6. 支持通过JDBC和journal提供高速的消息持久化 7. 从设计上保证了高性能的集群,客户端-服务器,点对点 8. 支持Ajax 9. 支持与Axis的整合 10. 可以很容易得调用内嵌JMS provider,进行测试 3、下载和安装ActiveMQ 1、下载 ActiveMQ的最新版本是5.10.0但由于我们内网下载存在问题所以目前通过内网只能下载到5.9.0下载地址http://activemq.apache.org/activemq-590-release.html。 2、安装 如果是在windows系统中运行可以直接解压apache-activemq-5.9.0-bin.zip并运行bin目录下的activemq.bat文件此时使用的是默认的服务端口61616和默认的console端口8161。 如果是在linux或unix下运行在bin目录下执行命令./activemq setup 3、修改ActiveMQ的服务端口和console端口 A、修改服务端口打开conf/activemq.xml文件修改以下红色字体部分 transportConnectors transportConnector nameopenwire uritcp://10.42.220.72:61618discoveryUrimulticast://default/ /transportConnectors B、修改console的地址和端口:打开conf/jetty.xml文件修改以下红色字体部分 bean idjettyPortclassorg.apache.activemq.web.WebConsolePortinit-methodstart property nameport value8162/ /bean 4、通过客户端代码试用ActiveMQ 需要提前将activemq解压包中的lib目录下的相关包引入到工程中再进行如下编码 1、发送端的代码 importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.DeliveryMode; importjavax.jms.Destination; importjavax.jms.MessageProducer; importjavax.jms.Session; importjavax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnection; importorg.apache.activemq.ActiveMQConnectionFactory; publicclass Sender { privatestaticfinalintSEND_NUMBER 5; publicstaticvoid main(String[] args) { // ConnectionFactory连接工厂JMS用它创建连接 ConnectionFactory connectionFactory; // ConnectionJMS客户端到JMS Provider的连接 Connection connection null; // Session一个发送或接收消息的线程 Session session; // Destination消息的目的地;消息发送给谁. Destination destination; // MessageProducer消息发送者 MessageProducer producer; // TextMessage message; //构造ConnectionFactory实例对象此处采用ActiveMq的实现jar connectionFactory new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)); try { //构造从工厂得到连接对象 connection connectionFactory.createConnection(); //启动 connection.start(); //获取操作连接 session connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //获取session destination session.createQueue(FirstQueue); //得到消息生成者【发送者】 producer session.createProducer(destination); //设置不持久化此处学习实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //构造消息此处写死项目就是参数或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null ! connection) connection.close(); } catch (Throwable ignore) { } } } publicstaticvoid sendMessage(Session session,MessageProducer producer) throws Exception { for (int i 1; i SEND_NUMBER; i) { TextMessage message session .createTextMessage(ActiveMq发送的消息 i); //发送消息到目的地方 System.out.println(发送消息 ActiveMq 发送的消息 i); producer.send(message); } } } 2、接收端代码 importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.Destination; importjavax.jms.MessageConsumer; importjavax.jms.Session; importjavax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnection; importorg.apache.activemq.ActiveMQConnectionFactory; publicclass Receive { publicstaticvoid main(String[] args) { // ConnectionFactory连接工厂JMS用它创建连接 ConnectionFactory connectionFactory; // ConnectionJMS客户端到JMS Provider的连接 Connection connection null; // Session一个发送或接收消息的线程 Session session; // Destination消息的目的地;消息发送给谁. Destination destination; //消费者消息接收者 MessageConsumer consumer; connectionFactory new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)); try { //构造从工厂得到连接对象 connection connectionFactory.createConnection(); //启动 connection.start(); //获取操作连接 session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //获取session destination session.createQueue(FirstQueue); consumer session.createConsumer(destination); while (true) { //设置接收者接收消息的时间为了便于测试这里谁定为100s TextMessage message (TextMessage) consumer.receive(100000); if (null ! message) { System.out.println(收到消息 message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null ! connection) connection.close(); } catch (Throwable ignore) { } } } } 3、通过监控查看消息堆栈的记录 登陆http://localhost:8162/admin/queues.jsp默认的用户名和密码admin/admin 二、ActiveMQ的多种部署方式 单点的ActiveMQ作为企业应用无法满足高可用和集群的需求所以ActiveMQ提供了master-slave、broker cluster等多种部署方式但通过分析多种部署方式之后我认为需要将两种部署方式相结合才能满足我们公司分布式和高可用的需求所以后面就重点将解如何将两种部署方式相结合。 1、Master-Slave部署方式 1shared filesystem Master-Slave部署方式 主要是通过共享存储目录来实现master和slave的热备所有的ActiveMQ应用都在不断地获取共享目录的控制权哪个应用抢到了控制权它就成为master。 多个共享存储目录的应用谁先启动谁就可以最早取得共享目录的控制权成为master其他的应用就只能作为slave。 2shared database Master-Slave方式 与shared filesystem方式类似只是共享的存储介质由文件系统改成了数据库而已。 3Replicated LevelDB Store方式 这种主备方式是ActiveMQ5.9以后才新增的特性使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。 其他node转入slave模式连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。 如果master死了得到了最新更新的slave被允许成为master。fialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以如果你配置了replicas3那么法定大小是(3/2)12. Master将会存储并更新然后等待 (2-1)1个slave存储和更新完成才汇报success。至于为什么是2-1熟悉Zookeeper的应该知道有一个node要作为观擦者存在。 单一个新的master被选中你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此推荐运行至少3个replica nodes以防止一个node失败了服务中断。 2、Broker-Cluster部署方式 前面的Master-Slave的方式虽然能解决多服务热备的高可用问题但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。 Broker-Cluster部署方式中各个broker通过网络互相连接并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息那么broker-B会先通过内部网络获取到broker-A上面的message并通知自己的consumer来消费。 1static Broker-Cluster部署 在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker 1、 首先在Broker-A节点中添加networkConnector节点 networkConnectors networkConnector uristatic:(tcp:// 0.0.0.0:61617)duplexfalse/ /networkConnectors 2、 修改Broker-A节点中的服务提供端口为61616 transportConnectors transportConnectornameopenwireuritcp://0.0.0.0:61616?maximumConnections1000amp;wireFormat.maxFrameSize104857600/ /transportConnectors 3、 在Broker-B节点中添加networkConnector节点 networkConnectors networkConnector uristatic:(tcp:// 0.0.0.0:61616)duplexfalse/ /networkConnectors 4、 修改Broker-A节点中的服务提供端口为61617 transportConnectors transportConnectornameopenwireuritcp://0.0.0.0:61617?maximumConnections1000amp;wireFormat.maxFrameSize104857600/ /transportConnectors 5、分别启动Broker-A和Broker-B。 2Dynamic Broker-Cluster部署 在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker由activemq在启动后动态查找 1、 首先在Broker-A节点中添加networkConnector节点 networkConnectors networkConnectorurimulticast://default dynamicOnlytrue networkTTL3 prefetchSize1 decreaseNetworkConsumerPrioritytrue / /networkConnectors 2、修改Broker-A节点中的服务提供端口为61616 transportConnectors transportConnectornameopenwireuritcp://0.0.0.0:61616? discoveryUrimulticast://default/ /transportConnectors 3、在Broker-B节点中添加networkConnector节点 networkConnectors networkConnectorurimulticast://default dynamicOnlytrue networkTTL3 prefetchSize1 decreaseNetworkConsumerPrioritytrue / /networkConnectors 4、修改Broker-B节点中的服务提供端口为61617 transportConnectors transportConnectornameopenwireuritcp://0.0.0.0:61617 discoveryUrimulticast://default/ /transportConnectors 5、启动Broker-A和Broker-B 2、Master-Slave与Broker-Cluster相结合的部署方式 可以看到Master-Slave的部署方式虽然解决了高可用的问题但不支持负载均衡Broker-Cluster解决了负载均衡但当其中一个Broker突然宕掉的话那么存在于该Broker上处于Pending状态的message将会丢失无法达到高可用的目的。 由于目前ActiveMQ官网上并没有一个明确的将两种部署方式相结合的部署方案所以我尝试者把两者结合起来部署 1、部署的配置修改 这里以Broker-A Broker-B建立clusterBroker-C作为Broker-B的slave为例 1首先在Broker-A节点中添加networkConnector节点 networkConnectors networkConnector urimasterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618) duplexfalse/ /networkConnectors 2修改Broker-A节点中的服务提供端口为61616 transportConnectors transportConnectornameopenwireuritcp://0.0.0.0:61616?maximumConnections1000amp;wireFormat.maxFrameSize104857600/ /transportConnectors 3在Broker-B节点中添加networkConnector节点 networkConnectors networkConnector uristatic:(tcp:// 0.0.0.0:61616)duplexfalse/ /networkConnectors 4修改Broker-B节点中的服务提供端口为61617 transportConnectors transportConnectornameopenwireuritcp://0.0.0.0:61617?maximumConnections1000amp;wireFormat.maxFrameSize104857600/ /transportConnectors 5修改Broker-B节点中的持久化方式 persistenceAdapter kahaDB directory/localhost/kahadb/ /persistenceAdapter 6在Broker-C节点中添加networkConnector节点 networkConnectors networkConnector uristatic:(tcp:// 0.0.0.0:61616)duplexfalse/ /networkConnectors 7修改Broker-C节点中的服务提供端口为61618 transportConnectors transportConnectornameopenwireuritcp://0.0.0.0:61618?maximumConnections1000amp;wireFormat.maxFrameSize104857600/ /transportConnectors 8修改Broker-B节点中的持久化方式 persistenceAdapter kahaDB directory/localhost/kahadb/ /persistenceAdapter 9分别启动broker-A、broker-B、broker-C因为是broker-B先启动所以“/localhost/kahadb”目录被lock住broker-C将一直处于挂起状态当人为停掉broker-B之后broker-C将获取目录“/localhost/kahadb”的控制权重新与broker-A组成cluster提供服务。