当前位置:网站首页>websocket多线程发送消息报错TEXT_PARTIAL_WRITING--自旋锁替换synchronized独占锁的使用案例
websocket多线程发送消息报错TEXT_PARTIAL_WRITING--自旋锁替换synchronized独占锁的使用案例
2022-08-03 22:22:00 【Master_Shifu_】
1.背景:
websocket在使用多线程推送消息时,如果大量消息中存在同一个session的会话的发送多条消息,如果两个线程同时拿到这个session发送消息就会报错
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
原因就是: handlerA和handlerB两个方法有可能同时执行,当A或者B方法遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报上面的异常。
2.解决办法:
解决方法其实很简单,就是在发送消息的时候加上一把锁,(保证一个session在某个时刻不会被调用多次)
/** * 发送信息给指定用户 * @param clientId * @param message * @return */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
WebSocketSession session = socketMap.get(clientId);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
synchronized (session) {
session.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/** * 广播消息出去 * @param message * @return */
public static void sendMessageToAll(TextMessage message) {
for (WebSocketSession session : socketMap.values()) {
if(session==null||!session.isOpen()) {
continue;
}
try {
synchronized (session) {
session.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.存在问题:
根据之前的文章Java并发编程–公平锁的实现和使用案例
不难发现,以下代码块会阻塞所有线程按顺序发送,这样即使多线程调用sendMessageToUser也是单线程的效率的顺序发送,失去了多线程发送的消息意义,所以以上方法可以解决问题,但是本质上并没有提高效率
synchronized (session) {
session.sendMessage(message);
}
4.自旋锁解决思路:
只锁同一个session对象,让获取同一个session的线程只能按顺序获取,又一个线程发送消息的动作耗时非常短,可以考虑将独占锁简化为使用CAS的自旋锁
根据之前的文章Java并发编程–自旋锁的实现和使用
自旋锁实现锁使用的是对Thread 的null与非空来判断单前线程是否被锁,那我们把session从lock()方法中传入,那可以把锁的对象换成session,从而进行自旋加锁和解锁
自旋锁实现工具类
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class ReentrantSpinLock {
private static AtomicReference<Object> sign = new AtomicReference<>();
public static <T> void lock(T t, boolean reentrantFlag) {
// 若可重入标志为true, 且若尝试加锁的对象和已加的锁中的对象相同,可重入,并加锁成功
if (reentrantFlag && t == sign.get()) {
return;
}
//If the lock is not acquired, it can be spun through CAS
while (!sign.compareAndSet(null, t)) {
// DO nothing
log.info("自旋一会.");
}
}
public static <T> void unlock(T t) {
// 锁的线程和目前的线程相等时,才允许释放锁
if (t == sign.get()) {
sign.compareAndSet(t, null);
}
}
}
其中reentrantFlag为某一个线程已经获取到session,但是还需要调用其他的session的方法是否可重入标志位
5.修改后的自旋锁锁定发送消息代码
/** * 发送信息给指定用户 * @param clientId * @param message * @return */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
WebSocketSession session = socketMap.get(clientId);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
// 自旋锁保证不同线程的同一个session消息按照顺序发送
ReentrantSpinLock.lock(session, false);
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
} finally {
ReentrantSpinLock.unlock(session);
}
return true;
}
/** * 广播消息出去 * @param message * @return */
public static void sendMessageToAll(TextMessage message) {
for (WebSocketSession session : socketMap.values()) {
if(session==null||!session.isOpen()) {
continue;
}
try {
// 自旋锁保证不同线程的同一个session消息按照顺序发送
ReentrantSpinLock.lock(session, false);
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
} finally {
ReentrantSpinLock.unlock(session);
}
}
}
参考:
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
Java并发编程–公平锁的实现和使用案例
Java并发编程–自旋锁的实现和使用
边栏推荐
猜你喜欢
随机推荐
目标检测技术研究现状及发展趋势
九种方式,教你读取 resources 目录下的文件路径
UVa 10003 - Cutting Sticks(白书,区间DP)
L2-041 插松枝
Cisco ike2 IPSec configuration
如何创建一个Web项目
HCIP第十六天
Codeup brushing notes - simple simulation
4年工作经验,多线程间的5种通信方式都说不出来,你敢信?
Lift, Splat, Shoot: Encoding Images from Arbitrary Camera Rigs by Implicitly Unprojecting to 3D 论文笔记
【bug】汇总Elipse项目中代码中文乱码解决方法!
2022的七夕,奉上7个精美的表白代码,同时教大家快速改源码自用
Makefile
LabVIEW code generation error 61056
Optimize the query (work in progress)
21天打卡挑战学习MySQL——《MySQL工具的使用》第一周 第二篇
物联网新零售模式,引领购物新潮流
FinClip最易用的智能电视小程序
Makefile
一文带你了解软件测试是干什么的?薪资高不高?0基础怎么学?