coveo-1.0.0-alpha1/src/Coveo/Index.php
src/Coveo/Index.php
<?php
declare(strict_types=1);
namespace Drupal\coveo\Coveo;
use Drupal\Core\Utility\Error as DrupalError;
use Drupal\coveo\API\Model\BatchDocumentBody;
use Drupal\coveo\Event\CoveoBatchAlter;
use GuzzleHttp\Client as GuzzleClient;
use GuzzleHttp\Exception\ConnectException;
use NecLimDul\Coveo\PushApi\Api\FileContainerApi;
use NecLimDul\Coveo\PushApi\Api\ItemApi;
use Neclimdul\OpenapiPhp\Helper\Logging\Error;
use Neclimdul\SplFileObjectWrapper\SplFileObjectStream;
use Psr\Log\LogLevel;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
// cspell:ignore connecttimeout
/**
 * Contains all the functions related to one index.
 *
 * Documents are pushed to a Coveo push source in two steps. First, each
 * payload (the serialized batch and any binary file fields) is uploaded to a
 * file container obtained from the Push API. Then the batch's file ID is
 * submitted to the Item API.
 */
class Index {

  /**
   * Number of retries after a connection failure while uploading a file.
   *
   * The upload is attempted once, plus up to this many retries.
   */
  private const UPLOAD_CONNECT_RETRIES = 3;

  /**
   * Value passed as CURLOPT_CONNECTTIMEOUT for file container uploads.
   */
  private const UPLOAD_CONNECT_TIMEOUT = 5;

  /**
   * Constructs an Index.
   *
   * @param string $organizationId
   *   Coveo organization ID used for all Push API calls.
   * @param \GuzzleHttp\Client $client
   *   HTTP client used to PUT payloads to the file container upload URI.
   * @param \NecLimDul\Coveo\PushApi\Api\FileContainerApi $fileContainerApi
   *   Push API client used to request file container upload locations.
   * @param \NecLimDul\Coveo\PushApi\Api\ItemApi $itemApi
   *   Push API client used to submit document batches.
   * @param \Psr\Log\LoggerInterface $logger
   *   Logger for API errors and retried connection failures.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $eventDispatcher
   *   Dispatcher used to fire CoveoBatchAlter before a batch is uploaded.
   */
  public function __construct(
    private readonly string $organizationId,
    private readonly GuzzleClient $client,
    private readonly FileContainerApi $fileContainerApi,
    private readonly ItemApi $itemApi,
    private readonly LoggerInterface $logger,
    private readonly EventDispatcherInterface $eventDispatcher,
  ) {
  }

  /**
   * Override the content of several objects.
   *
   * Wraps the objects in a BatchDocumentBody add/update batch and sends it
   * through ::batch().
   *
   * Note: Each object must contain an objectID attribute.
   * NOTE(review): this note may predate the Coveo port; confirm which
   * identifier DocumentBody actually requires.
   *
   * @param string $source_id
   *   Push source ID.
   * @param \Drupal\coveo\DocumentBody[] $objects
   *   Contains an array of objects to update.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \JsonException
   * @throws \Exception
   */
  public function addOrUpdate(string $source_id, array $objects): void {
    $batch = new BatchDocumentBody();
    $batch->setAddOrUpdate($objects);
    $this->batch(
      $source_id,
      $batch,
    );
  }

  /**
   * Send batch updates through the file/aws batch system.
   *
   * See README for more information on how this logic works.
   *
   * @param string $source_id
   *   Push source ID.
   * @param \Drupal\coveo\API\Model\BatchDocumentBody $batch
   *   Batch operation to write into AWS storage.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \JsonException
   *   If the batch cannot be serialized to JSON.
   * @throws \Exception
   *   If a Push API call returns an unsuccessful response.
   *
   * @see https://docs.coveo.com/en/90/index-content/manage-batches-of-items-in-a-push-source
   */
  public function batch(
    string $source_id,
    BatchDocumentBody $batch,
  ): void {
    // One of these should be populated, or it's going to break when it gets to
    // coveo.
    assert($batch->valid());

    // Preprocess binary file fields. The add/update list may be unset when the
    // batch only carries other operations, so treat a missing list as empty
    // rather than passing null to an array parameter.
    $this->preprocessBinaryFields($batch->getAddOrUpdate() ?? []);

    $this->eventDispatcher->dispatch(new CoveoBatchAlter(
      $batch,
      $this,
    ));

    // Upload main batch. Throw on encoding failure; otherwise json_encode()
    // returns false, which would be uploaded as an empty body.
    $file_id = $this->uploadToAwsContainer(
      json_encode($batch, JSON_THROW_ON_ERROR),
    );

    // @todo better way of making sure aws file exists or batching this process.
    sleep(3);

    // Set container file id to the document batch.
    $item_response = $this->itemApi->organizationsOrganizationIdSourcesSourceIdDocumentsBatchPut(
      $source_id,
      $file_id,
      $this->organizationId,
    );
    if (!$item_response->isSuccess()) {
      Error::logError($this->logger, $item_response);
      throw new \Exception('Error submitting batched AWS data to Item API.');
    }
  }

  /**
   * Prepare binary fields for batch operations.
   *
   * Each document with a binaryFile set has that file uploaded to a file
   * container. The document is then pointed at the resulting file ID as
   * uncompressed binary data, with its file extension copied over.
   *
   * @param \Drupal\coveo\DocumentBody[] $documents
   *   Documents getting ready to be indexed.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \Exception
   *
   * @see https://docs.coveo.com/en/73/index-content/push-source-item-data
   *   Documents tradeoffs of various data field types.
   */
  private function preprocessBinaryFields(array $documents): void {
    foreach ($documents as $document) {
      if (!isset($document->binaryFile)) {
        continue;
      }
      $file_id = $this->uploadToAwsContainer(
        new SplFileObjectStream($document->binaryFile),
      );
      $document->setCompressedBinaryDataFileId($file_id);
      $document->setAdditionalProperty('compressionType', 'UNCOMPRESSED');
      $document->setFileExtension($document->binaryFile->getExtension());
    }
  }

  /**
   * Helper method to push an arbitrary file into AWS and get a Coveo reference.
   *
   * @param string|resource|\Psr\Http\Message\StreamInterface $body
   *   File contents.
   *
   * @return string
   *   Coveo reference fileId.
   *
   * @throws \GuzzleHttp\Exception\GuzzleException
   *   Including the last ConnectException once all retries are exhausted.
   * @throws \Neclimdul\OpenapiPhp\Helper\Exception\ApiSerializationException
   * @throws \Exception
   *   If the file container upload location cannot be created.
   */
  private function uploadToAwsContainer($body): string {
    // Step 1. Ask Coveo for a pre-signed upload location.
    $response = $this->fileContainerApi->organizationsOrganizationIdFilesPost(
      $this->organizationId,
    );
    if (!$response->isSuccess()) {
      // Log the API error details, matching the Item API failure path.
      Error::logError($this->logger, $response);
      throw new \Exception('Failed to create post location');
    }
    /** @var \NecLimDul\Coveo\PushApi\Model\PresignedUploadUrl $step1 */
    $step1 = $response->getData();

    // Step 2. Write into AWS file container.
    // Sometimes aws has trouble connecting on the first attempt so retry and
    // use CURLOPT_CONNECTTIMEOUT to extend the connection time.
    for ($attempt = 0; ; $attempt++) {
      try {
        $this->client->put(
          $step1->getUploadUri(),
          [
            'headers' => $step1->getRequiredHeaders(),
            'body' => $body,
            'curl' => [
              CURLOPT_CONNECTTIMEOUT => self::UPLOAD_CONNECT_TIMEOUT,
            ],
          ],
        );
        // Success, stop retrying.
        break;
      }
      catch (ConnectException $e) {
        if ($attempt >= self::UPLOAD_CONNECT_RETRIES) {
          // Give up and report the error.
          throw $e;
        }
        DrupalError::logException(
          $this->logger,
          $e,
          level: LogLevel::WARNING,
        );
        sleep(1);
      }
    }

    return $step1->getFileId();
  }

}
