����JFIF��� ( %"1"%)+...383,7(-.- 404 Not Found
Sh3ll
OdayForums


Server : Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.4.20
System : Linux st2.domain.com 3.10.0-1127.10.1.el7.x86_64 #1 SMP Wed Jun 3 14:28:03 UTC 2020 x86_64
User : apache ( 48)
PHP Version : 7.4.20
Disable Function : NONE
Directory :  /var/www/html/trader/vendor/linwj/binance/src/Api/WebSocket/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //var/www/html/trader/vendor/linwj/binance/src/Api/WebSocket/SocketServer.php
<?php
/**
 * @author lin <[email protected]>
 * */

namespace Lin\Binance\Api\WebSocket;

use Lin\Binance\Api\WebSocket\SocketGlobal;
use Lin\Binance\Api\WebSocket\SocketFunction;
use Workerman\Lib\Timer;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;

class SocketServer
{
    use SocketGlobal;
    use SocketFunction;

    private $worker;

    private $connection=[];
    private $connectionIndex=0;
    private $config=[];
    private $local_global=['public'=>[],'private'=>[]];

    function __construct(array $config=[])
    {
        $this->config=$config;
    }

    public function start(){
        $this->worker = new Worker();
        $this->server();

        $this->worker->onWorkerStart = function() {
            $this->addConnection('public');
        };

        Worker::runAll();
    }

    private function addConnection(string $tag,array $keysecret=[]){
        $this->newConnection()($tag,$keysecret);
    }

    private function getBaseUrl($global,$keysecret){
        $baseurl=isset($this->config['baseurl']) ? $this->config['baseurl'] : 'ws://stream.binance.com:9443';

        if(empty($keysecret)) {
            $baseurl.='/stream';
        }else{
            //$baseurl='ws://stream.binance.com:9443/ws/w2DAeRP0jmuWT0yEw01gAfM9BUAwdwSW9cjU4VKyesUWtHe4Uw3Ch82gp9D6';
            $client_keysecret=$global->get('keysecret');
            $baseurl.='/ws/'.$client_keysecret[$keysecret['key']]['listen_key'];
        }

        return $baseurl;
    }

    private function newConnection(){
        return function($tag,$keysecret){
            $global=$this->client();

            $baseurl=$this->getBaseUrl($global,$keysecret);

            $this->connection[$this->connectionIndex] = new AsyncTcpConnection($baseurl);
            $this->connection[$this->connectionIndex]->transport = 'ssl';

            $this->log('Connection '.$baseurl);

            //自定义属性
            $this->connection[$this->connectionIndex]->tag=$tag;//标记公共连接还是私有连接
            $this->connection[$this->connectionIndex]->tag_reconnection_num=0;//标记当前已重连次数
            if(!empty($keysecret)) $this->connection[$this->connectionIndex]->tag_keysecret=$keysecret;//标记私有连接

            $this->connection[$this->connectionIndex]->onConnect=$this->onConnect($keysecret);
            $this->connection[$this->connectionIndex]->onMessage=$this->onMessage($global);
            $this->connection[$this->connectionIndex]->onClose=$this->onClose($global);
            $this->connection[$this->connectionIndex]->onError=$this->onError();

            $this->connect($this->connection[$this->connectionIndex]);
            $this->ping($this->connection[$this->connectionIndex]);
            $this->other($this->connection[$this->connectionIndex],$global);

            $this->connectionIndex++;
        };
    }

    private function onConnect(array $keysecret){
        return function($con) use($keysecret){
            if(empty($keysecret)) return;

            $this->keysecretInit($keysecret,[
                'connection'=>1
            ]);

            $this->log($keysecret['key'].' new connect send');
        };
    }

    private function onMessage($global){
        return function($con,$data) use($global){
            $data=json_decode($data,true);

            if(isset($data['stream'])) {
                /*$debug=$global->get('debug2');
                if($debug==1){
                    $con->tag_data_time='1618899692';
                    return;
                }*/

                $table=$data['stream'];
                //$global->save($table,$data);
                $this->local_global['public'][$table]=$data;

                //最后数据更新时间
                $con->tag_data_time=time();
                //成功接收数据重连次数回归0
                $con->tag_reconnection_num=0;
                return;
            }

            if(isset($data['e']) && $con->tag!='public') {
                $table=$this->userKey($con->tag_keysecret,$data['e']);

                $global->saveQueue($table,$data);

                $global->allSubUpdate([$con->tag_keysecret['key']=>[$table]],'add');
                return;
            }

            if(isset($data['result']) && empty($data['result'])) return;

            $this->log($data);
        };
    }

    private function onClose($global){
        return function($con) use($global){
            //这里连接失败 会轮询 connect
            if($con->tag=='public') {
                //TODO如果连接失败  应该public  private 都行重新加载
                $this->log($con->tag.' reconnection');

                //Clear public cached data
                foreach ($this->local_global['public'] as $k=>$v) unset($this->local_global['public'][$k]);

                $this->reconnection($global,'public');
            }else{
                $this->log('private connection close,ready to reconnect '.$con->tag_keysecret['key']);

                //更改为掉线状态
                $listen_key=$this->getListenKey($con->tag_keysecret);
                $this->keysecretInit($con->tag_keysecret,[
                    'connection'=>2,
                    'listen_key'=>$listen_key,
                    'listen_key_time'=>time(),
                    'connection_close'=>0,
                ]);

                //重新订阅私有频道
                $this->reconnection($global,'private',$con->tag_keysecret);
            }

            $con->reConnect(10);
        };
    }

    private function onError(){
        return function($con, $code, $msg){
            $this->log('onerror code:'.$code.' msg:'.$msg);
        };
    }

    private function connect($con){
        $con->connect();
    }

    private function ping($con){
        $time=isset($this->config['ping_time']) ? $this->config['ping_time'] : 20 ;

        $con->timer_ping=Timer::add($time, function() use ($con) {
            //$con->send('ping');

            $this->log($con->tag.' send ping');
        });
    }

    private function other($con,$global){
        $time=isset($this->config['listen_time']) ? $this->config['listen_time'] : 2 ;

        $con->timer_other=Timer::add($time, function() use($con,$global) {
            $this->subscribe($con,$global);

            $this->unsubscribe($con,$global);

            $this->account($con,$global);

            $this->coinFutureSubscribe($con,$global);

            $this->debug($con,$global);

            //公共数据如果60秒内无数据更新,则断开连接重新订阅,重试次数不超过10次
            if($con->tag=='public') {
                /*if(isset($con->tag_data_time)){
                    //debug
                    echo time() - $con->tag_data_time;
                    echo PHP_EOL;
                }*/

                //public
                if (isset($con->tag_data_time) && time() - $con->tag_data_time > 60 * ($con->tag_reconnection_num + 1) && $con->tag_reconnection_num <= 10) {
                    $con->close();

                    $con->tag_reconnection_num++;

                    $this->log('listen ' . $con->tag . ' reconnection_num:' . $con->tag_reconnection_num . ' tag_data_time:' . $con->tag_data_time);
                }
            }else{
                //private

            }

            $this->log('listen '.$con->tag);
        });

        //异步保存数据,不然会有阻塞问题。 0.2秒保存一次
        Timer::add(0.2, function() use($global) {
            $global->save('global_local',$this->local_global);
        });
    }

    /**
     * 调试用
     * @param $con
     * @param $global
     */
    private function debug($con,$global){
        $debug=$global->get('debug');

        if($con->tag=='public') {
            //public
            if(isset($debug['public']) && $debug['public'][$con->tag]=='close'){
                $this->log($con->tag.' debug '.json_encode($debug));

                $debug['public'][$con->tag]='recon';
                $global->save('debug',$debug);

                $con->close();
            }
        }else{
            //private
            if(isset($debug['private'][$con->tag_keysecret['key']]) && $debug['private'][$con->tag_keysecret['key']]==$con->tag_keysecret['key']){
                $this->log($con->tag_keysecret['key'].' debug '.json_encode($debug));

                unset($debug['private'][$con->tag_keysecret['key']]);
                $global->save('debug',$debug);

                //更改为掉线状态
                $this->keysecretInit($con->tag_keysecret,[
                    'connection'=>2,
                    'listen_key_time'=>time(),
                    'connection_close'=>0,
                ]);

                $con->close();
            }
        }
    }

    private function subscribe($con,$global){
        if(empty($global->get('add_sub'))) return;

        $sub=$global->get('add_sub');

        //公共连接 标记订阅频道是否有改变。
        if(empty($sub)) {
            $this->log('public dont change return');
            return;
        }

        $data=[
            "method"=>"SUBSCRIBE",
            'params'=>$sub,
            'id'=>$this->getId()
        ];

        $data=json_encode($data);
        $con->send($data);

        $this->log($data);
        $this->log('public subscribe send');

        //*******订阅成功后,删除add_sub  public 值
        $global->addSubUpdate();

        //*******订阅成功后 更改 all_sub  public 值
        $global->allSubUpdate($sub,'add');

        return;
    }

    private function unsubscribe($con,$global){
        if(empty($this->get('del_sub'))) return;

        $unsub=[];
        $temp=$this->get('del_sub');
        foreach ($temp as $v){
            $unsub[]=$v;
        }

        if(empty($unsub)) {
            $this->log('unsubscribe public return');
            return;
        }

        //判断当前是否已经重复订阅。可以无所谓。
        $data=[
            "method"=>"UNSUBSCRIBE",
            'params'=>$unsub,
            'id'=>$this->getId()
        ];

        $data=json_encode($data);
        $con->send($data);

        $this->log($data);
        $this->log('public unsubscribe send');

        //*******订阅成功后,删除del_sub  public 值
        $global->delSubUpdate();

        //*******订阅成功后 更改 all_sub  public 值
        $global->allSubUpdate($unsub,'del');

        return;
    }

    private function account($con,$global){
        $keysecret=$global->get('keysecret');
        if(empty($keysecret)) return;

        foreach ($keysecret as $k=>$v){
            //是否取消连接
            if($con->tag!='public' && isset($v['connection_close']) && $v['connection_close']==1){
                $con->close();

                $this->keysecretInit($v,[]);

                $this->log('private connection close '.$v['key']);
                continue;
            }


            //是否有新的连接
            if(isset($v['connection'])){
                switch ($v['connection']){
                    case 0:{
                        $listen_key=$this->getListenKey($v);

                        $this->keysecretInit($v,[
                            'connection'=>2,
                            'listen_key'=>$listen_key,
                            'listen_key_time'=>time(),
                            'connection_close'=>0,
                        ]);

                        $this->log('private account new connection '.$v['key']);

                        $this->addConnection($v['key'],$v);
                        break;
                    }
                    case 1:{
                        //listen_key是否过期  60分钟过期时间
                        if(time()-$v['listen_key_time']>=3000) {

                            $listen_key=$this->getListenKey($v);
                            $this->keysecretInit($v,[
                                'listen_key'=>$listen_key,
                                'listen_key_time'=>time(),
                            ]);

                            $this->log('private update listen_key_time '.$v['key']);
                        }
                        break;
                    }
                    case 2:{
                        $this->log('private already account return '.$v['key']);
                        break;
                    }
                }
            }

        }
    }

    /**
     * @param $con
     * @param $global
     */
    private function coinFutureSubscribe($con,$global){
        if($con->tag!='public' && $this->config['baseurl']=='ws://dstream.binance.com'){
            $data=[
                "method"=>"REQUEST",
                'params'=>[
                    '@account',
                    '@balance',
                    '@position',
                ],
                'id'=>$this->getId()
            ];
            $con->send(json_encode($data));
        }
    }
}

ZeroDay Forums Mini