acquia_vwo-1.0.x-dev/modules/acquia_vwo_content/src/Service/Export/ExportQueue.php
modules/acquia_vwo_content/src/Service/Export/ExportQueue.php
<?php
namespace Drupal\acquia_vwo_content\Service\Export;
use Drupal\acquia_vwo_content\Service\Helper\Util;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\DependencyInjection\DependencySerializationTrait;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Messenger\MessengerInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerManager;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Render\RenderContext;
use Drupal\Core\Render\RendererInterface;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use GuzzleHttp\Exception\TransferException;
/**
 * Export queue service for Acquia VWO content.
 *
 * Wraps the 'acquia_vwo_content_export' queue and offers Batch API helpers to
 * enqueue the configured entities (rescan) and to process the queued items
 * (export).
 */
class ExportQueue {

  use StringTranslationTrait;
  use DependencySerializationTrait;

  /**
   * The renderer.
   *
   * @var \Drupal\Core\Render\RendererInterface
   */
  protected $renderer;

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The export content queue ('acquia_vwo_content_export').
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $queue;

  /**
   * The queue worker plugin manager.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManager
   */
  protected $queueManager;

  /**
   * The messenger.
   *
   * @var \Drupal\Core\Messenger\MessengerInterface
   */
  protected $messenger;

  /**
   * The config factory.
   *
   * @var \Drupal\Core\Config\ConfigFactoryInterface
   */
  protected $configFactory;

  /**
   * Constructs an ExportQueue object.
   *
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory.
   * @param \Drupal\Core\Render\RendererInterface $renderer
   *   The renderer.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   The queue factory, used to obtain the 'acquia_vwo_content_export' queue.
   * @param \Drupal\Core\Queue\QueueWorkerManager $queue_manager
   *   The queue worker plugin manager.
   * @param \Drupal\Core\Messenger\MessengerInterface $messenger
   *   The messenger.
   */
  public function __construct(
    ConfigFactoryInterface $config_factory,
    RendererInterface $renderer,
    EntityTypeManagerInterface $entity_type_manager,
    QueueFactory $queue_factory,
    QueueWorkerManager $queue_manager,
    MessengerInterface $messenger,
  ) {
    $this->configFactory = $config_factory;
    $this->renderer = $renderer;
    $this->entityTypeManager = $entity_type_manager;
    $this->queue = $queue_factory->get('acquia_vwo_content_export');
    $this->queueManager = $queue_manager;
    $this->messenger = $messenger;
  }

  /**
   * Obtains the number of items in the export queue.
   *
   * @return int
   *   The number of items reported by the export queue.
   */
  public function getQueueCount(): mixed {
    return $this->queue->numberOfItems();
  }

  /**
   * Adds a group of entities to the export queue as a single queue item.
   *
   * @param string $action
   *   The action.
   * @param array $entities
   *   Entities that should be exported. Each element has the format:
   *   [
   *     entity_type_id: '',
   *     entity_id: '',
   *     entity_uuid: ''
   *   ].
   * @param string $langcode
   *   Language code of the entity translation that should be exported.
   *   'all' value means that all entity translations should be exported.
   */
  public function addBulkQueueItem(string $action, array $entities, string $langcode = 'all'): void {
    $this->queue->createItem([
      'action' => $action,
      'entities' => $entities,
      'langcode' => $langcode,
    ]);
  }

  /**
   * Removes all the export queue items.
   */
  public function purgeQueue(): void {
    $this->queue->deleteQueue();
  }

  /**
   * Gets a config item from 'acquia_vwo_content.settings'.
   *
   * @param string $config_item
   *   The config item name.
   *
   * @return array|mixed|null
   *   The config item value.
   */
  public function getSettingsConfigItem(string $config_item): mixed {
    return $this->configFactory
      ->get('acquia_vwo_content.settings')
      ->get($config_item);
  }

  /**
   * Gets the 'entity_config' value of 'acquia_vwo_content.entity_config'.
   *
   * @return array|mixed|null
   *   List of entity types > bundle > view modes as stored by the entity
   *   configuration form.
   */
  public function getEntityTypesConfig(): mixed {
    return $this->configFactory
      ->get('acquia_vwo_content.entity_config')
      ->get('entity_config');
  }

  /**
   * Enqueues all configured content, one queue item per entity.
   *
   * Any items still present in the queue are purged first.
   *
   * @param bool $use_batch
   *   TRUE to register a batch set whose operations enqueue the entities;
   *   FALSE to enqueue them immediately.
   */
  public function rescanContentBulk($use_batch = TRUE): void {
    if ($this->getQueueCount() > 0) {
      $this->purgeQueue();
    }

    // The entity configuration may be missing, in which case there is nothing
    // to enqueue.
    $entity_types = $this->getEntityTypesConfig();
    if (!is_array($entity_types)) {
      $entity_types = [];
    }

    $operations = [];
    foreach ($entity_types as $entity_type_id => $bundles) {
      if (!is_array($bundles)) {
        continue;
      }
      $entity_ids = $this->getRescannedEntities($entity_type_id, $bundles);
      if (empty($entity_ids)) {
        continue;
      }
      $storage = $this->entityTypeManager->getStorage($entity_type_id);
      // Load in chunks instead of issuing one load per entity.
      foreach (array_chunk($entity_ids, 50) as $chunk) {
        $entities = $storage->loadMultiple($chunk);
        foreach ($chunk as $entity_id) {
          // An entity may have been deleted between the query and the load.
          $entity = $entities[$entity_id] ?? NULL;
          if ($entity === NULL) {
            continue;
          }
          $bulk = [
            [
              'entity_type_id' => $entity_type_id,
              'entity_id' => $entity_id,
              'entity_uuid' => $entity->uuid(),
            ],
          ];
          if ($use_batch) {
            $operations[] = [[$this, 'rescanBatchBulkProcess'], [$bulk]];
          }
          else {
            $this->rescanBatchBulkProcess($bulk);
          }
        }
      }
    }

    if ($use_batch) {
      batch_set([
        'title' => $this->t("Rescan Content Bulk Process"),
        'operations' => $operations,
        // Must be a plain callable, otherwise Batch API never invokes it.
        'finished' => [$this, 'rescanBatchFinished'],
      ]);
    }
  }

  /**
   * Returns entity ids by entity type id and passed bundles.
   *
   * @param string $entity_type_id
   *   The entity type id.
   * @param array $bundles
   *   List of bundles of entity type, keyed by bundle name.
   *
   * @return array
   *   The entity ids returned by the entity query, or an empty array when no
   *   bundles were passed.
   */
  protected function getRescannedEntities(string $entity_type_id, array $bundles) {
    $available_bundles = array_keys($bundles);
    if (empty($available_bundles)) {
      return [];
    }

    $storage = $this->entityTypeManager->getStorage($entity_type_id);
    $bundle_property_name = $storage->getEntityType()->getKey('bundle');

    // NOTE(review): access is checked for the current user, so the result
    // depends on who triggers the rescan - confirm this is intended.
    $query = $storage->getQuery()->accessCheck(TRUE);

    // For single-bundle entity types like 'user'
    // we don't use bundle related property.
    if (!empty($bundle_property_name)) {
      $query->condition($bundle_property_name, $available_bundles, 'IN');
    }
    return $query->execute();
  }

  /**
   * Rescan content batch bulk processing callback.
   *
   * Enqueues the given entities with the 'insert_or_update' action.
   *
   * @param array $entities
   *   The list of entities.
   *   Format:
   *   [
   *     [
   *       entity_type_id: 'node'
   *       entity_id: 5
   *       entity_uuid: '...'
   *     ],
   *     ...
   *   ].
   */
  public function rescanBatchBulkProcess(array $entities): void {
    $this->addBulkQueueItem('insert_or_update', $entities);
  }

  /**
   * Rescan content batch finished callback.
   *
   * @param bool $success
   *   Whether the batch process succeeded or not.
   * @param array $results
   *   The results array.
   * @param array $operations
   *   An array of operations.
   */
  public function rescanBatchFinished(bool $success, array $results, array $operations): void {
    $this->reportBatchOutcome(
      $success,
      $results,
      $operations,
      $this->t("The contents are successfully rescanned.")
    );
  }

  /**
   * Registers a batch set that processes every item of the export queue.
   */
  public function exportBulkQueueItems(): void {
    Util::clearLibrariesFromState();

    // One batch operation per queued item; each operation claims and processes
    // a single item. The count is read once rather than on every iteration.
    $queue_count = (int) $this->getQueueCount();
    $operations = [];
    for ($i = 0; $i < $queue_count; $i++) {
      $operations[] = [[$this, 'exportBulkBatchProcess'], []];
    }

    batch_set([
      'title' => $this->t("Process Export Queue"),
      'operations' => $operations,
      // Must be a plain callable, otherwise Batch API never invokes it.
      'finished' => [$this, 'exportBatchFinished'],
      'progressive' => FALSE,
    ]);
  }

  /**
   * Export bulk batch processing callback for all operations.
   *
   * Claims one item from the export queue, hands it to the
   * 'acquia_vwo_content_export' queue worker and records a message in the
   * batch context.
   *
   * @param mixed $context
   *   The context array.
   */
  public function exportBulkBatchProcess(mixed &$context): void {
    $queue_worker = $this->queueManager->createInstance('acquia_vwo_content_export');

    $item = $this->queue->claimItem();
    if (!is_object($item) || empty($item->data)) {
      return;
    }

    try {
      // Generating a list of entities.
      $msg_label = $this->t('(@entities)', [
        '@entities' => serialize($item->data['entities'] ?? []),
      ]);

      $bulks_processed = $queue_worker->processItem($item->data);
      if (!$bulks_processed) {
        // The item is neither deleted nor released here, so it remains in the
        // queue; presumably it becomes claimable again once its lease
        // expires - confirm against the queue backend in use.
        if ($bulks_processed === FALSE) {
          $message = $this->t('There was an error processing bulks: @bulk and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@bulk' => $msg_label,
          ]);
        }
        else {
          $message = $this->t('No processing was done for bulks: @bulk and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@bulk' => $msg_label,
          ]);
        }
      }
      else {
        // If everything was correct, delete processed item from the queue.
        $this->queue->deleteItem($item);
        $message = $this->t('Processed items: (@count @label sent).', [
          '@count' => $bulks_processed,
          '@label' => $bulks_processed == 1 ? $this->t('item') : $this->t('items'),
        ]);
      }
    }
    catch (\RuntimeException $e) {
      if ($e instanceof SuspendQueueException || $e instanceof TransferException) {
        // Re-create the item and drop the claimed copy so it is retried later.
        $this->addBulkQueueItem(
          $item->data['action'],
          $item->data['entities'],
          $item->data['langcode']
        );
        $this->queue->deleteItem($item);
        $message = $this->t('The export was interrupted and the item has been re-queued: @error', [
          '@error' => $e->getMessage(),
        ]);
      }
      else {
        // Other runtime failures are not re-queued, but they are reported
        // instead of being dropped silently.
        $message = $this->t('An unexpected error occurred while exporting a queue item: @error', [
          '@error' => $e->getMessage(),
        ]);
      }
    }

    $context['message'] = (string) $message;
    $context['results'][] = (string) $message;
  }

  /**
   * Batch finished callback.
   *
   * @param bool $success
   *   Whether the batch process succeeded or not.
   * @param array $results
   *   The results array.
   * @param array $operations
   *   An array of operations.
   */
  public function exportBatchFinished(bool $success, array $results, array $operations): void {
    $this->reportBatchOutcome(
      $success,
      $results,
      $operations,
      $this->t("The contents are successfully exported.")
    );
  }

  /**
   * Shows the outcome of a finished batch to the user.
   *
   * @param bool $success
   *   Whether the batch process succeeded or not.
   * @param array $results
   *   The messages collected by the batch operations.
   * @param array $operations
   *   The operations that remained unprocessed; each one is expected to be a
   *   [callback, arguments] pair.
   * @param \Drupal\Component\Render\MarkupInterface|string $success_message
   *   The message to show when the batch succeeded.
   */
  protected function reportBatchOutcome(bool $success, array $results, array $operations, $success_message): void {
    if ($success) {
      $this->messenger->addMessage($success_message);
    }
    else {
      $error_operation = reset($operations);
      $callback = is_array($error_operation) ? ($error_operation[0] ?? NULL) : NULL;
      $arguments = is_array($error_operation) ? ($error_operation[1] ?? []) : [];
      // Callbacks of this class are [object, method] pairs; only the method
      // name can be used as a placeholder value.
      if (is_array($callback)) {
        $callback = $callback[1] ?? '';
      }
      $this->messenger->addError($this->t('An error occurred while processing @operation with arguments : @args', [
        '@operation' => is_string($callback) ? $callback : '',
        '@args' => print_r($arguments, TRUE),
      ]));
    }

    // Nothing to report: avoid adding an empty message.
    if (empty($results)) {
      return;
    }

    // Providing a report on the items processed by the queue.
    $elements = [
      '#theme' => 'item_list',
      '#list_type' => 'ul',
      '#items' => $results,
    ];
    // The finished callback is not guaranteed to run inside an active render
    // context, so the report is rendered in its own.
    $queue_report = $this->renderer->executeInRenderContext(
      new RenderContext(),
      function () use (&$elements) {
        return $this->renderer->render($elements);
      }
    );
    $this->messenger->addMessage($queue_report);
  }

}
