drupalorg-1.0.x-dev/src/Plugin/QueueWorker/DrupalOrgDelayedItemsQueueWorker.php

src/Plugin/QueueWorker/DrupalOrgDelayedItemsQueueWorker.php
<?php

namespace Drupal\drupalorg\Plugin\QueueWorker;

use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Defines 'drupalorg_delayed_items_queue_worker' queue worker.
 *
 * Run via `drush` like this:
 * `drush queue:run drupalorg_delayed_items_queue_worker`.
 *
 * Do NOT use this via rabbitMQ, we want the database handler.
 *
 * @QueueWorker(
 *   id = "drupalorg_delayed_items_queue_worker",
 *   title = @Translation("Delayed Items Queue Worker")
 * )
 */
class DrupalOrgDelayedItemsQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  use StringTranslationTrait;

  /**
   * Seconds since the item was originally queued.
   *
   * @var int
   */
  const THRESHOLD = 30;

  /**
   * Maximum number of delayed attempts.
   *
   * @var int
   */
  const MAX_ATTEMPTS = 15;

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('logger.factory')->get('drupalorg'),
      $container->get('queue')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function __construct(
    array $configuration,
    $plugin_id,
    $plugin_definition,
    protected LoggerInterface $logger,
    protected QueueFactory $queueFactory,
  ) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    $this_queue = $this->queueFactory->get($this->getPluginId());
    $original_queue = $data['queue'] ?? NULL;

    // Basic data.
    if (empty($original_queue)) {
      $this->logger->error('Could not process delayed item, missing "queue" name.');
      return;
    }
    if (empty($data['delayed_at'])) {
      $this->logger->error('Could not process delayed item, missing "delayed_at" attribute.');
      return;
    }

    // How many times has this item being in this queue.
    if (empty($data['delayed_attempts'])) {
      $data['delayed_attempts'] = 1;
    }
    if ($data['delayed_attempts'] > self::MAX_ATTEMPTS) {
      // This might be beyond repair.
      $this->logger->error('Item from queue @queue has exceeded the maximum delayed-queue attempts of @max', [
        '@queue' => $original_queue,
        '@max' => self::MAX_ATTEMPTS,
      ]);
      return;
    }

    // See if we re-send it back to the original queue or wait a bit longer.
    $delayed_at = $data['delayed_at'];
    $now = strtotime('now');
    $diff = $now - $delayed_at;
    if ($diff > self::THRESHOLD) {
      // Send it back to the original queue for reprocessing.
      unset($data['attempts']);
      unset($data['queue']);
      unset($data['delayed_at']);
      $data['delayed_attempts']++;
      $this->queueFactory->get($original_queue)->createItem($data);
      $this->logger->info('Delayed item from @queue sent back to the queue after @seconds seconds.', [
        '@queue' => $original_queue,
        '@seconds' => $diff,
      ]);
    }
    elseif ($this_queue instanceof DelayableQueueInterface) {
      // Delay the item.
      $this->logger->info('Item from @queue delayed @seconds seconds.', [
        '@queue' => $original_queue,
        '@seconds' => self::THRESHOLD,
      ]);
      throw new DelayedRequeueException(self::THRESHOLD, $this->t('Item from @queue delayed @seconds seconds.', [
        '@queue' => $original_queue,
        '@seconds' => self::THRESHOLD,
      ]));
    }
    else {
      throw new \Exception('Queue does not support delay');
    }
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc