lionbridge_translation_provider-8.x-2.4/tmgmt_contentapi/src/Services/QueueOperations.php
tmgmt_contentapi/src/Services/QueueOperations.php
<?php
namespace Drupal\tmgmt_contentapi\Services;
use Drupal\Core\Database\Connection;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\State\StateInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Access\CsrfTokenGenerator;
use GuzzleHttp\ClientInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\tmgmt\JobInterface;
use Drupal\tmgmt\JobItemInterface;
/**
* Service for handling queue operations.
*/
class QueueOperations {
/**
* The database connection.
*
* @var \Drupal\Core\Database\Connection
*/
protected $database;
/**
* Logger service.
*
* @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
*/
protected $logger;
/**
* The queue factory.
*
* @var \Drupal\Core\Queue\QueueFactory
*/
protected $queueFactory;
/**
* The state service for storing temporary data.
*
* @var \Drupal\Core\State\StateInterface
*/
protected $state;
/**
* The HTTP client used to trigger background queue processing.
*
* @var \GuzzleHttp\ClientInterface
*/
protected $httpClient;
/**
* CsrfToken to access controller.
*
* @var \Drupal\Core\Access\CsrfTokenGenerator
*/
protected $csrfToken;
/**
* The entity type manager.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
protected $entityTypeManager;
/**
* Queue name for generate files to CAPI.
*/
const QUEUE_NAME_GENERATE_FILES = 'generate_file_for_translation_to_capi';
/**
* Value for zip job path.
*/
const ZIP_JOB_PATH = 'zip_job_';
/**
* Zip extension.
*/
const ZIP_EXTENSION = ".zip";
/**
* Public file path for sent files.
*/
const PUBLIC_SENT_FILE_PATH = '://tmgmt_contentapi/LioxSentFiles/';
/**
 * Builds the service from its injected dependencies.
 *
 * The HTTP client property is not populated here; it is assigned on demand
 * by processQueueViaController().
 *
 * @param \Drupal\Core\Database\Connection $database
 *   Database connection used for queue and request-processor queries.
 * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger
 *   Logger channel factory.
 * @param \Drupal\Core\Queue\QueueFactory $queueFactory
 *   Factory used to obtain queue instances by name.
 * @param \Drupal\Core\State\StateInterface $state
 *   State storage.
 * @param \Drupal\Core\Access\CsrfTokenGenerator $csrfToken
 *   CSRF token generator.
 * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
 *   Entity type manager used to load jobs and job items.
 */
public function __construct(
  Connection $database,
  LoggerChannelFactoryInterface $logger,
  QueueFactory $queueFactory,
  StateInterface $state,
  CsrfTokenGenerator $csrfToken,
  EntityTypeManagerInterface $entityTypeManager
) {
  // Storage-related collaborators.
  $this->database = $database;
  $this->state = $state;
  $this->queueFactory = $queueFactory;
  $this->entityTypeManager = $entityTypeManager;
  // Cross-cutting collaborators.
  $this->csrfToken = $csrfToken;
  $this->logger = $logger;
}
/**
 * Instantiates the service from the dependency injection container.
 *
 * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
 *   The service container to pull dependencies from.
 *
 * @return static
 *   A new instance wired with the container's services.
 */
public static function create(ContainerInterface $container) {
  // Resolve the services in constructor-argument order.
  $database = $container->get('database');
  $logger_factory = $container->get('logger.factory');
  $queue_factory = $container->get('queue');
  $state = $container->get('state');
  $csrf_token = $container->get('csrf_token');
  $entity_type_manager = $container->get('entity_type.manager');

  return new static($database, $logger_factory, $queue_factory, $state, $csrf_token, $entity_type_manager);
}
/**
 * Kicks off processing of a queue.
 *
 * Modules get a chance to change the queue name and batch size through the
 * 'tmgmt_contentapi_process_queue' alter hook before the request is handed
 * to processQueueViaController().
 *
 * @param string $queue_name
 *   The name of the queue.
 * @param int $batch_size_queue
 *   The batch size for the queue run; defaults to 30.
 */
public function processQueue(string $queue_name, $batch_size_queue = 30) {
  // Both arguments are passed to the alter hook so implementations can tune
  // the batch size per queue.
  $module_handler = \Drupal::moduleHandler();
  $module_handler->alter('tmgmt_contentapi_process_queue', $queue_name, $batch_size_queue);

  // Delegate the actual trigger to the controller-based runner.
  $this->processQueueViaController($queue_name, $batch_size_queue);
}
/**
 * Triggers queue processing by calling the background controller over HTTP.
 *
 * The request is sent to this site's own
 * /tmgmt-contentapi/queue-process-background/{queue}/{batch} endpoint with
 * the current user's session cookie and a CSRF token. It uses a one second
 * timeout so the caller is not blocked while the controller works; any
 * failure (including that timeout) is logged at debug level and otherwise
 * ignored.
 *
 * @param string $queue_name
 *   The name of the queue.
 * @param int $batch_size_queue
 *   The batch size for the queue run.
 */
protected function processQueueViaController(string $queue_name, $batch_size_queue) {
  // Without the permission nothing is triggered here; the notice below
  // states that the queue will be processed via cron instead.
  if (!\Drupal::currentUser()->hasPermission('access queue process')) {
    $this->logger->get('TMGMT_CONTENTAPI')->notice('Instant queue execution not permitted, queue will process via cron.');
    return;
  }
  // Generate a CSRF token for the request.
  $csrf_token = $this->csrfToken->get('queue_process_background');
  $this->httpClient = \Drupal::httpClient();
  try {
    $site_url = \Drupal::request()->getSchemeAndHttpHost();
    $endpoint = $site_url . '/tmgmt-contentapi/queue-process-background/' . $queue_name . '/' . $batch_size_queue;
    // Forward the current user's session so the controller sees the same
    // account.
    $session_manager = \Drupal::service('session_manager');
    $cookie = $session_manager->getName() . '=' . $session_manager->getId();
    // Short timeout: fire the request and move on.
    $options = [
      'headers' => [
        'X-CSRF-Token' => $csrf_token,
        'Content-Type' => 'application/json',
        'Cookie' => $cookie,
      ],
      'timeout' => 1,
    ];
    $this->httpClient->request('POST', $endpoint, $options);
  }
  catch (\Exception $e) {
    // Best effort only: a timeout is the normal outcome when the controller
    // runs longer than one second, so this is recorded at debug level
    // rather than treated as an error.
    $this->logger->get('TMGMT_CONTENTAPI')->debug(
      'Background queue request for @queue_name did not complete: @message',
      ['@queue_name' => $queue_name, '@message' => $e->getMessage()]
    );
  }
}
/**
 * Reads one value out of every item stored in a queue.
 *
 * Rows are read directly from the 'queue' table and unserialized. The
 * result maps each queue row's item_id to the value found under $index_key
 * in that row's data, or NULL when the data is not an array or lacks the key.
 *
 * @param string $queue_name
 *   The name of the queue.
 * @param string $index_key
 *   The key to read from each item's unserialized data.
 * @param bool $is_manual_check
 *   If TRUE, only items with 'is_manual' set in data will be returned.
 *
 * @return array
 *   Values keyed by queue item_id.
 */
public function getQueueItems(string $queue_name, string $index_key, $is_manual_check = FALSE) {
  $results = $this->database->select('queue', 'q')
    ->fields('q', ['item_id', 'data', 'created'])
    ->condition('name', $queue_name)
    ->execute()->fetchAll();
  $items = [];
  foreach ($results as $record) {
    // NOTE(review): queue payloads are unserialized without a class
    // allow-list; this relies on the queue table only holding data written
    // by this site.
    $data = unserialize($record->data);
    $is_array = is_array($data);
    // In manual mode, skip every item that is not flagged 'is_manual'.
    if ($is_manual_check && !($is_array && isset($data['is_manual']))) {
      continue;
    }
    // Guard the lookup so corrupt payloads or a missing key yield NULL
    // without raising PHP warnings.
    $items[$record->item_id] = $is_array ? ($data[$index_key] ?? NULL) : NULL;
  }
  return $items;
}
/**
 * Counts the items currently stored in a queue.
 *
 * @param string $queue_name
 *   Queue name.
 *
 * @return int
 *   Number of rows in the 'queue' table for this queue name.
 */
public function getRemainingItemsFromQueue(string $queue_name) {
  $remaining_count = $this->database->select('queue', 'q')
    ->condition('name', $queue_name)
    ->countQuery()
    ->execute()
    ->fetchField();
  // fetchField() yields the count as a string (or FALSE when there is no
  // row); normalise it to the integer the docblock promises.
  return (int) $remaining_count;
}
/**
 * Deletes a single row from the 'queue' table by its queue item ID.
 *
 * Failures are logged to the TMGMT_CONTENTAPI channel and not rethrown.
 *
 * @param int $item_id
 *   The queue row's item_id.
 */
public function deleteItemFromQueue($item_id) {
  try {
    $delete = $this->database->delete('queue');
    $delete->condition('item_id', $item_id);
    $delete->execute();
  }
  catch (\Exception $exception) {
    $context = ['@message' => $exception->getMessage()];
    $this->logger->get('TMGMT_CONTENTAPI')->error('Error deleting item from queue: @message', $context);
  }
}
/**
 * Removes every queue row whose payload refers to the given item ID.
 *
 * All rows of the queue are loaded and unserialized; a row is deleted when
 * its data holds the ID under 'id' (current format) or 'item_id' (legacy
 * format). Comparison is loose, so numeric strings match integers. Errors
 * are logged and the count gathered so far is returned.
 *
 * @param string $queue_name
 *   The name of the queue to remove items from.
 * @param int $item_id
 *   The ID to look for inside each queue item's data.
 *
 * @return int
 *   Number of rows removed from the queue.
 */
public function removeItemFromQueue(string $queue_name, $item_id): int {
  $removed_count = 0;
  try {
    $rows = $this->database->select('queue', 'q')
      ->fields('q', ['item_id', 'data'])
      ->condition('name', $queue_name)
      ->execute()
      ->fetchAll();

    foreach ($rows as $row) {
      $payload = unserialize($row->data);
      // Accept both the current ('id') and the legacy ('item_id') key.
      $matches = (isset($payload['id']) && $payload['id'] == $item_id)
        || (isset($payload['item_id']) && $payload['item_id'] == $item_id);
      if (!$matches) {
        continue;
      }

      $deleted = $this->database->delete('queue')
        ->condition('item_id', $row->item_id)
        ->execute();
      if (!$deleted) {
        continue;
      }

      $removed_count++;
      $this->logger->get('TMGMT_CONTENTAPI')->info(
        'Removed item @item_id from queue @queue_name (queue item_id: @queue_item_id)',
        [
          '@item_id' => $item_id,
          '@queue_name' => $queue_name,
          '@queue_item_id' => $row->item_id,
        ]
      );
    }

    // One summary line when anything was removed.
    if ($removed_count > 0) {
      $this->logger->get('TMGMT_CONTENTAPI')->info(
        'Queue cleanup: Removed @count items with ID @item_id from queue @queue_name',
        [
          '@count' => $removed_count,
          '@item_id' => $item_id,
          '@queue_name' => $queue_name,
        ]
      );
    }
  }
  catch (\Exception $exception) {
    $this->logger->get('TMGMT_CONTENTAPI')->error(
      'Error removing item @item_id from queue @queue_name: @message',
      [
        '@item_id' => $item_id,
        '@queue_name' => $queue_name,
        '@message' => $exception->getMessage(),
      ]
    );
  }
  return $removed_count;
}
/**
 * Stores an item under a key inside an array held in a state variable.
 *
 * The state value is read, coerced to an array if it is anything else, the
 * entry is set, and the whole array is written back.
 *
 * @param string $state_key
 *   The state key to store the data.
 * @param mixed $new_item
 *   The new item to add to the state variable.
 * @param mixed $key
 *   The array key for the new item. When omitted (NULL) the item is stored
 *   under the empty-string key, so repeated calls without a key overwrite
 *   the same entry; pass distinct keys to keep several items.
 */
public function addValueToStatevariable(string $state_key, mixed $new_item, $key = NULL) {
  $current_data = $this->state->get($state_key, []);
  // Ensure the state variable is an array.
  if (!is_array($current_data)) {
    $current_data = [];
  }
  // PHP coerces a NULL array offset to ''. Doing it explicitly keeps the
  // stored data unchanged while avoiding the implicit conversion, which is
  // deprecated as of PHP 8.5.
  $offset = $key ?? '';
  $current_data[$offset] = $new_item;
  // Save the updated array back to the state variable.
  $this->state->set($state_key, $current_data);
}
/**
 * Reads the value stored under a state key.
 *
 * @param string $state_key
 *   The state key to read.
 *
 * @return array
 *   The stored value, or an empty array when the key has no value.
 */
public function getvalueFromStatevariable(string $state_key) {
  // An empty array is the fallback for keys that were never written.
  $stored = $this->state->get($state_key, []);
  return $stored;
}
/**
 * Removes a state variable.
 *
 * @param string $state_key
 *   The state key to delete.
 */
public function deleteStateVariable(string $state_key) {
  $state = $this->state;
  $state->delete($state_key);
}
/**
 * Fetches the request-processor rows of a job that count as failed uploads.
 *
 * A row qualifies when its file_upload_status is 'FAILED' or
 * 'READY_FOR_RETRY', or when file_upload_attempts is 3 or more.
 *
 * @param int $jobid
 *   The local Drupal job ID (column tjid).
 * @param string $capi_job_id
 *   The CAPI job ID (column jobid); the filter is skipped when empty.
 *
 * @return array
 *   Matching rows as associative arrays, keyed by tjiid.
 */
public function getFailedUploadItems($jobid, $capi_job_id = NULL): array {
  $columns = ['rid', 'tjiid', 'tjid', 'jobid', 'requestid', 'file_upload_status', 'file_upload_attempts', 'errormessage', 'statuscode'];

  $select = $this->database->select('tmgmt_capi_request_processor', 't');
  $select->fields('t', $columns);
  $select->condition('tjid', $jobid);
  if ($capi_job_id) {
    $select->condition('jobid', $capi_job_id);
  }

  // Any one of these marks the row as a failed upload.
  $failure_states = $select->orConditionGroup();
  $failure_states->condition('file_upload_status', 'FAILED');
  $failure_states->condition('file_upload_status', 'READY_FOR_RETRY');
  // Maximum number of attempts reached.
  $failure_states->condition('file_upload_attempts', 3, '>=');
  $select->condition($failure_states);

  return $select->execute()->fetchAllAssoc('tjiid', \PDO::FETCH_ASSOC);
}
/**
 * Summarises how many rows of a job are in each file upload status.
 *
 * @param int $jobid
 *   The local Drupal job ID (column tjid).
 * @param string $capi_job_id
 *   The CAPI job ID (column jobid); the filter is skipped when empty.
 *
 * @return array
 *   Counts under the keys 'total', 'uploaded', 'failed', 'pending',
 *   'in_progress' and 'file_generated'. 'total' sums every status found,
 *   including ones not listed individually.
 */
public function getJobUploadStatus($jobid, $capi_job_id = NULL): array {
  $select = $this->database->select('tmgmt_capi_request_processor', 't');
  $select->fields('t', ['file_upload_status']);
  $select->condition('tjid', $jobid);
  if ($capi_job_id) {
    $select->condition('jobid', $capi_job_id);
  }
  $select->addExpression('COUNT(*)', 'count');
  $select->groupBy('file_upload_status');
  // Status => count.
  $counts = $select->execute()->fetchAllKeyed();

  // Summary key => status value it reports.
  $reported = [
    'uploaded' => 'UPLOADED',
    'failed' => 'FAILED',
    'pending' => 'PENDING',
    'in_progress' => 'IN_PROGRESS',
    'file_generated' => 'FILE_GENERATED',
  ];
  $summary = ['total' => array_sum($counts)];
  foreach ($reported as $summary_key => $status_value) {
    $summary[$summary_key] = $counts[$status_value] ?? 0;
  }
  return $summary;
}
/**
 * Requeues failed items of a job, choosing the strategy from job settings.
 *
 * When the translator has 'one_export_file' or 'transfer-settings' enabled
 * the job is handled as a single-response job and only its first item is
 * requeued; $failed_item_ids is not used on that path. Otherwise failed
 * items are requeued individually.
 *
 * @param int $jobid
 *   The local Drupal job ID.
 * @param string $capi_job_id
 *   The CAPI job ID.
 * @param array $failed_item_ids
 *   Optional item IDs to restrict the requeue to (separate-files path only).
 *   If empty, all failed items are requeued.
 *
 * @return array
 *   Result with 'success', 'message' and 'requeued_count'; exceptions are
 *   logged and reported as a failed result.
 */
public function requeueFailedItems($jobid, $capi_job_id, array $failed_item_ids = []): array {
  try {
    /** @var \Drupal\tmgmt\JobInterface $job */
    $job = $this->entityTypeManager->getStorage('tmgmt_job')->load($jobid);
    if (!$job) {
      throw new \Exception("Job {$jobid} not found");
    }

    // Either setting means one upload covers the whole job.
    $translator = $job->getTranslator();
    $one_export_file = $translator->getSetting('one_export_file');
    $zip_transfer = $translator->getSetting('transfer-settings');
    $uses_single_response = $one_export_file || $zip_transfer;

    return $uses_single_response
      ? $this->requeueSingleResponseJob($jobid, $capi_job_id)
      : $this->requeueSeparateFilesJob($jobid, $capi_job_id, $failed_item_ids);
  }
  catch (\Exception $exception) {
    $reason = $exception->getMessage();
    $this->logger->get('TMGMT_CONTENTAPI')->error(
      'Error requeuing failed items for job @job_id: @error',
      ['@job_id' => $jobid, '@error' => $reason]
    );
    return [
      'success' => FALSE,
      'message' => 'Error requeuing failed items: ' . $reason,
      'requeued_count' => 0,
    ];
  }
}
/**
 * Requeues the first item of a single-response (single file / ZIP) job.
 *
 * The first item is the row with the lowest tjiid whose statuscode is
 * 'COMPLETE_CLEANUP_FAILED'. Nothing is queued when no such row exists, when
 * the item is already in the generate-files queue, or when the job or job
 * item cannot be loaded.
 *
 * @param int $jobid
 *   The local Drupal job ID.
 * @param string $capi_job_id
 *   The CAPI job ID.
 *
 * @return array
 *   Result with 'success', 'message' and 'requeued_count'; on success also
 *   'strategy' and 'first_item'.
 */
private function requeueSingleResponseJob($jobid, $capi_job_id): array {
  $candidate = $this->database->select('tmgmt_capi_request_processor', 't')
    ->fields('t', ['tjiid', 'requestid'])
    ->condition('tjid', $jobid)
    ->condition('jobid', $capi_job_id)
    ->condition('statuscode', 'COMPLETE_CLEANUP_FAILED')
    ->orderBy('tjiid', 'ASC')
    ->range(0, 1)
    ->execute()
    ->fetchAssoc();
  if (!$candidate) {
    return ['success' => FALSE, 'message' => 'No first item found for single-response retry', 'requeued_count' => 0];
  }
  $first_item_id = $candidate['tjiid'];

  // Avoid queueing the same item twice.
  if ($this->isItemInQueue(self::QUEUE_NAME_GENERATE_FILES, $first_item_id)) {
    return ['success' => FALSE, 'message' => 'First item already in queue', 'requeued_count' => 0];
  }

  $queue = $this->queueFactory->get(self::QUEUE_NAME_GENERATE_FILES);

  /** @var \Drupal\tmgmt\JobInterface $job */
  $job = $this->entityTypeManager->getStorage('tmgmt_job')->load($jobid);
  /** @var \Drupal\tmgmt\JobItemInterface $job_item */
  $job_item = $this->entityTypeManager->getStorage('tmgmt_job_item')->load($first_item_id);
  if (!$job || !$job_item) {
    return ['success' => FALSE, 'message' => 'Job or first item not found for single-response retry', 'requeued_count' => 0];
  }

  $translator = $job->getTranslator();
  $one_export_file = $translator->getSetting('one_export_file');
  $is_translation_memory = $translator->getSetting('is_translation_memory') ?? FALSE;
  $total_items = count($job->getItems());
  $file_paths = $this->generateFilePathsForQueue($job, $capi_job_id, $is_translation_memory);

  // Payload for the generate-files queue, plus flags marking this as a
  // single-response retry.
  $queue_data = [
    // Job item ID; the key is 'id', not 'item_id'.
    'id' => $job_item->id(),
    // Local Drupal job ID.
    'jobid' => $job->id(),
    // Remote CAPI job ID.
    'capi_job_id' => $capi_job_id,
    'translator' => $translator,
    'is_translation_memory' => $is_translation_memory,
    'getRemoteSourceLanguage' => $job->getRemoteSourceLanguage(),
    'getRemoteTargetLanguage' => $job->getRemoteTargetLanguage(),
    'total_items' => $total_items,
    'one_export_file' => $one_export_file,
    'allFilesPath' => $file_paths['allFilesPath'],
    'zipPath' => $file_paths['zipPath'],
    'retry_attempt' => TRUE,
    'single_response_retry' => TRUE,
    'propagate_success' => TRUE,
  ];
  $queue->createItem($queue_data);

  // Put the item's upload bookkeeping back to its pending state.
  $this->resetItemForRetry($first_item_id);

  $this->logger->get('TMGMT_CONTENTAPI')->info(
    'SMART RETRY: Requeued first item @item_id for single-response job @job_id (will propagate success to all items)',
    ['@item_id' => $first_item_id, '@job_id' => $jobid]
  );

  return [
    'success' => TRUE,
    'message' => 'Single-response retry: First item requeued (success will propagate to all items)',
    'requeued_count' => 1,
    'strategy' => 'SINGLE_RESPONSE_RETRY',
    'first_item' => $first_item_id,
  ];
}
/**
 * Requeues individual failed items of a separate-files job.
 *
 * For every failed item, any entry in the send-file queue is removed first.
 * The item is then added to the generate-files queue unless it is already
 * there, and its upload bookkeeping is reset. Items whose job or job item
 * cannot be loaded are logged and skipped.
 *
 * @param int $jobid
 *   The local Drupal job ID.
 * @param string $capi_job_id
 *   The CAPI job ID.
 * @param array $failed_item_ids
 *   Optional item IDs to restrict the requeue to.
 *
 * @return array
 *   Result with 'success', 'message' and 'requeued_count'; when failed items
 *   were found also 'strategy' and 'failed_items' (the item IDs considered).
 */
private function requeueSeparateFilesJob($jobid, $capi_job_id, array $failed_item_ids = []): array {
  $failed_items = $this->getFailedUploadItems($jobid, $capi_job_id);
  if (empty($failed_items)) {
    return ['success' => TRUE, 'message' => 'No failed items found', 'requeued_count' => 0];
  }
  // Filter to specific items if provided. The comparison stays loose because
  // database values are strings while callers may pass integers.
  if (!empty($failed_item_ids)) {
    $failed_items = array_filter($failed_items, function ($item) use ($failed_item_ids) {
      return in_array($item['tjiid'], $failed_item_ids);
    });
  }

  $requeued_count = 0;
  $queue = $this->queueFactory->get(self::QUEUE_NAME_GENERATE_FILES);
  $job_item_storage = $this->entityTypeManager->getStorage('tmgmt_job_item');
  // The job is the same for every item, so load it once.
  /** @var \Drupal\tmgmt\JobInterface $job */
  $job = $this->entityTypeManager->getStorage('tmgmt_job')->load($jobid);
  // Job-level part of the queue payload, built on first use.
  $shared_queue_data = NULL;

  foreach ($failed_items as $item) {
    $item_id = $item['tjiid'];
    // Remove item from queue send_file_for_translation_to_capi if exists.
    $this->removeItemFromQueue('send_file_for_translation_to_capi', $item_id);
    // Skip items that are already queued to prevent duplicates.
    if ($this->isItemInQueue(self::QUEUE_NAME_GENERATE_FILES, $item_id)) {
      continue;
    }

    /** @var \Drupal\tmgmt\JobItemInterface $job_item */
    $job_item = $job_item_storage->load($item_id);
    if (!$job || !$job_item) {
      $this->logger->get('TMGMT_CONTENTAPI')->error('Job @job_id or item @item_id not found for requeue', ['@job_id' => $jobid, '@item_id' => $item_id]);
      continue;
    }

    if ($shared_queue_data === NULL) {
      $translator = $job->getTranslator();
      $is_translation_memory = $translator->getSetting('is_translation_memory') ?? FALSE;
      $file_paths = $this->generateFilePathsForQueue($job, $capi_job_id, $is_translation_memory);
      $shared_queue_data = [
        // Local Drupal job ID.
        'jobid' => $job->id(),
        // Remote CAPI job ID.
        'capi_job_id' => $capi_job_id,
        'translator' => $translator,
        'is_translation_memory' => $is_translation_memory,
        'getRemoteSourceLanguage' => $job->getRemoteSourceLanguage(),
        'getRemoteTargetLanguage' => $job->getRemoteTargetLanguage(),
        'total_items' => count($job->getItems()),
        'one_export_file' => $translator->getSetting('one_export_file'),
        'allFilesPath' => $file_paths['allFilesPath'],
        'zipPath' => $file_paths['zipPath'],
      ];
    }

    // 'id' (not 'item_id') is the key for the job item ID; it goes first so
    // the payload keeps its established key order.
    $queue->createItem(['id' => $job_item->id()] + $shared_queue_data);
    // Reset database status for retry.
    $this->resetItemForRetry($item_id);
    $requeued_count++;
    $this->logger->get('TMGMT_CONTENTAPI')->info(
      'Requeued individual failed item @item_id for separate files job @job_id',
      ['@item_id' => $item_id, '@job_id' => $jobid]
    );
  }

  return [
    'success' => TRUE,
    'message' => "Successfully requeued {$requeued_count} individual failed items",
    'requeued_count' => $requeued_count,
    'strategy' => 'SEPARATE_FILES_RETRY',
    'failed_items' => array_keys($failed_items),
  ];
}
/**
 * Tells whether a queue already holds an entry for the given item ID.
 *
 * Every row of the queue is unserialized and its 'id' (current format) or
 * 'item_id' (legacy format) value is loosely compared with $item_id.
 *
 * @param string $queue_name
 *   The queue name to check.
 * @param int $item_id
 *   The item ID to look for.
 *
 * @return bool
 *   TRUE if a matching entry exists, FALSE otherwise.
 */
public function isItemInQueue(string $queue_name, $item_id): bool {
  $select = $this->database->select('queue', 'q');
  $select->fields('q', ['data']);
  $select->condition('name', $queue_name);
  $serialized_payloads = $select->execute()->fetchCol();

  foreach ($serialized_payloads as $serialized_payload) {
    $payload = unserialize($serialized_payload);
    $matches_current_key = isset($payload['id']) && $payload['id'] == $item_id;
    $matches_legacy_key = isset($payload['item_id']) && $payload['item_id'] == $item_id;
    if ($matches_current_key || $matches_legacy_key) {
      return TRUE;
    }
  }
  return FALSE;
}
/**
 * Puts an item's upload bookkeeping back to its pending state.
 *
 * Only rows whose file_upload_status is currently 'READY_FOR_RETRY' are
 * touched; they get status 'PENDING', zero attempts and no error message.
 *
 * @param int $item_id
 *   The item ID (column tjiid) to reset.
 */
private function resetItemForRetry($item_id): void {
  // The timestamp is taken from the database server.
  // NOTE(review): UTC_TIMESTAMP() looks MySQL-specific - confirm the
  // supported database backends.
  $utc_now = $this->database->query('SELECT UTC_TIMESTAMP()')->fetchField();

  $update = $this->database->update('tmgmt_capi_request_processor');
  $update->fields([
    'file_upload_status' => 'PENDING',
    'file_upload_attempts' => 0,
    'errormessage' => NULL,
    'lastupdated' => $utc_now,
  ]);
  $update->condition('tjiid', $item_id);
  $update->condition('file_upload_status', 'READY_FOR_RETRY');
  $update->execute();
}
/**
 * Resets and requeues the failed items of a partially failed job.
 *
 * Partial cleanup is only carried out for the "Separate Files" scenario,
 * i.e. when neither 'one_export_file' nor 'transfer-settings' is enabled on
 * the translator. When every item failed, or the job uses a single file or
 * ZIP transfer, the result asks the caller to fall back to complete cleanup
 * ('should_fallback' => TRUE).
 *
 * @param int $jobid
 *   The local Drupal job ID.
 * @param string $capi_job_id
 *   The CAPI job ID.
 *
 * @return array
 *   Cleanup result; always holds 'success' and 'message', plus details that
 *   depend on the outcome.
 */
public function cleanupPartialJobFailure($jobid, $capi_job_id): array {
  try {
    $status = $this->getJobUploadStatus($jobid, $capi_job_id);

    // Nothing can be preserved when every item failed.
    if ($status['failed'] == $status['total']) {
      return [
        'success' => FALSE,
        'message' => 'All items failed - requires complete cleanup via JobUploadManagerService',
        'should_fallback' => TRUE,
        'status' => $status,
      ];
    }

    /** @var \Drupal\tmgmt\JobInterface $job */
    $job = $this->entityTypeManager->getStorage('tmgmt_job')->load($jobid);
    if (!$job) {
      throw new \Exception("Job {$jobid} not found");
    }
    $translator = $job->getTranslator();
    $one_export_file = $translator->getSetting('one_export_file');
    $zip_transfer = $translator->getSetting('transfer-settings');

    // Single file and/or ZIP transfer: partial cleanup is not possible.
    if ($one_export_file || $zip_transfer) {
      if ($one_export_file) {
        $scenario = $zip_transfer ? 'ZIP + Single File' : 'Single File';
      }
      else {
        $scenario = 'ZIP + Multiple Files';
      }
      $this->logger->get('TMGMT_CONTENTAPI')->warning(
        'Partial cleanup not supported for @scenario scenario (job @job_id) - requires complete cleanup',
        ['@scenario' => $scenario, '@job_id' => $jobid]
      );
      return [
        'success' => FALSE,
        'message' => "Partial cleanup not supported for {$scenario} scenario - requires complete cleanup",
        'should_fallback' => TRUE,
        'scenario' => $scenario,
        'status' => $status,
      ];
    }

    // Separate Files scenario from here on.
    $cleaned_count = 0;
    // Reported as-is unless a requeue actually runs below.
    $requeue_result = ['success' => FALSE, 'requeued_count' => 0];
    $failed_items = $this->getFailedUploadItems($jobid, $capi_job_id);

    if (!empty($failed_items)) {
      $failed_item_ids = array_keys($failed_items);
      $utc_now = $this->database->query('SELECT UTC_TIMESTAMP()')->fetchField();
      // Rows are reset rather than deleted so they can be processed again;
      // only rows with statuscode 'SENDING' are affected.
      $cleaned_count = $this->database->update('tmgmt_capi_request_processor')
        ->fields([
          'file_upload_status' => 'READY_FOR_RETRY',
          'file_upload_attempts' => 0,
          'errormessage' => 'Reset after partial cleanup - ready for retry',
          'lastupdated' => $utc_now,
        ])
        ->condition('tjid', $jobid)
        ->condition('jobid', $capi_job_id)
        ->condition('tjiid', $failed_item_ids, 'IN')
        ->condition('statuscode', 'SENDING')
        ->execute();

      $this->logger->get('TMGMT_CONTENTAPI')->info(
        'Partial cleanup for Separate Files scenario: reset @count failed items for retry in job @job_id (preserved @successful successful items)',
        [
          '@count' => $cleaned_count,
          '@job_id' => $jobid,
          '@successful' => $status['uploaded'],
          '@action' => 'RESET_FOR_RETRY',
        ]
      );

      // Requeue straight away when at least one row was reset.
      if ($cleaned_count > 0) {
        $requeue_result = $this->requeueFailedItems($jobid, $capi_job_id, $failed_item_ids);
        $this->logger->get('TMGMT_CONTENTAPI')->info(
          'Auto-requeue after partial cleanup: @requeued_count items requeued for job @job_id (success: @success)',
          [
            '@requeued_count' => $requeue_result['requeued_count'] ?? 0,
            '@job_id' => $jobid,
            '@success' => $requeue_result['success'] ? 'true' : 'false',
          ]
        );
      }
    }

    return [
      'success' => TRUE,
      'message' => 'Partial cleanup completed for Separate Files scenario - failed items reset and requeued for retry',
      'scenario' => 'Separate Files',
      'reset_items' => $cleaned_count,
      'preserved_items' => $status['uploaded'],
      'action' => 'RESET_AND_REQUEUED',
      'requeue_result' => $requeue_result,
      'status' => $status,
    ];
  }
  catch (\Exception $exception) {
    $reason = $exception->getMessage();
    $this->logger->get('TMGMT_CONTENTAPI')->error(
      'Partial cleanup failed for job @job_id: @error',
      ['@job_id' => $jobid, '@error' => $reason]
    );
    return [
      'success' => FALSE,
      'message' => 'Cleanup failed: ' . $reason,
    ];
  }
}
/**
 * Lists jobs whose complete cleanup failed.
 *
 * Selects request-processor rows with statuscode 'COMPLETE_CLEANUP_FAILED'
 * and haserror = 1, grouped by job, CAPI job, error message and update time,
 * newest first.
 *
 * @return array
 *   Rows as associative arrays keyed by tjid. Because the result is keyed by
 *   tjid, only one row per local job is kept when several groups exist.
 */
public function getCompleteCleanupFailures(): array {
  $query = $this->database->select('tmgmt_capi_request_processor', 't')
    ->fields('t', ['tjid', 'jobid', 'errormessage', 'lastupdated'])
    ->condition('statuscode', 'COMPLETE_CLEANUP_FAILED')
    ->condition('haserror', 1)
    ->orderBy('lastupdated', 'DESC');
  // groupBy() accepts one field name per call; handing it an array makes the
  // query builder use that array as an array key and fail.
  foreach (['tjid', 'jobid', 'errormessage', 'lastupdated'] as $group_field) {
    $query->groupBy($group_field);
  }
  return $query->execute()->fetchAllAssoc('tjid', \PDO::FETCH_ASSOC);
}
/**
 * Makes a job that failed complete cleanup submittable again.
 *
 * All of the job's rows with statuscode 'COMPLETE_CLEANUP_FAILED' are set
 * back to 'CREATED' with cleared error and upload counters, and the job
 * entity (when it can be loaded) is set to the unprocessed state and saved.
 *
 * @param int $jobid
 *   The local Drupal job ID.
 * @param string $capi_job_id
 *   The CAPI job ID.
 *
 * @return array
 *   Result with 'success' and 'message'; on success also 'reset_count'.
 */
public function resetCompleteCleanupFailure($jobid, $capi_job_id): array {
  try {
    $utc_now = $this->database->query('SELECT UTC_TIMESTAMP()')->fetchField();
    $reset_values = [
      'statuscode' => 'CREATED',
      'haserror' => 0,
      'errormessage' => 'Reset from complete cleanup failure - ready for retry',
      'file_upload_status' => 'PENDING',
      'file_upload_attempts' => 0,
      'lastupdated' => $utc_now,
    ];
    $reset_count = $this->database->update('tmgmt_capi_request_processor')
      ->fields($reset_values)
      ->condition('tjid', $jobid)
      ->condition('jobid', $capi_job_id)
      ->condition('statuscode', 'COMPLETE_CLEANUP_FAILED')
      ->execute();

    // Move the job itself back to unprocessed so it can be resubmitted.
    /** @var \Drupal\tmgmt\JobInterface $job */
    $job = $this->entityTypeManager->getStorage('tmgmt_job')->load($jobid);
    if ($job) {
      $job->setState(JobInterface::STATE_UNPROCESSED);
      $job->save();
    }

    $this->logger->get('TMGMT_CONTENTAPI')->info(
      'Reset @count records from complete cleanup failure for job @job_id - ready for resubmission',
      ['@count' => $reset_count, '@job_id' => $jobid]
    );

    return [
      'success' => TRUE,
      'message' => "Job {$jobid} reset from complete cleanup failure - ready for resubmission",
      'reset_count' => $reset_count,
    ];
  }
  catch (\Exception $exception) {
    $reason = $exception->getMessage();
    $this->logger->get('TMGMT_CONTENTAPI')->error(
      'Error resetting complete cleanup failure for job @job_id: @error',
      ['@job_id' => $jobid, '@error' => $reason]
    );
    return [
      'success' => FALSE,
      'message' => 'Reset failed: ' . $reason,
    ];
  }
}
/**
 * Marks a job's waiting items as uploaded after its first item succeeded.
 *
 * Affects the job's rows with statuscode 'WAITING_FOR_RETRY', except the
 * first item itself: they become 'SENT_TO_PROVIDER' / 'UPLOADED' with the
 * error flag cleared.
 *
 * @param int $jobid
 *   The local Drupal job ID.
 * @param string $capi_job_id
 *   The CAPI job ID.
 * @param int $first_item_id
 *   The first item ID that succeeded.
 *
 * @return array
 *   Result with 'success' and 'message'; on success also 'updated_count'.
 */
public function propagateSuccessToWaitingItems($jobid, $capi_job_id, $first_item_id): array {
  try {
    $utc_now = $this->database->query('SELECT UTC_TIMESTAMP()')->fetchField();
    $success_values = [
      // Same status code a successfully sent item carries.
      'statuscode' => 'SENT_TO_PROVIDER',
      'haserror' => 0,
      'errormessage' => 'Success propagated from first item',
      'file_upload_status' => 'UPLOADED',
      'lastupdated' => $utc_now,
    ];

    $update = $this->database->update('tmgmt_capi_request_processor');
    $update->fields($success_values);
    $update->condition('tjid', $jobid);
    $update->condition('jobid', $capi_job_id);
    $update->condition('statuscode', 'WAITING_FOR_RETRY');
    // The first item is left untouched.
    $update->condition('tjiid', $first_item_id, '!=');
    $updated_count = $update->execute();

    $this->logger->get('TMGMT_CONTENTAPI')->info(
      'SUCCESS PROPAGATION: Updated @count waiting items to UPLOADED after first item @first_item success (job @job_id)',
      [
        '@count' => $updated_count,
        '@first_item' => $first_item_id,
        '@job_id' => $jobid,
      ]
    );

    return [
      'success' => TRUE,
      'message' => "Success propagated to {$updated_count} waiting items",
      'updated_count' => $updated_count,
    ];
  }
  catch (\Exception $exception) {
    $reason = $exception->getMessage();
    $this->logger->get('TMGMT_CONTENTAPI')->error(
      'Error propagating success for job @job_id: @error',
      ['@job_id' => $jobid, '@error' => $reason]
    );
    return [
      'success' => FALSE,
      'message' => 'Error propagating success: ' . $reason,
    ];
  }
}
/**
 * Lists jobs that have rows with statuscode 'WAITING_FOR_RETRY'.
 *
 * @return array
 *   One associative array with 'tjid' and 'jobid' per job, keyed by tjid.
 */
public function getJobsWithWaitingItems(): array {
  // groupBy() takes a single field name, so each grouping column gets its
  // own call; passing an array makes the query builder fail.
  $statement = $this->database->select('tmgmt_capi_request_processor', 't')
    ->fields('t', ['tjid', 'jobid'])
    ->condition('statuscode', 'WAITING_FOR_RETRY')
    ->groupBy('tjid')
    ->groupBy('jobid')
    ->execute();
  return $statement->fetchAllAssoc('tjid', \PDO::FETCH_ASSOC);
}
/**
 * Derives the export directory and ZIP paths for a job.
 *
 * The directory name is built from the CAPI job ID, the local job ID and the
 * remote source/target languages; translation memory updates get a
 * '_tmupdate_' infix and a '_tm' suffix. Nothing is stored on the service.
 *
 * @param \Drupal\tmgmt\JobInterface $job
 *   The job object.
 * @param string $capi_job_id
 *   The CAPI job ID.
 * @param bool $is_translation_memory
 *   Whether this is a translation memory update.
 *
 * @return array
 *   Array with 'allFilesPath', 'zipPath', 'dirName' and 'zipName'.
 */
public function generateFilePathsForQueue(JobInterface $job, string $capi_job_id, bool $is_translation_memory = FALSE): array {
  // Part shared by both naming variants: <job id>_<source>_<target>.
  $job_and_languages = $job->id() . "_" . $job->getRemoteSourceLanguage() . "_" . $job->getRemoteTargetLanguage();

  $directory_name = $is_translation_memory
    ? $capi_job_id . '_tmupdate_' . $job_and_languages . "_tm"
    : $capi_job_id . '_' . $job_and_languages;

  $zip_file_name = self::ZIP_JOB_PATH . $directory_name . self::ZIP_EXTENSION;
  // The job's 'scheme' setting provides the stream wrapper prefix.
  $directory_path = $job->getSetting('scheme') . self::PUBLIC_SENT_FILE_PATH . $directory_name;

  return [
    'allFilesPath' => $directory_path,
    'zipPath' => $directory_path . "/" . $zip_file_name,
    'dirName' => $directory_name,
    'zipName' => $zip_file_name,
  ];
}
}
