queue_stats-8.x-1.0-beta1/src/Plugin/QueueStatistic/ProcessingRate.php
src/Plugin/QueueStatistic/ProcessingRate.php
<?php
namespace Drupal\queue_stats\Plugin\QueueStatistic;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\State\StateInterface;
use Drupal\queue_stats\Event\QueueItemEvent;
use Drupal\queue_stats\MonitoredQueueInterface;
use Drupal\queue_stats\Plugin\QueueStatisticBase;
use Drupal\queue_stats\Plugin\StatefulStatistic;
use Drupal\queue_stats\Statistics\ExponentialMovingAverage;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
/**
 * Calculate how many items are processed over time.
 *
 * The statistic is fed by queue item completion events: every completion
 * contributes one "items per second" sample (the inverse of the time elapsed
 * since the previously recorded completion) to an exponential moving average
 * that is persisted per queue.
 *
 * @QueueStatistic(
 *   id = "processing_rate",
 *   label = "Average processing rate"
 * )
 */
class ProcessingRate extends QueueStatisticBase implements EventSubscriberInterface, ContainerFactoryPluginInterface {

  use StatefulStatistic;

  /**
   * ProcessingRate constructor.
   *
   * @param array $configuration
   *   The plugin configuration.
   * @param string $plugin_id
   *   The plugin id.
   * @param mixed $plugin_definition
   *   The plugin definition.
   * @param \Drupal\Core\State\StateInterface $state
   *   The site state.
   */
  public function __construct(
    array $configuration,
    string $plugin_id,
    $plugin_definition,
    StateInterface $state
  ) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    // Presumably consumed by the StatefulStatistic helpers
    // (retrieveValue/storeValue/deleteValue) -- confirm against the trait.
    $this->state = $state;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(
    ContainerInterface $container,
    array $configuration,
    $plugin_id,
    $plugin_definition
  ) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('state')
    );
  }

  /**
   * {@inheritdoc}
   */
  public static function getSubscribedEvents() {
    return [
      QueueItemEvent::PROCESSING_COMPLETED => 'trackCompletion',
    ];
  }

  /**
   * Event handler for when items have been processed successfully.
   *
   * Adds one rate sample (1 / seconds since the previously recorded
   * completion) to the queue's moving average and remembers the timestamp of
   * this completion for the next sample.
   *
   * @param \Drupal\queue_stats\Event\QueueItemEvent $event
   *   The event.
   */
  public function trackCompletion(QueueItemEvent $event) {
    $queue = $event->getQueue();
    $timestamp = $event->getTimestamp();
    $last_processing_time = $this->retrieveValue($queue, 'last_processing_time');

    // First completion seen for this queue: there is no previous completion
    // to measure an interval against. Only remember the timestamp instead of
    // treating the missing value as 0, which would record a bogus near-zero
    // rate of 1 / <absolute timestamp>.
    if (!is_numeric($last_processing_time)) {
      $this->storeValue($queue, 'last_processing_time', $timestamp);
      return;
    }

    $elapsed = $timestamp - $last_processing_time;

    // Protect against parallelized processing or PHP time handling causing
    // situations where the last processing is actually after (or at exactly
    // the same time as) the completion of the event. A zero interval would
    // otherwise be a division by zero; a negative one a negative rate.
    // NOTE(review): if timestamps only have one-second resolution, several
    // completions within the same second are skipped here -- confirm.
    if ($elapsed <= 0) {
      return;
    }

    /** @var \Drupal\queue_stats\Statistics\ExponentialMovingAverage $value */
    $value = $this->retrieveValue($queue, 'value', new ExponentialMovingAverage(10));
    $value->add(1 / $elapsed);
    $this->storeValue($queue, 'value', $value);
    $this->storeValue($queue, 'last_processing_time', $timestamp);
  }

  /**
   * {@inheritdoc}
   */
  public function getValue(MonitoredQueueInterface $queue) {
    /** @var \Drupal\queue_stats\Statistics\ExponentialMovingAverage $item_rate */
    $item_rate = $this->retrieveValue($queue, 'value', new ExponentialMovingAverage(10));
    return $item_rate->average();
  }

  /**
   * {@inheritdoc}
   *
   * Expresses the rate in the smallest time unit for which more than one
   * item is processed per unit, rounded to a whole number, e.g.
   * "30 items/minute". Returns an empty string when even the yearly rate does
   * not exceed one item.
   */
  public function formatValue($value) {
    // Seconds per unit, ordered from the shortest to the longest interval.
    // A month is approximated as 30 days and a year as 365 days.
    $unit_interval_map = [
      'items/second' => 1,
      'items/minute' => 60,
      'items/hour' => 60 * 60,
      'items/day' => 60 * 60 * 24,
      'items/week' => 60 * 60 * 24 * 7,
      'items/month' => 60 * 60 * 24 * 30,
      'items/year' => 60 * 60 * 24 * 365,
    ];

    foreach ($unit_interval_map as $unit => $seconds) {
      $scaled_rate = $value * $seconds;
      // The first (shortest) unit yielding more than one item wins.
      if ($scaled_rate > 1) {
        $displayed_rate = round($scaled_rate, 0);
        return "{$displayed_rate} {$unit}";
      }
    }

    return '';
  }

  /**
   * {@inheritdoc}
   */
  public function reset(MonitoredQueueInterface $queue) {
    // NOTE(review): only the averaged value is cleared; the stored
    // 'last_processing_time' is kept, so the first sample after a reset is
    // measured from the last completion before the reset -- confirm intended.
    $this->deleteValue($queue, 'value');
  }

}
