How can an e-commerce system automatically cancel an order when it times out?

2022-06-21 13:57:00 The forest wind is at ease

1. Background

When a user places an order, the operation is usually a distributed transaction. To cancel an order automatically once it times out, we can use a message queue (MQ) with a delay queue and a dead-letter queue. The implementation has three parts: creating the order and delivering it to the MQ, consuming normal order messages, and consuming the messages of orders that have timed out.

2. Implementation approach

For order creation, the order is considered created as soon as the producer has successfully delivered the message to the MQ. An ack from the MQ means the delivery succeeded, and at that point a second message is sent to the delay queue. The delay queue is configured with a dead-letter exchange, so a message that has not been consumed when its TTL expires is moved to the dead-letter queue. The listener on the dead-letter queue then receives the expired order and can cancel it. Otherwise, the order follows the normal consumption flow.

The overall flow is as follows:

[Figure: overall implementation flow; image not available]
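
Since the diagram is missing, the flow can also be illustrated without a broker. The following is only a minimal in-memory sketch, assuming `java.util.concurrent.DelayQueue` as a stand-in for the TTL queue and a drained list as a stand-in for the dead-letter queue. The class and method names (`DelayFlowSketch`, `submit`, `drainExpired`) are hypothetical and are not part of the article's RabbitMQ code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the TTL queue plus dead-letter queue; this is not RabbitMQ code. */
final class DelayFlowSketch {

    /** An order id that counts as expired once its TTL has elapsed. */
    static final class PendingOrder implements Delayed {
        final String orderId;
        final long expiresAtNanos;

        PendingOrder(String orderId, long ttlMillis) {
            this.orderId = orderId;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<PendingOrder> ttlQueue = new DelayQueue<>();

    /** Plays the role of sending the order id to the delay queue. */
    void submit(String orderId, long ttlMillis) {
        ttlQueue.put(new PendingOrder(orderId, ttlMillis));
    }

    /** Returns the ids whose TTL has elapsed, i.e. what a dead-letter listener would receive. */
    List<String> drainExpired() {
        List<PendingOrder> expired = new ArrayList<>();
        // DelayQueue only hands over elements whose delay has already elapsed
        ttlQueue.drainTo(expired);
        List<String> ids = new ArrayList<>();
        for (PendingOrder order : expired) {
            ids.add(order.orderId);
        }
        return ids;
    }
}
```

An order submitted with a TTL of 0 is returned by the next `drainExpired()` call, while one submitted with a long TTL stays in the queue until its time is up.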

3. Code

This article uses RabbitMQ, building the delay queue from RabbitMQ's message TTL and dead-letter queue.
The configuration file:

server.port=8080

# RabbitMQ connection settings
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
# Virtual host
spring.rabbitmq.virtual-host=keduw-order

# Publisher confirm mechanism
# correlated: the confirm callback is triggered once the message has been published to the exchange; the default is none
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

# Messages are acknowledged manually
spring.rabbitmq.listener.direct.acknowledge-mode=manual

Add a RabbitMQ configuration class that creates the queues, the message converter and the listener container, and binds the queues to their exchanges. The comments in the code are detailed, so I won't repeat them here.

/**
 * RabbitMQ configuration class
 */
@Configuration
public class RabbitMqConfig {

    /**
     * When using DirectMessageListenerContainer, make sure the ConnectionFactory is configured with a task executor
     * that has enough threads to support the required concurrency across all listener containers using that ConnectionFactory.
     * The default pool size is only 5.
     *
     * Concurrency is based on the configured queues and consumersPerQueue. Each consumer of each queue uses a separate channel,
     * and concurrency is controlled by the rabbit client library; by default, it uses a pool of 5 threads.
     * You can configure a taskExecutor to provide the maximum concurrency required.
     *
     * @param connectionFactory the RabbitMQ connection factory
     * @return the listener container
     */
    @Bean(name = "rabbitMessageListenerContainer")
    public DirectMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory){
        // At the time of writing, the default is DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 threads
        DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
        // Set the acknowledgement mode
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setPrefetchCount(5);
        container.setConsumersPerQueue(5);
        container.setMessagesPerAck(1);

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(20);
        // The executor is created by hand rather than as a Spring bean, so it must be initialized before use
        taskExecutor.initialize();
        // Setting this property allows the concurrency (multithreading) to be tuned flexibly
        container.setTaskExecutor(taskExecutor);

        return container;
    }

    /**
     * Message converter used to convert objects to JSON.
     * Objects can be sent to the message queue with convertAndSend,
     * and listeners can deserialize received messages into Java objects.
     *
     * @return Jackson converter 
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Order message queue.
     * @return
     */
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable("q.order").build();
    }

    /**
     * Delay message queue.
     * @return
     */
    @Bean
    public Queue ttlQueue(){
        Map<String,Object> args = new HashMap<>();
        // Messages in this queue expire after 10 s
        args.put("x-message-ttl", 10000);
        // Dead-letter exchange: messages not consumed before their TTL expires are routed to it
        args.put("x-dead-letter-exchange","x.dlx");
        // Routing key used when dead-lettering; if not set, the message's original routing key is used
        args.put("x-dead-letter-routing-key","k.dlx");
        Queue queue = new Queue("q.ttl",true,false,false, args);
        return queue;
    }

    /**
     * Dead-letter queue, used to cancel user orders.
     * Orders still unpaid after 10 s enter the dead-letter queue; consuming this queue cancels the order.
     *
     * @return
     */
    @Bean
    public Queue dlxQueue(){
        Map<String,Object> args = new HashMap<>();
        Queue dlq = new Queue("q.dlx",true,false,false, args);

        return dlq;
    }

    /**
     * Order exchange.
     * @return
     */
    @Bean
    public Exchange orderExchange(){
        Map<String, Object> args = new HashMap<>();
        DirectExchange exchange = new DirectExchange("x.order", true, false, args);

        return exchange;
    }

    /**
     * Delay queue exchange.
     * @return
     */
    @Bean
    public Exchange ttlExchange(){
        Map<String, Object> args = new HashMap<>();
        return new DirectExchange("x.ttl", true, false, args);
    }

    /**
     * Dead-letter exchange.
     * @return
     */
    @Bean
    public Exchange dlxExchange(){
        Map<String, Object> args = new HashMap<>();
        DirectExchange exchange = new DirectExchange("x.dlx", true, false, args);
        return exchange;
    }

    /**
     * Binding used to send orders; this is the MQ part of the distributed transaction.
     * @return
     */
    @Bean
    public Binding orderBinding(){
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with("k.order")
                .noargs();
    }

    /**
     * Delay queue binding, used while waiting for the user's payment.
     * @return
     */
    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue())
                .to(ttlExchange())
                .with("k.ttl")
                .noargs();
    }

    /**
     * Dead-letter queue binding, used to cancel orders whose payment timed out.
     * @return
     */
    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with("k.dlx")
                .noargs();
    }

}
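
The queue, exchange and routing-key names above are repeated as string literals in the configuration class, the listeners and the sending code. One possible way to centralise them is a constants holder like the sketch below. The class name `OrderMqNames` and the field names are hypothetical; only the string values and the 10 s TTL come from the code above.

```java
/** Hypothetical holder for the names used in this article; declared package-private for brevity. */
final class OrderMqNames {

    // Queues
    static final String ORDER_QUEUE = "q.order";
    static final String TTL_QUEUE = "q.ttl";
    static final String DLX_QUEUE = "q.dlx";

    // Exchanges
    static final String ORDER_EXCHANGE = "x.order";
    static final String TTL_EXCHANGE = "x.ttl";
    static final String DLX_EXCHANGE = "x.dlx";

    // Routing keys
    static final String ORDER_ROUTING_KEY = "k.order";
    static final String TTL_ROUTING_KEY = "k.ttl";
    static final String DLX_ROUTING_KEY = "k.dlx";

    // Payment window before an order is dead-lettered, in milliseconds
    static final int ORDER_TTL_MILLIS = 10_000;

    private OrderMqNames() {
        // constants only, no instances
    }
}
```

Because these are compile-time constants, they can also be used in annotation attributes, for example `@RabbitListener(queues = OrderMqNames.ORDER_QUEUE)`.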

Create the order listeners for normal order submission and for timeout cancellation. The first one handles the normal order flow.

/**
 * Listener for the normal order payment flow.
 */
@Component
public class OrderNormalListener {

    @RabbitListener(queues = "q.order",ackMode = "MANUAL")
    public void onMessage(Order order , Channel channel , Message message) throws IOException {
        System.out.println("Writing order to database");
        System.out.println(order);

        for (OrderDetail detail : order.getDetails()){
            System.out.println(detail);
        }

        channel.basicAck(message.getMessageProperties().getDeliveryTag() , false);
    }

}

Create the listener that automatically cancels timed-out orders; it listens on the dead-letter queue.

/**
 * Listener that automatically cancels timed-out orders.
 */
@Component
public class OrderCancelListener implements ChannelAwareMessageListener {

    @Override
    @RabbitListener(queues = "q.dlx" , ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        String orderId = new String(message.getBody());
        System.out.println("Cancelling order: " + orderId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
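
In the code shown in this article nothing consumes `q.ttl`, so every order id reaches the dead-letter listener after 10 s, including orders that were paid in time. The cancel step therefore probably needs to check the order status first and should tolerate duplicate deliveries. The sketch below shows one way to do that, assuming an in-memory map as a stand-in for the order table. `OrderCancelSketch`, `cancelIfUnpaid` and the `PAID` and `CANCELLED` states are hypothetical; only `TO_BE_PAYED` appears in the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the order table; a real system would more likely use a conditional UPDATE. */
final class OrderCancelSketch {

    enum Status { TO_BE_PAYED, PAID, CANCELLED }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void create(String orderId) {
        orders.put(orderId, Status.TO_BE_PAYED);
    }

    /** Marks the order as paid only if it is still awaiting payment. */
    boolean pay(String orderId) {
        return orders.replace(orderId, Status.TO_BE_PAYED, Status.PAID);
    }

    /**
     * Cancels the order only if it is still awaiting payment.
     * Returns false for paid, already cancelled or unknown orders, so a repeated message is a no-op.
     */
    boolean cancelIfUnpaid(String orderId) {
        return orders.replace(orderId, Status.TO_BE_PAYED, Status.CANCELLED);
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }
}
```

The three-argument `replace` is an atomic compare-and-set on the map, so a payment and a cancellation racing on the same order cannot both succeed.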

When an order is submitted, once the normal delivery succeeds an additional message is sent to the delay queue so that the order can be cancelled later.

// Build the order
Order order = new Order();
order.setUserId(IdUtils.generateUserId());
order.setOrderId(IdUtils.generateOrderId());
// Set the status to awaiting payment
order.setStatus(OrderStatus.TO_BE_PAYED.toString());
order.setDetails(details);

// Deliver the message
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend("x.order","k.order", order, correlationData);
// Wait synchronously for the confirm; an asynchronous callback could be used instead
CorrelationData.Confirm confirm = correlationData.getFuture().get();
// Check whether the broker confirmed the message
boolean confirmAck = confirm.isAck();
if (confirmAck){
    // Send the delayed message that waits for payment
    rabbitTemplate.convertAndSend("x.ttl","k.ttl" , order.getOrderId());
}
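
The synchronous `get()` above blocks for as long as the confirm takes to arrive. A bounded wait may be safer. The sketch below is a plain-Java illustration, assuming a `Future<Boolean>` as a stand-in for the confirm future, with `true` meaning the broker acked. `ConfirmWaitSketch` and `awaitAck` are hypothetical names, and the mapping from `CorrelationData.Confirm` to a boolean is left to the caller.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ConfirmWaitSketch {

    private ConfirmWaitSketch() {
    }

    /**
     * Waits at most timeoutMillis for the confirm result.
     * Returns true only if the future completes with true in time;
     * a nack, a timeout, a failure or an interrupt all count as "not confirmed".
     */
    static boolean awaitAck(Future<Boolean> confirm, long timeoutMillis) {
        try {
            return Boolean.TRUE.equals(confirm.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe it
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Treating a timeout as "not confirmed" means the delayed message is not sent, so the caller still has to decide what to do with an order whose confirm arrives late.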

4. Summary

At this point the basic idea of delayed automatic order cancellation is implemented, but some problems remain.

After the order message is delivered to the MQ, a second message has to be delivered to the delay queue, so the first delivery may succeed while the delivery to the delay queue fails. Handling this requires a distributed lock or a compensation mechanism. There are also coding issues: the MQ queue names would be better extracted into constants. This is only a demo, so it does not follow such standards, but in product development these things should be properly standardized to make later maintenance easier.
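
As a rough illustration of the compensation idea, the sketch below retries the delay-queue delivery a bounded number of times and reports failure so that the caller can compensate, for example by recording the order for a later scheduled sweep. It is a minimal sketch and not the author's implementation. `DelayPublishRetrySketch` and `publishWithRetry` are hypothetical, and the `Predicate<String>` stands in for whatever call actually sends the order id to the delay queue.

```java
import java.util.function.Predicate;

final class DelayPublishRetrySketch {

    private DelayPublishRetrySketch() {
    }

    /**
     * Tries to publish the order id up to maxAttempts times.
     * Returns the number of attempts used on success, or -1 if every attempt failed,
     * in which case the caller should fall back to its compensation path.
     */
    static int publishWithRetry(String orderId, Predicate<String> publisher, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (publisher.test(orderId)) {
                    return attempt;
                }
            } catch (RuntimeException e) {
                // A thrown exception counts as a failed attempt; fall through and retry
            }
        }
        return -1;
    }
}
```

A retry like this only narrows the failure window. If the process dies between the two deliveries, the order can only be recovered from durable state such as the order table.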

Copyright notice
This article was written by [The forest wind is at ease]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/02/202202221430525666.html