feeds-8.x-3.0-alpha1/src/FeedImportHandler.php

src/FeedImportHandler.php
<?php

namespace Drupal\feeds;

use Drupal\Core\Entity\EntityInterface;
use Drupal\feeds\Event\CleanEvent;
use Drupal\feeds\Event\FeedsEvents;
use Drupal\feeds\Event\FetchEvent;
use Drupal\feeds\Event\InitEvent;
use Drupal\feeds\Event\ParseEvent;
use Drupal\feeds\Event\ProcessEvent;
use Drupal\feeds\Exception\EmptyFeedException;
use Drupal\feeds\Exception\LockException;
use Drupal\feeds\Feeds\Item\ItemInterface;
use Drupal\feeds\Feeds\State\CleanStateInterface;
use Drupal\feeds\Result\FetcherResultInterface;
use Drupal\feeds\Result\ParserResultInterface;
use Drupal\feeds\Result\RawFetcherResult;

/**
 * Runs the actual import on a feed.
 */
class FeedImportHandler extends FeedHandlerBase {

  /**
   * The fetcher result.
   *
   * Set by batchFetch() and read back by batchParse().
   *
   * @var \Drupal\feeds\Result\FetcherResultInterface
   */
  protected $fetcherResult;

  /**
   * Imports the whole feed at once.
   *
   * Locks the feed, then fetches, parses, processes and cleans in a single
   * call. The import is always finished afterwards, whether or not a stage
   * failed.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to import for.
   *
   * @throws \Exception
   *   In case of an error. An EmptyFeedException is not treated as an error
   *   and is swallowed.
   */
  public function import(FeedInterface $feed) {
    // Locking happens outside the guarded region: if the lock cannot be
    // acquired there is no import of ours to finish.
    $feed->lock();

    try {
      // Fetching is part of the guarded region so that a failing fetcher
      // still ends in finishImport(), just like handleException() does for
      // the batch code path.
      $this->runImportStages($feed, $this->doFetch($feed));
    }
    catch (EmptyFeedException $e) {
      // Not an error.
    }
    finally {
      // Runs for every outcome; any other exception propagates afterwards.
      $feed->finishImport();
    }
  }

  /**
   * {@inheritdoc}
   */
  public function startBatchImport(FeedInterface $feed) {
    try {
      $feed->lock();
    }
    catch (LockException $e) {
      drupal_set_message($this->t('The feed became locked before the import could begin.'), 'warning');
      return;
    }

    $feed->clearStates();
    $this->startBatchFetch($feed);
  }

  /**
   * Sets the fetch batch.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed being fetched.
   */
  protected function startBatchFetch(FeedInterface $feed) {
    $batch = [
      'title' => $this->t('Fetching: %title', ['%title' => $feed->label()]),
      'init_message' => $this->t('Fetching: %title', ['%title' => $feed->label()]),
      'operations' => [
        [[$this, 'batchFetch'], [$feed]],
      ],
      'progress_message' => $this->t('Fetching: %title', ['%title' => $feed->label()]),
      'error_message' => $this->t('An error occurred while fetching %title.', ['%title' => $feed->label()]),
    ];

    batch_set($batch);
  }

  /**
   * Performs the batch fetching.
   *
   * Stores the fetcher result on this handler and schedules the parse batch.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed being fetched.
   */
  public function batchFetch(FeedInterface $feed) {
    try {
      $this->fetcherResult = $this->doFetch($feed);
    }
    catch (\Exception $exception) {
      return $this->handleException($feed, $exception);
    }

    $this->startBatchParse($feed);
    $feed->saveStates();
  }

  /**
   * Sets the parse batch.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed being parsed.
   */
  protected function startBatchParse(FeedInterface $feed) {
    $batch = [
      'title' => $this->t('Parsing: %title', ['%title' => $feed->label()]),
      'init_message' => $this->t('Parsing: %title', ['%title' => $feed->label()]),
      'operations' => [
        [[$this, 'batchParse'], [$feed]],
      ],
      'progress_message' => $this->t('Parsing: %title', ['%title' => $feed->label()]),
      'error_message' => $this->t('An error occurred while parsing %title.', ['%title' => $feed->label()]),
    ];

    batch_set($batch);
  }

  /**
   * Performs the batch parsing.
   *
   * Parses the fetcher result stored by batchFetch() and schedules the
   * process batch for the resulting items.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed.
   */
  public function batchParse(FeedInterface $feed) {
    try {
      $parser_result = $this->doParse($feed, $this->fetcherResult);
    }
    catch (\Exception $exception) {
      return $this->handleException($feed, $exception);
    }

    $this->startBatchProcess($feed, $parser_result);
    $feed->saveStates();
  }

  /**
   * Starts the process batch.
   *
   * Queues one operation per parsed item, followed by a single post-process
   * operation.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed.
   * @param \Drupal\feeds\Result\ParserResultInterface $parser_result
   *   The parser result.
   */
  protected function startBatchProcess(FeedInterface $feed, ParserResultInterface $parser_result) {
    $batch = [
      'title' => $this->t('Processing: %title', ['%title' => $feed->label()]),
      'init_message' => $this->t('Processing: %title', ['%title' => $feed->label()]),
      'progress_message' => $this->t('Processing: %title', ['%title' => $feed->label()]),
      'error_message' => $this->t('An error occurred while processing %title.', ['%title' => $feed->label()]),
    ];

    foreach ($parser_result as $item) {
      $batch['operations'][] = [[$this, 'batchProcess'], [$feed, $item]];
    }
    // Always queued last, so it decides what happens once all items are done.
    $batch['operations'][] = [[$this, 'batchPostProcess'], [$feed]];

    batch_set($batch);
  }

  /**
   * Performs the batch processing.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed.
   * @param \Drupal\feeds\Feeds\Item\ItemInterface $item
   *   An item to process.
   */
  public function batchProcess(FeedInterface $feed, ItemInterface $item) {
    try {
      $this->doProcess($feed, $item);
    }
    catch (\Exception $exception) {
      return $this->handleException($feed, $exception);
    }

    $feed->saveStates();
  }

  /**
   * Finishes importing, or starts unfinished stages.
   *
   * Checks the stages in the order parse, fetch, clean and schedules a batch
   * for the first one that is not complete. When all are complete, the import
   * is finished and the expire batch is started.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed.
   */
  public function batchPostProcess(FeedInterface $feed) {
    if ($feed->progressParsing() !== StateInterface::BATCH_COMPLETE) {
      $this->startBatchParse($feed);
    }
    elseif ($feed->progressFetching() !== StateInterface::BATCH_COMPLETE) {
      $this->startBatchFetch($feed);
    }
    elseif ($feed->progressCleaning() !== StateInterface::BATCH_COMPLETE) {
      $this->startBatchClean($feed);
    }
    else {
      $feed->finishImport();
      $feed->startBatchExpire();
    }
  }

  /**
   * Starts the clean batch.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed.
   */
  protected function startBatchClean(FeedInterface $feed) {
    $batch = [
      'title' => $this->t('Cleaning: %title', ['%title' => $feed->label()]),
      'init_message' => $this->t('Cleaning: %title', ['%title' => $feed->label()]),
      'progress_message' => $this->t('Cleaning: %title', ['%title' => $feed->label()]),
      'error_message' => $this->t('An error occurred while cleaning %title.', ['%title' => $feed->label()]),
    ];

    $batch['operations'][] = [[$this, 'batchClean'], [$feed]];
    $batch['operations'][] = [[$this, 'batchPostProcess'], [$feed]];

    batch_set($batch);
  }

  /**
   * Performs the batch cleaning.
   *
   * Handles at most one entity per invocation and reports through
   * $context['finished'] whether the operation has to be called again.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed.
   * @param array $context
   *   Batch context.
   */
  public function batchClean(FeedInterface $feed, array &$context) {
    $state = $feed->getState(StateInterface::CLEAN);
    if (empty($context['sandbox'])) {
      // First invocation: remember how many entities there are to clean.
      $context['sandbox']['max'] = $state->count();
      $context['sandbox']['progress'] = 0;
    }

    try {
      $entity = $state->nextEntity();
      if ($entity) {
        $this->doClean($feed, $entity);
      }
      $context['sandbox']['progress']++;
      $context['finished'] = ($context['sandbox']['progress'] >= $context['sandbox']['max']);
      if (!$state->count()) {
        $state->setCompleted();
      }
    }
    catch (\Exception $exception) {
      return $this->handleException($feed, $exception);
    }

    $feed->saveStates();
  }

