Android 设计一个长链接消息分发机制

背景:

之前,项目里有条业务线单独实现了长链接,只负责自己业务的消息收发。但是当第二个业务也需要长链接功能的时候,运维建议不要再建立新的长链接,这样服务器压力太大。因此,设计一套长链接消息分发框架成了当务之急。


设计分析:

首先,分析下业务具体使用场景。目前不存在所有用户在App全生命周期都需要长链接的业务,因此不能App启动就打开它,这样服务器压力大,也会提高App耗电性能。那么,多个业务在对长链接的使用时间就存在三种可能,相离、相交和包含,所以我们需要当存在业务使用长链接时开启,没有业务使用的时候关闭。

那么,这个框架基本模型就出来了,最底层 Service 负责控制长链接的开关、消息收发还有心跳,中间层客户端负责管理消息的分发,最顶层就是实际业务层负责处理业务消息。

计划通,开始实践


首先,实现一个 WebSocketManager,这部分可以参考网上其他博客,然后在Service中对其实例化。然后,考虑到公司有多个App,以后可能多个客户端共用一个Service,那么考虑使用 list 保持跟踪当前所有注册的客户端,所有客户端都解除注册的时候,关闭长链接,并自动关闭 service

/**
 * 长链接服务
 *
 * 服务启动后,必须发送注册消息,否则无法接收到长链接消息
 */
class WebSocketService : Service() {

    companion object {

        /**
         * 命令服务注册客户端,接收来自服务的回调。Message的replyTo字段必须是应该发送回调
         * 的客户端的Messenger。
         *
         * Command to the service to register a client, receiving callbacks
         * from the service.  The Message's replyTo field must be a Messenger of
         * the client where callbacks should be sent.
         */
        const val MSG_REGISTER_CLIENT = 1

        /**
         * 命令服务注销客户端,或停止接收来自服务的回调。消息的 replyTo 字段必须是的
         * Messenger 客户端,如之前用 MSG_REGISTER_client 给出的。
         *
         * Command to the service to unregister a client, ot stop receiving callbacks
         * from the service.  The Message's replyTo field must be a Messenger of
         * the client as previously given with MSG_REGISTER_CLIENT.
         */
        const val MSG_UNREGISTER_CLIENT = 2

        /**
         * 命令服务通过 socket 链接发送客户端传递消息
         *
         * Command to the service send client's message by socket.
         */
        const val MSG_SEND_MESSAGE_TO_SERVICE = 3

        /**
         * 服务分发长链接返回的消息
         *
         * Message will be sent by the service to any registered clients.
         */
        const val MSG_SEND_MESSAGE_TO_CLIENT = 4
    }

    /**
     * 长链接管理类
     *
     * web socket manager
     */
    private val manager: WsManager by lazy {
        WsManagerImpl.INSTANCE.apply {
            mContext = this@WebSocketService
            wsStatusListener = object : WsStatusListener {
                override fun onOpen(response: Response?) {
                    mClients.forEach {
                        try {
                            it.send(Message.obtain(null, MSG_REGISTER_CLIENT))
                        } catch (e: RemoteException) {
                        }
                    }
                }

                override fun onMessage(text: String?) {
                    mClients.forEach {
                        val bundle = Bundle()
                        bundle.putString("msg", text)
                        try {
                            it.send(Message.obtain(null, MSG_SEND_MESSAGE_TO_CLIENT, bundle))
                        } catch (e: RemoteException) {
                        }
                    }
                }

                override fun onClosed(code: Int, reason: String?) {
                    mClients.forEach {
                        try {
                            it.send(Message.obtain(null, MSG_UNREGISTER_CLIENT))
                        } catch (e: RemoteException) {
                        }
                    }
                    mClients.clear()
                    stopSelf()
                }
            }
        }
    }

    /**
     * 保持跟踪当前所有注册的客户端
     * 所有客户端都解除注册的时候,自动关闭 service
     *
     * Keeps track of all current registered clients.
     * Stop self when all current registered clients unregistered.
     */
    private val mClients: MutableList<Messenger> = mutableListOf()

