javamq取消息代码 java mq消息队列详解

java如何获取rabbitmq队列中消息数量

下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。”我现在遇到的问题是这样的:我这边有几条线程去消息队列里取数据,但是会有异常数据导致线程挂掉,就是上边的“客户端在发送ack之前意外死掉了”,RabbitMQ会将消息投递到下一个consumer客户端,这样一条异常数据会把我的所有线程挂掉,我现在想实现这样的功能:如果有异常数据导致进程挂掉,那么我不让RabbitMQ将这条消息投递到下一个consumer客户端,而是放到另一个地方或者另外处理,请问该如何实现呢?

成都创新互联公司专注于中大型企业的成都网站设计、做网站和网站改版、网站营销服务,追求商业策划与数据分析、创意艺术与技术开发的融合,累计客户上千,服务满意度达97%。帮助广大客户顺利对接上互联网浪潮,准确优选出符合自己需要的互联网运用,我们将一直专注品牌网站建设和互联网程序开发,在前进的路上,与客户一起成长!

java使用mq get api从mq中取数据怎样触发侦听器连续取数据

{

//前面是准备管理器和队列

MQQueueManager qMgr = new MQQueueManager(qManager);

int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;

MQQueue queue = qMgr.accessQueue(qName, openOptions);

MQMessage rcvMessage = new MQMessage();

MQGetMessageOptions gmo = new MQGetMessageOptions();

gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;

//读取五秒超时,这里目的是要有个读取阻塞,和Socket编程类似。

gmo.waitInterval = 5000;

queue.get(rcvMessage, gmo);

//后面就是操作消息的部分【略】

}catch(Exception e){{

//前面是准备管理器和队列

MQQueueManager qMgr = new MQQueueManager(qManager);

int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;

MQQueue queue = qMgr.accessQueue(qName, openOptions);

MQMessage rcvMessage = new MQMessage();

MQGetMessageOptions gmo = new MQGetMessageOptions();

gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;

//读取五秒超时,这里目的是要有个读取阻塞,和Socket编程类似。

gmo.waitInterval = 5000;

queue.get(rcvMessage, gmo);

//后面就是操作消息的部分【略】

}catch(Exception e){

java怎么将mq接收的文件消息提取出来

WebSphere MQ 接收发送

添加mq jar

类介绍:

SendMSG:消息发送类。

Main():主方法。

SendMSG():消息发送方法。

方法描述:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

package test;

public class SendMSG{

MQEnvironment.hostname = "192.168.10.201";

//通道类型为服务器连接通道

MQEnvironment.channel = "tongdao";

MQEnvironment.CCSID = 1381;

//消息队列端口号

MQEnvironment.port = 10618;

try{

//建立队列管理器QM_SERVER为队列管理器名称

MQQueueManager qMgr = new MQQueueManager("test");

int openOptions = MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUTMQC.MQOO_INQUIRE;//建立队列INITQ队列名称INITQ为本地队列

MQQueue queue = qMgr.accessQueue("wanghui",openOptions,null,null,null);

System.out.println("成功建立通道");

MQMessage message = new MQMessage();

message.format = MQC.MQFMT_STRING;

message.characterSet = 1381;

message.writeString("王辉");

message.expiry = -1;//设置消息用不过期

queue.put(message);//将消息放入队列

queue.close();//关闭队列

qMgr.disconnect();//断开连接

}catch(EOFExceptione){

e.printStackTrace();

}catch(MQExceptione){

e.printStackTrace();

}catch(Exceptione){

e.printStackTrace();

}

}

ReceiveMSG:消息接收类。

Main():主方法。

ReceiveMSG():消息接收方法。

public class ReceiveMSG {

MQEnvironment.hostname="192.168.10.201";//通道类型为服务器连接通道

MQEnvironment.channel="tongdao";

MQEnvironment.CCSID=1381;

MQEnvironment.port=10618;

try{

//建立队列管理器QM_SERVER为队列管理器名称

MQQueueManager qMgr = new MQQueueManager("test");

int openOptions=MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUT|MQC.MQOO_INQUIRE;//建立队列INITQ队列名称INITQ为本地队列

MQQueue queue=qMgr.accessQueue("wanghui",openOptions,null,null,null);

System.out.println("成功建立通道");

MQMessage message= new MQMessage();

message.format=MQC.MQFMT_STRING;

message.characterSet=1381;

//从队列中获取消息

MQGetMessage Optionspmo=new MQGetMessageOptions();

queue.get(message,pmo);

Stringchars=message.readLine();

System.out.println(chars);

queue.close();//关闭队列

qMgr.disconnect();//断开连接

}catch(EOFExceptione){

e.printStackTrace();

}catch(MQExceptione){

e.printStackTrace();

}catch(Exceptione){

e.printStackTrace();

}

}

用java代码如何设置activemq消息持久化到数据库中?

ActiveMQ持久化消息的二种方式;

1、持久化为文件

这个装ActiveMQ时默认就是这种,只要设置消息为持久化就可以了。涉及到的配置和代码有:

persistenceAdapter

kahaDB directory="${activemq.base}/data/kahadb"/

/persistenceAdapter

producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);

2、持久化为MySql

首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar

接下来修改配置文件

persistenceAdapter

jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/

/persistenceAdapter

在配置文件中的broker节点外增加

bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"

property name="driverClassName" value="com.mysql.jdbc.Driver"/

property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/

property name="username" value="activemq"/

property name="password" value="activemq"/

property name="maxActive" value="200"/

property name="poolPreparedStatements" value="true"/

/bean

从配置中可以看出数据库的名称是activemq,需要手动在MySql中增加这个库。

然后重新启动消息队列,会发现多了3张表

1:activemq_acks

2:activemq_lock

3:activemq_msgs


当前名称:javamq取消息代码 java mq消息队列详解
文章起源:http://myzitong.com/article/doesdpp.html