当前位置:网站首页>websocket多线程发送消息报错TEXT_PARTIAL_WRITING--自旋锁替换synchronized独占锁的使用案例
websocket多线程发送消息报错TEXT_PARTIAL_WRITING--自旋锁替换synchronized独占锁的使用案例
2022-08-03 22:22:00 【Master_Shifu_】
1.背景:
websocket在使用多线程推送消息时,如果大量消息中存在同一个session的会话的发送多条消息,如果两个线程同时拿到这个session发送消息就会报错
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
原因就是: handlerA和handlerB两个方法有可能同时执行,当A或者B方法遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报上面的异常。
2.解决办法:
解决方法其实很简单,就是在发送消息的时候加上一把锁,(保证一个session在某个时刻不会被调用多次)
/** * 发送信息给指定用户 * @param clientId * @param message * @return */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
WebSocketSession session = socketMap.get(clientId);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
synchronized (session) {
session.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/** * 广播消息出去 * @param message * @return */
public static void sendMessageToAll(TextMessage message) {
for (WebSocketSession session : socketMap.values()) {
if(session==null||!session.isOpen()) {
continue;
}
try {
synchronized (session) {
session.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.存在问题:
根据之前的文章Java并发编程–公平锁的实现和使用案例
不难发现,以下代码块会阻塞所有线程按顺序发送,这样即使多线程调用sendMessageToUser也是单线程的效率的顺序发送,失去了多线程发送的消息意义,所以以上方法可以解决问题,但是本质上并没有提高效率
synchronized (session) {
session.sendMessage(message);
}
4.自旋锁解决思路:
只锁同一个session对象,让获取同一个session的线程只能按顺序获取,又一个线程发送消息的动作耗时非常短,可以考虑将独占锁简化为使用CAS的自旋锁
根据之前的文章Java并发编程–自旋锁的实现和使用
自旋锁实现锁使用的是对Thread 的null与非空来判断单前线程是否被锁,那我们把session从lock()方法中传入,那可以把锁的对象换成session,从而进行自旋加锁和解锁
自旋锁实现工具类
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class ReentrantSpinLock {
private static AtomicReference<Object> sign = new AtomicReference<>();
public static <T> void lock(T t, boolean reentrantFlag) {
// 若可重入标志为true, 且若尝试加锁的对象和已加的锁中的对象相同,可重入,并加锁成功
if (reentrantFlag && t == sign.get()) {
return;
}
//If the lock is not acquired, it can be spun through CAS
while (!sign.compareAndSet(null, t)) {
// DO nothing
log.info("自旋一会.");
}
}
public static <T> void unlock(T t) {
// 锁的线程和目前的线程相等时,才允许释放锁
if (t == sign.get()) {
sign.compareAndSet(t, null);
}
}
}
其中reentrantFlag为某一个线程已经获取到session,但是还需要调用其他的session的方法是否可重入标志位
5.修改后的自旋锁锁定发送消息代码
/** * 发送信息给指定用户 * @param clientId * @param message * @return */
public static boolean sendMessageToUser(String clientId, TextMessage message) {
WebSocketSession session = socketMap.get(clientId);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
// 自旋锁保证不同线程的同一个session消息按照顺序发送
ReentrantSpinLock.lock(session, false);
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
} finally {
ReentrantSpinLock.unlock(session);
}
return true;
}
/** * 广播消息出去 * @param message * @return */
public static void sendMessageToAll(TextMessage message) {
for (WebSocketSession session : socketMap.values()) {
if(session==null||!session.isOpen()) {
continue;
}
try {
// 自旋锁保证不同线程的同一个session消息按照顺序发送
ReentrantSpinLock.lock(session, false);
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
} finally {
ReentrantSpinLock.unlock(session);
}
}
}
参考:
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
Java并发编程–公平锁的实现和使用案例
Java并发编程–自旋锁的实现和使用
边栏推荐
猜你喜欢

Network basic learning series four (network layer, data link layer and some other important protocols or technologies)

CAS:122567-66-2_DSPE-Biotin_DSPE-Biotin

Teach a Man How to Fish - How to Query the Properties of Any SAP UI5 Control by Yourself Documentation and Technical Implementation Details Demo

CAS: 1192802-98-4 _uv cracking of biotin - PEG2 - azide

封装、包、访问权限修饰符、static变量

CAS:1797415-74-7_TAMRA-Azide-PEG-Biotin

PowerMockup 4.3.4::::Crack

Bytebase database schema change management tool

LabVIEW code generation error 61056

嵌入式开发:嵌入式基础——代码和数据空间揭秘
随机推荐
Bytebase数据库 Schema 变更管理工具
386. Lexicographical Numbers
Golang第一章:入门
云平台建设解决方案
Optimize the query (work in progress)
一些思考:腾讯股价为何持续都低
【云原生实用技巧】使用 skopeo 批量同步 helm chart 依赖镜像
嵌入式开发:嵌入式基础——代码和数据空间揭秘
伴随着元宇宙、web3.0等概念的兴起,数字人、数字场景等诸多数字化的形态开始出现
优化查询(工作中)
UVa 1025 - A Spy in the Metro(白书)
Embedded systems: overview
HCIP第十五天
九种方式,教你读取 resources 目录下的文件路径
CAS:1260586-88-6_Biotin-C5-Azide_Biotin-C5-Azide
易观分析:2022年Q2中国网络零售B2C市场交易规模达23444.7亿元
Flink--Join以及Flink函数
for循环练习题
Lift, Splat, Shoot: Encoding Images from Arbitrary Camera Rigs by Implicitly Unprojecting to 3D 论文笔记
CAS:1797415-74-7_TAMRA-Azide-PEG-Biotin