月眸


ActiveMQ学习一:ActiveMQ快速上手

毛毛小妖 2019-07-29 209浏览 0条评论
首页/ 正文
分享到: / / / /

随着业务系统复杂度的提高,我们需要对系统进行拆分,不同的系统负责不同的业务,这样便可以实现软件架构的解耦。但是随之而来又会出现一些新的问题,比如我们要新增一个功能,都要对其上游接口或者关联系统进行改造,相信有很多系统都是这样子的,牵一发而动全身。

一、理论

举个常见的例子

系统A要发送数据给系统B和系统C,发送给每个系统的数据可能会有差异,因此系统A要对发送给系统B和系统C的数据进行封装,然后依次发送给B和C。

当系统上线之后有新增了一个需求:把数据也发送给系统D,此时就要修改系统A,再进行一次数据封装成D。在这个过程中,每接入一个下游系统,都要对A系统进行改造,开发联调的效率很低下,如下如:

而这样的架构存在什么样的问题呢?

1、首先,系统之间耦合比较严重

2、面对大流量并发时,很容易被冲垮

3、等待同步存在性能问题

那么如何解决呢?

传统的RPC基本上都是同步调用,整体的服务性能遵守“木桶理论”,也就是说整体的性能取决于链路中国最慢的那个接口,比如A调用B/C/D都花费了50ms,但是B又调用了B1,花费了2000ms,那就直接降低了整体服务性能。

所以,我们需要一种能够解决上述问题的解决方案,他要具备以下三点,也就是我们即将要说的消息中间件(MQ)的好处:

1、当新的模块接入进来时,可以做到代码改动最小;能够解耦

2、设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能够削峰

3、能够将非关键链路调用的操作异步化并提升整体系统的吞吐能力;能够异步

面向消息的中间件MOM(message-oriented middleware)能够很好的解决上述问题。

MQ是利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过传递提供消息传递消息队列模型在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰,异步通信、数据同步等功能。

大概过程是这样子的:

发送者把消息发送给消息服务器,消息服务器把消息存放于若干队列/主题中,在合适的时候,消息服务器将消息发送给接收者。在此过程中,发送和接收和异步的,即发送无需等待,而且发送者和接收者的生命周期无必然联系。尤其是在发布/订阅模式下,可以完成一对多的通信,即让一个消息有多个接收者。

消息发送者可以发送一个消息而无需等待响应,消息发送者将消息发送到一条虚拟的通道(主题/队列)上;

消息接收者则订阅或监听该通道,一条消息可能最终转发给一个或多个消息接收者,这些接收者都无需等待消息发送者做出同步回应,整个过程都是异步的。

举个例子:

一个系统跟另外一个系统通信的时候,假如系统A希望发送一个消息给系统B,让他去处理。但是系统A并不关注系统B怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的“死活”了。接着系统B从MQ取出消息进行消费即可,至于怎么处理,是否处理完毕,都是系统B的事情,与系统A无关。

ActiveMQ有两种模式,即queue(队列)和topic(主题),如下图所示:

1、队列模式:每一个消息只能被一个消费者消费,当有一个消费者使用该消息后,即从我们的队列中取出该消息。

2、主题模式:每一个消息可以被多个消费者消费

二、环境准备

需要安装的软件有:

1、安装jdk环境(略)

2、安装ActiveMQ

