ai_upgrade_assistant-0.2.0-alpha2/src/Service/ParallelProcessingService.php

src/Service/ParallelProcessingService.php
<?php

namespace Drupal\ai_upgrade_assistant\Service;

use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\PhpExecutableFinder;

/**
 * Service for handling parallel processing operations.
 */
class ParallelProcessingService {

  /**
   * The config factory.
   *
   * @var \Drupal\Core\Config\ConfigFactoryInterface
   */
  protected $configFactory;

  /**
   * The logger factory.
   *
   * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
   */
  protected $loggerFactory;

  /**
   * Worker pool for parallel processing.
   *
   * Only read by getResourceStats(); nothing in this class adds entries.
   *
   * @var array
   */
  protected $workerPool = [];

  /**
   * Maximum number of parallel workers.
   *
   * @var int
   */
  protected $maxWorkers;

  /**
   * Constructs a new ParallelProcessingService.
   *
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger factory.
   */
  public function __construct(
    ConfigFactoryInterface $config_factory,
    LoggerChannelFactoryInterface $logger_factory
  ) {
    $this->configFactory = $config_factory;
    $this->loggerFactory = $logger_factory;
    $this->maxWorkers = $this->determineOptimalWorkerCount();
  }

  /**
   * Determines the optimal number of worker processes.
   *
   * The 'parallel_workers' value of 'ai_upgrade_assistant.settings' wins when
   * it is a positive number. Otherwise the count defaults to the detected
   * number of CPUs minus one, with a minimum of two.
   *
   * @return int
   *   The optimal number of workers; always at least 1.
   */
  protected function determineOptimalWorkerCount() {
    $configuredWorkers = $this->configFactory
      ->get('ai_upgrade_assistant.settings')
      ->get('parallel_workers');

    // Zero, negative or non-numeric settings are ignored: a limit below one
    // would prevent executeTasks() from ever starting a worker.
    if (is_numeric($configuredWorkers) && (int) $configuredWorkers > 0) {
      return (int) $configuredWorkers;
    }

    // Leave one CPU for the parent process, but never go below two workers.
    return max(2, $this->detectCpuCount() - 1);
  }

  /**
   * Detects the number of logical CPUs available on the host.
   *
   * sys_getloadavg() is deliberately not used here: it returns the 1, 5 and
   * 15 minute load averages, so counting its elements always yields 3.
   *
   * @return int
   *   The detected CPU count, or 4 when it cannot be determined.
   */
  protected function detectCpuCount() {
    // Windows publishes the logical processor count in the environment.
    // getenv() returns FALSE when unset, which casts to 0.
    $fromEnvironment = (int) getenv('NUMBER_OF_PROCESSORS');
    if ($fromEnvironment > 0) {
      return $fromEnvironment;
    }

    // Linux lists one "processor : N" entry per logical CPU.
    if (is_readable('/proc/cpuinfo')) {
      $cpuInfo = file_get_contents('/proc/cpuinfo');
      if ($cpuInfo !== false) {
        $processorEntries = preg_match_all('/^processor\s*:/m', $cpuInfo);
        if ($processorEntries > 0) {
          return $processorEntries;
        }
      }
    }

    // Default assumption when no detection method is available.
    return 4;
  }

  /**
   * Executes multiple tasks in parallel.
   *
   * Each task runs in its own PHP child process. At most $this->maxWorkers
   * processes run at once, and the call blocks until every task has finished.
   * Results are appended in completion order, not submission order.
   *
   * @param array $tasks
   *   Array of callable tasks to execute. Each task must be expressible with
   *   var_export() (see generateWorkerCode()).
   * @param array $context
   *   Context data for the tasks.
   *
   * @return array
   *   Results from all tasks. Each entry holds 'task', 'output' (stdout),
   *   'errorOutput' (stderr), 'duration' in seconds and 'exitCode'.
   *
   * @throws \RuntimeException
   *   When the PHP CLI executable cannot be located.
   */
  public function executeTasks(array $tasks, array $context = []) {
    $results = [];
    $running = [];
    $taskQueue = $tasks;

    // Guard against a limit below one, which would never start a worker and
    // leave the loop below spinning forever.
    $workerLimit = max(1, (int) $this->maxWorkers);

    while (!empty($taskQueue) || !empty($running)) {
      // Start new tasks if worker slots are available.
      while (count($running) < $workerLimit && !empty($taskQueue)) {
        $task = array_shift($taskQueue);
        $running[] = [
          'process' => $this->startWorkerProcess($task, $context),
          'task' => $task,
          'started' => microtime(true),
        ];
      }

      // Collect the results of processes that have finished.
      foreach ($running as $key => $worker) {
        $process = $worker['process'];
        if ($process->isRunning()) {
          continue;
        }

        $exitCode = $process->getExitCode();
        $errorOutput = $process->getErrorOutput();
        $results[] = [
          'task' => $worker['task'],
          'output' => $process->getOutput(),
          'errorOutput' => $errorOutput,
          'duration' => microtime(true) - $worker['started'],
          'exitCode' => $exitCode,
        ];

        // Surface failed workers instead of silently discarding stderr.
        if ($exitCode !== 0) {
          $this->loggerFactory->get('ai_upgrade_assistant')->warning(
            'Worker process exited with code @code: @error',
            [
              '@code' => var_export($exitCode, true),
              '@error' => $errorOutput,
            ]
          );
        }

        unset($running[$key]);
      }

      // Sleep 50ms between polls to avoid a busy loop.
      if (!empty($running)) {
        usleep(50000);
      }
    }

    return $results;
  }

  /**
   * Starts a worker process for a task.
   *
   * @param callable $task
   *   The task to execute.
   * @param array $context
   *   Context data for the task.
   *
   * @return \Symfony\Component\Process\Process
   *   The created process, already started.
   *
   * @throws \RuntimeException
   *   When the PHP CLI executable cannot be located.
   */
  protected function startWorkerProcess($task, array $context) {
    // Ask for the bare binary path: every element of the command array is
    // passed as a single argument, so it must not carry extra arguments.
    $phpBinary = (new PhpExecutableFinder())->find(false);
    if ($phpBinary === false) {
      throw new \RuntimeException('Unable to locate the PHP executable needed to start a worker process.');
    }

    // Create isolated process.
    $process = new Process([
      $phpBinary,
      '-r',
      $this->generateWorkerCode($task, $context),
    ]);

    $process->start();
    return $process;
  }

  /**
   * Generates PHP code for worker process.
   *
   * The generated code requires an autoloader, calls the task with $context
   * as its single argument and prints the JSON-encoded return value.
   *
   * The task is embedded with var_export(), so only callables that survive
   * that round trip (function names, static [class, method] pairs) produce
   * runnable code. Closures and object-bound callables do not.
   *
   * @param callable $task
   *   The task to execute.
   * @param array $context
   *   Context data for the task.
   *
   * @return string
   *   PHP code for worker process.
   */
  protected function generateWorkerCode($task, array $context) {
    // NOTE(review): this resolves to two directories above this file;
    // confirm an autoload.php actually exists at that location.
    $autoloadPath = __DIR__ . '/../../autoload.php';

    // var_export() yields a correctly escaped PHP literal for every value,
    // including paths that contain quotes, backslashes or dollar signs.
    return sprintf(
      'require_once %s; $result = call_user_func(%s, %s); echo json_encode($result);',
      var_export($autoloadPath, true),
      var_export($task, true),
      var_export($context, true)
    );
  }

  /**
   * Monitors system resources.
   *
   * @return array
   *   System resource statistics: 'memory_usage' and 'peak_memory' in bytes,
   *   'worker_count' (size of $this->workerPool), 'max_workers' and, when
   *   available, 'load_average' as returned by sys_getloadavg().
   */
  public function getResourceStats() {
    $stats = [
      'memory_usage' => memory_get_usage(true),
      'peak_memory' => memory_get_peak_usage(true),
      'worker_count' => count($this->workerPool),
      'max_workers' => $this->maxWorkers,
    ];

    if (function_exists('sys_getloadavg')) {
      // sys_getloadavg() returns FALSE on failure; only report real samples.
      $loadAverage = sys_getloadavg();
      if (is_array($loadAverage)) {
        $stats['load_average'] = $loadAverage;
      }
    }

    return $stats;
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc