activity_stream-1.0.x-dev/src/Plugin/QueueWorker/MessageQueueCreator.php

src/Plugin/QueueWorker/MessageQueueCreator.php
<?php

namespace Drupal\activity_stream\Plugin\QueueWorker;

use Drupal\activity_stream\Plugin\ActivityActionManager;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\node\Entity\Node;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * An activity stream message worker.
 *
 * @QueueWorker(
 *   id = "activity_stream_message",
 *   title = @Translation("Process activity_stream_message queue."),
 *   cron = {"time" = 60}
 * )
 *
 * This QueueWorker is responsible for creating activity / message items from
 * the queue.
 */
class MessageQueueCreator extends MessageQueueBase implements ContainerFactoryPluginInterface {

  /**
   * Age, in seconds, a node must exceed before its message is created.
   *
   * Nodes that are this old or younger are put back on the queue and retried.
   */
  const MINIMUM_ENTITY_AGE = 5;

  /**
   * The action manager.
   *
   * @var \Drupal\activity_stream\Plugin\ActivityActionManager
   */
  protected $actionManager;

  /**
   * MessageQueueCreator constructor.
   *
   * @param array $configuration
   *   The configuration.
   * @param string $plugin_id
   *   The plugin id.
   * @param array $plugin_definition
   *   The plugin definition.
   * @param \Drupal\Core\Queue\QueueFactory $queue
   *   The queue factory, handed on to the parent constructor.
   * @param \Drupal\activity_stream\Plugin\ActivityActionManager $actionManager
   *   The action manager used to look up and instantiate the create action.
   */
  public function __construct(array $configuration, $plugin_id, array $plugin_definition, QueueFactory $queue, ActivityActionManager $actionManager) {
    parent::__construct($configuration, $plugin_id, $plugin_definition, $queue);
    $this->actionManager = $actionManager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('queue'),
      $container->get('plugin.manager.activity_action.processor')
    );
  }

  /**
   * Creates the activity message for a queued node, or requeues it.
   *
   * Items without an entity ID, or whose node cannot be loaded, are dropped.
   * Nodes that are not yet older than self::MINIMUM_ENTITY_AGE seconds are put
   * back on the 'activity_stream_message' queue after a short pause.
   *
   * @param mixed $data
   *   The queue item data; read as an array with an 'entity_id' key holding
   *   the ID of the node to process.
   */
  public function processItem($data) {
    // An item without an entity ID can never be resolved. Drop it instead of
    // handing NULL to the entity loader.
    if (empty($data['entity_id'])) {
      return;
    }

    // Make sure it is an actual, loadable node.
    $entity = Node::load($data['entity_id']);
    if (!$entity) {
      return;
    }

    // Age of the node in whole seconds. A negative value means the created
    // time lies in the future.
    $age = time() - (int) $entity->getCreatedTime();

    // Nodes created within the last few seconds, including the current one,
    // are too fresh and get retried later. Future-dated nodes are processed
    // right away (presumably so they are not requeued over and over).
    if ($age >= 0 && $age <= static::MINIMUM_ENTITY_AGE) {
      // Wait for 100 milliseconds.
      // We don't want to flood the DB with unprocessable queue items.
      usleep(100000);
      $this->createQueueItem('activity_stream_message', $data);
      return;
    }

    // Trigger the create action for entities. The plugin ID spelling is kept
    // exactly as it is looked up here; it must match the plugin definition.
    if ($this->actionManager->hasDefinition('create_entitiy_action')) {
      $create_action = $this->actionManager->createInstance('create_entitiy_action');
      $create_action->createMessage($entity);
    }
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc