WebSocket multi-threaded message sending error TEXT_PARTIAL_WRITING: a use case for replacing the synchronized exclusive lock with a spin lock

2022-08-03 22:25:00 Master_Shifu_

1. Background:

When WebSocket messages are pushed from multiple threads, a large batch of messages may contain several messages for the same session. If two threads obtain that session and send on it at the same time, the following error is reported:

The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method

The cause: handlerA and handlerB can run at the same time. While method A or B is iterating over the sessions and calls sendMessage on one of them, the other method may happen to be sending another message through the same session. The two sends on that session then conflict (that is, several threads write data to one socket at the same moment), and the exception above is thrown.
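The conflict can be reproduced without a real WebSocket container. The following is a minimal sketch, not the container's actual implementation: FakeSession is a hypothetical stand-in that rejects a send while another send is still in progress, and two latches force the two sends to overlap.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a session that rejects overlapping writes. */
class FakeSession {
    private final AtomicBoolean writing = new AtomicBoolean(false);

    /** Sends a message; whileWriting runs while the write is still in progress. */
    void sendMessage(String message, Runnable whileWriting) {
        if (!writing.compareAndSet(false, true)) {
            // Mimics the container's complaint about a write already in progress
            throw new IllegalStateException("state [TEXT_PARTIAL_WRITING]");
        }
        try {
            whileWriting.run(); // simulates the time spent writing to the socket
        } finally {
            writing.set(false);
        }
    }
}

class PartialWritingDemo {
    /** Returns true if the second thread's overlapping send was rejected. */
    static boolean overlappingSendFails() {
        FakeSession session = new FakeSession();
        CountDownLatch firstIsWriting = new CountDownLatch(1);
        CountDownLatch secondDone = new CountDownLatch(1);
        AtomicBoolean rejected = new AtomicBoolean(false);

        Thread a = new Thread(() -> session.sendMessage("from A", () -> {
            firstIsWriting.countDown();
            await(secondDone); // keep A mid-write until B has tried to send
        }));
        Thread b = new Thread(() -> {
            await(firstIsWriting);
            try {
                session.sendMessage("from B", () -> { });
            } catch (IllegalStateException e) {
                rejected.set(true);
            } finally {
                secondDone.countDown();
            }
        });
        a.start();
        b.start();
        join(a);
        join(b);
        return rejected.get();
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("overlapping send rejected: " + overlappingSendFails());
    }
}
```

In a real application the overlap depends on timing, which is why the error tends to appear only under load.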

2. Solution:

The fix is actually simple: take a lock when sending a message, so that a given session is never written to by more than one thread at a time.

    /**
     * Send a message to the specified user.
     *
     * @param clientId ID of the target client
     * @param message  message to send
     * @return true if the message was sent, false otherwise
     */
    public static boolean sendMessageToUser(String clientId, TextMessage message) {
        WebSocketSession session = socketMap.get(clientId);
        if (session == null || !session.isOpen()) {
            return false;
        }
        try {
            // Only one thread at a time may write to this session
            synchronized (session) {
                session.sendMessage(message);
            }
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Broadcast a message to all connected sessions.
     *
     * @param message message to send
     */
    public static void sendMessageToAll(TextMessage message) {
        for (WebSocketSession session : socketMap.values()) {
            if (session == null || !session.isOpen()) {
                continue;
            }
            try {
                // Only one thread at a time may write to this session
                synchronized (session) {
                    session.sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

3. Remaining problem:

As the earlier article "Java Concurrent Programming: Implementation and use cases of fair locks" shows, the code block below makes the threads that want to send on a session block and wait their turn. Even if sendMessageToUser is called from multiple threads, those sends are effectively carried out one after another, as if by a single thread, which defeats the purpose of sending messages from multiple threads. So the approach above fixes the error but does not substantially improve efficiency.

synchronized (session) {
    session.sendMessage(message);
}

4. Spin lock approach:

The idea is to lock only the session object itself, so that threads holding the same session acquire it one after another. Because sending a message takes very little time, the exclusive lock can be replaced with a lighter CAS-based spin lock.
As described in the earlier article "Java Concurrent Programming: Implementation and use of spinlocks", a spin lock decides whether it is currently held by checking whether the Thread stored in the lock is null or not.
If we instead pass the session into the lock() method, the locked object becomes the session, and spin locking and unlocking can be performed on it.
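For reference, a thread-based spin lock of the kind described above might look roughly like the sketch below. The referenced article's code is not reproduced here, so the names SimpleSpinLock and SimpleSpinLockDemo are assumptions made for illustration, as is the use of Thread.yield() inside the spin loop.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Minimal spin lock: the reference holds the owning thread, or null when free. */
class SimpleSpinLock {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void lock() {
        Thread current = Thread.currentThread();
        // Spin until the CAS from null (free) to the current thread succeeds.
        while (!owner.compareAndSet(null, current)) {
            Thread.yield(); // give other threads a chance while busy-waiting
        }
    }

    void unlock() {
        // The CAS succeeds only for the thread that currently owns the lock.
        owner.compareAndSet(Thread.currentThread(), null);
    }
}

class SimpleSpinLockDemo {
    /** Increments a shared counter from several threads under the spin lock. */
    static int countWithLock(int threads, int incrementsPerThread) {
        SimpleSpinLock lock = new SimpleSpinLock();
        int[] counter = {0}; // plain int, protected only by the lock
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithLock(4, 1000)); // prints 4000
    }
}
```

A spin lock like this only pays off when the critical section is short, because waiting threads burn CPU instead of sleeping.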

Spin lock utility class:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicReference;

@Slf4j
public class ReentrantSpinLock {

    // Holds the object that currently owns the lock, or null when the lock is free.
    // The reference is static, so it is shared by every caller of this class.
    private static final AtomicReference<Object> sign = new AtomicReference<>();

    public static <T> void lock(T t, boolean reentrantFlag) {
        // If re-entry is allowed and the object trying to lock is the one
        // already holding the lock, treat the lock as acquired.
        if (reentrantFlag && t == sign.get()) {
            return;
        }
        // Otherwise spin on CAS until the lock is free and we take it.
        while (!sign.compareAndSet(null, t)) {
            log.info("Spinning for a while.");
        }
    }

    public static <T> void unlock(T t) {
        // Release the lock only if the object passed in is the one holding it.
        if (t == sign.get()) {
            sign.compareAndSet(t, null);
        }
    }
}

Here reentrantFlag indicates whether re-entry is allowed, for the case where a thread has already acquired a session but needs to call another method that locks the same session.
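Note that ReentrantSpinLock keeps a single static AtomicReference, so at most one session can hold the lock at any moment, and sends to unrelated sessions also wait for each other. One possible way to lock each session independently is sketched below. This is an assumption-laden variant, not the original author's code: PerKeySpinLock and its methods are hypothetical names, the lock is not reentrant, and plain Object instances stand in for sessions.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-key spin lock: each key (e.g. a session) is locked independently. */
class PerKeySpinLock {
    // Maps a locked key to the thread that owns it; an absent key means unlocked.
    private static final ConcurrentHashMap<Object, Thread> OWNERS = new ConcurrentHashMap<>();

    static void lock(Object key) {
        Thread current = Thread.currentThread();
        // putIfAbsent is atomic and returns null only to the thread that
        // installed itself as owner; every other thread keeps spinning.
        while (OWNERS.putIfAbsent(key, current) != null) {
            Thread.yield();
        }
    }

    static void unlock(Object key) {
        // Removes the entry only if the calling thread is the recorded owner.
        OWNERS.remove(key, Thread.currentThread());
    }

    static boolean isLocked(Object key) {
        return OWNERS.containsKey(key);
    }
}

class PerKeySpinLockDemo {
    /** Returns true if holding one key does not prevent locking another. */
    static boolean keysAreIndependent() {
        Object sessionA = new Object(); // stand-ins for two sessions
        Object sessionB = new Object();
        PerKeySpinLock.lock(sessionA);
        try {
            PerKeySpinLock.lock(sessionB); // returns at once: different key
            try {
                return PerKeySpinLock.isLocked(sessionA)
                        && PerKeySpinLock.isLocked(sessionB);
            } finally {
                PerKeySpinLock.unlock(sessionB);
            }
        } finally {
            PerKeySpinLock.unlock(sessionA);
        }
    }

    public static void main(String[] args) {
        System.out.println(keysAreIndependent()); // prints true
    }
}
```

Because entries are removed on unlock, the map does not grow with the number of sessions seen. Locking the same key twice from one thread would spin forever, so callers must not nest locks on one session. The map relies on the key's equals and hashCode, so with real sessions a stable key such as the session ID may be the safer choice.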

5. Message-sending code modified to use the spin lock:

    /**
     * Send a message to the specified user.
     *
     * @param clientId ID of the target client
     * @param message  message to send
     * @return true if the message was sent, false otherwise
     */
    public static boolean sendMessageToUser(String clientId, TextMessage message) {
        WebSocketSession session = socketMap.get(clientId);
        if (session == null || !session.isOpen()) {
            return false;
        }
        // The spin lock makes different threads send on the same session one at a time
        ReentrantSpinLock.lock(session, false);
        try {
            session.sendMessage(message);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        } finally {
            ReentrantSpinLock.unlock(session);
        }
    }

    /**
     * Broadcast a message to all connected sessions.
     *
     * @param message message to send
     */
    public static void sendMessageToAll(TextMessage message) {
        for (WebSocketSession session : socketMap.values()) {
            if (session == null || !session.isOpen()) {
                continue;
            }
            // The spin lock makes different threads send on the same session one at a time
            ReentrantSpinLock.lock(session, false);
            try {
                session.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                ReentrantSpinLock.unlock(session);
            }
        }
    }

References:
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
Java Concurrent Programming: Implementation and use cases of fair locks
Java Concurrent Programming: Implementation and use of spinlocks

Copyright notice
This article was written by [Master_Shifu_]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/215/202208032222087004.html