池化技术
池化技术其实是分治思想的一个体现,在生活中也有很多例子
例如,河水的水资源非常丰富,但是要多次从河水中取水使用是比较繁琐且低效的,尤其是当家离河流较远的时候(消耗较大),这时候我们就可以每次取一大缸水,需要使用的时候直接取水使用就可以了
进程池也是类似的思想,因为每一次fork创建子进程都需要进行系统调用,也需要消耗一定的资源,因此我们可以一次申请一定数量的进程PCB,分别分配对应的任务给他们就可以了
这实际上就是一种多进程并发运行的思想,我们可以用父进程来进行子进程的管理与任务分配、任务验收,然后让这些兄弟进程来运行不同的任务,这其中传递信息就需要用到我们前一篇所学的匿名管道
但是在这里我们先不具体分配,只是体会其中的思想即可
进程池
进程池原理
画成示意图就是这样的

蓝色是每一次我们fork子进程和pipe管道之后,需要关闭的接口,因为每一次的读接口都是3,所以从4、5、6以后的分别对应的就是不同的子进程的写管道
这里其实还有一个隐藏起来的bug,就是在第二个子进程以及之后的子进程fork的时候,实际上他拷贝的是父进程的文件描述符表,因此他也存在一个接口指向之前申请的写接口,只有最后一个子进程是只有一个写接口的

