How can an e-commerce system automatically cancel an order when it times out?
2022-06-21 13:57:00 【The forest wind is at ease】
1. Background
When a user places an order, creating it is typically a distributed transaction. To cancel an order automatically when it times out, we can build on a message queue's delay queue and dead letter queue. The implementation has three parts: creating the order and delivering it to the MQ, consuming normal order messages, and consuming messages that have timed out.
2. Implementation approach
For order creation, the order is considered created as soon as the producer has successfully delivered the message to the MQ. An ack from the MQ indicates that the delivery succeeded. At that point a second message is sent to the delay queue, which has a dead letter queue attached. The purpose is this: if a message in the delay queue reaches its TTL without being consumed, it moves to the dead letter queue, where a listener receives the expired order and can cancel it. Otherwise, the order follows the normal consumption flow.
The overall flow is: the producer sends the order to the order queue; once the broker confirms it, the order ID is sent to the delay queue; when the TTL expires, the message is dead-lettered and the cancel listener handles it.
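The flow can be tried out without a broker. The following is a minimal in-memory sketch, not RabbitMQ itself: it uses `java.util.concurrent.DelayQueue` as a stand-in for the TTL queue, and the class and method names (`OrderTimeoutSketch`, `placeOrder`, `handleNextExpired`) are hypothetical. It assumes the cancel step should only touch orders that are still unpaid.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// In-memory analogue of the TTL queue + dead letter listener flow (hypothetical names).
class OrderTimeoutSketch {
    enum Status { TO_BE_PAID, PAID, CANCELLED }

    // Plays the role of a message waiting in the delay queue until its TTL elapses.
    static final class TimeoutMessage implements Delayed {
        final String orderId;
        final long expiresAtNanos;

        TimeoutMessage(String orderId, long ttlMillis) {
            this.orderId = orderId;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();
    private final DelayQueue<TimeoutMessage> ttlQueue = new DelayQueue<>();

    // Creates the order and enqueues its timeout message.
    void placeOrder(String orderId, long ttlMillis) {
        orders.put(orderId, Status.TO_BE_PAID);
        ttlQueue.put(new TimeoutMessage(orderId, ttlMillis));
    }

    // Marks the order as paid, but only if it is still waiting for payment.
    void pay(String orderId) {
        orders.replace(orderId, Status.TO_BE_PAID, Status.PAID);
    }

    // Plays the role of the dead letter listener: waits for the next expired
    // message and cancels the order only if it is still unpaid.
    Status handleNextExpired() {
        try {
            TimeoutMessage expired = ttlQueue.take();
            orders.replace(expired.orderId, Status.TO_BE_PAID, Status.CANCELLED);
            return orders.get(expired.orderId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an expired message", e);
        }
    }
}
```

The conditional `replace` call is the important design choice here: an order that was paid before its timeout message expired is left untouched.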

3. Implementation
This article uses RabbitMQ, building the delay queue from its message TTL and dead letter queue features.
The configuration file:
server.port=8080
# RabbitMQ connection settings
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
# Virtual host
spring.rabbitmq.virtual-host=keduw-order
# Publisher confirm mechanism
# correlated: a callback is triggered once the message has been published to the exchange; the default is none
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
# Acknowledge messages manually
spring.rabbitmq.listener.direct.acknowledge-mode=manual
Add a RabbitMQ configuration class that creates the queues, exchanges, and bindings, along with the message converter and the listener container. The comments are detailed, so I won't repeat them here.
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * RabbitMQ configuration class
 */
@Configuration
public class RabbitMqConfig {

    /**
     * When using DirectMessageListenerContainer, make sure the ConnectionFactory is configured
     * with a task executor that has enough threads to support the required concurrency across
     * all listener containers that use that ConnectionFactory.
     * The default pool is small (5 threads in the documentation quoted here; see the note in
     * the method body for the default at the time of writing).
     *
     * Concurrency is based on the configured queues and consumersPerQueue. Each consumer of
     * each queue uses a separate channel, and concurrency is controlled by the RabbitMQ
     * client library. You can configure a taskExecutor to provide the maximum concurrency required.
     *
     * @param connectionFactory the connection factory used by the container
     * @return the listener container
     */
    @Bean(name = "rabbitMessageListenerContainer")
    public DirectMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory) {
        // At the time of writing, the default is DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 threads
        DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
        // Set the acknowledgement mode
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setPrefetchCount(5);
        container.setConsumersPerQueue(5);
        container.setMessagesPerAck(1);
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(20);
        // The executor is not a Spring bean here, so it must be initialized by hand before use
        taskExecutor.initialize();
        // Set this property to control concurrency (multithreading) flexibly
        container.setTaskExecutor(taskExecutor);
        return container;
    }

    /**
     * Message converter used to convert objects to JSON.
     * Objects can be sent to the queue with convertAndSend,
     * and the listener can deserialize the received body back into a Java object.
     *
     * @return Jackson converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Order message queue
     */
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("q.order").build();
    }

    /**
     * Delay message queue
     */
    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = new HashMap<>();
        // Messages in this queue expire after 10 s
        args.put("x-message-ttl", 10000);
        // Dead letter exchange: messages whose TTL expires without being consumed are routed to it
        args.put("x-dead-letter-exchange", "x.dlx");
        // Dead letter routing key; if not specified, the message's original routing key is used
        args.put("x-dead-letter-routing-key", "k.dlx");
        return new Queue("q.ttl", true, false, false, args);
    }

    /**
     * Dead letter queue, used to cancel user orders.
     * Orders that have not been paid after 10 s enter the dead letter queue;
     * its consumer cancels the user's order.
     */
    @Bean
    public Queue dlxQueue() {
        Map<String, Object> args = new HashMap<>();
        return new Queue("q.dlx", true, false, false, args);
    }

    /**
     * Order exchange
     */
    @Bean
    public Exchange orderExchange() {
        Map<String, Object> args = new HashMap<>();
        return new DirectExchange("x.order", true, false, args);
    }

    /**
     * Delay queue exchange
     */
    @Bean
    public Exchange ttlExchange() {
        Map<String, Object> args = new HashMap<>();
        return new DirectExchange("x.ttl", true, false, args);
    }

    /**
     * Dead letter exchange
     */
    @Bean
    public Exchange dlxExchange() {
        Map<String, Object> args = new HashMap<>();
        return new DirectExchange("x.dlx", true, false, args);
    }

    /**
     * Binding used to send orders (the MQ side of the distributed transaction)
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with("k.order")
                .noargs();
    }

    /**
     * Delay queue binding, used while waiting for the user to pay
     */
    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue())
                .to(ttlExchange())
                .with("k.ttl")
                .noargs();
    }

    /**
     * Dead letter queue binding, used to cancel orders whose payment timed out
     */
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with("k.dlx")
                .noargs();
    }
}
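The `ttlQueue()` bean above builds its arguments map by hand with a literal `10000`. As a small sketch, the same three arguments could be derived from a `java.time.Duration`, which keeps the unit explicit. The class name `TtlQueueArgs` and its method are hypothetical; only the three `x-` argument names come from the configuration above.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the queue arguments used by the delay queue.
class TtlQueueArgs {
    static Map<String, Object> of(Duration ttl, String deadLetterExchange, String deadLetterRoutingKey) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        Map<String, Object> args = new HashMap<>();
        // TTL in milliseconds, kept as an Integer like the hand-written value;
        // toIntExact throws ArithmeticException if the value does not fit in an int.
        args.put("x-message-ttl", Math.toIntExact(ttl.toMillis()));
        // Exchange that receives messages whose TTL has expired.
        args.put("x-dead-letter-exchange", deadLetterExchange);
        // Routing key used when the expired message is republished.
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return args;
    }
}
```

With this helper, the bean body would reduce to `new Queue("q.ttl", true, false, false, TtlQueueArgs.of(Duration.ofSeconds(10), "x.dlx", "k.dlx"))`.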
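Create the order listeners, which handle normally submitted orders and the cancellation of orders that have timed out.
import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the normal order flow
 */
@Component
public class OrderNormalListener {

    @RabbitListener(queues = "q.order", ackMode = "MANUAL")
    public void onMessage(Order order, Channel channel, Message message) throws IOException {
        // Placeholder for persisting the order
        System.out.println("Write to database");
        System.out.println(order);
        for (OrderDetail detail : order.getDetails()) {
            System.out.println(detail);
        }
        // Acknowledge only this message (multiple = false)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}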
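The listener above acknowledges the message after processing but does not show what happens when processing fails. A minimal sketch of one possible policy follows: ack on success, nack without requeue on failure. `AckChannel` is a hypothetical stand-in that mirrors only the `basicAck` and `basicNack` methods of `com.rabbitmq.client.Channel`, so the sketch runs without a broker; `RecordingChannel` exists only to make the behavior observable. Whether a failed message should be requeued, discarded, or dead-lettered is a policy decision the article does not make.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ManualAckSketch {
    // Hypothetical stand-in for the two Channel methods used here.
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    }

    // Runs the work, then acks on success or nacks (requeue = false) on failure.
    // Returns true if the message was acknowledged.
    static boolean handle(AckChannel channel, long deliveryTag, Runnable work) {
        boolean succeeded;
        try {
            work.run();
            succeeded = true;
        } catch (RuntimeException e) {
            succeeded = false;
        }
        try {
            if (succeeded) {
                channel.basicAck(deliveryTag, false);
            } else {
                // Without a dead letter exchange on the queue, requeue = false drops the message.
                channel.basicNack(deliveryTag, false, false);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return succeeded;
    }

    // Test double that records the calls it receives.
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
        }
    }
}
```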
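Create the listener that automatically cancels timed-out orders. It listens on the dead letter queue.
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Listener that automatically cancels orders on timeout
 */
@Component
public class OrderCancelListener implements ChannelAwareMessageListener {

    @Override
    @RabbitListener(queues = "q.dlx", ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        String orderId = new String(message.getBody());
        // Nothing consumes q.ttl in this demo, so every delay message ends up here;
        // a real implementation should cancel only orders that are still unpaid.
        System.out.println("Cancel order: " + orderId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}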
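The cancel listener reads the order ID with `new String(message.getBody())`. If the producer's `RabbitTemplate` picks up the `Jackson2JsonMessageConverter` bean from the configuration class, a String order ID would typically arrive JSON-encoded, that is, wrapped in double quotes. This is an assumption about how the template is wired, not something the article states. A small, hypothetical helper that tolerates both forms might look like this:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for reading an order ID from a raw message body.
class OrderIdDecoder {
    // Decodes the body as UTF-8 and strips one pair of surrounding double quotes, if present.
    // Simplification: escape sequences inside a JSON string are not unescaped.
    static String decode(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8).trim();
        if (text.length() >= 2 && text.startsWith("\"") && text.endsWith("\"")) {
            return text.substring(1, text.length() - 1);
        }
        return text;
    }
}
```

Naming the charset explicitly also avoids depending on the platform default encoding.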
For order submission, once the order message has been delivered successfully, the order ID is also sent to the delay queue so that the order can be cancelled later.
// Build the order
Order order = new Order();
order.setUserId(IdUtils.generateUserId());
order.setOrderId(IdUtils.generateOrderId());
// Set the status to "awaiting payment"
order.setStatus(OrderStatus.TO_BE_PAYED.toString());
order.setDetails(details);
// Deliver the order message
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend("x.order", "k.order", order, correlationData);
// Wait synchronously for the confirm; an asynchronous callback could be used instead
CorrelationData.Confirm confirm = correlationData.getFuture().get();
// Check whether the broker confirmed the message
boolean confirmAck = confirm.isAck();
if (confirmAck) {
    // Send the delay message that waits for payment
    rabbitTemplate.convertAndSend("x.ttl", "k.ttl", order.getOrderId());
}
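The snippet above waits for the confirm with `getFuture().get()`, which blocks with no upper bound. A hedged alternative is to wait with a timeout and treat a missing confirm as "not acknowledged". The helper below is a sketch written against the plain `java.util.concurrent.Future` interface; the class name `ConfirmWait` and its method are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical helper for waiting on a publisher confirm with an upper bound.
class ConfirmWait {
    // Returns true only if the future completes within the timeout
    // and the result satisfies the isAck check.
    static <T> boolean ackedWithin(Future<T> confirm, Predicate<? super T> isAck,
                                   long timeout, TimeUnit unit) {
        try {
            T result = confirm.get(timeout, unit);
            return result != null && isAck.test(result);
        } catch (TimeoutException | ExecutionException e) {
            // No confirm in time, or the confirm future failed: treat as not acknowledged.
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In the submission code this would be called along the lines of `ConfirmWait.ackedWithin(correlationData.getFuture(), CorrelationData.Confirm::isAck, 5, TimeUnit.SECONDS)`, where the 5 second limit is an arbitrary illustrative value.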
4. Summary
At this point the basic approach to delayed automatic order cancellation is in place, but some problems remain.
After the order message is delivered to the MQ, a second message must be delivered to the delay queue. The first delivery may succeed while the delivery to the delay queue fails, so this step needs a distributed lock or a compensation mechanism. There are also coding issues: the queue names would be better extracted into constants. This is only a demo and does not follow such standards; in product development these things should be properly standardized to make later maintenance easier.
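As one possible shape for the compensation mechanism mentioned above, the sketch below retries the delay-queue delivery a bounded number of times and parks the order ID for a later retry if all attempts fail. Everything in it is hypothetical: the `sender` predicate stands in for "send to the delay queue and report whether the broker confirmed it", and the in-memory pending queue stands in for what would more likely be a database table in a real system.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Hypothetical compensation sketch for failed delay-queue deliveries.
class DelayMessageCompensation {
    private final Predicate<String> sender;   // returns true when delivery is confirmed
    private final int maxAttempts;
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    DelayMessageCompensation(Predicate<String> sender, int maxAttempts) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
    }

    // Tries to deliver immediately; parks the order ID if every attempt fails.
    boolean send(String orderId) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (trySend(orderId)) {
                return true;
            }
        }
        pending.add(orderId);
        return false;
    }

    // Intended to be called periodically (for example by a scheduled job).
    // Re-sends each parked ID once and returns how many are still pending.
    int retryPending() {
        int toRetry = pending.size();
        for (int i = 0; i < toRetry; i++) {
            String orderId = pending.poll();
            if (orderId == null) {
                break;
            }
            if (!trySend(orderId)) {
                pending.add(orderId);
            }
        }
        return pending.size();
    }

    // A sender that throws is treated the same as one that reports failure.
    private boolean trySend(String orderId) {
        try {
            return sender.test(orderId);
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

Because a retried message starts its TTL later than the order was created, the cancel step should still check the order's current status rather than trust the message's timing.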