  /**
   * Handles a push import.
   *
   * Works like import(), except that the pushed payload takes the place of
   * the fetch stage.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed receiving the push.
   * @param string $payload
   *   The feed contents.
   *
   * @throws \Exception
   *   In case of an error. An EmptyFeedException is not treated as an error
   *   and is swallowed.
   *
   * @todo Move this to a queue.
   */
  public function pushImport(FeedInterface $feed, $payload) {
    $feed->lock();

    try {
      $this->runImportStages($feed, new RawFetcherResult($payload));
    }
    catch (EmptyFeedException $e) {
      // Not an error.
    }
    finally {
      // Runs for every outcome; any other exception propagates afterwards.
      $feed->finishImport();
    }
  }

  /**
   * Runs the parse, process and clean stages on a fetcher result.
   *
   * Shared by import() and pushImport(). Locking the feed and finishing the
   * import are left to the caller.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed being imported.
   * @param \Drupal\feeds\Result\FetcherResultInterface $fetcher_result
   *   The fetched data to import from.
   *
   * @throws \Exception
   *   Whatever one of the stages throws, including EmptyFeedException.
   */
  protected function runImportStages(FeedInterface $feed, FetcherResultInterface $fetcher_result) {
    // Keep parsing and processing until the feed reports that importing is
    // complete.
    do {
      foreach ($this->doParse($feed, $fetcher_result) as $item) {
        $this->doProcess($feed, $item);
      }
    } while ($feed->progressImporting() !== StateInterface::BATCH_COMPLETE);

    // Clean up if needed.
    $clean_state = $feed->getState(StateInterface::CLEAN);
    if ($clean_state instanceof CleanStateInterface && $clean_state->count()) {
      while ($entity = $clean_state->nextEntity()) {
        $this->doClean($feed, $entity);
      }
    }
  }

  /**
   * Invokes the fetch stage.
   *
   * Also resets the feed's parse state to NULL.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to fetch.
   *
   * @return \Drupal\feeds\Result\FetcherResultInterface
   *   The result of the fetcher.
   */
  protected function doFetch(FeedInterface $feed) {
    $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'fetch'));
    $fetch_event = $this->dispatchEvent(FeedsEvents::FETCH, new FetchEvent($feed));
    $feed->setState(StateInterface::PARSE, NULL);

    return $fetch_event->getFetcherResult();
  }

  /**
   * Invokes the parse stage.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to parse.
   * @param \Drupal\feeds\Result\FetcherResultInterface $fetcher_result
   *   The result of the fetcher.
   *
   * @return \Drupal\feeds\Result\ParserResultInterface
   *   The result of the parser.
   */
  protected function doParse(FeedInterface $feed, FetcherResultInterface $fetcher_result) {
    $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'parse'));

    $parse_event = $this->dispatchEvent(FeedsEvents::PARSE, new ParseEvent($feed, $fetcher_result));

    return $parse_event->getParserResult();
  }

  /**
   * Invokes the process stage.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed the item belongs to.
   * @param \Drupal\feeds\Feeds\Item\ItemInterface $item
   *   The item to process.
   */
  protected function doProcess(FeedInterface $feed, ItemInterface $item) {
    $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'process'));
    $this->dispatchEvent(FeedsEvents::PROCESS, new ProcessEvent($feed, $item));
  }

  /**
   * Invokes the clean stage.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed being cleaned.
   * @param \Drupal\Core\Entity\EntityInterface $entity
   *   The entity to apply an action on.
   */
  protected function doClean(FeedInterface $feed, EntityInterface $entity) {
    $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'clean'));
    $this->dispatchEvent(FeedsEvents::CLEAN, new CleanEvent($feed, $entity));
  }

  /**
   * Handles an exception during importing.
   *
   * Finishes the import first. An EmptyFeedException is then ignored, a
   * \RuntimeException is shown to the user as an error message, and anything
   * else is rethrown.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed.
   * @param \Exception $exception
   *   The exception that was thrown.
   *
   * @throws \Exception
   *   Thrown if $exception is neither an EmptyFeedException nor a
   *   \RuntimeException.
   */
  protected function handleException(FeedInterface $feed, \Exception $exception) {
    $feed->finishImport();

    if ($exception instanceof EmptyFeedException) {
      return;
    }
    if ($exception instanceof \RuntimeException) {
      drupal_set_message($exception->getMessage(), 'error');
      return;
    }

    throw $exception;
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc