lionbridge_translation_provider-8.x-2.4/tmgmt_contentapi/src/Services/CapiDataProcessor.php
tmgmt_contentapi/src/Services/CapiDataProcessor.php
<?php
namespace Drupal\tmgmt_contentapi\Services;
use Drupal\Core\Database\Connection;
use Drupal\Core\Datetime\DrupalDateTime;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\tmgmt\Entity\Job;
use Drupal\tmgmt\JobInterface;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\JobApi;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\ProviderApi;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\StatusUpdateApi;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\TokenApi;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\tmgmt_contentapi\Services\QueueOperations;
/**
* Class CapiDataProcessor.
*
* CAPI Data Processor at Drupal End.
*
* @category Class
* @package Drupal\tmgmt_contentapi\Services
* @author Lionbridge Team
*/
class CapiDataProcessor {
const IGNORED = 'IGNORED';
const TO_PROCESS = 'TO_PROCESS';
const SCANNED = 'SCANNED';
const IMPORTED = 'IMPORTED';
const CANCELLED = 'CANCELLED';
const COMPLETED = 'COMPLETED';
const CREATED = 'CREATED';
const IN_QUEUE = 'IN_QUEUE';
const SENDING = 'SENDING';
/**
* Queue name for migration jobs.
*/
const QUEUE_NAME_MIGRATE_JOBS = 'migrate_jobs_to_new_structure_queue';
/**
* The capi token.
*
* @var string
*/
protected $capiToken;
/**
* The job.
*
* @var \Drupal\tmgmt\JobInterface
*/
protected $job;
/**
* The job api.
*
* @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\JobApi
*/
protected $jobApi;
/**
* Token API object.
*
* @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\TokenApi
*/
protected $capi;
/**
* The filesystem service.
*
* @var \Drupal\Core\Datetime\DrupalDateTime
*/
protected $drupalDateTime;
/**
* The translator object.
*
* @var \Drupal\tmgmt\TranslatorInterface
*/
protected $translator;
/**
* Status api object.
*
* @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\StatusUpdateApi
*/
protected $statusApi;
/**
* The capi v2 jobs array.
*
* @var array
*/
protected $capiJobs;
/**
* The database connection.
*
* @var \Drupal\Core\Database\Connection
*/
protected $dbconnection;
/**
* The database connection.
*
* @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\ProviderApi
*/
protected $provider;
/**
* Logger Factory.
*
* @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
*/
protected $loggerFactory;
/**
* Entity type manager.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
protected $entityTypeManager;
/**
* The queue operations.
*
* @var \Drupal\tmgmt_contentapi\Services\QueueOperations
*/
protected $queueOperations;
/**
* Constructor.
*
* @param \Drupal\Core\Database\Connection $connection
* The database connection.
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
* The logger factory.
* @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
* The entity type manager.
* @param \Drupal\tmgmt_contentapi\Services\QueueOperations $queueOperations
* Service for queue operations.
*/
public function __construct(Connection $connection, LoggerChannelFactoryInterface $logger_factory, EntityTypeManagerInterface $entityTypeManager, QueueOperations $queueOperations,) {
// Injected objects.
$this->dbconnection = $connection;
$this->loggerFactory = $logger_factory;
$this->entityTypeManager = $entityTypeManager;
$this->queueOperations = $queueOperations;
// Intantiated objects.
$this->jobApi = new JobApi();
$this->capi = new TokenApi();
$this->statusApi = new StatusUpdateApi();
$this->provider = new ProviderApi();
$this->drupalDateTime = new DrupalDateTime('now', new \DateTimeZone('Z'));
}
/**
* Create a new instance of the CapiDataProcessor.
*
* @param \Symfony\Component\DependencyInjection\ContainerInterface $container
* The container.
*
* @return \Drupal\tmgmt_contentapi\Services\CapiDataProcessor
* The CapiDataProcessor object.
*/
public static function create(ContainerInterface $container) {
return new static(
$container->get('database'),
$container->get('logger.factory'),
$container->get('entity_type.manager')
);
}
/**
* Function to set the translator.
*/
protected function getTranslator() {
// Get all translators.
$translators = $this->entityTypeManager->getStorage('tmgmt_translator')->loadByProperties(['plugin' => 'contentapi']);
if (!$translators) {
return [];
}
return $translators['contentapi'];
}
/**
* Function to store data when job is created.
*
* @param array $job_details
* Job details.
*
* @return string
* Last inserted id
*/
public function storeDataWhenJobCreated(array $job_details) {
try {
$fields = [
'tjid' => $job_details['tjid'],
'tjiid' => $job_details['tjiid'],
'updateid' => '0',
'jobid' => $job_details['job_id'],
'providerid' => $job_details['provider_id'],
'requestid' => $job_details['request_id'],
'statuscode' => $job_details['job_status'],
'updatedtime' => ($job_details['update_time']) ? $job_details['update_time'] : $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
'haserror' => 0,
'errormessage' => '',
'status' => self::IGNORED,
'lastupdated' => $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
];
// Initialize file upload tracking for CREATED statuscode records.
if ($job_details['job_status'] === self::SENDING) {
$fields['file_upload_status'] = $job_details['file_upload_status'];
$fields['file_upload_attempts'] = 0;
}
$this->dbconnection->insert('tmgmt_capi_request_processor')
->fields($fields)
->execute();
// Get last inserted id.
return $this->dbconnection->lastInsertId();
}
catch (\Exception $e) {
// Log the error if failed to insert the data.
$this->loggerFactory->get('tmgmt_contentapi')->error('Error while inserting data into tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
}
}
/**
* Function to process the data from CAPI.
*
* @param string $capi_job_id
* Capi job id.
* @param string $tjid
* Drupal job id.
*/
public function scanStatusData(string $capi_job_id = '', string $tjid = '') {
// If job id and capi_job_id is set then fetch statused only for job related translator.
if ($capi_job_id != '' && $tjid != '') {
$job = job::load($tjid);
$translator = $job->getTranslator();
$this->capiToken = \DRUPAL::service('tmgmt_contentapi.capi_details')->getCapiToken($translator);
$this->capiJobs = $this->statusApi->statusupdatesGet($this->capiToken);
}
else {
// If auto cron runs then we can fetch updates using default translator.
$translators = $this->entityTypeManager->getStorage('tmgmt_translator')->loadByProperties(['plugin' => 'contentapi']);
$fetched_jobs = [];
$unique_translators = [];
// Loop through each unique translator and collect the capisettings data with key as translator id.
foreach ($translators as $translator) {
if (in_array($translator->getSetting('capi-settings')['capi_username_ctt'], $unique_translators)) {
continue;
}
$unique_translators[] = $translator->getSetting('capi-settings')['capi_username_ctt'];
$this->capiToken = \DRUPAL::service('tmgmt_contentapi.capi_details')->getCapiToken($translator);
// Get statuses for both translators.
$fetched_jobs[$translator->getOriginalId()] = $this->statusApi->statusupdatesGet($this->capiToken);
}
// Get statuses for all translators.
$this->capiJobs = call_user_func_array('array_merge', array_values($fetched_jobs));
}
// Process the data and pass it to the database.
if (empty($this->capiJobs)) {
return;
}
// Loop through the capiJobs array.
foreach ($this->capiJobs as $job_info) {
// If jobid is set and not equal to job_info['job_id'] then continue with next foreach loop.
// This will help to process only specific job. To avoid overhead when user want to import spcific job.
if ($capi_job_id != '' && $capi_job_id != $job_info['job_id']) {
continue;
}
// If jobid not exist then continue with next foreach loop.
if (!$this->isJobIdExist($job_info['job_id'])) {
continue;
}
// Add status in processor.
$this->addStatusData($job_info);
}
}
/**
* Add status data into the database.
*
* @param array $job_info
* Job details for migration.
*/
protected function addStatusData($job_info) {
// Check if the jobinfo is empty.
if (empty($job_info)) {
return;
}
// Insert the data into the database.
// Is job item exist in this DB.
$job_item = FALSE;
try {
foreach ($job_info['request_ids'] as $request_id) {
// Get tjiid from tmgmt_capi_request_processor table on the basis of request id.
// Raw query for reference.
// SELECT tjid, tjiid, providerid FROM tmgmt_capi_request_processor WHERE statuscode = 'CREATED' AND requestid = '1234' AND jobid = '1234';.
$result = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
->fields('tcp', ['tjid', 'tjiid', 'providerid'])
->condition('tcp.statuscode', self::CREATED)
->condition('tcp.requestid', $request_id)
->condition('tcp.jobid', $job_info['job_id'])
->execute()
->fetchAll();
// TODO: Check if this can be directly added via mysql query, rather than fetching and then inserting.
// Insert the data into the database.
foreach ($result as $tmgmtids) {
$job_item = TRUE;
$this->insertRequestRecords($tmgmtids, $request_id, $job_info);
}
}
try {
// Aknowledge the update id once all details are stored in the database.
if ($job_item) {
$this->loggerFactory->get('status scanner')->notice(
'Status scanner - Job: @jobid, UpdateID: @update_id, RequestIDs: @request_ids',
[
'@jobid' => $job_info['job_id'],
'@update_id' => $job_info['update_id'],
'@request_ids' => json_encode($job_info['request_ids']),
]
);
$job = JOB::load($tmgmtids->tjid);
$translator = $job->getTranslator();
$this->capiToken = \DRUPAL::service('tmgmt_contentapi.capi_details')->getCapiToken($translator);
$this->statusApi->statusupdatesUpdateIdAcknowledgePut($this->capiToken, $job_info['update_id']);
}
}
catch (\Exception $e) {
// Log the error if failed to update the data.
$this->loggerFactory->get('TMGMT_CONTENTAPI')->error('Error while status aknowledgement : @error', ['@error' => $e->getMessage()]);
}
}
catch (\Exception $e) {
// Do not aknowledge the update id if failed to store in db.
$this->loggerFactory->get('tmgmt_contentapi')->error('Error while inserting data into tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
}
}
/**
* Function to update the status of request id in the database.
*/
public function updateRequestStatus() {
// Update the status of the request id in the database.
// This query intentionally keep raw to avoid any issues.
// This query will make sure only latest record is set to TO_PROCESS.
$sql = "
UPDATE tmgmt_capi_request_processor AS t1
INNER JOIN (
SELECT tjid, jobid, statuscode, requestid, COUNT(*) AS num_records, MAX(updatedtime) AS max_updatedtime
FROM tmgmt_capi_request_processor
WHERE status = :scanned
GROUP BY tjid, jobid, statuscode, requestid
) AS t2
ON t1.jobid = t2.jobid AND t1.statuscode = t2.statuscode AND t1.requestid = t2.requestid
SET t1.status = CASE
WHEN t1.statuscode IN ('CREATED', 'SENDING', 'SENT_TO_PROVIDER', 'IN_TRANSLATION', 'CANCELLED') THEN :ignored
WHEN t1.updatedtime < t2.max_updatedtime THEN :ignored
ELSE :to_process
END;
";
$this->dbconnection->query($sql, [
':ignored' => self::IGNORED,
':to_process' => self::TO_PROCESS,
':scanned' => self::SCANNED,
]);
}
/**
* Add migration data into the database.
*
* @param array $job_info
* Job details for migration.
*/
protected function addMigrationData($job_info) {
// Check if the jobinfo is empty.
if (empty($job_info)) {
return;
}
// Insert the data into the database.
try {
foreach ($job_info['request_ids'] as $request_id) {
// Get tjiid from tmgmt_capi_request_processor table on the basis of request id.
// Raw query for reference.
// SELECT tjid, tjiid, providerid FROM tmgmt_capi_request_processor WHERE statuscode = 'CREATED' AND requestid = '1234' AND jobid = '1234';
// Ger current state of tjiid from tmgmt_job table.
// SELECT tjiid, status FROM tmgmt_job WHERE tjid = '1234';.
$result = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
->fields('tcp', ['tjid', 'tjiid', 'providerid'])
->condition('tcp.statuscode', self::CREATED)
->condition('tcp.requestid', $request_id)
->condition('tcp.jobid', $job_info['job_id']);
$result->innerJoin('tmgmt_job_item', 'tji', 'tcp.tjiid = tji.tjiid');
$result->addField('tji', 'state');
$result = $result->execute()->fetchAll();
// TODO: Check if this can be directly added via mysql query, rather than fetching and then inserting.
// Insert the data into the database.
foreach ($result as $tmgmtids) {
$this->insertRequestRecords($tmgmtids, $request_id, $job_info, TRUE);
}
}
}
catch (\Exception $e) {
// Do not aknowledge the update id if failed to store in db.
$this->loggerFactory->get('tmgmt_contentapi')->error('Error while inserting data into tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
}
}
/**
* Function to insert the request records.
*/
public function insertRequestRecords($tmgmtids, $request_id, $job_info, $ismigration = FALSE, $status = self::SCANNED) {
// Set status during migration else it will be default status scanned.
$lastupdated = $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField();
if (isset($tmgmtids->state) && $ismigration) {
if ($tmgmtids->state == 1) {
$status = self::TO_PROCESS;
}
elseif ($tmgmtids->state == 2) {
$status = self::IMPORTED;
}
elseif ($tmgmtids->state == 3) {
$status = self::COMPLETED;
}
if (in_array($job_info['status_code'],
[
'CREATED',
'SENDING',
'SENT_TO_PROVIDER',
'IN_TRANSLATION',
'CANCELLED',
])) {
$status = self::IGNORED;
}
// Avoids using identical timestamps for 'Created' and other statuses during migration
// By adding 1 minute to the last updated time to prevent status ambiguity.
if ($job_info['status_code'] != 'CREATED') {
$lastupdated = $this->dbconnection->query('SELECT DATE_ADD(UTC_TIMESTAMP(), INTERVAL 1 MINUTE)')->fetchField();
}
}
// Insert the data into the database.
$this->dbconnection->insert('tmgmt_capi_request_processor')
->fields([
'tjid' => $tmgmtids->tjid,
'tjiid' => $tmgmtids->tjiid,
'updateid' => $job_info['update_id'],
'jobid' => $job_info['job_id'],
'providerid' => $tmgmtids->providerid,
'requestid' => $request_id,
'statuscode' => $job_info['status_code'],
'updatedtime' => $job_info['update_time']->format('Y-m-d H:i:s.u'),
'haserror' => ($job_info['has_error']) ? 1 : 0,
'errormessage' => !empty($job_info['error_message']) ? substr($job_info['error_message'], 0, 200) : '',
'status' => $status,
'lastupdated' => $lastupdated,
])
->execute();
}
/**
* Function to set the request as processed.
*/
public function setRequestProgressStatus($rid, $progressStatus) {
try {
// Get a requestid from rid, check if some more records exist with same requestid then update all of them.
$requestid = $this->dbconnection->select('tmgmt_capi_request_processor', 'sub')
->fields('sub', ['requestid'])
->condition('sub.rid', $rid)
->execute()
->fetchField();
$this->dbconnection->update('tmgmt_capi_request_processor')
->fields([
'status' => $progressStatus,
'lastupdated' => $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
])
->condition('requestid', $requestid, '=')
->condition('status', [self::IN_QUEUE, self::TO_PROCESS], 'IN')
->execute();
}
catch (\Exception $e) {
// Log the error if failed to update the data.
$this->loggerFactory->get('tmgmt_contentapi')->error('Error while updating data in tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
}
}
/**
* Function to update the status of tjiid/rid in the database.
*/
public function setRequestProgressStatusUsingItemId($tjiid, $progressStatus) {
try {
// Get rid from tjiid and status IMPORTED.
$rid = $this->dbconnection->select('tmgmt_capi_request_processor', 'sub')
->fields('sub', ['rid'])
->condition('sub.tjiid', $tjiid)
->condition('sub.status', self::IMPORTED)
->execute()
->fetchField();
$this->dbconnection->update('tmgmt_capi_request_processor')
->fields([
'status' => $progressStatus,
'lastupdated' => $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
])
->condition('rid', $rid, '=')
->execute();
}
catch (\Exception $e) {
// Log the error if failed to update the data.
$this->loggerFactory->get('tmgmt_contentapi')->error('Error while updating data in tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
}
}
/**
* Function to get all request id which are ready to import.
*/
public function getAllReadyRequestIdToImport($tjid = '') {
// Get the data from the database which set to TO_PRCESS.
// Raw Query for reference
// SELECT * FROM tmgmt_capi_request_processor WHERE status = 'TO_PROCESS' ORDER BY providerid ASC;.
$query = $this->dbconnection->select('tmgmt_capi_request_processor', 't1');
$query->fields('t1', [
'rid',
'tjid',
'updateid',
'jobid',
'providerid',
'requestid',
'statuscode',
'status'
]);
// Create a subquery to find the first rid for each requestid.
$subquery = $this->dbconnection->select('tmgmt_capi_request_processor', 't2');
$subquery->addField('t2', 'requestid');
$subquery->addExpression('MIN(rid)', 'min_rid');
$subquery->condition('t2.status', [self::TO_PROCESS, self::IN_QUEUE], 'IN');
if (!empty($tjid)) {
$subquery->condition('t2.tjid', $tjid);
}
$subquery->groupBy('t2.requestid');
// Join with subquery to get only one row per requestid.
$query->join($subquery, 'subq', 't1.rid = subq.min_rid');
// Add original conditions to main query.
$group = $query->orConditionGroup()
->condition('t1.status', self::TO_PROCESS)
->condition('t1.status', self::IN_QUEUE);
$query->condition($group);
if (!empty($tjid)) {
$query->condition('t1.tjid', $tjid);
}
// Add sorting.
$query->orderBy('t1.providerid', 'ASC');
$query->orderBy('t1.tjid', 'ASC');
$result = $query->execute()->fetchAll();
return $result;
}
/**
* Function to get latest status of a job from DB.
*/
public function getGlobalColumnInfoForJobOverview($tjid, $is_single_job = FALSE) {
$static_job_capi_details = &drupal_static(__FUNCTION__, []);
// Check if job details already exist in static cache.
if (!empty($static_job_capi_details)) {
// Check if key $job_id exist else return empty array.
if (isset($static_job_capi_details[$tjid])) {
return $static_job_capi_details[$tjid];
}
else {
return [];
}
}
if ($is_single_job) {
$job_ids = [$tjid];
}
else {
// Get all job ids from view.
$job_ids = tmgmt_contentapi_get_view_entity_ids('tmgmt_job_overview');
}
// Prepare placeholders for query.
foreach ($job_ids as $index => $job_id) {
$placeholder = ":job_id_$index";
$job_id_placeholders[] = $placeholder;
$query_arguments[$placeholder] = $job_id;
}
// Build the query.
$query = "
SELECT
tjid,
jobid,
providerid,
CASE
WHEN COUNT(DISTINCT tjiid) = SUM(CASE WHEN (statuscode = 'REVIEW_TRANSLATION' OR statuscode = 'TRANSLATION_APPROVED') AND status = :completed THEN 1 ELSE 0 END) THEN :completed
WHEN COUNT(DISTINCT tjiid) = SUM(CASE WHEN (statuscode = 'REVIEW_TRANSLATION' OR statuscode = 'TRANSLATION_APPROVED') AND status = :imported THEN 1 ELSE 0 END) THEN :imported
WHEN COUNT(DISTINCT tjiid) = SUM(CASE WHEN statuscode = 'COMPLETED' AND status = :completed THEN 1 ELSE 0 END) THEN :completed
WHEN SUM(CASE WHEN statuscode = 'IN_TRANSLATION' THEN 1 ELSE 0 END) > 0 THEN 'IN_TRANSLATION'
WHEN SUM(CASE WHEN statuscode = 'SENDING' THEN 1 ELSE 0 END) > 0 THEN 'SENDING'
WHEN SUM(CASE WHEN statuscode = 'SENT_TO_PROVIDER' THEN 1 ELSE 0 END) > 0 THEN 'SENT_TO_PROVIDER'
WHEN SUM(CASE WHEN statuscode = 'CANCELLED' THEN 1 ELSE 0 END) > 0 THEN :cancelled
WHEN SUM(CASE WHEN statuscode = 'REVIEW_TRANSLATION' THEN 1 ELSE 0 END) > 0
AND SUM(CASE WHEN status = :to_process THEN 1 ELSE 0 END) > 0
OR SUM(CASE WHEN status = :scanned THEN 1 ELSE 0 END) > 0
OR SUM(CASE WHEN status = :inqueue THEN 1 ELSE 0 END) > 0
OR SUM(CASE WHEN status = :imported THEN 1 ELSE 0 END) > 0 THEN 'REVIEW_TRANSLATION'
WHEN SUM(CASE WHEN statuscode = 'COMPLETE_CLEANUP_FAILED' AND haserror = 1 THEN 1 ELSE 0 END) > 0 THEN 'FAILED'
ELSE :created
END AS overall_job_status
FROM (
SELECT
tjid,
tjiid,
jobid,
providerid,
statuscode,
status,
haserror
FROM (
SELECT
t.*,
@row_number := IF(@prev_tjiid = tjiid, @row_number + 1, 1) AS rn,
@prev_tjiid := tjiid
FROM tmgmt_capi_request_processor t
JOIN (SELECT @row_number := 0, @prev_tjiid := NULL) AS vars
ORDER BY tjiid,
CASE
WHEN status IN ('COMPLETED', 'IMPORTED', 'TO_PROCESS', 'IN_QUEUE', 'SCANNED') THEN 0
WHEN status = 'IGNORED' THEN 1
ELSE 2
END ASC,
COALESCE(updatedtime, '1970-01-01 00:00:00.000000') DESC,
COALESCE(lastupdated, '1970-01-01 00:00:00') DESC,
rid DESC
) sub
WHERE rn = 1
) filtered_updates
WHERE tjid IN (" . implode(", ", $job_id_placeholders) . ")
GROUP BY tjid, jobid, providerid;
";
$query_arguments += [
':completed' => self::COMPLETED,
':imported' => self::IMPORTED,
':cancelled' => self::CANCELLED,
':to_process' => self::TO_PROCESS,
':scanned' => self::SCANNED,
':inqueue' => self::IN_QUEUE,
':created' => self::CREATED,
];
$result = $this->dbconnection->query($query, $query_arguments)->fetchAll();
foreach ($result as $row) {
if (in_array($row->tjid, $job_ids)) {
$static_job_capi_details[$row->tjid] = [
'jobid' => $row->jobid,
'providerid' => $row->providerid,
'overall_job_status' => $row->overall_job_status,
];
}
else {
$static_job_capi_details[$row->tjid] = [];
}
}
if (isset($static_job_capi_details[$tjid])) {
return $static_job_capi_details[$tjid];
}
return [];
}
/**
* Function to store job details in db.
*
* @param \Drupal\tmgmt\JobInterface $job
* Job under process.
* @param string $provider_id
* Provider id.
*
* @return array
* Request ids.
*/
public function jobToinsertInProcessor(JobInterface $job, string $provider_id) {
// Get all items of job.
$job_items = $job->getItems();
// Get capi related remote settings.
$jobcpsettings = unserialize($job->getSetting('capi-remote'));
// Prepare and store data.
foreach ($jobcpsettings as $key => $value) {
$native_id = explode('_', $value[0]->getSourceNativeId());
$job_details = [
'tjid' => $native_id[0],
'tjiid' => $native_id[1],
'job_id' => $value[0]->getJobId(),
'request_id' => $value[0]->getRequestId(),
'job_status' => $value[0]->getStatusCode(),
'provider_id' => $provider_id,
'update_time' => $value[0]->getCreatedDate()->format("Y-m-d H:i:s.u"),
];
// Create a request id array for further processing.
$request_ids[] = $value[0]->getRequestId();
// Manage single export file. Add each item id with same request id.
if ($native_id[1] == 'all') {
foreach ($job_items as $item_id => $value) {
// Store each item id with same request id.
$job_details['tjiid'] = $item_id;
$this->storeDataWhenJobCreated($job_details);
}
}
else {
$this->storeDataWhenJobCreated($job_details);
}
}
return $request_ids;
}
/**
* Function to add jobs into the queue for migration to new structure.
*/
public function addJobsToQueueForMigration() {
// STEP 1: Fetch all jobs for migration
// Get all jobs which are exist in the tmgmt_capi_request_processor table.
$subquery = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
->fields('tcp', ['tjid']);
// Build the main query. Ignore the jobs which are already present in the tmgmt_capi_request_processor table.
$query = $this->dbconnection->select('tmgmt_job', 'tj')
->fields('tj', ['tjid'])
->condition('tjid', $subquery, 'NOT IN')
->condition('translator', 'contentapi');
$result = $query->execute()->fetchAll();
// If $result is empty then return.
if (empty($result)) {
return;
}
// Insert job id into the queue.
foreach ($result as $row) {
\Drupal::service('queue')->get(self::QUEUE_NAME_MIGRATE_JOBS)->createItem($row->tjid);
}
// Process queue.
$this->queueOperations->processQueue(self::QUEUE_NAME_MIGRATE_JOBS);
}
/**
* Function to migrate jobs which are exist before version 9.x.
*/
public function migrateExistingJob(int $tjid) {
try {
// Get provider details
// Get stored CAPI job latest status with details stored in DB.
$tmgmt_message = \Drupal::database()->select('tmgmt_message', 'c');
$tmgmt_message->fields('c', ['message']);
$tmgmt_message->condition('c.type', 'jobinfo', '=');
$tmgmt_message->condition('c.tjid', $tjid);
$qryResult2 = $tmgmt_message->execute()->fetchAll();
// If no record exist then continue with next tjid.
if (empty($qryResult2)) {
return;
}
// STEP 2: Get perticular job relate information.
// Load job.
$job = job::load($tjid);
// Prepare details to insert job into the job processor table.
if (count($qryResult2) > 0) {
$result = array_reverse($qryResult2);
$job_info_tmgmt = json_decode(array_pop($result)->message);
}
// STEP 3: Create a structure to insert data into the tmgmt_capi_request_processor table.
$request_ids = $this->jobToinsertInProcessor($job, $job_info_tmgmt->providerId);
// STEP 4: Fetch Latest status of job and insert same in the processor.
// Add latest status entry for each request id to keep job in its latest status.
// Mean if jobs status not CREATED then add latest status to the processor.
if ($job_info_tmgmt->jobStatus != self::CREATED) {
$updateTime = new DrupalDateTime(substr($job_info_tmgmt->updateTime, 0, -7), new \DateTimeZone('Z'));
$job_info = [
'update_id' => 0,
'job_id' => $job_info_tmgmt->jobId,
'status_code' => $job_info_tmgmt->jobStatus,
'update_time' => $updateTime,
'has_error' => 0,
'error_message' => '',
'request_ids' => $request_ids,
];
$this->addMigrationData($job_info);
}
$this->loggerFactory->get('tmgmt_contentapi')->notice('Jobs id @job migrated successfully', ['@job' => $tjid]);
}
catch (\Exception $e) {
// Log the error if failed to migrate the job.
$this->loggerFactory->get('tmgmt_contentapi')->error(
'Error while migrating jobid: @tjid - Error: @error',
[
'@tjid' => $tjid,
'@error' => $e->getMessage(),
]
);
return;
}
// STEP 5: Delete the job from the tmgmt_message table.
// Skip this step if job is not migrated successfully.
// Delete the job from the tmgmt_message table.
$this->dbconnection->delete('tmgmt_message')
->condition('tjid', $tjid)
->condition('type', 'jobinfo')
->execute();
}
/**
* Function to check if jobid belongs to this environment.
*
* If not then return false.
*
* @param string $job_id
* Job id.
*
* @return bool
* Return true if job id exist.
*/
public function isJobIdExist($job_id) {
$query = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp');
$query->fields('tcp', ['jobid']);
$query->condition('jobid', $job_id);
$result = $query->execute()->fetchField();
// If job id exist then return true.
if ($result) {
return TRUE;
}
return FALSE;
}
/**
* Get information as per the request id.
*
* @param string $request_id
* Request id.
* @param string $job_id
* Capi job id.
*
* @return array
* return result object.
*/
public function getRequestDetailFromProcessor($request_id, $job_id) {
$result = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
->fields('tcp', ['tjid', 'tjiid', 'providerid'])
->condition('tcp.statuscode', self::CREATED)
->condition('tcp.requestid', $request_id)
->condition('tcp.jobid', $job_id)
->execute()
->fetchAll();
return $result;
}
/**
* Summary of updateRequestStatusToIgnoreForSaifFail.
*
* @param int $tjid
* Job id.
* @param int $tjiid
* Item id.
* @param string $requestid
* Request Id.
* @param string $progressStatus
* New status to set.
*/
public function updatePreviousRequestStatusToNew($tjid, $tjiid, $requestid, $progressStatus) {
try {
$fields = [
'status' => $progressStatus,
];
$this->dbconnection->update('tmgmt_capi_request_processor')
->fields($fields)
->condition('requestid', $requestid, '=')
->condition('tjid', $tjid, '=')
->condition('tjiid', $tjiid, '=')
->condition('status', [self::TO_PROCESS, self::IN_QUEUE], 'IN')
->execute();
}
catch (\Exception $e) {
// Log the error if failed to update the data.
$this->loggerFactory->get('tmgmt_contentapi')->error('Error while updating data in tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
}
}
/**
* Summary of getCapiProcessorDetailsBasisOftjid.
*
* @param array $tjiid_array
* Job id.
*
* @return array
* return result object.
*/
public function getCapiProcessorDetailsBasisOftjid(array $tjiid_array) {
$result = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
->fields('tcp', ['tjid', 'tjiid', 'jobid', 'providerid', 'requestid', 'statuscode', 'status'])
->condition('tcp.tjiid', $tjiid_array, 'IN')
->condition('tcp.statuscode', self::CREATED)
->execute()
->fetchAll();
return array_column($result, NULL, 'tjiid');
}
/**
* Function to set item status to in_queue.
*
* @param int $rid
* Request id.
* @param string $status
* Status to set.
*/
public function setRequestItemInQueueStatus(int $rid, $status = self::IN_QUEUE) {
// Update query to set status to IN_QUEUE.
$this->dbconnection->update('tmgmt_capi_request_processor')
->fields([
'status' => $status,
'lastupdated' => $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
])
->condition('rid', $rid, '=')
->execute();
}
/**
* Function to set item status to in_queue.
*
* @param string $update_id
* Update id.
* @param string $status
* Status to set.
*/
public function setRequestItemInQueueStatusAsPerUpdateId(string $update_id, string $status = self::IN_QUEUE) {
// Update query to set status to IN_QUEUE.
$this->dbconnection->update('tmgmt_capi_request_processor')
->fields([
'status' => $status,
'lastupdated' => $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
])
->condition('updateid', $update_id, '=')
->condition('status', [self::TO_PROCESS, self::IN_QUEUE], 'IN')
->execute();
}
/**
* Function delete the records whose status is ignored and last updated is older than 1 month.
*
* This will help to keep the table clean.
*/
public function deleteIgnoredRecords() {
// Get the records which are ignored and last updated is older than 1 month.
$result = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
->fields('tcp', ['rid'])
->condition('tcp.status', self::IGNORED)
->condition('tcp.lastupdated', strtotime('-1 month'), '<')
->execute()
->fetchAll();
// If no records found then return.
if (empty($result)) {
return;
}
// Prepare the array of rid.
$rids = [];
foreach ($result as $row) {
$rids[] = $row->rid;
}
// Delete the records.
$this->dbconnection->delete('tmgmt_capi_request_processor')
->condition('rid', $rids, 'IN')
->execute();
}
/**
* Function to delete unprocessed records from the processor table.
*
* @param string $capi_job_id
* Capi job id.
* @param int $tjid
* Job id.
*/
public function deleteProcessorRecords(string $capi_job_id, int $tjid) {
// Delete the records from processor table.
$this->dbconnection->delete('tmgmt_capi_request_processor')
->condition('jobid', $capi_job_id, '=')
->condition('tjid', $tjid, '=')
->execute();
}
/**
* Function to delete unprocessed records from the message table.
*
* @param int $tjid
* Job id.
*/
public function deleteMessageRecords(int $tjid) {
// Delete the records from message table.
$this->dbconnection->delete('tmgmt_message')
->condition('tjid', $tjid, '=')
->execute();
}
/**
* ROBUSTNESS IMPROVEMENT: Check if job has active upload tracking records.
*
* Checks for jobs that have active file uploads in progress to prevent duplicates.
*
* @param int $tjid
* The job ID to check.
*
* @return int
* Count of active upload records for the job.
*/
public function getActiveUploadCount(int $tjid) {
try {
$active_upload_count = $this->dbconnection->select('tmgmt_capi_request_processor', 'p')
->condition('tjid', $tjid)
->condition('statuscode', ['CREATED', 'SENDING'], 'IN')
->condition('file_upload_status', ['FILE_GENERATING', 'READY_FOR_UPLOAD', 'UPLOADING', 'UPLOADED'], 'IN')
->countQuery()
->execute()
->fetchField();
return (int) $active_upload_count;
}
catch (\Exception $e) {
// If database check fails, return 0 to allow fallback to original logic
$this->loggerFactory->get('TMGMT_CONTENTAPI_DUPLICATE_CHECK')->error(
'Error checking active upload count for job @tjid: @message',
['@tjid' => $tjid, '@message' => $e->getMessage()]
);
return 0;
}
}
/**
* Get list of item IDs that have been successfully uploaded.
*
* @param int $tjid
* The job ID to check.
*
* @return array
* Array of item IDs (tjiid values) that have been successfully uploaded.
*/
public function getAlreadyUploadItems(int $tjid) {
try {
$result = $this->dbconnection->select('tmgmt_capi_request_processor', 'p')
->fields('p', ['tjiid'])
->condition('tjid', $tjid)
->condition('statuscode', ['CREATED', 'SENDING'], 'IN')
->condition('file_upload_status', 'UPLOADED')
->execute()
->fetchCol();
return $result ?: [];
}
catch (\Exception $e) {
// If database check fails, return 0 to allow fallback to original logic
$this->loggerFactory->get('TMGMT_CONTENTAPI_DUPLICATE_CHECK')->error(
'Error checking active upload count for job @tjid: @message',
['@tjid' => $tjid, '@message' => $e->getMessage()]
);
return [];
}
}
}
