webserver 之 线程同步 && 线程池(半同步半反应堆)

目录

??前言

??B / S 模型 

??线程同步机制

??概念

(1)RAII

(2)信号量

(3)互斥量

(4)条件变量

??功能

(1)锁机制

(2)封装

??线程池 -- 概念

(1)服务器基本框架

(2)五种 I/O 模型

(3)事件处理模式

(4)同步 I/O 模拟 Proactor 模式

(5)并发编程模式

(6)半同步 / 半反应堆

(7)线程池

(8)静态成员   变量 && 函数 

(9)pthread_create 陷阱

(10)this 指针的锅

?线程池 -- 代码分析

(1)线程池 -- 类定义

(2)线程池 -- 创建与回收

(3)请求队列 -- 添加任务

(4)线程处理函数

(5)run 执行任务


??前言

建议同步跟原书 《Linux高性能服务器编程》 (游双  著)

源码地址:TinyWebServer/README.md at master · qinguoyi/TinyWebServer (github.com)

步骤

(1)结合 GPT,Google,cppreference 跟一遍博客

(2)同步跟原书(速刷)

(3)看源码(理解着敲一遍)(大体框架要知道)(所有类图,用例图,时序图等,自己用语雀画一遍)

............................

项目做完后,结合八股理解

-----------------------

还有能力或时间,就自己扩展新内容然后 pull request

??B / S 模型 

Browser / Server 区别于 Client / Server

??指的是,客户端(Client) 通过 浏览器(Browser),访问 Web服务器(Server) 上的网页

客户端和浏览器端之间走的报文是http协议(即超文本传输协议) 

C / S 模式中,走的时 TCP 协议(Transmission Control Protocol,传输控制协议

bs模型和cs模型「建议收藏」-腾讯云开发者社区-腾讯云 (tencent.com)

??线程同步机制

??概念

最新版Web服务器项目详解 - 01 线程同步机制封装类 (qq.com)

(1)RAII

RAII:资源获取即初始化?? Resource Acquisition is Initialization

构造函数 -- 分配资源

析构函数 -- 释放资源

一个对象创建时,自动调用构造函数

当对象超出作用域的时候,会自动调用析构函数

在 RAII 的指导下,我们使用类来管理资源,将资源的对象的生命周期绑定

RAII 核心:资源与对象的 生命周期 绑定,通过C++语言机制,实现对资源的安全管理

比如智能指针??

彻底搞懂之C++智能指针-腾讯云开发者社区-腾讯云 (tencent.com)

快速看一下理论??

(2)信号量

变量,支持 2 种操作,假设一信号量 SV

a. 等待(P):SV > 0,SV--; SV == 0,挂起执行

b. 信号(V):存在其他进程因等待 SV 挂起,就唤醒该进程;否则,SV++

二进制信号量,最常用,只有 0 和 1 

a. sem_init() 初始化 信号量

b. sem_destory() 销毁 信号量

c. sem_wait() 原子操作方式,信号量 -1;信号量 == 0,sem_wait() 阻塞

d. sem_post() 原子操作方式,信号量 +1;信号量 > 0,唤醒调用 sem_post()的线程

以上,成功 return 0;  失败返回 errno

(3)互斥量

即 互斥锁:保护关键代码段,确保 独占式 访问

a. 进入关键代码段 -- 获得互斥锁并加锁

b. 离开关键代码段 -- 唤醒等待该互斥锁的线程

a. pthread_mutex_init() 初始化互斥锁

b. pthread_mutex_destory() 销毁互斥锁

c. pthread_mutex_lock() 原子操作方式,给互斥锁,枷锁

d. pthread_mutex_unlock() 原子操作方式,给互斥锁,解锁

成功,返回 0;失败,返回 errno

(4)条件变量

提供 线程间 通知机制,某个共享数据达到某个值,唤醒等待这个共享数据的 线程

a. pthread_cond_init() 初始化

b. pthread_cond_destory() 销毁

c. pthread_cond_broadcast() 广播方式,唤醒所有等待目标条件变量的 线程

d. pthread_cond_wait() 等待目标条件变量
调用时,传入 mutex 参数 (加锁的互斥锁)
执行时,1) 调用线程 放入条件变量的 请求队列
       2) 互斥锁 mutex 解锁
       3) 函数返回 0 时,互斥锁再次被锁上
       4) 也就是说,函数内部,会有一次 解锁 和 加锁 操作

??功能

(1)锁机制

实现 多线程 同步

确保任意时刻,只能有一个线程,进入关键代码段

(2)封装

常见的 Linux 下的三种锁:互斥锁(mutex),读写锁(read-write lock)

和 条件变量(condition variable)

RAII 是一种 C++ 编程范式,在对象的

构造函数 获取资源

析构函数 释放资源 

RAII 确保在对象创建时,获取锁;对象销毁时,释放锁

解释

sem_t 是 POSIX 标准中定义的信号量类型,用于在多线程或多进程之间进行同步和互斥操作

在类中 封装 锁的创建 和 锁的销毁 函数??,并将其防止在 构造 和 析构 函数中

即可实现 RAII 机制(资源获取即初始化)

#include<semaphore.h> // 信号量相关
#include<exception>

class sem {
public:
    // 构造函数
    sem()     
    {
        // 信号量初始化, m_sem 初始值为 0
        if ( sem_init(&m_sem, 0, 0) != 0 )
            throw std::exception(); // 初始化失败,抛出异常
    }
    
    // 析构函数
    ~sem()
    {
        // 销毁信号量
        sem_destory(&m_sem);
    }

private:
    sem_t m_sem; // 信号量变量
};

-------------------------- 分界线 ------------------------- 

 使用宏 PTHREAD_MUTEX_INITIALIZERPTHREAD_COND_INITIALIZER 初始化

重复使用的代码,封装为函数??,减少代码重用

#include <pthread.h> // 多线程相关

class ConditionVariable {
public:
    bool wait() 
    {
        int ret = 0; // 函数返回值
        pthread_mutex_lock(&m_mutex); // 互斥量 加锁
        ret = pthread_cond_wait(&m_cond, &m_mutex); // 等待条件变量, 自动解锁互斥量
        pthread_mutex_unlock(&m_mutex); // 互斥量 解锁
        return ret == 0; // 调用成功
    }

    bool signal()
    {
        return pthread_cond_signal(&m_cond) == 0; // 返送条件变量信号,唤醒线程
    }

private:
    pthread_mutex_t m_mutex = PTHREAD_MUTEX_INITIALIZER; // 定义互斥量, 并初始化
    pthread_cond_t m_cond = PTHREAD_COND_INITIALIZER; // 定义条件变量, 并初始化
};

??线程池 -- 概念

(1)服务器基本框架

由 I/O 单元,逻辑单元 和 网络存储单元 组成

每个单元间,通过 请求队列 通信,协同完成任务

I/O 单元:处理客户端连接,读写网络数据

逻辑单元:处理业务逻辑的线程

网络存储单元:包括本地数据库和文件

(2)五种 I/O 模型

1)阻塞 I/O

调用函数后,需要不停检查函数是否返回,期间什么也不能做,直到函数返回

2)非阻塞 I/O

每隔一段时间,检查 I/O 时间是否就绪,未就绪也可做其他事

非阻塞 I/O 执行系统调用,总是立即返回,不管事件是否发生

未发生返回 -1,然后根据 errno(整形变量) 区分两种情况(事件是否发生)??

详细解释 

