当前位置:网站首页>websocket多线程发送消息报错TEXT_PARTIAL_WRITING--自旋锁替换synchronized独占锁的使用案例
websocket多线程发送消息报错TEXT_PARTIAL_WRITING--自旋锁替换synchronized独占锁的使用案例
2022-08-03 22:22:00 【Master_Shifu_】
1.背景:
websocket在使用多线程推送消息时,如果大量消息中存在同一个session的会话的发送多条消息,如果两个线程同时拿到这个session发送消息就会报错
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
原因就是: handlerA和handlerB两个方法有可能同时执行,当A或者B方法遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报上面的异常。
2.解决办法:
解决方法其实很简单,就是在发送消息的时候加上一把锁,(保证一个session在某个时刻不会被调用多次)
/** * 发送信息给指定用户 * @param clientId * @param message * @return */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
WebSocketSession session = socketMap.get(clientId);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
synchronized (session) {
session.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/** * 广播消息出去 * @param message * @return */
public static void sendMessageToAll(TextMessage message) {
for (WebSocketSession session : socketMap.values()) {
if(session==null||!session.isOpen()) {
continue;
}
try {
synchronized (session) {
session.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.存在问题:
根据之前的文章Java并发编程–公平锁的实现和使用案例
不难发现,以下代码块会阻塞所有线程按顺序发送,这样即使多线程调用sendMessageToUser也是单线程的效率的顺序发送,失去了多线程发送的消息意义,所以以上方法可以解决问题,但是本质上并没有提高效率
synchronized (session) {
session.sendMessage(message);
}
4.自旋锁解决思路:
只锁同一个session对象,让获取同一个session的线程只能按顺序获取,又一个线程发送消息的动作耗时非常短,可以考虑将独占锁简化为使用CAS的自旋锁
根据之前的文章Java并发编程–自旋锁的实现和使用
自旋锁实现锁使用的是对Thread 的null与非空来判断单前线程是否被锁,那我们把session从lock()方法中传入,那可以把锁的对象换成session,从而进行自旋加锁和解锁
自旋锁实现工具类
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class ReentrantSpinLock {
private static AtomicReference<Object> sign = new AtomicReference<>();
public static <T> void lock(T t, boolean reentrantFlag) {
// 若可重入标志为true, 且若尝试加锁的对象和已加的锁中的对象相同,可重入,并加锁成功
if (reentrantFlag && t == sign.get()) {
return;
}
//If the lock is not acquired, it can be spun through CAS
while (!sign.compareAndSet(null, t)) {
// DO nothing
log.info("自旋一会.");
}
}
public static <T> void unlock(T t) {
// 锁的线程和目前的线程相等时,才允许释放锁
if (t == sign.get()) {
sign.compareAndSet(t, null);
}
}
}
其中reentrantFlag为某一个线程已经获取到session,但是还需要调用其他的session的方法是否可重入标志位
5.修改后的自旋锁锁定发送消息代码
/** * 发送信息给指定用户 * @param clientId * @param message * @return */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
WebSocketSession session = socketMap.get(clientId);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
// 自旋锁保证不同线程的同一个session消息按照顺序发送
ReentrantSpinLock.lock(session, false);
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
} finally {
ReentrantSpinLock.unlock(session);
}
return true;
}
/** * 广播消息出去 * @param message * @return */
public static void sendMessageToAll(TextMessage message) {
for (WebSocketSession session : socketMap.values()) {
if(session==null||!session.isOpen()) {
continue;
}
try {
// 自旋锁保证不同线程的同一个session消息按照顺序发送
ReentrantSpinLock.lock(session, false);
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
} finally {
ReentrantSpinLock.unlock(session);
}
}
}
参考:
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
Java并发编程–公平锁的实现和使用案例
Java并发编程–自旋锁的实现和使用
边栏推荐
- DO280管理和监控OpenShift平台--资源限制
- 【MySQL进阶】数据库与表的创建和管理
- Embedded systems: overview
- 21天打卡挑战学习MySQL—Day第一周 第一篇
- 全球观之地理部分
- Internet user account information management regulations come into effect today: must crack down on account trading and gray products
- Shell编程的条件语句
- 七夕快乐!
- Makefile
- [MySQL Advanced] Creation and Management of Databases and Tables
猜你喜欢

CAS:908007-17-0_Biotin-azide_Biotin azide

Lift, Splat, Shoot: Encoding Images from Arbitrary Camera Rigs by Implicitly Unprojecting to 3D 论文笔记

CAS:1192802-98-4_UV 裂解的生物素-PEG2-叠氮

Embedded Systems: GPIO

2022-08-03 oracle执行慢SQL-Q17对比

2022-08-02 mysql/stonedb slow SQL-Q18 - memory usage surge analysis

Adobe是什么?

数据一致性:双删为什么要延时?

2022-08-02 mysql/stonedb慢SQL-Q18-内存使用暴涨分析

CAS: 1192802-98-4 _uv cracking of biotin - PEG2 - azide
随机推荐
golang写的存储引擎,基于b+树,mmap
趣链的产品构架
UVa 1025 - A Spy in the Metro(白书)
Kubernetes入门到精通-Operator 模式
Nine ways to teach you to read the file path in the resources directory
『百日百题 · 基础篇』备战面试,坚持刷题 第四话——循环语句!
老板:公司系统太多,能不能实现账号互通?
Makefile
嵌入式系统:概述
Lift, Splat, Shoot: Encoding Images from Arbitrary Camera Rigs by Implicitly Unprojecting to 3D 论文笔记
Data_web(九)mongodb增量同步到mongodb
VLAN实验
封装、包、访问权限修饰符、static变量
472. Concatenated Words
382. Linked List Random Node
384. Shuffle an Array
October 2019 Twice SQL Injection
Gains double award | know micro easily won the "2021 China digital twin solution suppliers in excellence" "made in China's smart excellent recommended products" double award!
Canvas App中点击图标生成PDF并保存到Dataverse中
[N1CTF 2018]eating_cms