03-websocket并发发送消息出错
前言:本文主要介绍在spring框架下使用spring集成的websocket,并发发送消息,演示websocket消息在并发场景下出错
1 环境搭建
这里,我是用的是spring集成的websocket,当然也可以使用javax.websocket,都可以达到效果。
1.1 总体流程介绍
-
使用spring集成的websocket,参考上一篇文章的内容,这是文章的链接地址:https://blog.csdn.net/weixin_43716785/article/details/135713471?spm=1001.2014.3001.5502
-
书写controller,调用里面的方法实现并发 发送消息
controller里面的代码说明:
变量i1
final int i1 = 50表示给每个客户端发送50条消息
参数:@RequestParam Integer count
先用apiPost7建立3个或者更多ws链接,指定id为1,2,3,这样方便测试;
在调用测试接口时,传一个count值,使用for循环,1~count的值就是客户端对应的Id(我设定的是scanPoint,一样的)
然后根据循环体的值,新开一个线程,并发给指定的客户端的发送消息
import lombok.extern.slf4j.Slf4j; import org.example.service.MyWebSocketHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; /** * 类描述 * * @author cjia * @date 2024/1/21 上午 11:34 */ @Slf4j @RestController public class TestController { @Autowired private MyWebSocketHandler webSocketHandler; /** * 每个session 50条消息 */ final int i1 = 50; /** * 先用apiPost7建立几个连接,我建立的3个,对应的id分别是1,2,3 * 这样的话,count传值3,就可以模拟给这3个客户端发消息 * * @param count * @return * @throws IOException */ @GetMapping("/test-01") public Object test(@RequestParam Integer count) { for (int j = 0; j < i1; j++) { for (int i = 1; i <= count; i++) { int finalI = i; new Thread(()->{ try { //handler提供的发送消息的方法 webSocketHandler.sendMessage(String.valueOf(finalI), "测试消息" + finalI); log.info("================>当前线程返回:{}", Thread.currentThread().getName()); } catch (Exception e) { throw new RuntimeException(e); } }).start(); } } //返回的结果,不用在意 return "{'code':200,'message':'接口调用成功!'}"; } }
-
先使用apiPost7建立3个websocket连接,如图
2 调用接口,并发发送wbsocket消息
控制台结果截图:
看到了吗?是不是很熟悉的错误提示
The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method
顺手翻译下:远程端点处于状态 [TEXT_PARTIAL_WRITING],这对于被调用方法来说是无效状态
3 websocket消息在并发场景下出错的原因解析
3.1 并发场景下很难调试,我这里先给出发送单条消息的流程
先给出流程图,可以看出最终还是调用的tomcat的方法发送消息
在 StandardWebSocketSession的sendxxx方法 中通过 getNativeSession().getBasicRemote().sendText(message.getPayload(),
message.isLast())来发送消息。注意:getBasicRemote()和getAsyncRemote()是不一样的,getAsyncRemote()是支持异步或者并发的,
getBasicRemote()是同步的,那,那为什么spring不用这个方法呢?还是先埋个坑,后面在填,这里主要介绍整个发送消息的流程,有一个整体的印象即可
这个是tomcat最终消息发送流程图
下面是debug的截图
注意:下面是一大波图
3.1.1 我这里通过服务端收到消息,直接返回该消息,来调用发送发消息的api,也可以单独写接口来发送单条消息
3.1.2 进入自定义handler发送消息的方法
3.1.3 进入spring的AbstractWebSocketSession
在 sendMessage方法中,根据消息的类型来确定需要调用的发送方法(我使用的消息类型是Textmessage)
3.1.4 进入StandardWebSocketSession(AbstractWebSocketSession的实现类)
通过sendTextMessage方法调用tomcat的websocket的发送方法,这里往后,就进入tomcat的依赖中
3.1.5 进入tomcat的WsRemoteEndpointBasic
通过sendXX发送消息
3.1.6 发送消息前的状态机进行状态校验
3.1.7 状态校验通过,修改当前的状态值,不通过,抛出异常
3.1.7 发送消息
3.1.8 状态复位,准备下一次消息发送
3.2 并发场景下出错的原因
通过上面的tomcat最终消息发送流程图,可以看出,WsRemoteEndpointImplBase内部有一个维护状态的状态机,通过状态机的状态来判断是否可以发送消息,但是这个状态明显是线程不安全的,因为还未来得及状态复位的时候,这个时候来了一条消息,又需要给当前的客户端发送一条消息,但是由于上一条消息还未发送完,状态还未来得及复位,所以后面消息就会发送失败,直到状态复位之后,才能发送消息给客户端
4 总结
由于tomcat的websocket的内部的状态机,每次在发送消息的时候,都会进行状态校验,但是并发场景下,当状态来不及复位的时候,就会发送消息异常。我感觉是不是websocket本身只支持同时接收一条消息,就像打电话一样,你一次只能接一个人的电话,当你在通话中的时候,另一个人打进来,就提示正在通话中,相当于通话失败。
后面会继续更新解决方案的文章