PHP模拟supervisor的进程管理

PHP模拟supervisor的进程管理,第1张

概述PHP模拟supervisor的进程管理

推荐:《PHP视频教程》

前言

模拟supervisor进程管理DEMO(简易实现)

没错,是造轮子!目的在于学习!

截图:

在图中自己实现了一个copy子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。

实现

1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应 *** 作,返回web页面
4、回收子进程,防止称为僵尸进程

不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

知识点

代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等 *** 作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程 *** 作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。

代码

由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。

主进程代码:Process.PHP

<?PHPrequire_once __DIR__ . '/Consumer.PHP';require_once __DIR__ . '/StreamConnection.PHP';require_once __DIR__ . '/http.PHP';class Process{    /**      * 待启动的消费者数组     */    protected $consumers = array();    protected $childPIDs = array();    const PPID_file = __DIR__ . '/process';    protected $serializerConsumer;    public function __construct()    {        $this->consumers = $this->getConsumers();    }    // 这里是个DEMO,实际可以用读取配置文件的方式。    public function getConsumers()    {        $consumer = new Consumer([            'program' => 'test',            'command' => '/usr/bin/PHP test.PHP',            'directory' => __DIR__,            'logfile' => __DIR__ . '/test.log',            'uniqID' => uniqID(),            'auto_restart' => false,        ]);        return [            $consumer->uniqID => $consumer,        ];    }    public function run()    {        if (empty($this->consumers)) {            // consumer empty            return;        }        if ($this->_notifyMaster()) {            // master alive            return;        }        $pID = pcntl_fork();        if ($pID < 0) {            exit;        } elseif ($pID > 0) {            exit;        }        if (!posix_setsID()) {            exit;        }        $stream = new StreamConnection('tcp://0.0.0.0:7865');        @cli_set_process_Title('AMQP Master Process');        // 将主进程ID写入文件        file_put_contents(self::PPID_file, getmypID());        // master进程继续        while (true) {            $this->init();            pcntl_signal_dispatch();            $this->waitpID();            // 如果子进程被全部回收,则主进程退出            // if (empty($this->childPIDs)) {            //     $stream->close($stream->getSocket());            //     break;            // }            $stream->accept(function ($uniqID, $action) {                $this->handle($uniqID, $action);                return $this->display();            });        }    }    protected function init()    {        foreach ($this->consumers as &$c) {            switch ($c->state) {                case Consumer::RUNNING:                case Consumer::Stop:                    break;                case Consumer::NOMINAL:                case Consumer::STARTING:                    $this->fork($c);                    break;                case Consumer::STOPing:                    if ($c->pID && posix_kill($c->pID, SIGTERM)) {                        $this->reset($c, Consumer::Stop);                    }                    break;                case Consumer::RESTART:                    if (empty($c->pID)) {                        $this->fork($c);                        break;                    }                    if (posix_kill($c->pID, SIGTERM)) {                        $this->reset($c, Consumer::Stop);                        $this->fork($c);                    }                    break;                default:                    break;            }        }    }    protected function reset(Consumer $c, $state)    {        $c->pID = '';        $c->uptime = '';        $c->state = $state;        $c->process = null;    }    protected function waitpID()    {        foreach ($this->childPIDs as $uniqID => $pID) {            $result = pcntl_waitpID($pID, $status, WNOHANG);            if ($result == $pID || $result == -1) {                unset($this->childPIDs[$uniqID]);                $c = &$this->consumers[$uniqID];                $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::Stop;                $this->reset($c, $state);            }        }    }    /**     * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程     */    private function _notifyMaster()    {        $ppID = file_get_contents(self::PPID_file );        $isAlive = $this->checkProcessAlive($ppID);        if (!$isAlive) return false;        return true;    }    public function checkProcessAlive($pID)    {        if (empty($pID)) return false;        $pIDinfo = `ps co pID {$pID} | xargs`;        $pIDinfo = trim($pIDinfo);        $pattern = "/.*?PID.*?(\d+).*?/";        preg_match($pattern, $pIDinfo, $matches);        return empty($matches) ? false : ($matches[1] == $pID ? true : false);    }    /**     * fork一个新的子进程     */    protected function fork(Consumer $c)    {        $descriptorspec = [2 => ['file', $c->logfile, 'a'],];        $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);        if ($process) {            $ret = proc_get_status($process);            if ($ret['running']) {                $c->state = Consumer::RUNNING;                $c->pID = $ret['pID'];                $c->process = $process;                $c->uptime = date('m-d H:i');                $this->childPIDs[$c->uniqID] = $ret['pID'];            } else {                $c->state = Consumer::EXITED;                proc_close($process);            }        } else {            $c->state = Consumer::ERROR;        }        return $c;    }    public function display()    {        $location = 'http://127.0.0.1:7865';        $basePath = http::$basePath;        $scriptname = isset($_SERVER['SCRIPT_name']) &&            !empty($_SERVER['SCRIPT_name']) &&            $_SERVER['SCRIPT_name'] != '/' ? $_SERVER['SCRIPT_name'] : '/index.PHP';        if ($scriptname == '/index.HTML') {            return http::status_301($location);        }        $sourcePath = $basePath . $scriptname;        if (!is_file($sourcePath)) {            return http::status_404();        }        ob_start();        include $sourcePath;        $response = ob_get_contents();        ob_clean();        return http::status_200($response);    }    public function handle($uniqID, $action)    {        if (!empty($uniqID) && !isset($this->consumers[$uniqID])) {            return;        }        switch ($action) {            case 'refresh':                break;            case 'restartall':                $this->killall(true);                break;            case 'stopall':                $this->killall();                break;            case 'stop':                $c = &$this->consumers[$uniqID];                if ($c->state != Consumer::RUNNING) break;                $c->state = Consumer::STOPing;                break;            case 'start':                $c = &$this->consumers[$uniqID];                if ($c->state == Consumer::RUNNING) break;                $c->state = Consumer::STARTING;                break;            case 'restart':                $c = &$this->consumers[$uniqID];                $c->state = Consumer::RESTART;                break;            case 'copy':                $c = $this->consumers[$uniqID];                $newC = clone $c;                $newC->uniqID = uniqID('C');                $newC->state = Consumer::NOMINAL;                $newC->pID = '';                $this->consumers[$newC->uniqID] = $newC;                break;            default:                break;        }    }    protected function killall($restart = false)    {        foreach ($this->consumers as &$c) {            $c->state = $restart ? Consumer::RESTART : Consumer::STOPing;        }    }}$cli = new Process();$cli->run();

Consumer消费者对象

<?PHPrequire_once __DIR__ . '/BaSEObject.PHP';class Consumer extends BaSEObject{    /** 开启多少个消费者 */    public $numprocs = 1;    /** 当前配置的唯一标志 */    public $program;    /** 执行的命令 */    public $command;    /** 当前工作的目录 */    public $directory;    /** 通过 $qos $queuename $duplicate 生成的 $queue */    public $queue;    /** 程序执行日志记录 */    public $logfile = '';    /** 消费进程的唯一ID */    public $uniqID;    /** 进程IDpID */    public $pID;    /** 进程状态 */    public $state = self::NOMINAL;    /** 自启动 */    public $auto_restart = false;    public $process;    /** 启动时间 */    public $uptime;    const RUNNING = 'running';    const Stop = 'stoped';    const NOMINAL = 'nominal';    const RESTART = 'restart';    const STOPing = 'stoPing';    const STARTING = 'stating';    const ERROR = 'error';    const BLOCKED = 'blocked';    const EXITED = 'exited';    const FATEL = 'fatel';}

stream相关代码:StreamConnection.PHP

<?PHPclass StreamConnection{    protected $socket;    protected $timeout = 2; //s    protected $clIEnt;    public function __construct($host)    {        $this->socket = $this->connect($host);    }    public function connect($host)    {        $socket = stream_socket_server($host, $errno, $errstr);        if (!$socket) {            exit('stream error');        }        stream_set_timeout($socket, $this->timeout);        stream_set_chunk_size($socket, 1024);        stream_set_blocking($socket, false);        $this->clIEnt = [$socket];        return $socket;    }    public function accept(Closure $callback)    {        $read = $this->clIEnt;        if (stream_select($read, $write, $except, 1) < 1) return;        if (in_array($this->socket, $read)) {            $cs = stream_socket_accept($this->socket);            $this->clIEnt[] = $cs;        }        foreach ($read as $s) {            if ($s == $this->socket) continue;            $header = fread($s, 1024);            if (empty($header)) {                $index = array_search($s, $this->clIEnt);                if ($index)                    unset($this->clIEnt[$index]);                $this->close($s);                continue;            }            http::parse_http($header);            $uniqID = isset($_GET['uniqID']) ? $_GET['uniqID'] : '';            $action = isset($_GET['action']) ? $_GET['action'] : '';            $response = $callback($uniqID, $action);            $this->write($s, $response);            $index = array_search($s, $this->clIEnt);            if ($index)                unset($this->clIEnt[$index]);            $this->close($s);        }    }    public function write($socket, $response)    {        $ret = fwrite($socket, $response, strlen($response));    }    public function close($socket)    {        $flag = fclose($socket);    }    public function getSocket()    {        return $this->socket;    }}

http响应代码:http.PHP

<?PHPclass http{    public static $basePath = __DIR__ . '/vIEws';    public static $max_age = 120; //秒    /*    *  函数:     parse_http    *  描述:     解析http协议    */    public static function parse_http($http)    {        // 初始化        $_POST = $_GET = $_cookie = $_REQUEST = $_SESSION = $_fileS =  array();        $GLOBALS['HTTP_RAW_POST_DATA'] = '';        // 需要设置的变量名        $_SERVER = array(            'query_STRING' => '',            'REQUEST_METHOD' => '',            'REQUEST_URI' => '',            'SERVER_PROTOCol' => '',            'SERVER_SOFTWARE' => '',            'SERVER_name' => '',            'http_HOST' => '',            'http_USER_AGENT' => '',            'http_ACCEPT' => '',            'http_ACCEPT_LANGUAGE' => '',            'http_ACCEPT_ENCoding' => '',            'http_cookie' => '',            'http_CONNECTION' => '',            'REMOTE_ADDR' => '',            'REMOTE_PORT' => '0',            'SCRIPT_name' => '',            'http_REFERER' => '',            'CONTENT_TYPE' => '',            'http_IF_NONE_MATCH' => '',        );        // 将header分割成数组        List($http_header, $http_body) = explode("\r\n\r\n", $http, 2);        $header_data = explode("\r\n", $http_header);        List($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCol']) = explode(' ', $header_data[0]);        unset($header_data[0]);        foreach ($header_data as $content) {            // \r\n\r\n            if (empty($content)) {                continue;            }            List($key, $value) = explode(':', $content, 2);            $key = strtolower($key);            $value = trim($value);            switch ($key) {                case 'host':                    $_SERVER['http_HOST'] = $value;                    $tmp = explode(':', $value);                    $_SERVER['SERVER_name'] = $tmp[0];                    if (isset($tmp[1])) {                        $_SERVER['SERVER_PORT'] = $tmp[1];                    }                    break;                case 'cookie':                    $_SERVER['http_cookie'] = $value;                    parse_str(str_replace('; ', '&', $_SERVER['http_cookie']), $_cookie);                    break;                case 'user-agent':                    $_SERVER['http_USER_AGENT'] = $value;                    break;                case 'accept':                    $_SERVER['http_ACCEPT'] = $value;                    break;                case 'accept-language':                    $_SERVER['http_ACCEPT_LANGUAGE'] = $value;                    break;                case 'accept-enCoding':                    $_SERVER['http_ACCEPT_ENCoding'] = $value;                    break;                case 'connection':                    $_SERVER['http_CONNECTION'] = $value;                    break;                case 'referer':                    $_SERVER['http_REFERER'] = $value;                    break;                case 'if-modifIEd-since':                    $_SERVER['http_IF_MODIFIED_SINCE'] = $value;                    break;                case 'if-none-match':                    $_SERVER['http_IF_NONE_MATCH'] = $value;                    break;                case 'content-type':                    if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) {                        $_SERVER['CONTENT_TYPE'] = $value;                    } else {                        $_SERVER['CONTENT_TYPE'] = 'multipart/form-data';                        $http_post_boundary = '--' . $match[1];                    }                    break;            }        }        // script_name        $_SERVER['SCRIPT_name'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);        // query_STRING        $_SERVER['query_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_query);        if ($_SERVER['query_STRING']) {            // $GET            parse_str($_SERVER['query_STRING'], $_GET);        } else {            $_SERVER['query_STRING'] = '';        }        // REQUEST        $_REQUEST = array_merge($_GET, $_POST);        return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_cookie, 'server' => $_SERVER, 'files' => $_fileS);    }    public static function status_404()    {        return <<<EOFhttp/1.1 404 OKcontent-type: text/HTMLEOF;    }    public static function status_301($location)    {        return <<<EOFhttp/1.1 301 Moved PermanentlyContent-Length: 0Content-Type: text/plainLocation: $locationCache-Control: no-cacheEOF;    }    public static function status_304()    {        return <<<EOFhttp/1.1 304 Not ModifIEdContent-Length: 0EOF;    }    public static function status_200($response)    {        $ContentType = $_SERVER['CONTENT_TYPE'];        $length = strlen($response);        $header = '';        if ($ContentType)            $header = 'Cache-Control: max-age=180';        return <<<EOFhttp/1.1 200 OKContent-Type: $ContentTypeContent-Length: $length$header$responseEOF;    }}

待执行的脚本:test.PHP

<?PHPwhile(true) {    file_put_contents(__DIR__  .  '/test.log', date('Y-m-d H:i:s'));    sleep(1);}

在当前目录下的视图页面:
|- Process.PHP
|- http.PHP
|- StreamConnection.PHP
|- Consumer.PHP
|- BaSEObject.PHP
|- vIEws/

更多编程相关知识,请访问:编程教学!! 总结

以上是内存溢出为你收集整理的PHP模拟supervisor的进程管理全部内容,希望文章能够帮你解决PHP模拟supervisor的进程管理所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/998046.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存