    /**
     * 来自客户端的传入消息的处理程序
     *
     * Handler of incoming messages from clients
     */
    internal inner class IncomingHandler(
//        context: Context,
//        private val applicationContext: Context = context.applicationContext
    ) : Handler() {
        override fun handleMessage(msg: Message) {
            when (msg.what) {
                MSG_REGISTER_CLIENT -> {
                    mClients.add(msg.replyTo)
                }
                MSG_UNREGISTER_CLIENT -> {
                    mClients.remove(msg.replyTo)
                    if (mClients.size == 0) {
                        // 关闭长链接
                        manager.stopConnect()
                        // 所有客户端都解除注册的时候,自动关闭 service
                        stopSelf()
                    }
                }
                MSG_SEND_MESSAGE_TO_SERVICE -> {
                    // 通过长链接发送
                    if (msg.obj is Bundle) {
                        val socketModule: SocketModel = JSON.parseObject(
                            (msg.obj as Bundle).getString("msg") ?: "",
                            SocketModel::class.java
                        )
                        manager.sendMessage(socketModule.m, socketModule.bt)
                    }
                }
                else -> {
                    super.handleMessage(msg)
                }
            }
        }
    }

    /**
     * 我们为客户端发布的目标,以将消息发送到IncomingHandler。
     *
     * Target we publish for clients to send messages to IncomingHandler.
     */
    private val mMessenger: Messenger = Messenger(IncomingHandler())

    /**
     * 获取长链接token
     */
    private fun getToken() {
        val params = JSONObject()
        params["token"] = GlobalParams.getAppToken()
        params["appType"] = "zo"
        params["checkWay"] = "cas"
        OkServeService.requestGateWayService(this,
            "${ConstantValue.WEB_GATEWAY_ROOT}troy/dp/login",
            params, object : LocalCallback<SocketTokenBean?>() {
                override fun onResult(t: SocketTokenBean?) {
                    // 长链接建立
                    t?.let {
                        manager.initWebSocket(
                            url = it.wsAddress,
                            token = it.token,
                            heartBeatTick = (it.heartBeatTick ?: 31) - 1
                        )
                    }
                }

                override fun onFailure(str: String?) {
                    mClients.forEach {
                        try {
                            it.send(Message.obtain(null, MSG_UNREGISTER_CLIENT))
                        } catch (e: RemoteException) {
                        }
                    }
                    mClients.clear()
                    stopSelf()
                }
            })
    }

    override fun onCreate() {
        super.onCreate()
        getToken()
    }

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
//        return super.onStartCommand(intent, flags, startId)
        return START_STICKY
    }

    /**
     * 当绑定到服务时,我们向信使返回一个接口,用于向服务发送消息。
     *
     * When binding to the service, we return an interface to our messenger
     * for sending messages to the service.
     */
    override fun onBind(intent: Intent?): IBinder? = mMessenger.binder

    override fun onDestroy() {
        super.onDestroy()
        // 如果长链接么有关闭,关闭长链接
        manager.stopConnect()
    }
}

在中间层,通过观察者模式,给业务发送消息,并且判断有多少个业务正在使用长链接,然后在他们都使用完成后,通知 Service 客户端解除注册。当然,为了防止业务开启和结束次数不匹配,增加 b_type 字段用来去重

(虽然信任同事能力,但是也相信屎山带来的意外,这叫什么,墨菲定律)

class WebSocketManager private constructor() {

    companion object {
        val INSTANCE = Holder.instance

        /**
         * 注册成功
         */
        const val MSG_REGISTER = 1

        /**
         * 长链接断开解除注册
         */
        const val MSG_UNREGISTER = 2

        /**
         * 消息发送给服务端
         */
        const val MSG_SEND_MESSAGE_TO_SERVICE = 3

        /**
         * 服务分发长链接返回的消息
         */
        const val MSG_SEND_MESSAGE_TO_CLIENT = 4
    }

    private object Holder {
        val instance = WebSocketManager()
    }

    private val listener: MutableList<SocketManagerBean> = mutableListOf()

    /**
     * 来自服务端的传入消息的处理程序
     *
     * Handler of incoming messages from clients
     */
    internal inner class IncomingHandler : Handler() {
        override fun handleMessage(msg: Message) {
            when (msg.what) {
                WebSocketService.MSG_REGISTER_CLIENT -> {
                    listener.forEach {
                        it.onOpenListener?.invoke()
                    }
                }
                WebSocketService.MSG_UNREGISTER_CLIENT -> {
                    listener.forEach {
                        it.onCloseListener?.invoke()
                    }
                    listener.clear()
                }
                WebSocketService.MSG_SEND_MESSAGE_TO_CLIENT -> {
                    listener.forEach {
                        if (msg.obj is Bundle) {
                            val jsonString = (msg.obj as Bundle).getString("msg")
                            it.msgTextListener?.invoke(jsonString)
                        }
                    }
                }
                else -> {
                    super.handleMessage(msg)
                }
            }
        }
    }

    private val mMessenger: Messenger = Messenger(IncomingHandler())

    private var mService: Messenger? = null

    private var mConnection = object : ServiceConnection {
        override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
            mService = Messenger(service)

            // 将 manager 注册到 service 中,否则无法接收回掉
            mService?.send(Message.obtain(null, MSG_REGISTER).also {
                it.replyTo = mMessenger
            })
        }

        override fun onServiceDisconnected(name: ComponentName?) {
            mService = null
        }

    }

    /**
     * 接收长链接消息通知
     * </p>
     * 如果此时没有启动过长链接,则启动长链接
     *
     * @param bType 业务名称/业务类型,保证业务场景不会重复接收
     * @param msgListener 消息监听
     */
    fun subscribeMessage(
        context: Context,
        bType: String? = null,
        msgListener: OnMessageListener? = null
    ) {
        if (listener.size == 0) {
            Intent(context.applicationContext, WebSocketService::class.java).also {
                context.applicationContext.startService(it)
            }
        }
        // 如果bType存在,那么将之前的监听替换
        var needAdd = true
        if (!bType.isNullOrEmpty()) {
            listener.forEach {
                if (it.bType == bType) {
                    it.onOpenListener = {
                        msgListener?.onOpen()
                    }
                    it.msgTextListener = {
                        try {
                            val data: JSONObject? = JSONObject.parseObject(it)
                            if (data != null) {
                                msgListener?.onMessage(data)
                            }
                        } catch (e: Exception) {
                            LogUtils.e("ZRWEBSOCKET", e.message)
                        }
                    }
                    it.onCloseListener = {
                        msgListener?.onClosed()
                    }
                    // 替换后触发一次onOpen()
                    msgListener?.onOpen()
                    needAdd = false
                }
            }
        }
        if (needAdd) {
            listener.add(SocketManagerBean(
                bType = bType,
                onOpenListener = {
                    msgListener?.onOpen()
                },
                msgTextListener = {
                    try {
//                            val data: BEAN = JSONObject.parseObject(it, object : TypeReference<BEAN>() {})
                        val data: JSONObject? = JSONObject.parseObject(it)
                        if (data != null) {
                            msgListener?.onMessage(data)
                        }
                    } catch (e: Exception) {
                        LogUtils.e("ZRWEBSOCKET", e.message)
                    }
                },
                onCloseListener = {
                    msgListener?.onClosed()
                }
            ))
        }
        // 判断长链接服务是否已经启动,如果没启动则启动
        Intent(context.applicationContext, WebSocketService::class.java).also {
            context.applicationContext.bindService(it, mConnection, Context.BIND_AUTO_CREATE)
        }
    }

    /**
     * 解除消息接收
     */
    fun unSubscribeMessage(bType: String?) {
        val list = mutableListOf<SocketManagerBean>()
        listener.forEach {
            if (it.bType != bType) {
                list.add(it)
            }
        }

        listener.clear()
        listener.addAll(list)

        if (listener.size == 0) {
            mService?.send(Message.obtain(null, MSG_UNREGISTER).also {
                it.replyTo = mMessenger
            })
        }
    }

    /**
     * 发送消息
     */
    fun <BEAN> sendMsg(bType: String?, msg: BEAN?) {
        val model = SocketModel().apply {
            bt = bType
            m = msg
        }
        val bundle = Bundle().also {
            it.putString("msg", JSON.toJSONString(model))
        }
        mService?.send(Message.obtain(null, MSG_SEND_MESSAGE_TO_SERVICE, bundle).also {
            it.replyTo = mMessenger
        })
    }
}

data class SocketManagerBean(
    val bType: String? = null,
    var onOpenListener: (() -> Unit)? = null,
    var msgTextListener: ((String?) -> Unit)? = null,
    var onCloseListener: (() -> Unit)? = null,
)

interface OnMessageListener {

    fun onOpen() {}

    fun onMessage(msg: JSONObject)

    fun onClosed() {}
}

这样,到了最顶层的实际业务,我们的使用就非常简单了,记得用完后 unSubscribeMessage 就好~

WebSocketManager.Companion.INSTANCE.subscribeMessage(mContext.applicationContext, B_TYPE,
                    object : OnMessageListener {
                        override fun onOpen() {
                            LogUtils.i(TAG, "long socket is connected")
                        }

                        override fun onMessage(msg: JSONObject) {
                            LogUtils.i(TAG, "long socket return message$msg")
                        }

                        override fun onClosed() {
                            LogUtils.i(TAG, "long socket is disconnected")
                        }
                    })

最后,这套设计仅满足了基本的接收和分发功能,但是缺乏消息阻塞队列等,同时由于应用性质,这里也没有考虑保活操作

最最后,如果这篇文章存在不足,欢迎大家前来指正,也欢迎各位朋友一起讨论交流~