在非阻塞 I/O 中,如果事件没有发生,例如一个 accept 调用没有新的客户端连接请求、一个 recv 调用没有可读数据、或者一个 send 调用无法立即发送所有数据,系统调用会立即返回并返回 -1。此时,可以通过检查 errno 变量来区分两种情况

  • 如果 errno 的值是 EAGAIN,表示当前操作暂时无法执行,需要等待或重试。例如对于 accept,这意味着当前没有新的客户端连接请求,需要等待新的请求到来;对于 recvsend,这意味着当前没有可读或可写数据,需要等待数据到达或通道空闲

  • 如果 errno 的值不是 EAGAIN,则表示发生了其他错误,例如连接被中断、参数无效、内存不足等等。此时应用程序通常需要采取相应的错误处理策略,例如输出错误信息、记录日志、重试操作等等

3)信号驱动 I/O: 

信号驱动 I/O 是一种在 Linux 中使用套接字进行的 I/O 操作方式

通过安装一个信号处理函数来实现异步 I/O,使得进程在进行 I/O 操作时不会被阻塞

在使用信号驱动 I/O 时,将套接字设置为非阻塞模式,并为该套接字关联一个信号处理函数

当 I/O 事件就绪时(例如有数据可读或可写),内核会发送一个 SIGIO 信号给进程

进程收到这个信号后,可以立即处理相应的 I/O 事件,而不需要等待或阻塞

这种方式允许进程继续运行,并且只在 I/O 事件就绪时才会中断执行并处理事件

相比于阻塞 I/O,信号驱动 I/O 允许进程在进行其他任务时同时等待 I/O 事件的发生,提高了系统的效率和响应性

4)I/O 复用

I/O 复用,同时监视多个IO操作的机制,使用 select 或 poll 函数实现

这俩函数与 阻塞IO 地区别是,可以同时监视多个 IO操作,并允许进程等待 IO 事件发生时被阻塞

使用 IO 复用时,将多个套接字添加到一个监视集合中,然后调用 select 或 poll 函数,等待其中任意一个 IO 操作就绪(有数据可读或可写)

当某个 IO操作 就绪时,函数会返回,并告诉我们哪些 IO 操作已就绪

接着,遍历监视集合来确定哪些 IO操作 已就绪

再调用相应 IO操作函数进行实际独写操作 --> 以避免阻塞同时处理多个IO操作

概括地说

IO复用:同时监视多个 IO 操作,并且只有其中任意一个操作就绪,才会调用IO操作函数

5)异步IO

Linux中,异步IO,通过调用 aio_read 等函数,告诉内核 IO操作 的信息(文件描述符,缓冲区指针和大小等),并立即返回

接着,内核在 后台完成 IO操作,完成后通知应用

使得应用可以继续执行其他任务,不需要等待 IO操作 的完成,提高并发性和响应性

----------  补充  -----------

阻塞IO,非阻塞IO,信号驱动IO,IO复用??

都是同步IO

同步IO指的是,内核向应用程序统治的是,就绪事件(比如只通知,有客户端连接,要求用户自行执行I/O操作)

异步IO指的是,内核向应用通知的是,完成事件(比如读取客户端数据后,才通知应用,由内核完成I/O操作)

(3)事件处理模式

概念

  • reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求 均在工作线程中完成。通常由同步I/O实现。

  • proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作 线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。

解释 

Reactor 是非阻塞 同步 网络模型,如果把I/O操作改为 异步 就能够进一步提升性能,这就是异步网络模型 Proactor 

(这里 “同步” 指用户进程在执行 read 和 send 这类 I/O 操作 的时候是同步的)

先回顾下 服务器 基本框架??

线程池??

线程池是一种管理和复用线程资源的机制。它在应用程序启动时预先创建一组线程,这些线程处于等待任务的状态。当有任务到达时,线程池中的空闲线程会被唤醒并分配任务进行处理

线程池的好处在于避免了频繁创建和销毁线程的开销,提高了线程的复用性和执行效率。通过限制线程池中的线程数量,还可以控制并发度,防止系统资源过度占用

详细了解下 Reactor 和 Proactor 模式?? 

C++ IO框架 - Reactor 和 Proactor | C++ 全栈知识体系 (stibel.icu)

 I/O多路复用 + 线程池  =  Reactor

??常用组合

(4)同步 I/O 模拟 Proactor 模式

Linux 中 异步 I/O 尚未成熟,基本都是采用 Reactor 模式

这里用 同步 I/O 模拟实现 Proactor 模式?? 

同步 I/O 模型工作流程(epoll_wait为例)??

1)主线程往 epoll 内核事件表注册 socket 上的 读就绪 事件

2)主线程调用 epoll_wait 等待 socket 有数据可读

3)当 socket 有数据可读,epoll_wait 通知主线程,主线程从 socket 循环读取数据,直到没有更多数据可读,接着将读取到的数据封装成一个请求对象,并插入请求队列

4)睡眠在请求队列上的某个 工作线程 被唤醒,它获得请求对象并处理客户请求,然后往 epoll 内核事件表中,注册该 socket 的 写就绪事件

5)主线程调用 epoll_wait 等待 socket 可写

6)当 socket 有数据可写,epoll_wait 通知主线程,主线程往 socket 上写入服务器处理客户请求的结果

(5)并发编程模式

并发编程 分 多线程 和 多进程

但是,此处涉及的并发模式指的是:I/O 处理单元 与 逻辑单元 协同完成任务的方法

【Socket】两种高效事件处理模式&并发模式 - 掘金 (juejin.cn)

1)半同步 / 半异步模式

a. 同步读        b. 异步读??

2)领导者 / 追随者模式

(6)半同步 / 半反应堆

半同步 / 半异步 的变体,将 半异步 具体化为某种事件处理模式 

1)并发模式中的 同步 和 异步 

a. 同步:程序完全按照代码序列的顺序执行

b. 异步:程序的执行,需要由 系统事件 驱动

2)半同步 / 半异步模式 工作流程

a. 同步线程 -> 处理客户逻辑

b. 异步线程 -> 处理 I/O 事件

c. 异步线程监听到 客户请求  后,将其封装成 请求对象,并插入请求队列 

d. 请求队列通知某个工作在 同步模式的工作线程 来读取,并处理该请求对象

3)半同步 / 半反应堆 工作流程(Proactor 模式为例)

a. 主线程 充当 异步线程,监听所有 socket 的事件

b. 新请求到来,主线程接受后得到新的 连接 socket,然后往 epoll 内核事件表中,注册该 socket 的 读写事件

c. 如果 连接 socket 有 读写事件 发生,主线程从 socket 接收数据,并将数据封装成

请求对象 插入 请求队列

d. 所有 工作线程 睡眠在 请求队列 上,当有任务到来,通过 竞争(互斥锁)获得任务接管权

(7)线程池

1)空间换时间,浪费服务器硬件资源,换取效率

2)池,一组资源的集合,这组资源在服务器启动之初,就被创建并 初始化,即 静态资源

3)当 服务器 进入正式运行阶段,开始处理 客户请求 时,如果需要资源,可以直接从 池 获取,无需 动态分配

4)服务器 处理完 一个 客户连接 后,把资源放回 池,无需 释放资源(通过系统调用)

补充理解?? 

图解线程池原理与C++实现_线程池的工作原理c++及例子-CSDN博客

(8)静态成员   变量 && 函数 

1)静态成员变量 

static 关键字

无论建立多少个对象,都只有一个静态成员变量的   拷贝

静态成员变量属于  一个类,所有对象  共享

静态变量,编译阶段就分配了空间,对象还没创建时就分配了,放到了全局静态区 

  • 静态成员变量

    • 类内声明,类外初始化(以免类名访问静态成员访问不到)

    • 无论公有,私有,静态成员都可以在类外定义,但私有成员仍有访问权限

    • 非静态成员类外不能初始化

    • 静态成员数据是共享的

2)静态成员函数

 static 关键字

  • 静态成员函数

    • 静态成员函数可以直接访问静态成员变量,不能直接访问普通成员变量,但可以通过 参数传递 的方式访问

    • 普通成员函数可以访问普通成员变量,也可以访问静态成员变量

    • 静态成员函数没有this指针。非静态数据成员为对象单独维护,但 静态成员函数为共享函数无法区分是哪个对象,因此不能直接访问普通变量成员,也没有this指针

(9)pthread_create 陷阱

函数原型??

#include <pthread.h> // POSIX 线程库头文件,多线程相关
// 创建一个新线程
int pthread_create (pthread_t *thread_tid, // 线程 id
                    const pthread_attr_t *attr, // 线程属性
                    void * (*start_routine) (void *), // 线程函数的地址
                    void *arg); // start_routine() 的参数,类型 void*

关于 start_routine??

指向函数的指针,该函数是新线程的 入口点

它接受一个 void* 类型的参数

并返回一个 void* 类型的指针

书上解释?? 

是 pthread_create() 第 3 个参数,为函数指针

指向处理线程函数的地址

该函数,要求为静态函数

如果处理的线程函数是  类成员函数,需要将其设置为  静态成员函数

(10)this 指针的锅

pthread_create() 函数原型的第 3 个参数,类型是函数指针

指向的线程处理函数的参数类型为 void*

若线程函数为 类成员函数,则 this 指针会作为默认的参数被传入函数

从而和线程函数参数 (void*) 不匹配,不能通过编译,??因为

this 指针是一个指向类的非静态成员函数所在对象的指针,即它指向调用该函数的对象

而 void* 是 通用的指针类型,自然不匹配

静态成员函数就没有这个问题,因为没有 this 指针

?线程池 -- 代码分析

线程池的 设计模式 --> 半同步/半反应堆

其中,反应堆 --> Proactor 事件处理模式

a. 具体来说,主线程 -- 异步线程,负责监听 文件描述符,接受 socket新连接

b. 若当前监听的 socket 发生了 读写事件,就将任务插入 请求队列

c. 工作线程从请求队列取出任务,完成 读写数据 的处理

(1)线程池 -- 类定义

线程处理函数  &&  运行函数  ----  私有

template<typename T>
class threadpool {
    public:
        // thread_num 线程数量
        // max_requests 请求数量(请求队列中 最多允许 && 等待处理)
        // connPool 数据库连接池 指针
        threadpool(connection_pool *connPool, 
                   int thread_number = 8,
                   int max_request = 10000);
        ~threadpool();

        // 请求队列 插入任务请求
        bool append(T* request);

    private:
        // 工作线程运行的函数
        // 不断从工作队列取出任务  并执行
        static void *worker(void *arg);

        void run();

    private:
        // 线程数
        int m_thread_number;

        // 请求队列最大请求数
        int m_max_requests;

        // 描述线程池的数组,大小 m_thread_number
        pthread_t *m_threads;

        // 请求队列
        std::list<T *> m_workququq;

        // 保护请求队列的 互斥锁
        locker m_queuelocker;

        // 有任务需要处理
        sem m_queuestat;

        // 结束线程
        bool m_stop;

        // 数据库连接池
        connection_pool *m_connPool;
};

(2)线程池 -- 创建与回收

构造函数 -- 创建线程池

pthread_create() 将类的对象作为参数,传递给 静态函数(worker)

在静态函数引用这个对象,并调用其动态方法(run)

具体来说,类对象传递时用 this 指针,传递给静态函数后,转换为 线程池类,并调用私有函数 run()

