<?php

namespace Drupal\archivesspace;

use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\migrate\MigrateExecutable;
use Drupal\migrate\MigrateMessage;
use Drupal\migrate\Plugin\MigrationInterface;
use Psr\Log\LoggerInterface;

/**
 * Builds Drupal Batch API definitions that sync updates from ArchivesSpace.
 *
 * Queries the ArchivesSpace search API for items whose user or system
 * modification time is newer than a stored start time, then wraps each page
 * of results in a batch operation that runs an embedded-data migration.
 */
class BatchUpdateBuilder {
  use StringTranslationTrait;

  /**
   * ArchivesSpaceSession that will allow us to issue API requests.
   *
   * @var \Drupal\archivesspace\ArchivesSpaceSession
   */
  protected $archivesspaceSession;

  /**
   * ArchivesSpace utilities; maps item URIs to migration IDs.
   *
   * @var \Drupal\archivesspace\ArchivesSpaceUtils
   */
  protected $utils;

  /**
   * The logger channel.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * Supported ArchivesSpace item types.
   *
   * @var string[]
   */
  protected $supportedTypes = [
    'resource',
    'archival_object',
    'agent_person',
    'agent_corporate_entity',
    'agent_family',
    'subject',
    'top_container',
    'repository',
    'classifications',
    'classification_term',
  ];

  /**
   * The maximum number of pages of 10 items we want to update with this batch.
   *
   * Zero means "no cap": process every page the search returns.
   *
   * @var int
   */
  protected $maxPages = 0;

  /**
   * An ISO 8601 timestamp string updates should be more recent than.
   *
   * @var string
   */
  protected $startMtime = '1970-01-01T00:00:00Z';

  /**
   * The type of ArchivesSpace item to update. Empty for all types.
   *
   * @var string
   */
  protected $itemType = '';

  /**
   * Constructor to set defaults.
   *
   * @param \Psr\Log\LoggerInterface $logger
   *   Logger channel used to report update progress and errors.
   * @param \Drupal\archivesspace\ArchivesSpaceUtils $utils
   *   Utility service that maps ArchivesSpace URIs to migration IDs.
   */
  public function __construct(LoggerInterface $logger, ArchivesSpaceUtils $utils) {
    $this->logger = $logger;
    $this->utils = $utils;
    // Get the latest_user_mtime from state to use as an initial startMtime.
    if ($state_update_time = \Drupal::state()->get('archivesspace.latest_user_mtime')) {
      // We can *probably* trust the state value to be a valid ISO 8601
      // timestamp, but it won't hurt to be paranoid here: only accept it
      // when strtotime() can parse it.
      if ($timestamp = strtotime($state_update_time)) {
        $this->startMtime = date(DATE_ATOM, $timestamp);
      }
    }

    // Get a session with default settings in state.
    // Devs can use ArchivesSpace::withConnectionInfo and the
    // BatchUpdateBuilder->setArchivesSpaceSession if they want different
    // credentials.
    $this->archivesspaceSession = new ArchivesSpaceSession();
  }

  /**
   * ArchivesSpaceSession setter.
   *
   * @param \Drupal\archivesspace\ArchivesSpaceSession $archivesspace_session
   *   The session to use for subsequent API requests.
   */
  public function setArchivesSpaceSession(ArchivesSpaceSession $archivesspace_session) {
    $this->archivesspaceSession = $archivesspace_session;
  }

  /**
   * ArchivesSpaceSession getter.
   *
   * @return \Drupal\archivesspace\ArchivesSpaceSession
   *   The session currently used for API requests.
   */
  public function getArchivesSpaceSession() {
    return $this->archivesspaceSession;
  }

  /**
   * Max pages setter.
   *
   * @param int $max_pages
   *   Maximum number of result pages to process; 0 for no cap.
   */
  public function setMaxPages(int $max_pages) {
    $this->maxPages = $max_pages;
  }

  /**
   * UpdatedSince setter.
   *
   * @param string $updated_since
   *   A timestamp string (ISO 8601 preferred) updates must be newer than.
   *
   * @throws \Exception
   *   When the provided string cannot be parsed as a timestamp.
   */
  public function setUpdatedSince(string $updated_since) {
    // Parse the string to make sure it is a valid timestamp; strtotime()
    // returns FALSE on failure.
    if (!empty($updated_since) && $timestamp = strtotime($updated_since)) {
      $this->startMtime = date(DATE_ATOM, $timestamp);
    }
    else {
      throw new \Exception("Provided string '{$updated_since}' is not recognized. Please provide a valid timestamp (ISO 8601 preferred). E.g. 2020-01-01T00:00:00Z.");
    }
  }

  /**
   * Type setter.
   *
   * @param string $type_to_update
   *   One of the supported ArchivesSpace types, or '' to update all types.
   *
   * @throws \Exception
   *   When the given type is not in the supported list.
   */
  public function setType(string $type_to_update) {
    if (!empty($type_to_update) && !in_array($type_to_update, $this->supportedTypes, TRUE)) {
      // Use $this->t() rather than Drush's dt(): this service can be invoked
      // outside a Drush context, where dt() is not defined.
      $error = (string) $this->t('ArchivesSpace type "@type" not available. Set blank to update all or use one of the following: @types.',
        [
          '@type' => $type_to_update,
          '@types' => implode(', ', $this->supportedTypes),
        ]
      );
      $this->logger->error($error);
      throw new \Exception($error);
    }
    else {
      $this->itemType = $type_to_update;
    }
  }

  /**
   * Builds the request parameters for the "updated since advanced query".
   *
   * @return array
   *   Query parameters for the ArchivesSpace /search endpoint, including a
   *   JSON-encoded advanced query and, when set, a type filter.
   */
  public function buildRequestParameters() {
    // Note: ArchivesSpace compares the user_mtime as greater than the
    // provided time *plus one day*.
    // See https://github.com/archivesspace/archivesspace/blob/v2.5.2/backend/app/model/advanced_query_string.rb#L47-L48.
    // So, to get everything updated as of a certain time,
    // take your datetime and subtract one day.
    $update_time = date(DATE_ATOM, strtotime('-1 day', strtotime($this->startMtime)));
    $parameters = [
      'page' => '1',
      'sort' => 'user_mtime asc',
      'aq' => json_encode([
        'jsonmodel_type' => 'advanced_query',
        'query' => [
          'jsonmodel_type' => 'boolean_query',
          'op' => 'AND',
          'subqueries' => [
            // Update Time query.
            [
              'jsonmodel_type' => 'boolean_query',
              'op' => 'OR',
              'subqueries' => [
                [
                  'field' => 'user_mtime',
                  'value' => $update_time,
                  'comparator' => 'greater_than',
                  'jsonmodel_type' => 'date_field_query',
                ],
                // We add system time to catch "Publish All" updates to
                // archival_objects.
                [
                  'field' => 'system_mtime',
                  'value' => $update_time,
                  'comparator' => 'greater_than',
                  'jsonmodel_type' => 'date_field_query',
                ],
              ],
            ],
            // Filter out PUI only results from the index.
            [
              'field' => 'types',
              'value' => 'pui_only',
              'jsonmodel_type' => 'field_query',
              'negated' => TRUE,
            ],
          ],
        ],
      ]),
    ];
    if (!empty($this->itemType)) {
      $parameters['type[]'] = $this->itemType;
    }
    return $parameters;
  }

  /**
   * Builds a batch definition.
   *
   * @return array|false
   *   A Batch API definition with one operation per result page, or FALSE
   *   when the search returned nothing to update.
   */
  public function buildBatchDefinition() {
    $parameters = $this->buildRequestParameters();
    $this->logger->notice($this->t("Looking for updates since @time.", ['@time' => date(DATE_ATOM, strtotime($this->startMtime))]));
    $results = $this->archivesspaceSession->request('GET', '/search', $parameters);
    if (!$results || empty($results['last_page'])) {
      $this->logger->notice("Nothing to update!");
      return FALSE;
    }
    // Clamp the page count to the configured cap; a cap of 0 means process
    // every available page. (The original `!empty($a && $b)` wrapped the
    // whole boolean in empty(), which evaluated the same way but hid the
    // intent.)
    $last_page = (int) $results['last_page'];
    $max_pages = ($this->maxPages > 0 && $this->maxPages < $last_page) ? $this->maxPages : $last_page;
    $this->logger->notice($this->t("Processing @pages pages of results out of @available possible pages.", [
      '@pages' => $max_pages,
      '@available' => $results['last_page'],
    ]));
    // Create an operation for each page, up to the max page count.
    $operations = [];
    for ($batchId = 1; $batchId <= $max_pages; $batchId++) {
      $batch_parameters = $parameters;
      $batch_parameters['page'] = $batchId;
      $operations[] = [
        [$this, 'updatePage'],
        [
          $batchId,
          $this->archivesspaceSession,
          $batch_parameters,
        ],
      ];
    }
    return([
      'title' => $this->t('Processing @num pages of updates.', ['@num' => $max_pages]),
      'operations' => $operations,
      'finished' => [$this, 'updatePageFinished'],
    ]);
  }

  /**
   * Batch process callback: runs migrations for one page of search results.
   *
   * @param int $id
   *   Batch ID (the search result page this operation processes).
   * @param \Drupal\archivesspace\ArchivesSpaceSession $session
   *   Session used to issue the search request.
   * @param array $parameters
   *   The search parameters, with 'page' set for this operation.
   * @param array $context
   *   Batch API context array, passed by reference.
   *
   * @throws \Exception
   *   When a migration fails to complete, to halt further batch processing.
   */
  public function updatePage($id, ArchivesSpaceSession $session, array $parameters, &$context) {
    // Get page number and query from operation details.
    // Issue query.
    $results = $session->request('GET', '/search', $parameters);
    $context['message'] = $this->t("Processing page @page of updates.", [
      '@page' => $results['this_page'],
    ]);
    // Group embedded data rows into migrations based on each's json_model.
    $migrations = [];
    foreach ($results['results'] as $result) {
      if ($migration_id = $this->utils->getUriMigration($result['uri'])) {
        $migrations[$migration_id][] = json_decode($result['json'], TRUE);
        // Grab the user_mtime from the last item.
        // By the end we will have the last user_mtime because results are
        // sorted by the user_mtime.
        $context['results']['last_user_mtime'] = $result['user_mtime'];
      }
    }

    foreach ($migrations as $migration_id => $data) {
      $context['message'] = $this->t("Running migration '@migration' with @rows items.", [
        '@migration' => $migration_id,
        '@rows' => count($data),
      ]);
      // Load the relevant migration with the embedded data source.
      $migration = \Drupal::service('plugin.manager.migration')->createInstance($migration_id, [
        'source' => [
          'plugin' => 'embedded_data',
          'data_rows' => $data,
          'ids' => ['uri' => ['type' => 'string']],
        ],
      ]);
      if (!$migration) {
        $context['message'] = $this->t("Could not find a migration with the ID '@id'!", ['@id' => $migration_id]);
        return;
      }
      // Force the migration for batches rather than fail
      // due to missed requirements.
      $migration->set('requirements', []);
      $migration->getIdMap()->prepareUpdate();
      $executable = new MigrateExecutable($migration, new MigrateMessage());
      $migration_result = $executable->import();
      if ($migration_result == MigrationInterface::RESULT_COMPLETED) {
        // Tally per-migration item counts for the finished-callback report.
        if (!array_key_exists('migration_counts', $context['results'])) {
          $context['results']['migration_counts'] = [];
        }
        if (!array_key_exists($migration_id, $context['results']['migration_counts'])) {
          $context['results']['migration_counts'][$migration_id] = 0;
        }
        $context['results']['migration_counts'][$migration_id] += count($data);
      }
      else {
        $result_msg = '';
        switch ($migration_result) {
          // Error messages pulled from the MigrationInterface API docs.
          case MigrationInterface::RESULT_DISABLED:
            $result_msg = "This migration is disabled, skipping.";
            break;

          case MigrationInterface::RESULT_FAILED:
            $result_msg = "The process had a fatal error.";
            break;

          case MigrationInterface::RESULT_INCOMPLETE:
            $result_msg = "The process has stopped itself (e.g., the memory limit is approaching).";
            break;

          case MigrationInterface::RESULT_SKIPPED:
            $result_msg = "Dependencies are unfulfilled - skip the process.";
            break;

          case MigrationInterface::RESULT_STOPPED:
            $result_msg = "The process was stopped externally (e.g., via drush migrate-stop).";
            break;
        }
        // Kill further processing.
        throw new \Exception("Migration '$migration_id' from page {$results['this_page']} failed to complete with the message: $result_msg");
      }
      // Update state with the most recent user_mtime since this operation
      // completed successfully, but the next one may not.
      // But only if we don't have a type filter.
      // Guard both strtotime() calls: the state key may be unset on a first
      // run, and strtotime(NULL) is deprecated on PHP 8.1+.
      $state_mtime = \Drupal::state()->get('archivesspace.latest_user_mtime');
      $existing_timestamp = $state_mtime ? strtotime($state_mtime) : 0;
      $new_timestamp = isset($context['results']['last_user_mtime']) ? strtotime($context['results']['last_user_mtime']) : 0;
      if (!array_key_exists('type[]', $parameters) && $new_timestamp > $existing_timestamp) {
        \Drupal::state()->set('archivesspace.latest_user_mtime', $context['results']['last_user_mtime']);
      }
    }

  }

  /**
   * Batch finished callback.
   *
   * @param bool $success
   *   Success of the operation.
   * @param array $results
   *   Array of results for post processing.
   * @param array $operations
   *   Array of operations that remained unprocessed on failure.
   */
  public function updatePageFinished($success, array $results, array $operations) {
    $messenger = \Drupal::messenger();
    if ($success) {
      // Update the most recent update time with latest user_mtime found.
      if (!empty($results['last_user_mtime'])) {
        $messenger->addMessage($this->t("Processed items updated through @time.", ['@time' => $results['last_user_mtime']]));
        // Add one second so we don't keep re-processing the last batch.
        // Guard against an unset state key (strtotime(NULL) is deprecated
        // on PHP 8.1+).
        $state_mtime = \Drupal::state()->get('archivesspace.latest_user_mtime');
        $existing_timestamp = $state_mtime ? strtotime($state_mtime) : 0;
        $new_timestamp = strtotime($results['last_user_mtime']) + 1;
        if ($new_timestamp > $existing_timestamp) {
          \Drupal::state()->set('archivesspace.latest_user_mtime', date(DATE_ATOM, $new_timestamp));
          $messenger->addMessage("New time: " . \Drupal::state()->get('archivesspace.latest_user_mtime'));
        }
      }
      else {
        $messenger->addWarning($this->t("Warning: Unable to update the update's last modified time."));
      }

      // Report out migrations.
      if (!empty($results['migration_counts'])) {
        foreach ($results['migration_counts'] as $mid => $count) {
          $messenger->addMessage($this->t("Migration @mid processed @count items.", [
            '@mid' => $mid,
            '@count' => $count,
          ]));
        }
      }
      else {
        $messenger->addWarning($this->t("Warning: no migrations appear to have been run."));
      }

    }
    else {
      // An error occurred.
      // $operations contains the operations that remained unprocessed.
      $error_operation = reset($operations);
      $messenger->addWarning(
        $this->t('An error occurred while processing @operation with arguments : @args',
          [
            '@operation' => 'Update Page',
            '@args' => print_r($error_operation[1][2], TRUE),
          ]
        )
      );
    }
  }

}
