1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
<?php

/**
 * Minimal Redis-backed job producer.
 *
 * Each job is stored in a Redis list as a JSON object with two keys:
 * "callback" (the callable to invoke) and "params" (its arguments).
 */
class Queue
{
    /**
     * Shared Redis client. Stays null until setConnect() has connected
     * successfully, so push() can tell "not connected" apart from "connected".
     *
     * @var \Redis|null
     */
    public static $redis = null;

    public function __construct()
    {
    }

    /**
     * Open the shared Redis connection if none has been established yet.
     *
     * The client is cached only after connect() reports success; a failed
     * attempt leaves the cache empty so a later call can retry.
     *
     * @param string $conn Host passed to \Redis::connect()
     * @param int    $port Port passed to \Redis::connect()
     *
     * @return bool True when a connected client is available after the call
     */
    public function setConnect($conn, $port)
    {
        if (self::$redis !== null) {
            return true;
        }

        $client = new \Redis();
        if (!$client->connect($conn, $port)) {
            return false;
        }

        self::$redis = $client;

        return true;
    }

    /**
     * Append a job to the list stored at $key.
     *
     * @param string $key      Redis list key
     * @param mixed  $callback Callable description stored under "callback"
     * @param mixed  $params   Arguments stored under "params"
     * @param bool   $level    When true the job is pushed on the right end of
     *                         the list, otherwise on the left end. For a
     *                         consumer that pops from the right this means
     *                         "run before the jobs already waiting".
     *
     * @return bool True when the job was written; false when there is no
     *              connection, the payload cannot be JSON-encoded, or Redis
     *              rejected the push
     */
    public function push($key, $callback, $params, $level = false)
    {
        $redis = self::$redis;
        if ($redis === null) {
            return false;
        }

        $payload = json_encode(array(
            'callback' => $callback,
            'params'   => $params,
        ));
        if ($payload === false) {
            // Never enqueue an unreadable job; the consumer could not decode it.
            return false;
        }

        $length = $level
            ? $redis->rPush($key, $payload)
            : $redis->lPush($key, $payload);

        return $length !== false;
    }
}

$queue = new Queue();
$queue->setConnect('127.0.0.1', 6379);

// Test insertion: enqueue 100 sample messages with ids 200..299.
for ($i = 200; $i < 300; ++$i) {
    $queue->push(
        'redisKey',
        array('Message', 'sendMess'),
        array('id' => $i, 'content' => '消息' . $i)
    );
}

echo 'end';
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
<?php

/**
 * Worker that drains a Redis list of JSON-encoded jobs and runs each one.
 *
 * A job is a JSON object with a "callback" entry (the callable to invoke)
 * and a "params" entry (the array of arguments passed to it).
 */
class Task
{
    /**
     * Shared Redis client. Stays null until setConnect() has connected
     * successfully.
     *
     * @var \Redis|null
     */
    public static $redis = null;

    /**
     * Redis list key the jobs are popped from.
     *
     * @var string|null
     */
    private $key = null;

    public function __construct()
    {
    }

    /**
     * Select the Redis list to consume.
     *
     * @param string $key Redis list key
     */
    public function setKey($key)
    {
        $this->key = $key;
    }

    /**
     * Open the shared Redis connection if none has been established yet.
     *
     * The client is cached only after connect() reports success; a failed
     * attempt leaves the cache empty so a later call can retry.
     *
     * @param string $conn Host passed to \Redis::connect()
     * @param int    $port Port passed to \Redis::connect()
     *
     * @return bool True when a connected client is available after the call
     */
    public function setConnect($conn, $port)
    {
        if (self::$redis !== null) {
            return true;
        }

        $client = new \Redis();
        if (!$client->connect($conn, $port)) {
            return false;
        }

        self::$redis = $client;

        return true;
    }

    /**
     * Pop jobs from the right end of the list until it is empty, invoking
     * each job's callback with its parameters.
     *
     * Jobs that are not valid JSON, lack a callable "callback", or lack an
     * array "params" are skipped so one bad entry cannot stop the worker.
     *
     * @return string 'key or connect error' when no key or connection has
     *                been configured, otherwise 'end' once the list is drained
     */
    public function doTask()
    {
        if ($this->key === null || $this->key === '' || self::$redis === null) {
            return 'key or connect error';
        }

        while (true) {
            $raw = self::$redis->rPop($this->key);
            // Compare against the "empty list" result explicitly: a plain
            // truthiness test would also stop on payloads such as "0" or "".
            if ($raw === false || $raw === null) {
                break;
            }

            $job = json_decode($raw, true);
            if (
                !is_array($job)
                || !isset($job['callback'], $job['params'])
                || !is_array($job['params'])
                || !is_callable($job['callback'])
            ) {
                continue;
            }

            // NOTE(review): the callback name comes straight from queue data,
            // so anyone able to write to this list can invoke any callable.
            // Consider restricting it to an allowlist of known handlers.
            call_user_func_array($job['callback'], $job['params']);
        }

        return 'end';
    }
}

/**
 * Job handler that stores a message row in MySQL.
 */
class Message
{
    /**
     * Shared PDO connection, created on first use.
     *
     * @var \PDO|null
     */
    public static $db = null;

    /**
     * Insert one row into the `message` table.
     *
     * Values are bound as statement parameters rather than interpolated into
     * the SQL text, so quotes in $content can neither break the statement
     * nor inject SQL.
     *
     * @param int|string $id      Value for the first column
     * @param string     $content Value for the second column
     */
    public static function sendMess($id, $content)
    {
        if (self::$db === null) {
            self::$db = new \PDO(
                'mysql:host=127.0.0.1;port=3306;dbname=test;',
                'root',
                '',
                array(PDO::MYSQL_ATTR_INIT_COMMAND => "set names 'utf8'")
            );
        }

        $statement = self::$db->prepare('insert into message values (?, ?)');
        if ($statement === false) {
            // prepare() returns false instead of throwing when PDO runs in
            // silent error mode; there is nothing to execute in that case.
            return;
        }

        $statement->execute(array($id, $content));
    }
}

$task = new Task();
$task->setConnect('127.0.0.1', 6379);
$task->setKey('redisKey');
$rs = $task->doTask();
var_dump($rs);