deepseek-1.x-dev/src/Plugin/AiDbVectors/MilvusDbVector.php
src/Plugin/AiDbVectors/MilvusDbVector.php
<?php
namespace Drupal\deepseek\Plugin\AiDbVectors;
use Drupal\Core\Config\Config;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\DatabaseExceptionWrapper;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Link;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Drupal\deepseek\AiDbVectorsInterface;
use Drupal\deepseek\AiDbVectorsPluginManager;
use Drupal\deepseek\Attribute\AiDbVectors;
use HelgeSverre\Milvus\Milvus;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Milvus implementation of the AI database vectors plugin.
 *
 * Stores text chunks and their embeddings in a Milvus collection through the
 * HelgeSverre\Milvus client. The connection values (token, url, port) and the
 * database / collection names are read from the 'deepseek.settings' config
 * object injected by ::create().
 */
#[AiDbVectors(
  id: 'milvus',
  label: new TranslatableMarkup('Milvus'),
  description: new TranslatableMarkup('Milvus database vector.'),
  url: 'https://in03-somerandomstring.api.gcp-us-west1.zillizcloud.com',
  port: '443',
  db_name: 'default',
  table_name: 'ai_embedding',
)]
class MilvusDbVector implements AiDbVectorsInterface, ContainerFactoryPluginInterface {

  /**
   * Constructs a milvus vector object.
   *
   * The first three arguments are accepted to match the argument order used
   * by ::create() but are not stored on the object.
   *
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin implementation definition.
   * @param mixed $field_definition
   *   The plugin configuration array as passed by ::create() (the parameter
   *   name is historical).
   * @param \Drupal\Core\Database\Connection $connection
   *   The database connection service.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger
   *   The logger channel factory.
   * @param \Drupal\deepseek\AiDbVectorsPluginManager $vectors
   *   The vector database plugin manager.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager.
   * @param \Drupal\Core\Config\Config $configFactory
   *   The 'deepseek.settings' configuration object (not the config factory
   *   itself, despite the property name).
   */
  public function __construct(
    $plugin_id,
    $plugin_definition,
    $field_definition,
    protected Connection $connection,
    protected LoggerChannelFactoryInterface $logger,
    protected AiDbVectorsPluginManager $vectors,
    protected EntityTypeManagerInterface $entityTypeManager,
    protected Config $configFactory,
  ) {
    // These arguments are intentionally discarded.
    unset($plugin_id, $plugin_definition, $field_definition);
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $plugin_id,
      $plugin_definition,
      $configuration,
      $container->get('database'),
      $container->get('logger.factory'),
      $container->get('plugin.manager.ai_db_vectors'),
      $container->get('entity_type.manager'),
      $container->get('config.factory')->get('deepseek.settings'),
    );
  }

  /**
   * Creates a Milvus client from the module settings.
   *
   * @return \HelgeSverre\Milvus\Milvus
   *   A client built from the 'token', 'url' and 'port' settings.
   */
  protected function getMilvus() {
    return new Milvus(
      token: $this->configFactory->get('token'),
      host: $this->configFactory->get('url'),
      port: $this->configFactory->get('port'),
    );
  }

  /**
   * {@inheritdoc}
   *
   * Runs a vector search, then loads the chunks surrounding the best hit
   * (same entity, delta within five positions) so the caller receives some
   * context around the match. Also builds a link to every entity found.
   *
   * @return array
   *   An array with the keys 'content' (list of chunk texts ordered by delta)
   *   and 'links' (unique rendered links to the matching entities), or an
   *   empty array when an exception was thrown and logged.
   */
  public function search(array $vectors, string $message, int $limit = 1): array {
    try {
      $milvus = $this->getMilvus();
      $collectionName = $this->configFactory->get('table_name');
      // NOTE(review): 'score' is not one of the fields written by ::insert();
      // confirm the collection really exposes it, otherwise this filter may
      // exclude every row.
      $searchResponse = $milvus->vector()->search(
        collectionName: $collectionName,
        vector: $vectors,
        limit: $limit,
        filter: "score > 0.8",
        outputFields: ['delta', 'entity_id', 'entity_type'],
      );
      $links = [];
      $results = [];
      $data = $searchResponse->json()['data'] ?? [];
      if (!empty($data)) {
        // The first hit decides which entity and which chunk window to load.
        $best = $data[0];
        $delta = (int) $best['delta'];
        $entityId = (int) $best['entity_id'];
        // Window of +/- 5 chunks around the hit, clamped at 0 so that no
        // negative deltas are requested.
        $window = implode(',', range(max(0, $delta - 5), $delta + 5));
        $filterData = "entity_id in [" . $entityId . "]"
          . " and entity_type in ['" . $best['entity_type'] . "']"
          . " and delta in [" . $window . "]";
        $searchDelta = $milvus->vector()->query(
          collectionName: $collectionName,
          filter: $filterData,
          outputFields: ['delta', 'content'],
        );
        // Return the neighbouring chunks in reading order.
        $chunks = $searchDelta->json()['data'] ?? [];
        usort($chunks, static fn ($a, $b) => $a['delta'] <=> $b['delta']);
        foreach ($chunks as $chunk) {
          $results[] = $chunk['content'];
        }
        // Build one link per hit; duplicates are removed on return.
        foreach ($data as $hit) {
          $entity = $this->entityTypeManager->getStorage($hit['entity_type'])->load($hit['entity_id']);
          if ($entity) {
            $url = $entity->toUrl();
            $url->setOption('attributes', ['target' => '_blank', 'rel' => 'noopener noreferrer']);
            // Fall back to the entity type ID when the entity has no label.
            $title = $entity->label() ?? $hit['entity_type'];
            $links[] = Link::fromTextAndUrl($title, $url)->toString();
          }
        }
      }
      return [
        'content' => $results,
        'links' => array_unique($links),
      ];
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error searching embedding: @error', ['@error' => $e->getMessage()]);
      return [];
    }
  }

  /**
   * Delete vectors by entity type and ID.
   *
   * {@inheritdoc}
   *
   * @return bool
   *   TRUE when matching vectors were found and a delete was issued, FALSE
   *   when nothing matched or an exception was thrown and logged.
   */
  public function delete(string $entityType, int $entityId): bool {
    try {
      $milvus = $this->getMilvus();
      $collectionName = $this->configFactory->get('table_name');
      // Look up the primary keys of every row stored for this entity.
      $response = $milvus->vector()->query(
        collectionName: $collectionName,
        filter: "entity_type == '{$entityType}' && entity_id == {$entityId}",
        outputFields: ['id']
      );
      // query() returns a response object; the rows live under 'data', the
      // same way ::search() and ::exist() read them.
      $rows = $response->json()['data'] ?? [];
      $ids = is_array($rows) ? array_column($rows, 'id') : [];
      if (empty($ids)) {
        return FALSE;
      }
      $milvus->vector()->delete($ids, $collectionName);
      return TRUE;
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error deleting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Checks whether vectors are stored for an entity.
   *
   * @param string $entityType
   *   The entity type.
   * @param int $entityId
   *   The entity ID.
   *
   * @return bool
   *   TRUE if at least one row exists for the entity, FALSE otherwise.
   */
  public function exist(string $entityType, int $entityId): bool {
    $milvus = $this->getMilvus();
    $collectionName = $this->configFactory->get('table_name');
    $search = $milvus->vector()->query(
      collectionName: $collectionName,
      filter: "entity_id in [" . $entityId . "] and entity_type in ['" . $entityType . "']",
      outputFields: ['delta'],
    );
    $data = $search->json()['data'] ?? [];
    // A non-empty result set means the entity is already indexed.
    return !empty($data);
  }

  /**
   * Insert a vector into Milvus.
   *
   * {@inheritdoc}
   *
   * The collection is created first when the collection listing succeeds and
   * does not contain it; its dimension is taken from the vector being
   * inserted.
   *
   * @return bool
   *   TRUE when the insert call completed, FALSE when the collection could
   *   not be created or an exception was thrown and logged.
   */
  public function insert(string $entityType, int $entityId, string $chunk, array $vectors, int $delta = 0) {
    try {
      $milvus = $this->getMilvus();
      $collectionName = $this->configFactory->get('table_name');
      $dbName = $this->configFactory->get('db_name');
      $collections = $milvus->collections()->list(dbName: $dbName)->json('data');
      // An empty list is still a valid answer ("no collections yet") and must
      // trigger creation; a non-array answer is treated as "unknown" and the
      // insert is attempted as-is.
      if (is_array($collections) && !in_array($collectionName, $collections, TRUE)) {
        // Size the collection after the actual embedding so the insert below
        // cannot fail on a dimension mismatch.
        $created = $vectors === []
          ? $this->createTable($collectionName)
          : $this->createTable($collectionName, count($vectors));
        if (!$created) {
          return FALSE;
        }
      }
      $contents = [
        'entity_type' => $entityType,
        'entity_id' => $entityId,
        'delta' => $delta,
        'content' => $chunk,
        'vector' => $vectors,
      ];
      $milvus->vector()->insert(
        collectionName: $collectionName,
        data: $contents
      );
      return TRUE;
    }
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error inserting embedding: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

  /**
   * Create the AI embedding collection.
   *
   * @param mixed $table_name
   *   The collection name. Defaults to 'ai_embedding' when NULL.
   * @param int $dimension
   *   Vector dimensionality. NOTE(review): the default 786 looks like a typo
   *   for 768; kept for backward compatibility, confirm against the embedding
   *   model in use.
   *
   * @return bool
   *   TRUE if the create call completed, FALSE if an exception was thrown and
   *   logged.
   */
  public function createTable($table_name = NULL, int $dimension = 786): bool {
    $milvus = $this->getMilvus();
    $collectionName = $table_name ?? 'ai_embedding';
    try {
      $milvus->collections()->create(
        collectionName: $collectionName,
        dimension: $dimension,
      );
      return TRUE;
    }
    // The Milvus client talks HTTP, so catch generic exceptions rather than
    // Drupal's SQL-specific DatabaseExceptionWrapper.
    catch (\Exception $e) {
      $this->logger->get('embedding')->error('Error creating collection: @error', ['@error' => $e->getMessage()]);
      return FALSE;
    }
  }

}
