queue_stats-8.x-1.0-beta1/src/MonitoredQueue.php
src/MonitoredQueue.php
<?php
namespace Drupal\queue_stats;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Queue\QueueGarbageCollectionInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\queue_stats\Event\QueueItemEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
/**
* Decorator for monitored queues.
*/
class MonitoredQueue implements MonitoredQueueInterface, QueueInterface, QueueGarbageCollectionInterface {

  /**
   * The wrapped queue that every queue operation is forwarded to.
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $queue;

  /**
   * The name under which this queue is known.
   *
   * @var string
   */
  protected $name;

  /**
   * The dispatcher used to announce queue item events.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $dispatcher;

  /**
   * The time service that supplies the timestamp of each event.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  protected $time;

  /**
   * Constructs a MonitoredQueue around an existing queue.
   *
   * @param \Drupal\Core\Queue\QueueInterface $queue
   *   The queue to decorate.
   * @param string $name
   *   The name of the queue.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $dispatcher
   *   The dispatcher that receives the queue item events.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The time service used to timestamp the events.
   */
  public function __construct(QueueInterface $queue, string $name, EventDispatcherInterface $dispatcher, TimeInterface $time) {
    $this->time = $time;
    $this->dispatcher = $dispatcher;
    $this->name = $name;
    $this->queue = $queue;
  }

  /**
   * Deletes a finished item from the queue.
   *
   * A QueueItemEvent::PROCESSING_COMPLETED event is dispatched first, and only
   * afterwards is the item removed from the decorated queue.
   *
   * @param mixed $item
   *   The item returned by \Drupal\Core\Queue\QueueInterface::claimItem().
   */
  public function deleteItem($item) {
    $this->emitQueueItemEvent(QueueItemEvent::PROCESSING_COMPLETED, $item);
    $this->queue->deleteItem($item);
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 3600) {
    $claimed = $this->queue->claimItem($lease_time);

    // FALSE means the decorated queue handed nothing out, so no processing
    // has started and no event is dispatched.
    if ($claimed === FALSE) {
      return $claimed;
    }

    $this->emitQueueItemEvent(QueueItemEvent::PROCESSING_STARTED, $claimed);
    return $claimed;
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {
    $result = $this->queue->createItem($data);
    return $result;
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    $count = $this->queue->numberOfItems();
    return $count;
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    $outcome = $this->queue->releaseItem($item);

    // The aborted event is only dispatched when the decorated queue reports
    // that the item was released; its return value is passed back untouched.
    if (!$outcome) {
      return $outcome;
    }

    $this->emitQueueItemEvent(QueueItemEvent::PROCESSING_ABORTED, $item);
    return $outcome;
  }

  /**
   * {@inheritdoc}
   */
  public function createQueue() {
    $this->queue->createQueue();
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {
    $this->queue->deleteQueue();
  }

  /**
   * {@inheritdoc}
   */
  public function garbageCollection() {
    // This decorator always advertises garbage collection, but the decorated
    // queue may not support it; in that case there is nothing to do.
    if (!($this->queue instanceof QueueGarbageCollectionInterface)) {
      return;
    }

    $this->queue->garbageCollection();
  }

  /**
   * {@inheritdoc}
   */
  public function getName() {
    return $this->name;
  }

  /**
   * Gives access to the decorated queue.
   *
   * @return \Drupal\Core\Queue\QueueInterface
   *   The queue being monitored.
   */
  public function getMonitoredQueue() {
    return $this->queue;
  }

  /**
   * Builds a queue item event for this queue and dispatches it.
   *
   * The event is stamped with the time service's current micro time and is
   * dispatched under the name the event itself reports.
   *
   * @param mixed $type
   *   The QueueItemEvent::PROCESSING_* constant identifying what happened.
   * @param mixed $item
   *   The queue item the event is about.
   */
  private function emitQueueItemEvent($type, $item) {
    $timestamp = $this->time->getCurrentMicroTime();
    $event = new QueueItemEvent($type, $this, $item, $timestamp);
    $this->dispatcher->dispatch($event, $event->getName());
  }

}
