Queues Reference
CiviCRM has a system for putting tasks in a queue to be processed. Processing can happen in the browser, in code, or (if you configure it for your site) via coworker.
Overview
The code that creates the queue and its tasks may be separate from the code that runs the queue, and often takes no responsibility for running it. The usual steps are:
- Create a queue object via the service provider
- Create tasks (or other items) for the queue
- Run the queue
- Interact with the queue
There's a demo queue extension that shows how to use an SQL-based queue.
Definitions
- Queue: An object representing a queue, which may be new or existing.
- Queue type: A storage backend for the list of queue items.
- Queue runner: A class for processing the queued items.
- Item: A single job on a queue.
- Task: A particular type of item, as expected by CiviCRM's queue runner.
- Release time: The time at which a job can be considered to have crashed if it is not completed (defaults to 1 hour). Note that it may well not have crashed and could still actually be running, especially in PHP FPM environments!
- Claim item: Refers to fetching the first item in a queue (if one exists) unless that item's release time has not been reached (typically meaning that item is currently being processed).
- Steal item: Refers to fetching the first item in a queue regardless and setting a new release time.
- Release item: Refers to leaving a failed queue item on the queue (e.g. for later retry).
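The claim, steal and release definitions above can be illustrated with a toy in-memory model. This is only a sketch: ToyQueue and its methods are hypothetical names invented here to mirror the definitions, it is not CiviCRM's implementation, and the current time is passed in explicitly to keep the example deterministic.

```php
<?php
// Toy model of the definitions above. NOT CiviCRM code: ToyQueue is a
// hypothetical class used only to illustrate claim / steal / release.
final class ToyQueue {
  /** @var array<int, array{data: mixed, release_time: ?int}> */
  private array $items = [];
  private int $leaseSeconds;

  public function __construct(int $leaseSeconds = 3600) {
    // Mirrors the default release time of 1 hour mentioned above.
    $this->leaseSeconds = $leaseSeconds;
  }

  public function createItem($data): void {
    $this->items[] = ['data' => $data, 'release_time' => NULL];
  }

  /** Claim: fetch the first item unless its release time has not been reached. */
  public function claimItem(int $now): ?array {
    if (!$this->items) {
      return NULL;
    }
    $releaseTime = $this->items[0]['release_time'];
    if ($releaseTime !== NULL && $releaseTime > $now) {
      // Someone else is (probably) still processing the first item.
      return NULL;
    }
    return $this->stealItem($now);
  }

  /** Steal: fetch the first item regardless and set a new release time. */
  public function stealItem(int $now): ?array {
    if (!$this->items) {
      return NULL;
    }
    $this->items[0]['release_time'] = $now + $this->leaseSeconds;
    return $this->items[0];
  }

  /** Release: leave a failed item on the queue for a later retry. */
  public function releaseItem(): void {
    if ($this->items) {
      $this->items[0]['release_time'] = NULL;
    }
  }

  /** Delete: remove the first item once it has completed. */
  public function deleteItem(): void {
    array_shift($this->items);
  }
}
```

In this model a second claim made while the first item is still within its release time returns nothing, whereas a steal always returns the first item and restarts its release time.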
1. Creating a queue
Before we can read and write items in our queue, we must create a $queue object using CRM_Queue_Service. Each $queue is an instance of CRM_Queue_Queue.
A convenient way to produce a $queue is to use the create-or-load pattern. This example will create a queue (if it doesn't exist) or load an existing queue (if it already exists).
// Create or load a SQL-based queue.
$queue = \Civi::queue('my-own-queue', [
  'type' => 'Sql',
  'reset' => FALSE,
  'error' => 'abort',
]);
The create or load operation accepts these parameters:
- name (required): Identifies the queue. If two processes instantiate a Sql queue with the same name, then they will be working with the same data.
- Parameters (an array of the following):
  - type (required): Determines how data is written and read from the queue.
    - CiviCRM includes these queue types:
      - Sql: Stores the queue data in CiviCRM's SQL database. This is useful for queues that will be run in order.
      - SqlParallel: Stores the queue data in CiviCRM's SQL database. Supports more than one concurrent job. Items must be suitable for running out of order.
      - Memory: Stores the queue data in PHP's memory. This is useful for short-lived queues.
    - Each type corresponds to a class named CRM_Queue_Queue_{$type}. To support an additional queue type (such as "STOMP" or "Beanstalk"), one must implement a new driver class.
  - reset: Determines whether create() should reset the content of a queue.
    - TRUE: Create a new, empty queue. If there's an existing queue, it is destroyed and re-created.
    - FALSE (default): Only create the queue if needed. If the queue already exists, then load it.
  - error: What should be done if an item fails. When set to abort (default), the item will remain in the queue and further processing will not occur.
  - runner: In most cases this should be 'task'. Optional, but causes problems if not provided.
  - is_persistent (default is true): If true, then this queue is loaded from the civicrm_queue list.
  - retry_limit (default 0): How many times items in the queue can be attempted before being considered failed. 0 is unlimited.
  - retry_interval: How many seconds to wait between attempts.
The create-or-load pattern is convenient, but it's not appropriate if you need to ensure that the operation starts with a clean slate. In such cases, you should set the reset parameter to TRUE.
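For instance, a batch job that must never see items left over from an earlier run might open its queue as sketched below. The queue name and both helper function names are hypothetical. The sketch assumes a bootstrapped CiviCRM recent enough to provide the Civi::queue() helper, and it uses only the parameters described above.

```php
<?php
/**
 * Build the parameter array for a fresh or reusable Sql queue.
 * Hypothetical helper: the name and structure are illustrative only.
 */
function myext_queue_params(bool $startClean): array {
  return [
    'type' => 'Sql',
    // TRUE destroys and re-creates an existing queue; FALSE loads it.
    'reset' => $startClean,
    'error' => 'abort',
  ];
}

/**
 * Open the queue. Calling this requires a bootstrapped CiviCRM.
 */
function myext_open_queue(bool $startClean) {
  // 'myext-nightly-sync' is a made-up queue name.
  return \Civi::queue('myext-nightly-sync', myext_queue_params($startClean));
}
```

Keeping the parameter array in its own function makes the choice between a clean start and create-or-load explicit at the call site.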
2. Create items/tasks on the queue
You can add anything that's serialize-able to a queue if you have your own routine to consume that queue, but if you want to use CiviCRM's queue runners you'll need to use CRM_Queue_Task objects. An example of generic queue use is in the code comments for CRM_Queue_Service.
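As a rough sketch of that generic pattern, a custom consumer claims an item, processes its data, and then either deletes the item or releases it for a later retry. The function name and the $handler callback are hypothetical. The sketch assumes the queue object offers claimItem(), deleteItem() and releaseItem(), matching the definitions earlier on this page, and that the payload is available as $item->data.

```php
<?php
/**
 * Process at most one item from a queue with a custom handler.
 * Hypothetical helper, not part of CiviCRM.
 *
 * @param object $queue
 *   A queue object, e.g. one returned by \Civi::queue().
 * @param callable $handler
 *   Receives the item's data; returns TRUE on success.
 * @return string
 *   'empty', 'done' or 'released'.
 */
function myext_consume_one($queue, callable $handler): string {
  $item = $queue->claimItem();
  if (!$item) {
    // Nothing queued, or the first item is still claimed by another process.
    return 'empty';
  }
  try {
    $ok = (bool) $handler($item->data);
  }
  catch (\Throwable $e) {
    // Treat an exception the same way as an explicit failure.
    $ok = FALSE;
  }
  if ($ok) {
    $queue->deleteItem($item);
    return 'done';
  }
  // Leave the failed item on the queue so it can be retried later.
  $queue->releaseItem($item);
  return 'released';
}
```

Because the helper only relies on those three methods, it can be exercised against a stub queue in tests before being pointed at a real one.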
A task object is created with a callback, arguments, and a title. All of these must be serializable. Example:
$task = new CRM_Queue_Task(
  // callback
  ['myClass', 'doMyWork'],
  // arguments
  ['whatever', ['contact_id' => 123]],
  // title
  "Task $i"
);
// Set the permission context it will run in. Do this before adding the task
// to the queue, because the task is serialized when it is added.
$task->runAs = ['contactId' => CRM_Core_Session::getLoggedInContactID(), 'domainId' => CRM_Core_Config::domainID()];
$queue->createItem($task);
The callback will receive a CRM_Queue_TaskContext object which has 2 properties: the queue object, and a CRM_Core_Error_Log (under log). This means that it's possible for a task to add more tasks to the queue. By default, items are added to the end of the queue. However, you can use the weight property to change this: if the existing items have the default weight of zero, you can place new items ahead of them by setting a lower weight, e.g. -1.
Queue tasks can also specify a specific release time which allows for delayed queue jobs.
Example
/**
 * Save an action into a queue for delayed processing.
 *
 * @param \DateTime $delayTo
 * @param array $parameters
 */
public static function delayAction(DateTime $delayTo, $parameters) {
  $queue = \Civi::queue(self::QUEUE_NAME, [
    'type' => 'Sql',
    // Do not flush the queue upon creation.
    'reset' => FALSE,
  ]);
  $queue->createItem(
    new \CRM_Queue_Task(
      ['CRM_MyQueue_Engine', 'executeDelayedAction'],
      $parameters
    ),
    // Options relevant to the queue type: SQL-backed queues support release_time and weight.
    [
      // Save the task with a delay.
      'release_time' => $delayTo->format('YmdHis'),
      // Default weight is 0, lower is sooner.
      'weight' => 1,
    ]
  );
}
3. Run the queue
You can run the queue in a PHP process or via coworker. If you are running the queue within a PHP process, CiviCRM's CRM_Queue_Runner provides these methods:
- runAllInteractive(): Checks the 'enableBackgroundQueue' setting and, depending on its value, calls runAllViaWeb() or runAllViaBackground().
- runAllViaWeb(): Sends the browser to a page with a progress bar on it. Ajax requests are used to trigger each job.
- runAllViaBackground(): Sets the queue to active (ready to process). It is then up to the background processor (generally coworker) to process the queued jobs.
- runAll(): Runs all the tasks one after another in one go.
- runNext(): Runs the next task in the queue.
This runner can optionally call a callback and issue a redirect to the browser on completion. By default the runner will stop the queue (and 'release' the current item) in the case of failure, but you can override that so that failed jobs are just deleted and processing continues with the next item.
For example this is the code that runs CiviCRM imports:
/**
 * Run the import.
 *
 * @throws \CRM_Core_Exception
 */
protected function runTheImport(): void {
  $parser = $this->getParser();
  $parser->queue();
  $queue = Civi::queue('user_job_' . $this->getUserJobID());
  $runner = new CRM_Queue_Runner([
    'title' => ts('Run import'),
    'queue' => $queue,
    'errorMode' => CRM_Queue_Runner::ERROR_ABORT,
    'onEndUrl' => CRM_Utils_System::url('civicrm/import/contact/summary', [
      'user_job_id' => $this->getUserJobID(),
      'reset' => 1,
    ], FALSE, NULL, FALSE),
  ]);
  $runner->runAllInteractive();
}
Example 2: Running a queue via a cron job or an API-style mechanism
function civicrm_api_job_runspecificqueue($params) {
  $returnValues = array();
  $queue = CRM_Demoqueue_Helper::singleton()->getQueue();
  $runner = new CRM_Queue_Runner([
    'title' => ts('Demo Queue Runner'),
    'queue' => $queue,
    'errorMode' => CRM_Queue_Runner::ERROR_CONTINUE,
  ]);
  // Stop starting new items after 30 seconds.
  $maxRunTime = time() + 30;
  $continue = TRUE;
  while (time() < $maxRunTime && $continue) {
    $result = $runner->runNext(FALSE);
    if (!$result['is_continue']) {
      // All items in the queue are processed.
      $continue = FALSE;
    }
    $returnValues[] = $result;
  }
  // Spec: civicrm_api3_create_success($values = 1, $params = array(), $entity = NULL, $action = NULL)
  return civicrm_api3_create_success($returnValues, $params, 'Demoqueue', 'Run');
}
Warning
Server configuration has a big impact on how successfully queues will run. Read on!
Caution: runAllViaWeb
If the user closes their browser during a queue being processed via the web then the current job may (a) stop/crash or (b) continue running in the background depending on how the server runs PHP.
There is no way to safely re-connect with the queue. Re-opening the page at /civicrm/queue/runner?reset=1&qrid=<your-queue-name> will immediately cause the first job in the queue to be re-run, even if it's already running, and the other tasks will follow.
This method also suffers from timeouts - again dependent on your server configuration. This can lead to jobs reporting as crashed yet actually still running, which can lead to jobs running out of order, or in parallel.
Example: Imagine a queue with three tasks. Nginx may be configured to allow 5 minutes for PHP FPM to respond to a request. Task 1 is slow and after 5 minutes nginx returns a Gateway Timeout error and PHP merrily continues. The user sees two buttons: retry or skip. Hitting retry at this point will start a parallel execution of the current task! However, if task 1 completes while the user is thinking about what to do, then the user clicks Skip, they will skip to task 3, because when task 1 completed it was removed from the queue, leaving task 2 as the current task.
If one task depends on the completion of the other, this can lead to significant data corruption.
Tip
Program checks into your tasks so that task N has some way to confirm that task N - 1 successfully ran before starting; and possibly that there is no other task N still running.
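One way to build such a check is to have each task record a completion marker and refuse to start unless the previous marker is present. The sketch below keeps the markers in a plain array passed by reference so that it runs on its own. A real task would need durable storage shared between requests, and an atomic check-and-set, neither of which is shown here. All names are hypothetical.

```php
<?php
/**
 * Run step $n only if step $n - 1 has completed and step $n is neither
 * running nor already done. Hypothetical helper, not part of CiviCRM.
 *
 * @param array $state
 *   Maps step number => 'running' | 'done'.
 * @param int $n
 *   The 1-based step number.
 * @param callable $work
 *   The actual work for this step.
 * @return bool
 *   TRUE if the work ran and completed; FALSE if the guard refused to start.
 */
function myext_guarded_step(array &$state, int $n, callable $work): bool {
  if ($n > 1 && ($state[$n - 1] ?? NULL) !== 'done') {
    // The previous step has not finished (or never ran).
    return FALSE;
  }
  if (isset($state[$n])) {
    // This step is running elsewhere or has already completed.
    return FALSE;
  }
  $state[$n] = 'running';
  try {
    $work();
  }
  catch (\Throwable $e) {
    // Clear the marker so that the step can be retried, then re-throw.
    unset($state[$n]);
    throw $e;
  }
  $state[$n] = 'done';
  return TRUE;
}
```

A task callback built on this idea could return FALSE when the guard refuses, so that the runner treats the item as failed instead of running it out of order.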
Caution: runAll
If your queue uses runAll() and is triggered by a Scheduled Job then you need to understand how your cron is set up. If you run cron by accessing the URL over http(s), then you're likely to hit timeout issues on big jobs which can cause problems for your queue and also impact other scheduled jobs.
Tip
The safest way to use runAll() is when the script calling it is being run by PHP CLI, e.g. by drush or cv, since this usually means the script has no maximum execution time.
You may choose to run this separately from the normal CiviCRM cron if your queue is large, so that it doesn't get in the way of other tasks that may be more time dependent.
4. Interact with the queue
If the queue is being run using coworker, you can temporarily pause it using the queue_paused setting or by implementing hook_civicrm_queueActive. You might do this in response to server load or during maintenance.
Appendix: History
- v4.2: Introduce CRM_Queue subsystem, including the CRM_Queue_Service, the single-threaded CRM_Queue_Runner, and the Sql/Memory drivers.
- v5.28: Add SqlParallel driver.
- v5.47: Add Civi::queue() helper and preliminary support for persistent queues (civicrm_queue, Civi\Api4\Queue, is_persistent).