Run Celery tasks from PHP

If you are reading this article probably you already know what is Celery but for who think that it is something to eat I will give you a definition.
Celery is an asynchronous task queue/job queue based on distributed message passing developed in python.

If your using Celery for your back-office tasks will be very nice to run some tasks after an action performed on the front end of your web site, like an user action.
If your web front end is wrote in PHP there is an easy and useful way for do it trough Rabbit MQ, the message broker used by celery.
What we can do is simply put a message, that Celery workers can consume, in Rabbit MQ trough a pure AMQ PHP library called AMQPHP :-) .
You can download this implementation of the AMQ 0.9.1 protocol from the gitHub project’s page.

Follow the instruction for generate the code in the readme file and simply include it in your project.

When you are sure that the library works and is able to publish message on Rabbit MQ you can use the class provided at the end on this post for put message on the broker ready for be consumed from celery workers.
This class offer a method called runCeleryTask($task,$arguments) that allow you to run a specific registered task with N arguments.
If I have a celery registered task called package.exampleTask.dojob that accept a string and an Integer I can schedule it in this way

$taskRunner = new TaskRunner($ini_string);
$taskRunner->runCeleryTask("package.exampleTask.dojob","string",1);

The $ini_string variable contains the configuration for the connection to the broker, a possible string of configuration looks like this

host=localhost
port=5672
username=testing
userpass=tock
vhost=mauro
exchange=celery
exchangeType=direct
queueName=celery

Here is the class that do the “Dirty Job” is quite easy and simple, I hope that this could help someone.
You don’t need to declare exchange and queue again, this are supposed to be already declared by celery.

<?php

use amqp_091 as amqp;
use amqp_091\protocol;
use amqp_091\wire;

include "amqphp/amqp.php";

class TaskRunner {
/** Instance of amqp\Connection */
private  $conn;
/** Instance of amqp\Channel */
private  $chan;
/** Name of  exchange */
private  $exchange;
/** Type of  exchange */
private  $exchangeType;
/** Name of  queue */
private  $queueName;

function  __construct($ini_string){
$sParams = parse_ini_string($ini_string);
if (! $sParams) {
throw new Exception("Failed to find broker settings", 9854);
}
$this->exchange = $sParams['exchange'];
$this->queueName = $sParams['queueName'];
$this->exchangeType = $sParams['exchangeType'];

$this->conn = new amqp\Connection($sParams);
}

private function openConnection(){
if ($this->conn) $this->conn->connect();
}

private function closeConnection () {
if ($this->conn) $this->conn->shutdown();
}

function runCeleryTask($task,$arguments){
if (!is_array($arguments)){
throw new Exception("The second parameter of the function runTask() must be an array");
}

$this->openConnection();
$chan = $this->conn->getChannel();

//Publish the message
$basicP = $chan->basic('publish', array('content-type' => 'application/json',
'content-encoding' => 'UTF-8',
'routing-key' => 'celery',
'mandatory' => false,
'immediate' => false,
'exchange' => $this->exchange));

$basicP->setContent('{"id":"'.uniqid('php_').'",
"task": "'.$task.'",
"args": ['.'"'.implode('","',$arguments).'"'.'],
"kwargs": {}}');

$chan->invoke($basicP);

$this->closeConnection();
}

}

?>
This entry was posted in PHP, Python and tagged , , , , , , . Bookmark the permalink.