linux 系统调用

todo

1.攻克linux L10/L11/L7/L8/L16/L17/L18/L19

2.L8信号量的没看懂

3.L13内存锁定和保护

4.不太明白:I/O复用不熟悉,内存高级应用,信号量高级应用,进程优先级,

快速回忆系统调用和一些零碎的shell命令

linux基础

文件系统

基本函数

int lseek(int fd, off_t offset, int whence);

1
2
lseek(fd,10*sizeof(int),SEEK_SET);//注意大小是字节
//我们的offset指针被文件保存,下方的操作都是按照隐藏的指针来读写

int open(char *pathname )

int write(int fd, void *buf, size_t count);

int read(int fd, void *buf, size_t count);

int close(int fd);

!查看具体使用:多进程

缓冲函数

内核上的缓冲

int fdatasync(int fd);//刷新数据到磁盘
int fsync(int fd);//刷新文件到磁盘

应用层的缓冲

有一个获取文件信息的系统调用 stat。其获得的文件属性信息中,有一项建议了文件 IO
缓存大小,低于此值的缓存大小会被认为是低效的。其函数原型为:

int stat(const char * pathname, struct stat *statbuf);

这一项正是 struct stat 结构体中的 st_blksize 字段。设置应用层的缓存大小,至少不小于该字段给出的数值。

此外, glibc 中提供的 fread 和 fwrite 函数,其内部都维护了一个数据缓存,用来尽量减少系统调用次数
。默认选择的缓存大小已进行了充分优化。如果还是不满意,可以用 glibc 的 setvbuf 和 setbuffer
函数自定义缓存大小和缓存行为。这两个函数的原型分别为:

int setvbuf(FILE *stream, char *buf, int mode, size_t size);
void setbuffer(FILE *stream, char *buf, size_t size);

其中的 setvbuf 函数允许开发者指定缓冲方式,主要有以下三种可选方式。

  • _IONBF:不缓冲,标准错误输出默认选择该缓冲方式,以保证错误信息及时输出来。
  • _IOLBF:行缓冲,也就是遇到换行符时,对之前的内容执行 read、write 系统调用,终端设备默认执行该缓冲方式。
  • _IOFBF:全缓冲,也叫块缓冲,当指定大小的缓冲区满了之后,才会触发调用一次系统调用,磁盘文件默认使用该缓冲方式。同时,glibc 还提供了 fflush 函数,应用可以在缓冲区数据满之前,手动将数据刷新到内核缓冲区。

出于性能上的考虑,读写磁盘文件应该使用 fread 和 fwrite 函数,而不是直接使用 read 和 write

混合使用glibc和系统调用

前面讨论过,为了尽量减少系统调用的次数,我们推荐使用 glibc 的 fread 和 fwrite 函数操作文件,这两个函数需要的参数是 FILE 类型。

如果我们既希望使用系统调用控制数据同步和内核缓冲行为,又需要使用以整数型为参数的系统调用,该如何混合使用两者操作同一个文件呢?

其实,C 标准库提供了实现两者间互相转换的函数:

int fileno(FILE *fp)
FILE * fdopen(int fd, const char * mode)

其中 fdopen 中的文件模式需要和 open 打开文件时的模式相同,否则会失败。

EXT2

文件扩展属性

文件系统监控

虚拟文件系统

管道

无名管道

int pipe(int fds[2]);//0代表读取,1代表写入,如果返回了2,那么代表错误

1
2
3
//我们通常使用两个进程之间的管道进行读和写,因此务必注意单向的情况下close不需要的窗口fd
//linux中预留给管道的是一个环形缓冲区,大小有16页,因此每次写入大小不要超过一页,否则管道函数会重新寻找新的页而不是顺序存放,并且读的时候需要保证乱序拼接消息,容易出错。
//如果环形缓冲区满,那么写进程会睡眠,直到空间可用。

dup&dup2(int fd);

有名管道FIFO

可以保证不同进程空间之间的文件访问(而不是无名下的父子之间共享的fd来文件访问)。

创建 FIFO 的系统调用如下:

int mkfifo(const char *pathname, mode_t mode);

FIFO 的实现与行为和管道非常相似

!更多IPC看多进程

内存

内存映射

基本使用

实际上内存映射就是创建进程中的部分过程被包装成系统调用给了用户来使用:加载磁盘(文件)并找到物理内存直接映射到用户空间而已。这样进程I/O就不再需要多次系统调用了:不需要内核缓冲区,变相扩大了应用层缓冲区,牺牲一点空间换时间。同时也引入了零拷贝:通过PGFLT来增加映射。

void * mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);

其中:

  • 参数 addr 指定期望的映射目标地址,如果给 addr 传递 NULL,则表示让内核选择一个合适的地址;
  • 参数 length 指定要映射的数据的字节数,内核会把它对齐到内存页大小的下一个倍数;
  • 参数 prot 指定地址映射之后的访问权限,它的取值可以是 PROT_NONE,或者 PROT_READ、PROT_WRITE、PROT_EXEC 几种值的任意组合;
  • 参数 fd 指定要映射的文件;
  • 参数 offset 指定映射内容在文件中的偏移量。

flags 参数比较复杂,它指定执行内存映射时使用的操作选项,不同选项的组合适用于不同的应用场景。 其中有两个选项是互斥并且是必选项。

  • MAP_PRIVATE:私有映射。在这种映射模式下,映射到内存中的内容会根据文件内容进行初始化,但是内存中内容的变化不会更新到磁盘文件中去,而且,映射建立之后,一旦应用对内存内容有更新,内核会使用写时拷贝技术为被修改的内存页创建新的副本。

  • MAP_SHARED:共享映射。在这种映射模式下,映射后内存内容的修改会更新到磁盘文件内,而且,如果有多个进程映射了同一个磁盘文件的相同区域,那么其中任意一个进程对内存内容的修改都会立即被其他进程可见。

除了这两个互斥的必选项之外,还可以组合使用一些其他的操作选项,常见的比如有
MAP_ANONYMOUS(创建匿名映射)、MAP_FIXED(使用指定的固定地址执行映射)等,了解更多请读者自行查看帮助手册。

glib的应用

在 glibc 的内存分配函数 malloc 的内部,有时会 使用 mmap 函数代替堆内存边界调整函数 brk。

用 mmap 分配内存的 好处在于,可以独立使用 munmap 高效地解除映射 ,映射解除后,应用占用的虚拟内存总量就会降下来。

而如果使用 brk 调整边界的方式,当释放的内存不在堆内存边界的时候,堆内存的最高水位线是降不下来的。

但是 mmap 也有缺点,最大缺点就是系统开销比较大 ,因为需要经过内核,内核先要查找合适的映射地址,还会把分配的内存内容都置为
0,再返回给应用层。

