feeds-8.x-3.0-alpha1/src/Plugin/QueueWorker/FeedRefresh.php

src/Plugin/QueueWorker/FeedRefresh.php
<?php

namespace Drupal\feeds\Plugin\QueueWorker;

use Drupal\feeds\Event\CleanEvent;
use Drupal\feeds\Event\FeedsEvents;
use Drupal\feeds\Event\FetchEvent;
use Drupal\feeds\Event\InitEvent;
use Drupal\feeds\Event\ParseEvent;
use Drupal\feeds\Event\ProcessEvent;
use Drupal\feeds\Exception\LockException;
use Drupal\feeds\FeedInterface;
use Drupal\feeds\Feeds\Item\ItemInterface;
use Drupal\feeds\Result\FetcherResultInterface;
use Drupal\feeds\StateInterface;

/**
 * Queue worker that runs a feed import one stage at a time.
 *
 * Every queue item is an array of the form [$feed, $stage, $params], where
 * $stage is one of the stage constants defined on this class. Handling a
 * stage usually queues the item(s) for the following stage.
 *
 * @QueueWorker(
 *   id = "feeds_feed_refresh",
 *   title = @Translation("Feed refresh"),
 *   cron = {"time" = 60},
 *   deriver = "Drupal\feeds\Plugin\Derivative\FeedQueueWorker"
 * )
 */
class FeedRefresh extends FeedQueueWorkerBase {

  /**
   * Parameter passed when starting a new import.
   *
   * @var string
   */
  const BEGIN = 'begin';

  /**
   * Parameter passed when continuing an import.
   *
   * @var string
   */
  const RESUME = 'resume';

  /**
   * Parameter passed when parsing.
   *
   * @var string
   */
  const PARSE = 'parse';

  /**
   * Parameter passed when processing.
   *
   * @var string
   */
  const PROCESS = 'process';

  /**
   * Parameter passed when cleaning.
   *
   * @var string
   */
  const CLEAN = 'clean';

  /**
   * Parameter passed when finishing.
   *
   * @var string
   */
  const FINISH = 'finish';

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    // Read the [$feed, $stage, $params] tuple defensively: an item that was
    // queued without a params element would otherwise trigger an undefined
    // offset notice before the stage is even dispatched.
    $feed = isset($data[0]) ? $data[0] : NULL;
    $stage = isset($data[1]) ? $data[1] : NULL;
    $params = isset($data[2]) ? $data[2] : [];

    // Nothing can be done without a feed; drop the item silently.
    if (!$feed instanceof FeedInterface) {
      return;
    }

    $switcher = $this->switchAccount($feed);

