coveo-1.0.0-alpha1/src/Coveo/Index.php

src/Coveo/Index.php
<?php

declare(strict_types=1);

namespace Drupal\coveo\Coveo;

use Drupal\Core\Utility\Error as DrupalError;
use Drupal\coveo\API\Model\BatchDocumentBody;
use Drupal\coveo\Event\CoveoBatchAlter;
use GuzzleHttp\Client as GuzzleClient;
use GuzzleHttp\Exception\ConnectException;
use NecLimDul\Coveo\PushApi\Api\FileContainerApi;
use NecLimDul\Coveo\PushApi\Api\ItemApi;
use Neclimdul\OpenapiPhp\Helper\Logging\Error;
use Neclimdul\SplFileObjectWrapper\SplFileObjectStream;
use Psr\Log\LogLevel;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

// cspell:ignore connecttimeout

/**
 * Contains all the functions related to one index.
 */
class Index {

  /**
   * Number of retries after the first failed AWS connection attempt.
   */
  private const UPLOAD_MAX_RETRIES = 3;

  /**
   * Seconds cURL may spend establishing the AWS connection.
   */
  private const UPLOAD_CONNECT_TIMEOUT = 5;

  /**
   * Seconds to wait between AWS connection attempts.
   */
  private const UPLOAD_RETRY_DELAY = 1;

  /**
   * Constructs an Index.
   *
   * @param string $organizationId
   *   Organization ID passed to the file container and item APIs.
   * @param \GuzzleHttp\Client $client
   *   HTTP client used to PUT file contents to the presigned upload URI.
   * @param \NecLimDul\Coveo\PushApi\Api\FileContainerApi $fileContainerApi
   *   API used to request a presigned upload location.
   * @param \NecLimDul\Coveo\PushApi\Api\ItemApi $itemApi
   *   API used to submit an uploaded batch file to a source.
   * @param \Psr\Log\LoggerInterface $logger
   *   Logger receiving API errors and connection retry warnings.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $eventDispatcher
   *   Dispatcher used to let subscribers alter a batch before upload.
   */
  public function __construct(
    private readonly string $organizationId,
    private readonly GuzzleClient $client,
    private readonly FileContainerApi $fileContainerApi,
    private readonly ItemApi $itemApi,
    private readonly LoggerInterface $logger,
    private readonly EventDispatcherInterface $eventDispatcher,
  ) {
  }

  /**
   * Override the content of several objects.
   *
   * Wraps the documents in a batch with only the add-or-update list set and
   * hands it to batch().
   *
   * Note: Each object must contain an objectID attribute.
   *
   * @param string $source_id
   *   Push source ID.
   * @param \Drupal\coveo\DocumentBody[] $objects
   *   Contains an array of objects to update.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \JsonException
   * @throws \Exception
   */
  public function addOrUpdate(string $source_id, array $objects): void {
    $batch = new BatchDocumentBody();
    $batch->setAddOrUpdate($objects);
    $this->batch(
      $source_id,
      $batch,
    );
  }

  /**
   * Send batch updates through the file/aws batch system.
   *
   * See README for more information on how this logic works.
   *
   * @param string $source_id
   *   Push source ID.
   * @param \Drupal\coveo\API\Model\BatchDocumentBody $batch
   *   Batch operation to write into AWS storage.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \JsonException
   *   When the batch cannot be serialized to JSON.
   * @throws \Exception
   *   When the upload location cannot be created, the upload is rejected, or
   *   the Item API refuses the batch.
   *
   * @see https://docs.coveo.com/en/90/index-content/manage-batches-of-items-in-a-push-source
   */
  public function batch(
    string $source_id,
    BatchDocumentBody $batch,
  ): void {
    // One of these should be populated, or it's going to break when it gets to
    // coveo.
    assert($batch->valid());

    // Preprocess binary file fields. A batch without an add-or-update list
    // (e.g. delete-only) has nothing to preprocess, so fall back to an empty
    // list rather than passing NULL into an array-typed parameter.
    $this->preprocessBinaryFields($batch->getAddOrUpdate() ?? []);

    $this->eventDispatcher->dispatch(new CoveoBatchAlter(
      $batch,
      $this,
    ));

    // Upload main batch. Fail loudly on an encoding error instead of uploading
    // the FALSE that json_encode() would otherwise return.
    $file_id = $this->uploadToAwsContainer(
      json_encode($batch, JSON_THROW_ON_ERROR)
    );

    // @todo better way of making sure aws file exists or batching this process.
    sleep(3);

    // Set container file id to the document batch.
    $item_response = $this->itemApi->organizationsOrganizationIdSourcesSourceIdDocumentsBatchPut(
      $source_id,
      $file_id,
      $this->organizationId,
    );
    if (!$item_response->isSuccess()) {
      Error::logError($this->logger, $item_response);
      throw new \Exception('Error submitting batched AWS data to Item API.');
    }
  }

  /**
   * Prepare binary fields for batch operations.
   *
   * Every document carrying a binaryFile has that file uploaded to its own
   * file container; the document is then updated to reference the uploaded
   * file ID, marked as uncompressed, and given the file's extension.
   *
   * @param \Drupal\coveo\DocumentBody[] $documents
   *   Documents getting ready to be indexed.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \Exception
   *
   * @see https://docs.coveo.com/en/73/index-content/push-source-item-data
   *   Documents tradeoffs of various data field types.
   */
  private function preprocessBinaryFields(array $documents): void {
    foreach ($documents as $document) {
      if (!isset($document->binaryFile)) {
        continue;
      }
      $file_id = $this->uploadToAwsContainer(
        new SplFileObjectStream($document->binaryFile)
      );
      $document->setCompressedBinaryDataFileId($file_id);
      $document->setAdditionalProperty('compressionType', 'UNCOMPRESSED');
      $document->setFileExtension($document->binaryFile->getExtension());
    }
  }

  /**
   * Helper method to push an arbitrary file into AWS and get a Coveo reference.
   *
   * @param string|resource|\Psr\Http\Message\StreamInterface $body
   *   File contents.
   *
   * @return string
   *   Coveo reference fileId.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \Exception
   *   When the upload location cannot be created or the upload is rejected.
   */
  private function uploadToAwsContainer($body): string {
    // Step 1. Ask Coveo for a presigned upload location.
    $response = $this->fileContainerApi->organizationsOrganizationIdFilesPost(
      $this->organizationId,
    );

    if (!$response->isSuccess()) {
      // Log the API error details, matching the Item API failure handling.
      Error::logError($this->logger, $response);
      throw new \Exception('Failed to create post location');
    }
    /** @var \NecLimDul\Coveo\PushApi\Model\PresignedUploadUrl $step1 */
    $step1 = $response->getData();

    // Step 2. Write into AWS file container.
    // Sometimes aws has trouble connecting on the first attempt so retry and
    // use CURLOPT_CONNECTTIMEOUT to extend the connection time.
    for ($attempt = 0; $attempt <= self::UPLOAD_MAX_RETRIES; $attempt++) {
      try {
        $aws_response = $this->client->put(
          $step1->getUploadUri(),
          [
            'headers' => $step1->getRequiredHeaders(),
            'body' => $body,
            'curl' => [
              CURLOPT_CONNECTTIMEOUT => self::UPLOAD_CONNECT_TIMEOUT,
            ],
          ],
        );
        // Success, break out of the loop.
        break;
      }
      catch (ConnectException $e) {
        if ($attempt >= self::UPLOAD_MAX_RETRIES) {
          // Give up and report the error.
          throw $e;
        }
        DrupalError::logException(
          $this->logger,
          $e,
          level: LogLevel::WARNING,
        );
        sleep(self::UPLOAD_RETRY_DELAY);
      }
    }

    // Guzzle throws on 4xx/5xx by default, but a client configured with
    // http_errors disabled would return the error response instead. Never
    // hand back a file ID for an upload that was rejected.
    if ($aws_response->getStatusCode() >= 400) {
      throw new \RuntimeException(sprintf(
        'Failed to upload file to AWS container (HTTP %d).',
        $aws_response->getStatusCode(),
      ));
    }

    return $step1->getFileId();
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc