并发编程第一篇

2020-03-22  本文已影响0人  明翼

这也是篇学习笔记,加上不少自己的理解。对于并发编程来说,想必做开发的同学都不会陌生。逻辑控制流在时间上是重叠的,就是并发。并发和并行是两个概念,并发多是指在一定的时间间隔内,看起来是同时运行,一般采用时间片,分时操作来完成不同进程的上下文切换,由于cpu运行速度很快,在我们看来整个程序是并行运行的,比如在单核cpu的机器上,也可以一边听歌,一边写文档。
并行那是不管在时间间隔内,微观上,在同一个时刻,多个线程在不同的核心上同时执行。
有些书上对这两个概念也定义的没那么准确,知道就行,也没有必要这么去扣字眼。本文所述的主要是并行,应用级的并行。

一 并行的必要性和实现方法

1.1 并行执行的必要性

1.2 并行实现方法

并行实现除了我们常说的多线程,多进程,还有一个就是IO多路复用,我个人觉得IO多路复用只能算是整体的并行,微观上算不上真正的并行,因为它是一个进程,共享同一个地址空间。只是通过将逻辑流转成状态机,根据不同的状态执行不同的操作,这个在高性能服务器开发领域用的比较多。

二 并行举例

用网络服务器来举例,还是比较好阐述应用的并行性。

2.1 基于进程的并发编程

网络编程嘛,那就是开发网络服务器服务多个客户端,对于基于进程的网络编程的办法很简单,每接受到一个连接来的时候,就可以开启一个新的进程为这个客户端服务器,当客户端关闭的时候进程销毁。
这种方式的网络服务器,一般比较稳定,但是支持并发数量少,另外由于进程的创建和销毁都是比较重的操作,所以以此方式开发的网络服务器性能一般不够好。对于进程开销大的问题,也可以通过类似与线程池的办法,先预先创建多个进程为即将到来的客户端服务器,比如Apache的prefork模式; 还有的是如果是内部连接数肯定不多的情况下,可以采用这种模式。


多进程

这种模式下需要注意点是:

  1. 在fork进程后,新进程是对老进程的数据进行了拷贝,这里面的数据包括地址空间,打开的文件描述符,程序计数器,还有程序执行的代码。所以父子进程需要关闭各自不需要的套接字。对于监听的父进程来说,创建子进程后不需要关心连接的套接字,所以可以直接关闭;对于被创建的子进程来说,不需要关心的是服务的套接字,所以需要关闭监听的套接字。
  2. 子进程执行完毕后,子进程需要被回收,通过注册事件处理来完成父进程对子进程的资源回收:
signal(SIGCHLD, sigchld_handler);  

子进程执行结束后,会发送SIGCHLD 信号给父进程,默认是忽略不处理此信号,如果不回收,子进程就成为僵尸进程,销毁系统资源,多了就把系统搞挂了。


#include "lib/common.h"

#define MAX_LINE 4096

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void child_run(int fd) {
    char outbuf[MAX_LINE + 1];
    size_t outbuf_used = 0;
    ssize_t result;

    while (1) {
        char ch;
        result = recv(fd, &ch, 1, 0);
        if (result == 0) {
            break;
        } else if (result == -1) {
            perror("read");
            break;
        }

        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if (ch == '\n') {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

void sigchld_handler(int sig) {
   // 回收子进程资源
    while (waitpid(-1, 0, WNOHANG) > 0);
    return;
}

int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);
    signal(SIGCHLD, sigchld_handler);
    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
            exit(1);
        }

        if (fork() == 0) {
            // 子进程关闭服务套接字
            close(listener_fd);
            child_run(fd);
            exit(0);
        } else {
             // 父进程 关闭连接的客户端套接字,因为已经复制给子进程处理了
            close(fd);
        }
    }

    return 0;
}

说明,代码来自极客时间《网络编程实战》

2.2 基于多线程的并发模型

进程的创建比较耗时,另外不同的进程之间通信要复杂的多,所以很多服务器采用多线程的模式。
同一个进程的多线程,共享相同的地址空间,使得它们之间的通信更为方便,线程由内核自动调度,不同的线程有不同的上下文,不同线程交错运行的时候,需要发生上线问切换,每个线程都有自己的线程上下文,包括线程ID,栈,栈指针,程序计数器,通用的目标寄存器和条件码。
多进程之间也发生切换,但是上下文更重,没有线程切换快,而且线程没有进程中父子进程的说法,所有的线程都是对等的。
POSIX线程标准接口,比较常用的操作:

#include <pthread.h>
typedef void *(func)(void *);
// 线程的创建
int pthread_create(pthread t *tid, pthread attr t *attr, func *f, void *arg);
// 获取自身的线程ID
pthread t pthread_self(void);
// 终止线程,thread_return为线程返回值
int pthread_exit(void *thread_return);
// 终止pid的线程
int pthread_cancel(pthread t tid);
// 调用后会阻塞等待,直到线程tid终止,回收线程资源
int pthread_join(pthread t tid, void **thread return);
// 分离线程,默认线程是joinable状态,调用后为detached
int pthread_detach(pthread t tid);

pthread_once_t once_control= PTHREAD_ONCE_INIT;

int  pthread_once(pthread_once_t * once_control,void (*init_routine)(void));

说明:

  1. 线程创建后,默认为joinable的,joinable的线程意味着,这个线程可以被其他线程回收资源和杀死,在其他线程回收之前,它的存器资源比如栈等是不会被释放的。当pthread_detach一个线程后,线程就是分离的,不能被其他线程回收和杀死,它的存器资源在终止时,被系统自动释放。
  2. pthread_once 此函数我以前很少用到,这个函数是为了多线程初始化使用,多个线程执行的时候如果用同一个once_control调用pthread_once,则init_routine只会被调用一次。在初始化多线程的时候有用。

相对于多进程来说,多线程共享数据更方便,多线程的共享部分包括整个进程的虚拟存储区域,它是由只读文本,读写数据,堆,共享的代码和数据区域组成,线程间也共享所有打开的文件集合。
所有的线程上下文的数据显然是无法共享的,比如不同线程的栈数据等。
这种共享带来了方便,也带来了麻烦,那就是共享如果不做控制,容易造成覆盖和错乱,因为共享所以多个线程可以同时访问,如果不做控制一个线程设置的结果可能会被另外一个线程所覆盖。

全局变量,本地的静态变量均是共享的,像函数内的局部变量,如果没有通过指针的方式传递给其他线程,则是非共享的,不同的线程中有不同的函数内的局部变量。

简单的多线程服务器如下:


#include "lib/common.h"

extern void loop_echo(int);

void thread_run(void *arg) {
    pthread_detach(pthread_self());
    int fd = (int) arg;
    loop_echo(fd);
}

int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);
    pthread_t tid;
    
    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
        } else {
            pthread_create(&tid, NULL, &thread_run, (void *) fd);
        }
    }

    return 0;
}


char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void loop_echo(int fd) {
    char outbuf[MAX_LINE + 1];
    size_t outbuf_used = 0;
    ssize_t result;
    while (1) {
        char ch;
        result = recv(fd, &ch, 1, 0);

        //断开连接或者出错
        if (result == 0) {
            break;
        } else if (result == -1) {
            error(1, errno, "read error");
            break;
        }

        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if (ch == '\n') {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

这个简单的服务器没什么可以说的,现实中更多是采用线程池结合队列的方式,整体的架构如下:


多线程架构图

这个模式也比较简单,就是客户端连接之后,主线程将socket描述符加入到socket缓存队列中,worker线程从socket缓存队列中取出套接字,然后提供服务。这是个典型的生产者消费者模式,由于主线程和工作线程之间有共享的队列,所以需要加锁;同时由于队列大小有限制,则需要通过信号量等方式控制,比如等队列空了之后,主线程才可以放,等队列中有数据了,worker线程才可以工作。

2.3 基于IO的多路复用的并发编程

我们只所以在前面中采用多线程或多进程方式来实现程序,是因为有多个套接字需要同时服务,而这些套接字操作是阻塞的,没办法进行进行同时处理。如果我们可以把这些套接字放在一起统一进行监控起来,任何套接字有请求我们都知道,根据请求时间的不同进行回调不同的函数进行处理,这就是IO多路复用,现在的高性能的服务器一般都是通过IO多路复用这种方式实现的。

如果是非阻塞的IO,我们可以通过轮询的方式,遍历套接字结合,找出需要进行IO处理的套接字,进行处理,但是这种方式有个缺点就是如果套接字的数据量很多,那么遍历一次会很耗时,在遍历的过程中,如果有其他的客户端发起请求,则无法响应,而且为了快速响应请求,cpu必须在极短的时间间隔内做遍历,消耗大量无用的cpu资源。

现在我们可以通过将套接字集合交给操作系统,系统来判断是否有套接字有IO事件的发生,发生了则返回或超时返回;比如select,poll,epoll等IO分发技术均可以实现这种多路IO复用。
select 函数说明如下:

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
//从集合中删除指定的fd描述符
void FD_CLR(int fd, fd_set *set); 
//判断指定的fd描述符是否存在于集合之中
int  FD_ISSET(int fd, fd_set *set); 
//将指定的fd添加到集合之中
void FD_SET(int fd, fd_set *set);
//初始化集合
void FD_ZERO(fd_set *set); 

select 操作的是称为fd_set的集合,逻辑上我们可以将集合看成一个有n位掩码的,哪一位设置为1,说明哪一位才是描述符集合中的一个元素。
使用select函数,要求内核挂起进程,当一个或多个io事件发生的时候,才会返回发生事件的IO集合,然后我们通过用FD_ISSET来判断fd是否在集合中。

参数说明:nfds 标识最大的描述符+1 ; readfds让内核检测这个集合上的可读事件;writefds是让内核检测的可写集合上的事件;exceptfds让内核检测异常的套接字的集合。


int main(int argc, char **argv) {
    if (argc != 2) {
        error(1, 0, "usage: select01 <IPaddress>");
    }
    int socket_fd = tcp_client(argv[1], SERV_PORT);

    char recv_line[MAXLINE], send_line[MAXLINE];
    int n;

    fd_set readmask;
    fd_set allreads;
    FD_ZERO(&allreads);
    FD_SET(0, &allreads);
    FD_SET(socket_fd, &allreads);

    for (;;) {
        readmask = allreads;
        int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL);

        if (rc <= 0) {
            error(1, errno, "select failed");
        }

        if (FD_ISSET(socket_fd, &readmask)) {
            n = read(socket_fd, recv_line, MAXLINE);
            if (n < 0) {
                error(1, errno, "read error");
            } else if (n == 0) {
                error(1, 0, "server terminated \n");
            }
            recv_line[n] = 0;
            fputs(recv_line, stdout);
            fputs("\n", stdout);
        }

        if (FD_ISSET(STDIN_FILENO, &readmask)) {
            if (fgets(send_line, MAXLINE, stdin) != NULL) {
                int i = strlen(send_line);
                if (send_line[i - 1] == '\n') {
                    send_line[i - 1] = 0;
                }

                printf("now sending %s\n", send_line);
                size_t rt = write(socket_fd, send_line, strlen(send_line));
                if (rt < 0) {
                    error(1, errno, "write failed ");
                }
                printf("send bytes: %zu \n", rt);
            }
        }
    }

}

初始的时候fd_set 通过FD_ZERO 来进行初始化,结果如下图:


初始化套接字

接着将标准输入的套接字加入进去,如下图:


标准输入加入到套接字集合中
需要特别注意的是:
       readmask = allreads;

因为select每次返回的时候,返回的是准备好的套接字集合,同时也会修改注册在select中的函数,所以每次都需要重新赋值。

select 函数虽然可以实现多路复用,但是有不少缺点:

暂时写到这里吧,下次可以接着写select的改进相关。

上一篇下一篇

猜你喜欢

热点阅读