高并发服务器 poll模型 非阻塞 讲解

代码还没有实测。已经修改了。

B站就业班视频代码搬运

对应课程

1.函数原型

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

 poll 模型和select模型的作用相似,用于I/O 多路复用

1.1 函数参数;

1.1.1    struct pollfd *   fds

是结构体pollfd 组成的 数组的首地址,

struct pollfd

{

int fd;

short events;

short revents;

};

(1)int fd :文件描述符。如果fd == -1 表明内核不再监控。

(2)short events :输入参数,让内核去监视的事件,就像是 select 中让内核去监视读事件,写事件,异常事件

(3)short revents :输出参数,表示告诉内核,有哪些事件发生了改变。

调用 poll 函数,当监视的文件描述符有事件发生,内核修改的是 revents,而不修改 events 和 fd,这样做events 和 fd 就可以重用;poll 返回的是 revents 。

1.1.2   int ndfs:

表示内核监控的范围,具体:数组最大下标  +1

也就是说,现在数组开头6个文件描述符,  0  , 100, 200, 0,300, 400,然后后面的文件描述符都是0 ,ndfs就是 5 ,因为最大的不是0的文件描述符400的下标是5 ,表示poll监视着6个的变化。所以这个参数的含义是:第一个参数中,让内核监控的文件描述符的个数

1.1.3 timeout 单位是毫秒:  

=0  不阻塞,立刻返回

 -1  表示一直阻塞,直到有事件发生,

 > 0 表示阻塞时长,在时长范围内如果有事件发生会立刻返回,如果超时,也会立刻返回,

POLLIN  表示监控读事件

POLLPRI 紧急事件去读

POLLOUT  可写事件

POLLERR  错误事件。但是仅仅返回在revents , 在events 中错误被忽略了。

读事件,客户端有连接过来,才能读, 而写事件基本上用不着监控(主动写发送的),除非写的太多把缓冲区写满了。。

1.2.   返回值

一个正数, 代表revents 不是0的 个数,表示的意思是有事件返回或者有错误返回的文件描述符的总数。

如果返回0,表示没有任何文件描述符有变化。

如果返回-1 ,表明有一个错误errno没有被正确设置。

2.使用pol l 模型实现服务器的流程

  1. 创建socket ,得到监听文件描述符 lisenfd;     --socket()
  2. 设置端口复用  --setsockopt();
  3. 绑定 --bind ()
  4. 定义pollfd结构体的数组fdarray[1024]

第一次写的代码,不知道怎么的有BUG ,第一个开启的客户端完全正常,后面再连接的客户端,打字了服务器没有丝毫反应,必须等客户1的信息完成后,它们之前发的信息才全部从服务器返回,但是看网络状态netstat -a | grep '8888' 又是完全正常的,都是established 状态。

原因已经找到,poll的第二个参数写错了 。。等待明天修改

正确的写法:老师设置了一个变量maxindex, 表示poll函数的第一个参数中,不为-1的 文件描述符的那个最大位置的下标,例如, 这里有8个文件描述符, 

-1, 20,-1,-1,10, 1, -1,    3,  -1那么maxindex =7, 指向那个3  。

maxindex +1 就是poll函数的第二个参数,写进去。表示内核监控 这8个文件描述符。最后那个-1 不管( 数组里,都是下标从0 开始,最后一个 - 1  下标是 8)

//poll模型  第1次代码, 
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <poll.h>
int main() {
    int sfd = socket(AF_INET, SOCK_STREAM, 0); 
    //设置端口复用
    int opt =1;
    setsockopt ( sfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));
   
    //定义一个服务器地址的 结构体,利用本机任意网络地址接口,端口为8888
    struct sockaddr_in servad;
    bzero(&servad, sizeof(servad));
    servad.sin_family =AF_INET;
    servad.sin_port =htons(8888);
    servad.sin_addr.s_addr = htonl (INADDR_ANY);
 
    int ret = bind(sfd, (struct sockaddr*)&servad, sizeof( servad));
 
    //将socket从主动变为被动(服务器必备),监听来自客户的请求
    listen(sfd,128);  

    //定义pollfd 结构体,并建立一个1024长度的数组fdarray ,
    struct pollfd fdarray[1024];

    //把all文件描述符都变成-1
    int i =0;
    for (i =0; i<1024; i++){
	    fdarray[i].fd = -1;
    }

    fdarray[0].fd = sfd;    
    fdarray[0].events = POLLIN;
    //zhuy注意别写成revents了,现在监听呢,没有什么返回,没有revents

    int newfd ;
    int nready =0;
    int sockfd;
    int maxindex =0;
    while (1) {

        //因为超时时间设置为NULL ,所以进程会无限阻塞在这一步,除非有了变化才往下走
        nready = poll (fdarray, maxindex+1, -1);

        //第二个参数每次有变化的描述符+1 ,第二个参数也跟着加1, 表明客户端连接来的
         //越多,内核监控的描述符跟着增多 

        if (nready < 0) {
	        if (errno == EINTR){
		        continue;
	        }
	        break;
	    }
        //第一种情况,有客户端的新请求
        //但是我现在不懂为什么新连接请求是pollin 这不是可写事件
	    if (fdarray[0].revents == POLLIN) {
	        newfd = accept (sfd,NULL, NULL);
            //寻找在fdarray中最靠近的不是-1的位置方下newfd
	        for (i =0;i<1024;i++){
		        if( fdarray[i].fd ==-1){
		            fdarray[i].fd = newfd;
		            fdarray[i].events = POLLIN;
		            break;
		        }
	        }
	        if ( i==1024){
		        close (newfd);
		        continue;
 	        }
            if (maxindex <i ){
                maxindex =i;
            }
	    }

	    //第二种情况,眼前的描述符,有可读事件发生 
            //为什么不从0开始遍历?0是你的监听描述符
 	    for (i =1;i <1024;i++){
	        if (fdarray[i].fd ==-1 ){
		        continue;
	        }
	        // i如果不能使用是要关闭的,为了避免代码变化,用sockfd代替i 
	        sockfd = fdarray[i].fd;
            char buf[256];
            //读数据
            memset(buf,0x00,sizeof(buf));
            int n =read (sockfd, buf, sizeof(buf));
            if (n <=0){ 
                printf("这里是服务器端, read error or client close
"); 
                close (sockfd);
		        fdarray[i].fd =-1;
	        }
            else {
                printf ("这里是服务器端n =%d ,读到的是%s 
", n,buf);
                //将小写字母都转化wie大写
                for (int j =0; j<n; j++) { 
                    buf[j] =toupper (buf[j]); 
                }    
                       //发送数据
                write (sockfd, buf, n);
            }
         }
    }

close (sfd);
return 0;
}

3. 自带手册里的代码

#include <poll.h>
#include <fcntl.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); 
                               } while (0)

