cms_content_sync-3.0.x-dev/src/Controller/FlowPull.php

src/Controller/FlowPull.php
<?php

namespace Drupal\cms_content_sync\Controller;

use Drupal\cms_content_sync\Entity\EntityStatus;
use Drupal\cms_content_sync\Entity\Flow;
use Drupal\cms_content_sync\Entity\Pool;
use Drupal\cms_content_sync\Plugin\Type\EntityHandlerPluginManager;
use Drupal\cms_content_sync\PullIntent;
use Drupal\cms_content_sync\SyncCoreInterface\SyncCoreFactory;
use Drupal\cms_content_sync\SyncIntent;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Url;
use EdgeBox\SyncCore\Exception\SyncCoreException;
use EdgeBox\SyncCore\Exception\TimeoutException;
use EdgeBox\SyncCore\Interfaces\ISyncCore;
use EdgeBox\SyncCore\Interfaces\Syndication\IPullAll;

/**
 * Pull controller.
 */
class FlowPull extends ControllerBase {

  /**
   * Pull all entities of this Flow.
   *
   * @param string $cms_content_sync_flow
   *   The flow to pull entities for.
   * @param string $pull_mode
   *   The pull mode.
   *
   * @throws \Drupal\Component\Plugin\Exception\InvalidPluginDefinitionException
   * @throws \Drupal\Component\Plugin\Exception\PluginNotFoundException
   *
   * @return null|\Symfony\Component\HttpFoundation\RedirectResponse
   *   The redirect response
   */
  public function pull($cms_content_sync_flow, $pull_mode) {
    /**
     * @var \Drupal\cms_content_sync\Entity\Flow $flow
     */
    $flow = \Drupal::entityTypeManager()
      ->getStorage('cms_content_sync_flow')
      ->load($cms_content_sync_flow);

    $force_pull = FALSE;
    if ('all_entities' == $pull_mode) {
      $force_pull = TRUE;
    }

    $result = FlowPull::pullAll($flow, $force_pull);

    $operations = [];

    foreach ($result as $id => $operation) {
      $operations[] = [
        '\Drupal\cms_content_sync\Controller\FlowPull::batch',
            [$operation],
      ];
    }

    $batch = [
      'title' => t('Pull all'),
      'operations' => $operations,
      'finished' => '\Drupal\cms_content_sync\Controller\FlowPull::batchFinished',
    ];
    batch_set($batch);

    return batch_process(Url::fromRoute('entity.cms_content_sync_flow.collection'));
  }

  /**
   * Force pull an entity for a specific flow.
   *
   * @param string $flow_id
   *   The ID of the flow to force pull the entity for.
   * @param string $entity_type
   *   The entity type of the entity to force pull the entity for.
   * @param string $entity_uuid
   *   The entity UUID of the entity to force pull the entity for.
   *
   * @throws \Drupal\Component\Plugin\Exception\InvalidPluginDefinitionException
   * @throws \Drupal\Component\Plugin\Exception\PluginNotFoundException
   */
  public static function forcePullEntity($flow_id, $entity_type, $entity_uuid) {
    $entity = EntityStatus::getInfosForEntity($entity_type, $entity_uuid, ['flow' => $flow_id]);
    $entity = reset($entity);

    /** @var \Drupal\cms_content_sync\Entity\EntityStatus $entity */
    if ($entity instanceof EntityStatus) {
      // If the entity is embedded, we need to pull the parent entity instead.
      if ($entity->wasPulledEmbedded()) {
        $parent = $entity->getParentEntity();
        if (!$parent) {
          $raw = $entity->getData(EntityStatus::DATA_PARENT_ENTITY);
          \Drupal::messenger()->addMessage(t("The @type with the ID @uuid was pulled embedded into another entity but that parent @parent_type with ID @parent_uuid doesn't exist.", [
            '@type' => $entity->get('entity_type')->getValue()[0]['value'],
            '@uuid' => $entity->get('entity_uuid')->getValue()[0]['value'],
            '@parent_type' => $raw['type'],
            '@parent_uuid' => $raw['uuid'],
          ]), 'warning');

          return;
        }

        self::forcePullEntity($flow_id, $parent->getEntityTypeId(), $parent->uuid());
      }

      $source = $entity->getEntity();
      if (empty($source)) {
        \Drupal::messenger()->addMessage(t("The @type with the ID @uuid doesn't exist locally, pull skipped.", [
          '@type' => $entity->get('entity_type')->getValue()[0]['value'],
          '@uuid' => $entity->get('entity_uuid')->getValue()[0]['value'],
        ]), 'warning');

        return;
      }

      $pool = $entity->getPool();
      if (empty($pool)) {
        \Drupal::messenger()->addMessage(t('The Pool for @type %label doesn\'t exist anymore, push skipped.', [
          '@type' => $entity->get('entity_type')->getValue()[0]['value'],
          '%label' => $source->label(),
        ]), 'warning');

        return;
      }

      $entity_type_name = $source->getEntityTypeId();
      $entity_bundle = $source->bundle();

      $manual = FALSE;

      $flow = $entity->getFlow();

      if (!$flow || !$flow->getController()->canPullEntity($entity_type_name, $entity_bundle, PullIntent::PULL_AUTOMATICALLY, SyncIntent::ACTION_CREATE, TRUE)) {
        if ($flow && $flow->getController()->canPullEntity($entity_type_name, $entity_bundle, PullIntent::PULL_MANUALLY, SyncIntent::ACTION_CREATE, TRUE)) {
          $manual = TRUE;
        }
        elseif (!$flow || !$flow->getController()->canPullEntity($entity_type_name, $entity_bundle, PullIntent::PULL_AS_DEPENDENCY, SyncIntent::ACTION_CREATE, TRUE)) {
          // The flow from the status entity no longer pulls this entity type / bundle => look for a new Flow to replace it.
          $flow = Flow::getFlowForPoolAndEntityType($pool, $entity_type_name, $entity_bundle, PullIntent::PULL_AUTOMATICALLY, SyncIntent::ACTION_CREATE, TRUE);

          if (!$flow) {
            $flow = Flow::getFlowForPoolAndEntityType($pool, $entity_type_name, $entity_bundle, PullIntent::PULL_MANUALLY, SyncIntent::ACTION_CREATE, TRUE);
            if ($flow) {
              $manual = TRUE;
            }
            else {
              $flow = Flow::getFlowForPoolAndEntityType($pool, $entity_type_name, $entity_bundle, PullIntent::PULL_AS_DEPENDENCY, SyncIntent::ACTION_CREATE, TRUE);
              if (!$flow) {
                \Drupal::messenger()->addMessage(t('No Flow exists to pull @type %label, pull skipped.', [
                  '@type' => $entity->get('entity_type')->getValue()[0]['value'],
                  '%label' => $source->label(),
                ]), 'warning');

                return;
              }
            }
          }
        }
      }

      $shared_entity_id = EntityHandlerPluginManager::getSharedId($source);

      try {
        $pool
          ->getClient()
          ->getSyndicationService()
          ->pullSingle($flow->id, $entity_type_name, $entity_bundle, $shared_entity_id)
          ->fromPool($pool->id)
          ->manually((bool) $manual)
          ->execute();

        \Drupal::messenger()->addMessage(t('Pull of @type %label has been triggered.', [
          '@type' => $entity->get('entity_type')->getValue()[0]['value'],
          '%label' => $source->label(),
        ]));
      }
      catch (SyncCoreException $e) {
        \Drupal::messenger()->addMessage(t('Failed to pull @type %label: @message', [
          '@type' => $entity->get('entity_type')->getValue()[0]['value'],
          '%label' => $source->label(),
          '@message' => $e->getMessage(),
        ]), 'warning');
      }
    }
    else {
      \Drupal::messenger()->addMessage(t('No local status entity found for Entity Type: @type having UUID: @uuid.', [
        '@type' => $entity_type,
        '@uuid' => $entity_uuid,
      ]), 'warning');

      return;
    }
  }

  /**
   * Kindly ask the Sync Core to pull all entities that are auto pulled.
   *
   * @param \Drupal\cms_content_sync\Entity\Flow $flow
   *   The flow to pull entities for.
   * @param bool $force
   *   Whether to force the pull of entities that are already pulled.
   *
   * @return \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll[]
   *   An array of entities that have been pulled.
   */
  public static function pullAll(Flow $flow, $force = FALSE) {
    $flow_import = $force ? [PullIntent::PULL_AUTOMATICALLY] : [
      PullIntent::PULL_AUTOMATICALLY,
      PullIntent::PULL_MANUALLY
    ];
    $pool_import = $force ? [Pool::POOL_USAGE_FORCE] : [
      Pool::POOL_USAGE_FORCE,
      Pool::POOL_USAGE_ALLOW
    ];

    /**
     * @var \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll[] $result
     */
    $result = [];

    $pools = Pool::getAll();

    foreach ($flow->getController()->getEntityTypeConfig() as $entity_type_name => $bundles) {
      foreach ($bundles as $bundle_name => $type) {
        $version = $type['version'];

        if (Flow::HANDLER_IGNORE == $type['handler']) {
          continue;
        }

        if (!in_array($type['import'], $flow_import)) {
          continue;
        }

        $entity_type_pools = [];
        if (isset($type['import_pools'])) {
          foreach ($type['import_pools'] as $pool_id => $state) {
            if (!isset($entity_type_pools[$pool_id])) {
              $entity_type_pools[$pool_id] = [];
            }
            $entity_type_pools[$pool_id]['import'] = $state;
          }
        }

        foreach ($entity_type_pools as $pool_id => $definition) {
          if (empty($pools[$pool_id])) {
            continue;
          }
          $pool = $pools[$pool_id];
          $import = $definition['import'] ?? NULL;

          if (!in_array($import, $pool_import)) {
            continue;
          }

          $client = $pool
            ->getClient();

          if (SyncCoreFactory::featureEnabled(ISyncCore::FEATURE_PULL_ALL_WITHOUT_POOL)) {
            $result[] = $client
              ->getSyndicationService()
              ->pullAll($flow->id, $entity_type_name, $bundle_name, $version)
              ->force($force);

            break;
          }

          $result[] = $client
            ->getSyndicationService()
            ->pullAll($flow->id, $entity_type_name, $bundle_name, $version)
            ->fromPool($pool->id)
            ->force($force);
        }
      }
    }

    return $result;
  }

  /**
   * Batch pull finished callback.
   */
  public static function batchFinished($success, $results, $operations) {
    $failed = 0;
    $empty = 0;
    $synchronized = 0;
    foreach ($results as $result) {
      if ('FAILURE' == $result['type']) {
        ++$failed;
      }
      elseif ('EMPTY' == $result['type']) {
        ++$empty;
      }
      else {
        $synchronized += $result['total'];
      }
    }

    if ($failed) {
      \Drupal::messenger()->addMessage(t('Failed to pull from %failed entity pools.', ['%failed' => $failed]));
    }
    if ($empty) {
      \Drupal::messenger()->addMessage(t('%empty entity pools were empty or had no new entities.', ['%empty' => $empty]));
    }
    if ($synchronized) {
      \Drupal::messenger()->addMessage(t('%synchronized entities have been pulled.', ['%synchronized' => $synchronized]));
    }
  }

  /**
   * Batch pull callback for the pull-all operation.
   *
   * @param \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll $operation
   *   The pull operation.
   * @param array $context
   *   The batch context.
   *
   * @throws \EdgeBox\SyncCore\Exception\SyncCoreException
   */
  public static function batch(IPullAll $operation, array &$context) {
    if (empty($context['sandbox']['operation'])) {
      $context['sandbox']['operation'] = $operation->execute();

      if (!$operation->total()) {
        $context['results'][] = [
          'type' => 'EMPTY',
        ];

        return;
      }
    }

    /**
     * @var \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll $operation
     */
    $operation = $context['sandbox']['operation'];

    try {
      $progress = $operation->progress();
      $total = $operation->total();

      if ($progress < $total) {
        // Don't spam the Sync Core...
        sleep(5);
      }

      if ($progress == $total) {
        $context['results'][] = ['type' => 'success', 'total' => $total];
      }

      $context['finished'] = $progress / $operation->total();
      $context['message'] = 'Pulled ' . $progress . ' of ' . $operation->total() . ' ' . $operation->getTypeMachineName() . '.' . $operation->getBundleMachineName() . ' from ' . $operation->getSourceName() . '...';
    }
    // Ignore timeouts, just wait until the Sync Core becomes responsive again.
    catch (TimeoutException $e) {
    }
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc