redis-8.x-1.x-dev/src/Queue/RedisQueue.php

src/Queue/RedisQueue.php
<?php

namespace Drupal\redis\Queue;

use Drupal\Core\Queue\QueueInterface;
use Drupal\redis\ClientInterface;

/**
 * Redis-backed queue implementation.
 *
 * Item payloads are stored serialized in a Redis hash keyed by item ID. Two
 * Redis lists hold the IDs of the available and of the claimed items, and a
 * per-item key with a TTL tracks the lease of every claimed item.
 *
 * @ingroup queue
 */
class RedisQueue implements QueueInterface {

  /**
   * The Redis client.
   */
  protected ClientInterface $client;

  /**
   * Prefix used with all keys.
   */
  const KEY_PREFIX = 'drupal:queue:';

  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  /**
   * Key for list of available items.
   *
   * @var string
   */
  protected $availableListKey;

  /**
   * Key for list of claimed items.
   *
   * @var string
   */
  protected $claimedListKey;

  /**
   * Key prefix for items that are used to track expiration of leased items.
   *
   * @var string
   */
  protected $leasedKeyPrefix;

  /**
   * Key of increment counter key.
   *
   * @var string
   */
  protected $incrementCounterKey;

  /**
   * Key for hash table of available queue items.
   *
   * @var string
   */
  protected $availableItems;

  /**
   * Reserve timeout for blocking item claim.
   *
   * This will be set to number of seconds to wait for an item to be claimed.
   * Non-blocking approach will be used when set to NULL.
   *
   * @var int|null
   */
  protected $reserveTimeout;

  /**
   * Constructs a \Drupal\redis\Queue\RedisQueue object.
   *
   * @param string $name
   *   The name of the queue.
   * @param array $settings
   *   Array of Redis-related settings for this queue. The optional
   *   'reserve_timeout' entry is the number of seconds a claim may block;
   *   when it is absent or NULL, claims do not block.
   * @param \Drupal\redis\ClientInterface $client
   *   The Redis client.
   */
  public function __construct($name, array $settings, ClientInterface $client) {
    $this->name = $name;
    // Treat a missing setting as "non-blocking" instead of raising a warning.
    $this->reserveTimeout = $settings['reserve_timeout'] ?? NULL;
    $this->availableListKey = static::KEY_PREFIX . $name . ':avail';
    $this->availableItems = static::KEY_PREFIX . $name . ':items';
    $this->claimedListKey = static::KEY_PREFIX . $name . ':claimed';
    $this->leasedKeyPrefix = static::KEY_PREFIX . $name . ':lease:';
    $this->incrementCounterKey = static::KEY_PREFIX . $name . ':counter';
    $this->client = $client;

    $this->client->addIgnorePattern(static::KEY_PREFIX . $name . ':*');
  }

  /**
   * {@inheritdoc}
   */
  public function createQueue() {
    // Nothing to do here.
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {
    $record = new \stdClass();
    $record->data = $data;
    $record->item_id = $this->incrementId();
    // We cannot rely on REQUEST_TIME because many items might be created
    // by a single request which takes longer than 1 second.
    $record->timestamp = time();

    // Without a valid ID the payload would be stored under an empty field.
    if (!$record->item_id) {
      return FALSE;
    }

    if (!$this->client->hsetnx($this->availableItems, $record->item_id, serialize($record))) {
      return FALSE;
    }

    // LPUSH reports the list length after the push, so any positive value
    // means the ID was queued. Comparing against an earlier LLEN would be
    // racy: a concurrent claim between both calls shrinks the list and would
    // make a successful push look like a failure.
    if ($this->client->lpush($this->availableListKey, $record->item_id) > 0) {
      return $record->item_id;
    }

    // The ID never reached the list; do not leave an orphaned payload behind.
    $this->client->hdel($this->availableItems, $record->item_id);
    return FALSE;
  }

  /**
   * Gets next serial ID for Redis queue items.
   *
   * @return int
   *   Next serial ID for Redis queue item.
   */
  protected function incrementId() {
    return $this->client->incr($this->incrementCounterKey);
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    // Both available and currently claimed items count as queued.
    return (int) $this->client->lLen($this->availableListKey) + (int) $this->client->lLen($this->claimedListKey);
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 30) {
    // Is it OK to do garbage collection here (we need to loop list of claimed
    // items)?
    $this->garbageCollection();

    if ($this->reserveTimeout !== NULL) {
      // A blocking version of claimItem to be used with long-running queue
      // workers.
      $qid = $this->client->brpoplpush($this->availableListKey, $this->claimedListKey, $this->reserveTimeout);
    }
    else {
      $qid = $this->client->rpoplpush($this->availableListKey, $this->claimedListKey);
    }

    if (!$qid) {
      return FALSE;
    }

    $job = $this->client->hget($this->availableItems, $qid);
    $item = $job ? unserialize($job) : FALSE;

    if (!is_object($item)) {
      // The ID has no usable payload. It was just moved to the claimed list
      // and will never get a lease key, so garbage collection would keep
      // re-queueing it forever. Drop the ID and any unusable payload.
      $this->client->lrem($this->claimedListKey, $qid, -1);
      $this->client->hdel($this->availableItems, $qid);
      return FALSE;
    }

    // Fall back to the 'qid' property for records that carry no 'item_id'.
    $item->item_id ??= $item->qid;
    // NOTE(review): the lease key is written after the ID is already in the
    // claimed list, so a concurrent garbage collection may see the ID without
    // a lease in between — confirm whether that window matters in practice.
    $this->client->setex($this->leasedKeyPrefix . $item->item_id, $lease_time, '1');

    return $item;
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    // Only re-queue the ID when it was still in the claimed list. If it is no
    // longer there (for example because garbage collection already moved it
    // back after the lease expired), pushing it again would duplicate it.
    if ($this->client->lrem($this->claimedListKey, $item->item_id, -1) > 0) {
      // The lease is over; no need to keep its marker until the TTL expires.
      $this->client->del([$this->leasedKeyPrefix . $item->item_id]);
      $this->client->lpush($this->availableListKey, $item->item_id);
    }
    return TRUE;
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItem($item) {
    $this->client->lrem($this->claimedListKey, $item->item_id, -1);
    $this->client->hdel($this->availableItems, $item->item_id);
    // Remove the lease marker as well instead of waiting for its TTL.
    $this->client->del([$this->leasedKeyPrefix . $item->item_id]);
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {
    $keys_to_remove = [
      $this->claimedListKey,
      $this->availableListKey,
      $this->availableItems,
      $this->incrementCounterKey,
    ];

    // Lease keys are per item, so they have to be looked up by pattern.
    $lease_keys = $this->client->keys($this->leasedKeyPrefix . '*');
    if (is_array($lease_keys)) {
      foreach ($lease_keys as $key) {
        $keys_to_remove[] = $key;
      }
    }

    $this->client->del($keys_to_remove);
  }

  /**
   * Automatically release items, that have been claimed and exceeded lease time.
   *
   * An ID in the claimed list without a matching lease key is considered
   * expired and is moved back to the list of available items.
   */
  protected function garbageCollection() {
    $claimed = $this->client->lrange($this->claimedListKey, 0, -1);
    if (!is_array($claimed)) {
      return;
    }

    foreach ($claimed as $qid) {
      if ($this->client->exists($this->leasedKeyPrefix . $qid)) {
        // The lease is still running.
        continue;
      }

      // The lease expired for this ID. Re-queue it only if this call actually
      // removed it from the claimed list; otherwise another worker already
      // handled it and a second push would duplicate the item.
      if ($this->client->lrem($this->claimedListKey, $qid, -1) > 0) {
        $this->client->lpush($this->availableListKey, $qid);
      }
    }
  }
}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc