背景:
之前,项目里有条业务线单独实现了长链接,只负责自己业务的消息收发。但是当第二个业务也需要长链接功能的时候,运维建议不要再建立新的长链接,这样服务器压力太大。因此,设计一套长链接消息分发框架成了当务之急。
设计分析:
首先,分析下业务具体使用场景。目前不存在所有用户在App全生命周期都需要长链接的业务,因此不能App启动就打开它,这样服务器压力大,也会提高App耗电性能。那么,多个业务在对长链接的使用时间就存在三种可能,相离、相交和包含,所以我们需要当存在业务使用长链接时开启,没有业务使用的时候关闭。
那么,这个框架基本模型就出来了,最底层 Service 负责控制长链接的开关、消息收发还有心跳,中间层客户端负责管理消息的分发,最顶层就是实际业务层负责处理业务消息。
计划通,开始实践
首先,实现一个 WebSocketManager,这部分可以参考网上其他博客,然后在Service中对其实例化。然后,考虑到公司有多个App,以后可能多个客户端共用一个Service,那么考虑使用 list 保持跟踪当前所有注册的客户端,所有客户端都解除注册的时候,关闭长链接,并自动关闭 service
/**
* 长链接服务
*
* 服务启动后,必须发送注册消息,否则无法接收到长链接消息
*/
class WebSocketService : Service() {
companion object {
/**
* 命令服务注册客户端,接收来自服务的回调。Message的replyTo字段必须是应该发送回调
* 的客户端的Messenger。
*
* Command to the service to register a client, receiving callbacks
* from the service. The Message's replyTo field must be a Messenger of
* the client where callbacks should be sent.
*/
const val MSG_REGISTER_CLIENT = 1
/**
* 命令服务注销客户端,或停止接收来自服务的回调。消息的 replyTo 字段必须是的
* Messenger 客户端,如之前用 MSG_REGISTER_client 给出的。
*
* Command to the service to unregister a client, ot stop receiving callbacks
* from the service. The Message's replyTo field must be a Messenger of
* the client as previously given with MSG_REGISTER_CLIENT.
*/
const val MSG_UNREGISTER_CLIENT = 2
/**
* 命令服务通过 socket 链接发送客户端传递消息
*
* Command to the service send client's message by socket.
*/
const val MSG_SEND_MESSAGE_TO_SERVICE = 3
/**
* 服务分发长链接返回的消息
*
* Message will be sent by the service to any registered clients.
*/
const val MSG_SEND_MESSAGE_TO_CLIENT = 4
}
/**
* 长链接管理类
*
* web socket manager
*/
private val manager: WsManager by lazy {
WsManagerImpl.INSTANCE.apply {
mContext = this@WebSocketService
wsStatusListener = object : WsStatusListener {
override fun onOpen(response: Response?) {
mClients.forEach {
try {
it.send(Message.obtain(null, MSG_REGISTER_CLIENT))
} catch (e: RemoteException) {
}
}
}
override fun onMessage(text: String?) {
mClients.forEach {
val bundle = Bundle()
bundle.putString("msg", text)
try {
it.send(Message.obtain(null, MSG_SEND_MESSAGE_TO_CLIENT, bundle))
} catch (e: RemoteException) {
}
}
}
override fun onClosed(code: Int, reason: String?) {
mClients.forEach {
try {
it.send(Message.obtain(null, MSG_UNREGISTER_CLIENT))
} catch (e: RemoteException) {
}
}
mClients.clear()
stopSelf()
}
}
}
}
/**
* 保持跟踪当前所有注册的客户端
* 所有客户端都解除注册的时候,自动关闭 service
*
* Keeps track of all current registered clients.
* Stop self when all current registered clients unregistered.
*/
private val mClients: MutableList<Messenger> = mutableListOf()
/**
* 来自客户端的传入消息的处理程序
*
* Handler of incoming messages from clients
*/
internal inner class IncomingHandler(
// context: Context,
// private val applicationContext: Context = context.applicationContext
) : Handler() {
override fun handleMessage(msg: Message) {
when (msg.what) {
MSG_REGISTER_CLIENT -> {
mClients.add(msg.replyTo)
}
MSG_UNREGISTER_CLIENT -> {
mClients.remove(msg.replyTo)
if (mClients.size == 0) {
// 关闭长链接
manager.stopConnect()
// 所有客户端都解除注册的时候,自动关闭 service
stopSelf()
}
}
MSG_SEND_MESSAGE_TO_SERVICE -> {
// 通过长链接发送
if (msg.obj is Bundle) {
val socketModule: SocketModel = JSON.parseObject(
(msg.obj as Bundle).getString("msg") ?: "",
SocketModel::class.java
)
manager.sendMessage(socketModule.m, socketModule.bt)
}
}
else -> {
super.handleMessage(msg)
}
}
}
}
/**
* 我们为客户端发布的目标,以将消息发送到IncomingHandler。
*
* Target we publish for clients to send messages to IncomingHandler.
*/
private val mMessenger: Messenger = Messenger(IncomingHandler())
/**
* 获取长链接token
*/
private fun getToken() {
val params = JSONObject()
params["token"] = GlobalParams.getAppToken()
params["appType"] = "zo"
params["checkWay"] = "cas"
OkServeService.requestGateWayService(this,
"${ConstantValue.WEB_GATEWAY_ROOT}troy/dp/login",
params, object : LocalCallback<SocketTokenBean?>() {
override fun onResult(t: SocketTokenBean?) {
// 长链接建立
t?.let {
manager.initWebSocket(
url = it.wsAddress,
token = it.token,
heartBeatTick = (it.heartBeatTick ?: 31) - 1
)
}
}
override fun onFailure(str: String?) {
mClients.forEach {
try {
it.send(Message.obtain(null, MSG_UNREGISTER_CLIENT))
} catch (e: RemoteException) {
}
}
mClients.clear()
stopSelf()
}
})
}
override fun onCreate() {
super.onCreate()
getToken()
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
// return super.onStartCommand(intent, flags, startId)
return START_STICKY
}
/**
* 当绑定到服务时,我们向信使返回一个接口,用于向服务发送消息。
*
* When binding to the service, we return an interface to our messenger
* for sending messages to the service.
*/
override fun onBind(intent: Intent?): IBinder? = mMessenger.binder
override fun onDestroy() {
super.onDestroy()
// 如果长链接么有关闭,关闭长链接
manager.stopConnect()
}
}
在中间层,通过观察者模式,给业务发送消息,并且判断有多少个业务正在使用长链接,然后在他们都使用完成后,通知 Service 客户端解除注册。当然,为了防止业务开启和结束次数不匹配,增加 b_type 字段用来去重
(虽然信任同事能力,但是也相信屎山带来的意外,这叫什么,墨菲定律)
class WebSocketManager private constructor() {
companion object {
val INSTANCE = Holder.instance
/**
* 注册成功
*/
const val MSG_REGISTER = 1
/**
* 长链接断开解除注册
*/
const val MSG_UNREGISTER = 2
/**
* 消息发送给服务端
*/
const val MSG_SEND_MESSAGE_TO_SERVICE = 3
/**
* 服务分发长链接返回的消息
*/
const val MSG_SEND_MESSAGE_TO_CLIENT = 4
}
private object Holder {
val instance = WebSocketManager()
}
private val listener: MutableList<SocketManagerBean> = mutableListOf()
/**
* 来自服务端的传入消息的处理程序
*
* Handler of incoming messages from clients
*/
internal inner class IncomingHandler : Handler() {
override fun handleMessage(msg: Message) {
when (msg.what) {
WebSocketService.MSG_REGISTER_CLIENT -> {
listener.forEach {
it.onOpenListener?.invoke()
}
}
WebSocketService.MSG_UNREGISTER_CLIENT -> {
listener.forEach {
it.onCloseListener?.invoke()
}
listener.clear()
}
WebSocketService.MSG_SEND_MESSAGE_TO_CLIENT -> {
listener.forEach {
if (msg.obj is Bundle) {
val jsonString = (msg.obj as Bundle).getString("msg")
it.msgTextListener?.invoke(jsonString)
}
}
}
else -> {
super.handleMessage(msg)
}
}
}
}
private val mMessenger: Messenger = Messenger(IncomingHandler())
private var mService: Messenger? = null
private var mConnection = object : ServiceConnection {
override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
mService = Messenger(service)
// 将 manager 注册到 service 中,否则无法接收回掉
mService?.send(Message.obtain(null, MSG_REGISTER).also {
it.replyTo = mMessenger
})
}
override fun onServiceDisconnected(name: ComponentName?) {
mService = null
}
}
/**
* 接收长链接消息通知
* </p>
* 如果此时没有启动过长链接,则启动长链接
*
* @param bType 业务名称/业务类型,保证业务场景不会重复接收
* @param msgListener 消息监听
*/
fun subscribeMessage(
context: Context,
bType: String? = null,
msgListener: OnMessageListener? = null
) {
if (listener.size == 0) {
Intent(context.applicationContext, WebSocketService::class.java).also {
context.applicationContext.startService(it)
}
}
// 如果bType存在,那么将之前的监听替换
var needAdd = true
if (!bType.isNullOrEmpty()) {
listener.forEach {
if (it.bType == bType) {
it.onOpenListener = {
msgListener?.onOpen()
}
it.msgTextListener = {
try {
val data: JSONObject? = JSONObject.parseObject(it)
if (data != null) {
msgListener?.onMessage(data)
}
} catch (e: Exception) {
LogUtils.e("ZRWEBSOCKET", e.message)
}
}
it.onCloseListener = {
msgListener?.onClosed()
}
// 替换后触发一次onOpen()
msgListener?.onOpen()
needAdd = false
}
}
}
if (needAdd) {
listener.add(SocketManagerBean(
bType = bType,
onOpenListener = {
msgListener?.onOpen()
},
msgTextListener = {
try {
// val data: BEAN = JSONObject.parseObject(it, object : TypeReference<BEAN>() {})
val data: JSONObject? = JSONObject.parseObject(it)
if (data != null) {
msgListener?.onMessage(data)
}
} catch (e: Exception) {
LogUtils.e("ZRWEBSOCKET", e.message)
}
},
onCloseListener = {
msgListener?.onClosed()
}
))
}
// 判断长链接服务是否已经启动,如果没启动则启动
Intent(context.applicationContext, WebSocketService::class.java).also {
context.applicationContext.bindService(it, mConnection, Context.BIND_AUTO_CREATE)
}
}
/**
* 解除消息接收
*/
fun unSubscribeMessage(bType: String?) {
val list = mutableListOf<SocketManagerBean>()
listener.forEach {
if (it.bType != bType) {
list.add(it)
}
}
listener.clear()
listener.addAll(list)
if (listener.size == 0) {
mService?.send(Message.obtain(null, MSG_UNREGISTER).also {
it.replyTo = mMessenger
})
}
}
/**
* 发送消息
*/
fun <BEAN> sendMsg(bType: String?, msg: BEAN?) {
val model = SocketModel().apply {
bt = bType
m = msg
}
val bundle = Bundle().also {
it.putString("msg", JSON.toJSONString(model))
}
mService?.send(Message.obtain(null, MSG_SEND_MESSAGE_TO_SERVICE, bundle).also {
it.replyTo = mMessenger
})
}
}
data class SocketManagerBean(
val bType: String? = null,
var onOpenListener: (() -> Unit)? = null,
var msgTextListener: ((String?) -> Unit)? = null,
var onCloseListener: (() -> Unit)? = null,
)
interface OnMessageListener {
fun onOpen() {}
fun onMessage(msg: JSONObject)
fun onClosed() {}
}
这样,到了最顶层的实际业务,我们的使用就非常简单了,记得用完后 unSubscribeMessage 就好~
WebSocketManager.Companion.INSTANCE.subscribeMessage(mContext.applicationContext, B_TYPE,
object : OnMessageListener {
override fun onOpen() {
LogUtils.i(TAG, "long socket is connected")
}
override fun onMessage(msg: JSONObject) {
LogUtils.i(TAG, "long socket return message$msg")
}
override fun onClosed() {
LogUtils.i(TAG, "long socket is disconnected")
}
})
最后,这套设计仅满足了基本的接收和分发功能,但是缺乏消息阻塞队列等,同时由于应用性质,这里也没有考虑保活操作
最最后,如果这篇文章存在不足,欢迎大家前来指正,也欢迎各位朋友一起讨论交流~