cms_content_sync-3.0.x-dev/src/Controller/FlowPull.php
src/Controller/FlowPull.php
<?php
namespace Drupal\cms_content_sync\Controller;
use Drupal\cms_content_sync\Entity\EntityStatus;
use Drupal\cms_content_sync\Entity\Flow;
use Drupal\cms_content_sync\Entity\Pool;
use Drupal\cms_content_sync\Plugin\Type\EntityHandlerPluginManager;
use Drupal\cms_content_sync\PullIntent;
use Drupal\cms_content_sync\SyncCoreInterface\SyncCoreFactory;
use Drupal\cms_content_sync\SyncIntent;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Url;
use EdgeBox\SyncCore\Exception\SyncCoreException;
use EdgeBox\SyncCore\Exception\TimeoutException;
use EdgeBox\SyncCore\Interfaces\ISyncCore;
use EdgeBox\SyncCore\Interfaces\Syndication\IPullAll;
/**
 * Controller to pull the entities of a Flow, including the batch callbacks.
 */
class FlowPull extends ControllerBase {
/**
 * Pull all entities of this Flow.
 *
 * Loads the Flow, asks ::pullAll() for one pull operation per matching
 * entity type / bundle and runs these operations as a batch.
 *
 * @param string $cms_content_sync_flow
 *   The ID of the flow to pull entities for.
 * @param string $pull_mode
 *   The pull mode. 'all_entities' forces the pull, any other value doesn't.
 *
 * @throws \Drupal\Component\Plugin\Exception\InvalidPluginDefinitionException
 * @throws \Drupal\Component\Plugin\Exception\PluginNotFoundException
 * @throws \Symfony\Component\HttpKernel\Exception\NotFoundHttpException
 *   If no flow with the given ID exists.
 *
 * @return null|\Symfony\Component\HttpFoundation\RedirectResponse
 *   The redirect response
 */
public function pull($cms_content_sync_flow, $pull_mode) {
  $flow = \Drupal::entityTypeManager()
    ->getStorage('cms_content_sync_flow')
    ->load($cms_content_sync_flow);

  // load() returns NULL for unknown IDs; without this guard the typed
  // ::pullAll() call below would fail with a TypeError instead of a 404.
  if (!$flow instanceof Flow) {
    throw new \Symfony\Component\HttpKernel\Exception\NotFoundHttpException();
  }

  $force_pull = 'all_entities' === $pull_mode;

  // One batch operation per pull operation returned by ::pullAll().
  $operations = [];
  foreach (self::pullAll($flow, $force_pull) as $operation) {
    $operations[] = [
      '\Drupal\cms_content_sync\Controller\FlowPull::batch',
      [$operation],
    ];
  }

  batch_set([
    'title' => t('Pull all'),
    'operations' => $operations,
    'finished' => '\Drupal\cms_content_sync\Controller\FlowPull::batchFinished',
  ]);

  return batch_process(Url::fromRoute('entity.cms_content_sync_flow.collection'));
}
/**
 * Force pull an entity for a specific flow.
 *
 * An entity that was pulled embedded is not pulled on its own; the pull is
 * delegated to its parent entity instead. Every reason for skipping the pull
 * is reported to the user as a warning message.
 *
 * @param string $flow_id
 *   The ID of the flow to force pull the entity for.
 * @param string $entity_type
 *   The entity type of the entity to force pull the entity for.
 * @param string $entity_uuid
 *   The entity UUID of the entity to force pull the entity for.
 *
 * @throws \Drupal\Component\Plugin\Exception\InvalidPluginDefinitionException
 * @throws \Drupal\Component\Plugin\Exception\PluginNotFoundException
 */
public static function forcePullEntity($flow_id, $entity_type, $entity_uuid) {
  $messenger = \Drupal::messenger();

  $statuses = EntityStatus::getInfosForEntity($entity_type, $entity_uuid, ['flow' => $flow_id]);
  // reset() yields FALSE for an empty list, which the instanceof check below
  // rejects as well.
  $status = reset($statuses);
  if (!$status instanceof EntityStatus) {
    $messenger->addMessage(t('No local status entity found for Entity Type: @type having UUID: @uuid.', [
      '@type' => $entity_type,
      '@uuid' => $entity_uuid,
    ]), 'warning');
    return;
  }

  $status_type = $status->get('entity_type')->getValue()[0]['value'];

  // If the entity is embedded, we need to pull the parent entity instead.
  if ($status->wasPulledEmbedded()) {
    $parent = $status->getParentEntity();
    if (!$parent) {
      $raw = $status->getData(EntityStatus::DATA_PARENT_ENTITY);
      $messenger->addMessage(t("The @type with the ID @uuid was pulled embedded into another entity but that parent @parent_type with ID @parent_uuid doesn't exist.", [
        '@type' => $status_type,
        '@uuid' => $status->get('entity_uuid')->getValue()[0]['value'],
        '@parent_type' => $raw['type'],
        '@parent_uuid' => $raw['uuid'],
      ]), 'warning');
      return;
    }

    self::forcePullEntity($flow_id, $parent->getEntityTypeId(), $parent->uuid());
    // The parent pull takes the place of pulling the embedded entity, so stop
    // here rather than additionally pulling the embedded entity itself.
    return;
  }

  $source = $status->getEntity();
  if (empty($source)) {
    $messenger->addMessage(t("The @type with the ID @uuid doesn't exist locally, pull skipped.", [
      '@type' => $status_type,
      '@uuid' => $status->get('entity_uuid')->getValue()[0]['value'],
    ]), 'warning');
    return;
  }

  $pool = $status->getPool();
  if (empty($pool)) {
    $messenger->addMessage(t('The Pool for @type %label doesn\'t exist anymore, pull skipped.', [
      '@type' => $status_type,
      '%label' => $source->label(),
    ]), 'warning');
    return;
  }

  $entity_type_name = $source->getEntityTypeId();
  $entity_bundle = $source->bundle();

  // Prefer the Flow stored on the status entity: automatic pull first, then
  // manual pull, then pull as dependency. Only the manual case is flagged.
  $manual = FALSE;
  $flow = $status->getFlow();
  if (!$flow || !$flow->getController()->canPullEntity($entity_type_name, $entity_bundle, PullIntent::PULL_AUTOMATICALLY, SyncIntent::ACTION_CREATE, TRUE)) {
    if ($flow && $flow->getController()->canPullEntity($entity_type_name, $entity_bundle, PullIntent::PULL_MANUALLY, SyncIntent::ACTION_CREATE, TRUE)) {
      $manual = TRUE;
    }
    elseif (!$flow || !$flow->getController()->canPullEntity($entity_type_name, $entity_bundle, PullIntent::PULL_AS_DEPENDENCY, SyncIntent::ACTION_CREATE, TRUE)) {
      // The flow from the status entity no longer pulls this entity type /
      // bundle => look for a new Flow to replace it, in the same order of
      // preference.
      $flow = Flow::getFlowForPoolAndEntityType($pool, $entity_type_name, $entity_bundle, PullIntent::PULL_AUTOMATICALLY, SyncIntent::ACTION_CREATE, TRUE);
      if (!$flow) {
        $flow = Flow::getFlowForPoolAndEntityType($pool, $entity_type_name, $entity_bundle, PullIntent::PULL_MANUALLY, SyncIntent::ACTION_CREATE, TRUE);
        if ($flow) {
          $manual = TRUE;
        }
        else {
          $flow = Flow::getFlowForPoolAndEntityType($pool, $entity_type_name, $entity_bundle, PullIntent::PULL_AS_DEPENDENCY, SyncIntent::ACTION_CREATE, TRUE);
          if (!$flow) {
            $messenger->addMessage(t('No Flow exists to pull @type %label, pull skipped.', [
              '@type' => $status_type,
              '%label' => $source->label(),
            ]), 'warning');
            return;
          }
        }
      }
    }
  }

  $shared_entity_id = EntityHandlerPluginManager::getSharedId($source);

  try {
    $pool
      ->getClient()
      ->getSyndicationService()
      ->pullSingle($flow->id, $entity_type_name, $entity_bundle, $shared_entity_id)
      ->fromPool($pool->id)
      ->manually((bool) $manual)
      ->execute();

    $messenger->addMessage(t('Pull of @type %label has been triggered.', [
      '@type' => $status_type,
      '%label' => $source->label(),
    ]));
  }
  catch (SyncCoreException $e) {
    $messenger->addMessage(t('Failed to pull @type %label: @message', [
      '@type' => $status_type,
      '%label' => $source->label(),
      '@message' => $e->getMessage(),
    ]), 'warning');
  }
}
/**
 * Kindly ask the Sync Core to pull all entities that are auto pulled.
 *
 * Builds one pull operation per entity type / bundle / pool combination of
 * the Flow that passes the handler, import and pool usage filters. The
 * operations are only prepared here; they are not executed.
 *
 * @param \Drupal\cms_content_sync\Entity\Flow $flow
 *   The flow to pull entities for.
 * @param bool $force
 *   Whether to force the pull of entities that are already pulled. This also
 *   narrows the accepted import settings to PullIntent::PULL_AUTOMATICALLY
 *   and the accepted pool usage to Pool::POOL_USAGE_FORCE.
 *
 * @return \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll[]
 *   The prepared pull operations.
 */
public static function pullAll(Flow $flow, $force = FALSE) {
  $flow_import = $force
    ? [PullIntent::PULL_AUTOMATICALLY]
    : [PullIntent::PULL_AUTOMATICALLY, PullIntent::PULL_MANUALLY];
  $pool_import = $force
    ? [Pool::POOL_USAGE_FORCE]
    : [Pool::POOL_USAGE_FORCE, Pool::POOL_USAGE_ALLOW];

  /**
   * @var \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll[] $result
   */
  $result = [];
  $pools = Pool::getAll();

  foreach ($flow->getController()->getEntityTypeConfig() as $entity_type_name => $bundles) {
    foreach ($bundles as $bundle_name => $type) {
      // Read the settings defensively: a bundle config that lacks one of
      // these keys is skipped (or passed on with a NULL version) without
      // raising undefined-key warnings.
      if (Flow::HANDLER_IGNORE == ($type['handler'] ?? NULL)) {
        continue;
      }
      if (!in_array($type['import'] ?? NULL, $flow_import)) {
        continue;
      }
      // Only needed for bundles that are actually pulled.
      $version = $type['version'] ?? NULL;

      foreach ($type['import_pools'] ?? [] as $pool_id => $import) {
        if (empty($pools[$pool_id])) {
          continue;
        }
        if (!in_array($import, $pool_import)) {
          continue;
        }

        $pool = $pools[$pool_id];
        $service = $pool->getClient()->getSyndicationService();

        if (SyncCoreFactory::featureEnabled(ISyncCore::FEATURE_PULL_ALL_WITHOUT_POOL)) {
          // A single operation without a pool is added for this bundle, so
          // the remaining pools are skipped.
          $result[] = $service
            ->pullAll($flow->id, $entity_type_name, $bundle_name, $version)
            ->force($force);
          break;
        }

        $result[] = $service
          ->pullAll($flow->id, $entity_type_name, $bundle_name, $version)
          ->fromPool($pool->id)
          ->force($force);
      }
    }
  }

  return $result;
}
/**
 * Batch pull finished callback.
 *
 * Summarizes the results collected by the batch operations as messages.
 *
 * @param bool $success
 *   Whether the batch completed without errors.
 * @param array $results
 *   The entries the operations added to $context['results']. Each entry has
 *   a 'type' key; every type other than 'FAILURE' and 'EMPTY' is counted as
 *   synchronized using its 'total' key.
 * @param array $operations
 *   The operations that remained unprocessed. Unused.
 */
public static function batchFinished($success, $results, $operations) {
  $messenger = \Drupal::messenger();

  // Don't let an aborted batch end silently.
  if (!$success) {
    $messenger->addMessage(t('The pull did not complete successfully.'), 'error');
  }

  $failed = 0;
  $empty = 0;
  $synchronized = 0;
  foreach ($results as $result) {
    $type = $result['type'] ?? NULL;
    if ('FAILURE' == $type) {
      ++$failed;
    }
    elseif ('EMPTY' == $type) {
      ++$empty;
    }
    else {
      $synchronized += $result['total'] ?? 0;
    }
  }

  if ($failed) {
    // Failures are reported as warnings, like the other failure messages of
    // this controller.
    $messenger->addMessage(t('Failed to pull from %failed entity pools.', ['%failed' => $failed]), 'warning');
  }
  if ($empty) {
    $messenger->addMessage(t('%empty entity pools were empty or had no new entities.', ['%empty' => $empty]));
  }
  if ($synchronized) {
    $messenger->addMessage(t('%synchronized entities have been pulled.', ['%synchronized' => $synchronized]));
  }
}
/**
 * Batch pull callback for the pull-all operation.
 *
 * The first call executes the operation and keeps the result in the sandbox;
 * this and every following call poll its progress until everything is
 * pulled.
 *
 * @param \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll $operation
 *   The pull operation.
 * @param array $context
 *   The batch context.
 *
 * @throws \EdgeBox\SyncCore\Exception\SyncCoreException
 */
public static function batch(IPullAll $operation, array &$context) {
  if (empty($context['sandbox']['operation'])) {
    $context['sandbox']['operation'] = $operation->execute();
    if (!$operation->total()) {
      $context['results'][] = [
        'type' => 'EMPTY',
      ];
      return;
    }
  }

  /**
   * @var \EdgeBox\SyncCore\Interfaces\Syndication\IPullAll $operation
   */
  $operation = $context['sandbox']['operation'];

  try {
    $progress = $operation->progress();
    $total = $operation->total();

    if ($progress < $total) {
      // Don't spam the Sync Core...
      sleep(5);
    }
    if ($progress == $total) {
      $context['results'][] = ['type' => 'success', 'total' => $total];
    }

    // Guard against a zero total so the ratio never divides by zero; with
    // nothing to pull the operation is simply done.
    $finished = $total > 0 ? $progress / $total : 1;
    $context['finished'] = $finished;
    // Remember the ratio so that a later timeout can report it again.
    $context['sandbox']['finished'] = $finished;
    $context['message'] = 'Pulled ' . $progress . ' of ' . $total . ' ' . $operation->getTypeMachineName() . '.' . $operation->getBundleMachineName() . ' from ' . $operation->getSourceName() . '...';
  }
  // Ignore timeouts, just wait until the Sync Core becomes responsive again.
  catch (TimeoutException $e) {
    // The Batch API treats an operation that doesn't set 'finished' as done,
    // so report the last known (incomplete) ratio to get called again.
    $context['finished'] = $context['sandbox']['finished'] ?? 0;
  }
}
}
