drupalorg-1.0.x-dev/src/Plugin/QueueWorker/DrupalOrgDelayedItemsQueueWorker.php
src/Plugin/QueueWorker/DrupalOrgDelayedItemsQueueWorker.php
<?php
namespace Drupal\drupalorg\Plugin\QueueWorker;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
* Defines 'drupalorg_delayed_items_queue_worker' queue worker.
*
* Run via `drush` like this:
* `drush queue:run drupalorg_delayed_items_queue_worker`.
*
* Do NOT run this via RabbitMQ; this queue must use the database handler.
*
* @QueueWorker(
* id = "drupalorg_delayed_items_queue_worker",
* title = @Translation("Delayed Items Queue Worker")
* )
*/
class DrupalOrgDelayedItemsQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  use StringTranslationTrait;

  /**
   * Minimum age, in seconds, before an item returns to its original queue.
   *
   * The age is measured from the item's "delayed_at" timestamp. The same value
   * is used as the delay requested when the item has to wait longer.
   *
   * @var int
   */
  const THRESHOLD = 30;

  /**
   * Maximum number of delayed attempts.
   *
   * @var int
   */
  const MAX_ATTEMPTS = 15;

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('logger.factory')->get('drupalorg'),
      $container->get('queue')
    );
  }

  /**
   * Constructs the delayed items queue worker.
   *
   * @param array $configuration
   *   The plugin configuration.
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin definition.
   * @param \Psr\Log\LoggerInterface $logger
   *   The logger used to report processing results.
   * @param \Drupal\Core\Queue\QueueFactory $queueFactory
   *   The factory used to load this queue and the original queue.
   */
  public function __construct(
    array $configuration,
    $plugin_id,
    $plugin_definition,
    protected LoggerInterface $logger,
    protected QueueFactory $queueFactory,
  ) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
  }

  /**
   * {@inheritdoc}
   *
   * Expects $data to carry the original queue name in "queue" and the time it
   * was delayed in "delayed_at". Once the item is older than
   * self::THRESHOLD seconds it is sent back to the original queue, otherwise
   * it is delayed again in this queue.
   *
   * Invalid items and items over self::MAX_ATTEMPTS are logged and dropped.
   *
   * @throws \Drupal\Core\Queue\DelayedRequeueException
   *   When the item has to wait longer before being sent back.
   * @throws \RuntimeException
   *   When the item could not be added back to the original queue.
   * @throws \Exception
   *   When this queue does not support delaying items.
   */
  public function processItem($data) {
    // Anything that cannot be read as an array would raise an \Error below.
    if (!is_array($data) && !($data instanceof \ArrayAccess)) {
      $this->logger->error('Could not process delayed item, the item data is not an array.');
      return;
    }

    // Basic data.
    $original_queue = $data['queue'] ?? NULL;
    if (empty($original_queue)) {
      $this->logger->error('Could not process delayed item, missing "queue" name.');
      return;
    }
    if (empty($data['delayed_at'])) {
      $this->logger->error('Could not process delayed item, missing "delayed_at" attribute.');
      return;
    }
    // Arithmetic on a non-numeric value throws a TypeError on PHP 8.
    if (!is_numeric($data['delayed_at'])) {
      $this->logger->error('Could not process delayed item, "delayed_at" attribute is not a timestamp.');
      return;
    }

    // How many times this item has been in this queue.
    $data['delayed_attempts'] = max(1, (int) ($data['delayed_attempts'] ?? 1));
    if ($data['delayed_attempts'] > self::MAX_ATTEMPTS) {
      // This might be beyond repair.
      $this->logger->error('Item from queue @queue has exceeded the maximum delayed-queue attempts of @max', [
        '@queue' => $original_queue,
        '@max' => self::MAX_ATTEMPTS,
      ]);
      return;
    }

    // See if we re-send it back to the original queue or wait a bit longer.
    $age = time() - (int) $data['delayed_at'];
    if ($age > self::THRESHOLD) {
      // Send it back to the original queue for reprocessing, without the
      // bookkeeping keys that only matter to this queue.
      unset($data['attempts'], $data['queue'], $data['delayed_at']);
      $data['delayed_attempts']++;

      // createItem() reports failure by returning FALSE. Throwing keeps the
      // item in this queue instead of silently losing it.
      if ($this->queueFactory->get($original_queue)->createItem($data) === FALSE) {
        throw new \RuntimeException(sprintf('Could not send delayed item back to the "%s" queue.', $original_queue));
      }

      $this->logger->info('Delayed item from @queue sent back to the queue after @seconds seconds.', [
        '@queue' => $original_queue,
        '@seconds' => $age,
      ]);
      return;
    }

    // Not old enough yet: this queue must be able to delay the item.
    $this_queue = $this->queueFactory->get($this->getPluginId());
    if (!($this_queue instanceof DelayableQueueInterface)) {
      throw new \Exception('Queue does not support delay');
    }

    // Delay the item.
    $message_args = [
      '@queue' => $original_queue,
      '@seconds' => self::THRESHOLD,
    ];
    $this->logger->info('Item from @queue delayed @seconds seconds.', $message_args);
    // The exception message parameter is a string, so cast the markup object.
    throw new DelayedRequeueException(
      self::THRESHOLD,
      (string) $this->t('Item from @queue delayed @seconds seconds.', $message_args)
    );
  }

}
