月眸


ActiveMQ学习五:AvtiveMQ的消息存储和持久化

毛毛小妖 2019-08-01 262浏览 0条评论
首页/ 正文
分享到: / / / /

这里所说的持久化和之前说的持久是不一样的。之前我们说过保证MQ可靠性的时候,说过三个方面:

1,持久; 2,事务 ;3,签收

今天要说的持久化,也是为了保证MQ高可用的。但是他们是有区别的,前三个特性是MQ自带的,这里所说的持久化是指借助外力作用来保证的,也就是说存在一种机制,可以将消息同步到文件系统中,mysql或者oracle中。

一、官网介绍

https://activemq.apache.org/persistence

二、ActiveMQ持久化机制是什么?

总结就是:MQ宕机了,消息不会丢失的方案。为了避免意外宕机以后丢失消息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制,ActiveMQ的消息持久化机制有JDBC、AMQ、KahaDB和LevelDB,无论采取哪一种持久化机制,消息的存储逻辑都是一致的。就是在消息发送出去之后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

三、ActiveMQ持久化机制有哪些?

1、AMQ(了解)

基于文件的存储方式,现在不用了

2、kahaDB(默认)

基于日志文件,从5.4版本以后开始默认的持久化插件。我们可以在activeMQ.xml文件中看到默认配置的持久化插件。

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

    a.kahaDB是目前默认的持久化存储方式,可用于任何场景,提高了性能和恢复能力。

    b.消息存储使用一个事务日志和一个索引文件来存储他所有的地址。

    c,kahaDB是一个专门针对消息持久化的解决方案,他对典型的消息使用模式进行了优化。

    d,数据被追击到data logs 当中。当不再需要log文件,log文件会被丢弃。

打开activeMQ安装位置的data/kahaDB目录可以看到如下文件:

kahaDB在消息保存目录中有四个文件和一个lock。图中少了一个db.free文件,这是因为我本机只是起了MQ,没有做其他操作。

1>db-[number].log

存储消息到预定义大小的数据记录文件中,当数据文件满了则会创建新的文件,number数字也会随之递增,它随着消息的增多而增加。当不再有引用数据文件的任何消息时,文件会被删除或归档。

2>db.data

该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质上是BTree,使用BTree作为索引指向db-[number].log文件存储的消息。

3>db.free

当前db.data文件里面哪些页面是空的,文件具体内容是所有空闲页的ID

4>db.redo

用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。

3、LevelDB(了解)

这种文件系统是从5.8版本引入的,他和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。它不使用BTree索引,而是使用基于LevalDB的索引,默认配置如下:

<persistenceAdapter>
    <levelDB directory = "activemq-data"/>
</persistenceAdapter>

4、JDBC

这个比较重要,标题4专门来说明,因为是用的比较多的

5、JDBC Message store with ActiveMQ Journey

这个比较重要,标题4专门来说明,因为是用的比较多的

四、持久化之JDBC方式和JDBC加强版

(一),JDBC消息存储:

1,MQ+Mysql

MQ和mysql的关系如下,mysql只是MQ持久化方案的一种:

 

2,添加mysql驱动包到lib文件夹

3,jdbcPersistenceAdapter配置

修改activemq.xml,按照如下修改

修改前KahaDB 修改后jdbcPersistenceAdapter
<persistenceAdapter>
    <KahaDB directory = "${activemq.data}/kahadb"/>
</persistenceAdapter>
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
 

datasource:指定将要引用的持久化数据库的bean名称,

createTableOnStartup:是否在启动的时候创建数据表,默认是true,这样每次启动MQ时都会创建数据表,一般是第一次启动设置为true之后改成false

4,数据库连接池配置

需要在broker标签外设置bean

5,建仓sql和建表说明

建一个名为activemq的数据库,数据库名称按照自己配置的修改。如果上述配置和库都建好了,重启MQ服务器,就会自动在数据库中生成三张表。建表语句如果指定编码为utf-8则会创建activemq_acks表失败。可以指定为latin1:

