commercetools-8.x-1.2-alpha1/src/CommercetoolsMessages.php
src/CommercetoolsMessages.php
<?php
namespace Drupal\commercetools;
use Drupal\commercetools\Event\CommercetoolsMessageEvent;
use Drupal\Core\State\StateInterface;
use GraphQL\Actions\Query;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
* A Commercetools Messages service.
*/
class CommercetoolsMessages {
/**
* The key to store the last processed message time in the state.
*/
const LAST_PROCESSED_MESSAGE_CREATED_AT = "commercetools.messages.last_processed_message_created_at";
/**
* Constructs a CommercetoolsMessages object.
*
* @param \Drupal\commercetools\CommercetoolsApiService $ctApi
* The Commercetools API service.
* @param \Drupal\Core\State\StateInterface $state
* The state service.
* @param \Symfony\Contracts\EventDispatcher\EventDispatcherInterface $eventDispatcher
* The event dispatcher.
* @param \Drupal\commercetools\CommercetoolsConfiguration $ctConfig
* The commercetools configuration service.
*/
public function __construct(
protected CommercetoolsApiService $ctApi,
protected StateInterface $state,
protected EventDispatcherInterface $eventDispatcher,
protected CommercetoolsConfiguration $ctConfig,
) {
}
/**
* Processes the message queue.
*/
public function processMessageQueue() {
if (!$this->ctApi->isAccessConfigured() && $this->ctConfig->isSubscriptionsConfigured()) {
return;
}
$messages = $this->getMessages();
$this->processMessages($messages);
}
/**
* Gets the messages.
*
* @return array
* The messages.
*/
public function getMessages() {
$lastProcessedMessageCreatedAt = $this->state->get(self::LAST_PROCESSED_MESSAGE_CREATED_AT);
$query = new Query('messages', ['limit' => 50, 'sort' => 'createdAt']);
$query->use('total');
$results = $query->results([])->use(
'id',
'type',
'createdAt',
);
$results->resourceRef([])->use(
'id',
'typeId',
);
$variables = [];
if ($lastProcessedMessageCreatedAt) {
// @todo Rework this when a builder for the "where" constructor is
// implemented.
$variables['where'] = "createdAt > \"$lastProcessedMessageCreatedAt\"";
}
$response = $this->ctApi->executeGraphQlOperation($query, $variables);
$data = $response->getData();
$messages = $data['messages']['results'];
return $messages;
}
/**
* Processes the messages.
*
* @param array $messages
* The messages.
*/
public function processMessages(array $messages) {
foreach ($messages as $message) {
$event = new CommercetoolsMessageEvent($message);
$this->eventDispatcher->dispatch($event);
$this->state->set(self::LAST_PROCESSED_MESSAGE_CREATED_AT, $message['createdAt']);
}
}
}