1>首先去官网(http://activemq.apache.org/activemq-5144-release.html)下载linux最新版的ActiveMQ

2>把下载好的文件找一个地方存放并解压,比如“/home/mq/”目录下

tar -zxvf apache-activemq-5.14.4-bin.tar.gz  //解压
mv apache-activemq-5.14.4 activemq   //重命名

3>开启防火墙或设置白名单

4>启动ActiveMQ

进入解压好的ActiveMQ目录下:

cd activemq/bin/
./activemq start

5>执行完了,可以在浏览器访问:http://localhost:8161,出现如下界面,则说明启动成功。

说明:控制台默认用户名和密码为:admin/admin,存放在jetty-realm.properties,可进行修改,控制台默认端口为8161。

三、使用queue模式编码

了解了MQ的相关知识,下面就来编码吧。每一个天上飞的理论都有一个落地的实现,activeMQ仅仅是MQ的一种,我们就以activeMQ来进行编码吧,这次编码没有用到spring,后面会有spring版本的文章,敬请期待。其他的比如kafka、RocketMQ、RabbitMQ道理类似。在编码之前,我们先来了解一下编码的整体流程:

1,首先初始化工厂

2,创建连接

3,创建session

4,生产者发送消息到目的地/消费者从目的地消费消息

是不是很眼熟呢?没错,就跟刚开始学java时连接数据库类似的流程。

首先我们要新建一个maven工程,在pom.xml文件中引入activemq的jar包。具体怎么搭建maven工程就不说啦,项目demo可在https://gitee.com/shengyu4/activemqDemo下载。主要代码如下:

1>pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.mq</groupId>
	<artifactId>activemqDemo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>pom</packaging>

	<name>activemqDemo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<activemq.version>5.14.4</activemq.version>
		<xbean-spring.version>3.16</xbean-spring.version>
	</properties>

	<dependencies>
		<!-- activemq所需jar包 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.xbean</groupId>
			<artifactId>xbean-spring</artifactId>
			<version>${xbean-spring.version}</version>
		</dependency>
		
		<!-- junit所需jar包 -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		
		<!-- log4j所需jar包 -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
	</dependencies>
	<modules>
		<module>producer</module>
		<module>consumer</module>
	</modules>
</project>

2>生产者

package org.producer;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ActiveMqProducer{
	
	private static final Log logger = LogFactory.getLog(ActiveMqProducer.class);
	
	//activeMQ服务器地址
	private static final String ACTIVEMQ_URL = "tcp://192.168.179.128:61616";
	//activeMQ用户名(默认用户名为admin)
	private static final String USERNAME = "admin";
	//activeMQ密码(默认密码为admin)
	private static final String PASSWORD = "admin";
	//消息队列名称
	private static final String QUEUE_NAME = "queuetest";
	
    public static void main( String[] args ) throws JMSException{
    	//1.创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVEMQ_URL);
        //2.通过连接工厂,获得连接并启动访问
        Connection conn = factory.createConnection();
        conn.start();
        
        //3.创建会话session,两个参数分别为(事务和签收)
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体为队列queue或者主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        
        //5.创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        //6.创建TextMessage类型的消息
        TextMessage message = session.createTextMessage("hello,world");
        //7.由生产者发送消息
        producer.send(message);
        
        //8.关闭资源
        producer.close();
        session.close();
        conn.close();
        
        logger.info("消息发布成功!");
    }
}

运行上述生产者代码,运行结果如下表示运行成功。

再打开控制台看一下,发现有了一条消息:

控制台说明:

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。(总接收数 - 总出队数)
Number Of Consumers 消费者数量 消费者端的消费者数量
Messages Enqueued 进队消息数 进入队列的总数量
Messages Dequeued 出队消息数 可理解为消费者消费掉的数量

3>消费者

package org.consumer;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqConsumer {
	
	//activeMQ服务器地址
	private static final String ACTIVEMQ_URL = "tcp://192.168.179.128:61616";
	//activeMQ用户名(默认用户名为admin)
	private static final String USERNAME = "admin";
	//activeMQ密码(默认密码为admin)
	private static final String PASSWORD = "admin";
	//消息队列名称
	private static final String QUEUE_NAME = "queuetest";
	
    public static void main( String[] args ) throws JMSException{
    	//1.创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVEMQ_URL);
        //2.通过连接工厂,获得连接并启动访问
        Connection conn = factory.createConnection();
        conn.start();
        
        //3.创建会话session,两个参数分别为(事务和签收)
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体为队列queue或者主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        
        //5.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while(true){
            //第一种方式:通过同步接收的方式来接受消息
        	TextMessage message = (TextMessage)consumer.receive();
        	if(message!=null){
        		System.out.println("消费者收到消息:" + message.getText());
        	}
        }

        /*
         * 第二方式:通过设置监听的方式来接收消息
        consumer.setMessageListener(new MessageListener() {
			
			public void onMessage(Message message) {
				if(message!=null && message instanceof TextMessage){
					TextMessage textMessage = (TextMessage) message;
					try {
						System.out.println("消费者收到消息:" + textMessage.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}
		});
        try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}*/
        consumer.close();
        session.close();
        conn.close();
    }
}

运行消费者类,出现下列信息说明消费成功:

再来看下控制台,发现确实被消费掉了。

这种编码模式属于queue模式,他的特点如下:

      1,每个消息只能有一个消费者
      2,消息的生产者和消费者没有时间上的相关性,就比如发短信的过程,收消息的人关机也不影响基站消息的发送,开机就可以看到消息了
      3,消息被消费后,队列不再存储,所以消费者不会消费到已经被消费掉的消息

关于这种模式要注意的几点:

      1,先启动生产者,只启动一个消费者,发送的消息将由这一个消费者全部消费。

      2,先启动生产者,先启动1号消费者,再启动2号消费者,2号消费者将不能再消费。

      3,先启动两个消费者,再启动生产者,两个消费者将会平分消息,一人一半。

四、使用Topic模式编码

上面我们介绍了使用queue模式来编码,activeMQ还有一种形式就是主题模式。我们先来看一下Topic模式的特点吧:

1,每个Topic类型的消息可以有多个消费者,属于1对多的关系

2,生产者和消费者有时间上的关联。订阅一个主题的消费者只能消费自他订阅之后发布的消息。

3,生产者生产消息时,他是无状态的。假如无人订阅,那就是一条废消息,所以一般都是先启动消费者再启动生产者。

JMS允许客户创建持久订阅,这在一定程度上放松了时间上的想关心要求。持久订阅允许消费者消费他在未处于激活状态时发送的消息,就像我们的微信公众号订阅

说到这里,Toipc模式的编码就不再写了。就把之前queue模式的代码的queue换成topic就好了。

五、总结

(1)两种消费方式

      1,同步阻塞方式:receive()方法

      订阅者或接收者调用MessagConsumer的receive()方法,在接收到消息之前一直阻塞。

      2,异步非阻塞方式:onMessage()监听器

      订阅者或接收者通过MessagConsumer的setMessageListener()注册一个消息监听器,当消息到达之后,系统自动调用监听器的的onMessage()方法。

(2)两大模式的比较(queue/topic)

比较项目 工作模式 有无状态 传递完整性 处理效率
Queue队列形式 “负载均衡”模式,如果当前没有消费者,消息不会丢弃;如果有多个消费者,那么一条消息也只会发送其中一个消费者,并且需要消费者的确认信息 Queue形式默认会在服务器上以文件形式保存,也可以配置成DB存储 消息不会被丢弃 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会明显降低
Topic主题形式 “订阅/发布”模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息 无状态 如果没有订阅者,消息会被丢弃 由于这种模式是按照消费者的数量进行复制,所以处理性能会随着消费者的增多而下降

六、demo下载地址

https://gitee.com/shengyu4/activemqDemo

最后修改:2019-07-29 11:27:17 © 著作权归作者所有
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

上一篇

发表评论

说点什么吧~

评论列表

还没有人评论哦~赶快抢占沙发吧~