mysql> create databases activemq default character set latin1;

activemq_msgs表

存放queue和topic的消息

activemq_acks表

存放topic持久订阅的信息

activemq_lock表

在集群环境下才有用,只有一个Broker可以获得消息,成为master broker,其他的只能作为备份等待master broker不可用时,才可能成为下一个master broker。这个表用来记录那个Broker是当前的master broker

6,代码验证

注意,一定要开启持久化或者spring配置持久化:producer.setDeliveryMode(DeliveryMode.PERSISTENT);

在点对点类型中:

    a.当DeliveryMode设置为NON_PERSISTENCE时,消息保存在内存中;

    b.当DeliveryMode设置为PERSISTENCE时,消息保存在broker相应的文件或者数据库中;

    c.而且点对点类型的消息一旦被消费就从broker中删除。

在发布订阅类型中:

    a.要先运行消费者,再运行生产者

7,数据库情况

如果是queue类型的消息:

    在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,这些消息将会立即被删除

如果是topic类型的消息:

    一般是先启动消费者订阅然后再生产的情况下会将消息保存到activemq_acks

8,开发中的坑

在配置关系型数据库作为ActiveMQ的持久化存储方案时,要注意以下三点:

    a,记得将需要使用到的相关jar包放到MQ安装目录的lib下,默认连接池使用的是dbcp,如果要使用c3p0等其他连接池,也要加入相关jar包
    b,createTablesOnStartup属性默认为true,当第一次启动MQ时,会自动在数据库中创建表。启动完成之后可以设置为false
    c,下划线坑爹,启动时报异常,则可能是操作系统名字中有“_”。请更改机器名并且重启。

(二),JDBC加强版消息存储:

这种策略叫带有高速缓存的JDBC解决方案,相当于在mysql之前加了一个高速缓存。这种方法克服了JDBC store的不足,JDBC每次请求过来,都需要去写库和读库。

ActiveMQ Journey使用高速缓存写入技术,大大提高了性能。

当消费者的消费速度能够及时跟上生产者生产速度时,journey文件能够大大减少需要写入DB的消息。

举个例子:

生产者生产1000条消息,这1000条消息会保存到journey文件中,如果消费者消费速度很快,在journey文件还没有同步到DB之前,消费者已经消费了90%的消息,那么这个时候只需要同步剩下的10%消息到DB。如果消费者消费的速度很慢,这时journey文件可以使消息以批量方式写入DB。

1.配置

修改前PersistenceAdapter 修改后jdbcPersistenceAdapter
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
<persistenceFactory>
      <journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="activemq-data"  dataSource="#mysql-ds" useJournel="true" useQuickJournel="true"/>
</persistenceFactory>

其他的就跟JDBC存储方式操作一致,还是熟悉的配方,还是熟悉的味道。

五、总结

持久化消息主要是指:

MQ所在的服务器宕机了,消息不会丢失的机制。

持久化机制演化过程:

从最初的AMQ方案到V4版本推出的高性能事务支持插件,并且同步推出了关于关系型数据库的方案。V5.3版本又推出了对KahaDB的支持,后来V5.8又支持levelDB,到现在提供了标准版的Zookeeper+LevelDB集群化方案。

ActiveMQ持久化的机制有:

AMQ:基于日志文件

KahaDB:基于日志文件,V5.4以后默认支持的持久化插件

JDBC:基于第三方数据库

LevelDB:基于文件的本地数据库存储

Replicated LevelDB Store:基于Zookeeper+LevelDB的数据复制方式,用于master-slave方式的首选数据复制方案。

无论使用哪一种诗酒花方式,消息的存储逻辑都是一致的。

最后修改:2019-08-01 14:07:48 © 著作权归作者所有
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

上一篇

发表评论

说点什么吧~

评论列表

还没有人评论哦~赶快抢占沙发吧~