deepseek-1.x-dev/src/Plugin/AiDbVectors/MysqlDbVector.php
src/Plugin/AiDbVectors/MysqlDbVector.php
<?php
namespace Drupal\deepseek\Plugin\AiDbVectors;
use Drupal\Core\Config\Config;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\DatabaseExceptionWrapper;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Link;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\deepseek\AiDbVectorsInterface;
use Drupal\deepseek\AiDbVectorsPluginManager;
use Drupal\deepseek\Attribute\AiDbVectors;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Plugin implementation of the AI database vectors for MySQL.
 *
 * Content chunks are stored together with their embedding in a table that has
 * a VECTOR column. Lookups rank chunks by cosine distance between the stored
 * embedding and the query embedding.
 */
#[AiDbVectors(
  id: 'mysql',
  label: new TranslatableMarkup('Mysql 9.1'),
  description: new TranslatableMarkup('Version 9+ and HeatWave MySQL on OCI.'),
  table_name: 'ai_embedding',
)]
class MysqlDbVector implements AiDbVectorsInterface, ContainerFactoryPluginInterface {

  /**
   * Number of neighbouring chunks (by delta) merged on each side of a hit.
   */
  protected const CONTEXT_WINDOW = 5;

  /**
   * Number of dimensions every stored/query vector is zero-padded to.
   *
   * @var int
   */
  public int $limit = 768;

