deepseek-1.x-dev/src/Plugin/AiDbVectors/PgvectorDbVector.php
src/Plugin/AiDbVectors/PgvectorDbVector.php
<?php
namespace Drupal\deepseek\Plugin\AiDbVectors;
use Drupal\Core\Config\Config;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\DatabaseExceptionWrapper;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Link;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\deepseek\AiDbVectorsInterface;
use Drupal\deepseek\AiDbVectorsPluginManager;
use Drupal\deepseek\Attribute\AiDbVectors;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Plugin implementation of the AI database vectors.
 *
 * Stores content chunks together with their embeddings in a PostgreSQL table
 * whose "embedding" column uses the pgvector "vector" type, and looks up the
 * closest chunks by cosine distance.
 */
#[AiDbVectors(
  id: 'pgsql',
  label: new TranslatableMarkup('PostgreSQL + pgvector'),
  description: new TranslatableMarkup('PostgreSQL uses with plugin pgvector.'),
  table_name: 'ai_embedding',
)]
class PgvectorDbVector implements AiDbVectorsInterface, ContainerFactoryPluginInterface {

  /**
   * Number of neighbouring chunks fetched on each side of a matching chunk.
   */
  private const CONTEXT_WINDOW = 5;

  /**
   * Number of dimensions every stored or queried vector is padded to.
   *
   * Shorter vectors are right-padded with zeros up to this size; longer
   * vectors are passed through unchanged.
   *
   * @var int
   */
  public int $limit = 768;

  /**
   * Constructs a PostgreSQL vector object.
   *
   * @param string $plugin_id
   *   The plugin ID. Not used by this plugin.
   * @param mixed $plugin_definition
   *   The plugin implementation definition. Not used by this plugin.
   * @param mixed $field_definition
   *   The plugin configuration as passed by create(). Not used by this plugin.
   * @param \Drupal\Core\Database\Connection $connection
   *   The database connection service.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger
   *   The logger channel factory.
   * @param \Drupal\deepseek\AiDbVectorsPluginManager $vectors
   *   The vector database plugin manager.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager.
   * @param \Drupal\Core\Config\Config $configFactory
   *   The "deepseek.settings" configuration object (despite the property
   *   name, this is a config object and not the config factory).
   */
  public function __construct(
    $plugin_id,
    $plugin_definition,
    $field_definition,
    protected Connection $connection,
    protected LoggerChannelFactoryInterface $logger,
    protected AiDbVectorsPluginManager $vectors,
    protected EntityTypeManagerInterface $entityTypeManager,
    protected Config $configFactory,
  ) {
    // The generic plugin arguments are accepted for signature compatibility
    // only.
    unset($plugin_id, $plugin_definition, $field_definition);
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $plugin_id,
      $plugin_definition,
      $configuration,
      $container->get('database'),
      $container->get('logger.factory'),
      $container->get('plugin.manager.ai_db_vectors'),
      $container->get('entity_type.manager'),
      $container->get('config.factory')->get('deepseek.settings'),
    );
  }

  /**
   * Resolves the embedding table name of the configured vector plugin.
   *
   * The "db_vector" setting selects a plugin definition; that definition's
   * "table_name" value is the table all read/write methods operate on.
   *
   * @return string|null
   *   The unprefixed table name, or NULL when no plugin is configured or the
   *   definition carries no table name.
   */
  private function getTableName(): ?string {
    $dbVector = $this->configFactory->get('db_vector');
    if (!is_string($dbVector) || $dbVector === '') {
      return NULL;
    }
    $tableName = $this->vectors->getDefinitions()[$dbVector]['table_name'] ?? NULL;
    return empty($tableName) ? NULL : (string) $tableName;
  }

  /**
   * Search for the best matching chunks and return related content.
   *
   * @param array $vectors
   *   The query embedding. Padded with zeros up to $this->limit dimensions.
   * @param string $message
   *   The original message text. Not used by this backend: the lookup is
   *   purely vector based.
   * @param int $limit
   *   The maximum number of matching chunks to fetch. Several matches that
   *   belong to the same entity are collapsed into one result, so fewer
   *   entries than $limit may be returned.
   *
   * @return array
   *   An empty array when nothing matches or on error, otherwise an array
   *   with the keys:
   *   - content: list of strings, one per matching entity, each being the
   *     matching chunk merged with its neighbouring chunks in delta order.
   *   - links: list of rendered links to the matching entities.
   */
  public function search(array $vectors, string $message, $limit = 1): array {
    try {
      $embeddingTable = $this->getTableName();
      $limit = (int) $limit;
      if ($embeddingTable === NULL || $limit < 1) {
        return [];
      }

      // pgvector accepts the JSON array notation "[1,2,3]" as vector input.
      $vectors = array_pad(array_values($vectors), $this->limit, 0);
      $vectorString = json_encode($vectors, JSON_NUMERIC_CHECK | JSON_THROW_ON_ERROR);

      // Step 1: Find the best matching chunks.
      // The <=> operator computes the cosine distance (smaller is closer).
      // It can be replaced by <-> to use the Euclidean (L2) distance instead.
      $query = $this->connection->select($embeddingTable, 'e')
        ->fields('e', ['entity_type', 'entity_id', 'delta'])
        ->isNotNull('e.embedding')
        ->range(0, $limit);
      $query->addExpression('e.embedding <=> :query_vector::vector', 'distance', [
        ':query_vector' => $vectorString,
      ]);
      // Order by the expression alias: orderBy() escapes its argument as a
      // field name, so a raw expression cannot be passed to it.
      $query->orderBy('distance', 'ASC');
      $results = $query->execute()->fetchAll();
      if (empty($results)) {
        return [];
      }

      // Step 2: Collect the surrounding content of every matching entity.
      $content = [];
      $links = [];
      $seen = [];
      foreach ($results as $result) {
        // Rows arrive closest-first, so the first row of an entity is its
        // best chunk; later rows of the same entity are skipped.
        $key = $result->entity_type . ':' . $result->entity_id;
        if (isset($seen[$key])) {
          continue;
        }
        $seen[$key] = TRUE;

        $delta = (int) $result->delta;
        $entity_query = $this->connection->select($embeddingTable, 'e')
          ->condition('e.entity_type', $result->entity_type)
          ->condition('e.entity_id', $result->entity_id)
          ->condition('e.delta', [$delta - self::CONTEXT_WINDOW, $delta + self::CONTEXT_WINDOW], 'BETWEEN');
        // STRING_AGG is the PostgreSQL counterpart of MySQL's GROUP_CONCAT;
        // the ordering has to live inside the aggregate.
        $entity_query->addExpression("STRING_AGG(e.content, ' ' ORDER BY e.delta)", 'merged_content');
        $merged = $entity_query->execute()->fetchField();
        if (is_string($merged) && $merged !== '') {
          $content[] = $merged;
        }

        // Get a link for the entity, when it can be linked to at all.
        $entity = $this->entityTypeManager
          ->getStorage($result->entity_type)
          ->load($result->entity_id);
        if ($entity && $entity->hasLinkTemplate('canonical')) {
          $title = $entity->label() ?? $result->entity_type;
          $links[] = Link::fromTextAndUrl($title, $entity->toUrl())->toString();
        }
      }

      return [
        'content' => $content,
        'links' => array_unique($links),
      ];
    }
    catch (\Exception $e) {
      $this->logger->get('ai_db_vectors')->error('Search error: @error', ['@error' => $e->getMessage()]);
      return [];
    }
  }

  /**
   * Delete all chunks of an entity by entity type and ID.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   *
   * @return bool
   *   TRUE if the delete query ran (even when no row matched), FALSE when no
   *   embedding table is configured or the query failed.
   */
  public function delete(string $entityType, int $entityId) {
    $embeddingTable = $this->getTableName();
    if ($embeddingTable === NULL) {
      return FALSE;
    }
    try {
      $this->connection->delete($embeddingTable)
        ->condition('entity_type', $entityType)
        ->condition('entity_id', $entityId)
        ->execute();
      return TRUE;
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error deleting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Check whether chunks are stored for an entity.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   *
   * @return bool
   *   TRUE if at least one chunk exists for the entity, FALSE otherwise
   *   (including when no embedding table is configured).
   */
  public function exist(string $entityType, int $entityId): bool {
    // Look in the table insert() writes to; the "table_name" setting this
    // method used to read is only kept as a fallback.
    $embeddingTable = $this->getTableName() ?? $this->configFactory->get('table_name');
    if (empty($embeddingTable)) {
      return FALSE;
    }
    $result = $this->connection->select($embeddingTable, 't')
      ->fields('t', ['entity_id'])
      ->condition('t.entity_type', $entityType)
      ->condition('t.entity_id', $entityId)
      ->range(0, 1)
      ->execute()
      ->fetchField();
    return !empty($result);
  }

  /**
   * Insert a chunk into the AI embedding table.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   * @param string $chunk
   *   The content chunk.
   * @param array $vectors
   *   The embedding of the chunk. Padded with zeros up to $this->limit
   *   dimensions.
   * @param int $delta
   *   The position of the chunk within the entity's content.
   *
   * @return bool
   *   TRUE if insertion was successful, FALSE otherwise.
   */
  public function insert(string $entityType, int $entityId, string $chunk, array $vectors, int $delta = 0) {
    $tableName = $this->getTableName();
    if ($tableName === NULL) {
      return FALSE;
    }
    try {
      $embeddingJson = json_encode(
        array_pad(array_values($vectors), $this->limit, 0),
        JSON_NUMERIC_CHECK | JSON_THROW_ON_ERROR
      );
      // A raw query is needed for the ::vector cast. The literal braces make
      // Drupal apply the table prefix, matching the table createTable() made.
      $query = 'INSERT INTO {' . $tableName . '} (entity_type, entity_id, delta, content, embedding)
        VALUES (:entity_type, :entity_id, :delta, :content, :embedding::vector)';
      $this->connection->query($query, [
        ':entity_type' => $entityType,
        ':entity_id' => $entityId,
        ':delta' => $delta,
        ':content' => $chunk,
        ':embedding' => $embeddingJson,
      ]);
      return TRUE;
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error inserting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Create the AI embedding table.
   *
   * @param string $table_name
   *   Fallback table name, used only when the configured plugin definition
   *   does not provide one.
   * @param int $dimension
   *   Vector dimensionality of the "embedding" column. insert() and search()
   *   pad vectors to $this->limit, so the two values are expected to agree.
   *
   * @return bool
   *   TRUE if the table was created, FALSE if it already exists or creation
   *   failed. Failing to add the optional indexes or the "changed" trigger is
   *   only logged and does not affect the return value.
   */
  public function createTable(string $table_name, int $dimension): bool {
    $tableName = $this->getTableName() ?? $table_name;
    if ($tableName === '') {
      return FALSE;
    }
    $schema = $this->connection->schema();
    if ($schema->tableExists($tableName)) {
      return FALSE;
    }
    $logger = $this->logger->get('db_vector');

    // The vector column type only exists once the pgvector extension is
    // enabled in the current database.
    $has_pg_vector = $this->connection->query("SELECT EXISTS (
      SELECT FROM pg_extension
      WHERE extname = 'vector'
    )")->fetchField();
    if (!$has_pg_vector) {
      try {
        $this->connection->query("CREATE EXTENSION IF NOT EXISTS vector");
      }
      catch (\Exception $e) {
        $logger->error('Cannot create vector extension: @error', ['@error' => $e->getMessage()]);
        return FALSE;
      }
    }

    // Types without a Schema API mapping must be given through "pgsql_type".
    $type = "vector ($dimension)";
    $prefixed = '{' . $tableName . '}';
    try {
      $schema->createTable($tableName, [
        'description' => 'AI embedding for chatbot',
        'fields' => [
          'id' => [
            'type' => 'serial',
            'not null' => TRUE,
          ],
          'entity_type' => [
            'type' => 'varchar',
            'length' => 32,
            'not null' => FALSE,
          ],
          'entity_id' => [
            'type' => 'int',
            'not null' => FALSE,
          ],
          'delta' => [
            'type' => 'int',
            'default' => 0,
            'not null' => FALSE,
          ],
          'content' => [
            'type' => 'text',
            'size' => 'normal',
            'not null' => TRUE,
          ],
          'embedding' => [
            'type' => $type,
            'pgsql_type' => $type,
            'size' => 'big',
            'not null' => TRUE,
          ],
          'created' => [
            'type' => 'timestamp',
            'pgsql_type' => 'timestamp',
            'not null' => TRUE,
          ],
          'changed' => [
            'type' => 'timestamp',
            'pgsql_type' => 'timestamp',
            'not null' => TRUE,
          ],
        ],
        'primary key' => ['id'],
        'indexes' => [
          'idx_entity' => ['entity_type', 'entity_id'],
        ],
      ]);
      // The Schema API quotes string defaults, so CURRENT_TIMESTAMP has to be
      // set as a real SQL expression afterwards. insert() relies on these
      // defaults because it never supplies created/changed itself.
      $this->connection->query("ALTER TABLE $prefixed
        ALTER COLUMN created SET DEFAULT CURRENT_TIMESTAMP,
        ALTER COLUMN changed SET DEFAULT CURRENT_TIMESTAMP");
    }
    catch (\Exception $e) {
      $logger->error('Cannot create table @table: @error', [
        '@table' => $tableName,
        '@error' => $e->getMessage(),
      ]);
      // Do not leave a half-configured table behind; it did not exist before
      // this call, so dropping it lets a later attempt start clean.
      try {
        if ($schema->tableExists($tableName)) {
          $schema->dropTable($tableName);
        }
      }
      catch (\Exception $cleanup) {
        $logger->warning('Could not remove incomplete table @table: @error', [
          '@table' => $tableName,
          '@error' => $cleanup->getMessage(),
        ]);
      }
      return FALSE;
    }

    // Optional PostgreSQL extras. Each statement is attempted on its own so
    // that one failure (for example a pgvector build without HNSW) does not
    // prevent the remaining ones.
    $statements = [
      // GIN index for full-text search.
      "CREATE INDEX idx_content_gin ON $prefixed USING gin(to_tsvector('english', content))",
      // HNSW index for vector similarity.
      "CREATE INDEX idx_embedding_hnsw ON $prefixed USING hnsw (embedding vector_cosine_ops)",
      // Trigger function keeping the "changed" timestamp up to date.
      "
        CREATE OR REPLACE FUNCTION update_changed_column()
        RETURNS TRIGGER AS \$\$
        BEGIN
          NEW.changed = CURRENT_TIMESTAMP;
          RETURN NEW;
        END;
        \$\$ language 'plpgsql'
      ",
      "
        CREATE TRIGGER update_{$tableName}_changed
        BEFORE UPDATE ON $prefixed
        FOR EACH ROW EXECUTE FUNCTION update_changed_column()
      ",
    ];
    foreach ($statements as $statement) {
      try {
        // The function body contains semicolons, which query() rejects unless
        // delimiters are explicitly allowed.
        $this->connection->query($statement, [], ['allow_delimiter_in_query' => TRUE]);
      }
      catch (\Exception $e) {
        $logger->warning('Could not create indexes: @error', ['@error' => $e->getMessage()]);
      }
    }
    return TRUE;
  }

}
