# 消息中间件

back2专题

消息中间件

属于分布式系统中的一个子系统,关注于数据的发送和接收,利用高效可靠的消息传递机制对分布式系统中的其余各个子系统经进行集成

  • ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
  • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
  • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些。还有就是阿里出台的技术,你得应对这个技术万一被抛弃,社区黄掉的风险,如果你们公司有技术实力我觉得用RocketMQ 挺好的
  • Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。Kafka天然适合大数据实时计算以及日志收集。
RocketMQ KAFKA

# ActiveMQ

相关参数

failover:(tcp://192.168.25.234:61618)?timeout=10000&maxReconnectAttempts=3

Option Name Default Value Description
initialReconnectDelay 10 第一次重连的时间间隔(毫秒)
maxReconnectDelay 30000 最长重连的时间间隔(毫秒)
useExponentialBackOff true 重连时间间隔是否以指数形式增长
backOffMultiplier 2.0 指数的值
maxReconnectAttempts -1 >= AMQ v5.6 0 < AMQ v5.6, 自版本5.6起:-1为默认值,代表不限重试次数;0代表从不重试(只尝试连接一次,并不重连),5.6以前的版本:0为默认值,代表不限重试次数,所有版本:如果设置为大于0的数,代表最大重试次数
startupMaxReconnectAttempts 0 初始化时的最大重连次数。一旦连接上,将使用 maxReconnectAttempts的配置
randomize true 使用随机链接(注:达到负载均衡的目的)
backup false 提前初始化一个未使用连接,以便进行快速失败转移
timeout -1 设置发送操作的总计最大超时时间(毫秒)
trackMessages false 在重连过程中缓存消息
maxCacheSize 131072 缓存的最大字节数
updateURIsSupported true 设定是否可以动态修改broker uri(自版本5.4起)

# ActiveMQ是啥

back

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

# 消息传递方式介绍

①.Activemq支持两种方式的消息传递:

  • 广播模式:1-n的方式,是一种发布订阅模式,像腾讯新闻那样,只要我们微信关注了腾讯新闻,那么每个人都会收到推送的新闻
  • 队列模式:1-1的方式,只能有一个消费者端消费生产者生产的数据

②.消息类型介绍:

  • Activemq提供了两种消息类型:持久化和非持久化:
  • 消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。
    这个过程通常称为同步发送。速度较慢,数据基本不会丢失.可以持久化到kahaDB(aMq默认采用kahaDB存储引擎来存储消息)或数据库中

异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法,速度较快,不过可能会造成数据的丢失. 消息签收方式:

名称 说明
AUTO_ACKNOWLEDGE 自动确认
CLIENT_ACKNOWLEDGE 客户端手动确认
DUPS_OK_ACKNOWLEDGE 动批量确认
SESSION_TRANSACTED 事务提交并确认

下面我以一个程序例子来展现,ActiveMQ这里我安装在win上,具体的安装教程这里就不介绍了。

# ActiveMQ的1-1模式下的发送者

back

/**
     *  连接地址
     */
    private static final String URL = "tcp://localhost:61616";
    /**
     * 创建的队列名
     */
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1. 获取连接工厂对象
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
                // 2. 获取连接对象
        Connection connection = factory.createConnection();
                // 3. 启动连接
        connection.start();
                // 4. 创建会话 第一个参数代表的是是否启动事务 第二个参数表带的是接受的机制
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                // 5. 创建目的地
        Destination destination = session.createQueue(queueName);
                // 6. 创建对应的生产者
        MessageProducer producer = session.createProducer(destination);
                for (int i = 0 ; i < 100; i ++){
            // 7. 创建消息对象
            TextMessage textMessage = session.createTextMessage("消息"+i);
            producer.send(textMessage);
                    System.out.println("发送:"+textMessage.getText());
        }
                // 7. 关闭对应的资源
        connection.close();
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# ActiveMQ的1-1模式下的接收者

back

public class AppReceiver {
    /**
     *  连接地址
     */
    private static final String URL = "tcp://localhost:61616";
    /**
     * 创建的队列名
     */
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1. 获取连接工厂对象
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);

        // 2. 获取连接对象
        Connection connection = factory.createConnection();

        // 3. 启动连接
        connection.start();

        // 4. 创建会话 第一个参数代表的是是否启动事务 第二个参数表带的是接受的机制
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

        // 5. 创建目的地
        Destination destination = session.createQueue(queueName);

        // 6. 创建对应的接收者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7. 添加对应的监听器,回调得到对应的消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收到:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 海康事件订阅

back

@PostConstruct
private void initialize() {
    InputStream is = null;
    try {
        if (!"1".equals(customerProperties.getHikEnableAlarm())) {
            return;
        }
        HikUtil hikUtil = new HikUtil(customerProperties);
//String eventTypes = HikEventType.videoBlindAlarm.getIndex()+","+HikEventType.videoMoveAlarm.getIndex();//移动侦测、视频遮挡
        Map<String, String> map = hikUtil.getMqInfo(hikUtil.getDefaultUserUuid(), customerProperties.getHikEvents());

        log.info("开始连接>>>>>>>>>>> ");
        String brokerURL = customerProperties.getBrokerUrl().replaceAll(HikInterfaceConstStr.mqURL.getIndex(), map.get(HikInterfaceConstStr.mqURL.getIndex()));
        log.info("mq链接地址:"+brokerURL);
        connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();

        Topic subTopic = session.createTopic(map.get(HikInterfaceConstStr.destination.getIndex()));
        MessageConsumer consumer = session.createConsumer(subTopic);
        consumer.setMessageListener(this);
        log.info("海康设备告警监听开启成功");

    } catch (JMSException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# ActiveMQ的1-n模式下的发布者

back

public class AppProducer {
    /**
     *  连接地址
     */
    private static final String URL = "tcp://localhost:61616";
    /**
     * 创建的队列名
     */
    private static final String topicName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1. 获取连接工厂对象
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);

        // 2. 获取连接对象
        Connection connection = factory.createConnection();

        // 3. 启动连接
        connection.start();

        // 4. 创建会话 第一个参数代表的是是否启动事务 第二个参数表带的是接受的机制
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

        // 5. 创建目的地
        Destination destination = session.createTopic(topicName);

        // 6. 创建对应的生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0 ; i < 100; i ++){
            // 7. 创建消息对象
            TextMessage textMessage = session.createTextMessage("消息"+i);
            producer.send(textMessage);

            System.out.println("发送:"+textMessage.getText());
        }

        // 7. 关闭对应的资源
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# ActiveMQ的1-n模式下的订阅者

back

public class AppReceiver {
    /**
     *  连接地址
     */
    private static final String URL = "tcp://localhost:61616";
    /**
     * 创建的队列名
     */
    private static final String topicName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1. 获取连接工厂对象
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);

        // 2. 获取连接对象
        Connection connection = factory.createConnection();

        // 3. 启动连接
        connection.start();

        // 4. 创建会话 第一个参数代表的是是否启动事务 第二个参数表带的是接受的机制
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

        // 5. 创建目的地
        Destination destination = session.createTopic(topicName);

        // 6. 创建对应的接收者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7. 添加对应的监听器,回调得到对应的消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收到:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

注意:ActiveMQ中的1-n模式下,订阅者无法接收到发布之前的发布的消息,也就是说订阅者必须提前预定主题才能够接收到对应的消息