月眸


ActiveMQ学习七:AvtiveMQ高级特性

毛毛小妖 2019-08-07 258浏览 0条评论
首页/ 正文
分享到: / / / /

最后,来学习一下ActiveMQ的高级特性吧。

一、异步投递Async Sends

首先可以去官网看下介绍:https://activemq.apache.org/async-sends。对于一个slow consumer,使用同步发送消息可能出现producer堵塞等情况,因此这种情况适合使用异步发送。那么这个异步投递到底是个什么东东呢?

ActiveMQ支持同步和异步两种发送模式将消息发送到Broker,模式的选择对发送延时有巨大的影响。使用异步发送能显著提高发送的性能。

ActiveMQ默认使用异步发送模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化消息,这两种情况都是同步发送的。

异步发送带来了性能上的提升,但是会消耗较多的client端内存也会导致Broker端性能增加,消息可能会有概率丢失。

如何来配置异步发送呢?可以参考官网的配置说明:

还有一个问题,异步发送如何确定消息发送成功呢?

我们先来看一下异步发送丢失消息的场景:生产者设置useAsyncSend=true,使用producer.send(msg)持续发送消息。由于消息不阻塞,生产者会认为所有的消息均被成功发送至MQ,如果MQ突然宕机,此时生产者内存中尚未被发送的消息都将会丢失。所以正确的异步发送方式是需要接收回调的。所以,同步发送和异步发送的区别就在这里:

同步发送等send不阻塞了就表示发送成功了,异步发送需要接受回执并由客户端再判断一次是否发送成功。

代码如下:

二、延迟投递和定时投递

首先可以去官网看一下:https://activemq.apache.org/delay-and-schedule-message-delivery。官网上给出了配置延迟投递的四大属性:

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式

下面来看看代码是怎么实现的吧:

首先得在activemq.xml中配置schedulerSupport属性为true

然后代码中要这样设置:

三、ActiveMQ消费重试机制

a.具体哪些情况会引发消息重发呢?

1:client用了transactions且在session中调用了rollback()

2:client用了transactions且在调用commit()之前关闭或者没有commit()

3:client在签收模式下调用了recover()

b.消息重发的间隔和次数

间隔:1s;次数:6次

c.有毒消息Poison ACK的理解

一个消息被重发6次,消费端会给MQ发送一个“Poison ack”表示这个消息有毒,告诉Broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。

当然了,消息重发的间隔和次数我们也可以自己设置,分别为:maximumRedeliveries和initialRedeliveryDelay

如果使用的是spring,那么可以这样配置:

四、死信队列

a.那么什么是死信队列呢?

ActiveMQ中引入了死信队列这个概念,即一条消息被重发了多次后,将会被MQ移入到DLQ中,开发人员可以在这个队列中查看出错的消息,进行人工处理。

b.死信队列的使用

一般生产环境在使用MQ的时候设计两个队列:一个核心业务队列,一个是死信队列

核心业务队列:比如上图用来让订单系统发送订单消息的,另一个死信队列就死用来存储异常情况的

c.死信队列的配置

死信队列有两种配置,分别是:

1>SharedDeadLetterStrategy

将所有的DeadLetter保存在一个共享队列中,这是AciveMQ broker端默认策略,共享队列默认为“ActiveMQ.DLQ”,可通过“deadLetterQueue”属性来设置。

<deadLetterstrategy>
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE" />
</deadLetterstrategy>

2>IndividualDeadLetterStrategy

将DeadLetter放入各自的死信通道中

d.注意的问题

有时候需要直接删除过期消息而不需要发送到死信队列中,“processExpired”表示是否将过期消息放入死信队列,默认为true

默认情况下,ActiveMQ不会把非持久的消息放到死信队列中。“processNonPersistence”表示将“非持久化”消息放入死信队列中,默认为false。如果你想把非持久的消息放到死信队列中,需要将processNonPersistence设置为true

五、如何保证消息不被重复消费,以及幂等性问题

网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。

如果消息是做数据库的插入操作,给这个消息做一个唯一主键,就算重复消费也会出现主键冲突,避免数据库出现脏数据。

或者准备一个第三方服务来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。消费者消费前先去redis查询有没有消费即可。

最后修改:2019-08-07 09:45:25 © 著作权归作者所有
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

上一篇

发表评论

评论列表

还没有人评论哦~赶快抢占沙发吧~