关于在MQ消息队列实践中的一些问题
消息相关
这里都是保证消费者角度考虑,不考虑MQ自身问题
大致原理看了前辈写的代码,通过aop增强,将注解@RabbitHandler作为切入点进行环绕通知。
因此在使用@RabbitListener监听消息队列的时候加入注解@RabbitHandler即可实现增强
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| import com.alibaba.fastjson.JSONObject; import com.sugon.cloud.rabbitmq.common.constants.MessageLogConstant; import com.sugon.cloud.rabbitmq.common.mapper.MessageReceiverMapper; import com.rabbitmq.client.Channel; import com.sugon.cloud.rabbitmq.common.model.MessageLog; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.amqp.core.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils;
import java.util.Date; import java.util.List; import java.util.Objects; import java.util.UUID;
@Component @Aspect @Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class MQReceiverAspect {
private final MessageReceiverMapper messageReceiverMapper;
private final Environment env;
@Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitHandler)") public void messageCheck() { }
@Around("messageCheck()") public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { log.info("CLASS_METHOD : " + proceedingJoinPoint.getSignature().getDeclaringTypeName() + "." + proceedingJoinPoint.getSignature().getName()); Object result = null; Message message = (Message) proceedingJoinPoint.getArgs()[0]; Channel channel = (Channel) proceedingJoinPoint.getArgs()[1];
String requestId = message.getMessageProperties().getMessageId(); String param = new String(message.getBody()); if (StringUtils.isEmpty(requestId)) { log.error("接受到RequestId为空的MQ消息:" + param); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); return new Object(); } try { List<MessageLog> messageLogList = messageReceiverMapper.queryMessageLogByRequestId(requestId); if (messageLogList.size() == 0) { saveMessageLogAndSetMessageId(requestId, param, message); result = proceedingJoinPoint.proceed(proceedingJoinPoint.getArgs()); } else { for (MessageLog mLog : messageLogList) { if (mLog.getBusinessStatus() == MessageLogConstant.BUSINESS_STATUS_INIT) { MessageLog messageLog = buildMessageLog(requestId, param); messageReceiverMapper.saveConflictMessageLog(messageLog); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.error("RequestID为 " + requestId + " 的消息正在被其他线程处理,当前线程暂不处理,记录状态为4!"); break; } else if (mLog.getBusinessStatus() == MessageLogConstant.BUSINESS_STATUS_SUCCESS) { MessageLog messageLog = buildMessageLog(requestId, param); messageReceiverMapper.saveDuplicateMessageLog(messageLog); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.info("RequestID为 " + requestId + " 的消息已被成功处理,当前线程将不再处理!记录状态为3"); break; } else if (mLog.getBusinessStatus() == MessageLogConstant.BUSINESS_STATUS_FAIL) { saveMessageLogAndSetMessageId(requestId, param, message); log.info("RequestID为 " + requestId + " 的消息未被成功处理,当前线程继续处理!"); result = proceedingJoinPoint.proceed(proceedingJoinPoint.getArgs()); break; } } } } catch (Exception e) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.error("消息幂等性处理异常", e); } return result; }
private void saveMessageLogAndSetMessageId(String requestId, String param, Message message) { MessageLog messageLog = buildMessageLog(requestId, param); messageReceiverMapper.saveMessageLog(messageLog); message.getMessageProperties().setMessageId(messageLog.getId()); }
private MessageLog buildMessageLog(String requestId, String param) { MessageLog messageLog = new MessageLog(); messageLog.setId(UUID.randomUUID().toString()); messageLog.setRequestId(requestId); messageLog.setParam(param); messageLog.setCreateDate(new Date()); return messageLog; } }
|
同步队列 RPC
A端向B发送一条查询消息,A端一直阻塞等待B端返回消息,或者超时。在MQ官网中,很像RPC模式。称为direct-reply-to直接回复,大概意思就是发送消息时声明一个响应队列,通过该队列发送消息和接收消息,然后再销毁这个“临时队列”。
参考一下文章spring -amqp request-reply、direct-reply-to、RPC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public String callAgentSync(String params, Integer replyTimeout) { String result = null; try { String tempUUId = UUIDUtil.getUUID(); RabbitTemplate template = new RabbitTemplate(rabbitTemplate.getConnectionFactory()); ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory(); Connection connection = connectionFactory.createConnection(); Channel channel = connection.createChannel(false); channel.exchangeDeclare(messageMqConfig.getPgsql().getSend().getExchange(), BuiltinExchangeType.TOPIC, true); channel.queueDeclare(tempUUId, false, true, true, null); channel.queueBind(tempUUId, messageMqConfig.getPgsql().getSend().getExchange(), tempUUId);
if (Objects.nonNull(replyTimeout)) { template.setReplyTimeout(replyTimeout); } template.setReplyAddress(tempUUId);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(template.getConnectionFactory()); container.setQueueNames(tempUUId); container.setMessageListener(template); container.setConcurrentConsumers(100); container.setConcurrentConsumers(100); container.setPrefetchCount(250); container.start();
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); MessageProperties messageProperties = new MessageProperties(); messageProperties.setReplyTo(tempUUId); messageProperties.setReplyToAddress(new Address(messageMqConfig.getPgsql().getSend().getExchange(),tempUUId)); messageProperties.setCorrelationId(correlationId.getId()); messageProperties.setExpiration("10000"); messageProperties.setReplyTo(tempUUId); Message message = new Message(params.getBytes(),messageProperties); log.debug("queue id[{}],params=[{}]", tempUUId, params); Message response = template.sendAndReceive(messageMqConfig.getPgsql().getSend().getExchange(), messageMqConfig.getPgsql().getSend().getRoutingkey(), message);
if (Objects.nonNull(response)) { result = new String(response.getBody()); } container.stop(); channel.close(); connection.close(); template.stop(); log.debug("queue id[{}], result=[{}]", tempUUId, result); }catch (Exception e){ log.error("callAgentSync error:", e); } return result; }
|