背景:
之前,项目里有条业务线单独实现了长链接,只负责自己业务的消息收发。但是当第二个业务也需要长链接功能的时候,运维建议不要再建立新的长链接,这样服务器压力太大。因此,设计一套长链接消息分发框架成了当务之急。
设计分析:
首先,分析下业务具体使用场景。目前不存在所有用户在App全生命周期都需要长链接的业务,因此不能App启动就打开它,这样服务器压力大,也会提高App耗电性能。那么,多个业务在对长链接的使用时间就存在三种可能,相离、相交和包含,所以我们需要当存在业务使用长链接时开启,没有业务使用的时候关闭。
那么,这个框架基本模型就出来了,最底层 Service 负责控制长链接的开关、消息收发还有心跳,中间层客户端负责管理消息的分发,最顶层就是实际业务层负责处理业务消息。
计划通,开始实践
首先,实现一个 WebSocketManager,这部分可以参考网上其他博客,然后在Service中对其实例化。然后,考虑到公司有多个App,以后可能多个客户端共用一个Service,那么考虑使用 list 保持跟踪当前所有注册的客户端,所有客户端都解除注册的时候,关闭长链接,并自动关闭 service /** * 长链接服务 * * 服务启动后,必须发送注册消息,否则无法接收到长链接消息 */ class WebSocketService : Service() { companion object { /** * 命令服务注册客户端,接收来自服务的回调。Message的replyTo字段必须是应该发送回调 * 的客户端的Messenger。 * * Command to the service to register a client, receiving callbacks * from the service. The Message's replyTo field must be a Messenger of * the client where callbacks should be sent. */ const val MSG_REGISTER_CLIENT = 1 /** * 命令服务注销客户端,或停止接收来自服务的回调。消息的 replyTo 字段必须是的 * Messenger 客户端,如之前用 MSG_REGISTER_client 给出的。 * * Command to the service to unregister a client, ot stop receiving callbacks * from the service. The Message's replyTo field must be a Messenger of * the client as previously given with MSG_REGISTER_CLIENT. */ const val MSG_UNREGISTER_CLIENT = 2 /** * 命令服务通过 socket 链接发送客户端传递消息 * * Command to the service send client's message by socket. */ const val MSG_SEND_MESSAGE_TO_SERVICE = 3 /** * 服务分发长链接返回的消息 * * Message will be sent by the service to any registered clients. */ const val MSG_SEND_MESSAGE_TO_CLIENT = 4 } /** * 长链接管理类 * * web socket manager */ private val manager: WsManager by lazy { WsManagerImpl.INSTANCE.apply { mContext = this@WebSocketService wsStatusListener = object : WsStatusListener { override fun onOpen(response: Response?) { mClients.forEach { try { it.send(Message.obtain(null, MSG_REGISTER_CLIENT)) } catch (e: RemoteException) { } } } override fun onMessage(text: String?) { mClients.forEach { val bundle = Bundle() bundle.putString("msg", text) try { it.send(Message.obtain(null, MSG_SEND_MESSAGE_TO_CLIENT, bundle)) } catch (e: RemoteException) { } } } override fun onClosed(code: Int, reason: String?) { mClients.forEach { try { it.send(Message.obtain(null, MSG_UNREGISTER_CLIENT)) } catch (e: RemoteException) { } } mClients.clear() stopSelf() } } } } /** * 保持跟踪当前所有注册的客户端 * 所有客户端都解除注册的时候,自动关闭 service * * Keeps track of all current registered clients. * Stop self when all current registered clients unregistered. */ private val mClients: MutableList<Messenger> = mutableListOf() /** * 来自客户端的传入消息的处理程序 * * Handler of incoming messages from clients */ internal inner class IncomingHandler( // context: Context, // private val applicationContext: Context = context.applicationContext ) : Handler() { override fun handleMessage(msg: Message) { when (msg.what) { MSG_REGISTER_CLIENT -> { mClients.add(msg.replyTo) } MSG_UNREGISTER_CLIENT -> { mClients.remove(msg.replyTo) if (mClients.size == 0) { // 关闭长链接 manager.stopConnect() // 所有客户端都解除注册的时候,自动关闭 service stopSelf() } } MSG_SEND_MESSAGE_TO_SERVICE -> { // 通过长链接发送 if (msg.obj is Bundle) { val socketModule: SocketModel = JSON.parseObject( (msg.obj as Bundle).getString("msg") ?: "", SocketModel::class.java ) manager.sendMessage(socketModule.m, socketModule.bt) } } else -> { super.handleMessage(msg) } } } } /** * 我们为客户端发布的目标,以将消息发送到IncomingHandler。 * * Target we publish for clients to send messages to IncomingHandler. */ private val mMessenger: Messenger = Messenger(IncomingHandler()) /** * 获取长链接token */ private fun getToken() { val params = JSONObject() params["token"] = GlobalParams.getAppToken() params["appType"] = "zo" params["checkWay"] = "cas" OkServeService.requestGateWayService(this, "${ConstantValue.WEB_GATEWAY_ROOT}troy/dp/login", params, object : LocalCallback<SocketTokenBean?>() { override fun onResult(t: SocketTokenBean?) { // 长链接建立 t?.let { manager.initWebSocket( url = it.wsAddress, token = it.token, heartBeatTick = (it.heartBeatTick ?: 31) - 1 ) } } override fun onFailure(str: String?) { mClients.forEach { try { it.send(Message.obtain(null, MSG_UNREGISTER_CLIENT)) } catch (e: RemoteException) { } } mClients.clear() stopSelf() } }) } override fun onCreate() { super.onCreate() getToken() } override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { // return super.onStartCommand(intent, flags, startId) return START_STICKY } /** * 当绑定到服务时,我们向信使返回一个接口,用于向服务发送消息。 * * When binding to the service, we return an interface to our messenger * for sending messages to the service. */ override fun onBind(intent: Intent?): IBinder? = mMessenger.binder override fun onDestroy() { super.onDestroy() // 如果长链接么有关闭,关闭长链接 manager.stopConnect() } }
在中间层,通过观察者模式,给业务发送消息,并且判断有多少个业务正在使用长链接,然后在他们都使用完成后,通知 Service 客户端解除注册。当然,为了防止业务开启和结束次数不匹配,增加 b_type 字段用来去重
(虽然信任同事能力,但是也相信屎山带来的意外,这叫什么,墨菲定律)
class WebSocketManager private constructor() { companion object { val INSTANCE = Holder.instance /** * 注册成功 */ const val MSG_REGISTER = 1 /** * 长链接断开解除注册 */ const val MSG_UNREGISTER = 2 /** * 消息发送给服务端 */ const val MSG_SEND_MESSAGE_TO_SERVICE = 3 /** * 服务分发长链接返回的消息 */ const val MSG_SEND_MESSAGE_TO_CLIENT = 4 } private object Holder { val instance = WebSocketManager() } private val listener: MutableList<SocketManagerBean> = mutableListOf() /** * 来自服务端的传入消息的处理程序 * * Handler of incoming messages from clients */ internal inner class IncomingHandler : Handler() { override fun handleMessage(msg: Message) { when (msg.what) { WebSocketService.MSG_REGISTER_CLIENT -> { listener.forEach { it.onOpenListener?.invoke() } } WebSocketService.MSG_UNREGISTER_CLIENT -> { listener.forEach { it.onCloseListener?.invoke() } listener.clear() } WebSocketService.MSG_SEND_MESSAGE_TO_CLIENT -> { listener.forEach { if (msg.obj is Bundle) { val jsonString = (msg.obj as Bundle).getString("msg") it.msgTextListener?.invoke(jsonString) } } } else -> { super.handleMessage(msg) } } } } private val mMessenger: Messenger = Messenger(IncomingHandler()) private var mService: Messenger? = null private var mConnection = object : ServiceConnection { override fun onServiceConnected(name: ComponentName?, service: IBinder?) { mService = Messenger(service) // 将 manager 注册到 service 中,否则无法接收回掉 mService?.send(Message.obtain(null, MSG_REGISTER).also { it.replyTo = mMessenger }) } override fun onServiceDisconnected(name: ComponentName?) { mService = null } } /** * 接收长链接消息通知 * </p> * 如果此时没有启动过长链接,则启动长链接 * * @param bType 业务名称/业务类型,保证业务场景不会重复接收 * @param msgListener 消息监听 */ fun subscribeMessage( context: Context, bType: String? = null, msgListener: OnMessageListener? = null ) { if (listener.size == 0) { Intent(context.applicationContext, WebSocketService::class.java).also { context.applicationContext.startService(it) } } // 如果bType存在,那么将之前的监听替换 var needAdd = true if (!bType.isNullOrEmpty()) { listener.forEach { if (it.bType == bType) { it.onOpenListener = { msgListener?.onOpen() } it.msgTextListener = { try { val data: JSONObject? = JSONObject.parseObject(it) if (data != null) { msgListener?.onMessage(data) } } catch (e: Exception) { LogUtils.e("ZRWEBSOCKET", e.message) } } it.onCloseListener = { msgListener?.onClosed() } // 替换后触发一次onOpen() msgListener?.onOpen() needAdd = false } } } if (needAdd) { listener.add(SocketManagerBean( bType = bType, onOpenListener = { msgListener?.onOpen() }, msgTextListener = { try { // val data: BEAN = JSONObject.parseObject(it, object : TypeReference<BEAN>() {}) val data: JSONObject? = JSONObject.parseObject(it) if (data != null) { msgListener?.onMessage(data) } } catch (e: Exception) { LogUtils.e("ZRWEBSOCKET", e.message) } }, onCloseListener = { msgListener?.onClosed() } )) } // 判断长链接服务是否已经启动,如果没启动则启动 Intent(context.applicationContext, WebSocketService::class.java).also { context.applicationContext.bindService(it, mConnection, Context.BIND_AUTO_CREATE) } } /** * 解除消息接收 */ fun unSubscribeMessage(bType: String?) { val list = mutableListOf<SocketManagerBean>() listener.forEach { if (it.bType != bType) { list.add(it) } } listener.clear() listener.addAll(list) if (listener.size == 0) { mService?.send(Message.obtain(null, MSG_UNREGISTER).also { it.replyTo = mMessenger }) } } /** * 发送消息 */ fun <BEAN> sendMsg(bType: String?, msg: BEAN?) { val model = SocketModel().apply { bt = bType m = msg } val bundle = Bundle().also { it.putString("msg", JSON.toJSONString(model)) } mService?.send(Message.obtain(null, MSG_SEND_MESSAGE_TO_SERVICE, bundle).also { it.replyTo = mMessenger }) } } data class SocketManagerBean( val bType: String? = null, var onOpenListener: (() -> Unit)? = null, var msgTextListener: ((String?) -> Unit)? = null, var onCloseListener: (() -> Unit)? = null, ) interface OnMessageListener { fun onOpen() {} fun onMessage(msg: JSONObject) fun onClosed() {} }
这样,到了最顶层的实际业务,我们的使用就非常简单了,记得用完后 unSubscribeMessage 就好~
WebSocketManager.Companion.INSTANCE.subscribeMessage(mContext.applicationContext, B_TYPE, object : OnMessageListener { override fun onOpen() { LogUtils.i(TAG, "long socket is connected") } override fun onMessage(msg: JSONObject) { LogUtils.i(TAG, "long socket return message$msg") } override fun onClosed() { LogUtils.i(TAG, "long socket is disconnected") } })
最后,这套设计仅满足了基本的接收和分发功能,但是缺乏消息阻塞队列等,同时由于应用性质,这里也没有考虑保活操作
最最后,如果这篇文章存在不足,欢迎大家前来指正,也欢迎各位朋友一起讨论交流~