Linux(12)进程间通信之管道

news/2024/7/7 22:49:07

文章目录

  • 匿名管道
    • pipe通信基本过程
    • 父进程控制子进程
    • 父进程控制多个子进程
    • 管道特点总结
  • 命名管道

进程是具有独立性的,进程间想要交互数据,成本会非常高

进程间通信的目的:

  • 数据传输:一个进程需要将它的数据发送给另一个进程
  • 资源共享:多个进程之间共享同样的资源
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  • 进程控制:有些进程希望完全控制另一个进程的指向(如 Debug 进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

匿名管道

管道是 Unix 中最古老的进程间通信的形式

我们把从一个进程连接到另一个进程的一个数据流称为一个管道,管道是单向的。

管道本质上也是一个文件,但是它和普通的文件不一样,普通的文件是为了向磁盘上读写数据,而管道是为了进程间通信,而且管道的操作属于内存级别。

下面简单讲述管道的原理,需要用到文件描述符的知识👉[Linux](10)系统级I/O,文件描述符,重定向,缓冲区,文件操作模拟实现_世真的博客-CSDN博客

  1. 父进程创建管道

    1

  2. 创建子进程,子进程会继承父进程打开的文件

    2

  3. 关闭父进程的读端 fd[0] 和子进程的写端 fd[1],保证单向性

    3

pipe 函数

创建管道

NAME
       pipe, pipe2 - create pipe

SYNOPSIS
       #include <unistd.h>

       int pipe(int pipefd[2]);
// 创建成功:返回0。出错:返回-1

pipe() 创建一个管道,它是一个可用于进程间通信的单向数据通道。数组 pipefd 用于返回引用管道末端的两个文件描述符。pipefd[0] 表示管道的读取端。pipefd[1] 表示管道的写入端。从管道写入端写入的数据将由内核缓冲,直到从管道读取端读取。

pipe通信基本过程

例子

创建管道并查看pipefd的两个文件描述符:

#include <iostream>
#include <cstdio>
#include <unistd.h>
using namespace std;

int main()
{
    int pipefd[2] = { 0 };
    if (pipe(pipefd) != 0)
    {
        cerr << "pipe error" << endl;
        return 1;
    }
    cout << "fd[0]: " << pipefd[0] << endl;
    cout << "fd[1]: " << pipefd[1] << endl;
    return 0;
}
运行结果:
fd[0]: 3
fd[1]: 4

接下来创建子进程,并通过管道让父进程给子进程发消息:

#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
using namespace std;

int main()
{
    // 创建管道
    int pipefd[2] = { 0 };
    if (pipe(pipefd) != 0)
    {
        cerr << "pipe error" << endl;
        return 1;
    }
    // 创建子进程
    pid_t id = fork();
    if (id < 0)
    {
        cerr << "fork error" << endl;
        return 2;
    }
    else if (id == 0)
    {
        // 子进程
        // 子进程进行读取,关闭写端
        close(pipefd[1]);
        #define NUM 1024
        char buffer[NUM];
        while (true)
        {
            memset(buffer, 0, sizeof(buffer));
            size_t s = read(pipefd[0], buffer, sizeof(buffer));
            if (s > 0)
            {
                buffer[s] = '\0';
                cout << "子进程收到消息,内容是:" << buffer << endl;
            }
            else if (s == 0)
            {
                cout << "父进程写完,子进程读完,退出" << endl;
                break;
            }
            else
            {
                // nothing to do
            }
        }
        close(pipefd[0]);
        exit(0);
    }
    else
    {
        // 父进程
        // 父进程进行写入,关闭读端
        close(pipefd[0]);
        string msg = "儿子,我是你爸爸";
        int cnt = 0;
        while (cnt < 5)
        {
            write(pipefd[1], msg.c_str(), msg.size());
            sleep(1);
            ++cnt;
        }
        close(pipefd[1]);
        cout << "父进程写入完毕" << endl;
    }
    pid_t res = waitpid(id, nullptr, 0);
    if (res > 0)
    {
        cout << "等待子进程成功" << endl;
    }
    return 0;
}

上述代码,让父进程写5次消息到管道中,子进程一直读,父进程写完后等待子进程退出。

补充:read函数可以通过查看文件的引用计数来得知父进程有没有关闭描述符,如果关闭了,read就会读完然后返回0

输出结果:

[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
父进程写完,子进程读完,退出
父进程写入完毕
等待子进程成功

在观察结果的过程中很容易发现一个现象:父进程每隔1秒写入一次,子进程读消息的节奏也是1s一次。

当父进程没有写入的时候,子进程在干什么呢?

  • 子进程在阻塞等待,所以父进程写入后,子进程才能read到数据,子进程打印读取数据要以父进程的节奏为主。
  • 所以管道的这种读写是有顺序性的,而不像父子进程各自向显示器printf那样
  • 管道内部没有数据,read必须阻塞等待
  • 管道内部数据被写满,write必须阻塞等待

父进程控制子进程

创建一个任务集合,父进程通过发送任务码让子进程完成指定的任务。

#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>
#include <cassert>

using namespace std;

typedef void(*functor)(); // 定义函数指针

vector<functor> functors; // 方法集合

unordered_map<uint32_t, string> info;


void f1()
{
    printf("这是一个处理日志的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr)); 
}
void f2()
{
    printf("这是一个备份数据的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr)); 
}
void f3()
{
    printf("这是一个处理网络连接的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr)); 
}

void loadFunctor()
{
    info.insert({functors.size(), "处理日志的任务"});
    functors.push_back(f1);
    info.insert({functors.size(), "备份数据的任务"});
    functors.push_back(f2);
    info.insert({functors.size(), "处理网络连接的任务"});
    functors.push_back(f3);
}

int main()
{
    // 加载任务列表
    loadFunctor();
    // 创建管道
    int pipefd[2] = { 0 };
    if (pipe(pipefd) != 0)
    {
        cerr << "pipe error" << endl;
        return 1;
    }
    // 创建子进程
    pid_t id = fork();
    if (id < 0)
    {
        cerr << "fork error" << endl;
        return 2;
    }
    else if (id == 0)
    {
        // child, read
        close(pipefd[1]);
        while (true)
        {
            uint32_t operatorType = 0;
            // 如果有数据,就读取,如果没有数据,就阻塞等待
            ssize_t s = read(pipefd[0], &operatorType, sizeof(ssize_t));
            if (s == 0)
            {
                cout << "父进程派发任务完毕,子进程退出" << endl;
                break;
            }
            assert(s == sizeof(uint32_t));
            (void)s;


            if (operatorType < functors.size())
            {
                functors[operatorType]();
            }
            else
            {
                cerr << "bug? operatorType = " << operatorType << endl;
            }
        }
        close(pipefd[0]);
        exit(0);
    }
    else
    {
        srand((long long)time(nullptr));
        // parent, write
        close(pipefd[0]);
        // 指派任务
        int num = functors.size();
        int cnt = 10;
        while (cnt--)
        {
            // 形成任务码
            uint32_t commandCode = rand() % num;
            cout << "父进程指派任务完成,任务是:"  << info[commandCode] << "任务编号:" << cnt << endl;
            // 向指定的进程下达任务
            write(pipefd[1], &commandCode, sizeof(uint32_t));
            sleep(1);
        }

        close(pipefd[1]);
        pid_t res = waitpid(id, nullptr ,0);
        if (res) cout << "wait success" << endl;
    }
    return 0;
}

运行结果

[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
父进程指派任务完成,任务是:处理网络连接的任务任务编号:9
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355875]
父进程指派任务完成,任务是:备份数据的任务任务编号:8
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355876]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:7
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355877]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:6
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355878]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:5
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355879]
父进程指派任务完成,任务是:备份数据的任务任务编号:4
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355880]
父进程指派任务完成,任务是:处理日志的任务任务编号:3
这是一个处理日志的任务,执行的进程 id [11735],执行时间是[1667355881]
父进程指派任务完成,任务是:备份数据的任务任务编号:2
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355882]
父进程指派任务完成,任务是:备份数据的任务任务编号:1
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355883]
父进程指派任务完成,任务是:备份数据的任务任务编号:0
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355884]
父进程派发任务完毕,子进程退出
wait success