int main(int argc, char *argv[]){
    int nfds, num_open_fds;
    struct pollfd *pfds;
    if (argc < 2) {
        fprintf(stderr, "Usage: %s file...
", argv[0]);
        exit(EXIT_FAILURE);
    }
    num_open_fds = nfds = argc - 1;
    pfds = calloc(nfds, sizeof(struct pollfd));
    if (pfds == NULL){
               errExit("malloc");
    }
           /* Open each file on command line, and add it 'pfds' array */
    for (int j = 0; j < nfds; j++) {
        pfds[j].fd = open(argv[j + 1], O_RDONLY);
        if (pfds[j].fd == -1){
            errExit("open");
        }
        printf("Opened "%s" on fd %d
", argv[j + 1], pfds[j].fd);
        pfds[j].events = POLLIN;
    }

      /* Keep calling poll() as long as at least one file descriptor is open */
    while (num_open_fds > 0) {
        int ready;
        printf("About to poll()
");
        ready = poll(pfds, nfds, -1);
        if (ready == -1){
            errExit("poll");
	    }
        printf("Ready: %d
", ready);
        /* Deal with array returned by poll() */
        for (int j = 0; j < nfds; j++) {
            char buf[10];
            if (pfds[j].revents != 0) {
                printf("  fd=%d; events: %s%s%s
", pfds[j].fd,
                  (pfds[j].revents & POLLIN)  ? "POLLIN "  : "",
                  (pfds[j].revents & POLLHUP) ? "POLLHUP " : "",
                  (pfds[j].revents & POLLERR) ? "POLLERR " : "");
	        }
            if (pfds[j].revents & POLLIN) {
                ssize_t s = read(pfds[j].fd, buf, sizeof(buf));
                if (s == -1)
                               errExit("read");
                printf(" read %zd bytes: %.*s
",s, (int) s, buf);
            }
	        else {                /* POLLERR | POLLHUP */
                printf("    closing fd %d
", pfds[j].fd);
                if (close(pfds[j].fd) == -1)
                      errExit("close");
                num_open_fds--;
            }
        }
    }
    printf("All file descriptors closed; bye
");
    exit(EXIT_SUCCESS);
}

/* Suppose we run the program in one terminal, asking it to open a FIFO:

           $ mkfifo myfifo
           $ ./poll_input myfifo

       In a second terminal window, we then open the FIFO for writing, write some data to  it,  and  close  the
       FIFO:

           $ echo aaaaabbbbbccccc > myfifo

       In the terminal where we are running the program, we would then see:
*/

这段代码首先定义了 2个整数 nfds  和 num_open_fds ,让它们等于你写的参数的个数,。也就是说它默认你必须在命令行写出   ./脚本名   文件名  (文件名可以不止一个),你写几个文件,这篇代码就打开几个文件,然后把open函数返回的 fd 文件描述符,全部纳入内核的监控,然后让poll 函数管理。

       然后当这些描述符有变化时,先判断事件类型

 (pfds[j].revents & POLLIN)  ? "POLLIN "  : "",

这行的意思是,返回结果revents和POLLIN这个宏,做“ 按位与” 运算,如果结果是POLLIN ,说明revents 就是POLLIN, 那么这行字符串结果就是 'POLLIN'   不然这个字符串就是 “”

,然后当revents 就是POLLIN时,开始读取,你刚才在另一终端写的东西都会体现在当前终端的屏幕上。