feeds-8.x-3.0-alpha1/src/EventSubscriber/PubSubHubbub.php

src/EventSubscriber/PubSubHubbub.php
<?php

namespace Drupal\feeds\EventSubscriber;

use Drupal\Core\Url as CoreUrl;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\feeds\Component\HttpHelpers;
use Drupal\feeds\Event\DeleteFeedsEvent;
use Drupal\feeds\Event\FeedsEvents;
use Drupal\feeds\Event\FetchEvent;
use Drupal\feeds\FeedInterface;
use Drupal\feeds\Result\FetcherResultInterface;
use Drupal\feeds\Result\HttpFetcherResultInterface;
use Drupal\feeds\SubscriptionInterface;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Url;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

/**
 * Event listener for PubSubHubbub subscriptions.
 */
class PubSubHubbub implements EventSubscriberInterface {

  /**
   * The subscription storage controller.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $storage;

  /**
   * Constructs a PubSubHubbub object.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager) {
    $this->storage = $entity_type_manager->getStorage('feeds_subscription');
  }

  /**
   * {@inheritdoc}
   */
  public static function getSubscribedEvents() {
    $events = [];
    $events[FeedsEvents::FETCH][] = ['onPostFetch', FeedsEvents::AFTER];
    $events[FeedsEvents::FEEDS_DELETE][] = 'onDeleteMultipleFeeds';
    return $events;
  }

  /**
   * Subscribes to a feed.
   */
  public function onPostFetch(FetchEvent $event) {
    $feed = $event->getFeed();
    $fetcher = $feed->getType()->getFetcher();

    $subscription = $this->storage->load($feed->id());

    if (!$fetcher->getConfiguration('use_pubsubhubbub')) {
      return $this->unsubscribe($feed, $subscription);
    }

    if (!$hub = $this->findRelation($event->getFetcherResult(), 'hub')) {
      $hub = $fetcher->getConfiguration('fallback_hub');
    }

    // No hub found.
    if (!$hub) {
      return $this->unsubscribe($feed, $subscription);
    }

    // Used to make other URLs absolute.
    $source_url = Url::fromString($feed->getSource());

    $hub = (string) $source_url->combine($hub);

    // If there is a rel="self" relation.
    if ($topic = $this->findRelation($event->getFetcherResult(), 'self')) {
      $topic = (string) $source_url->combine($topic);
      $feed->setSource($topic);
    }
    else {
      $topic = $feed->getSource();
    }

    // Subscription does not exist yet.
    if (!$subscription) {
      $subscription = $this->storage->create([
        'fid' => $feed->id(),
        'topic' => $topic,
        'hub' => $hub,
      ]);

      return $this->subscribe($feed, $subscription);
    }

    if ($topic !== $subscription->getTopic() || $subscription->getHub() !== $hub || $subscription->getState() !== 'subscribed') {
      // Unsubscribe from the old feed.
      $this->unsubscribe($feed, $subscription);

      $subscription = $this->storage->create([
        'fid' => $feed->id(),
        'topic' => $topic,
        'hub' => $hub,
      ]);

      return $this->subscribe($feed, $subscription);
    }
  }

  /**
   *
   */
  protected function subscribe(FeedInterface $feed, SubscriptionInterface $subscription) {
    $subscription->subscribe();

    $batch = [
      'title' => t('Subscribing to: %title', ['%title' => $feed->label()]),
      'init_message' => t('Subscribing to: %title', ['%title' => $feed->label()]),
      'operations' => [
        ['Drupal\feeds\EventSubscriber\PubSubHubbub::runSubscribeBatch', [$subscription]],
      ],
      'progress_message' => t('Subscribing: %title', ['%title' => $feed->label()]),
      'error_message' => t('An error occored while subscribing to %title.', ['%title' => $feed->label()]),
    ];

    batch_set($batch);
  }

