ActiveMQ

入门概述

前言

在何种场景下使用了消息中间件

为什么要在系统里引入消息中间件

分类

MQ消息中间件,天上飞的理念,必然有落地的实现产品

  • kafka:采用java、Scala

  • rabbitmq:采用erlang

  • rocketmq:采用java

  • activemq:采用java

技术维度

  • api发送和接收
  • MQ的高可用性
  • MQ的集群和容错配置
  • MQ的持久化
  • 延时发送/定时投递
  • 签收机制
  • spring整合

产生背景

学生请教老师问题

无MQ

来一个学生,老师解答一个学生的问题,如果有多个学生,需要排队,会产生阻塞

有MQ

学生按照老师的要求制定一个模板(格式/约定),模板内容:问题、提问人、手机、问题正文,每个学生都按照该模板提前将内容准备好,按照先后顺序,将模板内容交个班长,而不再是直接找老师,可以避免了长时间排队等候。

微服务架构

微服务架构后,链式调用是我们在写程序时候一般流程,为了完成一个整体功能会将其拆分成多个函数或者是子模块,比如模块A调用模块B,模板B调用模块C,模板C调用模块D。但是在大小分布式应用中,系统间的RPC交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例。

系统之间直接调用的这种架构在实际工程落地存在着几个问题

  • 系统之家接口耦合比较严重
  • 面对大流量高并发时,容易被冲垮
  • 等待同步存在性能问题

系统耦合严重

每新增一个下游功能,都要对上有的相关接口进行改造

举个例子:加入系统A要发送数据给系统B和C,发送给每个系统的数据可能有差异,因此系统A对要发送给每个系统的数据进行了组装,然后逐一发送;

当代码上线后又新增了一个需求:

把数据也发送给D,新上了一个系统D也要接受A系统的数据。此时就需要修改系统A,让他感知到D系统的存在,同时把数据处理好再由系统A发送数据给系统D。在这个过程中你会看到,每接入一个下游系统,都要对系统A进行代码改造,开发联调的效率很低。

高并发

每个接口模块的吞吐能力是有限的,这个上限能力就好像现实生活中的堤坝,当大流量(洪水)来临时,容易被冲垮

举个例子秒杀业务:

上游系统发起下单购买操作,就是一个下单操作

下游系统完成秒杀业务逻辑却包含了以下内容

  • 读取订单
  • 库存检查
  • 库存冻结
  • 余额检查
  • 余额冻结
  • 订单生成
  • 余额扣减
  • 库存扣减
  • 生成流水
  • 余额解冻
  • 库存解冻

等待同步存在性能问题

RPC接口基本上是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。

比如A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。

1565837421943

解决

根据上述的几个问题,在系统设计时可以明确要达到的目标:

  • 要做到系统解耦,当新的模块接进来时,可以做到代码改动最小;能够解耦
  • 设置流量缓存池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能够削峰
  • 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

因此引入了ActiveMq,达到解耦、削峰和异步

消息中间件

定义

面向消息的中间件Message-oriented middleware (MOM)能够很好的解决以上问题

是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成

通过提供消息传递和消息排队模型在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等功能

大致的过程是这样:

发送者把消息发送给消息服务器,消息服务器将消息存放在于若干队列/主题中,在合适的时候,消息服务器会将消息转发给接受者,在这个过程中,发送和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系;

尤其在发布/订阅模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

1565599014324

特点

采用异步处理模式

消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或队列)上;

消息接收者则订阅或监听该通道。一条信息可能最终被转发给一个或多个消息接收者,这些接收者都无须对消息发送至做成同步回应。整个过程都是异步的。

案例:

也就是说,一个系统跟另外一个系统之间进行通信的时候,假如系统A希望发送一个消息给系统B,让他去处理。

但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的“死活”了,接着系统B从MQ里消费出来即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统B的事,与系统A无关。

1565839394785

这样的一直通向方式,就是所谓的“异步”通向方式对于系统A来说,只要把消息发送给MQ,然后系统B就会异步的去进行处理了,系统A不需要“同步”的等待系统B处理完。这样的好处即所谓的解耦

应用系统之间解耦合

发送者和接收者不必了解对方,只需要确认消息

发送者和接收者不必同时在线

1565599265022

ActiveMQ安装和控制台

官网下载

ActiveMQ

1565599497215

Linux安装

解压安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@192 modules]# ll
total 587972
-rw-r--r--. 1 root root 58588039 Aug 3 23:43 apache-activemq-5.15.9-bin.tar.gz
drwxr-xr-x. 9 es es 4096 Jul 9 17:16 elasticsearch-6.3.1
-rw-r--r--. 1 root root 91429350 Jul 8 08:42 elasticsearch-6.3.1.tar.gz
-rwxrwxrwx. 1 root root 178418154 Jul 7 21:08 jdk-8u202-linux-x64.rpm
drwxrwxr-x. 11 es es 4096 Jun 30 2018 kibana-6.3.1-linux-x86_64
-rw-r--r--. 1 root root 205397076 Jul 8 08:42 kibana-6.3.1-linux-x86_64.tar.gz
-rw-r--r--. 1 root root 17855952 Jun 8 20:04 MySQL-client-5.5.48-1.linux2.6.x86_64.rpm
-rw-r--r--. 1 root root 50372369 Jun 8 20:24 MySQL-server-5.5.48-1.linux2.6.x86_64.rpm

[root@192 modules]# pwd
/opt/modules
[root@192 modules]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
[root@192 modules]# mkdir /myactiveMQ
[root@192 modules]# cp -r apache-activemq-5.15.9 /myactiveMQ/
[root@192 modules]# ls /myactiveMQ/
apache-activemq-5.15.9

普通启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@192 bin]# pwd
/myactiveMQ/apache-activemq-5.15.9/bin
[root@192 bin]# ll
total 152
-rwxr-xr-x. 1 root root 21534 Aug 15 14:43 activemq
-rwxr-xr-x. 1 root root 6189 Aug 15 14:43 activemq-diag
-rw-r--r--. 1 root root 16068 Aug 15 14:43 activemq.jar
-rw-r--r--. 1 root root 4946 Aug 15 14:43 env
drwxr-xr-x. 2 root root 4096 Aug 15 14:43 linux-x86-32
drwxr-xr-x. 2 root root 4096 Aug 15 14:43 linux-x86-64
drwxr-xr-x. 2 root root 4096 Aug 15 14:43 macosx
-rw-r--r--. 1 root root 83820 Aug 15 14:43 wrapper.jar
[root@192 bin]# ./activemq start
INFO: Loading '/myactiveMQ/apache-activemq-5.15.9//bin/env'
INFO: Using java '/usr/java/jdk1.8.0_202-amd64/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/myactiveMQ/apache-activemq-5.15.9//data/activemq.pid' (pid '4015')
[root@192 bin]# lsof -i:61616
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 4015 root 129u IPv6 26205 0t0 TCP *:61616 (LISTEN)
[root@192 bin]# ps -ef|grep activemq
root 4015 1 21 14:46 pts/0 00:00:26 /usr/java/jdk1.8.0_202-amd64/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/myactiveMQ/apache-activemq-5.15.9//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/myactiveMQ/apache-activemq-5.15.9//tmp -Dactivemq.classpath=/myactiveMQ/apache-activemq-5.15.9//conf:/myactiveMQ/apache-activemq-5.15.9//../lib/: -Dactivemq.home=/myactiveMQ/apache-activemq-5.15.9/ -Dactivemq.base=/myactiveMQ/apache-activemq-5.15.9/ -Dactivemq.conf=/myactiveMQ/apache-activemq-5.15.9//conf -Dactivemq.data=/myactiveMQ/apache-activemq-5.15.9//data -jar /myactiveMQ/apache-activemq-5.15.9//bin/activemq.jar start
root 4070 3911 0 14:48 pts/0 00:00:00 grep activemq
[root@192 bin]# ps -ef|grep activemq|grep -v grep
root 4015 1 19 14:46 pts/0 00:00:26 /usr/java/jdk1.8.0_202-amd64/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/myactiveMQ/apache-activemq-5.15.9//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/myactiveMQ/apache-activemq-5.15.9//tmp -Dactivemq.classpath=/myactiveMQ/apache-activemq-5.15.9//conf:/myactiveMQ/apache-activemq-5.15.9//../lib/: -Dactivemq.home=/myactiveMQ/apache-activemq-5.15.9/ -Dactivemq.base=/myactiveMQ/apache-activemq-5.15.9/ -Dactivemq.conf=/myactiveMQ/apache-activemq-5.15.9//conf -Dactivemq.data=/myactiveMQ/apache-activemq-5.15.9//data -jar /myactiveMQ/apache-activemq-5.15.9//bin/activemq.jar start
[root@192 bin]# netstat -anp|grep 61616
tcp 0 0 :::61616 :::* LISTEN 4015/java

