[Linux] 详解 Linux管道通信:匿名管道、命名管道的原理及使用等
进程通信
不同进程之间传输、交换数据
为什么要有进程通信
-
进程之间数据传输:
一个进程需要将自身的数据 发送到 另一个进程; 或者 一个进程 需要 另一个进程的数据
-
进程需要共享资源:
就像可能多个进程可以同时使用加载到内存中的动态库代码一样
-
通知事件
就比如, 在Linux中, 子进程运行终止 需要告诉父进程运行结果
-
进程控制另一个进程:
比如, 我们在使用程序调试代码的时候, 其实就是一个进程完全控制了另一个进程的执行, 让我们可以在进程运行时打上断点、执行下一个语句等
-
……
Linux进程通信方法
-
pipe 管道通信
管道通信相信许多人已经用过了, 在命令行中的标志就是:
|
:管道通信一般用于
本地进程
之间传输数据.比如在上面的例子中, 我们将ps 进程执行的数据通过管道传输给了 grep, 才能筛选出指定的内容
管道通信又分为:
匿名管道
和命名管道
管道通信是本片文章的主要内容
-
System V 进程通信
System V 是一套进程通信的标准, 可以为操作系统提供进程通信的接口, 非本篇文章的主要内容
-
POSIX 进程通信
POSIX 也是一套进程通信的标准, 这套标准可以为操作系统提供达成进程通信的接口, 也非本篇文章的主要内容
管道
进程之间是否可以通过操作系统中的资源进行通信
呢?不同进程需要先能够看到、能够获取同一份资源(文件、内存等)
这个资源 的种类, 其实就决定了进程通信的方式
什么是管道?
一个进程连接到另一个进程的数据流
. 就像我们生活中, 管道是输送资源的:石油、天然气等ps -ajx |grep
一个打开的文件
. 但是这个文件很特殊, 向这个文件中写入数据实际上并不会真正写入磁盘
中.files_struct
, 其中存储着 指向打开文件的数组fd_array
, 此数组的类型是 struct files*
.files结构体
中, 直接或间接描述了文件的所有属性, 以及 此文件的缓冲区相关信息:inode的此联合体, 可以表示三种文件:
- i_pipe, 管道文件
- i_dbev, 块设备(磁盘)文件
- i_cdev, 字符设备文件: 键盘等
向管道文件中写入数据实际上并不会写入到磁盘上, 而是只写入到文件的缓冲区中
** , 因为管道文件主要是用来进程间通信的, 如果先写入磁盘另一个进程再读取, 整个过程就太慢了这种不实际存储数据的行为特点, 其实也符合生活中管道的特点, 管道不能用来存储资源, 只能用来传输资源
管道是单向传输的
两个进程间使用管道通信时, 其中一个进程若以只写方式打开管道, 那么另一个进程就只能以只读方式打开文件
.管道的两端只能是不同的打开方式
- 匿名管道
- 命名管道
匿名管道
不会指定打开文件的文件名、文件路径等, 即不会有目标的打开文件
非明确目标的文件
, 也就意味着两个完全不相关的进程是无法一起访问这个管道的, 因为完全不相关的进程无法找到这个管道文件
.匿名管道其实只能用于具有血缘关系的进程间通信.
由父进程创建, 然后创建子进程继承父进程的管道, 然后再关闭管道的写入端或读取端
-
为什么父子进程要分别以只读和只写方式打开两次文件, 然后再创建子进程呢?
为什么不是父进程以一个方式打开, 子进程再以另一个方式打开呢?
因为
子进程会以继承父进程的方式打开同一个文件, 即子进程打开文件的方式与父进程是相同的
那这样的话, 父子进程通过想要通过管道实现进程通信, 子进程就需要先关闭已打开的文件, 再以某种方式打开同一个文件
这样比较麻烦, 如果在创建子进程之前, 父进程就已经以两种方式打开同一个文件, 那么再子进程创建之后, 只需要父进程关闭一个端口, 子进程关闭另一个端口就可以了
-
必须父进程关闭读取端, 子进程关闭写入端吗?
并不是的, 父子进程关闭哪个端口, 其实是
根据需求
关闭的.如果子进程要向父进程传输数据, 那么关闭读取端的就应该是子进程
-
进程是如何知道管道被打开了什么端口的?或者说
进程是如何知道管道被打开了几次的?
其实在file结构体中, 存在一个计数变量 f_count:
不过, 这个变量实际上还是一个结构体, 用于计数
匿名管道的创建与使用
创建管道成功, 则返回0, 否则返回-1
, 并设置errno输出型参数
执行成功之后, 参数数组内会存储两个元素
:pipe[0],
存储以 只读方式 打开管道时获得的fdpipe[1]
, 存储以 只写方式 打开管道时获得的fd
#include <iostream>
#include <unistd.h>
#include <cstring>
int main() {
// 父进程 pipe()系统调用, 打开管道
int pipeFd[2] = {0};
int ret = pipe(pipeFd);
if(ret != 0) {
std::cerr << "pipe error" << std::endl;
return 1;
}
// 创建子进程
// 并让 父进程 通过管道 向子进程 传输数据
pid_t id = fork();
if(id < 0) {
std::cerr << "fork error" << std::endl;
return 2;
}
else if(id == 0) {
// 子进程执行代码
// 子进程接收数据, 所以关闭只写端口 pipeFd[1]
close(pipeFd[1]);
char buffer[1024];
while (true)
{
memset(buffer, 0, 1024);
ssize_t s = read(pipeFd[0], buffer, sizeof(buffer)-1);
if(s > 0) {
// 读取成功
buffer[s] = '\0';
std::cout << buffer << std::endl;
}
else if(s == 0) {
// 读取结束
std::cout << "父进程写入结束, 子进程读取也结束!" << std::endl;
break;
}
else {
// 读取失败
}
}
}
else {
// 父进程执行代码
// 父进程发送数据, 所以关闭只读端口 pipeFd[0]
close(pipeFd[0]);
// 父进程每秒写入一句, 共5句
const char* msg = "你好子进程, 我是父进程, 我通过管道跟你通信, 此次发送编号:: ";
int cnt = 0;
while(cnt < 5) {
char sendBuffer[1024];
sprintf(sendBuffer, "%s %d", msg, cnt);
write(pipeFd[1], sendBuffer, strlen(sendBuffer));
sleep(1);
cnt++;
}
std::cout << "父进程写入完毕" << std::endl;
}
return 0;
}
-
父进程每1s, 写入一次数据
-
子进程死循环读取父进程写的数据
并不是子进程死循环读取父进程写入到管道的内容
在父进程没有向管道内写入数据时, 子进程在等待!父进程写入数据之后, 子进程才能read到管道内容, 子进程读取、打印数据是以父进程的节奏为主的
此顺序是:写入端必须先写入数据, 读取端才能够读取数据
当管道内部无数据时, 读取端的进程将会进入阻塞状态, 直到写入端写入数据
当管道内无数据时, 读取端进程会被放入到管道文件的等待队列中等待文件资源
当管道内数据被写满时, 写入端的进程将会进入阻塞状态, 直到读取端读取数据
pipe文件中, 存在等待队列:
当父进程存在一定的写入间隔时, 子进程读取管道数据也会根据父进程的写入间隔进行读取
当父进程将管道写满, 子进程还未读取时, 父进程不会再向管道中写入内容
pipe文件存在访问控制机制, 会将管道文件的读写顺序控制为:先写再读.
这其实也符合生活中的管道特点,
管道中传输的资源可以看作是一次性流通的
. 比如 向管道中倒入一瓶水, 这瓶水完全经过管道流通之后, 这瓶水就不在管道中了. 管道通信也是这样的, 当读取端读取过管道中存在的数据时, 就可以看作此数据已经流出管道了, 不能在被二次读取.
使用匿名管道控制进程
简单的例子就是, 可以通过匿名管道向进程派发任务, 以达到控制进程的目的
#include <iostream>
#include <unistd.h>
#include <ctime>
#include <cstring>
#include <string>
#include <vector>
#include <unordered_map>
#include <sys/wait.h>
#include <sys/types.h>
#include <cassert>
using std::cout;
using std::endl;
using std::cerr;
using std::vector;
using std::string;
using std::unordered_map;
typedef void (*functor)(); // typedef 函数指针为 functor
vector<functor> functors; // 创建函数指针数组, 用来存储函数指针
unordered_map<uint32_t, string> info; // 用来存储 functors 对应元素存储的任务的信息
// 只用函数举例, 不实现具体功能
void f1() {
cout << "这是一个处理日志的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
//
}
void f2() {
cout << "这是一个备份数据任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void f3() {
cout << "这是一个处理网络连接的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void loadFunctor() {
info.insert({functors.size(), "处理日志"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接"});
functors.push_back(f3);
}
int main() {
// 0. 加载任务列表
loadFunctor(); // 加载任务到数组中, 即 加载任务列表
// 1. 创建管道
int pipeFd[2] = {0};
if(pipe(pipeFd) != 0) {
cerr << "pipe error" << endl;
}
// 2. 创建子进程
pid_t id = fork();
if(id < 0) {
cerr << "fork error" << endl;
}
else if(id == 0) {
// 子进程执行代码
// 关闭 写入端
close(pipeFd[1]);
while(true) {
// 与写入端写入的数据相同的数据类型
uint32_t operatorType = 0;
ssize_t ret = read(pipeFd[0], &operatorType, sizeof(uint32_t));
if(ret == 0) {
cout << "父进程任务派完了, 我要走了……" << endl;
break;
}
// 这里的read返回值 ret, 大小应该是sizeof(uint32_t), 可以断言判断一下
assert(ret == sizeof(uint32_t));
(void)ret;
// 这里将ret强转为void类型, 是为了解决release编译模式中, 有可能因为ret没被使用而出现的warning
// assert() 只在debug编译模式中有效, 使用release模式编译的话, assert()就没有了
// 所以会出现 ret没被使用的情况
if(operatorType < functors.size()) {
// 如果从管道中接收的数据, 在functors(任务列表)的范围内, 则执行任务
functors[operatorType]();
}
else {
// 否则, 就可能出 bug 了
cout << "BUG ? operatorType:: " << operatorType << endl;
}
}
// 执行任务完成, 关闭读取端
close(pipeFd[0]);
exit(0);
}
else {
// 父进程执行代码
// 随机向子进程分配任务, 则需要先设定一个 srand
srand((long long)time(nullptr)); // 用时间设定
close(pipeFd[0]);
int num = functors.size();
int cnt = 1;
while (cnt <= 10) // 随机派发 10 次任务
{
uint32_t commandCode = rand() % num; // 随机生成 派发的任务编号
cout << "父进程已派发任务:: " << info[commandCode] << ", 第 " << cnt << " 次派发" << endl;
cnt++;
write(pipeFd[1], &commandCode, sizeof(uint32_t)); // 向管道中写入任务编号
sleep(1);
}
// 派发完成之后, 关闭写入端, 并回收子进程
close(pipeFd[1]);
pid_t result = waitpid(id, nullptr, 0);
if(result) {
cout << "waitpid success" << endl;
}
}
return 0;
}
任务列表和任务信息部分:
这部分的代码
首先, 定义了一个任务列表:
vector<functor> functors
和 用来存放任务信息的哈希表:unordered_map<uint32_t, string> info
functors 用来存储函数指针, 其下标即为对应任务的任务号
info 用来存储任务信息, pair的first存储任务号, second存储任务信息
然后, 写了三个任务函数, 没有具体功能
最后, 写了一个 将任务加载到functors任务列表、将任务信息加载到info的函数.
然后就是main函数:
main函数的内部其实可以分为4大块:
加载任务列表, 即
执行 loadFunctor()函数
, 以保证任务列表和任务信息的正常使用创建匿名管道
创建子进程, 并编写子进程执行的代码
子进程需要执行的无非是, 读取管道信息, 并由读取到的信息判断、执行派发的任务
编写父进程需要执行的代码
而父进程需要执行的就是, 向管道中写入数据, 达到向子进程随机派发任务的目的
再然后应该去写, 父进程需要执行的代码:
父进程的功能是 向子进程随机派发任务列表中的任务, 也就需要取随机值, 先使用srand初始化随机数发射
然后
关闭读取端
然后派发任务:
派发任务需要
随机获取任务在functors中的编号
, 所以uint32_t commandCode = rand() % num
,commandCode
就是随机获取的任务编号然后
向管道中以uint32_t为单位, 写入一个任务编号
, 就可以了派发任务循环10次
子进程需要执行的代码:
子进程的需要实现:可以从管道中接收父进程写入的任务编号
而父进程写入任务编号的类型是 uint32_t, 所以
子进程读取时也需要以此类型读取
所以 子进程先关闭写入端
然后, 定义一个 uint32_t 类型的变量(operatorType)用于从管道中读取任务编号
然后,
ssize_t ret = read(pipeFd[0], &operatorType, sizeof(uint32_t))
read: 会返回读取到的数据的字节数, 为0时, 表示写入端已不再写入数据
判断一下read的返回值, 这里判断 为0是派发任务结束 之后
直接用assert断言, ret的大小是sizeof(uint32_t) 类型的, 不然就是读取错数据类型了
(void)ret
: 此语句的作用是 为了解决release编译模式中, 有可能因为ret没被使用而出现的waring assert() 只在debug编译模式中有效, 使用release模式编译的话, assert()就没有了 所以release模式编译会出现 ret没被使用的情况然后判断 读取到的数据是否在functors任务列表的范围内, 如果在则执行对应位置任务
否则就是bug, 需要告知用户
使用匿名管道 控制多个进程
多个匿名管道
需要让父进程知道, 对应的子进程所对应的管道的写端
#include <iostream>
#include <unistd.h>
#include <ctime>
#include <cstring>
#include <string>
#include <vector>
#include <unordered_map>
#include <sys/wait.h>
#include <sys/types.h>
#include <cassert>
using std::cout;
using std::endl;
using std::cerr;
using std::vector;
using std::string;
using std::unordered_map;
using std::pair;
typedef void (*functor)(); // typedef 函数指针为 functor
vector<functor> functors; // 创建函数指针数组, 用来存储函数指针
unordered_map<uint32_t, string> info; // 用来存储 functors 对应元素存储的任务的信息
typedef pair<pid_t, int> elem; // elem用来存储 子进程pid 以及对应管道的写入端fd
// first 存储子进程pid, second 存储对应管道写端fd
// 只用函数举例, 不实现具体功能
void f1() {
cout << "这是一个处理日志的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
//
}
void f2() {
cout << "这是一个备份数据任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void f3() {
cout << "这是一个处理网络连接的任务, 执行的进程 ID [" << getpid() << "]"
<< "执行时间是[" << time(nullptr) << "]\n" << endl;
}
void loadFunctor() {
info.insert({functors.size(), "处理日志"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接"});
functors.push_back(f3);
}
void childProcWork(int readFd) {
sleep(1);
cout << "进程 [" << getpid() << "] 开始工作" << endl;
while (true) {
uint32_t operatorType = 0;
ssize_t ret = read(readFd, &operatorType, sizeof(uint32_t));
if(ret == 0) {
cout << "父进程任务派完了, 我要走了……" << endl;
break;
}
assert(ret == sizeof(uint32_t));
(void)ret;
if (operatorType < functors.size()) {
functors[operatorType]();
}
else {
cout << "BUG ? operatorType:: " << operatorType << endl;
}
}
cout << "进程 [" << getpid() << "] 结束工作" << endl;
}
void blanceAssignWork(const vector<elem> &processFds) {
srand((long long)time(nullptr)); // 设置随机数种子
// 随机对子进程 随机分配任务 num 次
int cnt = 0;
int num = 15;
while (cnt < num) {
sleep(1);
// 随机选择子进程
uint32_t pickProc = rand() % processFds.size();
// 随机选择任务
uint32_t pickWork = rand() % functors.size();
write(processFds[pickProc].second, &pickWork, sizeof(uint32_t));
cout << "父进程给进程: " << processFds[pickProc].first << " 派发任务->" << info[pickWork] <<
", 对应管道写端fd: " << pickProc << ", 第 " << cnt << " 次派发" << endl;
cnt--;
}
}
int main() {
// 0. 加载任务列表
loadFunctor();
// 循环创建5个子进程以及对应的管道
vector<elem> assignMap; // 子进程pid与对应管道的fd记录
int processNum = 5;
for(int i = 0; i < processNum; i++) {
int pipeFd[2] = {0};
if(pipe(pipeFd) != 0) {
cerr << "第 " << i << " 次, pipe 错误" << endl;
}
pid_t id = fork();
if(id == 0) {
// 子进程执行代码
close(pipeFd[1]);
childProcWork(pipeFd[0]); // 子进程功能具体函数
close(pipeFd[0]);
exit(0);
}
// 因为在if(id == 0) 的最后, 执行了 exit(0); 所以子进程不会跳出 if(id == 0) 的内部
// 所以下面都为父进程执行的代码
// 父进程执行代码
close(pipeFd[0]);
assignMap.push_back(elem(id, pipeFd[1]));
// elem(id, pipeFd[1]) 创建pair<uint32_t, uint32_t> 匿名对象, 存储 此次创建子进程pid 和 打开管道的写端fd
// 并存入 vector 中
}
cout << "创建子进程完毕" << endl;
cout << "父进程, 开始随机给子进程 随机派发任务\n" << endl;
sleep(1);
blanceAssignWork(assignMap); // 父进程派发任务函数
// 回收所有子进程
for(int i = 0; i < processNum; i++)
close(assignMap[i].second);
for(int i = 0; i < processNum; i++) {
if(waitpid(assignMap[i].first, nullptr, 0)) {
cout << "等待子进程_pid: " << assignMap[i].first << ", 等待成功. Number: " << i << endl;
}
}
return 0;
}
父进程向管道中写入的过程 和 子进程从管道中读取的过程 是没有变化的
在父进程中记录子进程的pid以及对应的管道写入端fd
匿名管道特点总结
兄弟进程知道其他管道的写入端fd, 就可以实现兄弟进程间的通信
.|
, 就是兄弟进程间的通信, 也就是说,| 其实就是命令行上的匿名管道
- 匿名管道 只能用于 具有血缘关系的进程之间的通信: 父子、兄弟
- 匿名管道 只能单向通信, 是根据管道的特点专门设计成这样的. 是半双工通信的特殊情况
- 匿名管道 自带同步机制(pipe满, 则writer阻塞; pipe空, 则reader阻塞), 即自带访问控制机制
- 匿名管道 是面向字节流的
- 匿名管道 的生命周期 取决于什么时候彻底关闭管道文件(即pipe文件的打开计数为0)
命名管道
命名管道
命名管道对用户来说, 是可见的, 也就是说在进程内是可以指定路径打开的
, 这也是 命名管道可以实现 毫不相干的进程之间通信的原因命名管道的创建
-
命令行创建
命名管道可以在命令行使用命令创建
mkfifo
:管道都是先进先出的, 但是命名管道是可见的
使用mkfifo 可以创建命名管道文件:
-
系统调用创建
Linux除了给了mkfifo命令, 还给了mkfifo()系统调用:
mkfifo(const char *pathname, mode_t mode)
有两个参数, 第一个参数肯定不用解释了, 是创建**文件的路径及文件名
**第二个参数 mode 是什么?
mode其实是创建文件的权限, 以这种格式传参
0000
命名管道的使用
common.h:
// 进程1 和 进程2 都需要包含的头文件
#pragma once
#include <iostream>
#include <cstdio>
#include <cstring>
#include <cerrno>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#define IPC_PATH "./.fifo" // 命名文件路径
clientFifo.cpp:
// 命名管道客户端, 即写入端, 不参与命名管道的创建
#include "common.h"
using std::cout;
using std::endl;
using std::cerr;
int main() {
int pipeFd = open(IPC_PATH, O_WRONLY); // 只写打开命名管道, 不参与创建
if(pipeFd < 0) {
cerr << "open fifo error" << endl;
return 1;
}
char line[1024]; // 用于接收命令行的信息
while (true) {
printf("请输入消息 $ ");
fflush(stdout); // printf没有刷新stdout, 所以手动刷新
memset(line, 0, sizeof(line));
if(fgets(line, sizeof(line), stdin) != nullptr) {
// 由于fgets 会接收 回车, 所以将 line的最后一位有效字符设置为 '\0'
line[strlen(line) - 1] = '\0';
// 向命名管道写入信息
write(pipeFd, line, strlen(line));
}
else {
break;
}
}
close(pipeFd);
cout << "客户端(写入端)推出啦" << endl;
return 0;
}
serverFifo.cpp:
// 命名管道服务端, 即读取端, 参与命名管道文件的创建
#include "common.h"
using std::cout;
using std::endl;
using std::cerr;
int main() {
umask(0);
if(mkfifo(IPC_PATH, 0666) != 0) {
cerr << "mkfifo error" << endl;
return 1;
}
int pipeFd = open(IPC_PATH, O_RDONLY);
if(pipeFd < 0) {
cerr << "open error" << endl;
return 2;
}
cout << "命名管道文件, 已创建, 已打开" << endl;
char buffer[1024];
while (true) {
ssize_t ret = read(pipeFd, buffer, sizeof(buffer)-1);
if (ret == 0) {
cout << "\n客户端(写入端)退出了, 我也退出吧";
break;
}
else if (ret > 0) {
cout << "客户端 -> 服务器 # " << buffer << endl;
}
else {
cout << "read error: " << strerror(errno) << endl;
break;
}
}
close(pipeFd);
cout << "\n服务端退出……" << endl;
unlink(IPC_PATH);
return 0;
}
makefile:
.PHONY:all
all:clientFifo serverFifo
clientFifo:clientFifo.cpp
g++ $^ -o $@
serverFifo:serverFifo.cpp
g++ $^ -o $@
.PHONY:clean
clean:
rm -f clientFifo serverFifo .fifo
作者: 哈米d1ch 发表日期:2023 年 4 月 2 日