    try {
      switch ($stage) {
        case static::BEGIN:
        case static::RESUME:
          $this->import($feed, $stage);
          break;

        case static::PARSE:
          $this->doParse($feed, $params['fetcher_result']);
          break;

        case static::PROCESS:
          $this->doProcess($feed, $params['item']);
          break;

        case static::CLEAN:
          $this->doClean($feed);
          break;

        case static::FINISH:
          $this->finish($feed, $params['fetcher_result']);
          break;
      }
    }
    catch (\Exception $exception) {
      return $this->handleException($feed, $exception);
    }
    finally {
      // Always undo the account switch, whether the stage succeeded, threw,
      // or returned from the catch block above.
      $switcher->switchBack();
    }
  }

  /**
   * Queues an item.
   *
   * The item is added to the queue named after the feed's bundle
   * ("feeds_feed_refresh:" followed by the bundle).
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed for which to queue an item.
   * @param string $stage
   *   The stage of importing. One of the stage constants of this class.
   * @param array $params
   *   Additional parameters.
   */
  protected function queueItem(FeedInterface $feed, $stage, $params = []) {
    $this->queueFactory->get('feeds_feed_refresh:' . $feed->bundle())
      ->createItem([$feed, $stage, $params]);
  }

  /**
   * Begin or resume an import.
   *
   * Dispatches the fetch event and queues a parse item carrying the fetcher
   * result.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to perform an import on.
   * @param string $stage
   *   The stage of importing. Either static::BEGIN or static::RESUME.
   */
  protected function import(FeedInterface $feed, $stage) {
    // Only a fresh import acquires the lock and resets the states; a resumed
    // import skips both steps.
    if ($stage === static::BEGIN) {
      try {
        $feed->lock();
      }
      catch (LockException $e) {
        // The lock could not be acquired: do not start this import.
        return;
      }

      $feed->clearStates();
    }

    $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'fetch'));
    $fetch_event = $this->dispatchEvent(FeedsEvents::FETCH, new FetchEvent($feed));
    $feed->setState(StateInterface::PARSE, NULL);

    $feed->saveStates();
    $this->queueItem($feed, static::PARSE, [
      'fetcher_result' => $fetch_event->getFetcherResult(),
    ]);
  }

  /**
   * Parses.
   *
   * Dispatches the parse event, queues one process item per parsed item and
   * then queues a finish item.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to perform a parse event on.
   * @param \Drupal\feeds\Result\FetcherResultInterface $fetcher_result
   *   The fetcher result.
   */
  protected function doParse(FeedInterface $feed, FetcherResultInterface $fetcher_result) {
    $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'parse'));
    $parse_event = $this->dispatchEvent(FeedsEvents::PARSE, new ParseEvent($feed, $fetcher_result));

    $feed->saveStates();

    foreach ($parse_event->getParserResult() as $item) {
      $this->queueItem($feed, static::PROCESS, [
        'item' => $item,
      ]);
    }

    // Add a final queue item that finalizes the import.
    $this->queueItem($feed, static::FINISH, [
      'fetcher_result' => $fetcher_result,
    ]);
  }

  /**
   * Processes an item.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to perform a process event on.
   * @param \Drupal\feeds\Feeds\Item\ItemInterface $item
   *   The item to import.
   */
  protected function doProcess(FeedInterface $feed, ItemInterface $item) {
    $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'process'));
    $this->dispatchEvent(FeedsEvents::PROCESS, new ProcessEvent($feed, $item));

    $feed->saveStates();
  }

  /**
   * Cleans an entity.
   *
   * Takes the next entity from the feed's clean state and, if there is one,
   * dispatches a clean event for it. The clean state is marked as completed
   * once its count reaches zero.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to perform a clean event on.
   */
  protected function doClean(FeedInterface $feed) {
    $state = $feed->getState(StateInterface::CLEAN);

    $entity = $state->nextEntity();
    if ($entity) {
      $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'clean'));
      $this->dispatchEvent(FeedsEvents::CLEAN, new CleanEvent($feed, $entity));
    }

    if (!$state->count()) {
      $state->setCompleted();
    }

    $feed->saveStates();
  }

  /**
   * Finalizes the import.
   *
   * Decides what has to happen next: parse again, resume fetching, clean, or
   * finish the import when all of these report completion.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed whose import is being finalized.
   * @param \Drupal\feeds\Result\FetcherResultInterface $fetcher_result
   *   The fetcher result, passed along to any parse or finish item that gets
   *   queued.
   */
  protected function finish(FeedInterface $feed, FetcherResultInterface $fetcher_result) {
    // Update item count.
    $feed->save();

    if ($feed->progressParsing() !== StateInterface::BATCH_COMPLETE) {
      $this->queueItem($feed, static::PARSE, [
        'fetcher_result' => $fetcher_result,
      ]);
    }
    elseif ($feed->progressFetching() !== StateInterface::BATCH_COMPLETE) {
      $this->queueItem($feed, static::RESUME);
    }
    elseif ($feed->progressCleaning() !== StateInterface::BATCH_COMPLETE) {
      // Queue one clean item per entry in the clean state. The count is read
      // once up front, since nothing in the loop changes it.
      $clean_count = $feed->getState(StateInterface::CLEAN)->count();
      for ($i = 0; $i < $clean_count; $i++) {
        $this->queueItem($feed, static::CLEAN);
      }

      // Add a final queue item that finalizes the import.
      $this->queueItem($feed, static::FINISH, [
        'fetcher_result' => $fetcher_result,
      ]);
    }
    else {
      $feed->finishImport();
    }
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc