一、SSE 服务端消息推送
本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。
本次使用的
tornado==6.3.2
二、短暂性场景下的 SSE 实现
短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。
在
import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor
class SSE(RequestHandler):
def initialize(self):
# 关闭自动结束
self._auto_finish = False
print("initialize")
def set_default_headers(self):
# 设置为事件驱动模式
self.set_header('Content-Type', "text/event-stream")
# 不使用缓存
self.set_header('Content-Control', "no-cache")
# 保持长连接
self.set_header('Connection', "keep-alive")
# 允许跨域
self.set_header('Access-Control-Allow-Origin', "*")
def prepare(self):
# 准备线程池
self.executor = self.application.pool
@tornado.gen.coroutine
def get(self):
result = yield self.doHandle()
self.write(result)
# 结束
self.finish()
@run_on_executor
def doHandle(self):
tornado.ioloop.IOLoop.current()
# 分十次推送信息
for i in range(10):
time.sleep(1)
self.flush()
self.callback(f"current: {i}")
return f"data: end
"
def callback(self, message):
# 事件推送
message = f"data: {message}
"
self.write(message)
self.flush()
class Application(tornado.web.Application):
def __init__(self):
handlers = [
("/sse", SSE),
("/(.*)$", tornado.web.StaticFileHandler, {
"path": "resources/static",
"default_filename": "index.html"
})
]
super(Application, self).__init__(handlers)
self.pool = ThreadPoolExecutor(200)
def startServer(port):
app = Application()
httpserver = tornado.httpserver.HTTPServer(app)
httpserver.listen(port)
print(f"Start server success", f"The prot = {port}")
tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
startServer(8020)
运行后可以到浏览器访问:

那如何在前端用
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>测试服务器推送技术</title>
</head>
<body>
<div id="messages"></div>
</body>
<script>
const eventSource = new EventSource('http://localhost:8020/sse');
// 事件回调
eventSource.onmessage = (event) => {
console.log(event.data)
const messagesDiv = document.getElementById('messages');
messagesDiv.innerHTML += '<p>' + event.data + '</p>';
};
// 异常
eventSource.onerror = (error) => {
console.error('EventSource failed:', error);
eventSource.close();
};
eventSource.onopen = ()=>{
console.log("开启")
}
</script>
</html>
运行后可以看到服务端分阶段推送过来的数据:

三、长连接场景下的 SSE 实现
上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有
下面是一个实现案例:
import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor
# 单例
def singleton(cls):
instances = {}
def wrapper(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return wrapper
# 订阅推送工具类
@singleton
class Pusher():
def __init__(self):
self.clients = {}
def add_client(self, client_id, callback):
if client_id not in self.clients:
self.clients[client_id] = callback
print(f"{client_id} 连接")
def send_all(self, message):
for client_id in self.clients:
callback = self.clients[client_id]
print("发送消息给:", client_id)
callback(message)
def send(self, client_id, message):
callback = self.clients[client_id]
print("发送消息给:", client_id)
callback(message)
class SSE(RequestHandler):
# 定义推送者
pusher = Pusher()
def initialize(self):
# 关闭自动结束
self._auto_finish = False
print("initialize")
def set_default_headers(self):
# 设置为事件驱动模式
self.set_header('Content-Type', "text/event-stream")
# 不使用缓存
self.set_header('Content-Control', "no-cache")
# 保持长连接
self.set_header('Connection', "keep-alive")
# 允许跨域
self.set_header('Access-Control-Allow-Origin', "*")
@tornado.gen.coroutine
def get(self):
# 客户端唯一标识
client_id = self.get_argument("client_id")
self.pusher.add_client(client_id, self.callback)
def callback(self, message):
# 事件推送
message = f"data: {message}
"
self.write(message)
self.flush()
# 定义推送接口,模拟推送
class Push(RequestHandler):
# 定义推送者
pusher = Pusher()
def prepare(self):
# 准备线程池
self.executor = self.application.pool
@tornado.gen.coroutine
def get(self):
# 客户端标识
client_id = self.get_argument("client_id")
# 推送的消息
message = self.get_argument("message")
result = yield self.doHandle(client_id, message)
self.write(result)
@run_on_executor
def doHandle(self, client_id, message):
tornado.ioloop.IOLoop.current()
self.pusher.send(client_id, message)
return "success"
class Application(tornado.web.Application):
def __init__(self):
handlers = [
("/sse", SSE),
("/push", Push),
("/(.*)$", tornado.web.StaticFileHandler, {
"path": "resources/static",
"default_filename": "index.html"
})
]
super(Application, self).__init__(handlers)
self.pool = ThreadPoolExecutor(200)
def startServer(port):
app = Application()
httpserver = tornado.httpserver.HTTPServer(app)
httpserver.listen(port)
print(f"Start server success", f"The prot = {port}")
tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
startServer(8020)
这里我定义了一个
同样前端也要修改,需要给自己定义
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>测试服务器推送技术</title>
</head>
<body>
<div id="client"></div>
<div id="messages"></div>
</body>
<script>
function generateUUID() {
let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0;
const v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
return uuid;
}
// 利用uuid 模拟生成唯一的客户端ID
let client_id = generateUUID();
document.getElementById('client').innerHTML = "当前 client_id = "+client_id;
const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id);
// 事件回调
eventSource.onmessage = (event) => {
console.log(event.data)
const messagesDiv = document.getElementById('messages');
messagesDiv.innerHTML += '<p>' + event.data + '</p>';
};
// 异常
eventSource.onerror = (error) => {
console.error('EventSource failed:', error);
eventSource.close();
};
eventSource.onopen = ()=>{
console.log("开启")
}
</script>
</html>
这里我用
下面使用浏览器打开三个页面,可以看到三个不同的



在服务端的日志中也能看到这三个客户端的连接:

下面调用

下面看到

已经成功收到推送的消息,反之看另外两个:


都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。