wse-1.0.x-dev/modules/wse_task_monitor/src/Controller/TaskMonitorController.php
modules/wse_task_monitor/src/Controller/TaskMonitorController.php
<?php
declare(strict_types=1);
namespace Drupal\wse_task_monitor\Controller;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\wse_task_monitor\WorkspaceTaskRepositoryInterface;
use Drupal\wse_task_monitor\TaskStatus;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\HttpFoundation\EventStreamResponse;
use Symfony\Component\HttpFoundation\ServerEvent;
/**
 * Provides endpoints for task monitor data.
 *
 * The only endpoint is a Server-Sent Events (SSE) stream that polls the task
 * repository once per second and pushes task lifecycle events to the client.
 */
class TaskMonitorController extends ControllerBase {

  /**
   * Padding size in bytes to overcome Apache buffering (32KB).
   */
  private const PADDING_SIZE = 32768;

  /**
   * Maximum SSE connection lifetime in seconds (5 minutes).
   */
  private const MAX_CONNECTION_LIFETIME = 300;

  /**
   * Number of iterations to wait before shutdown after tasks complete.
   *
   * Each iteration sleeps for one second, so this is roughly a delay in
   * seconds.
   */
  private const SHUTDOWN_DELAY_ITERATIONS = 5;

  /**
   * Idle timeout in seconds before closing connection with no tasks.
   */
  private const IDLE_TIMEOUT_SECONDS = 60;

  /**
   * Heartbeat interval in seconds for keepalive messages.
   */
  private const HEARTBEAT_INTERVAL_SECONDS = 10;

  /**
   * Constructs a TaskMonitorController.
   *
   * @param \Drupal\wse_task_monitor\WorkspaceTaskRepositoryInterface $repository
   *   Repository used to load and save the monitored tasks.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   Time service used for all timeout and heartbeat calculations.
   * @param \Drupal\Core\Logger\LoggerChannelInterface $logger
   *   Logger channel that receives stream and refresh errors.
   */
  public function __construct(
    protected readonly WorkspaceTaskRepositoryInterface $repository,
    protected readonly TimeInterface $time,
    protected readonly LoggerChannelInterface $logger,
  ) {}

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get(WorkspaceTaskRepositoryInterface::class),
      $container->get('datetime.time'),
      $container->get('logger.channel.workspaces'),
    );
  }

  /**
   * Determines if padding is needed to overcome server buffering.
   *
   * @return bool
   *   TRUE if the server configuration requires padding for SSE.
   */
  private function needsPaddingForBuffering(): bool {
    // Check SAPI for Apache and FPM indicators.
    if (in_array(PHP_SAPI, ['apache2handler', 'fpm-fcgi', 'cgi-fcgi'], TRUE)) {
      return TRUE;
    }

    // Check server software as fallback.
    return isset($_SERVER['SERVER_SOFTWARE'])
      && stripos($_SERVER['SERVER_SOFTWARE'], 'apache') !== FALSE;
  }

  /**
   * Builds a single SSE event with a JSON payload.
   *
   * @param string $type
   *   The SSE event type (the "event:" field).
   * @param array $payload
   *   Data to JSON-encode into the event's data field.
   * @param string $padding
   *   Padding appended after the JSON, or an empty string for none.
   *
   * @return \Symfony\Component\HttpFoundation\ServerEvent
   *   The event, ready to be yielded to the stream.
   */
  private function buildEvent(string $type, array $payload, string $padding): ServerEvent {
    // Substitute invalid UTF-8 (e.g. in a task message) instead of letting
    // json_encode() fail and produce an event with an empty payload.
    $json = (string) json_encode($payload, JSON_INVALID_UTF8_SUBSTITUTE);

    return new ServerEvent(data: $json . $padding, type: $type);
  }

  /**
   * Returns a Server-Sent Events stream for all task monitoring.
   *
   * Event types emitted: "connected", "new-task", "task-progress",
   * "task-complete", "heartbeat", "all-complete", "idle-timeout",
   * "connection-timeout" and "error".
   *
   * @return \Symfony\Component\HttpFoundation\EventStreamResponse
   *   The SSE response stream.
   */
  public function getTaskStream(): EventStreamResponse {
    // Simple output buffer cleanup.
    if (ob_get_level()) {
      ob_end_clean();
    }

    return new EventStreamResponse(function (): \Generator {
      // Determine if padding is needed based on server configuration.
      $padding = $this->needsPaddingForBuffering() ? str_repeat(' ', self::PADDING_SIZE) : '';

      // Send initial connection event with conditional padding.
      yield $this->buildEvent('connected', ['message' => 'Task monitoring connected'], $padding);

      // Set of task IDs already announced to the client, keyed by task ID so
      // membership checks are exact and O(1).
      $sentTasks = [];
      // Last progress value pushed to the client, keyed by task ID.
      $lastProgressSent = [];
      $iteration = 0;
      $startTime = $this->time->getCurrentTime();
      // Time of the last event sent; the "connected" event counts.
      $lastHeartbeat = $startTime;
      $noTasksIterations = 0;
      $hadTasksBefore = FALSE;

      while (!connection_aborted()) {
        try {
          // Whether any task event was emitted during this iteration.
          $eventSent = FALSE;

          // Get all tasks known to the repository.
          $allTasks = $this->repository->findAll();

          // Track task statuses before refresh to detect completions.
          $taskStatusBefore = [];
          foreach ($allTasks as $task_id => $task) {
            $taskStatusBefore[$task_id] = $task->getStatus();
          }

          // Refresh progress for all tasks.
          foreach ($allTasks as $task) {
            try {
              $task->refreshProgress();
            }
            catch (\Exception $e) {
              // Log error but continue processing other tasks.
              $this->logger->error('Failed to refresh progress for task @id: @error', [
                '@id' => $task->getId(),
                '@error' => $e->getMessage(),
              ]);
            }
          }

          // Detect newly completed tasks and send completion events.
          foreach ($allTasks as $task_id => $task) {
            $statusBefore = $taskStatusBefore[$task_id];
            $statusAfter = $task->getStatus();

            // Save task if status changed during refresh.
            if ($statusBefore !== $statusAfter) {
              $this->repository->save($task);
            }

            // If task just completed during refresh, send completion event.
            if (!$statusBefore->isFinished() && $statusAfter->isFinished()) {
              yield $this->buildEvent('task-complete', [
                'taskId' => $task_id,
                'percentage' => 100,
                'message' => $task->getMessage() ?: 'Task completed',
                'is_failed' => $statusAfter === TaskStatus::Failed,
              ], $padding);

              // Remove from tracking.
              unset($lastProgressSent[$task_id], $sentTasks[$task_id]);

              // A task was observed running, even if it finished within the
              // very first poll, so the short post-completion shutdown
              // applies rather than the long idle timeout.
              $hadTasksBefore = TRUE;
              $eventSent = TRUE;
            }
          }

          $activeTasks = array_filter(
            $allTasks,
            static fn ($task): bool => !$task->getStatus()->isFinished(),
          );

          // Check maximum connection lifetime after getting active tasks.
          if (($this->time->getCurrentTime() - $startTime) > self::MAX_CONNECTION_LIFETIME) {
            // Send different event based on whether tasks are active.
            if (!empty($activeTasks)) {
              // Connection timeout with active tasks - client should
              // reconnect.
              yield $this->buildEvent('connection-timeout', ['message' => 'Connection timeout - reconnecting'], $padding);
            }
            else {
              // Connection timeout with no active tasks - safe to close.
              yield $this->buildEvent('idle-timeout', ['message' => 'Connection timeout - no active tasks'], $padding);
            }
            break;
          }

          // Send new task events for tasks we haven't seen before.
          foreach ($activeTasks as $task_id => $task) {
            if (isset($sentTasks[$task_id])) {
              continue;
            }

            yield $this->buildEvent('new-task', [
              'id' => $task_id,
              'label' => $task->getLabel(),
              'workspace_id' => $task->getWorkspaceId(),
              'status' => $task->getStatus()->value,
              'status_label' => $task->getStatus()->getLabel(),
              'progress' => $task->getProgress(),
              'message' => $task->getMessage() ?: 'Initializing...',
            ], $padding);

            $sentTasks[$task_id] = TRUE;
            // Initialize progress tracking with a sentinel so the first
            // progress value is always reported.
            $lastProgressSent[$task_id] = -1;
            $eventSent = TRUE;
          }

          // Send progress updates for active tasks.
          foreach ($activeTasks as $task_id => $task) {
            $status = $task->getStatus();
            $currentProgress = $task->getProgress();

            // Only send update if progress changed.
            if (isset($lastProgressSent[$task_id]) && $currentProgress === $lastProgressSent[$task_id]) {
              continue;
            }

            yield $this->buildEvent('task-progress', [
              'taskId' => $task_id,
              'percentage' => $currentProgress,
              'message' => $task->getMessage() ?: $status->getLabel(),
              'status' => $status->isFinished() ? '100' : (string) $currentProgress,
              'status_label' => $status->getLabel(),
              'is_finished' => $status->isFinished(),
              'is_failed' => $status === TaskStatus::Failed,
              'iteration' => $iteration,
            ], $padding);

            $lastProgressSent[$task_id] = $currentProgress;
            $eventSent = TRUE;

            // If task is finished, send completion event.
            // NOTE(review): $activeTasks was filtered to unfinished tasks just
            // above, so this presumably only triggers if getStatus() can
            // change between calls - confirm before removing.
            if ($status->isFinished()) {
              yield $this->buildEvent('task-complete', [
                'taskId' => $task_id,
                'percentage' => 100,
                'message' => $task->getMessage() ?: 'Task completed',
                'is_failed' => $status === TaskStatus::Failed,
              ], $padding);

              // Remove from tracking.
              unset($lastProgressSent[$task_id], $sentTasks[$task_id]);
            }
          }

          // Handle heartbeat and graceful shutdown.
          $currentTime = $this->time->getCurrentTime();

          // Track iterations with no active tasks for graceful shutdown.
          if (empty($activeTasks)) {
            $noTasksIterations++;

            // If we had tasks before and now they're all done, shut down
            // after the configured number of (one second) iterations.
            if ($hadTasksBefore && $noTasksIterations >= self::SHUTDOWN_DELAY_ITERATIONS) {
              yield $this->buildEvent('all-complete', ['message' => 'All tasks completed'], $padding);
              return;
            }

            // If no tasks for 60 seconds from the start, shut down.
            // This is a true idle timeout - no tasks to monitor. It is
            // measured in elapsed time, not iterations, because the constant
            // is in seconds and an iteration can take longer than a second.
            if (!$hadTasksBefore && ($currentTime - $startTime) >= self::IDLE_TIMEOUT_SECONDS) {
              yield $this->buildEvent('idle-timeout', ['message' => 'No active tasks - closing connection'], $padding);
              return;
            }
          }
          else {
            $hadTasksBefore = TRUE;
            $noTasksIterations = 0;
          }

          if ($eventSent) {
            // Real traffic went out, so no keepalive is needed yet.
            $lastHeartbeat = $currentTime;
          }
          elseif (($currentTime - $lastHeartbeat) >= self::HEARTBEAT_INTERVAL_SECONDS) {
            // Nothing was sent for the whole interval. This also covers
            // active tasks whose progress has not changed, so the connection
            // does not sit silent during long-running steps.
            yield $this->buildEvent('heartbeat', [
              'timestamp' => $currentTime,
              'active_tasks' => count($activeTasks),
            ], $padding);
            $lastHeartbeat = $currentTime;
          }

          // Wait 1 second before next check.
          sleep(1);
          $iteration++;
        }
        catch (\Exception $e) {
          // Log and send error.
          $this->logger->error('Exception in SSE stream: @message', [
            '@message' => $e->getMessage(),
          ]);

          // NOTE(review): the raw exception message is forwarded to the
          // client; consider a generic message if this route is reachable by
          // users who should not see internal error details.
          yield $this->buildEvent('error', ['error' => 'Server error: ' . $e->getMessage()], $padding);
          break;
        }
      }
    });
  }

}
