wse-1.0.x-dev/modules/wse_task_monitor/src/Controller/TaskMonitorController.php

modules/wse_task_monitor/src/Controller/TaskMonitorController.php
<?php

declare(strict_types=1);

namespace Drupal\wse_task_monitor\Controller;

use Drupal\Core\Controller\ControllerBase;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\wse_task_monitor\WorkspaceTaskRepositoryInterface;
use Drupal\wse_task_monitor\TaskStatus;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\HttpFoundation\EventStreamResponse;
use Symfony\Component\HttpFoundation\ServerEvent;

/**
 * Provides endpoints for task monitor data.
 *
 * The single endpoint is a Server-Sent Events (SSE) stream that polls the
 * task repository roughly once per second and pushes task lifecycle events
 * ('new-task', 'task-progress', 'task-complete', ...) to the client.
 */
class TaskMonitorController extends ControllerBase {

  /**
   * Padding size in bytes to overcome Apache buffering (32KB).
   */
  private const PADDING_SIZE = 32768;

  /**
   * Maximum SSE connection lifetime in seconds (5 minutes).
   */
  private const MAX_CONNECTION_LIFETIME = 300;

  /**
   * Number of iterations to wait before shutdown after tasks complete.
   */
  private const SHUTDOWN_DELAY_ITERATIONS = 5;

  /**
   * Idle timeout before closing a connection that never saw a task.
   *
   * The value is compared against a count of loop iterations; every iteration
   * sleeps for one second, so it is only approximately a number of seconds.
   */
  private const IDLE_TIMEOUT_SECONDS = 60;

  /**
   * Heartbeat interval in seconds for keepalive messages.
   */
  private const HEARTBEAT_INTERVAL_SECONDS = 10;

  /**
   * Constructs the controller.
   *
   * @param \Drupal\wse_task_monitor\WorkspaceTaskRepositoryInterface $repository
   *   Repository used to load and save tasks.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   Time service used for lifetime and heartbeat bookkeeping.
   * @param \Drupal\Core\Logger\LoggerChannelInterface $logger
   *   Logger channel that receives stream errors.
   */
  public function __construct(
    protected readonly WorkspaceTaskRepositoryInterface $repository,
    protected readonly TimeInterface $time,
    protected readonly LoggerChannelInterface $logger,
  ) {}

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get(WorkspaceTaskRepositoryInterface::class),
      $container->get('datetime.time'),
      $container->get('logger.channel.workspaces'),
    );
  }

  /**
   * Determines if padding is needed to overcome server buffering.
   *
   * @return bool
   *   TRUE if the server configuration requires padding for SSE.
   */
  private function needsPaddingForBuffering(): bool {
    // Apache module and (Fast)CGI/FPM SAPIs are treated as buffering.
    if (in_array(PHP_SAPI, ['apache2handler', 'fpm-fcgi', 'cgi-fcgi'], TRUE)) {
      return TRUE;
    }

    // Fall back to the advertised server software.
    return isset($_SERVER['SERVER_SOFTWARE'])
      && stripos($_SERVER['SERVER_SOFTWARE'], 'apache') !== FALSE;
  }

  /**
   * Returns a Server-Sent Events stream for all task monitoring.
   *
   * The stream ends when the client disconnects, when the maximum connection
   * lifetime is exceeded, when all observed tasks have been finished for
   * SHUTDOWN_DELAY_ITERATIONS iterations, when no task showed up for
   * IDLE_TIMEOUT_SECONDS iterations, or when an exception is caught.
   *
   * @return \Symfony\Component\HttpFoundation\EventStreamResponse
   *   The SSE response stream.
   */
  public function getTaskStream(): EventStreamResponse {
    // Drop one level of output buffering, if any is active.
    if (ob_get_level()) {
      ob_end_clean();
    }

    return new EventStreamResponse(function (): \Generator {
      // Padding is appended to every event payload when the server is
      // expected to buffer small writes.
      $padding = $this->needsPaddingForBuffering() ? str_repeat(' ', self::PADDING_SIZE) : '';

      yield $this->createStreamEvent('connected', ['message' => 'Task monitoring connected'], $padding);

      // Set of task IDs already announced with 'new-task'. IDs are stored as
      // array keys so membership checks are exact and O(1).
      $sentTasks = [];
      // Last progress value sent per task ID.
      $lastProgressSent = [];
      $iteration = 0;
      $startTime = $this->time->getCurrentTime();
      // Time of the last event written to the stream (or of stream start).
      $lastHeartbeat = $startTime;
      $noTasksIterations = 0;
      $hadTasksBefore = FALSE;

      while (!connection_aborted()) {
        try {
          // Whether a task event was emitted during this iteration.
          $emitted = FALSE;

          // Load every task known to the repository.
          $allTasks = $this->repository->findAll();

          // Remember statuses before the refresh to detect completions.
          $statusBefore = [];
          foreach ($allTasks as $taskId => $task) {
            $statusBefore[$taskId] = $task->getStatus();
          }

          // Refresh progress for all tasks; one failing task must not stop
          // the others from being refreshed.
          foreach ($allTasks as $task) {
            try {
              $task->refreshProgress();
            }
            catch (\Exception $e) {
              $this->logger->error('Failed to refresh progress for task @id: @error', [
                '@id' => $task->getId(),
                '@error' => $e->getMessage(),
              ]);
            }
          }

          // Persist status changes and announce tasks that just finished.
          foreach ($allTasks as $taskId => $task) {
            $before = $statusBefore[$taskId];
            $after = $task->getStatus();

            if ($before !== $after) {
              $this->repository->save($task);
            }

            if (!$before->isFinished() && $after->isFinished()) {
              yield $this->createStreamEvent(
                'task-complete',
                $this->completionPayload($taskId, $task, $after === TaskStatus::Failed),
                $padding
              );

              unset($lastProgressSent[$taskId], $sentTasks[$taskId]);
              $emitted = TRUE;
              // A task that finished during this refresh counts as observed
              // work even if it never appeared in the active list, so the
              // short shutdown delay applies; restart its countdown here.
              $hadTasksBefore = TRUE;
              $noTasksIterations = 0;
            }
          }

          // array_filter() keeps the task IDs as keys.
          $activeTasks = array_filter(
            $allTasks,
            static fn ($task): bool => !$task->getStatus()->isFinished()
          );

          // Enforce the maximum connection lifetime. The event type tells the
          // client whether it should reconnect (tasks still active) or not.
          if (($this->time->getCurrentTime() - $startTime) > self::MAX_CONNECTION_LIFETIME) {
            yield empty($activeTasks)
              ? $this->createStreamEvent('idle-timeout', ['message' => 'Connection timeout - no active tasks'], $padding)
              : $this->createStreamEvent('connection-timeout', ['message' => 'Connection timeout - reconnecting'], $padding);
            break;
          }

          // Announce active tasks that have not been sent on this stream yet.
          foreach ($activeTasks as $taskId => $task) {
            if (isset($sentTasks[$taskId])) {
              continue;
            }

            yield $this->createStreamEvent('new-task', [
              'id' => $taskId,
              'label' => $task->getLabel(),
              'workspace_id' => $task->getWorkspaceId(),
              'status' => $task->getStatus()->value,
              'status_label' => $task->getStatus()->getLabel(),
              'progress' => $task->getProgress(),
              'message' => $task->getMessage() ?: 'Initializing...',
            ], $padding);

            $sentTasks[$taskId] = TRUE;
            // -1 forces the first progress update to be sent below.
            $lastProgressSent[$taskId] = -1;
            $emitted = TRUE;
          }

          // Send progress updates for active tasks whose progress changed.
          foreach ($activeTasks as $taskId => $task) {
            $status = $task->getStatus();
            $progress = $task->getProgress();

            if (isset($lastProgressSent[$taskId]) && $progress === $lastProgressSent[$taskId]) {
              continue;
            }

            yield $this->createStreamEvent('task-progress', [
              'taskId' => $taskId,
              'percentage' => $progress,
              'message' => $task->getMessage() ?: $status->getLabel(),
              'status' => $status->isFinished() ? '100' : (string) $progress,
              'status_label' => $status->getLabel(),
              'is_finished' => $status->isFinished(),
              'is_failed' => $status === TaskStatus::Failed,
              'iteration' => $iteration,
            ], $padding);

            $lastProgressSent[$taskId] = $progress;
            $emitted = TRUE;

            // NOTE(review): $activeTasks was filtered to unfinished tasks
            // above, so this presumably only fires if getStatus() can change
            // without refreshProgress() - kept as a defensive path.
            if ($status->isFinished()) {
              yield $this->createStreamEvent(
                'task-complete',
                $this->completionPayload($taskId, $task, $status === TaskStatus::Failed),
                $padding
              );

              unset($lastProgressSent[$taskId], $sentTasks[$taskId]);
            }
          }

          $currentTime = $this->time->getCurrentTime();

          // Only real output counts as activity. Resetting the timer merely
          // because tasks exist would suppress heartbeats for as long as a
          // task runs without changing its progress.
          if ($emitted) {
            $lastHeartbeat = $currentTime;
          }

          if (empty($activeTasks)) {
            $noTasksIterations++;

            // Tasks were observed and all are done: close after a short
            // delay.
            if ($hadTasksBefore && $noTasksIterations >= self::SHUTDOWN_DELAY_ITERATIONS) {
              yield $this->createStreamEvent('all-complete', ['message' => 'All tasks completed'], $padding);

              return;
            }

            // No task was ever observed on this stream: true idle timeout.
            if (!$hadTasksBefore && $noTasksIterations >= self::IDLE_TIMEOUT_SECONDS) {
              yield $this->createStreamEvent('idle-timeout', ['message' => 'No active tasks - closing connection'], $padding);

              return;
            }
          }
          else {
            $hadTasksBefore = TRUE;
            $noTasksIterations = 0;
          }

          // Send a heartbeat when nothing was written for a full interval.
          if (($currentTime - $lastHeartbeat) >= self::HEARTBEAT_INTERVAL_SECONDS) {
            yield $this->createStreamEvent('heartbeat', [
              'timestamp' => $currentTime,
              'active_tasks' => count($activeTasks),
            ], $padding);

            $lastHeartbeat = $currentTime;
          }

          // Wait 1 second before the next check.
          sleep(1);
          $iteration++;
        }
        catch (\Exception $e) {
          $this->logger->error('Exception in SSE stream: @message', [
            '@message' => $e->getMessage(),
          ]);

          // NOTE(review): the raw exception message is sent to the client;
          // confirm that this cannot expose internal details.
          yield $this->createStreamEvent('error', ['error' => 'Server error: ' . $e->getMessage()], $padding);

          break;
        }
      }
    });
  }

  /**
   * Builds a server event whose data is a JSON document followed by padding.
   *
   * @param string $type
   *   The SSE event type.
   * @param array $data
   *   The payload to encode as JSON.
   * @param string $padding
   *   Whitespace appended after the JSON document; may be empty.
   *
   * @return \Symfony\Component\HttpFoundation\ServerEvent
   *   The event, ready to be yielded from the stream callback.
   *
   * @throws \JsonException
   *   If the payload cannot be encoded. Invalid UTF-8 sequences are replaced
   *   instead of causing a failure.
   */
  private function createStreamEvent(string $type, array $data, string $padding): ServerEvent {
    $json = json_encode($data, JSON_THROW_ON_ERROR | JSON_INVALID_UTF8_SUBSTITUTE);

    return new ServerEvent(data: $json . $padding, type: $type);
  }

  /**
   * Builds the payload of a 'task-complete' event.
   *
   * @param int|string $taskId
   *   The task ID, as keyed in the repository result.
   * @param object $task
   *   The task; only its getMessage() method is used.
   * @param bool $isFailed
   *   Whether the task ended in the failed status.
   *
   * @return array
   *   The event payload.
   */
  private function completionPayload(int|string $taskId, object $task, bool $isFailed): array {
    return [
      'taskId' => $taskId,
      'percentage' => 100,
      'message' => $task->getMessage() ?: 'Task completed',
      'is_failed' => $isFailed,
    ];
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc