
Queues Reference

CiviCRM has a system for putting tasks in a queue to be processed. Processing can happen in the browser, via code, or via coworker if you configure it for your site.

Overview

The code that creates the queue and its tasks may be separate from the code that runs the queue, and often takes no responsibility for running it. A typical workflow has four steps:

  1. Create a queue object via the service provider
  2. Create tasks (or other items) for the queue
  3. Run the queue
  4. Interact with the queue

There's a demo queue extension that shows how to use an SQL-based queue.

Definitions

  • Queue: An object representing a queue, which may be new or existing.
  • Queue type: A storage backend for the list of queue items.
  • Queue runner: A class for processing the queued items.
  • Item: A single job on a queue.
  • Task: A particular type of item, as expected by CiviCRM's queue runner.
  • Release time: The time at which a job can be considered to have crashed if it is not completed (defaults to 1 hour). Note that it may well not have crashed and could still actually be running, especially in PHP FPM environments!
  • Claim item: Refers to fetching the first item in a queue (if one exists) unless that item's release time has not been reached (typically meaning that item is currently being processed).
  • Steal item: Refers to fetching the first item in a queue regardless and setting a new release time.
  • Release item: Refers to leaving a failed queue item on the queue (e.g. for later retry).
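
The claim, steal and release definitions above can be illustrated with a toy in-memory queue. This is only a sketch of the semantics, not CiviCRM's implementation: the ToyQueue class and its method signatures are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow.

```php
// Illustrative only: a toy queue that mimics the definitions above.
final class ToyQueue {
  /** @var array<int, array{data: mixed, release_time: ?int}> */
  private array $items = [];
  private int $nextId = 1;

  public function createItem($data): void {
    $this->items[$this->nextId++] = ['data' => $data, 'release_time' => NULL];
  }

  /** Claim: return the first item unless its release time has not been reached. */
  public function claimItem(int $now, int $leaseTime = 3600): ?array {
    $id = array_key_first($this->items);
    if ($id === NULL) {
      return NULL;
    }
    $release = $this->items[$id]['release_time'];
    if ($release !== NULL && $release > $now) {
      // Another worker is (probably) still processing the first item.
      return NULL;
    }
    return $this->lease($id, $now + $leaseTime);
  }

  /** Steal: return the first item regardless, and set a new release time. */
  public function stealItem(int $now, int $leaseTime = 3600): ?array {
    $id = array_key_first($this->items);
    return $id === NULL ? NULL : $this->lease($id, $now + $leaseTime);
  }

  /** Release: leave a failed item on the queue so it can be retried. */
  public function releaseItem(int $id): void {
    $this->items[$id]['release_time'] = NULL;
  }

  /** Delete: remove a completed item from the queue. */
  public function deleteItem(int $id): void {
    unset($this->items[$id]);
  }

  private function lease(int $id, int $releaseTime): array {
    $this->items[$id]['release_time'] = $releaseTime;
    return ['id' => $id, 'data' => $this->items[$id]['data']];
  }
}

$q = new ToyQueue();
$q->createItem('first');
$q->createItem('second');
$a = $q->claimItem(1000);  // Leases 'first' for one hour.
$b = $q->claimItem(1001);  // NULL, because 'first' is still leased.
$c = $q->stealItem(1002);  // 'first' again, with a fresh release time.
```

Note that stealing does not stop whatever process originally claimed the item, which is why a stolen item may end up being processed twice.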

1. Creating a queue

Before we can read and write items in our queue, we must create a $queue object using CRM_Queue_Service. Each $queue is an instance of CRM_Queue_Queue.

A convenient way to produce a $queue is to use the create-or-load pattern. This example will create a queue (if it doesn't exist) or load an existing queue (if it already exists).

// Create or load a SQL-based queue.
$queue = \Civi::queue('my-own-queue', [
  'type'  => 'Sql',
  'reset' => FALSE,
  'error' => 'abort',
]);

The create-or-load operation accepts these parameters:

  • name (required): Identifies the queue. If two processes instantiate a Sql queue with the same name, then they will be working with the same data.

Parameters (an array of the following)

  • type (required): Determines how data is written to and read from the queue.
    • CiviCRM includes these queue types:
      • Sql: Stores the queue data in CiviCRM's SQL database. This is useful for queues that will be run in order.
      • SqlParallel: Stores the queue data in CiviCRM's SQL database. Supports more than one concurrent job. Items must be suitable for running out of order.
      • Memory: Stores the queue data in PHP's memory. This is useful for short-lived queues.
    • Each type corresponds to a class named CRM_Queue_Queue_{$type}. To support an additional queue type (such as "STOMP" or "Beanstalk"), one must implement a new driver class.
  • reset: Determines whether create() should reset the content of a queue.
    • TRUE : Create a new, empty queue. If there's an existing queue, it is destroyed and re-created.
    • FALSE (default): Only create the queue if needed. If the queue already exists, then load it.
  • error: What should be done if an item fails. When set to abort (default), the item will remain in the queue and further processing will not occur.
  • runner: In most cases this should be 'task'. Optional, but causes problems if not provided.
  • is_persistent (default TRUE): If TRUE, this queue is loaded from the civicrm_queue list.
  • retry_limit (default 0): How many times items in the queue can be attempted before being considered failed. 0 is unlimited.
  • retry_interval: How many seconds to wait between attempts.

The create-or-load pattern is convenient, but it's not appropriate if you need to ensure that the operation starts with a clean slate. In such cases, set the reset parameter to TRUE.
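
As a sketch of how the optional parameters listed above fit together, the helper below opens a queue for the task runner with a bounded number of retries. The function name, queue name and the retry values are illustrative assumptions; the code only does anything useful inside a bootstrapped CiviCRM.

```php
/**
 * Hypothetical helper: open (or create) a SQL-backed queue for the task runner.
 *
 * @param bool $startClean
 *   Pass TRUE to destroy any existing queue of this name and start empty.
 */
function example_open_queue(bool $startClean = FALSE): \CRM_Queue_Queue {
  return \Civi::queue('my-own-queue', [
    'type' => 'Sql',
    'reset' => $startClean,
    'runner' => 'task',
    // Leave a failed item on the queue and stop processing.
    'error' => 'abort',
    // Illustrative values: attempt each item up to 3 times, 60 seconds apart.
    'retry_limit' => 3,
    'retry_interval' => 60,
  ]);
}
```

Calling example_open_queue(TRUE) would correspond to the clean-slate case, while the default call follows the create-or-load pattern.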

2. Create items/tasks on the queue

You can add anything that's serializable to a queue if you have your own routine to consume that queue. If you want to use CiviCRM's queue runners, however, you'll need to use CRM_Queue_Task objects. An example of generic queue use is in the code comments for CRM_Queue_Service.

A task object is created with a callback, arguments, and a title. All of these must be serializable. Example:

// Build the task first so that its properties can be set before it is queued.
$task = new CRM_Queue_Task(
  // callback
  ['myClass', 'doMyWork'],
  // arguments
  ['whatever', ['contact_id' => 123]],
  // title
  "Task $i"
);
// Set the permission context it will run in
$task->runAs = ['contactId' => CRM_Core_Session::getLoggedInContactID(), 'domainId' => CRM_Core_Config::domainID()];
// Add the task to the queue
$queue->createItem($task);

The callback will receive a CRM_Queue_TaskContext object, which has two properties: the queue object and a CRM_Core_Error_Log (under log). This means that it's possible for a task to add more tasks to the queue.

By default, items are added to the end of the queue. However, you can use the weight option (passed when creating the item) to change this: if the existing items have the default weight of zero, you can place new items ahead of them by setting a lower weight, e.g. -1.
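
A callback matching the task created above might look like the following sketch. The class name CRM_Myextension_Worker and the followUp method are hypothetical. It assumes that the runner passes the task context as the first argument followed by the queued arguments, that the queue is available as $ctx->queue, and that returning TRUE signals success; treat those details as assumptions to check against your CiviCRM version.

```php
class CRM_Myextension_Worker {

  /**
   * Hypothetical task callback.
   *
   * @param \CRM_Queue_TaskContext $ctx
   *   Supplied by the queue runner.
   * @param string $label
   *   First queued argument ('whatever' in the example above).
   * @param array $options
   *   Second queued argument (['contact_id' => 123] in the example above).
   * @return bool
   *   TRUE on success (assumed convention).
   */
  public static function doMyWork(\CRM_Queue_TaskContext $ctx, string $label, array $options): bool {
    $ctx->log->info("Processing $label for contact " . $options['contact_id']);

    // Queue a follow-up task ahead of the remaining weight-0 items.
    $ctx->queue->createItem(
      new \CRM_Queue_Task(
        [self::class, 'followUp'],
        [$options['contact_id']],
        "Follow-up for $label"
      ),
      ['weight' => -1]
    );
    return TRUE;
  }

  /**
   * Hypothetical follow-up task added by doMyWork().
   */
  public static function followUp(\CRM_Queue_TaskContext $ctx, int $contactId): bool {
    $ctx->log->info("Follow-up for contact $contactId");
    return TRUE;
  }

}
```

Because the follow-up is created with a weight of -1, it would be picked up before any remaining items that have the default weight.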

Queue tasks can also specify a release time, which allows for delayed queue jobs.

Example

/**
 * Save an action into a queue for delayed processing
 *
 * @param \DateTime $delayTo
 * @param array $parameters
 */
public static function delayAction(DateTime $delayTo, $parameters) {
  // The queue name is given once, as the first argument.
  $queue = \Civi::queue('my-queue-name', [
    'type' => 'Sql',
    // Do not flush the queue upon creation
    'reset' => FALSE,
  ]);

  $queue->createItem(
    new \CRM_Queue_Task(
      ['CRM_MyQueue_Engine', 'executeDelayedAction'],
      $parameters,
    ),
    // Options relevant to the queue type - the Sql queue supports release_time and weight
    [
      // Save the task with a delay
      'release_time' => $delayTo->format('YmdHis'),
      // Default weight is 0, lower is sooner.
      'weight' => 1,
    ]
  );
}

3. Run the queue

You can run the queue in a PHP process or via coworker. If you are running the queue within a PHP process, CiviCRM's CRM_Queue_Runner provides the following methods:

  1. runAllInteractive(): This checks the 'enableBackgroundQueue' setting and, depending on the value, calls runAllViaWeb() or runAllViaBackground().

  2. runAllViaWeb(): This sends the browser to a page with a progress bar on it. Ajax requests are used to trigger each job.

  3. runAllViaBackground(): This sets the queue to active (ready to process), and it is up to the background processor (generally coworker) to process the queued jobs.

  4. runAll(): This runs all the tasks one after another in one go.

  5. runNext(): This runs the next task in the queue.

This runner can optionally call a callback and issue a redirect to the browser on completion. By default the runner will stop the queue (and 'release' the current item) in the case of failure, but you can override that so that failed jobs are just deleted and processing continues with the next item.

For example, this is the code that runs CiviCRM imports:

  /**
   * Run the import.
   *
   * @throws \CRM_Core_Exception
   */
  protected function runTheImport(): void {
    $parser = $this->getParser();
    $parser->queue();
    $queue = Civi::queue('user_job_' . $this->getUserJobID());
    $runner = new CRM_Queue_Runner([
      'title' => ts('Run import'),
      'queue' => $queue,
      'errorMode' => CRM_Queue_Runner::ERROR_ABORT,
      'onEndUrl' => CRM_Utils_System::url('civicrm/import/contact/summary', [
        'user_job_id' => $this->getUserJobID(),
        'reset' => 1,
      ], FALSE, NULL, FALSE),
    ]);
    $runner->runAllInteractive();
  }

Example 2: Running a queue via a cron job or an API-method-style mechanism

function civicrm_api_job_runspecificqueue($params) {
  $returnValues = array();

  $queue = CRM_Demoqueue_Helper::singleton()->getQueue();
  $runner = new CRM_Queue_Runner([
    'title' => ts('Demo Queue Runner'),
    'queue' => $queue,
    'errorMode' => CRM_Queue_Runner::ERROR_CONTINUE,
  ]);

  $maxRunTime = time() + 30; //stop executing next item after 30 seconds
  $continue = TRUE;

  while(time() < $maxRunTime && $continue) {
    $result = $runner->runNext(false);
    if (!$result['is_continue']) {
      $continue = false; //all items in the queue are processed
    }
    $returnValues[] = $result;
  }
  // Spec: civicrm_api3_create_success($values = 1, $params = array(), $entity = NULL, $action = NULL)
  return civicrm_api3_create_success($returnValues, $params, 'Demoqueue', 'Run');
}

Warning

Server configuration has a big impact on how successfully queues will run. Read on!

Caution: runAllViaWeb

If the user closes their browser while a queue is being processed via the web, then the current job may (a) stop/crash or (b) continue running in the background, depending on how the server runs PHP.

There is no way to safely reconnect with the queue. Re-opening the page at (/civicrm/queue/runner?reset=1&qrid=<your-queue-name>) will immediately cause the first job in the queue to be re-run, even if it's already running, and the other tasks will follow.

This method also suffers from timeouts - again dependent on your server configuration. This can lead to jobs reporting as crashed yet actually still running, which can lead to jobs running out of order, or in parallel.

Example: Imagine a queue with three tasks. Nginx may be configured to allow 5 minutes for PHP FPM to respond to a request. Task 1 is slow and after 5 minutes nginx returns a Gateway Timeout error and PHP merrily continues. The user sees two buttons: retry or skip. Hitting retry at this point will start a parallel execution of the current task! However, if task 1 completes while the user is thinking about what to do, then the user clicks Skip, they will skip to task 3, because when task 1 completed it was removed from the queue, leaving task 2 as the current task.

If one task depends on the completion of the other, this can lead to significant data corruption.

Tip

Program checks into your tasks so that task N has some way to confirm that task N - 1 successfully ran before starting; and possibly that there is no other task N still running.
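
One possible shape for such a check is sketched below. Everything here is hypothetical: StepTracker and run_step are illustrative names, and the state is held in a PHP array purely so the example is self-contained. A real implementation would need to persist the state somewhere shared between processes, and would need the check and the "running" marker to be set atomically (for example under a lock).

```php
/**
 * Hypothetical helper that records which steps are running or done.
 */
final class StepTracker {
  /** @var array<int, string> Step number => 'running' or 'done'. */
  private array $state = [];

  /** A step may start if the previous step is done and this one has not started. */
  public function canStart(int $step): bool {
    $previousDone = $step === 1 || ($this->state[$step - 1] ?? NULL) === 'done';
    $notStarted = !isset($this->state[$step]);
    return $previousDone && $notStarted;
  }

  public function markRunning(int $step): void {
    $this->state[$step] = 'running';
  }

  public function markDone(int $step): void {
    $this->state[$step] = 'done';
  }
}

/**
 * Run $work as step $step, refusing to start if the guard fails.
 *
 * @return bool TRUE if the work ran, FALSE if it was refused.
 */
function run_step(StepTracker $tracker, int $step, callable $work): bool {
  if (!$tracker->canStart($step)) {
    return FALSE;
  }
  $tracker->markRunning($step);
  $work();
  $tracker->markDone($step);
  return TRUE;
}
```

With this guard, a retried or skipped-to task refuses to run rather than silently running out of order or in parallel.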

Caution: runAll

If your queue uses runAll() and is triggered by a Scheduled Job then you need to understand how your cron is set up. If you run cron by accessing the URL over http(s), then you're likely to hit timeout issues on big jobs which can cause problems for your queue and also impact other scheduled jobs.

Tip

The safest way to use runAll() is when the script calling it is being run by PHP CLI, e.g. by drush or cv, since this usually means the script has no maximum execution time.

You may choose to run this separately from the normal CiviCRM cron if your queue is large, so that it doesn't get in the way of other tasks that may be more time dependent.
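
A minimal sketch of a helper that could be called from a CLI context (for example, from a script executed with cv or drush) is shown below. The function name and runner title are hypothetical, and the queue parameters are assumptions that should match however your queue was originally created.

```php
/**
 * Hypothetical helper: process an entire queue in the current PHP process.
 *
 * @param string $queueName
 *   The name used when the queue was created.
 * @return mixed
 *   Whatever CRM_Queue_Runner::runAll() returns, passed back for the caller to inspect.
 */
function example_run_whole_queue(string $queueName) {
  $queue = \Civi::queue($queueName, [
    'type' => 'Sql',
    'error' => 'abort',
  ]);
  $runner = new \CRM_Queue_Runner([
    'title' => ts('Process queue from CLI'),
    'queue' => $queue,
    // Stop at the first failure rather than deleting the failed item.
    'errorMode' => \CRM_Queue_Runner::ERROR_ABORT,
  ]);
  // Runs every task in turn, so this call can take a long time.
  return $runner->runAll();
}
```

Because the whole queue is processed in one call, this is only appropriate where the PHP process is not subject to a web server or gateway timeout.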

4. Interact with the queue

If the queue is being run using coworker, you can temporarily pause it using the queue_paused setting or by implementing hook_civicrm_queueActive. You might do this in response to server load or during maintenance.

Appendix: History

  • v4.2: Introduce CRM_Queue subsystem, including the CRM_Queue_Service, the single-threaded CRM_Queue_Runner, and the Sql/Memory drivers.
  • v5.28: Add SqlParallel driver.
  • v5.47: Add Civi::queue() helper and preliminary support for persistent queues (civicrm_queue, Civi\Api4\Queue, is_persistent).