所以,在 glibc 的设计中, 只有生命周期比较长的大块内存才适合使用 mmap 进行分配 。在最新的 glibc 实现中:

  • 超过 32 MB 的内存块会被认为是大内存块,且总是会使用 mmap 分配;
  • 小于 128 KB 的内存申请,总是会在动态堆内存区进行分配;
  • 大小在 128 KB 到 32 MB 之间的内存申请,会受到动态调整的分界水位线的影响,水位线之下的在动态堆内存区分配,高于水位线的用 mmap 分配,而水位线的调整时机,就是在 mmap 分配的内存区上执行 free 操作时。

高级用法

1.共享内存的实现

在执行内存映射时, 使用 MAP_SHARED 选项 ,并关联到相同的磁盘文件,就 可以创建在多个进程间共享的内存块。

这样创建的映射内存与 IPC 的共享内存很相似,不同之处在于 它会同时把内存中的更新保存到磁盘文件上,所以可以提供数据的持久化保存功能。

如果要在父子进程之间共享内存块,而不需要数据持久化功能,还可以使用共享的匿名映射(MAP_SHARED | MAP_ANONYMOUS)。

与使用共享内存的方式相比,以这样的方式创建的共享内存块,不会显示在 IPC
的输出列表里,而且可以把共享范围严格限制在父子进程之间,因而更加适合有私密性要求的数据的共享。

2.多个文件映射内存,优化逻辑处理

flags 还有一个 应用比较多的选项 MAP_FIXED ,它表示把内存映射到某个指定的固定位置。使用这个选项,就可以
把分布在多个文件,或者单个文件不同位置处的内容,映射到连续的内存地址处 ,如此处理之后就可以使后续的处理逻辑得以简化。

如果使用的是 MAP_SHARED 与 MAP_FIXED 组合的方式,还可以实现内存中更新的内容自动保存到相应文件的功能。

在使用 MAP_FIXED 选项时,通常会与匿名私有映射配合使用,也就是首先以如下方式让操作系统帮助我们选择一个与当前虚拟地址不冲突的映射地址:

void * addr = mmap(NULL, length, prot_flag, MAP_PRIVATE | MAP_ANONYMOUS, 0);

然后以 addr 地址为基地址,在映射内存的相对位置上使用 MAP_FIXED 标记安排后续每块数据的映射位置。

共享内存

共享内存shm和内存映射mmap一点点不一样:前者用来共享,后者为了优化内核访问。

实际上共享内存就是针对两个进程的内存映射:寻找物理页,映射时共享页表项(可写的COW)为了IPC而已。

1
2
3
4
5
6
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
void *shmat(int shmid, const void *shmaddr, int shmflg);
int shmdt(const void *shmaddr);
  • shmget() 函数 创建或获取一块指定大小(size)的共享内存 ,key 和 shmflg 的意义与消息队列函数中的 key 和 flag 类似:都是内核里的id而不是自定义
  • shmat() 将指定的共享内存附加到进程的线性地址空间内 ,可以指定起始线性地址(shmaddr),而更常见的做法是让内核决定起始地址(shmaddr == NULL)。函数成功执行后,返回值是该共享内存附加到进程的线性起始地址。这两步操作成功之后,进程就可以像使用其他内存一样使用这块内存区。如果还有其他进程附加了该共享内存,任意进程对内存区域的修改对其他进程都是可见的。基于此种数据交换方式,共享内存通常可与信号量配合使用, 实现临界区的一致性保护 ,除非在其上实现的是某种无锁的数据结构。
  • shmdt() 函数用于 将共享内存段从当前进程中分离

如果附加共享内存时让操作系统决定起始地址,进程多次运行时选择的起始地址将不固定。若要存储指向共享内存区内某数据对象的地址,
应使用偏移量形式,而不能直接存储绝对地址

另外,共享内存的生存周期与进程内存不同,共享内存会在进程退出之后仍被系统保留。因此,如果共享内存中有指向进程内存的指针,应该在进程重启时重置。

共享内存的各种限制同样可以通过内核参数设置,如下所示。

  • /proc/sys/kernel/shmmni:可以创建的共享内存块的数量。
  • /proc/sys/kernel/shmmax:共享内存段的最大容量,实际上限同时依赖物理内存和交换空间的大小。
  • /proc/sys/kernel/shmall:系统中所有共享内存的分页总数上限,同时受限于物理内存和交换空间的大小。

另外,附加共享内存实际上只是把内存页表指向特定的物理内存页,在使用 fork()
创建子进程之后,这些数据也会被复制一份,所以,子进程会继承父进程附加的共享内存段。而当 exec() 成功执行后,共享内存段会在新进程中被分离。

使用共享内存的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
进程A
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

#define SHM_PATH "/tmp/shm"
#define SHM_SIZE 128

int main(int argc, char *argv[])
{
//init
int shmid;
char *addr;
key_t key = ftok(SHM_PATH, 0x6666);
//create shm
shmid = shmget(key, SHM_SIZE, IPC_CREAT|IPC_EXCL|0666);
if (shmid < 0) {
printf("failed to create share memory\n");
return -1;
}
//map shm
addr = shmat(shmid, NULL, 0);
if (addr <= 0) {
printf("failed to map share memory\n");
return -1;
}
//easily op to shm
sprintf(addr, "%s", "Hello World\n");

return 0;
}
进程B
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

#define SHM_PATH "/tmp/shm"
#define SHM_SIZE 128

int main(int argc, char *argv[])
{
int shmid;
char *addr;
key_t key = ftok(SHM_PATH, 0x6666);

char buf[128];
//just map shm
shmid = shmget(key, SHM_SIZE, IPC_CREAT);
if (shmid < 0) {
printf("failed to get share memory\n");
return -1;
}

addr = shmat(shmid, NULL, 0);
if (addr <= 0) {
printf("failed to map share memory\n");
return -1;
}

strcpy(buf, addr, 128);
printf("%s", buf);

return 0;
}

内存管理

进程

创建进程

fork

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
多进程模型的网络服务程序中,为什么要在子进程中关闭监听套接字,同时要在父进程中关闭新连接的套接字呢?

原因在于 fork() 执行之后,所有已经打开的套接字都被增加了引用计数,在其中任一个进程中都无法彻底关闭套接字,只能减少该文件的引用计数。因此,在
fork() 之后,每个进程立即关闭不再需要的文件是个好的策略,否则很容易导致大量没有正确关闭的文件一直占用系统资源的现象。

**再比如** ,下面这段代码是否存在问题?为什么在输出文件中会出现两行重复的文本?




int main()
{
FILE * fp = fopen("output.txt", "w");
fputs("Message in parent\n", fp);
switch(fork())
{
case -1:
perror("fork failed");
return -1;
case 0:
fputs("Message in Child\n", fp);
break;
default:
break;
}
fclose(fp);
return 0;
}



[root@TealCode process]# cat output.txt
Message in parent
Message in parent
Message in Child