解释

  1. :: 运算符(作用域解析运算符):

    • 在命名空间中,:: 用于访问全局命名空间中的变量、函数或类型。例如,std::cout 中的 std 就是命名空间,cout 是其中的一个成员。
    • 在类中,:: 用于访问类的静态成员函数、静态成员变量或枚举常量。例如,ClassName::staticMember 中的 ClassName 是类名,staticMember 是其中的一个静态成员。
    • 在派生类中,:: 还可用于指定基类的作用域。例如,BaseClass::member 中的 BaseClass 是派生类所继承的基类名,member 是基类的成员。
  2. : 运算符(成员初始化列表):

    • 在类的构造函数定义中,: 用于初始化类的成员变量,即在构造函数主体之前指定成员变量的初始值。例如,m_thread_number(thread_number) 中的 m_thread_number 是成员变量名,thread_number 是传入的参数值,在构造函数执行之前将参数值赋给成员变量。

总结:

  • :: 用于访问命名空间、类的静态成员和基类作用域。
  • : 用于构造函数中的成员初始化列表

代码 

template<typename T>
threadpool<T>::threadpool( connection_pool *connPool, 
                           int thread_number,
                           int max_requests) 
                           :
                           m_thread_number(thread_number),
                           m_max_requests(max_requests),
                           m_stop(false), m_threads(NULL),
                           m_connPool(connPool) {
    if (thread_number <= 0 || max_requests <= 0)
        throw std::exception();

    // 线程 id 初始化
    m_threads = new pthread_t[m_thread_number];

    if (!m_threads)
        throw std::exception();

    for (int i = 0; i < thread_number; ++i) {
        // 循环创建线程,并将工作线程按要求运行
        if (thread_create(m_threads + i, NULL, worker, this) != 0) {
            delete [] m_threads;
            throw std::excveption();
        }

        // 线程分离后,不用单独回收工作线程
        if (pthread_detach(m_threads[i])) {
            delete [] m_threads;
            throw std::exception();
        }
    }
}

(3)请求队列 -- 添加任务

list 容器创建 请求队列

向队列添加任务时,通过  互斥锁  保证线程安全

添加完后,通过  信号量  提醒 “有任务要处理”

最后注意线程同步

template<typename T>
bool threadpool<T>::append(T* request)
{
    m_queuelocker.lock();

    // 根据硬件,预先设置请求队列最大值
    if (m_workqueue.size() > m_max_reqquests) {
        m_queuelocker.unlock();
        return false;
    }

    // 添加任务
    m_workqueue.push_back(request);
    m_queuelocker.unlock();

    // 信号量 提醒有任务处理
    m_queuestat.post();
    return true;
}

代码中的线程同步??

  1. 使用了互斥锁 m_queuelocker.lock()m_queuelocker.unlock() 来保护对任务队列 m_workqueue 的访问,防止多个线程同时访问引起数据竞争。

  2. 使用信号量 m_queuestat.post() 来通知空闲线程有任务需要处理,避免了一个线程获取多个任务的情况,也确保每个任务都能得到及时处理

(4)线程处理函数

内部访问私有函数 run(),完成线程处理要求

template<typename T>
void* threadpool<T>::worker(void* arg) {
    // 参数强转线程池类,调用成员方法
    threadpool* pool = (threadpool*)arg;
    pool->run();
    return pool;
}

(5)run 执行任务

工作线程  从  请求队列  取出某个任务进行处理,注意线程同步

template<typename T>
void threadpool<T>::run()
{
    while(!m_stop) {
        // 信号量等待
        m_queuestat.wait();

        // 被唤醒后先加互斥锁
        m_queuelocker.lock();
        if (m_workqueue.empty()) {
            m_queuelocker.unlock();
            continue;
        }

        // 请求队列取 第一个任务
        // 任务从请求队列 删除
        T* request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();
        if (!request) continue;

        // 连接池中取出一个数据库连接
        request->mysql = m_connPool->GetConnection();

        // process(模板类中的方法,这里是 http 类) 进行处理
        request->process();

        // 数据库连接 放回连接池
        m_connPool->ReleaseConnection(request->mysql);
    }
}