Swoole深入学习-task进程异步任务实现

技术探讨  2019-03-11 13:31   7820 swoole task

Swoole深入学习-task进程异步任务实现


在前面我们已经了解了swoole的进程模型,同时也了解了同步异步相关的概念,那么本节课我们学习下异步任务的实现。

 

1、什么是task进程?

    Task进程是独立与worker进程的一个进程.他主要处理耗时较长的业务逻辑.并且不影响worker进程处理客户端的请求,这大大提高了swoole的并发能力当有耗时较长的任务时,worker进程通过task()函数把数据投递到Task进程去处理

 

回顾一下:

微信图片_20190312091650.png

 

worker进程当中我们可以执行客户端提交过来的数据执行相应的业务逻辑,也就是在触发的事件当中,比如接收客户端提交的数据,处理同步的业务,这是在worker进程当中执行,那什么时候需要task进程呢?


2、task进程适用场景

   情景一:管理员需要给100W用户发送邮件,当点击发送,浏览器会一直转圈,直到邮件全部发送完毕。

  情景二:千万微博大V发送一条微博,其关注的粉丝相应的会接收到这个消息,是不是大V需要一直等待消息发送完成,才能执行其它操作

从我们理解的角度思考,这其实都是php程一直被阻塞,客户端才一直在等待服务端的响应,我们的代码就是同步执行的。

用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

那么task的目的就是这个下面我们来介绍下task的使用

  微信图片_20190312091610.png

 

1、worker进程到中,我们调用对应的task()方法发送数据通知到task worker进程

2、task worker进程会在onTask()回调中接收到这些数据,并进行处理。

3、处理完成之后通过调用finsh()函数或者直接return返回消息给worker进程

4、worker进程在onFinsh()进程收到这些消息并进行处理

三、代码测试

代码如下:

class taskServer
{
    
private $serv;
    
public function __construct() {
    
$this->serv new Swoole\Server('0.0.0.0'9501);
    
//初始化swoole服务
    
$this->serv->set(array(
        
'worker_num'  => 8,
        
'task_worker_num' => //异步任务进程
    
));
    
//设置监听
    
$this->serv->on('Connect'array($this'onConnect'));
    
$this->serv->on("Receive"array($this'onReceive'));
    
$this->serv->on("Close"array($this'onClose'));
    
$this->serv->on("Task"array($this'onTask'));
    
$this->serv->on("Finish"array($this'onFinish'));
    
//开启
    
$this->serv->start();
    
}
    
public function onConnect($serv$fd) {
    }
    
public function onReceive($serv$fd$from_id$data) {
        
echo "接收到来自客户端的消息{$fd}:{$data}\n";
        
$param array(
            
'fd' => $fd
        
);
        
//投递任务到task进程当中
        
$serv->task(json_encode($param));

        
echo "我能不能先执行\n";
    
}
    
public function onClose($serv$fd) {
    }
    
public function onTask($serv$task_id$from_id$data) {
    
echo "消息任务:{$task_id}来自worker:{$from_id}\n";
    
echo "数据: {$data}\n";
    
for($i $i 20 $i ++ ) {
        sleep(
1);
        
echo "task:{$task_id}-处理中{$i\n";
    
}
    
$fd = json_decode($datatrue);
    
//可以直接通知客户端,看业务逻辑需求
    
$serv->send($fd['fd'"task数据处理完成{$task_id}");
    
//返回到worker进程
    
return "task任务处理完成";
}
    
public function onFinish($serv,$task_id$data) {
     
echo "Task:{$task_id}-处理完成 \n";
  
}
}
$server new taskServer();

 

注意事项:

1、开启task功能

task功能默认是关闭的,开启task功能需要满足两个条件

1.配置task进程的数量,即配置task_worker_num这个配置项

2.注册task的回调函数onTask和onFinish

2、task怎么使用?

Task进程其实是要在worker进程内发起的,即我们把需要投递的任务,通过worker进程投递到task进程中去处理。

怎么操作呢?我们可以利用swoole_server->task函数把任务数据投递到task进程池中。

swoole_server->task函数是非阻塞函数,任务投递到task进程中后会立即返回,即不管任务需要在task进程内处理多久,worker进程也不需要任何的等待,不会影响到worker进程的其他操作。但是task进程却是阻塞的,如果当前task进程都处于繁忙状态即都在处理任务,你又投递过来更多任务,这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。

如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,们不期望的结果。

3、Task常见问题:

Task传递数据的大小
数据小于8k直接通过管道传递,数据大于8k写入临时文件传递
onTask会读取这个文件,把他读出来

运行Task,必须要在swoole服务中配置参数task_worker_num,此外,必须给swoole_server绑定两个回调函数:onTask和onFinish。 
onTask要return 数据 

 

  Task传递对象
默认task只能传递数据,可以通过序列化传递一个对象的拷贝,Task中对象的改变并不会反映到worker进程中数据库连接,网络连接对象不可传递,会引起php报错

 

  Task的onFinish回调
Task的onFinish回调会回调调用task方法的worker进程

 

四、客户端测试

 客户端请求:
  
  客户端代码利用之前的client代码就可以

         微信图片_20190312091619.png

  服务端运行结果:

  微信图片_20190312091624.png 

 

看箭头指示我们发现最终确实在屏幕上输出了,worker进程当中写的(task处理完成),并且是异步执行最开始就输出了(我能不能先执行)。

                                                                       


注:转载请注明出处为http://www.www.xtaike.com/article/94.html。

沙豆网 站长

追求卓越,奋斗不息!

167
文章
8816
点赞

更多文章