原因是 fputs 库函数带有缓冲,fork() 创建的子进程完全拷贝父进程用户空间内存时,fputs 库函数的缓冲区也被包含进来了。所以,fork()
执行之后,子进程同样获得了一份 fputs 缓冲区中的数据,导致“Message in
parent”这条消息在子进程中又被输出了一次。要解决这个问题,只需在 fork() 之前,利用 fflush 刷新到内核缓冲并放弃应用层缓冲,因此进程的缓冲就不会被包含。

另外,利用父子进程共享相同的只读数据段的特性,是不是可以实现一套父子进程间的通信机制呢?
A:父全局定义一个数据并实时修改,子读取即可

exec:#include <unistd.h>,只有在错误的时候返回

​ int execve(const char *filename, char *const argv[], char *const envp[]);//工业用

filename 用于指定要运行的程序的文件名,argv 和 envp
分别指定程序的运行参数和环境变量。除此之外,该系列函数还有很多变体,它们执行大体相同的功能,区别在于需要的参数不同,包括
execl、execlp、execle、execv、execvp、execvpe 等。它们的参数意义和使用方法请读者自行查看帮助手册。

​ Int execv(const char* path, char *const argv[]);//自己用

1
2
3
4
5
if(execv(strcat(pth, ecmd->argv[0]), ecmd->argv) < 0){//注意:第一个指针通常被忽略,默认认为是文件名,
printf("%s", pth);
fprintf(stderr, "execv not implemented\n");//<stdio.h>,即向终端输出信息
_exit(0);
}

对于执行 exec() 函数的应用,应该 总是使用内核为文件提供的执行时关闭标志(FD_CLOEXEC) 。设置了该标志之后,如果
exec() 执行成功,文件就会被自动关闭;如果 exec() 执行失败,那么文件会继续保持打开状态。使用系统调用 fcntl() 可以设置该标志。

fexecve() 函数

glibc 从 2.3.2 版本开始提供 fexecv() 函数,它与 execve() 的区别在于,
第一个参数使用的是打开的文件描述符,而非文件路径名

增加这个函数是为了满足这样的应用需求:有些应用在执行某个程序文件之前,需要先打开文件验证文件内容的校验和,确保文件内容没有被恶意修改过。

监视进程

  • pid_t wait(int * statua)

一直阻塞地等待任意一个子进程退出,返回值为退出的子进程的 ID,status 中包含子进程设置的退出标志。

  • pid_t waitpid(pid_t pid, int * status, int options)

可以用 pid 参数指定要等待的进程或进程组的 ID,options 可以控制是否阻塞,以及是否监控因信号而停止的子进程等。

  • int waittid(idtype_t idtype, id_t id, siginfo_t *infop, int options)

提供比 waitpid 更加精细的控制选项来监控指定子进程的运行状态。

  • wait3() 和 wait4() 系统调用

可以在子进程退出时,获取到子进程的资源使用数据。

即使父进程在业务逻辑上不关心子进程的终止状态,也需要使用 wait 类系统调用的底层原因:父进程使用wait真正杀死一个子进程,因此无论是kill命令,或者子进程在wait之前就终止了,仍然需要用wait来杀死子进程,在这之间就是僵尸状态。如果不wait,那么从内核资源角度看就会被大量占用,导致寻找free进程失败。如果父进程也进入僵尸状态,那么就会使用init进程来处理这种情况。

进程终止

正常终止一个进程可以用 _exit 系统调用来实现,原型为:

void _exit(int status);

其中的 status 会返回 wait() 类的系统调用。 进程退出时会清理掉该进程占用的所有系统资源
,包括关闭打开的文件描述符、释放持有的文件锁和内存锁、取消内存映射等,还会给一些子进程发送信号(后面课程再详细展开)。该系统调用一定会成功,永远不会返回。

在退出之前,还希望做一些 个性化的清理操作 ,可以使用库函数 exit() 。函数原型为:

void exit(int status);

这个库函数先调用退出处理程序,然后再利用 status 参数调用 _exit() 系统调用。这里的退出处理程序可以通过 atexit() 或
on_exit() 函数注册。其中 atexit() 只能注册返回值和参数都为空的回调函数,而 on_exit()
可以注册带参数的回调函数。退出处理函数的执行顺序与注册顺序相反。它们的函数原型如下所示:

int atexit(void (*func)(void));
int on_exit(void (*func)(int, void *), void *arg);

通常情况下, 个性化的退出处理函数只会在主进程中执行一次 ,所以 exit() 函数一般在主进程中使用,而在子进程中只使用 _exit()
系统调用结束当前进程。

多进程

IPC

4种IPC:

  • 管道与 FIFO
  • 消息队列!
  • 信号和信号量!
  • 共享内存

管道 还有一些固有的限制 ,比如下面这几项:

  • 管道与 FIFO 中传输的是比特流,没有消息边界的概念,很难实现这样一类需求——有多个读取进程,每个进程每次只从管道中读取自定义长度的数据;
  • 管道与 FIFO 中数据读出的顺序与数据写入的顺序严格一致,没有优先级的概念,必须要通过锁或者睡眠队列来保证顺序;
  • 管道和 FIFO 使用的都是内核存储空间,允许滞留在管道中的数据容量有限。

而消息队列则解决这个问题。

消息队列就是一个消息的双向链表,每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该队列的大量信息,包括消息队列的键值、用户ID、组ID、消息数目、读写进程ID等。

消息队列在如下两个方面上比管道有所增强:

  • 消息队列中的数据是有边界的,发送端和接收端能以消息为单位进行交流,而不再是无分隔的字节流,这大大降低了某些应用的逻辑复杂度;
  • 每条消息都包括一个整形的类型标识,接收端可以读取特定类型的消息,而不需要严格按消息写入的顺序读取,这样可使消息优先级的实现非常简单,而且每个进程可以非常方便地只读取自己感兴趣的消息。
  • 我们可以从以上的图看到,消息队列在内核生成,但是我们的msg是自己定义的并传入内核,这种间接就有很多内核编写空间来规避管道的问题。?到底怎么规避
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
key_t ftok(char *pathname, char proj);
int msget(key_t key, int flag);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflag);
int msgrcv(int msqid, void *msgp, size_t maxmsgsz, long msgtp, int msgflag);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);//



参数:

* msqid 是由 msgget() 生成的消息队列 ID;
* msgp 指向用户定义的消息体,第一个字段需要是 int msgtype,0代表第一个消息,正代表符合type第一条,负代表小于type绝对值的第一个,后续的其他字段可以自由定义;
简单情况:sender可以直接发送消息,receiver可以指定一个buf数组即可。
消息体:
struct _msg
{
long mtype;
char buf[256];
}msg1,msg2;//这个是可以自定义的,内核根据情况给定的指针进行修改即可
* msgsz 指定要发送的消息体的数据长度;
* msgflag 指定发送动作的行为参数,目前只有一个可选参数 IPC_NOWAIT,表示当内核中消息队列已满时不挂起发送进程,而是立即返回一个 EAGAIN 错误。
* cmd:IPC_STAT:用于获取消息队列信息,返回的信息存贮在参数buf中
IPC_SET:用于设置消息队列的属性,要设置的属性存储在参数buf中
PC_RMID:删除msqid标识的消息队列