父进程控制多个子进程

#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cstdlib>
#include <sys/wait.h>
#include <sys/types.h>
#include <cassert>

using namespace std;

typedef void(*functor)(); // 定义函数指针

vector<functor> functors; // 方法集合

unordered_map<uint32_t, string> info;


void f1()
{
    printf("这是一个处理日志的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr)); 
}
void f2()
{
    printf("这是一个备份数据的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr)); 
}
void f3()
{
    printf("这是一个处理网络连接的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr)); 
}

void loadFunctor()
{
    info.insert({functors.size(), "处理日志的任务"});
    functors.push_back(f1);
    info.insert({functors.size(), "备份数据的任务"});
    functors.push_back(f2);
    info.insert({functors.size(), "处理网络连接的任务"});
    functors.push_back(f3);
}

// int32_t:进程pid, int32_t:该进程对应的管道写端fd
typedef pair<int32_t, int32_t> elem;
vector<elem> assignMap;
int processNum = 5;

void work(int blockFd)
{
    cout << "进程 [" << getpid() << "] " << "开始工作" << endl;
    // 子进程核心工作的代码
    while (true)
    {
        // 阻塞等待,获取任务信息
        uint32_t operaotrCode = 0;
        ssize_t s = read(blockFd, &operaotrCode, sizeof(uint32_t));
        if (s == 0) break;
        assert(s == sizeof(uint32_t));
        (void)s;
        // 处理任务
        if (operaotrCode < functors.size()) functors[operaotrCode]();
    }
    cout << "进程 [" << getpid() << "] " << "结束工作" << endl;
}

void sendTask(const vector<elem>& processFds)
{
    srand((long long)time(nullptr));
    while (true)
    {
        sleep(1);
        // 选择一个进程
        uint32_t pick = rand() % processFds.size();
        // 选择一个任务
        uint32_t task = rand() % functors.size();
        // 把任务给一个指定的进程
        write(processFds[pick].second, &task, sizeof(task));
        // 打印提示信息
        printf("父进程指派任务: %s 给进程:%d, 编号: %d\n", info[task].c_str(), processFds[pick].first, pick);
    }
}

int main()
{
    loadFunctor();
    // 创建processNum个进程
    for (int i = 0; i < processNum; ++i)
    {
        int pipefd[2] = { 0 };
        // 创建管道
        pipe(pipefd);
        // 创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // 子进程读取,r
            close(pipefd[1]);
            // 子进程执行
            work(pipefd[0]);
            exit(0);
        }
        // 父进程做的事情
        close(pipefd[0]);
        elem e(id, pipefd[1]);
        assignMap.push_back(e);
    }
    cout << "create all process sucess" << endl;
    // 父进程派发任务
    sendTask(assignMap);
    // 回收资源
    for (int i = 0; i < processNum; ++i)
    {
        if (waitpid(assignMap[i].first, nullptr, 0) > 0)
            printf("wait for: pid = %d, wait success, number: %d\n", assignMap[i].first, i);
        close(assignMap[i].second);
    }
    return 0;
}
[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
进程 [19069] 开始工作
create all process sucess
进程 [19071] 开始工作
进程 [19072] 开始工作
进程 [19073] 开始工作
进程 [19070] 开始工作
父进程指派任务: 处理网络连接的任务 给进程:19069, 编号: 0
这是一个处理网络连接的任务,执行的进程 id [19069],执行时间是[1667397801]

父进程指派任务: 处理日志的任务 给进程:19073, 编号: 4
这是一个处理日志的任务,执行的进程 id [19073],执行时间是[1667397802]

父进程指派任务: 备份数据的任务 给进程:19069, 编号: 0
这是一个备份数据的任务,执行的进程 id [19069],执行时间是[1667397803]

父进程指派任务: 处理网络连接的任务 给进程:19071, 编号: 2
这是一个处理网络连接的任务,执行的进程 id [19071],执行时间是[1667397804]

父进程指派任务: 备份数据的任务 给进程:19071, 编号: 2
这是一个备份数据的任务,执行的进程 id [19071],执行时间是[1667397805]

父进程指派任务: 处理网络连接的任务 给进程:19073, 编号: 4
这是一个处理网络连接的任务,执行的进程 id [19073],执行时间是[1667397806]

父进程指派任务: 备份数据的任务 给进程:19072, 编号: 3
这是一个备份数据的任务,执行的进程 id [19072],执行时间是[1667397807]

补充:命令行中 | 也可以创建管道,| 左右的两个命令所创建的进程之间可以相互通信,这是子进程之间的通信。

管道特点总结

  • 管道只能用来进行具有血缘关系的进程之间进行通信,常用于父子通信
  • 管道只能单向通信(内核实现决定),是半双工的一种特殊情况
  • 管道自带同步机制(访问控制)(pipe满,write等。pipe空,read等)
  • 管道是面向字节流的——先写的字符,先被读取
  • 管道是文件,其生命周期随进程创建和退出。

命名管道

上面的匿名管道只能进行有血缘关系的进程之间通信,对于毫不相干的进程间通信,需要使用命名管道。

创建命名管道的命令 mkfifo

NAME
       mkfifo - make FIFOs (named pipes)

SYNOPSIS
       mkfifo [OPTION]... NAME...

使用该命令,创建一个管道文件:

[CegghnnoR@VM-4-13-centos named_pipe]$ mkfifo myfifo
[CegghnnoR@VM-4-13-centos named_pipe]$ ll
total 0
prw-rw-r-- 1 CegghnnoR CegghnnoR 0 Nov  2 22:54 myfifo

例子

在一个终端中等待读取管道中的数据,在另一个终端中向管道写入数据,被写入的数据会在另一个终端中显示。

guandao


创建命名管道的函数接口

NAME
       mkfifo - make a FIFO special file (a named pipe)

SYNOPSIS
       #include <sys/types.h>
       #include <sys/stat.h>

       int mkfifo(const char *pathname, mode_t mode);
		//	创建成功:返回0,失败:返回-1

pathname:管道文件路径

mode:文件权限

需要注意的是:命名管道的通信依然是在内存里进行的,数据不会刷新到磁盘,所谓的管道文件,只是代表一种符号。

例子

编写两个程序,一个表示客户端,一个表示服务端。让客户端输入的信息能够在服务端显示。

comm.h

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cstdio>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#define IPC_PATH "./.fifo"

clientFifo.cpp

// 写入
#include "comm.h"
using namespace std;

int main()
{
    int pipeFd = open(IPC_PATH, O_WRONLY);
    if (pipeFd < 0)
    {
        cerr << "open: " << strerror(errno) << endl;
        return 1;
    }
#define NUM 1024
    char line[NUM];
    while (true)
    {
        printf("请输入你的消息# ");
        fflush(stdout);
        memset(line, 0, sizeof(line));
        if (fgets(line, sizeof(line), stdin) != nullptr)
        {
            write(pipeFd, line, strlen(line));
        }
        else
        {
            break;
        }
    }

    close(pipeFd);
    cout << "客户端退出" << endl;
    return 0;
}

serverFifo.cpp

// 读取
#include "comm.h"
using namespace std;

int main()
{
    umask(0);
    if (mkfifo(IPC_PATH, 0600) != 0)
    {
        cerr << "mkfifo error" << endl;
        return 1;
    }

    int pipeFd = open(IPC_PATH, O_RDONLY);
    if (pipeFd < 0)
    {
        cerr << "open fifo error" << endl;
        return 2;
    }
    // 通信过程...
#define NUM 1024
    char buffer[NUM];
    while (true)
    {

        ssize_t s = read(pipeFd, buffer, sizeof(buffer) -  1);
        if (s > 0)
        {
            buffer[s] = '\0';
            cout << "客户端->服务器# " << buffer << endl;
        }
        else if (s == 0)
        {
            cout << "客户退出了,我服务也退出" << endl;
            break;
        }
        else
        {
            //错误
            cout << strerror(errno) << endl;
            break;
        }
    }
    close(pipeFd);
    cout << "服务端退出" << endl;
    return 0;
}

运行

打开两个终端,下图分别为客户端(左)和服务端(右),先运行服务端创建管道并等待输入,然后运行客户端输入信息。

mingming

可以看到这两个程序是毫无关系的,但是它们可以通过命名管道实现通信。

最后一个小知识点:

使用unlink删除文件。

NAME
       unlink - delete a name and possibly the file it refers to

SYNOPSIS
       #include <unistd.h>

       int unlink(const char *pathname);

当你在程序运行结束想删除管道文件,可以使用 unlink

    //...
	close(pipeFd);
    cout << "服务端退出" << endl;
    unlink(IPC_PATH); // 在程序运行结束就把管道文件删掉
    return 0;
}

http://lihuaxi.xjx100.cn/news/133246.html

相关文章

快速记忆杂乱无章

章节章节01 - 计算机组成原理与体系结构07 - 法律法规与标准化与多媒体基础02 - 操作系统基本原理08 - 设计模式03 - 数据库系统09 - 软件工程04 - 计算机网络10 - 面向对象05 - 数据结构与算法11 - 结构化开发与UML06 - 程序设计语言与语言处理程序基础12 - 下午题历年真题End…

【一文讲明白什么是云原生,有什么优势】

目录 什么是云原生&#xff1f; 云原生有什么优势&#xff1f; 云原生时代开发者必须掌握哪些能力&#xff1f; 微服务 网关 Kubernetes DevOps ServiceMesh 十二要素应用程序 总结 什么是云原生&#xff1f; 最近看见云原生比较火&#xff0c;越来越多的编程语言、框架开…

中英文说明书丨艾美捷MAPT单克隆抗体

艾美捷MAPT单克隆抗体英文说明书&#xff1a; Specification&#xff1a; Product Description:Mouse monoclonal antibody raised against a partial recombinant MAPT. Immunogen:MAPT (NP_058519.2, 167 a.a. ~ 266 a.a) partial recombinant protein with GST tag. MW o…

Spring 事务编程实践

Spring 事务实战 文章目录Spring 事务实战什么是 Spring 事务&#xff1f;声明式事务使用举例编程式事务使用举例声明式事务常见问题事务不生效Case1&#xff1a;类内部访问Case2&#xff1a;非 public 可重载方法Case3&#xff1a;异常不匹配(重点)Case4&#xff1a;多线程Cas…

LeetCode-754. 到达终点数字【数学】

LeetCode-754. 到达终点数字【数学】题目描述&#xff1a;解题思路一&#xff1a;三行代码。发现规律&#xff0c;将target取绝对值不影响结果。若未到达终点继续走即可。情况一&#xff1a;若最后s-target为偶数或0&#xff0c;我们发现可以在之前里面反走一步即可&#xff08…

MySQL系列-高级-深入理解Mysql事务隔离级别与锁机制01

MySQL系列-高级-深入理解Mysql事务隔离级别与锁机制1. 概述2.事务及其ACID属性1. ACID2. 并发事务处理带来的问题3. 事务隔离级别3. 锁1. 锁分类2. MylSAM表读/写锁案例分析1. 读锁操作2. 写锁操作3. InnoDB行锁案例分析行锁介绍行锁演示演示一演示二演示三演示2和演示3的问题本…

喜相逢再递表港交所:非控股股东均亏损,已提前“套现”数千万元

作者|汤汤 来源|贝多财经 近日&#xff0c;喜相逢控股有限公司&#xff08;下称“喜相逢”&#xff09;在港交所又一次递交招股书&#xff0c;准备在港交所主板上市。此前的2019年12月31日、2020年7月21日、2021年7月30日&#xff0c;喜相逢曾三度在港交所递交上市申请&#…

2022.11.3 英语背诵

contrive 发明&#xff0c;设计&#xff0c;设法&#xff0c;造成 amuse 使娱乐&#xff0c;逗笑 negligible 微不足道的 air 空气 incur 招致&#xff0c;遭受 The government had also ~red huge debts. molest 骚扰&#xff0c;干扰 men who ~ young boys monstrous 巨…