在消息队列(Message Queue,简称MQ)中,顺序消息的收发是确保业务操作按预期顺序执行的重要机制,以下是关于MQ顺序消息收发的详细解答:
基本概念
1、顺序消息:顺序消息是指消息队列中的消息按照发送的顺序被消费,这种机制对于需要严格保证操作顺序的业务场景非常重要,例如电商系统中的订单处理、支付流程等。
2、全局顺序和分区顺序:
全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的FIFO顺序进行发布和消费。
实现机制
1. RocketMQ
Producer:
RocketMQ通过Producer发送顺序消息时,会使用MessageQueueSelector指定消息发送到固定的MessageQueue,这样可以确保相同业务操作的消息被发送到同一个队列中,从而实现局部有序。
对于全局有序,需要将所有消息都发送到同一个MessageQueue中。
Consumer:
RocketMQ的消费者会注册一个监听器,进行消息的拉取和消费处理,对于顺序消息,消费者会通过加锁机制来保证消息消费的顺序性。
Broker端通过对MessageQueue进行加锁,确保同一个MessageQueue只能被同一个Consumer进行消费。
2. Kafka
Kafka在同一个Partition内保障消息顺序,如果Topic存在多个Partition则无法确保全局顺序,为了保障全局顺序,需要控制Partition数量为1个。
3. RabbitMQ
RabbitMQ中的queue是有序的消息集合,消息以FIFO方式进行排队和出队列,要实现RabbitMQ的顺序消息,需要配置一个Queue对应一个Consumer,并关闭autoack,prefetchCount=1,每次只消费一条信息,处理过后进行手工ack。
示例代码
1. RocketMQ示例代码
// Producer示例 public class Producer { private final String TOPIC = "OrderTopic"; private DefaultMQProducer producer; public Producer() throws MQClientException { producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); } public void sendMessage(Order order) throws MQClientException, InterruptedException, RemotingException, MQBrokerException { Message message = new Message(TOPIC, "create_order", JSON.toJSONBytes(order)); producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, order.getId()); } public void shutdown() { producer.shutdown(); } }
// Consumer示例 public class Consumer { private final String TOPIC = "OrderTopic"; private DefaultMQPushConsumer consumer; public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe(TOPIC, "*"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) > { for (MessageExt msg : msgs) { System.out.println("Received message: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } }
2. HTTP协议下的Java SDK示例代码(RocketMQ)
import com.aliyun.mq.http.MQClient; import com.aliyun.mq.http.MQProducer; import com.aliyun.mq.http.model.TopicMessage; import java.util.Date; public class OrderProducer { public static void main(String[] args) { MQClient mqClient = new MQClient( "${HTTP_ENDPOINT}", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") ); final String topic = "${TOPIC}"; final String instanceId = "${INSTANCE_ID}"; MQProducer producer; if (instanceId != null && !instanceId.isEmpty()) { producer = mqClient.getProducer(instanceId, topic); } else { producer = mqClient.getProducer(topic); } try { for (int i = 0; i < 8; i++) { TopicMessage pubMsg = new TopicMessage( "hello mq!".getBytes(), "A" ); pubMsg.setShardingKey(String.valueOf(i % 2)); pubMsg.getProperties().put("a", String.valueOf(i)); TopicMessage pubResultMsg = producer.publishMessage(pubMsg); System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId() + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()); } } catch (Throwable e) { System.out.println(new Date() + " Send mq message failed. Topic is:" + topic); e.printStackTrace(); } finally { mqClient.shutdown(); } } }
注意事项
性能影响:顺序消息由于需要保证顺序性,可能会对性能产生影响,单个MessageQueue的消息量可能很大,而Consumer端消费时只能单线程消费,可能导致消息积压。
Broker故障:如果顺序消息所在的Broker发生故障,可能会导致消息顺序错乱。
配置要求:对于不同的MQ实现,顺序消息的配置要求也不同,Kafka需要控制Partition数量为1个,RabbitMQ需要配置普通queue并关闭autoack等。
希望以上信息能帮助您更好地理解MQ顺序消息的收发机制及其实现方法,如有更多问题,请随时提问。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/1223222.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复