  /**
   *
   */
  protected function unsubscribe(FeedInterface $feed, SubscriptionInterface $subscription = NULL) {
    if (!$subscription) {
      return;
    }

    $subscription->unsubscribe();

    $batch = [
      'title' => t('Unsubscribing from: %title', ['%title' => $feed->label()]),
      'init_message' => t('Unsubscribing from: %title', ['%title' => $feed->label()]),
      'operations' => [
        ['Drupal\feeds\EventSubscriber\PubSubHubbub::runSubscribeBatch', [$subscription]],
      ],
      'progress_message' => t('Unsubscribing: %title', ['%title' => $feed->label()]),
      'error_message' => t('An error occored while unsubscribing from %title.', ['%title' => $feed->label()]),
    ];

    batch_set($batch);
  }

  /**
   *
   */
  public static function runSubscribeBatch(SubscriptionInterface $subscription) {
    switch ($subscription->getState()) {
      case 'subscribing':
        $mode = 'subscribe';
        break;

      case 'unsubscribing':
        $mode = 'unsubscribe';
        // The subscription has been deleted, store it for a bit to handle the
        // response.
        $id = $subscription->getToken() . ':' . $subscription->id();
        \Drupal::keyValueExpirable('feeds_push_unsubscribe')->setWithExpire($id, $subscription, 3600);
        break;

      default:
        throw new \LogicException('A subscription was found in an invalid state.');
    }

    $args = [
      'feeds_subscription_id' => $subscription->id(),
      'feeds_push_token' => $subscription->getToken(),
    ];
    $callback = CoreUrl::fromRoute('entity.feeds_feed.subscribe', $args, ['absolute' => TRUE])->toString();

    $post_body = [
      'hub.callback' => $callback,
      'hub.mode' => $mode,
      'hub.topic' => $subscription->getTopic(),
      'hub.secret' => $subscription->getSecret(),
    ];

    $response = static::retry($subscription, $post_body);

    // Response failed.
    if (!$response || $response->getStatusCode() != 202) {
      switch ($subscription->getState()) {
        case 'subscribing':
          // Deleting the subscription will make it re-subscribe on the next
          // import.
          $subscription->delete();
          break;

        case 'unsubscribing':
          // Unsubscribe failed. The hub should give up eventually.
          break;
      }
    }
  }

  /**
   * Retries a POST request.
   *
   * @param \Drupal\feeds\SubscriptionInterface $subscription
   *   The subscription.
   * @param array $body
   *   The POST body.
   * @param int $retries
   *   (optional) The number of retries. Defaults to 3.
   *
   * @return \GuzzleHttp\Message\Response
   *   The Guzzle response.
   */
  protected static function retry(SubscriptionInterface $subscription, array $body, $retries = 3) {
    $tries = 0;

    do {

      $tries++;

      try {
        return \Drupal::httpClient()->post($subscription->getHub(), ['body' => $body]);
      }
      catch (RequestException $e) {
        \Drupal::logger('feeds')->warning('Subscription error: %error', ['%error' => $e->getMessage()]);
      }
    } while ($tries <= $retries);
  }

  /**
   * Finds a hub from a fetcher result.
   *
   * @param \Drupal\feeds\Result\FetcherResultInterface $fetcher_result
   *   The fetcher result.
   *
   * @return string|null
   *   The hub URL or null if one wasn't found.
   */
  protected function findRelation(FetcherResultInterface $fetcher_result, $relation) {
    if ($fetcher_result instanceof HttpFetcherResultInterface) {
      if ($rel = HttpHelpers::findLinkHeader($fetcher_result->getHeaders(), $relation)) {
        return $rel;
      }
    }

    return HttpHelpers::findRelationFromXml($fetcher_result->getRaw(), $relation);
  }

  /**
   * Deletes subscriptions when feeds are deleted.
   */
  public function onDeleteMultipleFeeds(DeleteFeedsEvent $event) {
    $subscriptions = $this->storage->loadMultiple(array_keys($event->getFeeds()));

    foreach ($event->getFeeds() as $feed) {
      if (!isset($subscriptions[$feed->id()])) {
        continue;
      }

      $this->unsubscribe($feed, $subscriptions[$feed->id()]);
    }
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc