事务采用commit和rollback来提交和回滚事务:
session.commit(); session.rollback();签收模式
签收代表接收端的session已收到消息的一次确认,反馈给broker,分别为以下三种:
-
Session.AUTO_ACKNOWLEDGE:当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。
-
Session.CLIENT_ACKNOWLEDGE:客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。
-
Session.DUPS_OK_ACKNOWLEDGE:Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。
默认持久化是开启的,producer端可以在设置持久化:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)优先级
可以设置信息的优先级,但是必须配置activemq.xml文件,使其支持优先级:
然后就可以在代码中设置优先级:
producer.setPriority消息超时/过期
设置了消息超时的消息,消费端在超时后无法在消费到此消息。
producer.setTimeToLive死信
此类消息会进入到ActiveMQ.DLQ队列且不会自动清除,称为死信。如果一直不关注这个队列,可能导致OOM
1、修改死信队列名称
useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信
2、让非持久化的消息也进入死信队列
默认情况,对于非持久化的消息,不会进入死信队列,可以通过如下配置让非持久化的消息也进入死信队列
3、设置过期消息不进死信队列
独占消费者
通过在创建队列的时候,指定参数consumer.exclusive为true:
Queue queue = session.createQueue("test?consumer.exclusive=true");
还可以设置优先级
Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");消息类型 object
1、发送端
Person person = new Person("qiqi", 25, 398.0); Message message = session.createObjectMessage(girl);
2、接受端
if(message instanceof ActiveMQObjectMessage) { Person person = (Person)((ActiveMQObjectMessage)message).getObject(); System.out.println(person); }
注意,当使用对象时,需要提前添加信任:
connectionFactory.setTrustedPackages(Arrays.asList("com.zjw.entity", "com.zjw.model"));bytesMessage
1、发送端
BytesMessage bytesMessage = session.createBytesMessage(); bytesMessage.writeBytes("Hello".getBytes()); bytesMessage.writeUTF("你好!");
2、接受端
if(message instanceof BytesMessage) { BytesMessage bm = (BytesMessage)message; byte[] b = new byte[1024]; int len = -1; while ((len = bm.readBytes(b)) != -1) { System.out.println(new String(b, 0, len)); } }
还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序
bm.readBoolean() bm.readUTF()
3、写入文件
FileOutputStream out = new FileOutputStream("/tmp/test.txt"); byte[] bytes = new byte[1024]; int len = 0 ; while((len = bm.readBytes(bytes))!= -1){ out.write(bytes,0,len); }MapMessage
1、发送端
MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name","lucy"); mapMessage.setBoolean("yihun",false); mapMessage.setInt("age", 17); producer.send(mapMessage);
2、接收端
Message message = consumer.receive(); MapMessage mes = (MapMessage) message; System.out.println(mes); System.out.println(mes.getString("name"));消息发送原理 同步与异步
我们可以通过以下几种方式来设置异步发送:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST); // 通过factory设置 factory.setUseAsyncSend(true); ActiveMQConnection conn = (ActiveMQConnection)factory.createConnection(); // 通过连接设置 conn.setUseAsyncSend(true);消息堆积
producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。
-
可以在brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576
-
可以destinationUri中设置: test?producer.windowSize=1048576
要是用该功能,首先在配置文件中开启延迟和调度项:schedulerSupport="true"
如下:
延迟发送
消息发送端,对消息进行属性设置:
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000); // 10s带间隔的重复发送
long delay = 10 * 1000; long period = 2 * 1000; int repeat = 9; // 这里需要注意写入的数据类型,否则不生效 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); createProducer.send(message);Cron表达式定时发送
Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式:
Seconds Minutes Hours DayofMonth Month DayofWeek Year 或 Seconds Minutes Hours DayofMonth Month DayofWeek
每一个域可出现的字符如下:
-
Seconds: 可出现, - * /四个字符,有效范围为0-59的整数
-
Minutes: 可出现, - * /四个字符,有效范围为0-59的整数
-
Hours: 可出现, - * /四个字符,有效范围为0-23的整数
-
DayofMonth: 可出现, - * / ? L W C八个字符,有效范围为0-31的整数
-
Month: 可出现, - * /四个字符,有效范围为1-12的整数或JAN-DEC
-
DayofWeek: 可出现, - * / ? L C #四个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一, 依次类推
-
Year: 可出现, - * /四个字符,有效范围为1970-2099年
每一个域都使用数字,但还可以出现如下特殊字符,它们的含义是:
-
*:表示匹配该域的任意值,假如在Minutes域使用*, 即表示每分钟都会触发事件。
-
?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。因为DayofMonth和 DayofWeek会相互影响。例如想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。
-
-:表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次
-
/:表示起始时间开始触发,然后每隔固定时间触发一次,例如在Minutes域使用5/20,则意味着5分钟触发一次,而25,45等分别触发一次.
-
,:表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分每分钟触发一次。
-
L:表示最后,只能出现在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。
-
W:表示有效工作日(周一到周五),只能出现在DayofMonth域,系统将在离指定日期的最近的有效工作日触发事件。例如:在 DayofMonth使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日(周一)触发;如果5日在星期一 到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份
-
LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。
-
#:用于确定每个月第几个星期几,只能出现在DayofMonth域。例如在4#2,表示某月的第二个星期三。
举几个例子:
0 0 2 1 * ? * 表示在每月的1日的凌晨2点调度任务
0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业
0 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作
0 0 10,14,16 * * ? 每天上午10点,下午2点,4点
0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时
0 0 12 ? * WED 表示每个星期三中午12点
0 0 12 * * ? 每天中午12点触发
0 15 10 ? * * 每天上午10:15触发
0 15 10 * * ? 每天上午10:15触发
0 15 10 * * ? * 每天上午10:15触发
0 15 10 * * ? 2005 2005年的每天上午10:15触发
0 * 14 * * ? 在每天下午2点到下午2:59期间的每1分钟触发
0 0/5 14 * * ? 在每天下午2点到下午2:55期间的每5分钟触发
0 0/5 14,18 * * ? 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
0 0-5 14 * * ? 在每天下午2点到下午2:05期间的每1分钟触发
0 10,44 14 ? 3 WED 每年三月的星期三的下午2:10和2:44触发
0 15 10 ? * MON-FRI 周一至周五的上午10:15触发
0 15 10 15 * ? 每月15日上午10:15触发
0 15 10 L * ? 每月最后一日的上午10:15触发
0 15 10 ? * 6L 每月的最后一个星期五上午10:15触发
0 15 10 ? * 6L 2002-2005 2002年至2005年的每月的最后一个星期五上午10:15触发
0 15 10 ? * 6#3 每月的第三个星期五上午10:15触发
监听器可以使用监听器来处理消息接收
// 设定消费者的监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { // 当是Map 消息的情况 if (message instanceof MapMessage) { try { MapMessage msg = (MapMessage) message; //接受消息 msg.acknowledge(); String username = msg.getString("username"); String nickname = msg.getString("nickname"); int age = msg.getInt("age"); System.out.printf("获取消费者信息t%s:%s:%drn", username, nickname, age); } catch (JMSException e) { e.printStackTrace(); } } } });
当收到消息后会调起onMessage方法
消息过滤 消息发送Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); MessageProducer producter = session.createProducer(null); MapMessage msg1 = session.createMapMessage(); msg1.setString("username", "张三"); msg1.setInt("age", 12); msg1.setString("nickname", "zhangsan"); // meta data msg1.setIntProperty("age", 12); msg1.setStringProperty("nickname", "zhangsan"); MapMessage msg2 = session.createMapMessage(); msg2.setString("username", "李四"); msg2.setInt("age", 17); msg2.setString("nickname", "lisi"); msg2.setIntProperty("age", 18); msg2.setStringProperty("nickname", "lisi"); MapMessage msg3 = session.createMapMessage(); msg3.setString("username", "王五"); msg3.setInt("age", 22); msg3.setString("nickname", "wangwu"); msg3.setIntProperty("age", 122); msg3.setStringProperty("nickname", "wangwu"); MapMessage msg4 = session.createMapMessage(); msg4.setString("username", "赵六"); msg4.setInt("age", 24); msg4.setString("nickname", "zhaoliu"); msg4.setIntProperty("age", 24); msg4.setStringProperty("nickname", "zhaoliu"); MapMessage msg5 = session.createMapMessage(); msg5.setString("username", "田七"); msg5.setInt("age", 24); msg5.setString("nickname", "jerry"); msg5.setIntProperty("age", 24); msg5.setStringProperty("nickname", "jerry"); Destination destination = session.createQueue("test"); // 发送消息 producter.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60); producter.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60); producter.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60); producter.send(destination, msg4, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60); producter.send(destination, msg5, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60); session.commit();消息接收
//查询年大于 18的数据, 字符串的过滤,需要有'' MessageConsumer consumer = session.createConsumer(destination, "age >18 OR nickname = 'jerry'");
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)