6.0socketserver实现并发

【一】引入

  • socket不能多并发,只能支持一个用户
  • socketserver 模块是 Python 中用于创建网络服务器的模块,提供了一种简单而一致的接口。
    • 它是构建网络服务器的框架,处理了创建、维护和关闭连接的许多底层细节
    • socketserversocket的再封装

【二】socketserver介绍

【1】简介

  • socketserver在python2中为SocketServer

    • 在python3种取消了首字母大写,改名为socketserver。
  • socketserver中包含了两种类,

    • 一种为服务类(server class):前者提供了许多方法
      • 像绑定,监听,运行…… (也就是建立连接的过程) 。
    • 一种为请求处理类(request handle class)
      • 专注于如何处理用户所发送的数据(也就是事务逻辑)。
  • 一般情况下,所有的服务,都是先建立连接,也就是建立一个服务类的实例,然后开始处理用户请求,也就是建立一个请求处理类的实例。

【2】socketserver 模块中的类

  • BaseServer:实现服务器的基本类。

  • TCPServer:处理 TCP 连接的服务器。

  • UDPServer:处理 UDP 连接的服务器。

  • UnixStreamServer:类似于TCPServer提供面向数据流的套接字连接,但是旨在UNIX平台上可用。

  • UnixDatagramServer:类似于UDPServer提供面向数据报的套接字连接,但是旨在UNIX平台上可用。

  • ForkingMixIn:实现了核心的进程化功能,用于与服务器类进行混合,提供异步特性。

  • ThreadingMixIn:实现了核心的线程化功能,用于与服务器类进行混合,异步特性。

  • ForkingTCPServer:每个请求创建一个新进程的 TCP 服务器。ForkingMixIn和TCPServer`组合。

  • ForkingUDPServer:每个请求创建一个新进程的 UDP 服务器。ForkingMixIn和UDPServer组合。

  • ThreadingTCPServer:在单独的线程中处理每个请求的 TCP 服务器。ThreadingMixIn和TCPServer组合。

  • ThreadingUDPServer:在单独的线程中处理每个请求的 UDP 服务器。ThreadingMixIn和UDPServer组合。

  • BaseRequestHandler:用于定制Handler类型,自定义的Handler类型只要继承自BaseRequestHandler,并覆盖写入它的handle() 方法即可。

  • StreamRequestHandler:TCP请求处理类的一个实现。

  • DataStreamRequestHandler:UDP请求处理类的一个实现。

【3】server类继承关系

在这里插入图片描述

【4】请求处理类继承关系

在这里插入图片描述

【三】socket 模块与 socketserver 关系

  • socket 和 socketserver 是两个不同的 Python 模块,都用于网络编程。

  • socket 模块提供了通用的套接字编程接口,可用于创建客户端和服务器端程序。

    • 它涵盖了与网络通信相关的底层细节
    • 如创建套接字、绑定地址、监听请求、接受连接、发送数据和接收数据。
  • socketserver 模块是 socket 模块的一个封装

    • 它抽象了服务器端网络编程的复杂度,使您能够快速编写服务器端程序。
    • 它提供了多种服务器类型
    • 如多线程服务器、多进程服务器和单进程服务器,以满足不同的网络编程需求。
    • 此外,它还提供了简单的面向对象编程模型,允许您扩展基础类并定制服务器行为。
  • 小结

    • 如果您需要快速编写简单的服务器端程序,那么 socketserver 模块可能是您的最佳选择
    • 如果您需要更多的灵活性和细节控制,则可以使用 socket 模块。

【四】socketserver使用

  • 继承关系图中有五个类,其中四个类表示四种类型的同步服务器:
+------------+
| BaseServer |
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+

【1】创建 TCPServer

(1)语法规则

  • TCPServer 类是 Python3 中基于 TCP 协议的多线程服务器类
import socketserver

# 这将使用InternetTCP协议,该协议提供客户机和服务器之间的连续数据流。
# 如果 bind_and_activate 为true,构造函数自动尝试调用 server_bind() 和 server_activate() . 
# 其他参数将传递给 BaseServer 基类。
servser = socketserver.TCPServer(server_address, RequestHandlerClass, bind_and_activate=True)

# socketserver.UnixStreamServer:仅限于Unix系统的,Unix套接字流
servser = socketserver.UnixStreamServer(server_address, RequestHandlerClass, bind_and_activate=True)
  • 使用 UDPServer 类时,可以指定下列三个参数

    • server_address:服务器的地址,他应该是一个元组包含地址和端口如:(“localhost”, 9000)。

    • RequestHandlerClass:我们自定义的类,类中必须重写handle()方法。用于处理所有socket请求。

    • bind_and_activate:如果为True,将自动调用server_bind()和server_activate()。一般默认即可。

(2)示例

import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # 接收客户端请求的数据
        self.data = self.request.recv(1024).strip()
        print("{} 发送了:{}".format(self.client_address[0], self.data))
        # 向客户端发送响应数据
        self.request.sendall(self.data.upper())


if __name__ == "__main__":
    # 创建服务器,绑定 IP 地址和端口号
    HOST, PORT = "localhost", 9999
    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
    # 启动服务器
    server.serve_forever()
  • 该代码定义了一个简单的多线程服务器,它绑定在本地主机(localhost)的端口号 9999 上。
  • 当有客户端连接时,服务器会接收客户端发送的数据,然后将数据转换为大写并发送回客户端。
  • 请注意,您需要创建一个名为 MyTCPHandler 的处理程序类,该类必须从 socketserver.BaseRequestHandler 类继承,并实现 handle 方法。
  • 在 handle 方法中,您可以处理客户端请求,并向客户端发送响应数据。
  • 最后,您可以通过调用 server.serve_forever() 方法启动服务器,并使其处于持续监听状态。

【2】创建 UDPServer

(1)语法规则

import socketserver

# 它使用数据报,这些数据报是离散的信息包,在传输过程中可能出现无序到达或丢失。
server = socketserver.UDPServer(server_address, RequestHandlerClass, bind_and_activate=True)

# socketserver.UnixDatagramServer:仅限于Unix系统的,Unix套接字流
server = socketserver.UnixDatagramServer(server_address, RequestHandlerClass, bind_and_activate=True)
  • 使用 UDPServer 类时,可以指定下列三个参数:

    • server_address:服务器的 IP 地址和端口号。例如:(“localhost”, 9999)。

    • RequestHandlerClass:处理客户端请求的处理程序类。例如:MyUDPHandler。

    • bind_and_activate:如果为True,将自动调用server_bind()和server_activate()。一般默认即可。

(2)示例

import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # 接收客户端请求的数据
        self.data = self.request.recv(1024).strip()
        print("{} 发送了:{}".format(self.client_address[0], self.data))
        # 向客户端发送响应数据
        self.request.sendall(self.data.upper())


if __name__ == "__main__":
    # 创建服务器,绑定 IP 地址和端口号
    HOST, PORT = "localhost", 9999
    server = socketserver.UDPServer((HOST, PORT), MyTCPHandler)
    # 启动服务器
    server.serve_forever()

【五】异步服务器类(多线程/多进程)

  • socketserver 模块中也提供了一些异步服务器类,它们可以在单独的线程中处理多个客户端请求,从而提高服务器的并发性能。下面一一介绍。

【1】ThreadingMixIn(多线程)

  • ThreadingMixIn 类是 socketserver 模块中的一个异步服务器类,用于创建基于多线程的异步服务器。
  • 您可以通过继承该类来创建自己的异步服务器类,然后通过该类创建服务器。
  • 例如,创建一个基于多线程的异步 TCP 服务器的代码如下:
import socketserver


class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # 接收客户端请求的数据
        self.data = self.request.recv(1024).strip()
        print("{} 发送了:{}".format(self.client_address[0], self.data))
        # 向客户端发送响应数据
        self.request.sendall(self.data.upper())


# 创建服务器,绑定 IP 地址和端口号
HOST, PORT = "localhost", 9999
server = ThreadingTCPServer((HOST, PORT), MyTCPHandler)
  • 在这里,您需要创建一个名为 MyTCPHandler 的处理程序类,该类必须从 socketserver.BaseRequestHandler 类继承,并实现 handle 方法。
  • 该方法是处理客户端请求的核心方法,在该方法中您可以处理客户端请求,并向客户端发送响应数据。
  • 在创建基于多线程的异步服务器时,每个客户端请求都会在一个单独的线程中处理,从而提高服务器的并发性能。
import socket
import threading
import socketserver


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        cur_thread = threading.current_thread()
        response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
        self.request.sendall(response)


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass


def client(ip, port, message):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((ip, port))
        sock.sendall(bytes(message, 'ascii'))
        response = str(sock.recv(1024), 'ascii')
        print("Received: {}".format(response))


if __name__ == "__main__":
    # Port 0 means to select an arbitrary unused port
    HOST, PORT = "localhost", 0

    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    with server:
        ip, port = server.server_address

        # Start a thread with the server -- that thread will then start one
        # more thread for each request
        server_thread = threading.Thread(target=server.serve_forever)
        # Exit the server thread when the main thread terminates
        server_thread.daemon = True
        server_thread.start()
        print("Server loop running in thread:", server_thread.name)

        client(ip, port, "Hello World 1")
        client(ip, port, "Hello World 2")
        client(ip, port, "Hello World 3")

        server.shutdown()
Server loop running in thread: Thread-1
Received: Thread-2: Hello World 1
Received: Thread-3: Hello World 2
Received: Thread-4: Hello World 3

【2】ForkingMixIn(多进程)

  • ForkingMixIn 类是 socketserver 模块中的一个异步服务器类,用于创建基于多进程的异步服务器。
  • 您可以通过继承该类来创建自己的异步服务器类,然后通过该类创建服务器。
  • 例如,创建一个基于多进程的异步 TCP 服务器的代码如下:
import socketserver


class ForkingTCPServer(socketserver.ForkingMixIn, socketserver.TCPServer):
    pass


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # 接收客户端请求的数据
        self.data = self.request.recv(1024).strip()
        print("{} 发送了:{}".format(self.client_address[0], self.data))
        # 向客户端发送响应数据
        self.request.sendall(self.data.upper())
        

# 创建服务器,绑定 IP 地址和端口号
HOST, PORT = "localhost", 9999
server = ForkingTCPServer((HOST, PORT), MyTCPHandler)
  • 在这里,您需要创建一个名为 MyTCPHandler 的处理程序类,该类必须从 socketserver.BaseRequestHandler 类继承,并实现 handle 方法。
  • 该方法是处理客户端请求的核心方法,在该方法中您可以处理客户端请求,并向客户端发送响应数据。
  • 在创建基于多进程的异步服务器时,每个客户端请求都会在一个单独的进程中处理,从而提高服务器的并发性能。

【注意】

  • 在 Unix 系统上,使用多进程的异步服务器可能会导致内存占用增加。
  • 因此,在 Unix 系统上,建议使用多线程的异步服务器。

【3】ForkingTCPServer(TCP 多进程)

  • ForkingTCPServer 是 socketserver 模块中的一个异步服务器类,用于创建基于多进程的 TCP 服务器。

  • 要使用 ForkingTCPServer 类,您需要提供服务器的 IP 地址和端口号,以及处理客户端请求的处理程序类。

  • 以下是一个使用 ForkingTCPServer 类的示例:

import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # 获取客户端请求的数据
        data = self.request.recv(1024).strip()
        # 向客户端发送响应数据
        self.request.sendall(bytes("Hello, {0}".format(data), "utf-8"))


# 创建服务器,绑定 IP 地址和端口号
HOST, PORT = "localhost", 9999
server = socketserver.ForkingTCPServer((HOST, PORT), MyTCPHandler)

# 启动服务器
server.serve_forever()
  • 在这个示例中,我们创建了一个名为 MyTCPHandler 的处理程序类,该类从 socketserver.BaseRequestHandler 类继承,并实现了 handle 方法。
  • 该方法是处理客户端请求的核心方法,在该方法中您可以处理客户端请求,并向客户端发送响应数据。
  • 接下来,我们使用 socketserver.ForkingTCPServer 类创建了一个 TCP 服务器,并绑定了服务器的 IP 地址和端口号。
  • 最后,我们使用 serve_forever 方法启动服务器,以便处理。

【4】ForkingUDPServer(UDP 多进程)

  • ForkingUDPServer 是 socketserver 模块中的一个异步服务器类,用于创建基于多进程的 UDP 服务器。

  • 要使用 ForkingUDPServer 类,您需要提供服务器的 IP 地址和端口号,以及处理客户端请求的处理程序类。

  • 以下是一个使用 ForkingUDPServer 类的示例:

import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request[0].strip()
        socket = self.request[1]
        # 向客户端发送响应数据
        socket.sendto(bytes("Hello, {0}".format(data), "utf-8"), self.client_address)


# 创建服务器,绑定 IP 地址和端口号
HOST, PORT = "localhost", 9999
server = socketserver.ForkingUDPServer((HOST, PORT), MyUDPHandler)

# 启动服务器
server.serve_forever()
  • 在这个示例中,我们创建了一个名为 MyUDPHandler 的处理程序类,该类从 socketserver.BaseRequestHandler 类继承,并实现了 handle 方法。
  • 该方法是处理客户端请求的核心方法,在该方法中您可以处理客户端请求,并向客户端发送响应数据。
  • 接下来,我们使用 socketserver.ForkingUDPServer 类创建了一个 UDP 服务器,并绑定了服务器的 IP 地址和端口号。
  • 最后,我们使用 serve_forever 方法启动服务器,以便处理客户端请求。

【5】ThreadingTCPServer(TCP 多线程)

  • ThreadingTCPServer 是 socketserver 模块中的一个异步服务器类,用于创建基于多线程的 TCP 服务器。

  • 要使用 ThreadingTCPServer 类,您需要提供服务器的 IP 地址和端口号,以及处理客户端请求的处理程序类。

  • 以下是一个使用 ThreadingTCPServer 类的示例:

import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request.recv(1024).strip()
        # 向客户端发送响应数据
        self.request.sendall(bytes("Hello, {0}".format(data), "utf-8"))


# 创建服务器,绑定 IP 地址和端口号
HOST, PORT = "localhost", 9999
server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)

# 启动服务器
server.serve_forever()
  • 在这个示例中,我们创建了一个名为 MyTCPHandler 的处理程序类,该类从 socketserver.BaseRequestHandler 类继承,并实现了 handle 方法。
  • 该方法是处理客户端请求的核心方法,在该方法中您可以处理客户端请求,并向客户端发送响应数据。
  • 接下来,我们使用 socketserver.ThreadingTCPServer 类创建了一个 TCP 服务器,并绑定了服务器的 IP 地址和端口号。
  • 最后,我们使用 serve_forever 方法启动服务器,以便处理客户端请求。

【6】ThreadingUDPServer(UDP 多线程)

  • ThreadingUDPServer 是 socketserver 模块中的一个异步服务器类,用于创建基于多线程的 UDP 服务器。

  • 要使用 ThreadingUDPServer 类,您需要提供服务器的 IP 地址和端口号,以及处理客户端请求的处理程序类。

  • 以下是一个使用 ThreadingUDPServer 类的示例:

import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request[0].strip()
        socket = self.request[1]
        # 向客户端发送响应数据
        socket.sendto(bytes("Hello, {0}".format(data), "utf-8"), self.client_address)


# 创建服务器,绑定 IP 地址和端口号
HOST, PORT = "localhost", 9999
server = socketserver.ThreadingUDPServer((HOST, PORT), MyUDPHandler)

# 启动服务器
server.serve_forever()
  • 在这个示例中,我们创建了一个名为 MyUDPHandler 的处理程序类,该类从 socketserver.BaseRequestHandler 类继承,并实现了 handle 方法。
  • 该方法是处理客户端请求的核心方法,在该方法中您可以处理客户端请求,并向客户端发送响应数据。
  • 接下来,我们使用 socketserver.ThreadingUDPServer 类创建了一个 UDP 服务器,并绑定了服务器的 IP 地址和端口号。
  • 最后,我们使用 serve_forever 方法启动服务器,以便处理客户端请求。

【补充】socketserver

【0】引入

  • 基于tcp的套接字,关键就是两个循环
    • 一个链接循环
    • 一个通信循环
  • socketserver模块中分两大类:
    • server类(解决链接问题)
    • request类(解决通信问题)

【1】server类

img

【2】request类

img

【3】继承关系

(1)关系一

img

(2)关系二

img

(3)关系三

img

【补充】文件传输器

  • 以下述代码为例,分析socketserver源码:

    ftpserver = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), FtpServer)
    
    ftpserver.serve_forever()
  • 查找属性的顺序:

    ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer
  • 实例化得到ftpserver

    • 先找类ThreadingTCPServer的init
    • 在TCPServer中找到
    • 进而执行server_bind,server_active
  • 找ftpserver下的serve_forever

    • 在BaseServer中找到,进而执行self._handle_request_noblock(),该方法同样是在BaseServer中
  • 执行self._handle_request_noblock()进而执行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept())

    • 然后执行self.process_request(request, client_address)
  • 在ThreadingMixIn中找到process_request,开启多线程应对并发

    • 进而执行process_request_thread
    • 执行self.finish_request(request, client_address)
  • 上述四部分完成了链接循环

    • 本部分开始进入处理通讯部分
    • 在BaseServer中找到finish_request
    • 触发我们自己定义的类的实例化
    • 去找init方法,而我们自己定义的类没有该方法
    • 则去它的父类也就是BaseRequestHandler中找….
  • 源码分析总结:

    • 基于tcp的socketserver我们自己定义的类中的
      • self.server即套接字对象
      • self.request即一个链接
      • self.client_address即客户端地址
    • 基于udp的socketserver我们自己定义的类中的
      • self.request是一个元组(第一个元素是客户端发来的数据,第二部分是服务端的udp套接字对象),如(b’adsf’, )
      • self.client_address即客户端地址

(1)FtpServer

import socketserver
import struct
import json
import os


# 创建服务类,继承了 socketserver.BaseRequestHandler , 基本的处理类
class FtpServer(socketserver.BaseRequestHandler):
    # 定义编码格式
    coding = 'utf-8'
    # 定义服务文件路径
    server_dir = 'file_upload'
    # 定义一次性最大传输
    max_packet_size = 1024
    # 定义根路径
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

    # 文件处理函数
    def handle(self):
        # 查看自己的request请求
        print(self.request)
        # 创建循环
        while True:
            # 调用自己的请求对象接收数据
            data = self.request.recv(4)
            # 数据解包
            data_len = struct.unpack('i', data)[0]
            # 取出打好的头
            head_json = self.request.recv(data_len).decode(self.coding)
            # 转数据格式
            head_dic = json.loads(head_json)
            # print(head_dic)
            # 获取命令
            cmd = head_dic['cmd']
            # 判断当前自己是否具有该方法
            if hasattr(self, cmd):
                func = getattr(self, cmd)
                func(head_dic)

    def _create_filename(self, path):
        if not os.path.exists(path):
            os.mkdir(path)

    # 定义上传处理函数
    def put(self, args):
        # 定义文件路径
        file_path = os.path.normpath(os.path.join(
            self.BASE_DIR,
            self.server_dir,
            args['filename']
        ))
        # 创建上传文件存储路径 ..file_upload
        self._create_filename(os.path.join(self.BASE_DIR, self.server_dir))
        # 取出文件大小
        filesize = args['filesize']
        # 定义起始文件接收量
        recv_size = 0
        print('----->', file_path)
        # 打开文件,保存文件数据
        with open(file_path, 'wb') as f:
            while recv_size < filesize:
                recv_data = self.request.recv(self.max_packet_size)
                f.write(recv_data)
                recv_size += len(recv_data)
                print('recvsize:%s filesize:%s' % (recv_size, filesize))


ftpserver = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), FtpServer)
ftpserver.serve_forever()

(2)FtpClient

import socket
import struct
import json
import os


# 定义处理器类
class MYTCPClient:
    # 定义套接字家族
    address_family = socket.AF_INET

    # 定义协议类型
    socket_type = socket.SOCK_STREAM

    # 定义拒绝地址(黑名单)
    allow_reuse_address = False

    # 最大传输包大小
    max_packet_size = 8192

    # 默认编码格式
    coding = 'utf-8'

    # 最大请求次数
    request_queue_size = 5

    def __init__(self, server_address, connect=True):
        # 获取服务端地址
        self.server_address = server_address
        # 创建 socket 对象
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        # 判断当前是否链接
        if connect:
            try:
                self.client_connect()
            except:
                self.client_close()
                raise

    # 定义连接方法
    def client_connect(self):
        # 连接指定服务端
        self.socket.connect(self.server_address)

    # 关闭连接
    def client_close(self):
        self.socket.close()

    # 启动函数
    def run(self):
        # 多次循环
        while True:
            # 输入指令
            inp = input(">>: ").strip()
            if not inp: continue
            # 切分指令
            l = inp.split()
            # 获取cmd命令
            cmd = l[0]
            # 判断自己当前是否含有该方法
            if hasattr(self, cmd):
                func = getattr(self, cmd)
                func(l)

    # 定义上传函数
    def put(self, args):
        cmd = args[0]
        filename = args[1]
        if not os.path.isfile(filename):
            print('file:%s is not exists' % filename)
            return
        else:
            filesize = os.path.getsize(filename)

        # 粘包处理 --- 打包
        head_dic = {'cmd': cmd, 'filename': os.path.basename(filename), 'filesize': filesize}
        print(head_dic)
        head_json = json.dumps(head_dic)
        head_json_bytes = bytes(head_json, encoding=self.coding)
        # 打包
        head_struct = struct.pack('i', len(head_json_bytes))
        # 传输包长度
        self.socket.send(head_struct)
        # 传输打包数据
        self.socket.send(head_json_bytes)
        send_size = 0
        # 打开文件,上传文件
        with open(filename, 'rb') as f:
            for line in f:
                self.socket.send(line)
                send_size += len(line)
                print(send_size)
            else:
                print('upload successful')


client = MYTCPClient(('127.0.0.1', 8080))

client.run()