//rocketmq-spring-boot-starter提供的 *** 作rocketmq的类
@Autowired
private RocketMQTemplaterocketMQTemplate;
同步消息(sync message)
producer向broker发送消息,broker服务器返回发送结果后再继续往下执行
public void sendSyncMsg(Stringtopic,String msg){
rocketMQTemplate.syncSend(tpic,msg);
}
异步消息(sync message)
producer向broker发送消息时指定消息发送成功以及发送异常的回调方法,producer发送消息后线程不阻塞继续往下执行,消息发送成功或失败的回调任务在新的线程中执行
public void sendAsyncMsg(String topic,String msg){
rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
@Override
public void onException(Throwable throwable) {
System.out.println(throwable.getMessage());
}
});
}
单向消息(sync message)
producer向broker发送消息不等待broker服务器的结果
public void sendOneWayMsg(String topic,String msg){
rocketMQTemplate.sendOneWay(topic,msg);
}
rocketmq发送自定义消息类型
生产者
public void sendMsgByJson(String topic, OrderExt orderExt){
//同步发送
rocketMQTemplate.convertndSend(topic,orderExt);
}
消费者
@Component
@RocketMQMessageListener(topic = "rocketmq-demo-obj",consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener {
@Override
public void onMessage(OrderExt orderExt) {
System.out.println(orderExt);
}
}
rocketmq发送延迟消息
发送延迟消息只需要设置延迟时间即可,延迟时间存在18个等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel(level)设置与延迟时间相对于的延迟级别即可。
发送同步消息
public void sendMsgByJsonDelay(String topic, OrderExt orderExt){
Message message =MessageBuilder.withPayload(orderExt).build();
//第四个参数设置延迟级别
rocketMQTemplate.syncSend(topic,message,3000,3);
System.out.printf("send msg:%s",orderExt);
}
发送异步消息
public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt) throws RemotingException, MQClientException, InterruptedException {
String s = JSON.toJSONString(orderExt);
org.apache.rocketmq.common.message.Message message = neworg.apache.rocketmq.common.message.Message(topic,s.getBytes(Charset.orName("utf-8")));
message.setDelayTimeLevel(3);
//异步消息使用原生方法发送
rocketMQTemplate.getProducer().send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable.getMessage());
}
});
System.out.printf("sent msg:%s",s);
}
rocketmq消费重试
@Component
@RocketMQMessageListener(topic = "rocketmq-demo-obj",consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println(new Date());
System.out.println(JSON.parseObject(new String(messageExt.getBody()),OrderExt.class));
//消费发生异常后会重复消费同一数据,默认会按第3级及之后的延时时间间隔重复消费(即第一次重复消费在首次消费10s后,第二次重复消费在第一次重复消费30s后,以此类推),可以根据消费次数终止重复消费
if(messageExt.getReconsumeTimes()>2){
System.out.println("停止重试,写入数据库...");
return;
}
throw new RuntimeException("处理消息失败");
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)