queue_stats-8.x-1.0-beta1/src/Plugin/QueueStatisticManager.php
src/Plugin/QueueStatisticManager.php
<?php
namespace Drupal\queue_stats\Plugin;
use Drupal\Core\Plugin\DefaultPluginManager;
use Drupal\Core\Cache\CacheBackendInterface;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\queue_stats\Event\QueueItemEvent;
use Drupal\queue_stats\MonitoredQueueFactory;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
/**
 * Provides the Queue statistic plugin manager.
 *
 * Besides discovering queue statistic plugins, the manager subscribes to the
 * queue item events and forwards them to an internal event dispatcher on which
 * every statistic plugin implementing EventSubscriberInterface is registered.
 */
class QueueStatisticManager extends DefaultPluginManager implements EventSubscriberInterface {

  /**
   * The queue worker manager.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManagerInterface
   */
  protected $queueWorkerManager;

  /**
   * The queue factory.
   *
   * @var \Drupal\queue_stats\MonitoredQueueFactory
   */
  protected $queueFactory;

  /**
   * The event dispatcher used to dispatch queue events.
   *
   * This is a private dispatcher created by the constructor; only statistic
   * plugins are registered on it.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcher
   */
  protected $eventDispatcher;

  /**
   * Constructs a new QueueStatisticManager object.
   *
   * @param \Traversable $namespaces
   *   An object that implements \Traversable which contains the root paths
   *   keyed by the corresponding namespace to look for plugin implementations.
   * @param \Drupal\Core\Cache\CacheBackendInterface $cache_backend
   *   Cache backend instance to use.
   * @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
   *   The module handler to invoke the alter hook with.
   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_worker_manager
   *   The queue worker manager.
   * @param \Drupal\queue_stats\MonitoredQueueFactory $queue_factory
   *   The queue factory.
   */
  public function __construct(
    \Traversable $namespaces,
    CacheBackendInterface $cache_backend,
    ModuleHandlerInterface $module_handler,
    QueueWorkerManagerInterface $queue_worker_manager,
    MonitoredQueueFactory $queue_factory
  ) {
    parent::__construct('Plugin/QueueStatistic', $namespaces, $module_handler, 'Drupal\queue_stats\Plugin\QueueStatisticInterface', 'Drupal\queue_stats\Annotation\QueueStatistic');

    // Make queue statistics alterable for extensibility and cacheable for
    // performance.
    $this->alterInfo('queue_stats_queue_stat_info');
    $this->setCacheBackend($cache_backend, 'queue_stats_queue_stat_plugins');

    $this->queueWorkerManager = $queue_worker_manager;
    $this->queueFactory = $queue_factory;

    // Register all our queue statistics plugins as event handlers for our
    // internal event handling. This way we avoid having to register them
    // separately as event handlers. It also solves problems with plugin
    // instantiation.
    $plugin_definitions = $this->getDefinitions();
    $plugins = array_map(function (array $plugin) {
      return $this->createInstance($plugin['id']);
    }, $plugin_definitions);

    // Only plugins that declare subscribed events can be attached to the
    // dispatcher; all other statistic plugins are skipped here.
    $event_subscriber_plugins = array_filter(
      $plugins,
      static function ($plugin) {
        return $plugin instanceof EventSubscriberInterface;
      }
    );

    $this->eventDispatcher = new EventDispatcher();
    foreach ($event_subscriber_plugins as $plugin) {
      $this->eventDispatcher->addSubscriber($plugin);
    }
  }

  /**
   * {@inheritdoc}
   *
   * The definitions are returned sorted in ascending order of their optional
   * 'order' key; a definition without that key is treated as having order 0.
   * Array keys are preserved.
   */
  public function getDefinitions() {
    $definitions = parent::getDefinitions();
    uasort($definitions, static function (array $a, array $b) {
      $a_order = $a['order'] ?? 0;
      $b_order = $b['order'] ?? 0;
      return $a_order <=> $b_order;
    });
    return $definitions;
  }

  /**
   * {@inheritdoc}
   *
   * All three queue item processing events are routed to onEvent().
   */
  public static function getSubscribedEvents() {
    return [
      QueueItemEvent::PROCESSING_STARTED => 'onEvent',
      QueueItemEvent::PROCESSING_COMPLETED => 'onEvent',
      QueueItemEvent::PROCESSING_ABORTED => 'onEvent',
    ];
  }

  /**
   * Event handler for reacting to queue events.
   *
   * Re-dispatches the event on the internal dispatcher, using the name the
   * event itself reports, so that the subscribed statistic plugins receive it.
   *
   * @param \Drupal\queue_stats\Event\QueueItemEvent $event
   *   Queue event.
   */
  public function onEvent(QueueItemEvent $event) {
    $this->eventDispatcher->dispatch($event, $event->getName());
  }

  /**
   * Reset all statistics.
   *
   * Instantiates every statistic plugin and calls its reset() method once for
   * each queue that has a queue worker definition.
   */
  public function resetStatistics() {
    $stat_definitions = $this->getDefinitions();
    if (empty($stat_definitions)) {
      // Nothing to reset, so do not touch the queue services at all.
      return;
    }

    // The set of queues does not depend on the statistic being reset, so
    // resolve each queue once up front instead of once per statistic.
    $queues = [];
    foreach (array_keys($this->queueWorkerManager->getDefinitions()) as $queue_name) {
      $queues[] = $this->queueFactory->get($queue_name);
    }

    foreach ($stat_definitions as $stat_definition) {
      $stat = $this->createInstance($stat_definition['id']);
      foreach ($queues as $queue) {
        $stat->reset($queue);
      }
    }
  }

}