这会造成什么影响呢,我们说过,当一个管道的写口被关闭了之后,读口再读的返回值就是0,这样就标志管道使用结束了,但是如果我们想通过父进程从从上至下主动关闭写口并等待子进程退出,就会导致阻塞(死循环)
这是因为虽然父进程的写口关了,但是子进程2和3的写口仍然开的,从而导致管道1不关闭,子进程1就一直等待管道关闭
这里就有两个解决的思路
第一个思路是由下至上关闭退出子进程
第一个方法是,因为最后一个子进程只有一个写口,关闭之后,管道也可以正常关闭,进程也就可以正常退出了
第二个方法是,我一口气把所有的写口全部关了,等子进程自己退出,这样做是没有问题的,因为本质上还是第一个方法,因为只有关到最后一个写口的时候,子进程才开始从下到上依次退出
第二个思路是
产生这个bug的原因本质上是因为写口是直接被拷贝过来的,因此只需要在创建初始化子进程的时候,就把之前父进程拷贝来的其他子进程写口全部关掉就可以了
我们采取第二个思路,因为第一个思路只能一口气全部关闭,或者关闭一个进程及其以后创建出来的进程,没有办法做到想关哪个关哪个,简直太不优雅了
模拟实现进程池
在实现进程池之前,我们遇到的第一个问题就是,怎么管理这些进程
在之前我们学习的过程中,只有一个子进程,我们可以在父进程的视角,通过进程id是否为0分出来子进程和父进程
但是在进程池的视角来看,所有子进程的pid都是0,也就无从下手管理,了吗?
创建与初始化
我们讲过所有管理的操作思想就是,先描述后组织,想要描述这些进程,我们可以用类和结构体,用到面向对象的思想进行管理即可
请看实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| #include<iostream> #include<string> #include<vector> #include<cassert> #include<unistd.h> #include<sys/types.h> #include<sys/wait.h> #include"Task.hpp"
const int process_num = 5; static int number = 1;
class channel { public: channel(int fd, pid_t id) : CtrlFD(fd) , WorkerID(id) { name = "Channel " + std::to_string(number++); } public: int CtrlFD; pid_t WorkerID; std::string name; };
void Work() {
}
void PrintFd(const std::vector<int>& fds) { std::cout<<getpid()<<"关闭的写口有:"; for(auto fd : fds) { std::cout<<fd<<" "; } std::cout<<std::endl; }
void CreateChannels(std::vector<channel>* channels) { std::vector<int> old_channels; for(int i=0; i<process_num; i++) { int pipefd[2]={0}; int n=pipe(pipefd); assert(n==0); (void)n;
pid_t id = fork(); assert(id!=-1);
if(id==0) { if(!old_channels.empty()) { for(auto fd : old_channels) { close(fd); } PrintFd(old_channels); }
close(pipefd[1]); dup2(pipefd[0],0); Work(); exit(0); }
close(pipefd[0]); channels->push_back(channel(pipefd[1], id)); old_channels.push_back(pipefd[1]); }
}
int main() { std::vector<channel> channels; CreateChannels(&channels); return 0; }
|
- 我们把进程用写口id、进程id、名字表述组成,称之为信道,这样可以控制写入(输入任务),关闭(waitpid)
- 用create函数来控制信道的创建,这个函数由父进程执行,主要任务是创建规定数目的子进程,并且进行初始化,让子进程进Work函数准备进行工作
- 初始化主要让子进程删除从父进程拷贝过来的写口,自己管道的写口也需要关闭
- 父进程需要关闭管道的读口,将此次创建的进程口记录下来
- 这里很巧妙的运用了子进程对父进程的拷贝,因为old_channel是在父进程声明的,所以每一个新创建的子进程都会复制一份,而不会拥有之后任意一个子进程的写口
分配任务
接下来要做的工作就是让子进程运行并且获取父进程分配给他的任务
我们可以设置很多种类的任务,分别设置对应的任务号,然后子进程可以获取对应的任务号,再使用自己的资源进行运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| #pragma once
#include<iostream> #include<functional> #include<vector> #include<ctime> #include<unistd.h>
using task_t = std::function<void()>;
void Task1() { std::cout<<"这是任务1,交给pid:为"<<getpid()<<"运行"<<std::endl; } void Task2() { std::cout<<"这是任务2,交给pid:为"<<getpid()<<"运行"<<std::endl; } void Task3() { std::cout<<"这是任务3,交给pid:为"<<getpid()<<"运行"<<std::endl; } void Task4() { std::cout<<"这是任务4,交给pid:为"<<getpid()<<"运行"<<std::endl; }
class Init { public: Init() { tasks.push_back(Task1); tasks.push_back(Task2); tasks.push_back(Task3); tasks.push_back(Task4);
srand(time(nullptr)^getpid()); }
int SelectTask() { return rand() % tasks.size(); }
std::string TaskName(int code) { switch (code) { case Task1_Code: return "Task1"; break; case Task2_Code: return "Task2"; break; case Task3_Code: return "Task3"; break; case Task4_Code: return "Task4"; break; default: return "Unknow"; break; } }
bool CheckSafe(int code) { if(code>=0&&code<tasks.size()) return true; else return false; }
void RunTask(int code) { return tasks[code](); } public: const static int Task1_Code = 0; const static int Task2_Code = 1; const static int Task3_Code = 2; const static int Task4_Code = 3;
std::vector<task_t> tasks; };
Init init;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
| #include<iostream> #include<string> #include<vector> #include<cassert> #include<unistd.h> #include<sys/types.h> #include<sys/wait.h> #include"Task.hpp"
const int process_num = 5; static int number = 1;
class channel { public: channel(int fd, pid_t id) : CtrlFD(fd) , WorkerID(id) { name = "Channel " + std::to_string(number++); } public: int CtrlFD; pid_t WorkerID; std::string name; };
void Work() { while(true) { int code = 0; ssize_t n = read(0,&code,sizeof(code)); if(n==sizeof(code)) { if(!init.CheckSafe(code)) { std::cout<<"子进程未收到正确任务"<<std::endl; continue; } init.RunTask(code); } else if(n==0) { break; } } std::cout<<"子进程退出"<<std::endl;
}
void PrintFd(const std::vector<int>& fds) { std::cout<<getpid()<<"关闭的写口有:"; for(auto fd : fds) { std::cout<<fd<<" "; } std::cout<<std::endl; }
void CreateChannels(std::vector<channel>* channels) { std::vector<int> old_channels; for(int i=0; i<process_num; i++) { int pipefd[2]={0}; int n=pipe(pipefd); assert(n==0); (void)n;
pid_t id = fork(); assert(id!=-1);
if(id==0) { if(!old_channels.empty()) { for(auto fd : old_channels) { close(fd); } PrintFd(old_channels); }
close(pipefd[1]); dup2(pipefd[0],0); Work(); exit(0); }
close(pipefd[0]); channels->push_back(channel(pipefd[1], id)); old_channels.push_back(pipefd[1]); }
}
void SendTaskNum(const std::vector<channel> &c, bool flag, int num = -1) { int pos = 0; while(true) { int TaskNum = init.SelectTask();
const auto & channel = c[pos++]; pos%=c.size(); std::cout<<"指定的任务是"<<init.TaskName(TaskNum)<<"任务编号是"<<TaskNum<<"交付的子进程是"<<channel.WorkerID<<std::endl;
write(channel.CtrlFD, &TaskNum, sizeof(TaskNum));
if(!flag) { num--; if(num<=0) break; } sleep(1); } }
int main() { std::vector<channel> channels; CreateChannels(&channels);
const bool g_always_loop = true; SendTaskNum(channels, !g_always_loop, 10); return 0; }
|
这里我们从主函数看起
在创建完信道之后,我们设置了一个标志,用于表示这个任务是否循环执行
在SendTaskNum函数中,后两个参数表示是否循环执行和循环执行的次数
其次我们写了一个hpp文件,用来表示任务类,将所有任务放到包装器functions中,当作函数指针数组,方便任务类完成集中调用
父进程这里的选取任务算法是随机的,轮流分配给五个信道,让五个信道执行10轮分配的任务
子进程的work是一直循环接收父进程传递的消息,若有效接收,则判断是否为任务列表中的任务,然后执行,若父进程写口关闭,则跳出读取循环,关闭子进程
关闭子进程
1 2 3 4 5 6 7 8 9 10
| void ReleaseChannels(std::vector<channel> c) { for(const auto &channel : c) { close(channel.CtrlFD); waitpid(channel.WorkerID, nullptr, 0); } std::cout<<"子进程释放完毕"<<std::endl; }
|
最后的关闭子进程比较简单,因为我们在创建的过程中已经做好了bug排除,可以从头到尾依次释放
效果

进程池完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
| #pragma once
#include<iostream> #include<functional> #include<vector> #include<ctime> #include<unistd.h>
using task_t = std::function<void()>;
void Task1() { std::cout<<"这是任务1,交给pid:为"<<getpid()<<"运行"<<std::endl; } void Task2() { std::cout<<"这是任务2,交给pid:为"<<getpid()<<"运行"<<std::endl; } void Task3() { std::cout<<"这是任务3,交给pid:为"<<getpid()<<"运行"<<std::endl; } void Task4() { std::cout<<"这是任务4,交给pid:为"<<getpid()<<"运行"<<std::endl; }
class Init { public: Init() { tasks.push_back(Task1); tasks.push_back(Task2); tasks.push_back(Task3); tasks.push_back(Task4);
srand(time(nullptr)^getpid()); }
int SelectTask() { return rand() % tasks.size(); }
std::string TaskName(int code) { switch (code) { case Task1_Code: return "Task1"; break; case Task2_Code: return "Task2"; break; case Task3_Code: return "Task3"; break; case Task4_Code: return "Task4"; break; default: return "Unknow"; break; } }
bool CheckSafe(int code) { if(code>=0&&code<tasks.size()) return true; else return false; }
void RunTask(int code) { return tasks[code](); } public: const static int Task1_Code = 0; const static int Task2_Code = 1; const static int Task3_Code = 2; const static int Task4_Code = 3;
std::vector<task_t> tasks; };
Init init;
#include<iostream> #include<string> #include<vector> #include<cassert> #include<unistd.h> #include<sys/types.h> #include<sys/wait.h> #include"Task.hpp"
const int process_num = 5; static int number = 1;
class channel { public: channel(int fd, pid_t id) : CtrlFD(fd) , WorkerID(id) { name = "Channel " + std::to_string(number++); } public: int CtrlFD; pid_t WorkerID; std::string name; };
void Work() { while(true) { int code = 0; ssize_t n = read(0,&code,sizeof(code)); if(n==sizeof(code)) { if(!init.CheckSafe(code)) { std::cout<<"子进程未收到正确任务"<<std::endl; continue; } init.RunTask(code); } else if(n==0) { break; } } std::cout<<"子进程退出"<<std::endl;
}
void PrintFd(const std::vector<int>& fds) { std::cout<<getpid()<<"关闭的写口有:"; for(auto fd : fds) { std::cout<<fd<<" "; } std::cout<<std::endl; }
void CreateChannels(std::vector<channel>* channels) { std::vector<int> old_channels; for(int i=0; i<process_num; i++) { int pipefd[2]={0}; int n=pipe(pipefd); assert(n==0); (void)n;
pid_t id = fork(); assert(id!=-1);
if(id==0) { if(!old_channels.empty()) { for(auto fd : old_channels) { close(fd); } PrintFd(old_channels); }
close(pipefd[1]); dup2(pipefd[0],0); Work(); exit(0); }
close(pipefd[0]); channels->push_back(channel(pipefd[1], id)); old_channels.push_back(pipefd[1]); }
}
void SendTaskNum(const std::vector<channel> &c, bool flag, int num = -1) { int pos = 0; while(true) { int TaskNum = init.SelectTask();
const auto & channel = c[pos++]; pos%=c.size(); std::cout<<"指定的任务是"<<init.TaskName(TaskNum)<<"任务编号是"<<TaskNum<<"交付的子进程是"<<channel.WorkerID<<std::endl;
write(channel.CtrlFD, &TaskNum, sizeof(TaskNum));
if(!flag) { num--; if(num<=0) break; } sleep(1); } }
void ReleaseChannels(std::vector<channel> c) { for(const auto &channel : c) { close(channel.CtrlFD); waitpid(channel.WorkerID, nullptr, 0); } std::cout<<"子进程释放完毕"<<std::endl; }
int main() { std::vector<channel> channels; CreateChannels(&channels);
const bool g_always_loop = true; SendTaskNum(channels, !g_always_loop, 10);
ReleaseChannels(channels); return 0; }
|