media_mpx-8.x-1.x-dev/src/NotificationListener.php

src/NotificationListener.php
<?php

namespace Drupal\media_mpx;

use Drupal\Core\State\StateInterface;
use Drupal\media_mpx\Plugin\media\Source\MpxMediaSourceInterface;
use GuzzleHttp\Exception\ConnectException;
use Lullabot\Mpx\DataService\DataServiceManager;
use Lullabot\Mpx\DataService\DiscoveredDataService;
use Lullabot\Mpx\DataService\Notification;
use Lullabot\Mpx\DataService\NotificationListener as MpxNotificationListener;
use Lullabot\Mpx\Exception\ClientException;
use Psr\Log\LoggerInterface;

/**
 * Listen for mpx notifications.
 *
 * This class is like \Lullabot\Mpx\DataService\NotificationListener, but
 * contains Drupal-specific dependencies like Media Sources.
 */
class NotificationListener {

  /**
   * The factory for authenticated mpx clients.
   *
   * @var \Drupal\media_mpx\AuthenticatedClientFactory
   */
  private $authenticatedClientFactory;

  /**
   * The manager to retrieve the service to listen for notifications on.
   *
   * @var \Lullabot\Mpx\DataService\DataServiceManager
   */
  private $dataServiceManager;

  /**
   * The state system to store the last notification id.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  private $state;

  /**
   * The system logger.
   *
   * @var \Psr\Log\LoggerInterface
   */
  private $logger;

  /**
   * NotificationListener constructor.
   *
   * @param \Drupal\media_mpx\AuthenticatedClientFactory $authenticatedClientFactory
   *   The factory for authenticated mpx clients.
   * @param \Lullabot\Mpx\DataService\DataServiceManager $dataServiceManager
   *   The manager to retrieve the service to listen for notifications on.
   * @param \Drupal\Core\State\StateInterface $state
   *   The state system to store the last notification id.
   * @param \Psr\Log\LoggerInterface $logger
   *   The system logger.
   */
  public function __construct(AuthenticatedClientFactory $authenticatedClientFactory, DataServiceManager $dataServiceManager, StateInterface $state, LoggerInterface $logger) {
    $this->authenticatedClientFactory = $authenticatedClientFactory;
    $this->dataServiceManager = $dataServiceManager;
    $this->state = $state;
    $this->logger = $logger;
  }

  /**
   * Fetch notifications from mpx.
   *
   * @param \Drupal\media_mpx\Plugin\media\Source\MpxMediaSourceInterface $media_source
   *   The media source notifications are being listened for.
   * @param int $notification_id
   *   The last notification ID that was processed.
   *
   * @return \Lullabot\Mpx\DataService\Notification[]
   *   An array of notifications.
   */
  public function listen(MpxMediaSourceInterface $media_source, int $notification_id): array {
    $account = $media_source->getAccount();

    $client = $this->authenticatedClientFactory->fromAccount($account);
    $definition = $media_source->getPluginDefinition()['media_mpx'];
    $service = $this->dataServiceManager->getDataService($definition['service_name'], $definition['object_type'], $definition['schema_version']);
    // @todo Client ID needs to be configured somehow.
    // @todo Inject caches
    $listener = new MpxNotificationListener($client, $service, 'drush-drupal8-mpx');

    $promise = $listener->listen($notification_id);
    try {
      return $promise->wait();
    }
    catch (ConnectException | ClientException $e) {
      // This may be a timeout if no notifications are available. However, there
      // is no good method from the exception to determine if a timeout
      // occurred.
      if (strpos($e->getMessage(), 'cURL error 28') !== FALSE) {
        $this->logger->info('A timeout occurred while waiting for notifications. This is expected when no content is changing in mpx. No action is required.');
        return [];
      }

      if ($e->getCode() == 404) {
        return $this->resetStaleNotificationId($media_source, $notification_id, $service);
      }

      // Some other connection exception occurred, so throw that up.
      throw $e;
    }
  }

  /**
   * Return the current notification ID, or -1 if one is not set.
   *
   * @param string $media_type_id
   *   The media type ID to import for.
   *
   * @return int
   *   The notification ID.
   */
  public function getNotificationId(string $media_type_id): int {
    $notification_key = $this->getNotificationKey($media_type_id);
    if (!$notification_id = $this->state->get($notification_key)) {
      // @todo Should we throw a warning?
      $notification_id = -1;
    }

    return $notification_id;
  }

  /**
   * Set the last processed notification id.
   *
   * @param string $media_type_id
   *   The media type ID to import for.
   * @param \Lullabot\Mpx\DataService\Notification $notification
   *   The last notification that has been processed.
   */
  public function setNotificationId(string $media_type_id, Notification $notification) {
    $notification_key = $this->getNotificationKey($media_type_id);
    // @todo should this really be state?
    $this->state->set($notification_key, $notification->getId());
  }

  /**
   * Reset the notification ID to restart from the earliest notification.
   *
   * @param string $media_type_id
   *   The media type ID to reset notifications for.
   */
  public function resetNotificationId(string $media_type_id): void {
    $notification_key = $this->getNotificationKey($media_type_id);
    $this->state->delete($notification_key);
  }

  /**
   * Return the notification key in the state system.
   *
   * @param string $media_type_id
   *   The media type ID to import for.
   *
   * @return string
   *   The notification key.
   */
  private function getNotificationKey(string $media_type_id): string {
    return $media_type_id . '_notification_id';
  }

  /**
   * Reset notifications when the current ID is stale.
   *
   * @param \Drupal\media_mpx\Plugin\media\Source\MpxMediaSourceInterface $media_source
   *   The media source to reset notifications for.
   * @param int $notification_id
   *   The stale notification ID.
   * @param \Lullabot\Mpx\DataService\DiscoveredDataService $service
   *   The data service notifications are being listened for.
   *
   * @return \Lullabot\Mpx\DataService\Notification[]
   *   An array of notifications.
   */
  private function resetStaleNotificationId(MpxMediaSourceInterface $media_source, int $notification_id, DiscoveredDataService $service): array {
    $this->logger->warning(
      'The %object_type last notification ID %id for %account is older than 7 days and is too old to fetch notifications. The last notification ID has been reset to re-start ingestion.',
      [
        '%object_type' => $service->getAnnotation()->getObjectType(),
        '%id' => $notification_id,
        '%account' => $media_source->getAccount()->label(),
      ]);

    return $this->listen($media_source, -1);
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc