首页 文章

在Laravel中使用原始json队列消息

提问于
浏览
6

通常,Laravel希望它排队后来消耗的任何消息 . 它创建一个带有 job 属性的有效负载,该属性稍后指示如何处理队列消息 . 当您使用Laravel排队工作,然后使用Laravel处理它们时,效果很好!

但是,我有一些非Laravel应用程序将json消息发布到队列中 . 我需要Laravel来获取这些消息并处理它们 .

我可以编写一个命令总线作业来处理消息,但我无法弄清楚如何告诉 queue:work 将消息发送到我的特定处理程序 .

看来Laravel有一个很难的假设,就是它要求处理的任何队列消息都会按照预期的方式进行适当的格式化,序列化和结构化 .

我怎样才能让Laravel获取这些原始json有效负载,忽略结构(没有任何东西可供理解),只需将有效负载交给我的处理程序?

例如,如果我有一个类似于的队列消息:

{
    "foo" : "bar"
}

再说一次,Laravel没有什么可以在这里检查或理解的 .

但我有一个知道如何处理这个的工作处理程序:

namespace App\Jobs;

class MyQueueHandler {
    public function handle($payload) {
        Log::info($payload['foo']); // yay!
    }
}

现在如何让 queue:workqueue:listen 简单地将任何有效负载交给这个 App\Jobs\MyQueueHandler 处理程序,我可以自己完成其余的工作?

2 回答

  • 0

    你没有指定哪个版本的Laravel,所以我猜5.1(在L4.2和L5.x中如何处理它的巨大差异) .

    如果您已经设置了 App\Jobs\MyQueueHandler ,并希望使用您希望的任何数据从控制器排队作业,您可以这样做:

    use App\Jobs\MyQueueHandler;
    
    class MyController 
    {
        public function myFunction() 
        {
            $this->dispatch(new MyQueueHandler(['foo' => 'bar']));
        }
    }
    

    MyQueueHandler -class中,有效负载实际上进入了构造函数 . 处理队列时仍会触发handle-method . 但是,如果您依赖于依赖注入(read more here,就在“当事情发生错误”之上),您可以在 handle -method上使用参数 . 所以这样的事情应该这样做:

    namespace App\Jobs;
    
    class MyQueueHandler 
    {
    
        protected $payload;
    
        public function __construct($payload)
        {
            $this->payload = $payload;
        }
    
        public function handle() {
            Log::info($this->payload['foo']); // yay!
        }
    }
    

    注意:如果要从主控制器外部调度作业(继承自标准 App\Http\Controller -class,请使用 DispatchesJobs 特性;

    MyClass 
    {
        use DispatchesJobs;
    
        public function myFunction()
        {
            $this->dispatch(new MyQueueHandler(['foo' => 'bar']));
        }
    }
    

    (使用Laravel 5.1.19和beanstalkd队列适配器测试的代码) .

  • 0

    当Laravel尝试执行Gearman有效载荷时,您要求的是不可能的(参见 \Illuminate\Bus\Dispatcher ) .

    我处于同样的情况,只是在Laravel工作班周围创建了一个包装 command . 这不是最好的解决方案,因为它将重新排队json队列中的事件,但您不必触及现有的作业类 . 也许拥有更多经验的人知道如何派遣工作而不是实际再次通过电汇发送 .

    让我们假设我们有一个名为 GenerateIdentApplicationPdfJob 的常规Laravel工作类 .

    class GenerateIdentApplicationPdfJob extends Job implements SelfHandling, ShouldQueue
    {
        use InteractsWithQueue, SerializesModels;
    
        /** @var User */
        protected $user;
    
        protected $requestId;
    
        /**
         * Create a new job instance.
         *
         * QUEUE_NAME = 'ident-pdf';
         *
         * @param User $user
         * @param      $requestId
         */
        public function __construct(User $user, $requestId)
        {
            $this->user      = $user;
            $this->requestId = $requestId;
        }
    
        /**
         * Execute the job.
         *
         * @return void
         */
        public function handle(Client $client)
        {
            // ...
        }
    }
    

    为了能够处理这个类,我们需要提供我们自己的构造函数参数 . 这些是我们的json队列所需的数据 .

    下面是Laravel commandGearmanPdfWorker ,它可以完成Gearman连接的所有样板,并且 json_decode 能够处理原始作业类 .

    class GearmanPdfWorker extends Command {

    /**
         * The console command name.
         *
         * @var string
         */
        protected $name = 'pdf:worker';
    
        /**
         * The console command description.
         *
         * @var string
         */
        protected $description = 'listen to the queue for pdf generation jobs';
    
        /**
         * @var \GearmanClient
         */
        private $client;
    
        /**
         * @var \GearmanWorker
         */
        private $worker;
    
        public function __construct(\GearmanClient $client, \GearmanWorker $worker) {
            parent::__construct();
            $this->client = $client;
            $this->worker = $worker;
        }
    
        /**
         * Wrapper listener for gearman jobs with plain json payload
         *
         * @return mixed
         */
        public function handle()
        {
            $gearmanHost = env('CB_GEARMAN_HOST');
            $gearmanPort = env('CB_GEARMAN_PORT');
    
            if (!$this->worker->addServer($gearmanHost, $gearmanPort)) {
                $this->error('Error adding gearman server: ' . $gearmanHost . ':' . $gearmanPort);
                return 1;
            } else {
                $this->info("added server $gearmanHost:$gearmanPort");
            }
    
            // use a different queue name than the original laravel command, since the payload is incompatible
            $queueName = 'JSON.' . GenerateIdentApplicationPdfJob::QUEUE_NAME;
            $this->info('using queue: ' . $queueName);
    
            if (!$this->worker->addFunction($queueName,
                function(\GearmanJob $job, $args) {
                    $queueName = $args[0];
                    $decoded = json_decode($job->workload());
                    $this->info("[$queueName] payload: " . print_r($decoded, 1));
    
                    $job = new GenerateIdentApplicationPdfJob(User::whereUsrid($decoded->usrid)->first(), $decoded->rid);
                    $job->onQueue(GenerateIdentApplicationPdfJob::QUEUE_NAME);
                    $this->info("[$queueName] dispatch: " . print_r(dispatch($job)));
                },
                [$queueName])) {
                $msg = "Error registering gearman handler to: $queueName";
                $this->error($msg);
                return 1;
            }
    
            while (1) {
                $this->info("Waiting for job on `$queueName` ...");
                $ret = $this->worker->work();
                if ($this->worker->returnCode() != GEARMAN_SUCCESS) {
                    $this->error("something went wrong on `$queueName`: $ret");
                    break;
                }
                $this->info("... done `$queueName`");
            }
        }
    }
    

    GearmanPdfWorker 需要在 \Bundle\Console\Kernel 中注册,如下所示:

    class Kernel extends ConsoleKernel
    {
        protected $commands = [
            // ...
            \Bundle\Console\Commands\GearmanPdfWorker::class
        ];
    
        // ...
    

    完成所有这些后,您可以调用 php artisan pdf:worker 来运行该工作程序并通过命令行将一个作业放入Gearman: gearman -v -f JSON.ident-pdf '{"usrid":9955,"rid":"ABC4711"}'

    您可以看到成功的操作

    added server localhost:4730
    using queue: JSON.ident-pdf
    Waiting for job on `JSON.ident-pdf` ...
    [JSON.ident-pdf] payload: stdClass Object
    (
        [usrid] => 9955
        [rid] => ABC4711
    )
    
    0[JSON.ident-pdf] dispatch: 1
    ... done `JSON.ident-pdf`
    Waiting for job on `JSON.ident-pdf` ...
    

相关问题