普通关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@192 bin]# ./activemq stop
INFO: Loading '/myactiveMQ/apache-activemq-5.15.9//bin/env'
INFO: Using java '/usr/java/jdk1.8.0_202-amd64/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '4015' :
Java Runtime: Oracle Corporation 1.8.0_202 /usr/java/jdk1.8.0_202-amd64/jre
Heap sizes: current=62976k free=62320k max=932352k
JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/myactiveMQ/apache-activemq-5.15.9//conf/login.config -Dactivemq.classpath=/myactiveMQ/apache-activemq-5.15.9//conf:/myactiveMQ/apache-activemq-5.15.9//../lib/: -Dactivemq.home=/myactiveMQ/apache-activemq-5.15.9/ -Dactivemq.base=/myactiveMQ/apache-activemq-5.15.9/ -Dactivemq.conf=/myactiveMQ/apache-activemq-5.15.9//conf -Dactivemq.data=/myactiveMQ/apache-activemq-5.15.9//data
Extensions classpath:
[/myactiveMQ/apache-activemq-5.15.9/lib,/myactiveMQ/apache-activemq-5.15.9/lib/camel,/myactiveMQ/apache-activemq-5.15.9/lib/optional,/myactiveMQ/apache-activemq-5.15.9/lib/web,/myactiveMQ/apache-activemq-5.15.9/lib/extra]
ACTIVEMQ_HOME: /myactiveMQ/apache-activemq-5.15.9
ACTIVEMQ_BASE: /myactiveMQ/apache-activemq-5.15.9
ACTIVEMQ_CONF: /myactiveMQ/apache-activemq-5.15.9/conf
ACTIVEMQ_DATA: /myactiveMQ/apache-activemq-5.15.9/data
Connecting to pid: 4015
.Stopping broker: localhost
. TERMINATED
[root@192 bin]# ps -ef|grep activemq|grep -v grep

带运行日志的启动方式

1
2
3
4
5
6
7
8
[root@192 bin]# pwd
/myactiveMQ/apache-activemq-5.15.9/bin
[root@192 bin]# ./activemq start > /myactiveMQ/run_activemq.log
[root@192 bin]# cat /myactiveMQ/run_activemq.log
INFO: Loading '/myactiveMQ/apache-activemq-5.15.9//bin/env'
INFO: Using java '/usr/java/jdk1.8.0_202-amd64/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/myactiveMQ/apache-activemq-5.15.9//data/activemq.pid' (pid '4206')

使用

  • 主要功能:实现高可用、高性能、可伸缩、易用和安全的企业级面向消息服务的系统
  • 异步消息的消费和处理
  • 控制消息的消费顺序
  • 可以和spring/springboot整合简化编码
  • 配置集群容错的MQ集群

查看后台进程

1
2
3
4
5
ps -ef|grep activemq|grep -v grep      // grep -v  grep 可以不让显示grep 本来的信息

netstat -anp|grep 61616 // activemq 的默认后台端口是61616

lsof -i:61616

带运行日志启动

1
./activemq start >  /myactivemq/myrunmq.log

Apache ActiveMQ控制台

访问

默认的用户名和密码是admin/admin

1565852259925

备注

采用61616端口提供JMS服务

采用8161端口提供管理控制台服务

Java编码实现ActiveMQ通讯

IDEA建maven工程

编写pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.zbiti</groupId>
<artifactId>activemqDemo</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<!--activemq所需要的jar-->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>

<!--junit/log4j等基础通用配置-->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>

</project>

JMS编码总体架构

1565600057926

比对

传统jdbc

1565600092021

1565600178302

队列和主题

两大模式特性

队列:点对点

主题:一对多

1565600151461

在点对点的消息传递域中,目的地被称为队列(queue)

1565601303261

