���� JFIF �� � ( %"1"%)+...383,7(-.-
![]() Server : Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.4.20 System : Linux st2.domain.com 3.10.0-1127.10.1.el7.x86_64 #1 SMP Wed Jun 3 14:28:03 UTC 2020 x86_64 User : apache ( 48) PHP Version : 7.4.20 Disable Function : NONE Directory : /var/www/html/trader/vendor/workerman/workerman/Events/ |
<?php /** * This file is part of workerman. * * Licensed under The MIT License * For full copyright and license information, please see the MIT-LICENSE.txt * Redistributions of files must retain the above copyright notice. * * @author walkor<[email protected]> * @copyright walkor<[email protected]> * @link http://www.workerman.net/ * @license http://www.opensource.org/licenses/mit-license.php MIT License */ namespace Workerman\Events; use Throwable; use Workerman\Worker; /** * select eventloop */ class Select implements EventInterface { /** * All listeners for read/write event. * * @var array */ public $_allEvents = array(); /** * Event listeners of signal. * * @var array */ public $_signalEvents = array(); /** * Fds waiting for read event. * * @var array */ protected $_readFds = array(); /** * Fds waiting for write event. * * @var array */ protected $_writeFds = array(); /** * Fds waiting for except event. * * @var array */ protected $_exceptFds = array(); /** * Timer scheduler. * {['data':timer_id, 'priority':run_timestamp], ..} * * @var \SplPriorityQueue */ protected $_scheduler = null; /** * All timer event listeners. * [[func, args, flag, timer_interval], ..] * * @var array */ protected $_eventTimer = array(); /** * Timer id. * * @var int */ protected $_timerId = 1; /** * Select timeout. * * @var int */ protected $_selectTimeout = 100000000; /** * Paired socket channels * * @var array */ protected $channel = array(); /** * Construct. */ public function __construct() { // Init SplPriorityQueue. $this->_scheduler = new \SplPriorityQueue(); $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH); } /** * {@inheritdoc} */ public function add($fd, $flag, $func, $args = array()) { switch ($flag) { case self::EV_READ: case self::EV_WRITE: $count = $flag === self::EV_READ ? \count($this->_readFds) : \count($this->_writeFds); if ($count >= 1024) { echo "Warning: system call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.\n"; } else if (\DIRECTORY_SEPARATOR !== '/' && $count >= 256) { echo "Warning: system call select exceeded the maximum number of connections 256.\n"; } $fd_key = (int)$fd; $this->_allEvents[$fd_key][$flag] = array($func, $fd); if ($flag === self::EV_READ) { $this->_readFds[$fd_key] = $fd; } else { $this->_writeFds[$fd_key] = $fd; } break; case self::EV_EXCEPT: $fd_key = (int)$fd; $this->_allEvents[$fd_key][$flag] = array($func, $fd); $this->_exceptFds[$fd_key] = $fd; break; case self::EV_SIGNAL: // Windows not support signal. if(\DIRECTORY_SEPARATOR !== '/') { return false; } $fd_key = (int)$fd; $this->_signalEvents[$fd_key][$flag] = array($func, $fd); \pcntl_signal($fd, array($this, 'signalHandler')); break; case self::EV_TIMER: case self::EV_TIMER_ONCE: $timer_id = $this->_timerId++; $run_time = \microtime(true) + $fd; $this->_scheduler->insert($timer_id, -$run_time); $this->_eventTimer[$timer_id] = array($func, (array)$args, $flag, $fd); $select_timeout = ($run_time - \microtime(true)) * 1000000; $select_timeout = $select_timeout <= 0 ? 1 : $select_timeout; if( $this->_selectTimeout > $select_timeout ){ $this->_selectTimeout = (int) $select_timeout; } return $timer_id; } return true; } /** * Signal handler. * * @param int $signal */ public function signalHandler($signal) { \call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal)); } /** * {@inheritdoc} */ public function del($fd, $flag) { $fd_key = (int)$fd; switch ($flag) { case self::EV_READ: unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]); if (empty($this->_allEvents[$fd_key])) { unset($this->_allEvents[$fd_key]); } return true; case self::EV_WRITE: unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]); if (empty($this->_allEvents[$fd_key])) { unset($this->_allEvents[$fd_key]); } return true; case self::EV_EXCEPT: unset($this->_allEvents[$fd_key][$flag], $this->_exceptFds[$fd_key]); if(empty($this->_allEvents[$fd_key])) { unset($this->_allEvents[$fd_key]); } return true; case self::EV_SIGNAL: if(\DIRECTORY_SEPARATOR !== '/') { return false; } unset($this->_signalEvents[$fd_key]); \pcntl_signal($fd, SIG_IGN); break; case self::EV_TIMER: case self::EV_TIMER_ONCE; unset($this->_eventTimer[$fd_key]); return true; } return false; } /** * Tick for timer. * * @return void */ protected function tick() { $tasks_to_insert = []; while (!$this->_scheduler->isEmpty()) { $scheduler_data = $this->_scheduler->top(); $timer_id = $scheduler_data['data']; $next_run_time = -$scheduler_data['priority']; $time_now = \microtime(true); $this->_selectTimeout = (int) (($next_run_time - $time_now) * 1000000); if ($this->_selectTimeout <= 0) { $this->_scheduler->extract(); if (!isset($this->_eventTimer[$timer_id])) { continue; } // [func, args, flag, timer_interval] $task_data = $this->_eventTimer[$timer_id]; if ($task_data[2] === self::EV_TIMER) { $next_run_time = $time_now + $task_data[3]; $tasks_to_insert[] = [$timer_id, -$next_run_time]; } try { \call_user_func_array($task_data[0], $task_data[1]); } catch (Throwable $e) { Worker::stopAll(250, $e); } if (isset($this->_eventTimer[$timer_id]) && $task_data[2] === self::EV_TIMER_ONCE) { $this->del($timer_id, self::EV_TIMER_ONCE); } } else { break; } } foreach ($tasks_to_insert as $item) { $this->_scheduler->insert($item[0], $item[1]); } if (!$this->_scheduler->isEmpty()) { $scheduler_data = $this->_scheduler->top(); $next_run_time = -$scheduler_data['priority']; $time_now = \microtime(true); $this->_selectTimeout = \max((int) (($next_run_time - $time_now) * 1000000), 0); return; } $this->_selectTimeout = 100000000; } /** * {@inheritdoc} */ public function clearAllTimer() { $this->_scheduler = new \SplPriorityQueue(); $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH); $this->_eventTimer = array(); } /** * {@inheritdoc} */ public function loop() { while (1) { if(\DIRECTORY_SEPARATOR === '/') { // Calls signal handlers for pending signals \pcntl_signal_dispatch(); } $read = $this->_readFds; $write = $this->_writeFds; $except = $this->_exceptFds; $ret = false; if ($read || $write || $except) { // Waiting read/write/signal/timeout events. try { $ret = @stream_select($read, $write, $except, 0, $this->_selectTimeout); } catch (\Exception $e) {} catch (\Error $e) {} } else { $this->_selectTimeout >= 1 && usleep($this->_selectTimeout); } if (!$this->_scheduler->isEmpty()) { $this->tick(); } if (!$ret) { continue; } if ($read) { foreach ($read as $fd) { $fd_key = (int)$fd; if (isset($this->_allEvents[$fd_key][self::EV_READ])) { \call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1])); } } } if ($write) { foreach ($write as $fd) { $fd_key = (int)$fd; if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) { \call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1])); } } } if($except) { foreach($except as $fd) { $fd_key = (int) $fd; if(isset($this->_allEvents[$fd_key][self::EV_EXCEPT])) { \call_user_func_array($this->_allEvents[$fd_key][self::EV_EXCEPT][0], array($this->_allEvents[$fd_key][self::EV_EXCEPT][1])); } } } } } /** * Destroy loop. * * @return void */ public function destroy() { } /** * Get timer count. * * @return integer */ public function getTimerCount() { return \count($this->_eventTimer); } }