deepseek-1.x-dev/src/Plugin/AiDbVectors/PgvectorDbVector.php

src/Plugin/AiDbVectors/PgvectorDbVector.php
<?php

namespace Drupal\deepseek\Plugin\AiDbVectors;

use Drupal\Core\Config\Config;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\DatabaseExceptionWrapper;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Link;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\deepseek\AiDbVectorsInterface;
use Drupal\deepseek\AiDbVectorsPluginManager;
use Drupal\deepseek\Attribute\AiDbVectors;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Plugin implementation of the AI database vectors.
 */
#[AiDbVectors(
  id: 'pgsql',
  label: new TranslatableMarkup('PostgreSQL + pgvector'),
  description: new TranslatableMarkup('PostgreSQL uses with plugin pgvector.'),
  table_name: 'ai_embedding',
)]
class PgvectorDbVector implements AiDbVectorsInterface, ContainerFactoryPluginInterface {

  /**
   * Limit vector 768 dimensions.
   *
   * @var int
   */
  public int $limit = 768;

  /**
   * Constructs a PostgreSQL vector object.
   *
   * @param string $plugin_id
   *   The plugin_id for the formatter.
   * @param mixed $plugin_definition
   *   The plugin implementation definition.
   * @param mixed $field_definition
   *   The definition of the field to which the formatter is associated.
   * @param \Drupal\Core\Database\Connection $connection
   *   The database connection service.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger
   *   The logger service.
   * @param \Drupal\deepseek\AiDbVectorsPluginManager $vectors
   *   The vector database service.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager interface.
   * @param \Drupal\Core\Config\Config $configFactory
   *   The config factory service.
   */
  public function __construct($plugin_id, $plugin_definition, $field_definition, protected Connection $connection, protected LoggerChannelFactoryInterface $logger, protected AiDbVectorsPluginManager $vectors, protected EntityTypeManagerInterface $entityTypeManager, protected Config $configFactory) {
    unset($plugin_id, $plugin_definition, $field_definition);
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $plugin_id,
      $plugin_definition,
      $configuration,
      $container->get('database'),
      $container->get('logger.factory'),
      $container->get('plugin.manager.ai_db_vectors'),
      $container->get('entity_type.manager'),
      $container->get('config.factory')->get('deepseek.settings'),
    );
  }

  /**
   * Search for the best matching chunks and return related content.
   *
   * @param array $vectors
   *   The vector array to search for.
   * @param string $message
   *   The message string to search for.
   * @param int $limit
   *   The number of results to return.
   *
   * @return array
   *   Array of content records sorted by delta.
   */
  public function search(array $vectors, string $message, $limit = 1): array {
    try {
      $dbVector = $this->configFactory->get('db_vector');
      $vector = $this->vectors->getDefinitions()[$dbVector] ?? NULL;
      if (!$vector) {
        return [];
      }
      // This operator <=>: computes the cosine similarity.
      // Can be replaced <->: calculates the Euclidean distance vs 2 vectors.
      $embeddingTable = $vector['table_name'];
      $distance_function = "embedding <=> :query_vector";

      // Step 1: Find the best matching chunks.
      $query = $this->connection->select($embeddingTable, 'e')
        ->fields('e', ['entity_type', 'entity_id', 'delta'])
        ->condition('embedding', NULL, 'IS NOT NULL')
        ->orderBy($distance_function, 'ASC')
        ->range(0, $limit);
      $vectors = array_pad(array_values($vectors), $this->limit, 0);
      $vectorString = json_encode($vectors, JSON_NUMERIC_CHECK);
      $query->addExpression($distance_function, 'distance', [
        ':query_vector' => $vectorString,
        ':search_text' => $message,
      ]);
      $query->groupBy('e.entity_type');
      $query->groupBy('e.entity_id');
      $results = $query->execute()->fetchAll();

      if (empty($results)) {
        return [];
      }

      // Step 2: Collect all content for matching entity_type and entity_id.
      $content = [];
      $links = [];
      foreach ($results as $result) {
        $entity_query = $this->connection->select($embeddingTable, 'e')
          ->condition('entity_type', $result->entity_type)
          ->condition('entity_id', $result->entity_id)
          ->condition('delta', range($result->delta - 5, $result->delta + 5), 'IN')
          ->orderBy('delta', 'ASC');
        $entity_query->addExpression("GROUP_CONCAT(e.content ORDER BY e.delta SEPARATOR ' ')", 'merged_content');
        $content[] = $entity_query->execute()->fetchField();
        // Get links for the entity.
        $entity = $this->entityTypeManager->getStorage($result['entity_type'])->load($result['entity_id']);
        if ($entity) {
          $url = $entity->toUrl();
          $title = $entity?->label() ?? $result->entity_type;
          $link = Link::fromTextAndUrl($title, $url)->toString();
          $links[] = $link;
        }
      }
      return [
        'content' => $content,
        'links' => array_unique($links),
      ];
    }
    catch (\Exception $e) {
      $this->logger->get('ai_db_vectors')->error('Search error: @error', ['@error' => $e->getMessage()]);
      return [];
    }
  }

  /**
   * Delete all chunks of an entity by entity type and ID.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   *
   * @return bool
   *   TRUE if deletion was successful, FALSE otherwise.
   */
  public function delete(string $entityType, int $entityId) {
    $dbVector = $this->configFactory->get('db_vector');
    $vector = $this->vectors->getDefinitions()[$dbVector] ?? [];
    $embeddingTable = $vector['table_name'] ?? FALSE;
    if (!$embeddingTable) {
      return FALSE;
    }
    try {
      $this->connection->delete($embeddingTable)
        ->condition('entity_type', $entityType)
        ->condition('entity_id', $entityId)
        ->execute();
      return TRUE;
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error deleting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Check if an entity by entity type and ID.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   *
   * @return bool
   *   TRUE if entity is existed, FALSE otherwise.
   */
  public function exist(string $entityType, int $entityId): bool {
    $embeddingTable = $this->configFactory->get('table_name');
    $query = $this->connection->select($embeddingTable, 't')
      ->fields('t', ['entity_id'])
      ->condition('t.entity_type', $entityType)
      ->condition('t.entity_id', $entityId)
      ->range(0, 1);
    $result = $query->execute()->fetchField();
    return !empty($result);
  }

  /**
   * Insert a chunk into the AI embedding table.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   * @param string $chunk
   *   The content chunk.
   * @param array $vectors
   *   The vector array.
   * @param int $delta
   *   The delta value.
   *
   * @return bool
   *   TRUE if insertion was successful, FALSE otherwise.
   */
  public function insert(string $entityType, int $entityId, string $chunk, array $vectors, int $delta = 0) {
    $dbVector = $this->configFactory->get('db_vector');
    $vector = $this->vectors->getDefinitions()[$dbVector];
    $tableName = $vector['table_name'];
    try {
      $embeddingJson = json_encode(array_pad(array_values($vectors), $this->limit, 0), JSON_NUMERIC_CHECK);
      $query = "INSERT INTO {$tableName} (entity_type, entity_id, delta, content, embedding)
        VALUES (:entity_type, :entity_id, :delta, :content, :embedding::vector)";

      $this->connection->query($query, [
        ':entity_type' => $entityType,
        ':entity_id' => $entityId,
        ':delta' => $delta,
        ':content' => $chunk,
        ':embedding' => $embeddingJson,
      ]);

      return TRUE;
    }
    catch (DatabaseExceptionWrapper $e) {
      $this->logger->get('embedding')->error('Error inserting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Create the AI embedding table.
   *
   * @param string $table_name
   *   The table name.
   * @param int $dimension
   *   Vector dimensionality.
   *
   * @return bool
   *   TRUE if creation was successful, FALSE if it exists or fails.
   */
  public function createTable(string $table_name, int $dimension): bool {
    $dbVector = $this->configFactory->get('db_vector');
    $vector = $this->vectors->getDefinitions()[$dbVector];
    $tableName = $vector['table_name'];
    $schema = $this->connection->schema();
    $type = "vector ($dimension)";
    if (!$schema->tableExists($tableName)) {
      $has_pg_vector = $this->connection->query("SELECT EXISTS (
      SELECT FROM pg_extension
      WHERE extname = 'vector'
    )")->fetchField();
      if (!$has_pg_vector) {
        try {
          $this->connection->query("CREATE EXTENSION IF NOT EXISTS vector");
        }
        catch (\Exception $e) {
          \Drupal::logger('db_vector')->error('Cannot create vector extension: @error', ['@error' => $e->getMessage()]);
          return FALSE;
        }
      }
      $schema->createTable($tableName, [
        'description' => 'AI embedding for chatbot',
        'fields' => [
          'id' => [
            'type' => 'serial',
            'not null' => TRUE,
          ],
          'entity_type' => [
            'type' => 'varchar',
            'length' => 32,
            'not null' => FALSE,
          ],
          'entity_id' => [
            'type' => 'int',
            'not null' => FALSE,
          ],
          'delta' => [
            'type' => 'int',
            'default' => 0,
            'not null' => FALSE,
          ],
          'content' => [
            'type' => 'text',
            'size' => 'normal',
            'length' => $dimension,
            'not null' => TRUE,
          ],
          'embedding' => [
            'type' => $type,
            'pgsql_type' => $type,
            'size' => 'big',
            'not null' => TRUE,
          ],
          'created' => [
            'type' => 'timestamp',
            'default' => 'CURRENT_TIMESTAMP',
            'not null' => TRUE,
          ],
          'changed' => [
            'type' => 'timestamp',
            'default' => 'CURRENT_TIMESTAMP',
            'not null' => TRUE,
          ],
        ],
        'primary key' => ['id'],
        'indexes' => [
          'idx_entity' => ['entity_type', 'entity_id'],
        ],
      ]);

      // Create indexes for PostgreSQL.
      try {
        // GIN index for full-text search.
        $this->connection->query("CREATE INDEX idx_content_gin ON {{$tableName}} USING gin(to_tsvector('english', content))");

        // HNSW index for vector similarity.
        $this->connection->query("CREATE INDEX idx_embedding_hnsw ON {{$tableName}} USING hnsw (embedding vector_cosine_ops)");

        // Trigger for updated timestamp.
        $this->connection->query("
        CREATE OR REPLACE FUNCTION update_changed_column()
        RETURNS TRIGGER AS \$\$
        BEGIN
          NEW.changed = CURRENT_TIMESTAMP;
          RETURN NEW;
        END;
        \$\$ language 'plpgsql';
      ");

        $this->connection->query("
        CREATE TRIGGER update_{$tableName}_changed
        BEFORE UPDATE ON {{$tableName}}
        FOR EACH ROW EXECUTE FUNCTION update_changed_column();
      ");

      }
      catch (\Exception $e) {
        \Drupal::logger('db_vector')->warning('Could not create indexes: @error', ['@error' => $e->getMessage()]);
      }

      return TRUE;
    }
    return FALSE;
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc