• Spring和ActiveMq消息队列整合详解
    date_range 2019-05-20 19:23:10
    folder Spring专题
    person 陈付菲 公开
    thumb_up 点赞2
    remove_red_eye 围观851

    Spring和ActiveMq消息队列整合详解

    官方主页

    Spring

    ActiveMq

    一、概述

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

    常见的消息中间件产品:

    (1)ActiveMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里介绍的是ActiveMQ的使用。

    (2)RabbitMQ

    AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。

    (3)ZeroMQ

    史上最快的消息队列系统

    (4)Kafka

    Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

    Jms

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    所以,ActiveMq是Jms标准的一个实现者。

    本篇通过对Mq消息消费、解析并组装发送为例讲述ActiveMq的使用过程。

    Git地址: Gitee

    品茗IT:提供在线快速构建Spring项目工具一站式Springboot项目生成

    如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以 加入我们的java学习圈,点击即可加入 ,共同学习,节约学习时间,减少很多在学习中遇到的难题。

    二、环境配置

    2.1 ActiveMq的安装部署

    因为消息队列是对生产者消息的存储和处理,并转发给消费者,所以消息队列是独立于生产者和消费者的一个服务。

    ActiveMq可以在ActiveMq官网 下载并安装。

    选择需要的版本,下载完成后,解压文件(window下用winrar/其他解压工具,linux下tar命令)并进入文件夹的bin目录下:

    在这里插入图片描述

    如图所示:

    windows下可以在powershell下用:

    ./activemq start 或者./activemq.bat start

    在cmd下只能用:

    activemq.bat start

    linux下可能要更改activemq文件的执行权限,然后:

    ./activemq start

    启动后浏览器输入 http://127.0.0.1:8161/ 如图所示:

    在这里插入图片描述

    点击 Manage ActiveMQ broker 跳转到 http://127.0.0.1:8161/admin/; 可以在里面查看消息队列等信息及统计。

    2.2 项目建立

    本项目将消费者和生产者整合在一起,通过先消费掉mq中的json数据,通过数据中的标识找到对应的解析处理器,解析完成后组装并生产消息到mq的另一个队列中。

    2.2.1 maven依赖

    <?xml version="1.0"?>
    <project
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>cn.pomit</groupId>
            <artifactId>SpringWork</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>Activemq</artifactId>
        <packaging>jar</packaging>
        <name>Activemq</name>
        <url>http://maven.apache.org</url>
        <dependencies>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.10.0</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
            </dependency>
        </dependencies>
    </project>

    父模块可以在https://www.pomit.cn/spring/SpringWork/pom.xml获取。

    2.2.2 spring-activemq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
        xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jaxws="http://cxf.apache.org/jaxws"
        xsi:schemaLocation="
                        http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                        http://www.springframework.org/schema/tx 
                        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
                        http://www.springframework.org/schema/aop 
                        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                        http://www.springframework.org/schema/context      
                        http://www.springframework.org/schema/context/spring-context-4.0.xsd
                        http://www.springframework.org/schema/cache 
                        http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                        http://www.springframework.org/schema/jms 
                        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd   
                        http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd ">
    
        <context:annotation-config />
        <context:component-scan base-package="cn.pomit.springwork">
        </context:component-scan>
    
        <bean id="annotationPropertyConfigurerJms"
            class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="order" value="1" />
            <property name="ignoreUnresolvablePlaceholders" value="true" />
            <property name="locations">
                <list>
                    <value>classpath:jms.properties</value>
                </list>
            </property>
        </bean>
    
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="${brokerURL}" />
        </bean>
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
    
        <bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="${destQueueName}" />
        </bean>
        <bean id="listenQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="${listenQueueName}" />
        </bean>
    
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="defaultQueueDestination" />
        </bean>
    
        <bean id="mqMessageListener" class="cn.pomit.springwork.activemq.handler.MqMessageConsumer">
            <property name="handler" ref="jsonParseHander" />
        </bean>
    
        <bean id="jsonParse1001" class="cn.pomit.springwork.activemq.parser.JsonParse1001">
        </bean>
    
        <bean id="jsonParseHander" class="cn.pomit.springwork.activemq.handler.bussiness.JsonParseHander">
            <property name="nextHandler" ref="bussiNessHander" />
            <property name="parsers">
                <map>
                    <entry key="1001" value-ref="jsonParse1001" />
                </map>
            </property>
        </bean>
    
        <bean id="bussiNessHander" class="cn.pomit.springwork.activemq.handler.bussiness.BussiNessHander">
            <property name="nextHandler" ref="mqMessageProducer" />
        </bean>
    
        <bean id="mqMessageProducer" class="cn.pomit.springwork.activemq.handler.MqMessageProducer">
            <property name="jmsTemplate" ref="jmsTemplate" />
        </bean>
        <!-- 消息监听容器 -->
        <bean id="jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="concurrency" value="5-10"></property>
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="listenQueueDestination" />
            <property name="messageListener" ref="mqMessageListener" />
            <property name="sessionTransacted" value="true"/>  
        </bean>
    </beans>

    这里面的配置可以分为连接、队列、发送、监听、容器、业务bean这几类:

    • 连接:targetConnectionFactory和connectionFactory定义了连接工厂,connectionFactory只是spring对activemq工厂的封装。

    • 队列:defaultQueueDestination和listenQueueDestination定义了一个发送目的队列和一个监听消息队列,如果只做发送或者只做监听,定义一个即可。

    • 发送:jmsTemplate是spring对消息队列发送方式的封装,类似于JdbcTemplate、RedisTemplate、RestTemplate等封装类。mqMessageProducer是项目自己封装了调用jmsTemplate发送消息的一个bean。

    • 监听:mqMessageListener是项目封装的监听消息的bean。

    • 容器:jmsContainer是将消息队列和监听bean整合起来,这样就保证用的时候能找到对应的bean。

    • 业务:jsonParse1001、jsonParseHander和bussiNessHander都是模拟业务逻辑的,jsonParse1001、jsonParseHander是定义对json数据的处理,bussiNessHander是模拟对业务逻辑的处理。这里使用责任链模式。

    jms.properties配置文件:

    brokerURL=tcp://localhost:61616
    destQueueName=destQueue
    listenQueueName=listenQueue

    2.2.3 监听器(消费者)

    前面我们定义了mqMessageListener监听器,其实就是消费者,用以接收监听队列中的消息。

    MqMessageConsumer :

    package cn.pomit.springwork.activemq.handler;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    import cn.pomit.springwork.activemq.handler.bussiness.Handler;
    import cn.pomit.springwork.activemq.model.JmsMessage;
    
    public class MqMessageConsumer implements MessageListener{
        private Handler handler;
        public void onMessage(Message message) {
            TextMessage textMsg = (TextMessage) message;   
            System.out.println("接收到一个纯文本消息。");   
            try {   
                System.out.println("消息内容是:" + textMsg.getText());   
            } catch (JMSException e) {   
                e.printStackTrace();   
            }   
            JmsMessage<String> jm = new JmsMessage<String>();
            try {
                jm.setBody(textMsg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
            handler.hander(jm);
    
        }
        public Handler getHandler() {
            return handler;
        }
        public void setHandler(Handler handler) {
            this.handler = handler;
        }
    
    }
    

    2.2.4 发送方(生产者)

    前面定以的bean中,mqMessageProducer是负责发送消息的,即生产消息的,发送消息放在发送的目的消息队列中,队列是如何确定的呢?在我们定义的容器bean中。

    MqMessageProducer:

    package cn.pomit.springwork.activemq.handler;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    
    import com.alibaba.fastjson.JSONObject;
    
    import cn.pomit.springwork.activemq.handler.bussiness.Handler;
    import cn.pomit.springwork.activemq.model.JmsMessage;
    
    public class MqMessageProducer implements Handler {
        private JmsTemplate jmsTemplate;
        public static String RETCODESUCCESS = "200";
    
        public void hander(JmsMessage msg) {
            JSONObject jSONObject = new JSONObject();
            jSONObject.put("msgType", msg.getMsgType());
            jSONObject.put("orderNo", msg.getOrderNo());
            jSONObject.put("retCode", RETCODESUCCESS);
    
            System.out.println("准备发送jms到[" + jmsTemplate.getDefaultDestination() + "]...");
    
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textmsg = session.createTextMessage();
                    textmsg.setText(jSONObject.toString());
                    return textmsg;
                }
            });
    
        }
    
        public JmsTemplate getJmsTemplate() {
            return jmsTemplate;
        }
    
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    
    }
    

    2.2.5 业务逻辑

    我们让业务逻辑的处理器都实现Handler接口:

    Handler:

    package cn.pomit.springwork.activemq.handler.bussiness;
    
    import cn.pomit.springwork.activemq.model.JmsMessage;
    
    public interface Handler {
        public void hander(JmsMessage msg);
    }
    

    监听器首先调用的是JsonParseHander,对Json数据进行解析。

    JsonParseHander:

    package cn.pomit.springwork.activemq.handler.bussiness;
    
    import java.util.Map;
    
    import com.alibaba.fastjson.JSONObject;
    
    import cn.pomit.springwork.activemq.model.JmsMessage;
    import cn.pomit.springwork.activemq.parser.JsonParser;
    
    public class JsonParseHander implements Handler{
        private Handler nextHandler;
        private Map<String,JsonParser> parsers;
    
        @Override
        public void hander(JmsMessage msg) {
            String body = (String) msg.getBody();
            JSONObject json = JSONObject.parseObject(body);
            String msgType = json.getString("msgType");
            JsonParser jp = parsers.get(msgType);
            System.out.println("消息类型为:" + msgType + "处理类:" + jp.getClass().getName());   
    
            JmsMessage jm = jp.parse(body);
            System.out.println("消息为:" + jm.toString() );  
            if(nextHandler!=null){
                System.out.println("nextHandler为:" + nextHandler.getClass().getName());  
                nextHandler.hander(jm);
            }else{
                System.out.println("nextHandler为空");  
            }
        }
    
        public Map<String, JsonParser> getParsers() {
            return parsers;
        }
    
        public void setParsers(Map<String, JsonParser> parsers) {
            this.parsers = parsers;
        }
    
        public Handler getNextHandler() {
            return nextHandler;
        }
    
        public void setNextHandler(Handler nextHandler) {
            this.nextHandler = nextHandler;
        }
    }
    

    在spring的配置文件中,我们配置的责任链的下一个Handler是BussiNessHander。

    BussiNessHander:

    package cn.pomit.springwork.activemq.handler.bussiness;
    
    import cn.pomit.springwork.activemq.model.JmsMessage;
    
    public class BussiNessHander implements Handler{
        private Handler nextHandler;
    
        @Override
        public void hander(JmsMessage msg) {
            System.out.println(msg.getBody().toString());
            if(nextHandler!=null){
                nextHandler.hander(msg);
            }
        }
    
    
        public Handler getNextHandler() {
            return nextHandler;
        }
    
        public void setNextHandler(Handler nextHandler) {
            this.nextHandler = nextHandler;
        }
    }
    

    业务逻辑处理完成后,我们将消息发送到另一个队列上,这时候,我们可以将mqMessageProducer实现Handler接口,这样我们的责任链的下一个就是mqMessageProducer(2.2.4定义的发送方)。

    2.2.5 Json数据处理

    因为本篇是对消息队列监听,假设传输的业务数据是Json,而且有多种业务,那样每个业务的Json数据格式都不一样,比如我们的JsonParseHander是根据Json数据中的msgType,拿到JsonParseHander中的注入的map中jsonParse${msgType}对应的bean。

    这里我们先定义一个接口JsonParser:

    JsonParser:

    package cn.pomit.springwork.activemq.parser;
    
    import cn.pomit.springwork.activemq.model.JmsMessage;
    
    public interface JsonParser {
        public JmsMessage parse(String msg);
    }
    

    然后定义一个jsonParser,JsonParse1001。

    JsonParse1001:

    package cn.pomit.springwork.activemq.parser;
    
    import com.alibaba.fastjson.JSONObject;
    
    import cn.pomit.springwork.activemq.model.DefaultMqModel;
    import cn.pomit.springwork.activemq.model.JmsMessage;
    
    public class JsonParse1001 implements JsonParser {
    
        @Override
        public JmsMessage parse(String msg) {
            System.out.println("消息体为:" + msg);
            JmsMessage<DefaultMqModel> jm = new JmsMessage<DefaultMqModel>();
            JSONObject json = JSONObject.parseObject(msg);
            String msgType = json.getString("msgType");
            String time = json.getString("time");
            String orderNo = json.getString("orderNo");
            String body = json.getString("msgBody");
            jm.setMsgType(msgType);
            jm.setTime(time);
            jm.setOrderNo(orderNo);
            JSONObject jsonBody = JSONObject.parseObject(body);
            DefaultMqModel dm = new DefaultMqModel();
            dm.setTitle(jsonBody.getString("title"));
            dm.setContent(jsonBody.getString("content"));
            jm.setBody(dm);
            return jm;
        }
    
    }
    

    2.2.6 过程中用到的实体

    JmsMessage:

    
    package cn.pomit.springwork.activemq.model;
    
    public class JmsMessage<T> {
        public String time;
        public String msgType;
        public String orderNo;
        public T body;
        public String getTime() {
            return time;
        }
        public void setTime(String time) {
            this.time = time;
        }
        public String getMsgType() {
            return msgType;
        }
        public void setMsgType(String msgType) {
            this.msgType = msgType;
        }
        public String getOrderNo() {
            return orderNo;
        }
        public void setOrderNo(String orderNo) {
            this.orderNo = orderNo;
        }
        public T getBody() {
            return body;
        }
        public void setBody(T body) {
            this.body = body;
        }
    }
    

    DefaultMqModel:

    package cn.pomit.springwork.activemq.model;
    
    public class DefaultMqModel {
        public String title;
        public String content;
        public String getTitle() {
            return title;
        }
        public void setTitle(String title) {
            this.title = title;
        }
        public String getContent() {
            return content;
        }
        public void setContent(String content) {
            this.content = content;
        }
        @Override
        public String toString() {
            return "DefaultMqModel [title=" + title + ", content=" + content + "]";
        }
    }
    

    快速构建项目

    Spring组件化构建

评论列表
mode_edit