Skip to content

Webhook Queue

Payment processors can generate a lot of webhooks. They can arrive simultaneously and out of order.

This extension provides a webhook queueing and scheduled execution using the PaymentprocessorWebhook entity.

To use this functionality you must add support to your Payment Processor.

PaymentprocessorWebhook

The table civicrm_paymentprocessor_webhook records each incoming webhook along with information required to process it and a status.

Processing status

  • \<empty>: The webhook has been received but not yet processed.
  • error: The webhook has been processed but there was an error.
  • success: The webhook has been processed successfully.
  • processing: The webhook is currently being processed by the API3 Job.process_paymentprocessor_webhooks (scheduled job).

Querying the webhook table

Use the API4 PaymentprocessorWebhook entity.

Implementing in your payment processor

Currently it is only implemented for Stripe and civicrm_api3_job_process_paymentprocessor_webhooks function would need to be modified to call the appropriate API method for that processor instead of Stripe.Ipn. The intention is to support Ipn API for any supported processor.

This is the paymentprocessor function that receives the webhook:

  public function handlePaymentNotification() {
    $rawData = file_get_contents("php://input");
    $ipnClass = new CRM_Core_Payment_StripeIPN($rawData);
    if ($ipnClass->onReceiveWebhook()) {
      http_response_code(200);
    }
  }

This is the paymentprocessor function that is used to manually process a webhook and is called from API3 Stripe.Ipn:

  public static function processPaymentNotification($paymentProcessorID, $rawData, $verifyRequest = TRUE, $emailReceipt = NULL) {
    $_GET['processor_id'] = $paymentProcessorID;
    $ipnClass = new CRM_Core_Payment_StripeIPN($rawData, $verifyRequest);
    $ipnClass->setExceptionMode(FALSE);
    if (isset($emailReceipt)) {
      $ipnClass->setSendEmailReceipt($emailReceipt);
    }
    return $ipnClass->processWebhook();
  }

In your IPN code instead of using a main() method create two functions: * onReceiveWebhook(): Triggered whenever a webhook is received. Use this to record the webhook. * processWebhook(): This is the method that actually processes the webhook and may be called immediately or via the scheduled job.

  /**
   * Get a unique identifier string based on webhook data.
   *
   * @return string
   */
  private function getWebhookUniqueIdentifier() {
    return "{$this->charge_id}:{$this->invoice_id}:{$this->subscription_id}";
  }

  /**
   * When CiviCRM receives a Stripe webhook call this method (via handlePaymentNotification()).
   * This checks the webhook and either queues or triggers processing (depending on existing webhooks in queue)
   *
   * @return bool
   * @throws \CRM_Core_Exception
   * @throws \CiviCRM_API3_Exception
   * @throws \Stripe\Exception\UnknownApiErrorException
   */
  public function onReceiveWebhook() {
    if (!in_array($this->eventType, CRM_Stripe_Webhook::getDefaultEnabledEvents())) {
      // We don't handle this event, return 200 OK so Stripe does not retry.
      return TRUE;
    }

    $uniqueIdentifier = $this->getWebhookUniqueIdentifier();

    // Get all received webhooks with matching identifier which have not been processed
    // This returns all webhooks that match the uniqueIdentifier above and have not been processed.
    // For example this would match both invoice.finalized and invoice.payment_succeeded events which must be
    // processed sequentially and not simultaneously.
    $paymentProcessorWebhooks = \Civi\Api4\PaymentprocessorWebhook::get()
      ->setCheckPermissions(FALSE) // Replace with ::update(FALSE) when minversion = 5.29
      ->addWhere('payment_processor_id', '=', $this->_paymentProcessor->getID())
      ->addWhere('identifier', '=', $uniqueIdentifier)
      ->addWhere('processed_date', 'IS NULL')
      ->execute();
    $processWebhook = FALSE;
    if (empty($paymentProcessorWebhooks->rowCount)) {
      // We have not received this webhook before. Record and process it.
      $processWebhook = TRUE;
    }
    else {
      // We have one or more webhooks with matching identifier
      /** @var \CRM_Mjwshared_BAO_PaymentprocessorWebhook $paymentProcessorWebhook */
      foreach ($paymentProcessorWebhooks as $paymentProcessorWebhook) {
        // Does the eventType match our webhook?
        if ($paymentProcessorWebhook->trigger === $this->eventType) {
          // Yes, We have already recorded this webhook and it is awaiting processing.
          // Exit
          return TRUE;
        }
      }
      // We have recorded another webhook with matching identifier but different eventType.
      // There is already a recorded webhook with matching identifier that has not yet been processed.
      // So we will record this webhook but will not process now (it will be processed later by the scheduled job).
    }

    \Civi\Api4\PaymentprocessorWebhook::create()
      ->setCheckPermissions(FALSE) // Replace with ::update(FALSE) when minversion = 5.29
      ->addValue('payment_processor_id', $this->_paymentProcessor->getID())
      ->addValue('trigger', $this->eventType)
      ->addValue('identifier', $uniqueIdentifier)
      ->addValue('event_id', $this->event_id)
      ->execute();

    if (!$processWebhook) {
      return TRUE;
    }

    return $this->processWebhook();
  }

  /**
   * Process the given webhook
   *
   * @return bool
   * @throws \API_Exception
   * @throws \Civi\API\Exception\UnauthorizedException
   */
  public function processWebhook() {
    try {
      $success = $this->processEventType();
    }
    catch (Exception $e) {
      $success = FALSE;
      \Civi::log()->error('StripeIPN: processWebhook failed. ' . $e->getMessage());
    }

    $uniqueIdentifier = $this->getWebhookUniqueIdentifier();

    // Record that we have processed this webhook (success or error)
    // If for some reason we ended up with multiple webhooks with the same identifier and same eventType this would
    // update all of them as "processed". That is ok because we don't need to process the "same" webhook multiple
    // times. Even if they have different event IDs but the same identifier/eventType.
    \Civi\Api4\PaymentprocessorWebhook::update()
      ->setCheckPermissions(FALSE) // Replace with ::update(FALSE) when minversion = 5.29
      ->addWhere('identifier', '=', $uniqueIdentifier)
      ->addWhere('trigger', '=', $this->eventType)
      ->addValue('status', $success ? 'success' : 'error')
      ->addValue('processed_date', 'now')
      ->execute();

    return $success;
  }