ai_upgrade_assistant-0.2.0-alpha2/src/Service/ParallelProcessingService.php
src/Service/ParallelProcessingService.php
<?php
namespace Drupal\ai_upgrade_assistant\Service;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\PhpExecutableFinder;
/**
* Service for handling parallel processing operations.
*/
class ParallelProcessingService {

  /**
   * The config factory.
   *
   * @var \Drupal\Core\Config\ConfigFactoryInterface
   */
  protected $configFactory;

  /**
   * The logger factory.
   *
   * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
   */
  protected $loggerFactory;

  /**
   * Worker pool for parallel processing.
   *
   * Holds one entry per worker that is currently running inside
   * executeTasks(); each entry has the keys 'process', 'task' and 'started'.
   * It is empty whenever no batch is being executed.
   *
   * @var array
   */
  protected $workerPool = [];

  /**
   * Maximum number of parallel workers.
   *
   * @var int
   */
  protected $maxWorkers;

  /**
   * Constructs a new ParallelProcessingService.
   *
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger factory.
   */
  public function __construct(
    ConfigFactoryInterface $config_factory,
    LoggerChannelFactoryInterface $logger_factory
  ) {
    $this->configFactory = $config_factory;
    $this->loggerFactory = $logger_factory;
    $this->maxWorkers = $this->determineOptimalWorkerCount();
  }

  /**
   * Determines the optimal number of worker processes.
   *
   * A positive 'parallel_workers' value in 'ai_upgrade_assistant.settings'
   * wins. Otherwise the count defaults to the detected CPU count minus one,
   * but never fewer than two.
   *
   * @return int
   *   The optimal number of workers; always at least 1.
   */
  protected function determineOptimalWorkerCount() {
    $configuredWorkers = (int) $this->configFactory
      ->get('ai_upgrade_assistant.settings')
      ->get('parallel_workers');

    // Zero, negative or non-numeric settings are treated as "not configured":
    // a limit below one would prevent executeTasks() from ever starting a
    // task and leave it spinning on a queue that never drains.
    if ($configuredWorkers > 0) {
      return $configuredWorkers;
    }

    return max(2, $this->detectCpuCount() - 1);
  }

  /**
   * Detects the number of logical CPUs available on this host.
   *
   * The length of sys_getloadavg() cannot be used for this: that function
   * reports the 1, 5 and 15 minute load averages, so it always has three
   * elements regardless of the hardware.
   *
   * @return int
   *   The detected CPU count, or 4 when it cannot be determined.
   */
  protected function detectCpuCount() {
    // Windows exposes the logical processor count through the environment.
    $fromEnvironment = getenv('NUMBER_OF_PROCESSORS');
    if (is_string($fromEnvironment) && ctype_digit($fromEnvironment) && (int) $fromEnvironment > 0) {
      return (int) $fromEnvironment;
    }

    // Linux lists one "processor : N" line per logical CPU. Warnings are
    // suppressed because open_basedir restrictions may forbid access to
    // /proc, in which case the fallback below applies.
    $cpuInfo = @file_get_contents('/proc/cpuinfo');
    if (is_string($cpuInfo)) {
      $processorLines = preg_match_all('/^processor\s*:/m', $cpuInfo);
      if ($processorLines > 0) {
        return $processorLines;
      }
    }

    // Default assumption when no detection strategy succeeded.
    return 4;
  }

  /**
   * Executes multiple tasks in parallel.
   *
   * Each task is run in its own PHP child process. At most $this->maxWorkers
   * processes run at the same time; finished processes are collected by
   * polling every 50ms.
   *
   * @param array $tasks
   *   Array of callable tasks to execute. Every task must survive a
   *   var_export() round trip (see generateWorkerCode()).
   * @param array $context
   *   Context data for the tasks.
   *
   * @return array
   *   Results from all tasks, in the order the workers were collected (not
   *   necessarily the order of $tasks). Each result is an array with the keys
   *   'task', 'output' (worker stdout), 'duration' (seconds, float),
   *   'exitCode' and 'errorOutput' (worker stderr).
   *
   * @throws \RuntimeException
   *   When the PHP binary needed to start a worker cannot be located.
   */
  public function executeTasks(array $tasks, array $context = []) {
    $results = [];
    $taskQueue = $tasks;
    $this->workerPool = [];

    // Never allow a limit below one, otherwise the queue could not drain.
    $workerLimit = max(1, (int) $this->maxWorkers);

    try {
      while (!empty($taskQueue) || !empty($this->workerPool)) {
        // Start new tasks while worker slots are available.
        while (!empty($taskQueue) && count($this->workerPool) < $workerLimit) {
          $task = array_shift($taskQueue);
          $this->workerPool[] = [
            'process' => $this->startWorkerProcess($task, $context),
            'task' => $task,
            'started' => microtime(true),
          ];
        }

        // Collect finished workers. foreach iterates over a snapshot of the
        // pool, so removing entries while looping is safe.
        foreach ($this->workerPool as $slot => $worker) {
          $process = $worker['process'];
          if ($process->isRunning()) {
            continue;
          }

          $exitCode = $process->getExitCode();
          $errorOutput = $process->getErrorOutput();
          $results[] = [
            'task' => $worker['task'],
            'output' => $process->getOutput(),
            'duration' => microtime(true) - $worker['started'],
            'exitCode' => $exitCode,
            'errorOutput' => $errorOutput,
          ];

          if ($exitCode !== 0) {
            $this->loggerFactory->get('ai_upgrade_assistant')->warning(
              'Parallel worker exited with code @code: @error',
              [
                '@code' => var_export($exitCode, true),
                '@error' => trim($errorOutput),
              ]
            );
          }

          unset($this->workerPool[$slot]);
        }

        // Prevent CPU overload while waiting for workers to finish.
        if (!empty($this->workerPool)) {
          usleep(50000);
        }
      }
    }
    finally {
      // The pool is only non-empty here when an exception interrupted the
      // loop; stop the remaining children so they are not left behind.
      foreach ($this->workerPool as $worker) {
        if ($worker['process']->isRunning()) {
          $worker['process']->stop(0);
        }
      }
      $this->workerPool = [];
    }

    return $results;
  }

  /**
   * Starts a worker process for a task.
   *
   * @param callable $task
   *   The task to execute.
   * @param array $context
   *   Context data for the task.
   *
   * @return \Symfony\Component\Process\Process
   *   The created process, already started.
   *
   * @throws \RuntimeException
   *   When no PHP executable can be found.
   */
  protected function startWorkerProcess($task, array $context) {
    $phpBinary = (new PhpExecutableFinder())->find();
    if ($phpBinary === false) {
      throw new \RuntimeException('Unable to locate the PHP executable required to start a worker process.');
    }

    // Create isolated process running the generated code via "php -r".
    $process = new Process([
      $phpBinary,
      '-r',
      $this->generateWorkerCode($task, $context),
    ]);
    $process->start();

    return $process;
  }

  /**
   * Generates PHP code for worker process.
   *
   * The generated code loads an autoloader, calls the task with the context
   * array as its single argument and prints the JSON-encoded return value.
   *
   * The task and context are embedded with var_export(), so only values that
   * var_export() can reproduce work: function names, 'Class::method' strings,
   * [class, method] arrays, and scalar/array context data. Closures cannot be
   * exported this way and will make the worker fail.
   *
   * NOTE(review): the autoloader is expected two directories above this file;
   * confirm that an autoload.php actually exists at that location.
   *
   * @param callable $task
   *   The task to execute.
   * @param array $context
   *   Context data for the task.
   *
   * @return string
   *   PHP code for worker process.
   */
  protected function generateWorkerCode($task, array $context) {
    // The path is exported as a single-quoted literal. Interpolating it into
    // a double-quoted string would corrupt paths containing backslash escape
    // sequences (e.g. "\n" or "\x.." in Windows paths), "$" or quotes.
    return sprintf(
      'require_once %s; $result = call_user_func(%s, %s); echo json_encode($result);',
      var_export(__DIR__ . '/../../autoload.php', true),
      var_export($task, true),
      var_export($context, true)
    );
  }

  /**
   * Monitors system resources.
   *
   * @return array
   *   System resource statistics with the keys 'memory_usage', 'peak_memory',
   *   'worker_count' and 'max_workers', plus 'load_average' when the platform
   *   is able to report it.
   */
  public function getResourceStats() {
    $stats = [
      'memory_usage' => memory_get_usage(true),
      'peak_memory' => memory_get_peak_usage(true),
      'worker_count' => count($this->workerPool),
      'max_workers' => $this->maxWorkers,
    ];

    if (function_exists('sys_getloadavg')) {
      // sys_getloadavg() returns false on failure; only report real data.
      $loadAverage = sys_getloadavg();
      if (is_array($loadAverage)) {
        $stats['load_average'] = $loadAverage;
      }
    }

    return $stats;
  }

}
