lionbridge_translation_provider-8.x-2.4/tmgmt_contentapi/src/Services/CapiDataProcessor.php

tmgmt_contentapi/src/Services/CapiDataProcessor.php
<?php

namespace Drupal\tmgmt_contentapi\Services;

use Drupal\Core\Database\Connection;
use Drupal\Core\Datetime\DrupalDateTime;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\tmgmt\Entity\Job;
use Drupal\tmgmt\JobInterface;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\JobApi;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\ProviderApi;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\StatusUpdateApi;
use Drupal\tmgmt_contentapi\Swagger\Client\Api\TokenApi;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\tmgmt_contentapi\Services\QueueOperations;

/**
 * Class CapiDataProcessor.
 *
 * CAPI Data Processor at Drupal End.
 *
 * @category Class
 * @package Drupal\tmgmt_contentapi\Services
 * @author Lionbridge Team
 */
class CapiDataProcessor {
  const IGNORED = 'IGNORED';

  const TO_PROCESS = 'TO_PROCESS';

  const SCANNED = 'SCANNED';

  const IMPORTED = 'IMPORTED';

  const CANCELLED = 'CANCELLED';

  const COMPLETED = 'COMPLETED';

  const CREATED = 'CREATED';

  const IN_QUEUE = 'IN_QUEUE';

   const SENDING = 'SENDING';

  /**
   * Queue name for migration jobs.
   */
  const QUEUE_NAME_MIGRATE_JOBS = 'migrate_jobs_to_new_structure_queue';

  /**
   * The capi token.
   *
   * @var string
   */
  protected $capiToken;

  /**
   * The job.
   *
   * @var \Drupal\tmgmt\JobInterface
   */
  protected $job;

  /**
   * The job api.
   *
   * @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\JobApi
   */
  protected $jobApi;

  /**
   * Token API object.
   *
   * @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\TokenApi
   */
  protected $capi;

  /**
   * Date/time object created in the constructor ('now', time zone 'Z').
   *
   * @var \Drupal\Core\Datetime\DrupalDateTime
   */
  protected $drupalDateTime;

  /**
   * The translator object.
   *
   * @var \Drupal\tmgmt\TranslatorInterface
   */
  protected $translator;

  /**
   * Status api object.
   *
   * @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\StatusUpdateApi
   */
  protected $statusApi;

  /**
   * The capi v2 jobs array.
   *
   * @var array
   */
  protected $capiJobs;

  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $dbconnection;

  /**
   * Provider API object.
   *
   * @var \Drupal\tmgmt_contentapi\Swagger\Client\Api\ProviderApi
   */
  protected $provider;

  /**
   * Logger Factory.
   *
   * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
   */
  protected $loggerFactory;

  /**
   * Entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The queue operations.
   *
   * @var \Drupal\tmgmt_contentapi\Services\QueueOperations
   */
  protected $queueOperations;

  /**
   * Builds the CAPI data processor.
   *
   * Keeps the injected services and creates the Content API client objects
   * and a 'now' date/time object that this class holds as properties.
   *
   * @param \Drupal\Core\Database\Connection $connection
   *   The database connection.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger factory.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager.
   * @param \Drupal\tmgmt_contentapi\Services\QueueOperations $queueOperations
   *   Service for queue operations.
   */
  public function __construct(
    Connection $connection,
    LoggerChannelFactoryInterface $logger_factory,
    EntityTypeManagerInterface $entityTypeManager,
    QueueOperations $queueOperations,
  ) {
    // Services handed in by the container.
    $this->queueOperations = $queueOperations;
    $this->entityTypeManager = $entityTypeManager;
    $this->loggerFactory = $logger_factory;
    $this->dbconnection = $connection;

    // Objects created here rather than injected.
    $this->jobApi = new JobApi();
    $this->capi = new TokenApi();
    $this->statusApi = new StatusUpdateApi();
    $this->provider = new ProviderApi();
    $this->drupalDateTime = new DrupalDateTime('now', new \DateTimeZone('Z'));
  }

  /**
   * Create a new instance of the CapiDataProcessor.
   *
   * Pulls the database connection, logger factory and entity type manager
   * from the container and passes them to the constructor.
   *
   * NOTE(review): the constructor declares four required parameters (the
   * fourth being QueueOperations) but only three arguments are passed here,
   * so calling this factory looks like it would raise an ArgumentCountError.
   * The service ID for QueueOperations is not visible in this file - confirm
   * it and pass it as the fourth argument.
   *
   * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
   *   The container.
   *
   * @return \Drupal\tmgmt_contentapi\Services\CapiDataProcessor
   *   The CapiDataProcessor object.
   */
  public static function create(ContainerInterface $container) {
    // NOTE(review): QueueOperations argument is missing - see docblock.
    return new static(
          $container->get('database'),
          $container->get('logger.factory'),
          $container->get('entity_type.manager')
      );
  }

  /**
   * Gets a translator that uses the 'contentapi' plugin.
   *
   * Prefers the translator whose ID is 'contentapi'. When no translator has
   * that ID, the first translator using the plugin is returned instead of
   * reading an undefined array offset.
   *
   * @return \Drupal\tmgmt\TranslatorInterface|array
   *   The translator, or an empty array when none is configured.
   */
  protected function getTranslator() {
    // loadByProperties() returns entities keyed by their IDs.
    $translators = $this->entityTypeManager->getStorage('tmgmt_translator')->loadByProperties(['plugin' => 'contentapi']);
    if (!$translators) {
      return [];
    }
    return $translators['contentapi'] ?? reset($translators);
  }

  /**
   * Stores the initial processor row for a newly created job request.
   *
   * The row is written to tmgmt_capi_request_processor with status IGNORED
   * and update id '0'.
   *
   * @param array $job_details
   *   Job details. Keys read here: tjid, tjiid, job_id, provider_id,
   *   request_id, job_status, and optionally update_time and
   *   file_upload_status.
   *
   * @return string|null
   *   Last inserted id, or NULL when the insert failed (the error is logged).
   */
  public function storeDataWhenJobCreated(array $job_details) {
    try {
      // One timestamp is fetched and reused for both time columns.
      $now = $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField();

      $fields = [
        'tjid' => $job_details['tjid'],
        'tjiid' => $job_details['tjiid'],
        'updateid' => '0',
        'jobid' => $job_details['job_id'],
        'providerid' => $job_details['provider_id'],
        'requestid' => $job_details['request_id'],
        'statuscode' => $job_details['job_status'],
        // A missing or empty update_time falls back to the current UTC time.
        'updatedtime' => !empty($job_details['update_time']) ? $job_details['update_time'] : $now,
        'haserror' => 0,
        'errormessage' => '',
        'status' => self::IGNORED,
        'lastupdated' => $now,
      ];

      // Initialize file upload tracking for SENDING statuscode records.
      if ($job_details['job_status'] === self::SENDING) {
        $fields['file_upload_status'] = $job_details['file_upload_status'] ?? NULL;
        $fields['file_upload_attempts'] = 0;
      }

      $this->dbconnection->insert('tmgmt_capi_request_processor')
        ->fields($fields)
        ->execute();
      // Get last inserted id.
      return $this->dbconnection->lastInsertId();
    }
    catch (\Exception $e) {
      // Log the error if failed to insert the data.
      $this->loggerFactory->get('tmgmt_contentapi')->error('Error while inserting data into tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
      return NULL;
    }
  }

  /**
   * Fetches status updates from CAPI and records the relevant ones.
   *
   * With both ids given, updates are fetched with the token of that job's
   * translator and only the matching CAPI job is processed. Otherwise updates
   * are fetched once per distinct 'capi_username_ctt' across all translators
   * using the 'contentapi' plugin. Updates whose job id is not present in
   * tmgmt_capi_request_processor are skipped.
   *
   * @param string $capi_job_id
   *   Capi job id.
   * @param string $tjid
   *   Drupal job id.
   */
  public function scanStatusData(string $capi_job_id = '', string $tjid = '') {
    // If job id and capi_job_id is set then fetch statuses only for the job
    // related translator.
    if ($capi_job_id != '' && $tjid != '') {
      $job = Job::load($tjid);
      if (!$job) {
        // Without the job there is no translator to fetch a token for.
        $this->loggerFactory->get('tmgmt_contentapi')->warning('Status scan skipped: job @tjid could not be loaded.', ['@tjid' => $tjid]);
        return;
      }
      $translator = $job->getTranslator();
      $this->capiToken = \Drupal::service('tmgmt_contentapi.capi_details')->getCapiToken($translator);
      $this->capiJobs = $this->statusApi->statusupdatesGet($this->capiToken);
    }
    else {
      // If auto cron runs then we can fetch updates using all translators.
      $translators = $this->entityTypeManager->getStorage('tmgmt_translator')->loadByProperties(['plugin' => 'contentapi']);
      $fetched_jobs = [];
      $unique_translators = [];
      foreach ($translators as $translator) {
        $capi_settings = $translator->getSetting('capi-settings');
        $username = $capi_settings['capi_username_ctt'] ?? NULL;
        // Translators sharing a username would return the same updates.
        if (in_array($username, $unique_translators)) {
          continue;
        }
        $unique_translators[] = $username;
        $this->capiToken = \Drupal::service('tmgmt_contentapi.capi_details')->getCapiToken($translator);
        $updates = $this->statusApi->statusupdatesGet($this->capiToken);
        // Only arrays can be merged; anything else is treated as no updates.
        if (is_array($updates)) {
          $fetched_jobs[] = $updates;
        }
      }
      // Combine the updates of all translators into one list.
      $this->capiJobs = $fetched_jobs ? array_merge(...$fetched_jobs) : [];
    }

    // Process the data and pass it to the database.
    if (empty($this->capiJobs)) {
      return;
    }
    foreach ($this->capiJobs as $job_info) {
      // When a specific CAPI job was requested, skip every other job to avoid
      // overhead.
      if ($capi_job_id != '' && $capi_job_id != $job_info['job_id']) {
        continue;
      }
      // Skip jobs that are not known to the processor table.
      if (!$this->isJobIdExist($job_info['job_id'])) {
        continue;
      }
      // Add status in processor.
      $this->addStatusData($job_info);
    }
  }

  /**
   * Adds one status update to the processor table and acknowledges it.
   *
   * For every request id of the update, the CREATED rows with the same
   * request id and job id are looked up and a new row is inserted for each.
   * The update id is acknowledged at CAPI only when at least one row was
   * inserted and no insert failed.
   *
   * @param array $job_info
   *   Status update details. Keys read here: request_ids, job_id, update_id
   *   (plus whatever insertRequestRecords() reads).
   */
  protected function addStatusData($job_info) {
    // Check if the jobinfo is empty.
    if (empty($job_info)) {
      return;
    }
    // Whether any matching row exists in this DB, and the Drupal job id of
    // the last one (used to pick the translator for the acknowledgement).
    $job_item = FALSE;
    $tjid = NULL;
    try {
      foreach ($job_info['request_ids'] as $request_id) {
        // Raw query for reference.
        // SELECT tjid, tjiid, providerid FROM tmgmt_capi_request_processor WHERE statuscode = 'CREATED' AND requestid = '1234' AND jobid = '1234';.
        $result = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
          ->fields('tcp', ['tjid', 'tjiid', 'providerid'])
          ->condition('tcp.statuscode', self::CREATED)
          ->condition('tcp.requestid', $request_id)
          ->condition('tcp.jobid', $job_info['job_id'])
          ->execute()
          ->fetchAll();

        // TODO: Check if this can be directly added via mysql query, rather than fetching and then inserting.
        foreach ($result as $tmgmtids) {
          $job_item = TRUE;
          $tjid = $tmgmtids->tjid;
          $this->insertRequestRecords($tmgmtids, $request_id, $job_info);
        }
      }
      try {
        // Acknowledge the update id once all details are stored in the database.
        if ($job_item) {
          $this->loggerFactory->get('status scanner')->notice(
                'Status scanner - Job: @jobid, UpdateID: @update_id, RequestIDs: @request_ids',
                [
                  '@jobid' => $job_info['job_id'],
                  '@update_id' => $job_info['update_id'],
                  '@request_ids' => json_encode($job_info['request_ids']),
                ]
            );
          $job = Job::load($tjid);
          if (!$job) {
            // Raised as an exception so it is logged by the catch below
            // instead of failing on a NULL method call.
            throw new \RuntimeException('Drupal job ' . $tjid . ' could not be loaded.');
          }
          $translator = $job->getTranslator();
          $this->capiToken = \Drupal::service('tmgmt_contentapi.capi_details')->getCapiToken($translator);
          $this->statusApi->statusupdatesUpdateIdAcknowledgePut($this->capiToken, $job_info['update_id']);
        }
      }
      catch (\Exception $e) {
        // Log the error if failed to update the data.
        $this->loggerFactory->get('TMGMT_CONTENTAPI')->error('Error while status aknowledgement : @error', ['@error' => $e->getMessage()]);
      }
    }
    catch (\Exception $e) {
      // Do not acknowledge the update id if failed to store in db.
      $this->loggerFactory->get('tmgmt_contentapi')->error('Error while inserting data into tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
    }
  }

  /**
   * Function to update the status of request id in the database.
   *
   * Runs a single raw UPDATE on tmgmt_capi_request_processor. A subquery
   * groups the rows whose status is SCANNED by (tjid, jobid, statuscode,
   * requestid) and computes the newest updatedtime per group. Joined rows are
   * then set to IGNORED when their statuscode is one of CREATED, SENDING,
   * SENT_TO_PROVIDER, IN_TRANSLATION or CANCELLED, or when their updatedtime
   * is older than the group's newest one; all other joined rows are set to
   * TO_PROCESS.
   *
   * NOTE(review): the join matches on jobid, statuscode and requestid only
   * (not tjid) and has no filter on t1.status, so rows that are not SCANNED
   * but share those three values are updated as well - confirm this is
   * intended.
   */
  public function updateRequestStatus() {
    // Update the status of the request id in the database.
    // This query is intentionally kept raw to avoid any issues.
    // This query will make sure only latest record is set to TO_PROCESS.
    $sql = "
      UPDATE tmgmt_capi_request_processor AS t1
      INNER JOIN (
          SELECT tjid, jobid, statuscode, requestid, COUNT(*) AS num_records, MAX(updatedtime) AS max_updatedtime
          FROM tmgmt_capi_request_processor
          WHERE status = :scanned
          GROUP BY tjid, jobid, statuscode, requestid
      ) AS t2
      ON t1.jobid = t2.jobid AND t1.statuscode = t2.statuscode AND t1.requestid = t2.requestid
      SET t1.status = CASE
          WHEN t1.statuscode IN ('CREATED', 'SENDING', 'SENT_TO_PROVIDER', 'IN_TRANSLATION', 'CANCELLED') THEN :ignored
          WHEN t1.updatedtime < t2.max_updatedtime THEN :ignored
          ELSE :to_process
      END;
    ";
    // Placeholder values come from the class status constants.
    $this->dbconnection->query($sql, [
      ':ignored' => self::IGNORED,
      ':to_process' => self::TO_PROCESS,
      ':scanned' => self::SCANNED,
    ]);
  }

  /**
   * Writes the latest known status of a migrated job into the processor.
   *
   * For each request id, the CREATED rows with the same request id and job id
   * are read together with the state of their job item, and one new row per
   * match is inserted in migration mode.
   *
   * @param array $job_info
   *   Job details for migration.
   */
  protected function addMigrationData($job_info) {
    // Nothing to store for an empty job info.
    if (empty($job_info)) {
      return;
    }

    try {
      foreach ($job_info['request_ids'] as $request_id) {
        // Equivalent raw query, for reference:
        // SELECT tcp.tjid, tcp.tjiid, tcp.providerid, tji.state
        // FROM tmgmt_capi_request_processor tcp
        // INNER JOIN tmgmt_job_item tji ON tcp.tjiid = tji.tjiid
        // WHERE tcp.statuscode = 'CREATED' AND tcp.requestid = '1234' AND tcp.jobid = '1234';.
        $select = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp');
        $select->fields('tcp', ['tjid', 'tjiid', 'providerid']);
        $select->condition('tcp.statuscode', self::CREATED);
        $select->condition('tcp.requestid', $request_id);
        $select->condition('tcp.jobid', $job_info['job_id']);
        $select->innerJoin('tmgmt_job_item', 'tji', 'tcp.tjiid = tji.tjiid');
        $select->addField('tji', 'state');
        $rows = $select->execute()->fetchAll();

        // TODO: Check if this can be directly added via mysql query, rather than fetching and then inserting.
        foreach ($rows as $row) {
          $this->insertRequestRecords($row, $request_id, $job_info, TRUE);
        }
      }
    }
    catch (\Exception $e) {
      // A failed insert is only logged.
      $this->loggerFactory->get('tmgmt_contentapi')->error('Error while inserting data into tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
    }
  }

  /**
   * Inserts one status row into tmgmt_capi_request_processor.
   *
   * @param object $tmgmtids
   *   Row providing tjid, tjiid and providerid; during migration it may also
   *   carry the job item 'state'.
   * @param string $request_id
   *   CAPI request id the row belongs to.
   * @param array $job_info
   *   Status details. Keys read here: update_id, job_id, status_code,
   *   update_time (object with a format() method), has_error, error_message.
   * @param bool $ismigration
   *   TRUE when called from the migration path; the row status is then
   *   derived from the job item state and the status code.
   * @param string $status
   *   Row status used when no migration rule applies. Defaults to SCANNED.
   */
  public function insertRequestRecords($tmgmtids, $request_id, $job_info, $ismigration = FALSE, $status = self::SCANNED) {
    // Set status during migration else it will be default status scanned.
    $lastupdated = $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField();
    if (isset($tmgmtids->state) && $ismigration) {
      if ($tmgmtids->state == 1) {
        $status = self::TO_PROCESS;
      }
      elseif ($tmgmtids->state == 2) {
        $status = self::IMPORTED;
      }
      elseif ($tmgmtids->state == 3) {
        $status = self::COMPLETED;
      }
      // These status codes always end up IGNORED, whatever the item state.
      if (in_array($job_info['status_code'],
      [
        'CREATED',
        'SENDING',
        'SENT_TO_PROVIDER',
        'IN_TRANSLATION',
        'CANCELLED',
      ])) {
        $status = self::IGNORED;
      }
      // Avoids using identical timestamps for 'Created' and other statuses during migration
      // By adding 1 minute to the last updated time to prevent status ambiguity.
      if ($job_info['status_code'] != 'CREATED') {
        $lastupdated = $this->dbconnection->query('SELECT DATE_ADD(UTC_TIMESTAMP(), INTERVAL 1 MINUTE)')->fetchField();
      }
    }

    // Truncate by characters, not bytes, so a multibyte character is never
    // cut in half and stored as invalid UTF-8.
    $error_message = !empty($job_info['error_message']) ? mb_substr($job_info['error_message'], 0, 200) : '';

    // Insert the data into the database.
    $this->dbconnection->insert('tmgmt_capi_request_processor')
      ->fields([
        'tjid' => $tmgmtids->tjid,
        'tjiid' => $tmgmtids->tjiid,
        'updateid' => $job_info['update_id'],
        'jobid' => $job_info['job_id'],
        'providerid' => $tmgmtids->providerid,
        'requestid' => $request_id,
        'statuscode' => $job_info['status_code'],
        'updatedtime' => $job_info['update_time']->format('Y-m-d H:i:s.u'),
        'haserror' => ($job_info['has_error']) ? 1 : 0,
        'errormessage' => $error_message,
        'status' => $status,
        'lastupdated' => $lastupdated,
      ])
      ->execute();
  }

  /**
   * Sets a new status on all pending rows sharing a row's request id.
   *
   * The request id is looked up from the given row id; every row with that
   * request id whose status is IN_QUEUE or TO_PROCESS is then updated.
   *
   * @param int|string $rid
   *   Row id in tmgmt_capi_request_processor.
   * @param string $progressStatus
   *   Status to set.
   */
  public function setRequestProgressStatus($rid, $progressStatus) {

    try {
      // Get a requestid from rid, check if some more records exist with same requestid then update all of them.
      $requestid = $this->dbconnection->select('tmgmt_capi_request_processor', 'sub')
        ->fields('sub', ['requestid'])
        ->condition('sub.rid', $rid)
        ->execute()
        ->fetchField();

      // fetchField() yields FALSE when the row does not exist; updating with
      // that value as the request id could touch unrelated rows.
      if ($requestid === FALSE) {
        return;
      }

      $this->dbconnection->update('tmgmt_capi_request_processor')
        ->fields([
          'status' => $progressStatus,
          'lastupdated' => $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
        ])
        ->condition('requestid', $requestid, '=')
        ->condition('status', [self::IN_QUEUE, self::TO_PROCESS], 'IN')
        ->execute();
    }
    catch (\Exception $e) {
      // Log the error if failed to update the data.
      $this->loggerFactory->get('tmgmt_contentapi')->error('Error while updating data in tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
    }
  }

  /**
   * Sets a new status on the IMPORTED row of a job item.
   *
   * Looks up one row id for the given job item whose status is IMPORTED and
   * updates that row only.
   *
   * @param int|string $tjiid
   *   Job item id.
   * @param string $progressStatus
   *   Status to set.
   */
  public function setRequestProgressStatusUsingItemId($tjiid, $progressStatus) {
    try {
      // Get rid from tjiid and status IMPORTED.
      $rid = $this->dbconnection->select('tmgmt_capi_request_processor', 'sub')
        ->fields('sub', ['rid'])
        ->condition('sub.tjiid', $tjiid)
        ->condition('sub.status', self::IMPORTED)
        ->execute()
        ->fetchField();

      // No IMPORTED row for this item: nothing to update.
      if ($rid === FALSE) {
        return;
      }

      $this->dbconnection->update('tmgmt_capi_request_processor')
        ->fields([
          'status' => $progressStatus,
          'lastupdated' => $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField(),
        ])
        ->condition('rid', $rid, '=')
        ->execute();
    }
    catch (\Exception $e) {
      // Log the error if failed to update the data.
      $this->loggerFactory->get('tmgmt_contentapi')->error('Error while updating data in tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
    }
  }

  /**
   * Returns one pending row per request id, ready to be imported.
   *
   * "Pending" means status TO_PROCESS or IN_QUEUE. For each request id the
   * row with the lowest rid is returned, ordered by provider id and job id.
   *
   * @param string $tjid
   *   Optional Drupal job id to restrict the result to.
   *
   * @return array
   *   Result rows with rid, tjid, updateid, jobid, providerid, requestid,
   *   statuscode and status.
   */
  public function getAllReadyRequestIdToImport($tjid = '') {
    $restrict_to_job = !empty($tjid);

    // Subquery: lowest rid of the pending rows of every request id.
    $first_rows = $this->dbconnection->select('tmgmt_capi_request_processor', 't2');
    $first_rows->addField('t2', 'requestid');
    $first_rows->addExpression('MIN(rid)', 'min_rid');
    $first_rows->condition('t2.status', [self::TO_PROCESS, self::IN_QUEUE], 'IN');
    if ($restrict_to_job) {
      $first_rows->condition('t2.tjid', $tjid);
    }
    $first_rows->groupBy('t2.requestid');

    // Main query, joined to the subquery so only one row per request id is
    // returned.
    $query = $this->dbconnection->select('tmgmt_capi_request_processor', 't1');
    $query->fields('t1', [
      'rid',
      'tjid',
      'updateid',
      'jobid',
      'providerid',
      'requestid',
      'statuscode',
      'status',
    ]);
    $query->join($first_rows, 'subq', 't1.rid = subq.min_rid');

    // Same status filter on the outer rows.
    $pending = $query->orConditionGroup();
    $pending->condition('t1.status', self::TO_PROCESS);
    $pending->condition('t1.status', self::IN_QUEUE);
    $query->condition($pending);

    if ($restrict_to_job) {
      $query->condition('t1.tjid', $tjid);
    }

    $query->orderBy('t1.providerid', 'ASC');
    $query->orderBy('t1.tjid', 'ASC');

    return $query->execute()->fetchAll();
  }

  /**
   * Gets the CAPI job id, provider id and overall status of a job.
   *
   * Results are kept in a drupal_static() cache. In overview mode the first
   * call loads the details of every job id returned by
   * tmgmt_contentapi_get_view_entity_ids('tmgmt_job_overview') in one query;
   * later calls are answered from the cache. In single-job mode a job that
   * is not cached yet is queried on its own.
   *
   * NOTE(review): the query relies on MySQL user variables (@row_number,
   * @prev_tjiid) to pick one row per job item. Also, in the REVIEW_TRANSLATION
   * branch AND binds tighter than OR, so the SCANNED / IN_QUEUE / IMPORTED
   * checks apply regardless of the statuscode check - confirm that is the
   * intended rule. The SQL itself is left unchanged here.
   *
   * @param int|string $tjid
   *   Drupal job id.
   * @param bool $is_single_job
   *   TRUE to look up only $tjid instead of all jobs of the overview view.
   *
   * @return array
   *   Array with keys jobid, providerid and overall_job_status, or an empty
   *   array when nothing is known about the job.
   */
  public function getGlobalColumnInfoForJobOverview($tjid, $is_single_job = FALSE) {
    $static_job_capi_details = &drupal_static(__FUNCTION__, []);
    // Serve from the static cache whenever the job is already known.
    if (isset($static_job_capi_details[$tjid])) {
      return $static_job_capi_details[$tjid];
    }
    // In overview mode a filled cache already covers every job of the view,
    // so a miss means there is no data. A single-job lookup must still query,
    // because the cache may have been filled for other jobs only.
    if (!empty($static_job_capi_details) && !$is_single_job) {
      return [];
    }
    if ($is_single_job) {
      $job_ids = [$tjid];
    }
    else {
      // Get all job ids from view.
      $job_ids = tmgmt_contentapi_get_view_entity_ids('tmgmt_job_overview');
    }
    // Without job ids the IN () list would be empty and the query invalid.
    if (empty($job_ids)) {
      return [];
    }
    // Prepare placeholders for query.
    $job_id_placeholders = [];
    $query_arguments = [];
    foreach ($job_ids as $index => $job_id) {
      $placeholder = ":job_id_$index";
      $job_id_placeholders[] = $placeholder;
      $query_arguments[$placeholder] = $job_id;
    }
    // Build the query.
    $query = "
      SELECT 
    tjid,
    jobid,
    providerid,
    CASE 
        WHEN COUNT(DISTINCT tjiid) = SUM(CASE WHEN (statuscode = 'REVIEW_TRANSLATION' OR statuscode = 'TRANSLATION_APPROVED') AND status = :completed THEN 1 ELSE 0 END) THEN :completed
        WHEN COUNT(DISTINCT tjiid) = SUM(CASE WHEN (statuscode = 'REVIEW_TRANSLATION' OR statuscode = 'TRANSLATION_APPROVED') AND status = :imported THEN 1 ELSE 0 END) THEN :imported
        WHEN COUNT(DISTINCT tjiid) = SUM(CASE WHEN statuscode = 'COMPLETED' AND status = :completed THEN 1 ELSE 0 END) THEN :completed
        WHEN SUM(CASE WHEN statuscode = 'IN_TRANSLATION' THEN 1 ELSE 0 END) > 0 THEN 'IN_TRANSLATION'
        WHEN SUM(CASE WHEN statuscode = 'SENDING' THEN 1 ELSE 0 END) > 0 THEN 'SENDING'
        WHEN SUM(CASE WHEN statuscode = 'SENT_TO_PROVIDER' THEN 1 ELSE 0 END) > 0 THEN 'SENT_TO_PROVIDER'
        WHEN SUM(CASE WHEN statuscode = 'CANCELLED' THEN 1 ELSE 0 END) > 0 THEN :cancelled
        WHEN SUM(CASE WHEN statuscode = 'REVIEW_TRANSLATION' THEN 1 ELSE 0 END) > 0
            AND SUM(CASE WHEN status = :to_process THEN 1 ELSE 0 END) > 0
            OR SUM(CASE WHEN status = :scanned THEN 1 ELSE 0 END) > 0
            OR SUM(CASE WHEN status = :inqueue THEN 1 ELSE 0 END) > 0
            OR SUM(CASE WHEN status = :imported THEN 1 ELSE 0 END) > 0 THEN 'REVIEW_TRANSLATION'
        WHEN SUM(CASE WHEN statuscode = 'COMPLETE_CLEANUP_FAILED' AND haserror = 1 THEN 1 ELSE 0 END) > 0 THEN 'FAILED'
        ELSE :created
    END AS overall_job_status
FROM (
    SELECT 
        tjid, 
        tjiid, 
        jobid, 
        providerid, 
        statuscode, 
        status,
        haserror
    FROM (
        SELECT 
            t.*,
            @row_number := IF(@prev_tjiid = tjiid, @row_number + 1, 1) AS rn,
            @prev_tjiid := tjiid
        FROM tmgmt_capi_request_processor t
        JOIN (SELECT @row_number := 0, @prev_tjiid := NULL) AS vars
        ORDER BY tjiid,
                 CASE 
                   WHEN status IN ('COMPLETED', 'IMPORTED', 'TO_PROCESS', 'IN_QUEUE', 'SCANNED') THEN 0
                   WHEN status = 'IGNORED' THEN 1
                   ELSE 2
                 END ASC,
                 COALESCE(updatedtime, '1970-01-01 00:00:00.000000') DESC,
                 COALESCE(lastupdated, '1970-01-01 00:00:00') DESC,
                 rid DESC
    ) sub
    WHERE rn = 1
) filtered_updates
WHERE tjid IN (" . implode(", ", $job_id_placeholders) . ")
GROUP BY tjid, jobid, providerid;
";

    $query_arguments += [
      ':completed' => self::COMPLETED,
      ':imported' => self::IMPORTED,
      ':cancelled' => self::CANCELLED,
      ':to_process' => self::TO_PROCESS,
      ':scanned' => self::SCANNED,
      ':inqueue' => self::IN_QUEUE,
      ':created' => self::CREATED,
    ];

    $result = $this->dbconnection->query($query, $query_arguments)->fetchAll();
    foreach ($result as $row) {
      if (in_array($row->tjid, $job_ids)) {
        $static_job_capi_details[$row->tjid] = [
          'jobid' => $row->jobid,
          'providerid' => $row->providerid,
          'overall_job_status' => $row->overall_job_status,
        ];
      }
      else {
        $static_job_capi_details[$row->tjid] = [];
      }
    }
    if (isset($static_job_capi_details[$tjid])) {
      return $static_job_capi_details[$tjid];
    }
    return [];
  }

  /**
   * Stores the initial processor rows for every CAPI request of a job.
   *
   * Reads the serialized 'capi-remote' job setting and stores one row per
   * entry. When the source native id ends in 'all' (single export file), one
   * row per job item is stored, all sharing the same request id.
   *
   * @param \Drupal\tmgmt\JobInterface $job
   *   Job under process.
   * @param string $provider_id
   *   Provider id.
   *
   * @return array
   *   Request ids; empty when the job has no CAPI remote settings.
   */
  public function jobToinsertInProcessor(JobInterface $job, string $provider_id) {
    $request_ids = [];

    // Get all items of job.
    $job_items = $job->getItems();

    // Get capi related remote settings.
    // NOTE(review): unserialize() restores objects from stored data; this is
    // only safe as long as the 'capi-remote' setting is written by this
    // module alone - confirm.
    $raw_settings = $job->getSetting('capi-remote');
    $jobcpsettings = is_string($raw_settings) ? unserialize($raw_settings) : FALSE;
    if (empty($jobcpsettings)) {
      // Nothing stored (or not unserializable): no rows, no request ids.
      return $request_ids;
    }

    // Prepare and store data.
    foreach ($jobcpsettings as $value) {
      $request = $value[0];
      // The native id has the form "<tjid>_<tjiid>" or "<tjid>_all".
      $native_id = explode('_', $request->getSourceNativeId());
      $job_details = [
        'tjid' => $native_id[0],
        'tjiid' => $native_id[1],
        'job_id' => $request->getJobId(),
        'request_id' => $request->getRequestId(),
        'job_status' => $request->getStatusCode(),
        'provider_id' => $provider_id,
        'update_time' => $request->getCreatedDate()->format("Y-m-d H:i:s.u"),
      ];
      // Create a request id array for further processing.
      $request_ids[] = $request->getRequestId();
      // Manage single export file. Add each item id with same request id.
      if ($native_id[1] == 'all') {
        foreach ($job_items as $item_id => $job_item) {
          // Store each item id with same request id.
          $job_details['tjiid'] = $item_id;
          $this->storeDataWhenJobCreated($job_details);
        }
      }
      else {
        $this->storeDataWhenJobCreated($job_details);
      }
    }
    return $request_ids;
  }

  /**
   * Queues every not-yet-migrated Content API job and processes the queue.
   *
   * A job qualifies when its translator is 'contentapi' and its id does not
   * appear in tmgmt_capi_request_processor.
   */
  public function addJobsToQueueForMigration() {
    // Job ids that already have rows in the processor table.
    $known_jobs = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp');
    $known_jobs->fields('tcp', ['tjid']);

    // Content API jobs that are not among them.
    $pending_jobs = $this->dbconnection->select('tmgmt_job', 'tj');
    $pending_jobs->fields('tj', ['tjid']);
    $pending_jobs->condition('tjid', $known_jobs, 'NOT IN');
    $pending_jobs->condition('translator', 'contentapi');
    $rows = $pending_jobs->execute()->fetchAll();

    // Nothing to migrate.
    if (empty($rows)) {
      return;
    }

    // One queue item per job id.
    $queue = \Drupal::service('queue')->get(self::QUEUE_NAME_MIGRATE_JOBS);
    foreach ($rows as $row) {
      $queue->createItem($row->tjid);
    }

    // Process queue.
    $this->queueOperations->processQueue(self::QUEUE_NAME_MIGRATE_JOBS);
  }

  /**
   * Migrates one job created before the processor table existed.
   *
   * Reads the job's 'jobinfo' entries from tmgmt_message, stores the initial
   * processor rows, adds a row for the stored job status when it is not
   * CREATED, and finally deletes the 'jobinfo' messages. When anything fails
   * the error is logged and the messages are kept.
   *
   * @param int $tjid
   *   Drupal job id.
   */
  public function migrateExistingJob(int $tjid) {
    try {
      // STEP 1: Get the CAPI job details stored in tmgmt_message.
      $messages = $this->dbconnection->select('tmgmt_message', 'c')
        ->fields('c', ['message'])
        ->condition('c.type', 'jobinfo', '=')
        ->condition('c.tjid', $tjid)
        ->execute()
        ->fetchAll();
      // If no record exists there is nothing to migrate.
      if (empty($messages)) {
        return;
      }
      // STEP 2: Get particular job related information.
      $job = Job::load($tjid);
      if (!$job) {
        // Raised as an exception so it is logged below instead of failing
        // with a type error the catch block would not handle.
        throw new \RuntimeException('Job could not be loaded.');
      }
      // The first fetched message is used as the job info.
      $job_info_tmgmt = json_decode(reset($messages)->message);
      if (!is_object($job_info_tmgmt) || !isset($job_info_tmgmt->providerId, $job_info_tmgmt->jobStatus)) {
        throw new \RuntimeException('Stored jobinfo message is not valid.');
      }
      // STEP 3: Create a structure to insert data into the tmgmt_capi_request_processor table.
      $request_ids = $this->jobToinsertInProcessor($job, $job_info_tmgmt->providerId);
      // STEP 4: Add the stored status for each request id so the job keeps
      // its latest status. Only needed when that status is not CREATED.
      if ($job_info_tmgmt->jobStatus != self::CREATED) {
        // The last 7 characters of the stored update time are dropped before
        // parsing.
        $updateTime = new DrupalDateTime(substr($job_info_tmgmt->updateTime, 0, -7), new \DateTimeZone('Z'));
        $job_info = [
          'update_id' => 0,
          'job_id' => $job_info_tmgmt->jobId,
          'status_code' => $job_info_tmgmt->jobStatus,
          'update_time' => $updateTime,
          'has_error' => 0,
          'error_message' => '',
          'request_ids' => $request_ids,
        ];
        $this->addMigrationData($job_info);
      }
      $this->loggerFactory->get('tmgmt_contentapi')->notice('Jobs id @job migrated successfully', ['@job' => $tjid]);

    }
    catch (\Exception $e) {
      // Log the error if failed to migrate the job.
      $this->loggerFactory->get('tmgmt_contentapi')->error(
            'Error while migrating jobid: @tjid - Error: @error',
            [
              '@tjid' => $tjid,
              '@error' => $e->getMessage(),
            ]
              );
      return;
    }
    // STEP 5: Delete the job from the tmgmt_message table.
    // This step is skipped when the migration failed.
    $this->dbconnection->delete('tmgmt_message')
      ->condition('tjid', $tjid)
      ->condition('type', 'jobinfo')
      ->execute();
  }

  /**
   * Tells whether a CAPI job id is present in the processor table.
   *
   * @param string $job_id
   *   Job id.
   *
   * @return bool
   *   TRUE when a row with that job id is found, FALSE otherwise.
   */
  public function isJobIdExist($job_id) {
    $found = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
      ->fields('tcp', ['jobid'])
      ->condition('jobid', $job_id)
      ->execute()
      ->fetchField();
    // Truthiness of the fetched job id decides the answer.
    return $found ? TRUE : FALSE;
  }

  /**
   * Reads the CREATED rows of a request from the processor table.
   *
   * @param string $request_id
   *   Request id.
   * @param string $job_id
   *   Capi job id.
   *
   * @return array
   *   Rows with tjid, tjiid and providerid.
   */
  public function getRequestDetailFromProcessor($request_id, $job_id) {
    $select = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp');
    $select->fields('tcp', ['tjid', 'tjiid', 'providerid']);
    $select->condition('tcp.statuscode', self::CREATED);
    $select->condition('tcp.requestid', $request_id);
    $select->condition('tcp.jobid', $job_id);

    return $select->execute()->fetchAll();
  }

  /**
   * Sets a new status on the pending rows of one request of a job item.
   *
   * Only rows whose status is TO_PROCESS or IN_QUEUE are changed.
   *
   * @param int $tjid
   *   Job id.
   * @param int $tjiid
   *   Item id.
   * @param string $requestid
   *   Request Id.
   * @param string $progressStatus
   *   New status to set.
   */
  public function updatePreviousRequestStatusToNew($tjid, $tjiid, $requestid, $progressStatus) {
    try {
      $update = $this->dbconnection->update('tmgmt_capi_request_processor');
      $update->fields(['status' => $progressStatus]);
      $update->condition('requestid', $requestid, '=');
      $update->condition('tjid', $tjid, '=');
      $update->condition('tjiid', $tjiid, '=');
      $update->condition('status', [self::TO_PROCESS, self::IN_QUEUE], 'IN');
      $update->execute();
    }
    catch (\Exception $e) {
      // A failed update is only logged.
      $this->loggerFactory->get('tmgmt_contentapi')->error('Error while updating data in tmgmt_capi_request_processor table: @error', ['@error' => $e->getMessage()]);
    }
  }

  /**
   * Loads CREATED processor rows for a set of job item ids.
   *
   * @param array $tjiid_array
   *   Job item ids (tjiid values) to look up. May be empty.
   *
   * @return array
   *   Row objects keyed by tjiid. When several rows share a tjiid the last
   *   one fetched wins. Empty array when no ids were given or nothing
   *   matched.
   */
  public function getCapiProcessorDetailsBasisOftjid(array $tjiid_array) {
    // An IN condition with an empty value list is rejected by the query
    // builder, so short-circuit instead of raising for "nothing to look up".
    if (empty($tjiid_array)) {
      return [];
    }
    $result = $this->dbconnection->select('tmgmt_capi_request_processor', 'tcp')
      ->fields('tcp', ['tjid', 'tjiid', 'jobid', 'providerid', 'requestid', 'statuscode', 'status'])
      ->condition('tcp.tjiid', $tjiid_array, 'IN')
      ->condition('tcp.statuscode', self::CREATED)
      ->execute()
      ->fetchAll();
    // Re-key the rows by job item id for direct lookup by the caller.
    return array_column($result, NULL, 'tjiid');
  }

  /**
   * Sets the status of one processor row, identified by its rid.
   *
   * The row's lastupdated column is refreshed with the database server's
   * current UTC timestamp in the same statement.
   *
   * @param int $rid
   *   Value of the rid column identifying the row.
   * @param string $status
   *   Status to store; defaults to IN_QUEUE.
   */
  public function setRequestItemInQueueStatus(int $rid, $status = self::IN_QUEUE) {
    // Ask the database for "now" in UTC.
    $utc_now = $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField();
    $new_values = [
      'status' => $status,
      'lastupdated' => $utc_now,
    ];
    $this->dbconnection->update('tmgmt_capi_request_processor')
      ->fields($new_values)
      ->condition('rid', $rid)
      ->execute();
  }

  /**
   * Sets the status of pending processor rows that share an update id.
   *
   * Only rows currently in TO_PROCESS or IN_QUEUE state are changed; their
   * lastupdated column is refreshed with the database server's current UTC
   * timestamp.
   *
   * @param string $update_id
   *   Value of the updateid column to match.
   * @param string $status
   *   Status to store; defaults to IN_QUEUE.
   */
  public function setRequestItemInQueueStatusAsPerUpdateId(string $update_id, string $status = self::IN_QUEUE) {
    // Ask the database for "now" in UTC.
    $utc_now = $this->dbconnection->query('SELECT UTC_TIMESTAMP()')->fetchField();
    $new_values = [
      'status' => $status,
      'lastupdated' => $utc_now,
    ];
    $pending_states = [self::TO_PROCESS, self::IN_QUEUE];
    $this->dbconnection->update('tmgmt_capi_request_processor')
      ->fields($new_values)
      ->condition('updateid', $update_id)
      ->condition('status', $pending_states, 'IN')
      ->execute();
  }

  /**
   * Purges IGNORED processor rows not updated within the last month.
   *
   * Keeps the tmgmt_capi_request_processor table from growing without bound.
   */
  public function deleteIgnoredRecords() {
    // The status setters in this class store lastupdated as the result of
    // UTC_TIMESTAMP(), i.e. a 'Y-m-d H:i:s' string in UTC. Build the cutoff in
    // that same representation; comparing against a raw Unix timestamp does
    // not select rows by age.
    // NOTE(review): assumes lastupdated is a datetime/string column - confirm
    // against the module's schema definition.
    $cutoff = gmdate('Y-m-d H:i:s', strtotime('-1 month'));

    // A single DELETE replaces the former SELECT + DELETE ... IN (...) pair
    // and avoids building an arbitrarily long id list.
    $this->dbconnection->delete('tmgmt_capi_request_processor')
      ->condition('status', self::IGNORED)
      ->condition('lastupdated', $cutoff, '<')
      ->execute();
  }

  /**
   * Removes all processor rows belonging to a CAPI job / TMGMT job pair.
   *
   * @param string $capi_job_id
   *   CAPI job id, matched against the jobid column.
   * @param int $tjid
   *   TMGMT job id, matched against the tjid column.
   */
  public function deleteProcessorRecords(string $capi_job_id, int $tjid) {
    $delete = $this->dbconnection->delete('tmgmt_capi_request_processor');
    $delete->condition('jobid', $capi_job_id);
    $delete->condition('tjid', $tjid);
    $delete->execute();
  }

  /**
   * Removes every tmgmt_message row attached to a job.
   *
   * @param int $tjid
   *   TMGMT job id, matched against the tjid column.
   */
  public function deleteMessageRecords(int $tjid) {
    $delete = $this->dbconnection->delete('tmgmt_message');
    $delete->condition('tjid', $tjid);
    $delete->execute();
  }

  /**
   * Counts processor rows of a job that are still part of an upload.
   *
   * A row is counted when its statuscode is CREATED or SENDING and its
   * file_upload_status is FILE_GENERATING, READY_FOR_UPLOAD, UPLOADING or
   * UPLOADED.
   *
   * @param int $tjid
   *   The job ID to check.
   *
   * @return int
   *   Number of matching rows; 0 when the lookup itself fails.
   */
  public function getActiveUploadCount(int $tjid) {
    try {
      $select = $this->dbconnection->select('tmgmt_capi_request_processor', 'p');
      $select->condition('tjid', $tjid);
      $select->condition('statuscode', ['CREATED', 'SENDING'], 'IN');
      $select->condition('file_upload_status', ['FILE_GENERATING', 'READY_FOR_UPLOAD', 'UPLOADING', 'UPLOADED'], 'IN');
      $row_count = $select->countQuery()->execute()->fetchField();
    }
    catch (\Exception $exception) {
      // A failed lookup is reported as "no active uploads" after logging.
      $this->loggerFactory->get('TMGMT_CONTENTAPI_DUPLICATE_CHECK')->error(
        'Error checking active upload count for job @tjid: @message',
        ['@tjid' => $tjid, '@message' => $exception->getMessage()]
      );
      return 0;
    }

    return (int) $row_count;
  }

  /**
   * Lists the job item ids of a job whose file upload has completed.
   *
   * A row qualifies when its statuscode is CREATED or SENDING and its
   * file_upload_status is UPLOADED.
   *
   * @param int $tjid
   *   The job ID to check.
   *
   * @return array
   *   tjiid values of the qualifying rows; empty array when there are none or
   *   when the lookup fails.
   */
  public function getAlreadyUploadItems(int $tjid) {
    try {
      // fetchCol() always yields an array, so no fallback value is needed.
      return $this->dbconnection->select('tmgmt_capi_request_processor', 'p')
        ->fields('p', ['tjiid'])
        ->condition('tjid', $tjid)
        ->condition('statuscode', [self::CREATED, self::SENDING], 'IN')
        ->condition('file_upload_status', 'UPLOADED')
        ->execute()
        ->fetchCol();
    }
    catch (\Exception $e) {
      // If the lookup fails, log it and report that nothing is uploaded yet.
      $this->loggerFactory->get('TMGMT_CONTENTAPI_DUPLICATE_CHECK')->error(
        'Error fetching already uploaded items for job @tjid: @message',
        ['@tjid' => $tjid, '@message' => $e->getMessage()]
      );
      return [];
    }
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc