<?php
class asyncmysql
{
protected $poolSize = 20;
protected $freePool = array();
protected $busyPool = array();
protected $waitQueue = array();
protected $waitSize = 50;
protected $serv;
public function run()
{
$serv = new swoole_server("127.0.0.1", 9501);
$serv->on('connect', array($this, 'onConnect'));
$serv->on('receive', array($this, 'onReceive'));
$serv->on('close', array($this, 'onClose'));
$serv->on('workerstart', array($this, 'onWorkerStart'));
$serv->start();
}
public function onConnect($serv, $fd)
{
}
public function onClose($serv, $fd)
{
}
public function onWorkerStart($serv, $work_id)
{
$this->serv = $serv;
for ($i=0; $i < $this->poolSize; $i++) {
$db = new mysqli();
$db->connect('localhost', 'root', '123456', 'swooletest');
//swoole获取mysqli的socket文件描述符,需要php有mysqlnd、mysqli扩展
//swoole编译时需要加入 --enable-async-mysql
$db_sock = swoole_get_mysqli_sock($db);
//将mysqli的socket加入到事件监听
swoole_event_add($db_sock, array($this, 'query'));
$this->freePool[]=array(
'db' => $db,
'db_sock' => $db_sock,
'fd' => 0
);
}
}
public function onReceive($serv, $fd, $from_id, $sql)
{
if (!empty($this->freePool)) {
$this->preQuery($fd, $sql);
} else {
if (count($this->waitQueue) >= $this->waitSize) {
$serv->send($fd, '服务器繁忙,请稍后再试!');
} else {
$this->waitQueue[]=array(
'fd' => $fd,
'sql' => $sql
);
}
}
}
public function preQuery($fd, $sql)
{
$dbArray = array_pop($this->freePool);
$db = $dbArray['db'];
for ($i=0; $i < 2; $i++) {
//mysqli异步query,并不是真正的执行
$rs = $db->query($sql, MYSQLI_ASYNC);
if ($rs === false) {
//尝试一次超时重连
if ($db->errno == 2013 || $db->errno == 2006) {
$db->close();
$r = $db->connect();
if ($r === true) continue;
}
}
break;
}
$dbArray['fd'] = $fd;
$this->busyPool[$dbArray['db_sock']] = $dbArray;
}
public function query($db_sock)
{
$dbArray = $this->busyPool[$db_sock];
$db = $dbArray['db'];
$fd = $dbArray['fd'];
//执行mysqli异步查询
$rs = $db->reap_async_query();
if ($rs === true) {
$this->serv->send($fd, '操作成功!');
} elseif ($rs === false) {
$this->serv->send($fd, '操作失败!');
} else {
$rs = $rs->fetch_all(MYSQLI_ASSOC);
$this->serv->send($fd, json_encode($rs, JSON_UNESCAPED_UNICODE));
unset($rs);
}
$this->freePool[]=$dbArray;
unset($this->busyPool[$db_sock]);
//如果等待队列中有未执行的SQL,循环所有的空闲连接执行SQL
if (count($this->waitQueue) > 0) {
$freeCount = count($this->freePool);
for ($i=0; $i < $freePool; $i++) {
$waitArray = array_shift($this->waitQueue);
$this->preQuery($waitArray['fd'], $waitArray['sql']);
}
}
}
}
$test = new asyncmysql();
$test->run();