03-websocket并发发送消息出错

03-websocket并发发送消息出错

前言:本文主要介绍在spring框架下使用spring集成的websocket,并发发送消息,演示websocket消息在并发场景下出错

1 环境搭建

这里,我是用的是spring集成的websocket,当然也可以使用javax.websocket,都可以达到效果。

1.1 总体流程介绍

  1. 使用spring集成的websocket,参考上一篇文章的内容,这是文章的链接地址:https://blog.csdn.net/weixin_43716785/article/details/135713471?spm=1001.2014.3001.5502

  2. 书写controller,调用里面的方法实现并发 发送消息

    controller里面的代码说明:

    变量i1

    final int i1 = 50表示给每个客户端发送50条消息

    参数:@RequestParam Integer count

    先用apiPost7建立3个或者更多ws链接,指定id为1,2,3,这样方便测试;

    在调用测试接口时,传一个count值,使用for循环,1~count的值就是客户端对应的Id(我设定的是scanPoint,一样的)

    然后根据循环体的值,新开一个线程,并发给指定的客户端的发送消息

    import lombok.extern.slf4j.Slf4j;
    import org.example.service.MyWebSocketHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.io.IOException;
    
    /**
     * 类描述
     *
     * @author cjia
     * @date 2024/1/21 上午 11:34
     */
    @Slf4j
    @RestController
    public class TestController {
        @Autowired
        private MyWebSocketHandler webSocketHandler;
    
        /**
         * 每个session 50条消息
         */
        final int i1 = 50;
    
        /**
         * 先用apiPost7建立几个连接,我建立的3个,对应的id分别是1,2,3
         * 这样的话,count传值3,就可以模拟给这3个客户端发消息
         *
         * @param count
         * @return
         * @throws IOException
         */
        @GetMapping("/test-01")
        public Object test(@RequestParam Integer count) {
            for (int j = 0; j < i1; j++) {
                for (int i = 1; i <= count; i++) {
                    int finalI = i;
                    new Thread(()->{
                        try {
                            //handler提供的发送消息的方法
                            webSocketHandler.sendMessage(String.valueOf(finalI), "测试消息" + finalI);
                            log.info("================>当前线程返回:{}", Thread.currentThread().getName());
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }).start();
    
                }
            }
            //返回的结果,不用在意
            return  "{'code':200,'message':'接口调用成功!'}";
        }
    
    }
    
  3. 先使用apiPost7建立3个websocket连接,如图

    在这里插入图片描述

2 调用接口,并发发送wbsocket消息

在这里插入图片描述

控制台结果截图

在这里插入图片描述

看到了吗?是不是很熟悉的错误提示

The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state for called method

顺手翻译下:远程端点处于状态 [TEXT_PARTIAL_WRITING],这对于被调用方法来说是无效状态

3 websocket消息在并发场景下出错的原因解析

3.1 并发场景下很难调试,我这里先给出发送单条消息的流程

先给出流程图,可以看出最终还是调用的tomcat的方法发送消息

StandardWebSocketSession的sendxxx方法 中通过 getNativeSession().getBasicRemote().sendText(message.getPayload(),

message.isLast())来发送消息。注意:getBasicRemote()和getAsyncRemote()是不一样的,getAsyncRemote()是支持异步或者并发的,

getBasicRemote()是同步的,那,那为什么spring不用这个方法呢?还是先埋个坑,后面在填,这里主要介绍整个发送消息的流程,有一个整体的印象即可

在这里插入图片描述


这个是tomcat最终消息发送流程图

在这里插入图片描述


下面是debug的截图

注意:下面是一大波图

3.1.1 我这里通过服务端收到消息,直接返回该消息,来调用发送发消息的api,也可以单独写接口来发送单条消息

在这里插入图片描述

3.1.2 进入自定义handler发送消息的方法

在这里插入图片描述

3.1.3 进入spring的AbstractWebSocketSession

在 sendMessage方法中,根据消息的类型来确定需要调用的发送方法(我使用的消息类型是Textmessage)

在这里插入图片描述

3.1.4 进入StandardWebSocketSession(AbstractWebSocketSession的实现类)

通过sendTextMessage方法调用tomcat的websocket的发送方法,这里往后,就进入tomcat的依赖中

在这里插入图片描述

3.1.5 进入tomcat的WsRemoteEndpointBasic

通过sendXX发送消息

在这里插入图片描述

3.1.6 发送消息前的状态机进行状态校验

在这里插入图片描述

3.1.7 状态校验通过,修改当前的状态值,不通过,抛出异常

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3.1.7 发送消息

在这里插入图片描述

3.1.8 状态复位,准备下一次消息发送

在这里插入图片描述

3.2 并发场景下出错的原因

通过上面的tomcat最终消息发送流程图,可以看出,WsRemoteEndpointImplBase内部有一个维护状态的状态机,通过状态机的状态来判断是否可以发送消息,但是这个状态明显是线程不安全的,因为还未来得及状态复位的时候,这个时候来了一条消息,又需要给当前的客户端发送一条消息,但是由于上一条消息还未发送完,状态还未来得及复位,所以后面消息就会发送失败,直到状态复位之后,才能发送消息给客户端

在这里插入图片描述

4 总结

由于tomcat的websocket的内部的状态机,每次在发送消息的时候,都会进行状态校验,但是并发场景下,当状态来不及复位的时候,就会发送消息异常。我感觉是不是websocket本身只支持同时接收一条消息,就像打电话一样,你一次只能接一个人的电话,当你在通话中的时候,另一个人打进来,就提示正在通话中,相当于通话失败。


后面会继续更新解决方案的文章