* 消息读取函数中的 msgtp 字段指定了要读取的消息类型,可以有多种消息过滤的方法:
传入正值表示只取指定类型的消息;
传入 0 值表示不区分消息类型,按照先入先出的顺序依次读取;
传入负值表示按照优先级从高到低依次读取消息类型值不大于给定值的绝对值的消息。

使用场景代码

信号机制

通常我们的睡眠唤醒就是靠信号,中断向量也可以认为是内核信号。因此我们的硬件中断和系统调用都算是一种信号机制。那么信号从发送到接受处理是异步的。

实际上就是通过系统调用来给用户中断处理的权利:中断进入内核后返回用户态处理后再返回用户原本代码。

因此有以下三类处理

  • 自定义模拟中断:upcall
  • 屏蔽:啥也不干
  • 让系统处理信号:大部分是终止程序

我们来看看系统处理:这里认为内核还能收到多种中断信号,不是xv6(jos)那样的大内核锁,所以变复杂了。

0-31信号采用bitmap来解决,1代表还未处理。如果多次发送,bitmap处理是幂等的:如果正在处理,内核中断就忽略这个信号,继续切换回之前的信号处理,因此没法保证都会处理到。后续的使用队列,因此如果在处理,会有一个队列长度的等待。

存在以下信号机制处理问题:

  • 要重入的系统调用信号:比如malloc这样中断之后不会继续运行剩余的malloc

  • 小心全局数据:信号被另一个信号打断,全局就会改变

  • 小心全局errno:会被打断时中断覆盖errno后返回不是我要的errno

  • 保证全局数据的共享操作原子性:因为如果不屏蔽中断,mutex一定会失效,xchg也不太行:volatile(防止编译器优化) 和 保证不会中断的sig_atomic_t

    1
    volatile sig_atomic_t variable;
  • 小心++ 和 –

  • 进程内核异常栈爆了:自定义一个预留栈

    1
    2
    3
    4
    5
    6
    7
    8
    int sigaltstack(const stack_t * sigstack, stack_t * old_sigstack);

    typedef struct {
    void * ss_sp; //备选栈的起始地址
    int ss_flags; //备选栈操作标志:SS_ONSTACK 启用 SS_DISABLE禁用
    int ss_size; //备选栈空间大小
    }stack_t;
    }

接下来是一个对于信号被打断的一个简单处理

实际代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void block_readpipe(int pipefd, void *buffer, int sz) {
for (;;) {
int n = read(pipefd, buffer, sz);
if (n<0) {
if (errno == EINTR)
continue;
fprintf(stderr, "socket-server : read pipe error %s.\n",strerror(errno));
return;
}
// must atomic read from a pipe
assert(n == sz);
return;
}
}

上面的代码段出自云风的开源游戏服务器 Skynet。在这段代码中,在从管道里面读取指定字节的数据时,对 EINTR 错误做了特殊的处理,即:

  • 当发现 read 系统调用返回了错误,而错误代码是 EINTR(被打断)的时候,当做什么也没发生,继续读取后面的数据;
  • 其他类型的错误才会被作为管道出错的情况来处理。

其原因,就是因为在管道上的 read 系统调用时会阻塞的,如果当进程阻塞在 read
系统调用上时收到了信号,那么在执行完对应的信号处理程序之后,在再次返回主程序时,read 系统调用会返回错误码告知进程自己被打断过。

如果在 Socket 上执行阻塞的读取操作时,如果给 Socket
设置了超时时间属性,那么当阻塞时间超过设置的超时时间时,也会有一个信号出来打断阻塞的系统调用,这时,错误码会被设置为 EAGAIN 或者
EWOULDBLOCK,表示这次读取超时了,到此为止了,想要更多的数据就再试一次吧。

在 Linux 的标准中,规定在这种情况下,把 errno 设置为 EAGAIN 和 EWOULDBLOCK
都是可以的,所以,要想让你写的程序兼容性更好,总是应该把这两个错误码一起检查。

进程优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <sched.h>
int
sched_setscheduler(pid_t pid, int policy, const struct sched_param * param);//设置实时进程的调度策略和优先级(sched_param)
int
sched_setparam(pid_t pid, const struct sched_param * param);//设置实时优先级
int
sched_yield(void);//主动放弃进行调度
int
sched_setaffinity(pid_t pid, size_t len, cpu_set_t * set);//设置进程的CPU亲和力
int
setpriority(int which, id_t who, int prio);//进程组设置优先级?
int
getpriority(int which, id_t who);//进程组得到优先级?

参数:
* pid 指定目标进程 ID ,如果为 0,表示修改调用进程。
* policy 参数指定进程采用的调度策略** ,如果决定采用实时调度策略,则可以取值 SCHED_RR、SCHED_FIFO;如果希望采用非实时的普通调度策略,则也可设置为 SCHED_OTHER(普通时间片轮转调度)、SCHED_BATCH(适用于批处理的进程)、SCHED_IDLE(比最大的 Nice 值的运行优先级还要低的后台进程)。
* param 定义为struct sched_param {
int sched_priority;
};
* cpu_set_t 是一个位掩码结构,能够指定目标进程可以运行的 CPU;在 CPU 位掩码结构中,对 CPU 的编号是从 0 开始的,比如一个 16 核的 CPU,其 CPU 核心的编号分别为 0 — 15。
* len 参数指定使用的 CPU 位掩码数据结构的字节数。
* cpu_set_t的宏
void CPU_ZERO(cpu_set_t *set); //把 CPU 位掩码 set 初始化为 0
void CPU_SET(int cpu, cpu_set_t *set); //将指定编号的 CPU 添加到位掩码 set 中
void CPU_CLR(int cpu, cpu_set_t *set); //将指定编号的 CPU 从位掩码 set 中删除
void CPU_ISSET(int cpu, cpu_set_t *set); //检查指定编号的 CPU 在位掩码 set 中是否存在

1.提高时间片:linux0.11分配方法

1
(*p)->counter = ((*p)->counter >> 1) + (*p)->priority

即每次调度时找不到就绪态时,对所有进程时间片减半后加优先级,对于睡眠队列(counter停滞)的进程友好。

我们现在的linux跟priority和nice值有关,值越小说明时间counter越多。对于优先级为数字的进程来说,PR 与 NI 的值总是对应的,PR 的取值范围为 0 — 39,而 NI 的取值范围为 -20 — 19,PR 的值总是等于
20 + NI。

Nice 在英文中是“和善”的意思,这里进程的 Nice 值,也表示进程的和善程度,Nice 值越高就越和善,越和善的进程,越不会与别的进程争抢 CPU
资源。

1
2
int setpriority(int which, id_t who, int prio);
int getpriority(int which, id_t who);

其中,which 和 who 参数联合指定要操作的进程或进程组(详情请参考帮助手册),setpriority() 中的参数 prio 和
getpriority() 的返回值都是操作目标的 Nice 值。

