CSAPP=并发编程
?php// CSAPP 第12章 并发编程 - PHP 大白话演示 // I/O多路复用用 stream_select 真做; 线程/信号量用逻辑模拟讲清语义declare(strict_types1);functionT($n,$t){echo\n {$n}. {$t} \n;}// 并发三大流派总览echo并发编程三大流派:\n;echo 1) 多进程 : 每客户端fork一个进程. 隔离好, 但开销大, 共享数据难\n;echo 2) I/O多路复用: 单进程用select/epoll盯多个fd. 高效, 但逻辑复杂(状态机)\n;echo 3) 多线程 : 单进程多线程. 共享内存方便, 但要小心同步\n;// 287 基于进程的并发T(287,基于进程的并发);echo服务器accept后fork: 子进程处理这个客户端, 父进程继续accept下一个\n;echoC关键点:\n;echo - 父进程必须close已连接socket, 子进程必须close监听socket (否则fd泄漏)\n;echo - 必须回收僵尸子进程: SIGCHLD handler里 waitpid(-1,s,WNOHANG)循环\n;echo优点: 进程隔离, 一个崩了不影响别的; 共享文件表自然\n;echo缺点: 进程开销大(fork~微秒级), 进程间共享状态要IPC\n;echo - 第8章的fork第11章的accept 经典并发服务器\n;// 288 I/O多路复用: 真做! 单进程同时服务多个客户端T(288,I/O多路复用 - 真起一个并发echo服务器);echo核心: select/poll/epoll 同时监视多个fd, 哪个就绪处理哪个, 单线程搞定并发\n\n;$port9100(getmypid()%800);$addr127.0.0.1:$port;$serverstream_socket_server(tcp://$addr,$errno,$errstr);stream_set_blocking($server,false);if(!$server){echo起服务器失败\n;}else{echo并发服务器 listen $addr (非阻塞)\n;// 起3个客户端子进程, 几乎同时连入$clients[];for($i1;$i3;$i){$codeusleep(.($i*20000).);$fpstream_socket_client(tcp://.$addr.,$e,$s,3);if($fp){for($k1;$k2;$k){fwrite($fp,client.$i.-msg$k\n);echotrim(fread($fp,256)).\n;usleep(10000);}fwrite($fp,BYE\n);fclose($fp);};$clients[$i]proc_open([PHP_BINARY,-r,$code],[1[pipe,w],2[pipe,w]],$pipes);$clientPipes[$i]$pipes;}// 事件循环: 这就是I/O多路复用的核心 $conns[];// 已连接客户端 fd 池$served0;$activeClients0;$totalMsgs0;$startTimetime();while(time()-$startTime5){// 构造要监视的读集合: 监听socket 所有已连接socket$readSet$conns;$readSet[]$server;$writenull;$exceptnull;// select: 阻塞直到任意fd就绪 (有超时)$readystream_select($readSet,$write,$except,1);if($readyfalse||$ready0){if(empty($conns)$served3)break;// 全部处理完continue;}foreach($readSet as $fd){if($fd$server){// 监听socket就绪 有新连接$connstream_socket_accept($server,0);if($conn){stream_set_blocking($conn,false);$conns[]$conn;$served;$activeClients;$peerstream_socket_get_name($conn,true);echo [事件] 新连接 #$served 来自 $peer (当前活跃: $activeClients)\n;}}else{// 已连接socket就绪 客户端发来数据$keyarray_search($fd,$conns,true);$linefgets($fd);if($linefalse||$line){// 客户端断开fclose($fd);unset($conns[$key]);$activeClients--;continue;}$linertrim($line);if($lineBYE){fclose($fd);unset($conns[$key]);$activeClients--;echo [事件] 一个客户端告别 (剩余活跃: $activeClients)\n;continue;}$totalMsgs;fwrite($fd,ECHO[$line]\n);// 回显}}$connsarray_values($conns);}echo事件循环结束: 单进程服务了 $served 个客户端, 共 $totalMsgs 条消息\n;foreach($clients as $i$proc){$outstream_get_contents($clientPipes[$i][1]);fclose($clientPipes[$i][1]);fclose($clientPipes[$i][2]);proc_close($proc);echo 客户端$i 收到: .str_replace(\n, | ,trim($out)).\n;}fclose($server);echo★ 关键: 始终只有1个进程1个线程, 却同时服务了3个客户端!\n;echo 这就是 nginx / redis / Node.js 的并发模型 (epoll事件循环)\n;}// 289-292 基于线程的并发T(289,基于线程的并发);echo线程 进程内的多个执行流, 共享同一地址空间(代码/数据/堆)\n;echo比进程轻: 创建快, 切换快, 共享数据直接读写(不用IPC)\n;echo比进程险: 共享数据要同步, 一个线程崩可能拖垮整个进程\n;T(290,线程执行模型);echo一个进程 主线程 若干对等线程(peer threads)\n;echo每个线程有自己的: 线程ID(tid) / 栈 / 栈指针 / PC / 通用寄存器\n;echo所有线程共享: 代码 / 全局数据 / 堆 / 打开的文件\n;echo对等关系: 任何线程都能kill/join其它线程, 没有严格父子树(不像进程)\n;T(291,Posix线程 Pthreads);echo标准C线程API:\n;echo pthread_create(tid, NULL, routine, arg) 创建\n;echo pthread_join(tid, retval) 等待并回收\n;echo pthread_self() 获取自己tid\n;echo pthread_detach(tid) 分离(自动回收,不用join)\n;echo pthread_exit(retval) 线程退出\n;echoPHP没内置线程, 用 parallel 扩展 或 多进程替代\n;T(292,创建与回收线程 - 用并行进程模拟);// PHP无线程, 用并行子进程演示多个执行流同时跑$workers[];$t0microtime(true);for($i1;$i4;$i){$codeusleep(50000); echo worker.$i. 算出 ..(.$i.*.$i.);;$workers[$i]proc_open([PHP_BINARY,-r,$code],[1[pipe,w]],$p);$wp[$i]$p;}echo主线程创建了4个worker, 它们同时运行:\n;foreach($workers as $i$proc){// join: 等每个回收$outstream_get_contents($wp[$i][1]);fclose($wp[$i][1]);proc_close($proc);echo join worker$i - $out\n;}printf(4个worker并行总耗时 %.0fms (若串行需200ms)\n,(microtime(true)-$t0)*1000);// 293 共享变量T(293,共享变量);echo全局变量、堆上对象 - 所有线程可见可改 - 这是线程通信的方式, 也是bug之源\n;echo局部变量在各自栈上 - 通常私有 (但传地址出去就可能被共享)\n;echo判断一个变量是否共享: 看它是否被多个线程引用到\n;// 294-295 信号量与互斥锁: 逻辑模拟T(294,信号量同步);echo信号量 s 一个非负整数, 两个原子操作:\n;echo P(s): 等到 s0, 然后 s-- (申请资源)\n;echo V(s): s (释放资源), 可能唤醒等待者\n;echo原子性是关键: P/V不可被打断, 否则计数会错\n\n;// 用类模拟信号量语义(单进程内顺序演示)class Semaphore{privateint$count;private array $waitLog[];public function__construct(int$init){$this-count$init;}public functionP(string $who):bool{// 申请if($this-count0){$this-count--;echo $who P() 成功, 剩余资源$this-count\n;returntrue;}echo $who P() 阻塞 (资源0, 排队等待)\n;$this-waitLog[]$who;returnfalse;}public functionV(string $who):void{// 释放$this-count;$wake$this-waitLog? - 唤醒 .array_shift($this-waitLog):;echo $who V() 释放, 资源$this-count$wake\n;}public functionval():int{return$this-count;}}T(295,互斥锁 mutex 二元信号量(初值1));echo保护临界区: 同一时刻只有一个线程能进\n;$mutexnewSemaphore(1);// 初值1 锁echo两个线程争抢计数器 cnt (临界区):\n;$mutex-P(线程A);// A拿锁echo 线程A 进入临界区, cnt: 0 - 1\n;$mutex-P(线程B);// B想拿锁, 阻塞$mutex-V(线程A);// A放锁, 唤醒Becho 线程B 进入临界区, cnt: 1 - 2\n;$mutex-V(线程B);echo结果 cnt2 正确. 若无锁, 两个 cnt 可能交错 - 丢更新, 结果1\n;// 296 生产者-消费者: 真实演示有界缓冲T(296,生产者-消费者 (有界缓冲区));echo三个信号量协作:\n;echo mutex1 : 保护缓冲区互斥访问\n;echo slotsN : 还有几个空位 (生产者P它)\n;echo items0 : 已有几个产品 (消费者P它)\n\n;class BoundedBuffer{private array $buf[];privateint$cap;private Semaphore $mutex,$slots,$items;public function__construct(int$n){$this-cap$n;$this-mutexnewSemaphore(1);$this-slotsnewSemaphore($n);// 初始n个空位$this-itemsnewSemaphore(0);// 初始0个产品}public functionproduce($x):void{// P(slots): 等空位; P(mutex): 锁; 放; V(mutex); V(items): 产品1if($this-slots-val()0){echo 生产者: 缓冲满, 等待消费\n;return;}$this-slots-P(生产者);$this-mutex-P(生产者);$this-buf[]$x;echo 生产 $x, 缓冲区: [.implode(,,$this-buf).]\n;$this-mutex-V(生产者);$this-items-V(生产者);}public functionconsume():mixed{if($this-items-val()0){echo 消费者: 缓冲空, 等待生产\n;returnnull;}$this-items-P(消费者);$this-mutex-P(消费者);$xarray_shift($this-buf);echo 消费 $x, 缓冲区: [.implode(,,$this-buf).]\n;$this-mutex-V(消费者);$this-slots-V(消费者);return$x;}}$bbnewBoundedBuffer(3);echo容量3的缓冲区, 生产者生产5个, 消费者穿插消费:\n;$bb-produce(A);$bb-produce(B);$bb-consume();$bb-produce(C);$bb-produce(D);$bb-produce(E);$bb-produce(F);// 此时满, 触发等待$bb-consume();$bb-consume();// 297 线程池/预线程化T(297,预线程化 (线程池));echo问题: 每来一个连接create一个线程, 创建/销毁开销大\n;echo方案: 启动时预创建一批worker线程, 主线程accept后把连接放队列\n;echo worker从队列取连接处理, 处理完不退出, 回去取下一个\n;echo 主线程 --accept-- [连接队列] --取-- worker池(N个线程)\n;echo现代等价: nginx worker进程 / PHP-FPM进程池 / Java线程池 / Go的goroutine调度\n;echo队列本身用 生产者-消费者 模型保护 (主线程生产, worker消费)\n;// 298 线程安全T(298,线程安全);echo线程安全函数: 被多个线程同时调用也能正确工作\n;echo四类不安全函数:\n;echo 1) 不保护共享变量 - 加锁修复\n;echo 2) 跨调用保持状态(如rand种子) - 改成传入状态\n;echo 3) 返回指向静态变量的指针(如ctime/gethostbyname) - 用可重入版_r\n;echo 4) 调用了线程不安全的函数 - 传染, 整个链不安全\n;// 299 可重入性T(299,可重入性 reentrant);echo可重入函数 不引用任何共享数据 - 天然线程安全(且更强)\n;echo 显式可重入: 完全不碰静态/全局, 只用参数和局部变量\n;echo 例: strtok(不可重入,有静态状态) vs strtok_r(可重入,状态由调用者传入)\n;echo rand vs rand_r, gethostbyname vs getaddrinfo(可重入,所以推荐)\n;echo关系: 可重入 ⊂ 线程安全 (可重入是线程安全的真子集)\n;// 300 竞争与死锁T(300,竞争 race 与 死锁 deadlock);echo【竞争】程序正确性依赖线程到达某点的相对顺序\n;echo 经典坑: 主线程for循环里 pthread_create(tid, ..., i), 线程读*i\n;echo - 主线程的i可能已经变了 - 传值而非传地址才对\n\n;echo【死锁】一组线程互相等待对方持有的资源, 都卡死\n;// 死锁演示: 两个锁, 两个线程以相反顺序获取echo 线程1: lock(A) 然后想 lock(B)\n;echo 线程2: lock(B) 然后想 lock(A)\n;echo 时序: T1锁A, T2锁B, T1等B(被T2占), T2等A(被T1占) - 死锁!\n\n;echo死锁四条件(全满足才死锁):\n;foreach([互斥资源一次只能一个线程占,持有等待占着一个还要等另一个,不可抢占不能强夺别人的资源,循环等待形成等待环 T1-T2-T1,]as $k$v)echo - $k: $v\n;echo破解: 最常用按固定全局顺序加锁(破坏循环等待)\n;echo 两个线程都先lock(A)再lock(B) - 永不成环 - 无死锁\n;echo\n----- 全书总结 -----\n;echo并发是现代系统的核心: 多核要并行, I/O要重叠, 服务器要同时服务海量用户\n;echo三种模型各有取舍:\n;echo 进程: 隔离最好,开销最大 (Chrome每标签一进程)\n;echo I/O多路复用: 单线程高并发 (nginx/redis/Node)\n;echo 线程: 共享方便,同步最难 (传统Java/C服务器)\n;echo现代趋势: 协程/async (Go goroutine, Rust async, PHP Swoole/Fiber) -- 兼顾三者优点\n;echo\n 第12章演示完毕 / CSAPP全书完结 \n;