文章目录
- 匿名管道
- pipe通信基本过程
- 父进程控制子进程
- 父进程控制多个子进程
- 管道特点总结
- 命名管道
进程是具有独立性的,进程间想要交互数据,成本会非常高
进程间通信的目的:
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的指向(如 Debug 进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
匿名管道
管道是 Unix 中最古老的进程间通信的形式
我们把从一个进程连接到另一个进程的一个数据流称为一个管道,管道是单向的。
管道本质上也是一个文件,但是它和普通的文件不一样,普通的文件是为了向磁盘上读写数据,而管道是为了进程间通信,而且管道的操作属于内存级别。
下面简单讲述管道的原理,需要用到文件描述符的知识👉[Linux](10)系统级I/O,文件描述符,重定向,缓冲区,文件操作模拟实现_世真的博客-CSDN博客
-
父进程创建管道
-
创建子进程,子进程会继承父进程打开的文件
-
关闭父进程的读端 fd[0] 和子进程的写端 fd[1],保证单向性
pipe 函数:
创建管道
NAME
pipe, pipe2 - create pipe
SYNOPSIS
#include <unistd.h>
int pipe(int pipefd[2]);
// 创建成功:返回0。出错:返回-1
pipe()
创建一个管道,它是一个可用于进程间通信的单向数据通道。数组 pipefd
用于返回引用管道末端的两个文件描述符。pipefd[0]
表示管道的读取端。pipefd[1]
表示管道的写入端。从管道写入端写入的数据将由内核缓冲,直到从管道读取端读取。
pipe通信基本过程
例子:
创建管道并查看pipefd的两个文件描述符:
#include <iostream>
#include <cstdio>
#include <unistd.h>
using namespace std;
int main()
{
int pipefd[2] = { 0 };
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
cout << "fd[0]: " << pipefd[0] << endl;
cout << "fd[1]: " << pipefd[1] << endl;
return 0;
}
运行结果:
fd[0]: 3
fd[1]: 4
接下来创建子进程,并通过管道让父进程给子进程发消息:
#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
using namespace std;
int main()
{
// 创建管道
int pipefd[2] = { 0 };
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
// 创建子进程
pid_t id = fork();
if (id < 0)
{
cerr << "fork error" << endl;
return 2;
}
else if (id == 0)
{
// 子进程
// 子进程进行读取,关闭写端
close(pipefd[1]);
#define NUM 1024
char buffer[NUM];
while (true)
{
memset(buffer, 0, sizeof(buffer));
size_t s = read(pipefd[0], buffer, sizeof(buffer));
if (s > 0)
{
buffer[s] = '\0';
cout << "子进程收到消息,内容是:" << buffer << endl;
}
else if (s == 0)
{
cout << "父进程写完,子进程读完,退出" << endl;
break;
}
else
{
// nothing to do
}
}
close(pipefd[0]);
exit(0);
}
else
{
// 父进程
// 父进程进行写入,关闭读端
close(pipefd[0]);
string msg = "儿子,我是你爸爸";
int cnt = 0;
while (cnt < 5)
{
write(pipefd[1], msg.c_str(), msg.size());
sleep(1);
++cnt;
}
close(pipefd[1]);
cout << "父进程写入完毕" << endl;
}
pid_t res = waitpid(id, nullptr, 0);
if (res > 0)
{
cout << "等待子进程成功" << endl;
}
return 0;
}
上述代码,让父进程写5次消息到管道中,子进程一直读,父进程写完后等待子进程退出。
补充:read函数可以通过查看文件的引用计数来得知父进程有没有关闭描述符,如果关闭了,read就会读完然后返回0
输出结果:
[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
父进程写完,子进程读完,退出
父进程写入完毕
等待子进程成功
在观察结果的过程中很容易发现一个现象:父进程每隔1秒写入一次,子进程读消息的节奏也是1s一次。
当父进程没有写入的时候,子进程在干什么呢?
- 子进程在阻塞等待,所以父进程写入后,子进程才能read到数据,子进程打印读取数据要以父进程的节奏为主。
- 所以管道的这种读写是有顺序性的,而不像父子进程各自向显示器printf那样
- 管道内部没有数据,read必须阻塞等待
- 管道内部数据被写满,write必须阻塞等待
父进程控制子进程
创建一个任务集合,父进程通过发送任务码让子进程完成指定的任务。
#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>
#include <cassert>
using namespace std;
typedef void(*functor)(); // 定义函数指针
vector<functor> functors; // 方法集合
unordered_map<uint32_t, string> info;
void f1()
{
printf("这是一个处理日志的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr));
}
void f2()
{
printf("这是一个备份数据的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr));
}
void f3()
{
printf("这是一个处理网络连接的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr));
}
void loadFunctor()
{
info.insert({functors.size(), "处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据的任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接的任务"});
functors.push_back(f3);
}
int main()
{
// 加载任务列表
loadFunctor();
// 创建管道
int pipefd[2] = { 0 };
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
// 创建子进程
pid_t id = fork();
if (id < 0)
{
cerr << "fork error" << endl;
return 2;
}
else if (id == 0)
{
// child, read
close(pipefd[1]);
while (true)
{
uint32_t operatorType = 0;
// 如果有数据,就读取,如果没有数据,就阻塞等待
ssize_t s = read(pipefd[0], &operatorType, sizeof(ssize_t));
if (s == 0)
{
cout << "父进程派发任务完毕,子进程退出" << endl;
break;
}
assert(s == sizeof(uint32_t));
(void)s;
if (operatorType < functors.size())
{
functors[operatorType]();
}
else
{
cerr << "bug? operatorType = " << operatorType << endl;
}
}
close(pipefd[0]);
exit(0);
}
else
{
srand((long long)time(nullptr));
// parent, write
close(pipefd[0]);
// 指派任务
int num = functors.size();
int cnt = 10;
while (cnt--)
{
// 形成任务码
uint32_t commandCode = rand() % num;
cout << "父进程指派任务完成,任务是:" << info[commandCode] << "任务编号:" << cnt << endl;
// 向指定的进程下达任务
write(pipefd[1], &commandCode, sizeof(uint32_t));
sleep(1);
}
close(pipefd[1]);
pid_t res = waitpid(id, nullptr ,0);
if (res) cout << "wait success" << endl;
}
return 0;
}
运行结果:
[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
父进程指派任务完成,任务是:处理网络连接的任务任务编号:9
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355875]
父进程指派任务完成,任务是:备份数据的任务任务编号:8
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355876]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:7
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355877]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:6
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355878]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:5
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355879]
父进程指派任务完成,任务是:备份数据的任务任务编号:4
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355880]
父进程指派任务完成,任务是:处理日志的任务任务编号:3
这是一个处理日志的任务,执行的进程 id [11735],执行时间是[1667355881]
父进程指派任务完成,任务是:备份数据的任务任务编号:2
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355882]
父进程指派任务完成,任务是:备份数据的任务任务编号:1
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355883]
父进程指派任务完成,任务是:备份数据的任务任务编号:0
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355884]
父进程派发任务完毕,子进程退出
wait success
父进程控制多个子进程
#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>
#include <cassert>
using namespace std;
typedef void(*functor)(); // 定义函数指针
vector<functor> functors; // 方法集合
unordered_map<uint32_t, string> info;
void f1()
{
printf("这是一个处理日志的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr));
}
void f2()
{
printf("这是一个备份数据的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr));
}
void f3()
{
printf("这是一个处理网络连接的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr));
}
void loadFunctor()
{
info.insert({functors.size(), "处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据的任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接的任务"});
functors.push_back(f3);
}
// int32_t:进程pid, int32_t:该进程对应的管道写端fd
typedef pair<int32_t, int32_t> elem;
vector<elem> assignMap;
int processNum = 5;
void work(int blockFd)
{
cout << "进程 [" << getpid() << "] " << "开始工作" << endl;
// 子进程核心工作的代码
while (true)
{
// 阻塞等待,获取任务信息
uint32_t operaotrCode = 0;
ssize_t s = read(blockFd, &operaotrCode, sizeof(uint32_t));
if (s == 0) break;
assert(s == sizeof(uint32_t));
(void)s;
// 处理任务
if (operaotrCode < functors.size()) functors[operaotrCode]();
}
cout << "进程 [" << getpid() << "] " << "结束工作" << endl;
}
void sendTask(const vector<elem>& processFds)
{
srand((long long)time(nullptr));
while (true)
{
sleep(1);
// 选择一个进程
uint32_t pick = rand() % processFds.size();
// 选择一个任务
uint32_t task = rand() % functors.size();
// 把任务给一个指定的进程
write(processFds[pick].second, &task, sizeof(task));
// 打印提示信息
printf("父进程指派任务: %s 给进程:%d, 编号: %d\n", info[task].c_str(), processFds[pick].first, pick);
}
}
int main()
{
loadFunctor();
// 创建processNum个进程
for (int i = 0; i < processNum; ++i)
{
int pipefd[2] = { 0 };
// 创建管道
pipe(pipefd);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// 子进程读取,r
close(pipefd[1]);
// 子进程执行
work(pipefd[0]);
exit(0);
}
// 父进程做的事情
close(pipefd[0]);
elem e(id, pipefd[1]);
assignMap.push_back(e);
}
cout << "create all process sucess" << endl;
// 父进程派发任务
sendTask(assignMap);
// 回收资源
for (int i = 0; i < processNum; ++i)
{
if (waitpid(assignMap[i].first, nullptr, 0) > 0)
printf("wait for: pid = %d, wait success, number: %d\n", assignMap[i].first, i);
close(assignMap[i].second);
}
return 0;
}
[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
进程 [19069] 开始工作
create all process sucess
进程 [19071] 开始工作
进程 [19072] 开始工作
进程 [19073] 开始工作
进程 [19070] 开始工作
父进程指派任务: 处理网络连接的任务 给进程:19069, 编号: 0
这是一个处理网络连接的任务,执行的进程 id [19069],执行时间是[1667397801]
父进程指派任务: 处理日志的任务 给进程:19073, 编号: 4
这是一个处理日志的任务,执行的进程 id [19073],执行时间是[1667397802]
父进程指派任务: 备份数据的任务 给进程:19069, 编号: 0
这是一个备份数据的任务,执行的进程 id [19069],执行时间是[1667397803]
父进程指派任务: 处理网络连接的任务 给进程:19071, 编号: 2
这是一个处理网络连接的任务,执行的进程 id [19071],执行时间是[1667397804]
父进程指派任务: 备份数据的任务 给进程:19071, 编号: 2
这是一个备份数据的任务,执行的进程 id [19071],执行时间是[1667397805]
父进程指派任务: 处理网络连接的任务 给进程:19073, 编号: 4
这是一个处理网络连接的任务,执行的进程 id [19073],执行时间是[1667397806]
父进程指派任务: 备份数据的任务 给进程:19072, 编号: 3
这是一个备份数据的任务,执行的进程 id [19072],执行时间是[1667397807]
补充:命令行中 |
也可以创建管道,|
左右的两个命令所创建的进程之间可以相互通信,这是子进程之间的通信。
管道特点总结
- 管道只能用来进行具有血缘关系的进程之间进行通信,常用于父子通信
- 管道只能单向通信(内核实现决定),是半双工的一种特殊情况
- 管道自带同步机制(访问控制)(pipe满,write等。pipe空,read等)
- 管道是面向字节流的——先写的字符,先被读取
- 管道是文件,其生命周期随进程创建和退出。
命名管道
上面的匿名管道只能进行有血缘关系的进程之间通信,对于毫不相干的进程间通信,需要使用命名管道。
创建命名管道的命令 mkfifo:
NAME
mkfifo - make FIFOs (named pipes)
SYNOPSIS
mkfifo [OPTION]... NAME...
使用该命令,创建一个管道文件:
[CegghnnoR@VM-4-13-centos named_pipe]$ mkfifo myfifo
[CegghnnoR@VM-4-13-centos named_pipe]$ ll
total 0
prw-rw-r-- 1 CegghnnoR CegghnnoR 0 Nov 2 22:54 myfifo
例子:
在一个终端中等待读取管道中的数据,在另一个终端中向管道写入数据,被写入的数据会在另一个终端中显示。
创建命名管道的函数接口:
NAME
mkfifo - make a FIFO special file (a named pipe)
SYNOPSIS
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
// 创建成功:返回0,失败:返回-1
pathname
:管道文件路径
mode
:文件权限
需要注意的是:命名管道的通信依然是在内存里进行的,数据不会刷新到磁盘,所谓的管道文件,只是代表一种符号。
例子:
编写两个程序,一个表示客户端,一个表示服务端。让客户端输入的信息能够在服务端显示。
comm.h
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cstdio>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#define IPC_PATH "./.fifo"
clientFifo.cpp
// 写入
#include "comm.h"
using namespace std;
int main()
{
int pipeFd = open(IPC_PATH, O_WRONLY);
if (pipeFd < 0)
{
cerr << "open: " << strerror(errno) << endl;
return 1;
}
#define NUM 1024
char line[NUM];
while (true)
{
printf("请输入你的消息# ");
fflush(stdout);
memset(line, 0, sizeof(line));
if (fgets(line, sizeof(line), stdin) != nullptr)
{
write(pipeFd, line, strlen(line));
}
else
{
break;
}
}
close(pipeFd);
cout << "客户端退出" << endl;
return 0;
}
serverFifo.cpp
// 读取
#include "comm.h"
using namespace std;
int main()
{
umask(0);
if (mkfifo(IPC_PATH, 0600) != 0)
{
cerr << "mkfifo error" << endl;
return 1;
}
int pipeFd = open(IPC_PATH, O_RDONLY);
if (pipeFd < 0)
{
cerr << "open fifo error" << endl;
return 2;
}
// 通信过程...
#define NUM 1024
char buffer[NUM];
while (true)
{
ssize_t s = read(pipeFd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = '\0';
cout << "客户端->服务器# " << buffer << endl;
}
else if (s == 0)
{
cout << "客户退出了,我服务也退出" << endl;
break;
}
else
{
//错误
cout << strerror(errno) << endl;
break;
}
}
close(pipeFd);
cout << "服务端退出" << endl;
return 0;
}
运行:
打开两个终端,下图分别为客户端(左)和服务端(右),先运行服务端创建管道并等待输入,然后运行客户端输入信息。
可以看到这两个程序是毫无关系的,但是它们可以通过命名管道实现通信。
最后一个小知识点:
使用unlink删除文件。
NAME
unlink - delete a name and possibly the file it refers to
SYNOPSIS
#include <unistd.h>
int unlink(const char *pathname);
当你在程序运行结束想删除管道文件,可以使用 unlink
//...
close(pipeFd);
cout << "服务端退出" << endl;
unlink(IPC_PATH); // 在程序运行结束就把管道文件删掉
return 0;
}