因为 Nice 值可以是负数,所以, 使用 getpriority() 系统调用之前需要先把 errno 显式设置为 0 ,如果返回值为负数,再联合
errno 区分是遇到错误,还是目标的 Nice 值本身就是负值。

2.调整优先级的限制

特权进程能够自由地修改任意进程的优先级;而非特权进程只能修改自身。我们查看:

1
2
3
$	ulimit -a
scheduling priority (-e) 0//0代表0的改动值
//我的不行?

我们如果找到nice==-20,那么代表内核工作队列的线程,获得CPU一定是最容易的。

Linux 中有两种比较常用的实时调度策略,分别是:

  • SCHED_RR(Round-Robin 循环)为每个进程都分配一个固定长度的时间片,如果有优先级相同的进程,它们会以循环时间分享的形式交替执行,一个运行的进程使用完时间片或者自己主动放弃 CPU 时,会被放置到同优先级队列的队尾。
  • SCHED_FIFO(先入先出)没有为进程分配时间片,一个进程一旦获得运行权,就会一直运行,直到终止,或者自己主动放弃 CPU。

3.CPU亲和力的调整

通过精细地控制每个线程的 CPU
亲和力,可以在以下几个方面获得性能提升

  • 如果一个进程内有多个线程需要经常访问同样的数据,那么把它们绑定在同一个 CPU 上能带来性能的提升。因为如果它们在不同的 CPU 上运行,就需要经常执行这样的操作:把 CPU 高速缓冲器中的数据更新到内存,并在另一个 CPU 上从内存重新加载到高速缓冲器。
  • 如果有两个线程都是 CPU 计算密集的,而且都需要经常使用不同的数据,那把它们分别绑定在不同的 CPU 核心上,也会因为能够重复使用高速缓冲器内的数据而带来整体性能的提升。
  • 如果系统中存在关键功能的进程,专门给它们预留一个或几个 CPU,而把其他的所有进程都限制在别的 CPU 上,能提高系统在各种极端状况下的可用性。

(多)线程

!gdb调试时候添加-pthread来编译

线程共享了包括代码段、初始化数据段、未初始化数据段、堆内存段及动态链接内存段等,但是一般找不到其他线程(包括主线程stack)的stack

创建线程

pthread_create():比较符合概念模型的线程,但是必须自己实现调度算法。

clone() 是创建线程的底层系统调用,同时也为进程创建提供底层支持。其函数原型为:

int clone(int (*func)(void *), void * child_stack, int flags, void * func_args, ...)

其中,参数 flags 可以指定进程复制时与父进程共享的资源。在线程创建函数中,传递的 flags 如下所示:

CLONE_VM | CLONE_FILES | CLONE_FS | CLONE_SIGHAND | CLONE_THREAD | CLONE_SETTLS | CLONE_PARENT_SETTID | CLONE_CHILD_CLEARTID | CLONE_SYSVSEM

也就是说,新建的实体与父进程共享同一份虚拟内存页、同一个打开的文件描述符表、文件系统信息、信号处理函数表等,且新创建的实体会被放在创建者的线程组内(CLONE_THREAD)。相比之下,用
fork() 创建进程时的 flags 值仅包含 SIGCHILD 。这也是 线程和进程的本质区别

  • mutex
  • semaphore
  • spinlock
  • 读写锁

互斥锁

初始化

1
2
pthread_mutex_t lock;
pthread_mutex_init(&lock,NULL);//0即未上锁
  • pthread_mutex_lock(&lock) 负责在进入临界区之前对临界区加锁;
  • pthread_mutex_unlock(&lock) 负责在执行完临界区处理时给临界区解锁。

小心使用,由于互斥锁功能有限,会出现死锁:循环等待依赖。

int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict abs_timeout);

它们可以在锁定失败后立即返回,或在一段超时时间后返回,应用可以处理这种错误情况,而避免陷入无限的死锁中。

但是这里使用麻烦在于对于代码和文件需要在使用之前进行加锁,而文件我们使用包装的互斥锁,通常对于文件读写(包括管道)非常好用

1
2
3
int lockf(int fd, int cmd, off_t len);
//cmd:0 开锁,1 上锁
len通常是0,偏移闭区间,可以使用strlen来计算

信号量//暂时粗体

基本用法

无名信号量在进程空间里产生而不是内核,因此只能在线程之间使用(同一个进程内共享变量定位),而有名信号量在内核进行申请可以通过名字定位,因此可以用于进程之间IPC。这点和管道不一样,你可以通过头文件来确认。

同时信号量通过spinlock来保证原子性

​ 无名信号量

1
2
3
4
5
6
7
8
9
10
#include<semaphore.h>
//init
sem_t sem;
sem_init (&sem,0,10);//0代表线程,非0代表进程。10 is counter

//op
sem_wait(&sem);//-1
sem_post(&sem);//+1
sem_destroy(&sem);
//特别注意的是,我们在wait之后信号量小于0就会卡住,因此可以作为顺序

​ 有名信号量

1
2
3
4
5
#include <sys/sem.h>//通常也要加个无名的,因为?
sem_t *sem = sem_open("name_sem1",O_CREAT,0666,0);

//0666 表示 所有用户可读写,0代表初始值,如果存在这个有名,那么后两个参数会被忽略
//其他操作即沿用无名的

例如,管道通信中,如果父进程使用 fork()创建两个子进程1和2,子进程1,2按顺序向管道写一段文字,最后父进程从管道将子进程写入的内容读出来,要保证进程执行的先后顺序(信号量使用了队列?),可以用有名信号量来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include<unistd.h>
#include<signal.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/wait.h>
#include<semaphore.h>
#include<sys/sem.h>
#include<sys/stat.h>

#include<fcntl.h>

int main(){
//init
int pid1,pid2;
sem_t *resource1; //注意是指针
sem_t *resource2;
int Cpid1,Cpid2=-1;
int fd[2];//0为读出段,1为写入端
char outpipe1[100],inpipe[200],outpipe2[100];
pipe(fd);//建立一个无名管道

pid1 = fork();

if(pid1<0){
printf("error in the first fork!");
}else if(pid1==0){//子进程1
//op in child1
resource1=sem_open("name_sem1",O_CREAT,0666,0);/0666 表示 所有用户可读写
Cpid1 = getpid();
close(fd[0]);//关掉读出端
lockf(fd[1],1,0);//上锁,则锁定从当前偏移量到文件结尾的区域
sprintf(outpipe1,"Child process 1 is sending a message!");
write(fd[1],outpipe1,strlen(outpipe2));//write from fd[1] to outpipe1,count=strlen()
lockf(fd[1],0,0);//解锁
sem_post(resource1);
sem_close(resource1);
exit(0);
}else{
pid2 = fork();
if(pid2<0){
printf("error in the second fork!\n");
}else if(pid2==0){
//op in child2
resource1=sem_open("name_sem1",O_CREAT,0666,0);
resource2=sem_open("name_sem2",O_CREAT,0666,0);
Cpid2 = getpid();
sem_wait(resource1);//child2等待
close(fd[0]);
lockf(fd[1],1,0);
sprintf(outpipe2,"Child process 2 is sending a message!");

write(fd[1],outpipe2,strlen(outpipe2));
lockf(fd[1],0,0);//解锁
sem_post(resource2);
sem_close(resource1);
sem_close(resource2);
exit(0);
}
//op in parent
if(pid1 > 0 && pid2 >0){
resource2=sem_open("name_sem2",O_CREAT,0666,0);
sem_wait(resource2);
waitpid(pid1,NULL,0);
waitpid(pid2,NULL,0);
close(fd[1]);//关掉写端
read(fd[0],inpipe,200);
printf("%s\n",inpipe);
sem_close(resource2);

exit(0);
}
sem_unlink("name_sem1");
sem_unlink("name_sem2");
}
return 0;
}

