WebSocket multi-threaded message sending error TEXT_PARTIAL_WRITING: a use case for replacing the synchronized exclusive lock with a spin lock
2022-08-03 22:25:00 【Master_Shifu_】
1. Background:
When WebSocket messages are pushed from multiple threads, a large batch of messages may contain several messages for the same session. If two threads obtain that session and send at the same time, the following error is raised:
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
The cause: handlerA and handlerB may run at the same time. While one of them is iterating over the sessions and calling sendMessage on a particular session, the other may be sending a different message through that same session. In other words, several threads write to one socket at the same moment, the sends on that session conflict, and the exception above is thrown.
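The conflict described above can be reproduced in miniature without a WebSocket container. The sketch below is only an illustration: FakeSession is a hypothetical stand-in (it is not Tomcat's or Spring's implementation) that rejects a send while another send is still in progress, and latches are used to force the overlap deterministically.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class PartialWriteDemo {

    /** Hypothetical stand-in for a session whose send method is not thread-safe. */
    static class FakeSession {
        private final AtomicBoolean writing = new AtomicBoolean(false);

        /** Rejects the call if another send is still in progress. */
        void sendMessage(String text, Runnable whileWriting) {
            if (!writing.compareAndSet(false, true)) {
                throw new IllegalStateException("session is already writing, cannot send: " + text);
            }
            try {
                whileWriting.run(); // simulates the time spent writing to the socket
            } finally {
                writing.set(false);
            }
        }
    }

    /** Returns true if a send issued during another thread's send is rejected. */
    static boolean secondSendFails() {
        FakeSession session = new FakeSession();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFinish = new CountDownLatch(1);

        // Thread A starts a send and stays in the middle of it until released
        Thread a = new Thread(() -> session.sendMessage("A", () -> {
            started.countDown();
            awaitQuietly(mayFinish);
        }));
        a.start();
        awaitQuietly(started);

        // The current thread now sends on the same session while A is still writing
        boolean rejected = false;
        try {
            session.sendMessage("B", () -> { });
        } catch (IllegalStateException e) {
            rejected = true;
        }

        mayFinish.countDown();
        try {
            a.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("second send rejected: " + secondSendFails());
    }
}
```

In this toy model the second send always fails because it is forced to overlap with the first. In a real application the overlap depends on timing, which is why the error tends to appear only under load.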
2. Solution:
The fix is simple: take a lock when sending a message, so that a given session is never used by more than one sender at a time.
/**
 * Send a message to the specified user.
 *
 * @param clientId id of the target client
 * @param message  message to send
 * @return true if the message was sent, false otherwise
 */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
    WebSocketSession session = socketMap.get(clientId);
    if (session == null || !session.isOpen()) {
        return false;
    }
    try {
        // Only one thread at a time may send on this session
        synchronized (session) {
            session.sendMessage(message);
        }
    } catch (IOException e) {
        e.printStackTrace();
        // The send failed, so do not report success
        return false;
    }
    return true;
}

/**
 * Broadcast a message to every connected session.
 *
 * @param message message to send
 */
public static void sendMessageToAll(TextMessage message) {
    for (WebSocketSession session : socketMap.values()) {
        if (session == null || !session.isOpen()) {
            continue;
        }
        try {
            // Only one thread at a time may send on this session
            synchronized (session) {
                session.sendMessage(message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3. Remaining problem:
As discussed in the earlier article "Java Concurrent Programming: Implementation and Use Cases of Fair Locks", the block below makes every thread that targets the same session wait on that session's monitor and send in turn. Even when sendMessageToUser is called from many threads, the messages for one session are effectively sent sequentially, one thread at a time, which loses much of the point of sending from multiple threads. The approach above fixes the error, but it does not substantially improve efficiency.
    synchronized (session) {
        session.sendMessage(message);
    }
4. Spin lock approach:
The idea is to lock only on the same session object, so that threads holding the same session acquire it one after another. Because the other thread's send takes very little time, it is worth considering a CAS-based spin lock as a lighter replacement for the exclusive lock.
See the earlier article "Java Concurrent Programming: Implementation and Use of Spin Locks".
The spin lock in that article decides whether the lock is held by checking whether the lock's Thread reference is null or non-null. If the session is passed into lock() instead, the locked object becomes the session, and spin locking and unlocking are performed on it.
Spin lock utility class:
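import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;

@Slf4j
public class ReentrantSpinLock {
    // Holds the object that currently owns the lock; null means unlocked.
    // Note: this single static reference is shared by all callers, so only
    // one object (session) can be locked at any given moment.
    private static final AtomicReference<Object> sign = new AtomicReference<>();

    public static <T> void lock(T t, boolean reentrantFlag) {
        // If the reentrant flag is true and the object being locked is the one
        // already holding the lock, treat the call as reentrant and succeed.
        // The comparison is on the object, not on the calling thread.
        if (reentrantFlag && t == sign.get()) {
            return;
        }
        // Lock not acquired yet: spin on CAS until the reference changes from null to t
        while (!sign.compareAndSet(null, t)) {
            // Busy-wait; this logs on every iteration
            log.info("Spinning for a while.");
        }
    }

    public static <T> void unlock(T t) {
        // Release the lock only when the object holding it is the one passed in
        if (t == sign.get()) {
            sign.compareAndSet(t, null);
        }
    }
}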
Here reentrantFlag indicates whether a thread that has already acquired a session, and then calls another method that locks the same session, is allowed to re-enter.
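One property of the class above is worth spelling out: sign is a single static reference, so while any one session holds the lock, threads sending to every other session spin as well. If the goal is to serialize only the senders of the same session, one possible variant keeps a separate owner reference per key. The following is a minimal sketch under stated assumptions, not the original author's implementation: the class name PerSessionSpinLock, the forget method, and the demo helper are hypothetical; the lock is not reentrant; keys are compared with equals/hashCode; and Thread.onSpinWait() requires Java 9 or later.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

public class PerSessionSpinLock {
    // One owner reference per key (for example, per session); null owner means unlocked
    private static final ConcurrentMap<Object, AtomicReference<Thread>> LOCKS =
            new ConcurrentHashMap<>();

    /** Spins until the calling thread owns the lock for this key. Not reentrant. */
    public static void lock(Object key) {
        AtomicReference<Thread> owner =
                LOCKS.computeIfAbsent(key, k -> new AtomicReference<>());
        Thread current = Thread.currentThread();
        while (!owner.compareAndSet(null, current)) {
            Thread.onSpinWait(); // hint to the runtime that this is a busy-wait loop
        }
    }

    /** Releases the lock for this key if the calling thread owns it. */
    public static void unlock(Object key) {
        AtomicReference<Thread> owner = LOCKS.get(key);
        if (owner != null) {
            owner.compareAndSet(Thread.currentThread(), null);
        }
    }

    /** Drops the entry for a key; call only when no thread uses the key any more. */
    public static void forget(Object key) {
        LOCKS.remove(key);
    }

    /** Demo: several threads increment two unsynchronized counters, one per key. */
    static int[] demo(int threads, int perThread) {
        Object keyA = new Object();
        Object keyB = new Object();
        int[] counts = new int[2];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    lock(keyA);
                    try { counts[0]++; } finally { unlock(keyA); }
                    lock(keyB);
                    try { counts[1]++; } finally { unlock(keyB); }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        forget(keyA);
        forget(keyB);
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = demo(4, 10_000);
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```

With a WebSocketSession as the key, the call pattern would be lock(session) before the try block and unlock(session) in finally. The entry should be dropped with forget(session) when the connection closes; otherwise the map grows with every session ever seen.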
5. Message sending code modified to use the spin lock
/**
 * Send a message to the specified user.
 *
 * @param clientId id of the target client
 * @param message  message to send
 * @return true if the message was sent, false otherwise
 */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
    WebSocketSession session = socketMap.get(clientId);
    if (session == null || !session.isOpen()) {
        return false;
    }
    // The spin lock ensures that different threads send on the same session one after another.
    // Acquire it before the try block so that finally only releases a lock that is held.
    ReentrantSpinLock.lock(session, false);
    try {
        session.sendMessage(message);
    } catch (IOException e) {
        e.printStackTrace();
        // The send failed, so do not report success
        return false;
    } finally {
        ReentrantSpinLock.unlock(session);
    }
    return true;
}

/**
 * Broadcast a message to every connected session.
 *
 * @param message message to send
 */
public static void sendMessageToAll(TextMessage message) {
    for (WebSocketSession session : socketMap.values()) {
        if (session == null || !session.isOpen()) {
            continue;
        }
        // The spin lock ensures that different threads send on the same session one after another
        ReentrantSpinLock.lock(session, false);
        try {
            session.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            ReentrantSpinLock.unlock(session);
        }
    }
}
References:
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
Java Concurrent Programming: Implementation and Use Cases of Fair Locks
Java Concurrent Programming: Implementation and Use of Spin Locks