thinkphp融合workerman的两种简单方式,直接在workerman里面用tp的orm功能
后台-插件-广告管理-内容页头部广告(手机) |
Workerman是什么?
Workerman是一款纯PHP开发的开源高性能的PHP 应用容器。
Workerman不是重复造轮子,它不是一个MVC框架,而是一个更底层更通用的服务框架,你可以用它开发tcp代理、梯子代理、做游戏服务器、邮件服务器、ftp服务器、甚至开发一个php版本的redis、php版本的数据库、php版本的nginx、php版本的php-fpm等等。Workerman可以说是PHP领域的一次创新,让开发者彻底摆脱了PHP只能做WEB的束缚。
实际上Workerman类似一个PHP版本的nginx,核心也是多进程+Epoll+非阻塞IO。Workerman每个进程能维持上万并发连接。由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能。同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。
在thinkphp里面使用workerman直接composer 引入即可调用,问题在用如何在workerman里面调用tp的ORM功能,官方示例是用几个start.php启动workerman相关服务,那么在Events.php里面怎样使用tp的ORM呢。下面用tp5.0.7,其它版本的差异自行研究哦。
方法一(命令行模式):
①新建几个tp的命令行(linux下就一个即可,windows下面方便调试就多每个服务独立一个)。
linux里面只要运行ChatAll就可以了。
里面就是调用下面对应的方法,取ChatAll为例(都调用了一次):
<?php
namespace app\command\controller;
use app\service\ChatWorkerService;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use Workerman\Worker;
class ChatAll extends Command
{
public function configure()
{
$this->setName('ChatAll')
->setDescription('linux运行所有聊天服务')
->addArgument('action', null, null, 'start')//start|stop|restart|reload|status
->addArgument('type', null, null);//d|daemon 守护进程
}
public function execute(Input $input, Output $output)
{
$type = $input->getArgument('type');
// 标记是全局启动
define('GLOBAL_START', 1);
ChatWorkerService::Register();
ChatWorkerService::BusinessWorker();
ChatWorkerService::Gateway();
//以守护模式启动
//Worker::$daemonize = true;
if ($type == 'd' || $type == 'daemon') {
Worker::$daemonize = true;
}
// 运行所有服务
Worker::runAll();
}
}
②新建ChatWorkerService类(按自己的需求就行,我是用于聊天的),里面内容大概这样:
<?php
namespace app\service;
use app\api\controller\chat\Events;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Workerman\Worker;
class ChatWorkerService
{
public static function BusinessWorker()
{
$config = config('worker');
// bussinessWorker 进程
$worker = new BusinessWorker();
// worker名称
$worker->name = 'ChatBusinessWorker';
$worker::$logFile = RUNTIME_PATH. 'worker.log';
$worker::$pidFile = RUNTIME_PATH. 'worker.pid';
// bussinessWorker进程数量
// 服务注册地址
$worker->registerAddress = $config['register_address'] . ':' . $config['register_port'];
$worker->eventHandler = Events::class;// 事件处理类
// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
}
public static function Register()
{
$config = config('worker');
// register 服务必须是text协议
$register = new Register('text://0.0.0.0:' . $config['register_port']);
$register->name = 'ChatRegister';
// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
}
public static function Gateway()
{
$config = config('worker');
// gateway 进程
$gateway = new Gateway(config('worker.gateway_address'));
// 设置名称,方便status时查看
$gateway->name = 'ChatGateway';
// 设置进程数,gateway进程数建议与cpu核数相同
$gateway->count = $config['count'];
// 分布式部署时请设置成内网ip(非127.0.0.1)
$gateway->lanIp = $config['lan_ip'];
// 内部通讯起始端口。假如$gateway->count=4,起始端口为2300
// 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口
$gateway->startPort = $config['start_port'];
// 心跳间隔
$gateway->pingInterval = $config['ping_interval'];
$gateway->pingNotResponseLimit = $config['ping_not_response_limit'];
// 心跳数据(如果为空则前端需要设置心跳后端遍历是否存在)
$gateway->pingData = '{"type":"ping"}';
//$gateway->pingData = json_encode(['type' => 'ping','msg'=>'balala']);
// 服务注册地址
$gateway->registerAddress = $config['register_address'].':' . $config['register_port'];
// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
}
}
配置文件也提供一下吧:
<?php
return [
'register_port' => 1236,
'register_address' => '127.0.0.1', // Gateway内网地址,cluster模式下设置为0
'lan_ip' => '127.0.0.1', // 本机内网IP,用于网关和业务代码通信
'start_port' => 2300, // 起始端口
'ping_interval' => 10, // 心跳间隔
'ping_not_response_limit' => 1, // 允许的心跳失败次数,超过这个次数会断开连接
'ping_data' => '', // 心跳数据
'count' => 2, // 此业务启动的进程数
'user' => 'www', // 运行user
'group' => 'www', // 运行group
'daemonize' => false, // 是否守护进程化运行
'socket_buffer_size' => 1024 * 1024 * 2, // 单个socket的buffer大小
'gateway_address' => 'Websocket://0.0.0.0:7272',//监听地址
];
这种方式直接可以使用tp的orm,在Events里面爱怎么查询就怎么查询。
运行模式:
linux下面:php think ChatAll start d (在后台进程方式运行,不加d就是调试模式)
windows:依次执行三个命令行即可(除了ChatALL),为了方便启动我写了个bat脚本:
::@echo off
CHCP 65001
:: 检查并启动 ChatBusinessWorker
tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatBusinessWorker" | find /I "php.exe" >nul
if errorlevel 1 (
start "ChatBusinessWorker" cmd /k "php think ChatBusinessWorker"
)
:: 检查并启动 ChatGateway
tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatGateway" | find /I "php.exe" >nul
if errorlevel 1 (
start "ChatGateway" cmd /k "php think ChatGateway"
)
:: 检查并启动 ChatRegister
tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatRegister" | find /I "php.exe" >nul
if errorlevel 1 (
start "ChatRegister" cmd /k "php think ChatRegister"
)
方法二(多个独立start_*.php方式):
这种也是官方的方式,官方没有说怎么调用tp的ORM,我是演示怎么用TP框架ORM的。
①直接下载workerman(你也可以用composer方式安装到tp目录,但是当引入TP的时候其他第三类库可能冲突)放到项目某个目录(我和application同级加上chat目录)
②修改三个start_*.php,里面的配置是测试用到,自己可以定义一个config.php,引入就行了。
<?php
use \Workerman\Worker;
use \GatewayWorker\BusinessWorker;
require_once __DIR__ . '/../vendor/autoload.php';
// bussinessWorker 进程
$worker = new BusinessWorker();
// worker名称
$worker->name = 'ChatBusinessWorker';
// bussinessWorker进程数量
$worker->count = 2;
// 服务注册地址
$worker->registerAddress = '127.0.0.1:1239';
!defined('RUNTIME_PATH') && define('RUNTIME_PATH', __DIR__ . '/../../runtime/');
$worker::$logFile = RUNTIME_PATH . 'worker.log';
$worker::$pidFile = RUNTIME_PATH . 'worker.pid';
// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
<?php
use \Workerman\Worker;
use \GatewayWorker\Gateway;
require_once __DIR__ . '/../vendor/autoload.php';
// gateway 进程
$gateway = new Gateway("Websocket://0.0.0.0:7272");
// 设置名称,方便status时查看
$gateway->name = 'ChatGateway';
// 设置进程数,gateway进程数建议与cpu核数相同
$gateway->count = 2;
// 分布式部署时请设置成内网ip(非127.0.0.1)
$gateway->lanIp = '127.0.0.1';
// 内部通讯起始端口。假如$gateway->count=4,起始端口为2300
// 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口
$gateway->startPort = 2300;
// 心跳间隔
$gateway->pingInterval = 10;
$gateway->pingNotResponseLimit = 1;
// 心跳数据(如果为空则前端需要设置心跳后端遍历是否存在)
$gateway->pingData = '{"type":"ping"}';
// 服务注册地址
$gateway->registerAddress = '127.0.0.1:1236';
// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
<?php
use \Workerman\Worker;
use \GatewayWorker\Register;
require_once __DIR__ . '/../vendor/autoload.php';
// register 服务必须是text协议
$register = new Register('text://0.0.0.0:1239');
$register->name='ChatRegister';
// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
③修改Events.php,在里面加载tp框架
<?php
use think\App;
class Events
{
private static $events;
public static function onWorkerStart($worker)
{
// 初始化ThinkPHP应用环境tp5.0.7(其他tp版本的命令可能有差异哈)
define('APP_PATH', __DIR__ . '/../../application/');
define('THINK_PATH', __DIR__ . '/../../thinkphp/');
require __DIR__ . '/../../thinkphp/base.php';
// 执行应用
App::initCommon();//到此即可,千万不要Console::init()!!!;
self::$events = new \app\api\controller\chat\Events();
self::$events::onWorkerStart($worker);
}
public static function onWebSocketConnect($client_id, $data)
{
self:: $events::onWebSocketConnect($client_id, $data);
}
public static function onConnect($client_id)
{
self:: $events::onConnect($client_id);
}
//可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。
public static function onWorkerStop($businessWorker)
{
self:: $events::onWorkerStop($businessWorker);
}
/**
* 有消息时
* @param int $client_id
* @param mixed $message
* @return void|bool
* @throws Exception
*/
public static function onMessage($client_id, $message)
{
self:: $events::onMessage($client_id, $message);
}
/**
* 当客户端断开连接时
* @param integer $client_id 客户端id
*/
public static function onClose($client_id)
{
self:: $events::onClose($client_id);
}
}
这样在\app\api\controller\chat\Events类里面随意调用tp的ORM啦
启动方式就很简单了:
linux 下面:php start.php start -d
windows下面:直接运行start_for_win.bat
\app\api\controller\chat\Events类:
<?php
/**
* 用于检测业务代码死循环或者长时间阻塞等问题
* 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
* 然后观察一段时间workerman.log看是否有process_timeout异常
*/
#declare(ticks=1);
/**
* 聊天主逻辑
* 主要是处理 onMessage onClose
*/
namespace app\api\controller\chat;
use app\result\PingResult;
use Workerman\Timer;
//修改了之后要重启一下服务!!!!!
//修改了之后要重启一下服务!!!!!
//修改了之后要重启一下服务!!!!!
class Events
{
public static function onWorkerStart($worker)
{
//防止长链接数据库掉链的,30秒ping一次db保持活跃
Timer::add(30, function () use ($worker) {
PingResult::ping('chat_service');
});
}
public static function onWebSocketConnect($client_id, $data)
{
//ChatLog::add([$client_id, $data], 'onWebSocketConnect', 1);
}
public static function onConnect($client_id)
{
echo '开始连接: ' . $client_id, PHP_EOL;
// var_dump($client_id);
}
//可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。
public static function onWorkerStop($businessWorker)
{
//清除所有客户端
// echo '$businessWorker';
}
/**
* 有消息时
* @param int $client_id
* @param mixed $message
* @return void|bool
*/
public static function onMessage($client_id, $message)
{
$messageData = @json_decode($message, true);
if (!$messageData) {
return;
}
switch ($messageData['type']) {
case 'pong':
return;
case 'login':
break;
case 'chat':
break;
}
}
/**
* 当客户端断开连接时
* @param integer $client_id 客户端id
*/
public static function onClose($client_id)
{
}
}
第一种方式supervisord 不太友好(可能与tp版本有关系),第二种方式supervisord 相对友好
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。
在线投稿:投稿 站长QQ:1888636
后台-插件-广告管理-内容页尾部广告(手机) |