  /**
   * Constructs a Mysql vector object.
   *
   * @param string $plugin_id
   *   The plugin ID. Not used by this plugin.
   * @param mixed $plugin_definition
   *   The plugin implementation definition. Not used by this plugin.
   * @param mixed $field_definition
   *   Receives the plugin configuration array handed over by create(). Not
   *   used by this plugin.
   * @param \Drupal\Core\Database\Connection $connection
   *   The database connection service.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger
   *   The logger channel factory.
   * @param \Drupal\deepseek\AiDbVectorsPluginManager $vectors
   *   The vector database plugin manager.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager.
   * @param \Drupal\Core\Config\Config $configFactory
   *   The 'deepseek.settings' configuration object. Despite the name this is
   *   the loaded config object, not the config factory service.
   */
  public function __construct(
    $plugin_id,
    $plugin_definition,
    $field_definition,
    protected Connection $connection,
    protected LoggerChannelFactoryInterface $logger,
    protected AiDbVectorsPluginManager $vectors,
    protected EntityTypeManagerInterface $entityTypeManager,
    protected Config $configFactory,
  ) {
    // The generic plugin arguments are accepted for signature compatibility
    // only; nothing in this class reads them.
    unset($plugin_id, $plugin_definition, $field_definition);
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $plugin_id,
      $plugin_definition,
      $configuration,
      $container->get('database'),
      $container->get('logger.factory'),
      $container->get('plugin.manager.ai_db_vectors'),
      $container->get('entity_type.manager'),
      $container->get('config.factory')->get('deepseek.settings'),
    );
  }

  /**
   * Search for the best matching chunks and return related content.
   *
   * For every entity the chunk closest to the query vector (cosine distance)
   * is selected; the closest $limit entities are returned together with the
   * chunks surrounding the hit.
   *
   * @param array $vectors
   *   The query embedding. It is zero-padded to $this->limit dimensions.
   * @param string $message
   *   The original message. Currently not used for ranking.
   * @param int $limit
   *   The number of results to return.
   *
   * @return array
   *   An empty array when nothing matches, no table is configured or an error
   *   occurs. Otherwise an array with the keys:
   *   - content: list of merged chunk texts, one per matching entity.
   *   - links: de-duplicated list of generated links to the entities.
   */
  public function search(array $vectors, string $message, $limit = 1): array {
    try {
      $embeddingTable = $this->getTableName();
      if ($embeddingTable === NULL) {
        return [];
      }

      $arguments = [':query_vector' => $this->encodeVector($vectors)];
      // The same expression is needed for two table aliases; %s is the alias.
      $distanceTemplate = "DISTANCE(%s.embedding, STRING_TO_VECTOR(:query_vector), 'COSINE')";
      $distance = sprintf($distanceTemplate, 'e');

      // Step 1: Find the smallest distance per entity.
      $subquery = $this->connection->select($embeddingTable, 'sub_e');
      $subquery->fields('sub_e', ['entity_type', 'entity_id']);
      $subquery->addExpression('MIN(' . sprintf($distanceTemplate, 'sub_e') . ')', 'min_distance', $arguments);
      $subquery->isNotNull('sub_e.embedding');
      $subquery->groupBy('sub_e.entity_type');
      $subquery->groupBy('sub_e.entity_id');

      // Keep only the chunk(s) that realise that smallest distance.
      $query = $this->connection->select($embeddingTable, 'e');
      $query->fields('e', ['delta', 'entity_type', 'entity_id']);
      $query->addExpression($distance, 'distance', $arguments);
      $query->join(
        $subquery,
        'sub',
        'e.entity_type = sub.entity_type AND e.entity_id = sub.entity_id AND ' . $distance . ' = sub.min_distance',
        $arguments
      );
      $query->isNotNull('e.embedding');
      $query->orderBy('distance', 'ASC');
      $query->range(0, (int) $limit);
      $results = $query->execute()->fetchAll(\PDO::FETCH_ASSOC);
      if (empty($results)) {
        return [];
      }

      // Step 2: Collect the surrounding content and a link for each hit.
      $content = [];
      $links = [];
      foreach ($results as $result) {
        $delta = (int) $result['delta'];
        // Chunks are joined in PHP rather than with GROUP_CONCAT(), whose
        // result is silently cut off at group_concat_max_len.
        $chunks = $this->connection->select($embeddingTable, 'c')
          ->fields('c', ['content'])
          ->condition('c.entity_type', $result['entity_type'])
          ->condition('c.entity_id', $result['entity_id'])
          ->condition('c.delta', [$delta - self::CONTEXT_WINDOW, $delta + self::CONTEXT_WINDOW], 'BETWEEN')
          ->orderBy('c.delta', 'ASC')
          ->execute()
          ->fetchCol();
        $content[] = implode(' ', $chunks);

        $entity = $this->entityTypeManager
          ->getStorage($result['entity_type'])
          ->load($result['entity_id']);
        if (!$entity) {
          continue;
        }
        try {
          $title = $entity->label() ?? $result['entity_type'];
          $links[] = Link::fromTextAndUrl($title, $entity->toUrl())->toString();
        }
        catch (\Exception $e) {
          // An entity without a resolvable URL only loses its link; it must
          // not discard the content already collected.
          $this->logger->get('ai_db_vectors')->notice('No link for @type @id: @error', [
            '@type' => $result['entity_type'],
            '@id' => $result['entity_id'],
            '@error' => $e->getMessage(),
          ]);
        }
      }

      return [
        'content' => $content,
        'links' => array_unique($links),
      ];
    }
    catch (\Exception $e) {
      $this->logger->get('ai_db_vectors')->error('Search error: @error', ['@error' => $e->getMessage()]);
      return [];
    }
  }

  /**
   * Delete all chunks of an entity by entity type and ID.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   *
   * @return bool
   *   TRUE if the delete query ran, FALSE when no table is configured or the
   *   query failed.
   */
  public function delete(string $entityType, int $entityId) {
    $embeddingTable = $this->getTableName();
    if ($embeddingTable === NULL) {
      return FALSE;
    }
    try {
      $this->connection->delete($embeddingTable)
        ->condition('entity_type', $entityType)
        ->condition('entity_id', $entityId)
        ->execute();
      return TRUE;
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error deleting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Check whether at least one chunk is stored for an entity.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   *
   * @return bool
   *   TRUE if a row exists for the entity, FALSE otherwise (including when no
   *   table is configured).
   */
  public function exist(string $entityType, int $entityId): bool {
    $embeddingTable = $this->getTableName();
    if ($embeddingTable === NULL) {
      return FALSE;
    }
    $found = $this->connection->select($embeddingTable, 't')
      ->fields('t', ['entity_id'])
      ->condition('t.entity_type', $entityType)
      ->condition('t.entity_id', $entityId)
      ->range(0, 1)
      ->execute()
      ->fetchField();
    // fetchField() yields FALSE only when no row matched.
    return $found !== FALSE;
  }

  /**
   * Insert a chunk into the AI embedding table.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   * @param string $chunk
   *   The content chunk.
   * @param array $vectors
   *   The embedding. It is zero-padded to $this->limit dimensions.
   * @param int $delta
   *   The position of the chunk within the entity.
   *
   * @return bool
   *   TRUE if insertion was successful, FALSE otherwise.
   */
  public function insert(string $entityType, int $entityId, string $chunk, array $vectors, int $delta = 0) {
    $tableName = $this->getTableName();
    if ($tableName === NULL) {
      return FALSE;
    }
    try {
      // A static query is used because STRING_TO_VECTOR() has to wrap the
      // placeholder. The curly braces make Drupal apply the table prefix.
      $query = 'INSERT INTO {' . $this->connection->escapeTable($tableName) . '} (entity_type, entity_id, delta, content, embedding)
        VALUES (:entity_type, :entity_id, :delta, :content, STRING_TO_VECTOR(:embedding))';
      $this->connection->query($query, [
        ':entity_type' => $entityType,
        ':entity_id' => $entityId,
        ':delta' => $delta,
        ':content' => $chunk,
        ':embedding' => $this->encodeVector($vectors),
      ]);
      return TRUE;
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error inserting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Create the AI embedding table.
   *
   * @param string $table_name
   *   The table name. Only used when the configured vector plugin does not
   *   provide one, so the created table is the one the other methods use.
   * @param int $dimension
   *   Vector dimensionality.
   *
   * @return bool
   *   TRUE if the table was created, FALSE if it already exists, the server
   *   version is below 9.1, the arguments are unusable or creation fails.
   */
  public function createTable(string $table_name, int $dimension): bool {
    $tableName = $this->getTableName() ?? $table_name;
    if ($tableName === '' || $dimension < 1) {
      return FALSE;
    }
    $schema = $this->connection->schema();
    if ($schema->tableExists($tableName)) {
      return FALSE;
    }

    // Reduce e.g. "9.1.0-commercial" to "9.1" before comparing.
    $version = (string) $this->connection->query("SELECT VERSION()")->fetchField();
    $mysql_version = (string) preg_replace('/^(\d+\.\d+)\..*/', '$1', $version);
    if (version_compare($mysql_version, '9.1', '<')) {
      return FALSE;
    }

    $type = "VECTOR ($dimension)";
    try {
      $schema->createTable($tableName, [
        'description' => 'AI embedding for chatbot',
        'fields' => [
          'id' => [
            'type' => 'serial',
            'not null' => TRUE,
          ],
          'entity_type' => [
            'type' => 'varchar',
            'length' => 32,
            'not null' => FALSE,
          ],
          'entity_id' => [
            'type' => 'int',
            'not null' => FALSE,
          ],
          'delta' => [
            'type' => 'int',
            'default' => 0,
            'not null' => FALSE,
          ],
          'content' => [
            'type' => 'text',
            'size' => 'normal',
            'not null' => TRUE,
          ],
          'embedding' => [
            'type' => $type,
            'mysql_type' => $type,
            'size' => 'big',
            'not null' => TRUE,
          ],
        ],
        'primary key' => ['id'],
        'indexes' => [
          'idx_entity' => ['entity_type', 'entity_id'],
        ],
      ]);
    }
    catch (\Exception $e) {
      $this->logger->get('db_vector')->error('Could not create table @table: @error', [
        '@table' => $tableName,
        '@error' => $e->getMessage(),
      ]);
      return FALSE;
    }

    // Optional additions. Each runs as its own statement so that one the
    // server rejects does not prevent the others from being applied.
    // NOTE(review): the vector index is declared with a euclidean distance
    // while search() ranks by cosine distance - confirm which is intended.
    $table = '{' . $this->connection->escapeTable($tableName) . '}';
    $statements = [
      'full-text index' => 'ALTER TABLE ' . $table . ' ADD FULLTEXT INDEX idx_content (content)',
      'vector index' => 'ALTER TABLE ' . $table . ' ADD VECTOR INDEX idx_embedding (embedding) M=6 DISTANCE=euclidean',
      'timestamp columns' => 'ALTER TABLE ' . $table . ' ADD COLUMN created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ADD COLUMN changed TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP',
    ];
    foreach ($statements as $step => $sql) {
      try {
        $this->connection->query($sql);
      }
      catch (\Exception $e) {
        $this->logger->get('db_vector')->warning('Could not create @step: @error', [
          '@step' => $step,
          '@error' => $e->getMessage(),
        ]);
      }
    }
    return TRUE;
  }

  /**
   * Resolves the embedding table of the configured vector plugin.
   *
   * @return string|null
   *   The 'table_name' of the plugin definition named by the 'db_vector'
   *   setting, or NULL when the setting, the definition or the name is
   *   missing.
   */
  protected function getTableName(): ?string {
    $dbVector = $this->configFactory->get('db_vector');
    if (!is_string($dbVector) || $dbVector === '') {
      return NULL;
    }
    $definition = $this->vectors->getDefinitions()[$dbVector] ?? [];
    $tableName = $definition['table_name'] ?? NULL;
    return is_string($tableName) && $tableName !== '' ? $tableName : NULL;
  }

  /**
   * Serializes a vector for STRING_TO_VECTOR().
   *
   * @param array $vectors
   *   The vector components; keys are discarded.
   *
   * @return string
   *   A JSON list zero-padded to $this->limit entries.
   *
   * @throws \JsonException
   *   When the components cannot be encoded.
   */
  protected function encodeVector(array $vectors): string {
    $padded = array_pad(array_values($vectors), $this->limit, 0);
    return json_encode($padded, JSON_NUMERIC_CHECK | JSON_THROW_ON_ERROR);
  }

}
