entity_change_notifier-8.x-1.0/src/Plugin/QueueWorker/EntityPublisherWorker.php
src/Plugin/QueueWorker/EntityPublisherWorker.php
<?php
namespace Drupal\entity_change_notifier\Plugin\QueueWorker;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\entity_change_notifier\EntityPublisherInterface;
use Drupal\entity_change_notifier\Plugin\MessageDestination\NotifyException;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
* Queue worker implementation to handle retrying failed messages.
*
* @QueueWorker(
* id ="entity_change_notifier_retry",
* title = @Translation("Retries failed publisher actions"),
* cron = {15},
* )
*/
class EntityPublisherWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * Entity Publisher Service.
   *
   * @var \Drupal\entity_change_notifier\EntityPublisherInterface
   */
  protected $entityPublisher;

  /**
   * The service used to load Publisher config entities.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The logger used to notify about dropped messages.
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $logger;

  /**
   * Constructs an EntityPublisherWorker object.
   *
   * @param array $configuration
   *   A configuration array containing information about the plugin instance.
   * @param string $plugin_id
   *   The plugin_id for the plugin instance.
   * @param mixed $plugin_definition
   *   The plugin implementation definition.
   * @param \Drupal\entity_change_notifier\EntityPublisherInterface $entity_publisher
   *   The entity publisher.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\Core\Logger\LoggerChannelInterface $logger
   *   The logger used to log dropped notifications.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, EntityPublisherInterface $entity_publisher, EntityTypeManagerInterface $entity_type_manager, LoggerChannelInterface $logger) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->entityPublisher = $entity_publisher;
    $this->entityTypeManager = $entity_type_manager;
    $this->logger = $logger;
  }

  /**
   * {@inheritdoc}
   *
   * @codeCoverageIgnore
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('entity_change_notifier.entity_publisher'),
      $container->get('entity_type.manager'),
      $container->get('logger.channel.entity_change_notifier')
    );
  }

  /**
   * Retries a previously failed notification.
   *
   * The destination config entity and the notified entity are both looked up
   * from the queued item. If the destination no longer exists the item is
   * logged and dropped. Otherwise the notification is retried; when the retry
   * fails the item is either dropped (if it has expired) or the failure is
   * re-thrown so the item is noted as failed.
   *
   * @param \Drupal\entity_change_notifier\Plugin\QueueWorker\FailedItem $data
   *   The failed item. Its data array must contain the 'entity_id' and
   *   'entity_type' keys.
   *
   * @throws \Drupal\entity_change_notifier\Plugin\MessageDestination\NotifyException
   *   Thrown when the retry fails and the item has not yet expired.
   */
  public function processItem($data) {
    /** @var \Drupal\entity_change_notifier\Plugin\QueueWorker\FailedItem $data */
    $destinationId = $data->getMessageDestinationEntityId();
    /** @var \Drupal\entity_change_notifier\Entity\DestinationInterface $destinationConfigEntity */
    $destinationConfigEntity = $this->entityTypeManager->getStorage('ecn_destination')->load($destinationId);

    // Read the payload once; it is needed for the lookup below and again for
    // the retry itself.
    $payload = $data->getData();
    $entityId = $payload['entity_id'];
    $entityType = $payload['entity_type'];

    // The entity is loaded a single time and shared by every logging path
    // below. It may be NULL if it has been deleted since the item was queued.
    $entity = $this->entityTypeManager->getStorage($entityType)->load($entityId);

    // Drop the queue item if the destination has been deleted.
    if (!$destinationConfigEntity) {
      $this->logMissingDestination($destinationId, $entityType, $entityId, $entity);
      return;
    }

    try {
      $this->entityPublisher->retryNotification($destinationConfigEntity, $payload);
    }
    catch (NotifyException $e) {
      if ($data->getExpires() < time()) {
        // The item has outlived its retry window: report it as dropped and
        // return normally instead of re-throwing.
        $this->entityPublisher->logDropped($destinationConfigEntity, $e, $entity);
        return;
      }

      // Cron's queue processor always logs exceptions, and there's no way to
      // note an item has failed but doesn't need to be logged. In this case,
      // we are in a different logger channel, so while there is some overlap
      // in the messages it's not complete.
      $this->entityPublisher->logRetry($destinationConfigEntity, $e, $entity);
      throw $e;
    }
  }

  /**
   * Logs that an item is being dropped because its destination is gone.
   *
   * @param mixed $destinationId
   *   The ID of the destination config entity that could not be loaded.
   * @param string $entityType
   *   The entity type ID recorded in the queued item.
   * @param mixed $entityId
   *   The entity ID recorded in the queued item.
   * @param object|null $entity
   *   The loaded entity, or NULL if it could not be loaded (the entity could
   *   have been deleted too).
   */
  private function logMissingDestination($destinationId, $entityType, $entityId, $entity) {
    $context = [
      '%id' => $destinationId,
      '%title' => $entity ? $entity->label() : new TranslatableMarkup('Unknown title'),
      '%entity_id' => $entityId,
      '%type' => $entityType,
    ];

    // Only offer a "View" link when the entity still exists and exposes a
    // canonical URL.
    if ($entity && $entity->hasLinkTemplate('canonical')) {
      $context['link'] = new TranslatableMarkup('<a href="@url">View</a>', [
        '@url' => $entity->toUrl('canonical')->toString(),
      ]);
    }

    $this->logger->error("Could not load destination %id. %title (%type %entity_id) will not be retried.", $context);
  }

}