消费生产者

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce {



public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String QUEUE_NAME = "queue01";


public static void main(String[] args) throws JMSException {

// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Queue queue = session.createQueue(QUEUE_NAME);

// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
// 6 通过messageProducer生产3条消息发送到消息队列中
for (int i = 1; i < 4 ; i++) {
//7创建字消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
//8通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9关闭资源
messageProducer.close();
session.close();
connection.close();

System.out.println(" **** 消息发送到MQ完成 ****");

}
}

控制台查看

1565855086814

控制台说明

字段 说明 备注
Number Of Pending Messages 等待消费的消息 这个是当前未出队的数量。公式=总接收数-总出队数
Number Of Consumers 消费者数量 消费者端的消费数量
Messages Enqueued 进队消息数 进入队列的总数量,包括出队列的。这个数量只增不减
Messages Dequeued 出队消息数 可以理解为是消费者消费掉的数量

总结

当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。

当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

消息消费者

代码1

同步阻塞方式

订阅者或接受者调用messageConsumer.receive();方法来接收消息,receive方法能够在接收到消息之前(或超时之前)将一直阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String QUEUE_NAME = "queue01"; // 1对1 的队列

public static void main(String[] args) throws Exception {
// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Queue queue = session.createQueue(QUEUE_NAME);

//5 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while(true){
TextMessage textMessage = (TextMessage)messageConsumer.receive();
if(null!=textMessage){
System.out.println("***消费者接收到消息:"+textMessage.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}

输出

1
2
3
***消费者接收到消息:msg--1
***消费者接收到消息:msg--2
***消费者接收到消息:msg--3

代码2

通过监听的方式来消费消息

System.in.read();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String QUEUE_NAME = "queue01"; // 1对1 的队列

public static void main(String[] args) throws Exception {
// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Queue queue = session.createQueue(QUEUE_NAME);

//5 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消费者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}

输出

1
2
3
***消费者接收到消息:MessageListener--1
***消费者接收到消息:MessageListener--2
***消费者接收到消息:MessageListener--3

控制台

1566291915135

测试

先生产,只启动1号消费者。问题:1号消费者能消费消息吗? Y

先生产,先启动1号消费者,再启动2号消费者,问题:2号消费者还能消费消息吗? N

先启动2个消费者,再生产6条消息,问题:消费情况如何? 一人一半

编码小总结

JMS开发的基本步骤

1566293774911

两种消费方式

  • 同步阻塞方式(receive())

订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

  • 异步非阻塞方式(监听器onMessage()

订阅者或接收者通过MessageConsumersetMessageListener(MessageListener listener)注册一个消息监听器

当消息到达之后,系统自动调用监听器MessageListeneronMessage(Message message)方法。

在发布订阅消息传递域中,目的地被称为主题(topic)

1565601347749

1565601368275

发布主题生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String TOPIC_NAME = "topic01";


public static void main(String[] args) throws JMSException {

// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Topic topic = session.createTopic(TOPIC_NAME);


// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
// 6 通过messageProducer生产3条消息发送到消息队列中
for (int i = 1; i <=3 ; i++) {
//7创建字消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAME--" + i);
//8通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9关闭资源
messageProducer.close();
session.close();
connection.close();

System.out.println("****TOPIC_NAME消息发送到MQ完成 ****");

}
}

订阅主题消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String TOPIC_NAME = "topic01"; // 1对1 的队列

public static void main(String[] args) throws Exception {
System.out.println("我是3号消费者");
// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Topic topic = session.createTopic(TOPIC_NAME);

//5 创建消费者
MessageConsumer messageConsumer = session.createConsumer(topic);

messageConsumer.setMessageListener((message)->{
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消费者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}

先启动订阅再启动生产,不然发送的消息是废消息

消费者1号

1566299150243

消费者2号

1566299180002

消费者3号

1566299189471

生产者

1566299214843

控制台

启动3个消费者,生产者发布3条消息,3个消费者均接收到这3条消息

1566299089832

小总结

两大模式比较

比较项目 Topic模式队列 Queue模式队列
工作模式 “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息 “负载均衡”模式,如果当前没有消费者,消息也不会被丢弃;如果有多个消费者,那么一条消息也只会发送给妻子一个消费者,并且要求消费者A出口信息。
有无状态 无状态 Queue数据默认会在mq服务器上以文件形式保存,比如Active_MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置DB存储。
传递完整性 如果没有订阅者,消息会被丢弃 消息不会被丢弃
处理效率 由于消息要按照订阅者的数量进行复制,所以除了性能会随着订阅者的增加而明显降低,并且还要结合不太消息协议自身性能差异 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不太消息协议的具体性能也是有差异的。

JMS规范和落地产品

是什么

JavaEE

javaEE是一套使用java进行企业级应用开发的大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计、开发、装配及部署企业应用程序

  • JDBC(Java Database)数据库连接
  • JNDI(Java Naming and Directory Interface)java的命名和目录接口
  • EJB(Enterprise JavaBean)
  • RMI(Remote Method Invoke)远程方法调用
  • JAVA IDL(Interface Description Language)/CORBA(Common Object Broker Architecture)接口的定义语言/公用对象请求代理程序体系结构
  • JSP(Java Server Pages)
  • Servlet
  • XML(Extensible Markup Language)可扩展的标记语言
  • JMS(Java Message Service)java消息服务
  • JTA(Java Transition Service)java事务API
  • JTS(Java Transition Service)java事务服务
  • JavaMail
  • JAF(JavaBean Activation Framework)

JMS

Java Message Service(Java消息服务是JavaEE中的一个技术)

什么是消息服务

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

1566300799629

MQ中间件的其他落地产品

1566299938242

消息队列的详细比较

特性 ActiveMQ RabbitMQ Kafka RocketMQ
PRODUCER-CONSUMER 支持 支持 支持 支持
PUBLISH-SUBSCRIBE 支持 支持 支持 支持
REQUEST-REPLY 支持 支持 - 支持
API完备性 低(静态配置)
多语言支持 支持,java优先 语言无关 支持,java优先 支持
单机吞吐量 万级 万级 十万级 单机万级
消息延迟 - 微秒级 毫秒级 -
可用性 高(主从) 高(主从) 非常高(分布式)
消息丢失 - 理论上不会丢失 -
消息重复 - 可控制 理论上不会有重复 -
文档的完备性
提供快速入门
首次部署难度 -

JMS的组成结果和特点

JMS provider

实现JMS接口和规范的消息中间件,也就是我们的MQ服务器

JMS producer

消息生产者,创建和发送JMS消息的客户端应用

JMS consumer

消息消费者,接收和处理JMS消息的客户端应用

JMS message

消息头

JMSDestination

消息发送的目的地,主要是指Queue和Topic

1
void send(Destination destination, Message message) throws JMSException;
JMSDeliveryMode

持久模式和非持久模式

一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。

一条非持久的消息:最低会传送一次,这就意味着服务器出现故障,该消息将永远丢失。

1
void setDeliveryMode(int deliveryMode) throws JMSException;
JMSExpiration

设置过期时间

可以设置消息在一点时间后过去,默认是永不过期

消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。

如果timeToLive值等于零,则JMSExpiration被设为零,表示该消息永不过期。

如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。

1
2
void send(Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException;
JMSPriority

优先级

消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。

JMS不要求MQ严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是4级。

JMSMessageID

唯一识别每个消息的标识由MQ产生

1
2
3
void setDisableMessageID(boolean value) throws JMSException;

boolean getDisableMessageID() throws JMSException;

消息属性

如果需要除消息头字段以外的值,那么可以使用消息属性

识别、去重、重点标注等操作非常有用的方法

他们是以属性名和属性值对的形式制定的。可以将属性是为消息头的扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。

消息的属性就行可以分配给一条消息的附加消息头一样。他们允许开发者添加有关消息的不透明附加信息。

它们还用于暴露消息选择器在消息过滤时使用的数据。

1
2
3
TextMessage textMessage = session.createTextMessage("MessageListener--" + i);
textMessage.setStringProperty("username","z3");
messageProducer.send(textMessage);

消息体

封装具体的消息数据
5种消息体格式
  • TextMessage

普通字符串消息,包含一个string

  • MapMessage

一个Map类型的消息,key为string类型,而值为java的基本类型

1
2
3
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("k1","v1");
messageProducer.send(mapMessage);
  • ByteMessage

二进制数组消息,包含一个byte[]

  • StreamMessage

java数据流消息,用标注流操作来顺序的填充和读取

  • ObjectMessage

对象消息,包含一个可序列化的java对象

发送和接收的消息体类型必须一致对应

JMS的可靠性

持久性PERSISTENT

参数设置说明

持久化:当服务器宕机,消息依然存在

1
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

非持久化:当服务器宕机,消息不存在

1
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

持久的Queue

持久化消息

这是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。

可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

持久的Topic

代码

先启动订阅再启动生产

持久的发布主题生产者

持久的订阅主题消费者

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String TOPIC_NAME = "topic01"; // 1对1 的队列

public static void main(String[] args) throws Exception {
System.out.println("*****z3");
// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("z3");

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark.....");
connection.start();


//5 创建消费者
Message message = topicSubscriber.receive();
while (null!=message){
TextMessage textMessage = (TextMessage) message;
System.out.println("*********收到持久化topic:"+textMessage.getText());
message = topicSubscriber.receive(1000L);

}
session.close();
connection.close();
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String TOPIC_NAME = "topic01";


public static void main(String[] args) throws JMSException {

// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection
Connection connection = activeMQConnectionFactory.createConnection();


// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Topic topic = session.createTopic(TOPIC_NAME);

// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

connection.start();

// 6 通过messageProducer生产3条消息发送到消息队列中
for (int i = 1; i <=3 ; i++) {
//7创建字消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAME--" + i);
//8通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9关闭资源
messageProducer.close();
session.close();
connection.close();

System.out.println("****TOPIC_NAME消息发送到MQ完成 ****");

}
}
控制台

启动消费者

1566386535090

1566386560232

1566386690941

启动生产者

1566386677038

1566386610236

1566386628266

结论

一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。

然后再运行生产者发送消息,此时,无论消费者是否在线,都会接收到消息,不在线的话,下次连接的时候,会把没有收过来的消息都接收下来。

类似微信公众号订阅发布

事务

producer提交时的事务

false

只要执行send,就进入队列中

关闭事务,那第2个签收参数的设置需要有效

true

先执行send再执行commit,消息才被真正的提交到队列中

消息需要批量发送,需要缓冲区处理

1566387579870

推荐使用这个可以保证高可用,类似JDBC,如果中间出现异常可以回滚。

事务偏生产者/签收偏消费者

签收Acknowledge

非事务

自动签收(默认

1566389516158

1566389458893

手动签收

客户端调用acknowledge方法手动签收

1566389701829

1566389472542

允许重复消息

1566389717420

事务

生产事务开启,只有commit后才能将全部消息变为已消费
消息生产者

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce_TX {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String QUEUE_NAME = "queue01";


public static void main(String[] args) throws JMSException {

// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Queue queue = session.createQueue(QUEUE_NAME);

// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);

// 6 通过messageProducer生产3条消息发送到消息队列中
for (int i = 1; i < 7; i++) {
//7创建字消息
TextMessage textMessage = session.createTextMessage("MessageListener--" + i);

messageProducer.send(textMessage);
}
//9关闭资源
messageProducer.close();
session.commit();
session.close();
connection.close();

System.out.println(" **** 消息发送到MQ完成 ****");
}
}

控制台

1566390629357

消息消费者

消费者设置了事务并且commit,尽管是手动签收不调用textMessage.acknowledge();,消息也会被消费一次,自动和手动签收没啥区别

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.zbiti.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer_TX {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.101:61616";
public static final String QUEUE_NAME = "queue01"; // 1对1 的队列

public static void main(String[] args) throws Exception {
// 1 创建连接工厂,按照给定的url地址,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 3 创建会话session
// 两个参数,第一个事务,第二个签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
// 4 创建目的地(队列或者是主题)
Queue queue = session.createQueue(QUEUE_NAME);
//5 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while(true){
TextMessage textMessage = (TextMessage)messageConsumer.receive(4000L);
if(null!=textMessage){
System.out.println("***消费者接收到消息:"+textMessage.getText());
// textMessage.acknowledge();
}else {
break;
}
}

messageConsumer.close();
session.commit();
session.close();
connection.close();
}
}

控制台

1566391484553

签收和事务关系

在事务性会话中,当一个事务被成功提交则消息被自动签收

如果事务回滚,则消息会被再次传送

非事务性会话中,消息何时被签收取决于创建会话时的应答模式(acknowledgement mode)

JMS的点对点总结

点对点模型基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。

和我们平时给朋友发送短信类似。

  • 如果在Session关闭时有部分消息已被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收。
  • 队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

JMS的发布订阅总结

JMS Pub/Sub模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic

主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。

非持久订阅

非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收到发送到某个主题的消息。

如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。

一句话:先要订阅注册才能接收到发布的消息,只给订阅者发布消息

持久订阅

当客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户端再次连接到MQ时会根据消费者的ID得到索引当自己处于离线时发送到主题的消息。

非持久订阅状态下,不能恢复或重新派送一个未签收的消息。

持久订阅才能恢复或重新派送一个未签收的消息。

用哪个

当所有消息必须被接收,则用持久订阅。当丢失消息能够被容忍,则用非持久订阅

ActiveMQ的Broker

spring整合ActiveMQ

ActiveMQ的传输协议

ActiveMQ的消息存储和持久化

高级特性和大厂常考重点