[TOC]
本文我们使用ActiveMQ实现简单的点对点的消息模型。
一、开发环境
这里我使用的是 apache-activemq-5.11.1 可以去官网下载
二、新建java项目
其中activemq-all-5.11.1.jar在下载下来的 apache-activemq-5.11.1 中就有,直接复制过来导入项目即可。
三、具体实现
JMSProducer:消息的生产者
JMSConsumer:消息的消费者
大致步骤:
(1)创建连接工厂
(2)使用连接工厂创建一个连接
(3)启动连接
(4)使用连接创建一个会话
(5)使用会话创建一个队列/主题
(6)使用会话创建一个生产者/消费者
(7)使用会话创建一个消息/对象/集合/文件/字节
(8)使用生产者/消费者 发送/获取 消息
1、编写生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args){ ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer messageProducer;
try { connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("HelloWorld"); messageProducer = session.createProducer(destination);
sendMessage(session,messageProducer); session.commit();
} catch (JMSException e) { e.printStackTrace(); }finally { if (connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
}
public static void sendMessage(Session session,MessageProducer messageProducer){ try { TextMessage textMessage = session.createTextMessage("你好,世界!"); messageProducer.send(textMessage); System.out.println("已发送消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }
}
|
2、编写消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args){ ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer messageConsumer;
try { connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("HelloWorld"); messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { String text = ((TextMessage)message).getText(); System.out.println("收到的消息是:" +text ); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } } }
|
3、session与事务处理
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
第一个参数
要使用事务处理,必须通过作为第一个参数设置为true来创建一个事务处理会话
事务处理允许您将整个系列的传入和传出消息分组在一起,并将它们视为原子单元。消息代理跟踪事务的各个消息的状态,但在您提交事务之前不会完成它们的传送。在发生故障时,您可以回滚事务,取消其所有消息并从头开始重新启动整个系列。
事务处理会话总是只有一个打开的事务,包含自会话创建或前一个事务完成以来发送或接收的所有消息。提交或回滚事务会结束该事务并自动开始另一个事务。
当交易中的所有消息都已成功交付时,您可以调用会话的commit方法来提交交易
session.commit();
所有会话的传入消息都会被确认,并且所有传出的消息都将被发送。交易被视为完成,并开始新的交易。
发送或接收操作失败时,会引发异常。虽然可以通过忽略或重试操作来处理异常,但建议您使用会话的rollback方法回退事务:
session.rollback();
第二个参数:
值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
4、连接工厂 ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL);
activemq默认是不需要密码,生产消费者就可以连接的
实例化连接工厂时我们可以去掉用户名与密码,同样可以连接到ActiveMQ
如果ActiveMQ是部署在你本地的,则默认的用户名为admin,默认密码也为admin,地址为:tcp://localhost:61616 我们可以直接从ActiveMQConnection 中 取得默认的用户名,密码与地址
5、关于同步与异步
同步
订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。上面消费者的代码中使用的是这种方法
四、运行
1、启动ActiveMQ
如果你的64bit机器,则在这个目录下打开Activemq.bat
32bit机器可以执行 bin\win32\activemq.bat
打开后出现以下窗口后,表示启动成功,这个窗口不能关闭
打开ActiveMQ的管理界面:http://127.0.0.1:8161/admin/
分别点击队列与主题可以查看发送到ActiveMQ的队列消息或主题
2、运行生产者
ActiveMQ 启动好后,我们就可以运行生产者向ActiveMQ发送消息了
运行结束后,我们查看一下ActiveMQ的队列
表示ActiveMQ自动创建一个名为”HelloWorld“的队列,队列已经收到了一条消息,暂时没有消费者,我们也可以在ActiveMQ中对消息进行浏览删除等操作
3、运行消费者
这里我们使用异步接收,生产者再次发送消息时同样可以接收到。
如果使用同步接收,在规定的时间超时后会程序停止,关闭连接。
看看ActiveMQ中的队列发生的变化:
可以看到待处理消息已为0,出队消息加1,有一个消费者在线。
官方文档参考: https://docs.oracle.com/cd/E26576_01/doc.312/e24945/toc.htm