ActiveMQ 消息持久化机制:
ActiveMQ 消息的持久化机制有 JDBC、AMQ、KahaDB 和 LevelDB,其中本示例版本(5.15.2)默认机制为 KahaDB。无论哪种持久化机制, 消息的存储逻辑都是一致的。
即消息生产者将消息发送之后, 消息中心首先将消息存储到本地数据文件(AMQ/KahaDB),内存数据库(LevelDB)或远程数据库(JDBC)等,然后试图将消息发送给消费者,发送成功将消息从存储中删除,失败则继续尝试。
ActiveMQ启动以后首先会检查指定的持久化机制指定的存储位置,若有未发送的消息,将消息发送出去。
JDBC持久化 MySQL:
配置:
在conf/activemq.xml中persistenceAdapter节点配置jdbcPersistenceAdapter子节点,用于定义持久化方式,在此子节点中定义数据源属性(dataSource="#mysql-ds")指向数据源bean节点。
在beans主节点中定义数据源bean节点,id属性定义为持久化方式中的dataSource属性值。
12 3 4
定义数据源bean节点:
1 23 4 5 6 7 8
不可以在数据源中定义<property name="maxActive" value="200"/>属性,此版本activeMQ会抛出异常。
jvm 1 | WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Cannot create inner bean '(inner bean)#29403e43' of type [org.apache.activemq.store.jdbc.JDBCPersistenceAdapter] while setting bean property 'persistenceAdapter'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name '(inner bean)#29403e43' defined in class path resource [activemq.xml]: Cannot resolve reference to bean 'mysql-ds' while setting bean property 'dataSource'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'mysql-ds' defined in class path resource [activemq.xml]: Error setting property values; nested exception is org.springframework.beans.NotWritablePropertyException: Invalid property 'maxActive' of bean class [org.apache.commons.dbcp2.BasicDataSource]: Bean property 'maxActive' is not writable or has an invalid setter method. Does the parameter type of the setter match the return type of the getter?
启动成功后,会在数据库中自动创建3个表:activemq_msgs,activemq_acks和activemq_lock(集群环境下才有用)。