//对于child1和child2,进行sem(resource1 针对child2,resource2 针对 parent)和文件加锁,保证了child1,child2,parent的读写顺序,其实就是在多进程状态下使用了01sem的mutex来保证顺序

消费者和生产者进程代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#define   __LIBRARY__
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

_syscall2(sem_t*,sem_open,const char *,name,unsigned int,value);
_syscall1(int,sem_wait,sem_t*,sem);
_syscall1(int,sem_post,sem_t*,sem);
_syscall1(int,sem_unlink,const char *,name);

#define NUMBER 520 /*打出数字总数*/
#define CHILD 5 /*消费者进程数*/
#define BUFSIZE 10 /*缓冲区大小*/

sem_t *empty, *full, *mutex;
int fno; /*文件描述符*/

int main()
{
int i,j,k;
int data;
pid_t p;
int buf_out = 0; /*读取缓冲区的索引*/
int buf_in = 0; /*写入缓冲区的索引*/
/*打开信号量*/
if((mutex = sem_open("carmutex",1)) == SEM_FAILED)
{
perror("sem_open() error!\n");
return -1;
}
if((empty = sem_open("carempty",10)) == SEM_FAILED)
{
perror("sem_open() error!\n");
return -1;
}
if((full = sem_open("carfull",0)) == SEM_FAILED)
{
perror("sem_open() error!\n");
return -1;
}
fno = open("buffer.dat",O_CREAT|O_RDWR|O_TRUNC,0666);
/* 将待读取位置存入buffer后,以便 子进程 之间通信 */
lseek(fno,10*sizeof(int),SEEK_SET);
write(fno,(char *)&buf_out,sizeof(int));
/*生产者进程*/
if((p=fork())==0)
{
for( i = 0 ; i < NUMBER; i++)
{
sem_wait(empty);
sem_wait(mutex);
/*写入一个字符*/
lseek(fno, buf_in*sizeof(int), SEEK_SET);
write(fno,(char *)&i,sizeof(int));
buf_in = ( buf_in + 1)% BUFSIZE;

sem_post(mutex);
sem_post(full);
}
return 0;
}else if(p < 0)
{
perror("Fail to fork!\n");
return -1;
}
/*消费者进程*/
for( j = 0; j < CHILD ; j++ )
{
if((p=fork())==0)
{
for( k = 0; k < NUMBER/CHILD; k++ )
{
sem_wait(full);
sem_wait(mutex);
/*获得读取索引*/
lseek(fno,10*sizeof(int),SEEK_SET);
read(fno,(char *)&buf_out,sizeof(int));
/*读取数据到data*/
lseek(fno,buf_out*sizeof(int),SEEK_SET);
read(fno,(char *)&data,sizeof(int));
/*更新读取索引*/
buf_out = (buf_out + 1) % BUFSIZE;
lseek(fno,10*sizeof(int),SEEK_SET);
write(fno,(char *)&buf_out,sizeof(int));

sem_post(mutex);
sem_post(empty);
/*消费资源*/
printf("%d: %d\n",getpid(),data);
fflush(stdout);
}
return 0;
}else if(p<0)
{
perror("Fail to fork!\n");
return -1;
}
}
wait(NULL);
/*释放信号量*/
sem_unlink("carfull");
sem_unlink("carempty");
sem_unlink("carmutex");
/*释放资源*/
close(fno);
return 0;
}

高级用法

1.信号量的其他系统调用

1
2
3
int semget(key_t key, int nsems, int semflg);      //创建一组以 key 为标识的信号量
int semctl(int semid, int semnum, int cmd, ...); //在指定的信号量上执行控制操作
int semop(int semid, struct sembuf *sops, unsigned nsops); //在指定信号量上执行 PV 操作

2.条件变量

  • 每个工作线程都配备自己的任务队列;
  • 任务分发线程根据当前每个工作线程的负载水平,把任务直接追加到负载最低的工作线程的任务队列里去;
  • 当工作线程发现自己的任务队列中已经没有要处理的任务时,自己进入睡眠状态;
  • 任务分发线程在给某个工作线程分配任务时,如果发现分配之前的任务数是 0,就唤醒该工作线程;
  • 工作线程一旦被唤醒,就持续处理自己的任务队列中的所有任务。

在这种工作模式中,线程的休眠和唤醒仍然可以使用信号量,此外,还可以使用实现在 ptrhead 库中的更轻量级的条件变量,相关的接口为:

1
2
3
int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t * cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

其中 pthread_cond_wait() 接口会阻塞一个线程,直到有另外的线程在同一个条件变量 cond 上用
pthread_cond_signal() 或 pthread_cond_broadcast() 发出通知时再继续执行。

pthread_cond_signal 和 pthread_cond_broadcast 的区别在于:

  • pthread_cond_signal 只保证唤醒至少一个被阻塞的线程;
  • pthread_cond_broadcast 会唤醒所有阻塞在条件变量 cond上的线程。

当唤醒目标很明确时,使用 pthread_cond_signal 会更有效率,而且能避免出现惊群的问题 。在上面改进版的任务处理模式中,就应该使用
pthread_cond_signal 来唤醒指定的工作线程。

当唤醒目标无法精确地确定,而只需要保证多个工作线程中至少有一个被唤醒来工作时,应该使用 pthread_cond_broadcast
,但是这时候要注意,每个线程都需要能正确处理多余和虚假的唤醒动作。常见的处理方式是把 pthread_cond_wait 包含于一条 while 循环中:

while(pthread_cond_wait(&cond, &mtx));
// Process the task

市面上有些高性能的防火墙产品对网络数据包的处理就是采用这种工作模式,有些产品还会使用双缓冲技术,进一步减少任务分发线程与工作线程在操作同一个任务队列时发生锁争抢的概率。

spinlock

自旋锁的 优点是等待时间短 ,一旦满足条件就可以马上拿到锁继续执行,不需要经历线程的切换。

它的缺点则是会占用 CPU 资源,一直保持忙等的状态

也因为它忙等的特点,只有非常快速就能完成的临界区才适合用自旋锁保护,而且,临界区内一定不能有对同一个临界区的递归调用,否则会发生死锁。

pthread 库中自旋锁相关的接口为:

int pthread_spin_init(pthread_spinlock_t *lock, int pshared);  //初始化自旋锁
int pthread_spin_destroy(pthread_spinlock_t *lock);            //销毁自旋锁
int pthread_spin_lock(pthread_spinlock_t *lock);               //锁定自旋锁

读写锁

读写锁是这样一种锁,它由读锁和写锁两部分组成,读锁是共享的,而写锁是独占的。也就是说:

  • 多个读线程可以同时进入临界区,因为临界区内并没有共享数据的更新,所以所有线程都可以正确地工作;
  • 而当有线程试图获取写锁时,如果有读锁,写锁线程会被阻塞,直到所有的读锁都被释放,同时,新的读锁请求也会被阻塞,直到写锁被释放。

读写锁的行为有点像游戏服务器停服更新时的做法,发布停服通知后,新登录的用户会被告知服务器正在停服更新,同时服务区会等待一段时间,让当前正在玩的玩家能完成当前正在进行的游戏进程。等所有玩家都下线后,才真正开始执行服务器关闭,更新的操作。

对于更新频率很低的共享数据,使用读写锁代替互斥锁,能 明显提高只读取临界区数据的线程的并行化水平,提高系统处理效率 。但是选用读写锁时
一定要确保临界区数据确实满足上述访问模式 ,否则,其最终性能表现可能还不如互斥锁。

pthread 库中读写锁相关的接口定义为:

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, 
        const pthread_rwlockattr_t *restrict attr);         //初始化读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);       //销毁读写锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);        //加读锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);        //加写锁

监听和连接/分离线程

等同于子进程一样,会被动等待杀死线程。但是不同于子进程由父进程杀死,这里的线程可以指定杀死其他线程。

int pthread_join(pthread_t thread, void **retval);

进程连接与线程连接 在以下几个方面存在一些区别 :

  • 任何线程都可以监听一个指定线程的退出,而不需要是创建该线程的线程;
  • 线程连接函数只能连接一个指定 ID 的线程,而不能像进程一样监听任意线程的退出;
  • 如果不想指定监听者,那么线程创建之后可以使用分离函数设置其不需要等待被连接,这种情况下,线程结束之后会被自动清理。

设置线程分离的函数为:

int pthread_detach(pthread_t thread);

处于分离状态的线程,无法被任何线程执行连接获取其状态,也无法再返回到可连接状态。

退出线程

我们还可以强制退出线程

比如,很多带 GUI 的应用都会对长时间运行的后台任务 设置一个取消按钮 ,还有一些服务器进程可能会 动态调整运行中的工作线程数量 。这些都
可以用线程取消函数来实现

int pthread_cancel(pthread_t thread);

同时,对退出线程状态和类型的掌控,可以进一步控制它们响应取消请求的处理过程。这两个标志可以通过下面两个函数来设置:

int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);

状态类型分为启用、关闭两种。关闭状态下的线程不可被取消,开启状态下的线程可进一步设置取消类型,在任一点取消,以及在预定的取消点取消。取消点是内核在一些函数实现中埋下的点,这些函数都是有可能造成进程阻塞或触发
IO 行为的函数,如 sleep()、wait()、fsync() 等。

合理控制线程的取消行为,是保证数据一致性、逻辑完整性不被破坏的必要手段 。更多细节可查看系统帮助手册了解。

线程独有数据

为了保证线程的安全性

创建线程特有数据 API 的函数为:

int pthread_key_create(pthread_key_t *key, void (*destructor)(void *));

该函数只在主线程或某个线程中执行一次,可以放在 pthread_once() 函数中执行。destructor
是一个析构函数,用来创建标识某个线程本地存储的 key。key
所对应的内存空间,需在每个线程中具体分享。这里的析构函数注册完成后就是用来在每个线程退出时释放各自内存空间的。

在线程中设置和获取 key 所对应内存空间的函数如下所示:

int pthread_setspecific(pthread_key_t key, const void *value);
void * pthread_getspecific(pthread_key_t key);

参数 value 可以是一个指向内存区域的指针,也可以是一个标量值,具体选用哪个由线程自己决定。在 Linux 中,最多可定义 1024 个线程特有数据的
key 值。如果考虑程序的可移植性,所定义的 key 数量不应超过 128 个。当确实需要更多线程特有数据时,可将多个值放置在一个结构中,从而减少 key
的数量。

套接字

基本函数

网络服务器最基本的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//socket,bind,listen,accept
int
socket(int domain, int type, int protocol); //创建网络套接字
int
bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen); //把套接字与一个具体的网络地址绑定
int
listen(int sockfd, int backlog); //设置指定的套接字为被动监听状态
int
accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);//接受一个连接请求,并为之创建新的套接字

//read/write
ssize_t
read(int fd, void *buf, size_t count);
ssize_t
write(int fd, const void *buf, size_t count);

最简单的网络服务器:单线程,I/O阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/socket.h>
#include<netinet/in.h>

#define BUFF_SIZE 1024
#define SVR_PORT 6677

//handle:业务处理逻辑
int doWork(int sockfd)
{
char buffer[BUFF_SIZE];
int n = read(sockfd, buffer, BUFF_SIZE); //读取对端输入
int resn;
char result[BUFF_SIZE];
//对请求执行处理,把处理结果放在 result,resn 保存结果数据长度
write(sockfd, result, resn); //处理结果发送给对端
}
//main
int main(void)
{

//socket,bind,listen
struct sockaddr_in server_addr, client_addr;
socklen_t clientaddr_len;

int listenfd, connfd;

listenfd = socket(AF_INET, SOCK_STREAM, 0); //创建 TCP 网络套接字

bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(SVR_PORT); //设置监听的本地地址和端口
//套接字与本地地址绑定
bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
listen(listenfd, 20);

//accept and handle events
while(1)
{
clientaddr_len = sizeof(client_addr);
connfd = accept(listenfd, (struct sockaddr *)&client_addr, &clientaddr_len); //接受连接请求,创建新的连接
doWork(connfd); //处理请求
close(sockfd); //处理完毕,关闭连接
}

return 0;
}

多进程模型:改成多进程的并发:一个进程使用一个套接字,I/O阻塞时,可以换进程进行处理

1
2
3
4
5
6
7
8
9
10
11
pid_t pid = 0;
while(1)
{
int connfd = accept(listenfd, ...);
if( (pid = fork()) == 0 )
{
close(listenfd);
dowork(connfd);
}
close(connfd);
}

更深入优化看I/O复用

connect()

send()

recv()

sendto()

recvfrom()

close()

shutdown()

setsockopt()

getsockopt()

getpeername()

gethostname()

socket选项

网络API

I/O复用

多进程的程切换花销也很大,如果使用多线程,实际经验是开销也挺大,所以应该再优化:使用进程池:复用这些进程来处理多个套接字下的accept(用户):

  • 轮询套接字
  • I/O复用:告诉内核需要监听的事件(套接字,也就是文件),当事件发生(文件接受到新的数据),告诉进程,从而让进程使用这个套接字。

select

1
2
3
4
5
6
7
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

//用于操作 fd_set 的宏定义
void FD_CLR(int fd, fd_set *set); //从 set 中清除 fd
int FD_ISSET(int fd, fd_set *set); //检查 set 中有没有指定的 fd
void FD_SET(int fd, fd_set *set); //在 set 中设置 fd
void FD_ZERO(fd_set *set); //清空 set

要使用 select 函数,首先需要自己准备一个最长为 FD_SETSIZE
的文件描述符数组,用于记录所有需要监听的文件描述符,同时要记录这些描述符中数值最大的一个,并把它 +1 传给 select
的第一个参数。初始状态下,当只有最早的监听套接字时,这个值就是监听套接字的文件描述符:

int client[FS_SETSIZE];
int maxfd = listenfd;
fd_set init_set, rec_set;
for(int i = 0; i < FS_SETSIZE; i++)
    client[i] = -1;   //用复数表示该位置空闲
FD_ZERO(&init_set);
FD_SET(listenfd, &init_set);

while(1)
{
    rec_set = init_set;
    int nevent = select(maxfd + 1, &rec_set, NULL, NULL, NULL);
    ...
}

然后,当 select 函数返回时,表示在传入的文件描述符集合中,至少有一个描述符上发生了需要处理的事件,所以,就需要找出是哪个,并依次处理:

if(FD_ISSET(listenfd, &rec_set))
{
    //表示服务器的监听套接字上发生了新的接入事件
    connfd = accept(listenfd, ...);

    //把新创建的描述符加入到监听集合,并记录在 client 数组
    FD_SET(connfd, &init_set);
    if(connfd > maxfd) maxfd = connfd;

    //找一个空位记录新的文件描述符
    for(int i = 0; i < FD_SETSIZE; i++){
        if(client[i] < 0) {
            client[i] = connfd;
            break;
        }
    }
}

//依次检查还有哪个文件描述符上发生了需要处理的事件
for(ini i = 0;  i < FD_SETSIZE; i++) {
    if(client[i] < 0) continue;   //跳过空位
    int sockfd = client[i];
    if(FD_SET(sockfd, &rec_set)) {
        doWork(sockfd);
    }
}

select 能同时监听的文件描述符集合,有最大 1024 的数量限制,要修改这个限制,需要修改宏定义 FD_SETSIZE,并重新编译内核。

有这样的限制,是因为在它被设计的那个年代,认为 1024 已经足够大了。Richard Stevens 教授的名著《Unix 网络编程 卷1:联网套接字
API》( Unix Network Programming, Volume 1: The Sockets Networking API
)中,也有原文说:

头文件 <sys/select.h> 中定义的 FD_SETSIZE 常值是数据类型 fd_set 中的描述符总数,其值通常是
1024,不过很少有程序用到那么多的描述符。

时代在发展,现在的情况已经变成了: 很少有服务器程序只能同时支持不超过 1024 个连接。

在现代,接任 select,担当 I/O 复用功能的函数是 poll,它破除了受到宏定义限制的最大连接数限制。其函数定义为:

struct pollfd {
               int   fd;         /* 要监听的文件描述符 */
               short events;     /* 感兴趣的监听事件 */
               short revents;    /* 实际发生的事件 */
           };

int poll(struct pollfd *fds, nfds_t nfds, int timeout); 

各参数释义如下:

  • 参数 fds 指定要监听的 pollfd 结构的数组;
  • 参数 nfds 说明了被监听的 pollfd 结构的数量;
  • timeout 则指定超时时间。

poll

poll 的使用方式与 select 很像,不过它把要监听的文件描述符数组与 fdset
合并在了一个结构中,所以更加方便直观。只要初始时把监听套接字放入监听数组,新连接建立之后再依次向后追加就可以了:

struct pollfd clients[MAX_LIMIT];    //MAX_LIMIT 是自定义的长度限制

clients[0].fd = listenfd;
clients[0].events = POLLRDNORM;   //监听新连接事件
for(int i = 1; i < MAX_LIMIT; i++) {
    clients[i].fd = -1;           //把其他的文件描述符标记为无效
}
int maxIdx = 1;

while(1) {
    int nevent = poll(clients, maxIdx, 0x8FFFFFFF);
    ...
}

然后,在收到任何事件通知,从 poll
调用返回后,需要依次检查每个事件。如果是发生在服务器监听套接字上的连接请求,就建立新的套接字,并添加到监听数组中;其他套接字上的事件,就执行消息读取,并依次处理:

if(clients[0].revent & POLLRDNORM) {
    //处理新连接请求
    int connfd = accept(listenfd, ...);
    //找个空闲的位置,保存新创建的套接字
    for(int i = 1; i < MAX_LIMIT; i++) {
        if(clients[i].fd < 0) {
            clients[i].fd = connfd;
            clients[i].events = POLLRDNORM;
            if(i > maxIdx) maxIdx = i;
            break;
        }
    }
}

for(int i = 1; i < maxIdx; i++) {
    if(clients[i].fd < 0) continue;

    if(clients[i].revents & POLLRDNORM) {
        doWork(clients[i].fd);
    }
}

上面的程序并没有处理客户端连接的关闭事件,因为本节课重点关注的是整个处理流程中影响性能的方面,所以,只关注正常业务的处理流程。

epoll

每当内核通知进程时候,返回文件描述符集合,因此进程对于select和poll会轮询套接字,因此我们需要直接让内核告诉进程哪个套接字(fd)需要处理。

int epoll_create(int size);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

使用 epoll 编写的基本网络服务器程序的实例为:

#define MAX_EVENTS 50
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;

epollfd = epoll_create(100);
if (epollfd == -1) {
    exit(EXIT_FAILURE);
}

ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    exit(EXIT_FAILURE);
}

for (;;) {
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        exit(EXIT_FAILURE);
    }

    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            conn_sock = accept(listen_sock, ...);
            if (conn_sock == -1) {
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                               &ev) == -1) {
                exit(EXIT_FAILURE);
            }
        } else {
            doWork(events[n].data.fd);
        }
    }
}

可以看到,epoll_wait() 的返回会指示发生了几个需要处理的事件,并且只有需要引用处理的事件才会被写入到 events
数组内。这就有效地避免了每次发生事件的时候,都要依次遍历所有监听的文件描述符,以确定是哪个上面发生了什么事件。

参考文献

[1].极客时间 - 宇文拓 - 攻克Linux系统编程

[2].帅地-面试攻略-linux

[3].哈工大-linux 0.11实验

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2020-2024 环烷烃
  • Visitors: | Views:

我很可爱,请我喝一瓶怡宝吧~

支付宝
微信