关于在MQ消息队列实践中的一些问题

消息相关

这里都是保证消费者角度考虑,不考虑MQ自身问题

大致原理看了前辈写的代码,通过aop增强,将注解@RabbitHandler作为切入点进行环绕通知。

因此在使用@RabbitListener监听消息队列的时候加入注解@RabbitHandler即可实现增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import com.alibaba.fastjson.JSONObject;
import com.sugon.cloud.rabbitmq.common.constants.MessageLogConstant;
import com.sugon.cloud.rabbitmq.common.mapper.MessageReceiverMapper;
import com.rabbitmq.client.Channel;
import com.sugon.cloud.rabbitmq.common.model.MessageLog;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

@Component
@Aspect
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MQReceiverAspect {

private final MessageReceiverMapper messageReceiverMapper;

private final Environment env;

//切点为RabbitHandler注解
@Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitHandler)")
public void messageCheck() {
}

//环绕通知为messageCheck()方法
@Around("messageCheck()")
public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
log.info("CLASS_METHOD : " + proceedingJoinPoint.getSignature().getDeclaringTypeName() + "." + proceedingJoinPoint.getSignature().getName());
Object result = null;
// onMessage参数必须为Message和Channel
Message message = (Message) proceedingJoinPoint.getArgs()[0];
Channel channel = (Channel) proceedingJoinPoint.getArgs()[1];

// 如果没传RequestId则直接抛异常
String requestId = message.getMessageProperties().getMessageId();
String param = new String(message.getBody());
if (StringUtils.isEmpty(requestId)) {
log.error("接受到RequestId为空的MQ消息:" + param);
// TODO 如果接受到的消息不正常 也需要ack 否则消息被重复消费一直打印异常
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
return new Object();
}
try {
List<MessageLog> messageLogList = messageReceiverMapper.queryMessageLogByRequestId(requestId);
if (messageLogList.size() == 0) {
// 如果数据库中没有相同requestId的消息则直接保存,并把MessageId设置为数据记录的ID
saveMessageLogAndSetMessageId(requestId, param, message);
// 调用onMessage方法
result = proceedingJoinPoint.proceed(proceedingJoinPoint.getArgs());
} else { // 如果数据库中已有相同requestId的消息,则判断是重复消息还是重试消息
for (MessageLog mLog : messageLogList) {
if (mLog.getBusinessStatus() == MessageLogConstant.BUSINESS_STATUS_INIT) {
// 如果数据库中已有一条正在处理中的消息,说明有其他线程正在处理相同requestId的消息,则当前线程需要等待其他线程处理完毕再做下一步操作
MessageLog messageLog = buildMessageLog(requestId, param);
messageReceiverMapper.saveConflictMessageLog(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.error("RequestID为 " + requestId + " 的消息正在被其他线程处理,当前线程暂不处理,记录状态为4!");
break;
} else if (mLog.getBusinessStatus() == MessageLogConstant.BUSINESS_STATUS_SUCCESS) {
// 如果数据库中相同RequestId的消息已经被成功处理,则当前线程不再处理该消息,只保留一条日志.
MessageLog messageLog = buildMessageLog(requestId, param);
messageReceiverMapper.saveDuplicateMessageLog(messageLog);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.info("RequestID为 " + requestId + " 的消息已被成功处理,当前线程将不再处理!记录状态为3");
break;
} else if (mLog.getBusinessStatus() == MessageLogConstant.BUSINESS_STATUS_FAIL) {
// 如果数据库中相同requestId的消息为业务处理失败,则当前线程收到的消息为重试消息,重新执行。
saveMessageLogAndSetMessageId(requestId, param, message);
// 调用onMessage方法
log.info("RequestID为 " + requestId + " 的消息未被成功处理,当前线程继续处理!");
result = proceedingJoinPoint.proceed(proceedingJoinPoint.getArgs());
break;
}
}
}
} catch (Exception e) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.error("消息幂等性处理异常", e);
}
return result;
}

private void saveMessageLogAndSetMessageId(String requestId, String param, Message message) {
MessageLog messageLog = buildMessageLog(requestId, param);
messageReceiverMapper.saveMessageLog(messageLog);
// 为了便于在onMessage方法中更新businessStatus
message.getMessageProperties().setMessageId(messageLog.getId());
}

private MessageLog buildMessageLog(String requestId, String param) {
MessageLog messageLog = new MessageLog();
messageLog.setId(UUID.randomUUID().toString());
messageLog.setRequestId(requestId);
messageLog.setParam(param);
messageLog.setCreateDate(new Date());
return messageLog;
}
}

同步队列 RPC

A端向B发送一条查询消息,A端一直阻塞等待B端返回消息,或者超时。在MQ官网中,很像RPC模式。称为direct-reply-to直接回复,大概意思就是发送消息时声明一个响应队列,通过该队列发送消息和接收消息,然后再销毁这个“临时队列”。

参考一下文章spring -amqp request-replydirect-reply-toRPC

image-20210921153926966

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public String callAgentSync(String params, Integer replyTimeout) {
String result = null;
try {
String tempUUId = UUIDUtil.getUUID();
RabbitTemplate template = new RabbitTemplate(rabbitTemplate.getConnectionFactory());
ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false);
channel.exchangeDeclare(messageMqConfig.getPgsql().getSend().getExchange(), BuiltinExchangeType.TOPIC, true);
//创建自动删除的固定队列,负责此次消息
channel.queueDeclare(tempUUId, false, true, true, null);
//将队列与交换机绑定
channel.queueBind(tempUUId, messageMqConfig.getPgsql().getSend().getExchange(), tempUUId);

if (Objects.nonNull(replyTimeout)) {
template.setReplyTimeout(replyTimeout);
}
//此次消息返回地址为上诉固定队列
template.setReplyAddress(tempUUId);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(template.getConnectionFactory());
//这一步非常重要,固定队列模式要,一定要主动设置 SimpleMessageListenerContainer监听容器,监听应答队列
container.setQueueNames(tempUUId);
container.setMessageListener(template);
container.setConcurrentConsumers(100);
container.setConcurrentConsumers(100);
container.setPrefetchCount(250);
container.start();

CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setReplyTo(tempUUId);
messageProperties.setReplyToAddress(new Address(messageMqConfig.getPgsql().getSend().getExchange(),tempUUId));
messageProperties.setCorrelationId(correlationId.getId());
messageProperties.setExpiration("10000");
messageProperties.setReplyTo(tempUUId);
Message message = new Message(params.getBytes(),messageProperties);
log.debug("queue id[{}],params=[{}]", tempUUId, params);
Message response = template.sendAndReceive(messageMqConfig.getPgsql().getSend().getExchange(), messageMqConfig.getPgsql().getSend().getRoutingkey(), message);

if (Objects.nonNull(response)) {
result = new String(response.getBody());
}
container.stop();
channel.close();
connection.close();
template.stop();
log.debug("queue id[{}], result=[{}]", tempUUId, result);
}catch (Exception e){
log.error("callAgentSync error:", e);
}
return result;
}