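<?php

/**
 * Asynchronous MySQL connection pool on top of a swoole TCP server.
 * Each worker opens $poolSize mysqli connections and runs the SQL text
 * received from clients with MYSQLI_ASYNC.
 */
class asyncmysql
{
    protected $poolSize = 20;       // connections opened per worker
    protected $freePool = array();  // idle connections
    protected $busyPool = array();  // in-flight connections, keyed by socket descriptor
    protected $waitQueue = array(); // requests waiting for an idle connection
    protected $waitSize = 50;       // maximum length of the wait queue
    protected $serv;

    public function run()
    {
        $serv = new swoole_server("127.0.0.1", 9501);
        $serv->on('connect', array($this, 'onConnect'));
        $serv->on('receive', array($this, 'onReceive'));
        $serv->on('close', array($this, 'onClose'));
        $serv->on('workerstart', array($this, 'onWorkerStart'));
        $serv->start();
    }

    public function onConnect($serv, $fd)
    {
    }

    public function onClose($serv, $fd)
    {
    }

    public function onWorkerStart($serv, $work_id)
    {
        $this->serv = $serv;
        for ($i = 0; $i < $this->poolSize; $i++) {
            $db = new mysqli();
            $db->connect('localhost', 'root', '123456', 'swooletest');
            // swoole reads the socket file descriptor of the mysqli connection.
            // This needs the mysqlnd and mysqli PHP extensions, and swoole
            // must be compiled with --enable-async-mysql.
            $db_sock = swoole_get_mysqli_sock($db);
            // Register the mysqli socket with the event loop.
            swoole_event_add($db_sock, array($this, 'query'));
            $this->freePool[] = array(
                'db'      => $db,
                'db_sock' => $db_sock,
                'fd'      => 0
            );
        }
    }

    public function onReceive($serv, $fd, $from_id, $sql)
    {
        if (!empty($this->freePool)) {
            $this->preQuery($fd, $sql);
        } elseif (count($this->waitQueue) >= $this->waitSize) {
            $serv->send($fd, 'Server is busy, please try again later!');
        } else {
            $this->waitQueue[] = array(
                'fd'  => $fd,
                'sql' => $sql
            );
        }
    }

    public function preQuery($fd, $sql)
    {
        $dbArray = array_pop($this->freePool);
        $db = $dbArray['db'];
        $sent = false;
        for ($i = 0; $i < 2; $i++) {
            // An async mysqli query only sends the statement; the result is
            // reaped later in query().
            if ($db->query($sql, MYSQLI_ASYNC) !== false) {
                $sent = true;
                break;
            }
            // Reconnect and retry once if the connection timed out or was lost.
            if ($i > 0 || ($db->errno != 2013 && $db->errno != 2006)) {
                break;
            }
            swoole_event_del($dbArray['db_sock']);
            $db->close();
            $db->connect('localhost', 'root', '123456', 'swooletest');
            // Check connect_errno rather than the return value of connect(),
            // which is not `true` on every PHP version.
            if ($db->connect_errno) {
                // Reconnect failed: this connection is dropped from the pool.
                $this->serv->send($fd, 'Operation failed!');
                return;
            }
            // The new connection has a new socket, so register it again.
            $dbArray['db_sock'] = swoole_get_mysqli_sock($db);
            swoole_event_add($dbArray['db_sock'], array($this, 'query'));
        }
        if (!$sent) {
            // The statement was never sent: report it and release the connection.
            $this->serv->send($fd, 'Operation failed!');
            $this->freePool[] = $dbArray;
            return;
        }
        $dbArray['fd'] = $fd;
        $this->busyPool[$dbArray['db_sock']] = $dbArray;
    }

    public function query($db_sock)
    {
        if (!isset($this->busyPool[$db_sock])) {
            return;
        }
        $dbArray = $this->busyPool[$db_sock];
        $db = $dbArray['db'];
        $fd = $dbArray['fd'];
        // Reap the result of the async mysqli query.
        $rs = $db->reap_async_query();
        if ($rs === true) {
            $this->serv->send($fd, 'Operation succeeded!');
        } elseif ($rs === false) {
            $this->serv->send($fd, 'Operation failed!');
        } else {
            $rows = $rs->fetch_all(MYSQLI_ASSOC);
            $rs->free();
            $this->serv->send($fd, json_encode($rows, JSON_UNESCAPED_UNICODE));
        }
        unset($this->busyPool[$db_sock]);
        $this->freePool[] = $dbArray;
        // If SQL is waiting in the queue, hand it to idle connections until
        // either the queue or the free pool is empty.
        while (!empty($this->waitQueue) && !empty($this->freePool)) {
            $waitArray = array_shift($this->waitQueue);
            $this->preQuery($waitArray['fd'], $waitArray['sql']);
        }
    }
}

$test = new asyncmysql();
$test->run();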