1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
<?php //本文摘自鸟哥的博客 http://www.laruence.com/2015/05/28/3038.html //重新整理一遍只是为了加深记忆和理解 function xrange($start, $end, $step=1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; } } foreach (xrange(1,10,1) as $v) { echo $v,'<br>'; } foreach (range(1,10,1) as $v) { echo $v,'<br>'; } var_dump(xrange(1,10,1)); var_dump(range(1,10,1)); //虽然foreach输出是一样的,但是var_dump可以看出,xrange返回的是Generator生成器对象,而range返回的是数组 //数据量足够大的时候,xrange的优势在于可以不用一次性把数据加载到内存中 //yield是生成器的断点 //http://php.net/manual/zh/class.generator.php //Generator不能通过new操作来实例化 /* class Generator implements Iterator { public function rewind(); // 返回到迭代器的第一个元素。 public function valid(); // 返回false如果迭代器已经关闭,否则返回true public function current(); // 返回当前yield值. public function key(); // 返回当前yield键名. public function next(); // 恢复生成器的执行。 public function send($value); // 将传入的值作为yield表达式的结果并且恢复发生器的执行。 } */ //协程 // 协程的支持是在迭代生成器的基础上, 增加了可以回送数据给生成器的功能(调用者发送数据给被调用的生成器函数). 这就把生成器到调用者的单向通信转变为两者之间的双向通信. function myLog($filename) { while (true) { file_put_contents($filename, yield . "\n", FILE_APPEND); } } $log = myLog(__DIR__ . '/test.log'); $log->send('first row'); $log->send('second row'); //这里的yield并没有作为一个语句使用,可以理解为调用send()方法的参数值的占位符 function gen() { $ret = (yield 'yield1'); var_dump($ret); $ret = (yield 'yield2'); var_dump($ret); } $gen = gen(); //$gen->current()为第一个yield断点,其值为yield1,var_dump()将它输出 var_dump($gen->current()); //$gen->send('ret1')将ret1替换掉第一个yield的值,然后继续走下去,在gen()函数内部将它var_dump()了 //此时碰到第2个yield断点,中止运行,此时var_dump输出的就是第2个yield的值,yield2 var_dump($gen->send('ret1')); //$gen->current()为第二个yield断点,其值为yield2,var_dump()将它输出,增加这步更易理解 var_dump($gen->current()); //$gen->send('ret2')将ret2替换掉第二个yield的值,然后继续走下去,在gen()函数内部将它var_dump()了 //此时没有yield断点了,函数返回null var_dump($gen->send('ret2')); //php7以后(yield 'yield1')不是必填了 //生成迭代对象的时候已经隐含地执行了rewind操作 function gen1() { yield 'foo'; yield 'bar'; } $gen1 = gen1(); //下面一行代码会输出bar,因为调用send方法后,第一个yield被执行了,替换成something,然而函数体内没有操作这个值,所以继续往下走(相当于执行了next()方法),到第2个yield中断,所以var_dump()输出第2个yield的值 var_dump($gen1->send('something')); //任务类,需要一个生成器对象,可以理解为通过yield关键字返回的对象 class Task { protected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); } } //调度器类,用来调度任务 class Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } } //task1生成器对象 function task1() { for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i.<br>"; yield; } } //task2生成器对象 function task2() { for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i.<br>"; yield; } } //运行流程:newTask()会将新建的task对象推进队列 //执行run()方法,取出队列中的第一个task对象,执行task对象的run()方法,该方法会判断是否在第一次yield之前,分别调用current或send //此时会运行到task1的yield断点,在yield之前会有一段输出,然后判断这个对象是否该释放或继续调度(放回调度队列) //循环往下走,继续取出队列中的第2个task对象,这个对象属于task2(),接着同以上流程... $scheduler = new Scheduler; $scheduler->newTask(task1()); $scheduler->newTask(task2()); $scheduler->run(); //系统调用类 class SystemCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; return $callback($task, $scheduler); } } //修改调度器的run()方法 public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } //实例化一个系统调用类,将当前task的taskId,传给sendValue()调度的回调方法传给系统调用类 function getTaskId() { return new SystemCall(function(Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId()); $scheduler->schedule($task); }); } function task($max) { $tid = (yield getTaskId()); //系统调用 for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i.<br>"; yield; } } //这个案例的输出值跟上面那个一样,运行流程:基本同上面那个类似 //不同的是,当执行调度器的run()方法时,会判断返回值是否是SystemCall的实例 //而task()第一个断点处的SystemCall实例,会被Task的run()方法的current()调用到 //然后会执行回调,将当前task的taskId传给setSendValue(),最后把任务扔回调度器 $scheduler = new Scheduler; $scheduler->newTask(task(10)); $scheduler->newTask(task(5)); $scheduler->run(); //新建任务 function newTask(Generator $coroutine) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } ); } //杀掉任务 function killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { $task->setSendValue($scheduler->killTask($tid)); $scheduler->schedule($task); } ); } //在调度器中新增杀掉任务的方法 public function killTask($tid) { if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true; } function childTask() { $tid = (yield getTaskId()); while (true) { echo "Child task $tid still alive!<br>"; yield; } } function task() { $tid = (yield getTaskId()); $childTid = (yield newTask(childTask())); for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i.<br>"; yield; if ($i == 3) yield killTask($childTid); } } //第一次current()方法会碰到getTaskId()函数中的SystemCall实例,然后setSendValue($taskId),然后在第一个yield处返回刚才设置的taskId //代码继续走下去,跟上面类似,获得了childTid,然后进入交替进入for和while循环,当$i==3时,杀掉了childTask, //所以从第4个起,childTask就不在输出,注意if($i==3)放在for中的yield;下方,所以第4个起的子任务被杀了, //如果放在yield;上方,则第3个起的子任务就会被杀掉 $scheduler = new Scheduler; $scheduler->newTask(task()); $scheduler->run(); |