From 8d4f5f16cb00f90cba047902f11a3d4a5c7c7ec4 Mon Sep 17 00:00:00 2001 From: Anthony Stirling <77850077+Frooodle@users.noreply.github.com.> Date: Tue, 3 Jun 2025 01:18:07 +0100 Subject: [PATCH] Queue, timeouts, retries, tests! --- .../annotations/AutoJobPostMapping.java | 28 ++ .../software/common/aop/AutoJobAspect.java | 168 ++++++- .../common/controller/JobController.java | 1 - .../common/service/JobExecutorService.java | 119 ++++- .../software/common/service/JobQueue.java | 441 ++++++++++++++++++ .../common/service/ResourceMonitor.java | 277 +++++++++++ .../AutoJobPostMappingIntegrationTest.java | 214 +++++++++ .../service/JobExecutorServiceTest.java | 208 +++++++++ .../software/common/service/JobQueueTest.java | 107 +++++ .../common/service/ResourceMonitorTest.java | 137 ++++++ docs/AutoJobPostMapping.md | 168 +++++++ 11 files changed, 1847 insertions(+), 21 deletions(-) create mode 100644 common/src/main/java/stirling/software/common/service/JobQueue.java create mode 100644 common/src/main/java/stirling/software/common/service/ResourceMonitor.java create mode 100644 common/src/test/java/stirling/software/common/annotations/AutoJobPostMappingIntegrationTest.java create mode 100644 common/src/test/java/stirling/software/common/service/JobExecutorServiceTest.java create mode 100644 common/src/test/java/stirling/software/common/service/JobQueueTest.java create mode 100644 common/src/test/java/stirling/software/common/service/ResourceMonitorTest.java create mode 100644 docs/AutoJobPostMapping.md diff --git a/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java b/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java index 41793da82..755f143aa 100644 --- a/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java +++ b/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java @@ -16,4 +16,32 @@ public @interface AutoJobPostMapping { @AliasFor(annotation = 
RequestMapping.class, attribute = "consumes") String[] consumes() default {"multipart/form-data"}; + + /** + * Custom timeout in milliseconds for this specific job. If not specified, the default system + * timeout will be used. + */ + long timeout() default -1; + + /** Maximum number of attempts, including the first run. Default is 1 (no retries). */ + int retryCount() default 1; + + /** + * Whether to track and report progress for this job. If enabled, the job will send progress + * updates through WebSocket. + */ + boolean trackProgress() default true; + + /** + * Whether this job can be queued when system resources are limited. If enabled, jobs will be + * queued instead of rejected when the system is under high load. The queue size is dynamically + * adjusted based on available memory and CPU resources. + */ + boolean queueable() default false; + + /** + * Optional resource weight of this job (1-100). Higher values indicate more resource-intensive + * jobs that may need stricter queuing. Default is 50 (medium weight).
+ */ + int resourceWeight() default 50; } diff --git a/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java b/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java index 6bb4bcc2c..a948b6c64 100644 --- a/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java +++ b/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java @@ -1,6 +1,8 @@ package stirling.software.common.aop; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.*; @@ -13,7 +15,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import stirling.software.common.annotations.AutoJobPostMapping; +import stirling.software.common.controller.WebSocketProgressController; import stirling.software.common.model.api.PDFFile; +import stirling.software.common.model.job.JobProgress; import stirling.software.common.service.FileOrUploadService; import stirling.software.common.service.FileStorage; import stirling.software.common.service.JobExecutorService; @@ -28,15 +32,26 @@ public class AutoJobAspect { private final HttpServletRequest request; private final FileOrUploadService fileOrUploadService; private final FileStorage fileStorage; + private final WebSocketProgressController webSocketSender; @Around("@annotation(autoJobPostMapping)") public Object wrapWithJobExecution( ProceedingJoinPoint joinPoint, AutoJobPostMapping autoJobPostMapping) { + // Extract parameters from the request and annotation boolean async = Boolean.parseBoolean(request.getParameter("async")); + long timeout = autoJobPostMapping.timeout(); + int retryCount = autoJobPostMapping.retryCount(); + boolean trackProgress = autoJobPostMapping.trackProgress(); + + log.debug( + "AutoJobPostMapping execution with async={}, timeout={}, retryCount={}, trackProgress={}", + async, + timeout > 0 ? 
timeout : "default", + retryCount, + trackProgress); // Inspect and possibly mutate arguments Object[] args = joinPoint.getArgs(); - boolean isAsyncRequest = async; for (int i = 0; i < args.length; i++) { Object arg = args[i]; @@ -54,7 +69,7 @@ public class AutoJobAspect { } } // Case 2: For async requests, we need to make a copy of the MultipartFile - else if (isAsyncRequest && pdfFile.getFileInput() != null) { + else if (async && pdfFile.getFileInput() != null) { try { log.debug("Making persistent copy of uploaded file for async processing"); MultipartFile originalFile = pdfFile.getFileInput(); @@ -76,20 +91,154 @@ public class AutoJobAspect { } } - // Wrap job execution + // Extract queueable and resourceWeight parameters + boolean queueable = autoJobPostMapping.queueable(); + int resourceWeight = autoJobPostMapping.resourceWeight(); + + // Integrate with the enhanced JobExecutorService + if (retryCount <= 1) { + // No retries needed, simple execution + return jobExecutorService.runJobGeneric( + async, + () -> { + try { + if (trackProgress && async) { + String jobId = (String) request.getAttribute("jobId"); + if (jobId != null) { + webSocketSender.sendProgress( + jobId, + new JobProgress( + jobId, + "Processing", + 50, + "Executing operation")); + } + } + return joinPoint.proceed(args); + } catch (Throwable ex) { + log.error( + "AutoJobAspect caught exception during job execution: {}", + ex.getMessage(), + ex); + throw new RuntimeException(ex); + } + }, + timeout, + queueable, + resourceWeight); + } else { + // Use retry logic + return executeWithRetries( + joinPoint, + args, + async, + timeout, + retryCount, + trackProgress, + queueable, + resourceWeight); + } + } + + /** Runs the join point inside one job, retrying in place up to maxRetries total attempts. */ + private Object executeWithRetries( + ProceedingJoinPoint joinPoint, + Object[] args, + boolean async, + long timeout, + int maxRetries, + boolean trackProgress, + boolean queueable, + int resourceWeight) { + + AtomicInteger attempts = new AtomicInteger(0); + // Use AtomicReference to make the
jobId effectively final for lambda usage + AtomicReference<String> jobIdRef = new AtomicReference<>(); + return jobExecutorService.runJobGeneric( async, () -> { + while (true) { + int currentAttempt = attempts.incrementAndGet(); try { + if (trackProgress && async) { + // Only get the jobId once and store it in the AtomicReference + if (jobIdRef.get() == null) { + jobIdRef.set(getJobIdFromContext()); + } + String jobId = jobIdRef.get(); + if (jobId != null) { + webSocketSender.sendProgress( + jobId, + new JobProgress( + jobId, + "Started", + 0, + "Executing (attempt " + + currentAttempt + + " of " + + Math.max(1, maxRetries) + + ")")); + } + } + return joinPoint.proceed(args); } catch (Throwable ex) { log.error( - "AutoJobAspect caught exception during job execution: {}", + "AutoJobAspect caught exception during job execution (attempt {}/{}): {}", + currentAttempt, + Math.max(1, maxRetries), ex.getMessage(), ex); - // Ensure we wrap the exception but preserve the original message - throw new RuntimeException(ex); + + // Check if we should retry + if (currentAttempt < maxRetries) { + log.info( + "Retrying operation, attempt {}/{}", + currentAttempt + 1, + maxRetries); + String jobId = jobIdRef.get(); + if (trackProgress && async && jobId != null) { + webSocketSender.sendProgress( + jobId, + new JobProgress( + jobId, + "Retrying", + (int) (currentAttempt * 100.0 / maxRetries), + "Retry attempt " + + (currentAttempt + 1) + + " of " + + maxRetries)); + } + + try { + // Simple linear backoff + Thread.sleep(100 * currentAttempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + + // Retry in place; recursing would reset the attempt counter + continue; + } + + // No more retries, throw the exception + throw new RuntimeException("Job failed: " + ex.getMessage(), ex); } - }); + } + }, + timeout, + queueable, + resourceWeight); + } + + // Try to get the job ID from the context (if
this is an async job) + private String getJobIdFromContext() { + try { + return (String) request.getAttribute("jobId"); + } catch (Exception e) { + log.debug("Could not retrieve job ID from context: {}", e.getMessage()); + return null; + } } } diff --git a/common/src/main/java/stirling/software/common/controller/JobController.java b/common/src/main/java/stirling/software/common/controller/JobController.java index 5538045f5..a08d97575 100644 --- a/common/src/main/java/stirling/software/common/controller/JobController.java +++ b/common/src/main/java/stirling/software/common/controller/JobController.java @@ -11,7 +11,6 @@ import org.springframework.web.bind.annotation.RestController; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import stirling.software.common.annotations.AutoJobPostMapping; import stirling.software.common.model.job.JobResult; import stirling.software.common.model.job.JobStats; import stirling.software.common.service.FileStorage; diff --git a/common/src/main/java/stirling/software/common/service/JobExecutorService.java b/common/src/main/java/stirling/software/common/service/JobExecutorService.java index a6b8d1e47..e4ebb2561 100644 --- a/common/src/main/java/stirling/software/common/service/JobExecutorService.java +++ b/common/src/main/java/stirling/software/common/service/JobExecutorService.java @@ -3,6 +3,7 @@ package stirling.software.common.service; import java.io.IOException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -16,6 +17,8 @@ import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; +import jakarta.servlet.http.HttpServletRequest; + import lombok.extern.slf4j.Slf4j; import stirling.software.common.controller.WebSocketProgressController; @@ -30,6 
+33,9 @@ public class JobExecutorService { private final TaskManager taskManager; private final WebSocketProgressController webSocketSender; private final FileStorage fileStorage; + private final HttpServletRequest request; + private final ResourceMonitor resourceMonitor; + private final JobQueue jobQueue; private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); private final long effectiveTimeoutMs; @@ -37,11 +43,17 @@ public class JobExecutorService { TaskManager taskManager, WebSocketProgressController webSocketSender, FileStorage fileStorage, + HttpServletRequest request, + ResourceMonitor resourceMonitor, + JobQueue jobQueue, @Value("${spring.mvc.async.request-timeout:1200000}") long asyncRequestTimeoutMs, @Value("${server.servlet.session.timeout:30m}") String sessionTimeout) { this.taskManager = taskManager; this.webSocketSender = webSocketSender; this.fileStorage = fileStorage; + this.request = request; + this.resourceMonitor = resourceMonitor; + this.jobQueue = jobQueue; // Parse session timeout and calculate effective timeout once during initialization long sessionTimeoutMs = parseSessionTimeout(sessionTimeout); @@ -58,10 +70,94 @@ public class JobExecutorService { * @return The response */ public ResponseEntity runJobGeneric(boolean async, Supplier work) { - String jobId = UUID.randomUUID().toString(); - log.debug("Running job with ID: {}, async: {}", jobId, async); + return runJobGeneric(async, work, -1); + } - if (async) { + /** + * Run a job either asynchronously or synchronously with a custom timeout + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default + * @return The response + */ + public ResponseEntity runJobGeneric( + boolean async, Supplier work, long customTimeoutMs) { + return runJobGeneric(async, work, customTimeoutMs, false, 50); + } + + /** + * Run a job either asynchronously or 
synchronously with custom parameters + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default + * @param queueable Whether this job can be queued when system resources are limited + * @param resourceWeight The resource weight of this job (1-100) + * @return The response + */ + public ResponseEntity runJobGeneric( + boolean async, + Supplier work, + long customTimeoutMs, + boolean queueable, + int resourceWeight) { + String jobId = UUID.randomUUID().toString(); + + // Store the job ID in the request for potential use by other components + if (request != null) { + request.setAttribute("jobId", jobId); + } + + // Determine which timeout to use + long timeoutToUse = customTimeoutMs > 0 ? customTimeoutMs : effectiveTimeoutMs; + + log.debug( + "Running job with ID: {}, async: {}, timeout: {}ms, queueable: {}, weight: {}", + jobId, + async, + timeoutToUse, + queueable, + resourceWeight); + + // Check if we need to queue this job based on resource availability + boolean shouldQueue = + queueable + && async + && // Only async jobs can be queued + resourceMonitor.shouldQueueJob(resourceWeight); + + if (shouldQueue) { + // Queue the job instead of executing immediately + log.debug( + "Queueing job {} due to resource constraints (weight: {})", + jobId, + resourceWeight); + + taskManager.createTask(jobId); + + // Create a specialized wrapper that updates the TaskManager + Supplier wrappedWork = + () -> { + try { + Object result = work.get(); + processJobResult(jobId, result); + return result; + } catch (Exception e) { + log.error( + "Error executing queued job {}: {}", jobId, e.getMessage(), e); + taskManager.setError(jobId, e.getMessage()); + throw e; + } + }; + + // Queue the job and get the future + CompletableFuture> future = + jobQueue.queueJob(jobId, resourceWeight, wrappedWork, timeoutToUse); + + // Return immediately with job ID + return 
ResponseEntity.ok().body(new JobResponse<>(true, jobId, null)); + } else if (async) { taskManager.createTask(jobId); webSocketSender.sendProgress(jobId, new JobProgress(jobId, "Started", 0, "Running")); @@ -69,18 +165,15 @@ public class JobExecutorService { () -> { try { log.debug( - "Running async job {} with timeout {} ms", - jobId, - effectiveTimeoutMs); + "Running async job {} with timeout {} ms", jobId, timeoutToUse); // Execute with timeout - Object result = - executeWithTimeout(() -> work.get(), effectiveTimeoutMs); + Object result = executeWithTimeout(() -> work.get(), timeoutToUse); processJobResult(jobId, result); webSocketSender.sendProgress( jobId, new JobProgress(jobId, "Done", 100, "Complete")); } catch (TimeoutException te) { - log.error("Job {} timed out after {} ms", jobId, effectiveTimeoutMs); + log.error("Job {} timed out after {} ms", jobId, timeoutToUse); taskManager.setError(jobId, "Job timed out"); webSocketSender.sendProgress( jobId, new JobProgress(jobId, "Error", 100, "Job timed out")); @@ -95,10 +188,10 @@ public class JobExecutorService { return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null)); } else { try { - log.debug("Running sync job with timeout {} ms", effectiveTimeoutMs); + log.debug("Running sync job with timeout {} ms", timeoutToUse); // Execute with timeout - Object result = executeWithTimeout(() -> work.get(), effectiveTimeoutMs); + Object result = executeWithTimeout(() -> work.get(), timeoutToUse); // If the result is already a ResponseEntity, return it directly if (result instanceof ResponseEntity) { @@ -108,9 +201,9 @@ public class JobExecutorService { // Process different result types return handleResultForSyncJob(result); } catch (TimeoutException te) { - log.error("Synchronous job timed out after {} ms", effectiveTimeoutMs); + log.error("Synchronous job timed out after {} ms", timeoutToUse); return ResponseEntity.internalServerError() - .body("Job timed out after " + effectiveTimeoutMs + " ms"); + 
.body(Map.of("error", "Job timed out after " + timeoutToUse + " ms")); } catch (Exception e) { log.error("Error executing synchronous job: {}", e.getMessage(), e); // Construct a JSON error response diff --git a/common/src/main/java/stirling/software/common/service/JobQueue.java b/common/src/main/java/stirling/software/common/service/JobQueue.java new file mode 100644 index 000000000..1e5c8a5a3 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/JobQueue.java @@ -0,0 +1,441 @@ +package stirling.software.common.service; + +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.Supplier; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.controller.WebSocketProgressController; +import stirling.software.common.model.job.JobProgress; + +/** + * Manages a queue of jobs with dynamic sizing based on system resources. Used when system resources + * are limited to prevent overloading. 
+ */ +@Service +@Slf4j +public class JobQueue { + + private final ResourceMonitor resourceMonitor; + private final WebSocketProgressController webSocketSender; + + @Value("${stirling.job.queue.base-capacity:10}") + private int baseQueueCapacity = 10; + + @Value("${stirling.job.queue.min-capacity:2}") + private int minQueueCapacity = 2; + + @Value("${stirling.job.queue.check-interval-ms:1000}") + private long queueCheckIntervalMs = 1000; + + @Value("${stirling.job.queue.max-wait-time-ms:600000}") + private long maxWaitTimeMs = 600000; // 10 minutes + + private BlockingQueue jobQueue; + private final Map jobMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ExecutorService jobExecutor; + + // Initialize executor based on Java version + { + ExecutorService executor; + try { + // Try to use Virtual Threads (Java 21+) + executor = Executors.newVirtualThreadPerTaskExecutor(); + log.info("Using Virtual Thread executor (Java 21+)"); + } catch (NoSuchMethodError e) { + // Fall back to thread pool for Java < 21 + executor = Executors.newCachedThreadPool(); + log.info("Using cached thread pool executor (Java < 21)"); + } + jobExecutor = executor; + } + private boolean shuttingDown = false; + + @Getter private int rejectedJobs = 0; + + @Getter private int totalQueuedJobs = 0; + + @Getter private int currentQueueSize = 0; + + /** Represents a job waiting in the queue. 
*/ + @Data + @AllArgsConstructor + private static class QueuedJob { + private final String jobId; + private final int resourceWeight; + private final Supplier work; + private final long timeoutMs; + private final Instant queuedAt; + private CompletableFuture> future; + private volatile boolean cancelled = false; + } + + public JobQueue(ResourceMonitor resourceMonitor, WebSocketProgressController webSocketSender) { + this.resourceMonitor = resourceMonitor; + this.webSocketSender = webSocketSender; + + // Initialize with dynamic capacity + int capacity = + resourceMonitor.calculateDynamicQueueCapacity(baseQueueCapacity, minQueueCapacity); + this.jobQueue = new LinkedBlockingQueue<>(capacity); + } + + @PostConstruct + public void initialize() { + log.info( + "Starting job queue with base capacity {}, min capacity {}", + baseQueueCapacity, + minQueueCapacity); + + // Periodically process the job queue + scheduler.scheduleWithFixedDelay( + this::processQueue, 0, queueCheckIntervalMs, TimeUnit.MILLISECONDS); + + // Periodically update queue capacity based on resource usage + scheduler.scheduleWithFixedDelay( + this::updateQueueCapacity, + 10000, // Initial delay + 30000, // 30 second interval + TimeUnit.MILLISECONDS); + } + + @PreDestroy + public void shutdown() { + log.info("Shutting down job queue"); + shuttingDown = true; + + // Complete any futures that are still waiting + jobMap.forEach( + (id, job) -> { + if (!job.future.isDone()) { + job.future.completeExceptionally( + new RuntimeException("Server shutting down, job cancelled")); + } + }); + + scheduler.shutdownNow(); + jobExecutor.shutdownNow(); + + log.info( + "Job queue shutdown complete. Stats: total={}, rejected={}", + totalQueuedJobs, + rejectedJobs); + } + + /** + * Queues a job for execution when resources permit. 
+ * + * @param jobId The job ID + * @param resourceWeight The resource weight of the job (1-100) + * @param work The work to be done + * @param timeoutMs The timeout in milliseconds + * @return A CompletableFuture that will complete when the job is executed + */ + public CompletableFuture> queueJob( + String jobId, int resourceWeight, Supplier work, long timeoutMs) { + + // Create a CompletableFuture to track this job's completion + CompletableFuture> future = new CompletableFuture<>(); + + // Create the queued job + QueuedJob job = + new QueuedJob(jobId, resourceWeight, work, timeoutMs, Instant.now(), future, false); + + // Store in our map for lookup + jobMap.put(jobId, job); + + // Update stats + totalQueuedJobs++; + currentQueueSize = jobQueue.size(); + + // Try to add to the queue + try { + boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS); + if (!added) { + log.warn("Queue full, rejecting job {}", jobId); + rejectedJobs++; + future.completeExceptionally( + new RuntimeException("Job queue full, please try again later")); + jobMap.remove(jobId); + return future; + } + + log.debug( + "Job {} queued for execution (weight: {}, queue size: {})", + jobId, + resourceWeight, + jobQueue.size()); + + // Notify client via WebSocket that job is queued + webSocketSender.sendProgress( + jobId, new JobProgress(jobId, "Queued", 0, "Waiting in queue for resources")); + + return future; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + future.completeExceptionally(new RuntimeException("Job queue interrupted")); + jobMap.remove(jobId); + return future; + } + } + + /** + * Gets the current capacity of the job queue. + * + * @return The current capacity + */ + public int getQueueCapacity() { + return ((LinkedBlockingQueue) jobQueue).remainingCapacity() + jobQueue.size(); + } + + /** Updates the capacity of the job queue based on available system resources. 
*/ + private void updateQueueCapacity() { + try { + // Calculate new capacity + int newCapacity = + resourceMonitor.calculateDynamicQueueCapacity( + baseQueueCapacity, minQueueCapacity); + + int currentCapacity = getQueueCapacity(); + if (newCapacity != currentCapacity) { + log.debug( + "Updating job queue capacity from {} to {}", currentCapacity, newCapacity); + + // Create new queue with updated capacity + BlockingQueue newQueue = new LinkedBlockingQueue<>(newCapacity); + + // Transfer jobs from old queue to new queue + jobQueue.drainTo(newQueue); + jobQueue = newQueue; + + currentQueueSize = jobQueue.size(); + } + } catch (Exception e) { + log.error("Error updating queue capacity: {}", e.getMessage(), e); + } + } + + /** Processes jobs in the queue, executing them when resources permit. */ + private void processQueue() { + if (shuttingDown || jobQueue.isEmpty()) { + return; + } + + try { + // Get current resource status + ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get(); + + // Check if we should execute any jobs + boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL); + + if (!canExecuteJobs) { + // Under critical load, don't execute any jobs + log.debug("System under critical load, delaying job execution"); + return; + } + + // Get jobs from the queue, up to a limit based on resource availability + int jobsToProcess = + Math.max( + 1, + switch (status) { + case OK -> 3; + case WARNING -> 1; + case CRITICAL -> 0; + }); + + for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) { + QueuedJob job = jobQueue.poll(); + if (job == null) break; + + // Check if it's been waiting too long + long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli(); + if (waitTimeMs > maxWaitTimeMs) { + log.warn( + "Job {} exceeded maximum wait time ({} ms), executing anyway", + job.jobId, + waitTimeMs); + } + + // Remove from our map + jobMap.remove(job.jobId); + currentQueueSize = jobQueue.size(); + + // 
Notify via WebSocket + webSocketSender.sendProgress( + job.jobId, + new JobProgress(job.jobId, "Processing", 10, "Starting job execution")); + + // Execute the job + executeJob(job); + } + } catch (Exception e) { + log.error("Error processing job queue: {}", e.getMessage(), e); + } + } + + /** + * Executes a job from the queue. + * + * @param job The job to execute + */ + private void executeJob(QueuedJob job) { + if (job.cancelled) { + log.debug("Job {} was cancelled, not executing", job.jobId); + return; + } + + jobExecutor.execute( + () -> { + log.debug("Executing queued job {} (queued at {})", job.jobId, job.queuedAt); + + try { + // Execute with timeout + Object result = executeWithTimeout(job.work, job.timeoutMs); + + // Process the result + if (result instanceof ResponseEntity) { + job.future.complete((ResponseEntity) result); + } else { + job.future.complete(ResponseEntity.ok(result)); + } + + // Update WebSocket + webSocketSender.sendProgress( + job.jobId, + new JobProgress( + job.jobId, "Complete", 100, "Job completed successfully")); + } catch (Exception e) { + log.error( + "Error executing queued job {}: {}", job.jobId, e.getMessage(), e); + job.future.completeExceptionally(e); + + // Update WebSocket + webSocketSender.sendProgress( + job.jobId, + new JobProgress( + job.jobId, "Error", 100, "Job failed: " + e.getMessage())); + } + }); + } + + /** + * Execute a supplier with a timeout. 
+ * + * @param supplier The supplier to execute + * @param timeoutMs The timeout in milliseconds + * @return The result from the supplier + * @throws Exception If there is an execution error + */ + private <T> T executeWithTimeout(Supplier<T> supplier, long timeoutMs) throws Exception { + CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier); + + try { + if (timeoutMs <= 0) { + // No timeout + return future.get(); + } else { + // With timeout + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } + } catch (TimeoutException e) { + future.cancel(true); + throw new TimeoutException("Job timed out after " + timeoutMs + "ms"); + } catch (ExecutionException e) { + throw (e.getCause() instanceof Exception) ? (Exception) e.getCause() : e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedException("Job was interrupted"); + } + } + + /** + * Checks if a job is queued. + * + * @param jobId The job ID + * @return true if the job is queued + */ + public boolean isJobQueued(String jobId) { + return jobMap.containsKey(jobId); + } + + /** + * Gets the current position of a job in the queue. + * + * @param jobId The job ID + * @return The position (0-based) or -1 if not found + */ + public int getJobPosition(String jobId) { + if (!jobMap.containsKey(jobId)) { + return -1; + } + + // Count positions + int position = 0; + for (QueuedJob job : jobQueue) { + if (job.jobId.equals(jobId)) { + return position; + } + position++; + } + + // If we didn't find it in the queue but it's in the map, + // it might be executing already + return -1; + } + + /** + * Cancels a queued job.
+ * + * @param jobId The job ID + * @return true if the job was cancelled, false if not found + */ + public boolean cancelJob(String jobId) { + QueuedJob job = jobMap.remove(jobId); + if (job != null) { + job.cancelled = true; + job.future.completeExceptionally(new RuntimeException("Job cancelled by user")); + + // Try to remove from queue if it's still there + jobQueue.remove(job); + currentQueueSize = jobQueue.size(); + + log.debug("Job {} cancelled", jobId); + + // Update WebSocket + webSocketSender.sendProgress( + jobId, new JobProgress(jobId, "Cancelled", 100, "Job cancelled by user")); + + return true; + } + + return false; + } + + /** + * Get queue statistics. + * + * @return A map containing queue statistics + */ + public Map getQueueStats() { + return Map.of( + "queuedJobs", jobQueue.size(), + "queueCapacity", getQueueCapacity(), + "totalQueuedJobs", totalQueuedJobs, + "rejectedJobs", rejectedJobs, + "resourceStatus", resourceMonitor.getCurrentStatus().get().name()); + } +} diff --git a/common/src/main/java/stirling/software/common/service/ResourceMonitor.java b/common/src/main/java/stirling/software/common/service/ResourceMonitor.java new file mode 100644 index 000000000..e5a5341a7 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/ResourceMonitor.java @@ -0,0 +1,277 @@ +package stirling.software.common.service; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.OperatingSystemMXBean; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +import lombok.Getter; +import 
lombok.extern.slf4j.Slf4j; + +/** + * Monitors system resources (CPU, memory) to inform job scheduling decisions. Provides information + * about available resources to prevent overloading the system. + */ +@Service +@Slf4j +public class ResourceMonitor { + + @Value("${stirling.resource.memory.critical-threshold:0.9}") + private double memoryCriticalThreshold = 0.9; // 90% usage is critical + + @Value("${stirling.resource.memory.high-threshold:0.75}") + private double memoryHighThreshold = 0.75; // 75% usage is high + + @Value("${stirling.resource.cpu.critical-threshold:0.9}") + private double cpuCriticalThreshold = 0.9; // 90% usage is critical + + @Value("${stirling.resource.cpu.high-threshold:0.75}") + private double cpuHighThreshold = 0.75; // 75% usage is high + + @Value("${stirling.resource.monitor.interval-ms:5000}") + private long monitorIntervalMs = 5000; // 5 seconds + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + private final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean(); + + @Getter + private final AtomicReference currentStatus = + new AtomicReference<>(ResourceStatus.OK); + + @Getter + private final AtomicReference latestMetrics = + new AtomicReference<>(new ResourceMetrics()); + + /** Represents the current status of system resources. */ + public enum ResourceStatus { + /** Resources are available, normal operations can proceed */ + OK, + + /** Resources are under strain, consider queueing high-resource operations */ + WARNING, + + /** Resources are critically low, queue all operations */ + CRITICAL + } + + /** Detailed metrics about system resources. 
*/
+    @Getter
+    public static class ResourceMetrics {
+        private final double cpuUsage;
+        private final double memoryUsage;
+        private final long freeMemoryBytes;
+        private final long totalMemoryBytes;
+        private final long maxMemoryBytes;
+        private final Instant timestamp;
+
+        /** Creates an all-zero snapshot stamped with the current time. */
+        public ResourceMetrics() {
+            this(0.0, 0.0, 0L, 0L, 0L, Instant.now());
+        }
+
+        /**
+         * Creates a snapshot from explicit values.
+         *
+         * @param cpuUsage CPU usage figure recorded for this snapshot
+         * @param memoryUsage memory usage figure recorded for this snapshot
+         * @param freeMemoryBytes free memory in bytes
+         * @param totalMemoryBytes total memory in bytes
+         * @param maxMemoryBytes maximum memory in bytes
+         * @param timestamp instant at which the values were collected
+         */
+        public ResourceMetrics(
+                double cpuUsage,
+                double memoryUsage,
+                long freeMemoryBytes,
+                long totalMemoryBytes,
+                long maxMemoryBytes,
+                Instant timestamp) {
+            this.timestamp = timestamp;
+            this.maxMemoryBytes = maxMemoryBytes;
+            this.totalMemoryBytes = totalMemoryBytes;
+            this.freeMemoryBytes = freeMemoryBytes;
+            this.memoryUsage = memoryUsage;
+            this.cpuUsage = cpuUsage;
+        }
+
+        /**
+         * Reports how old this snapshot is.
+         *
+         * @return time elapsed between the snapshot's timestamp and now
+         */
+        public Duration getAge() {
+            Instant now = Instant.now();
+            return Duration.between(timestamp, now);
+        }
+
+        /**
+         * Tells whether this snapshot is older than the given threshold.
+         *
+         * @param thresholdMs staleness threshold in milliseconds
+         * @return true when the snapshot's age in whole milliseconds exceeds the threshold
+         */
+        public boolean isStale(long thresholdMs) {
+            long ageMs = getAge().toMillis();
+            return ageMs > thresholdMs;
+        }
+    }
+
+    /** Starts periodic sampling; the first sample is taken immediately. */
+    @PostConstruct
+    public void initialize() {
+        log.info("Starting resource monitoring with interval of {}ms", monitorIntervalMs);
+        Runnable sampler = this::updateResourceMetrics;
+        scheduler.scheduleAtFixedRate(sampler, 0, monitorIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /** Stops the sampling scheduler when the bean is destroyed. */
+    @PreDestroy
+    public void shutdown() {
+        log.info("Shutting down resource monitoring");
+        scheduler.shutdownNow();
+    }
+
+    /** Updates the resource metrics by sampling current system state.
*/
+    private void updateResourceMetrics() {
+        try {
+            // CPU usage is the system load average normalised by the number of processors.
+            // getSystemLoadAverage() returns a negative value when the platform cannot supply
+            // it; in that case fall back to the alternative estimate.
+            double loadAverage = osMXBean.getSystemLoadAverage();
+            double cpuUsage =
+                    loadAverage < 0
+                            ? getAlternativeCpuLoad()
+                            : loadAverage / osMXBean.getAvailableProcessors();
+
+            // Get memory usage
+            long heapUsed = memoryMXBean.getHeapMemoryUsage().getUsed();
+            long nonHeapUsed = memoryMXBean.getNonHeapMemoryUsage().getUsed();
+            long totalUsed = heapUsed + nonHeapUsed;
+
+            long maxMemory = Runtime.getRuntime().maxMemory();
+            long totalMemory = Runtime.getRuntime().totalMemory();
+            long freeMemory = Runtime.getRuntime().freeMemory();
+
+            // Used heap + non-heap bytes relative to the runtime's maximum memory
+            double memoryUsage = (double) totalUsed / maxMemory;
+
+            // Publish the new snapshot
+            ResourceMetrics metrics =
+                    new ResourceMetrics(
+                            cpuUsage,
+                            memoryUsage,
+                            freeMemory,
+                            totalMemory,
+                            maxMemory,
+                            Instant.now());
+            latestMetrics.set(metrics);
+
+            // Determine system status: either resource crossing a threshold is enough
+            ResourceStatus newStatus;
+            if (cpuUsage > cpuCriticalThreshold || memoryUsage > memoryCriticalThreshold) {
+                newStatus = ResourceStatus.CRITICAL;
+            } else if (cpuUsage > cpuHighThreshold || memoryUsage > memoryHighThreshold) {
+                newStatus = ResourceStatus.WARNING;
+            } else {
+                newStatus = ResourceStatus.OK;
+            }
+
+            // Update status and log only on transitions
+            ResourceStatus oldStatus = currentStatus.getAndSet(newStatus);
+            if (oldStatus != newStatus) {
+                log.info("System resource status changed from {} to {}", oldStatus, newStatus);
+                // SLF4J substitutes only plain "{}" anchors, so the percentages are formatted
+                // explicitly before being passed as arguments.
+                log.info(
+                        "Current metrics - CPU: {}%, Memory: {}%, Free Memory: {} MB",
+                        String.format("%.1f", cpuUsage * 100),
+                        String.format("%.1f", memoryUsage * 100),
+                        freeMemory / (1024 * 1024));
+            }
+        } catch (Exception e) {
+            // Catch everything: an exception escaping a scheduleAtFixedRate task would suppress
+            // all subsequent executions.
+            log.error("Error updating resource metrics: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Alternative method to estimate CPU load if getSystemLoadAverage() is not available. This is a
+     * fallback and less accurate than the official JMX method.
+     *
+     * @return Estimated CPU load as a value between 0.0 and 1.0
+     */
+    private double getAlternativeCpuLoad() {
+        // com.sun.management.OperatingSystemMXBean is a public, exported JDK interface, so it can
+        // be called directly. Reflecting on the implementation class and calling
+        // setAccessible(true) is rejected by strong encapsulation on current JDKs, which made the
+        // fallback always end in the 0.5 default.
+        if (osMXBean instanceof com.sun.management.OperatingSystemMXBean) {
+            com.sun.management.OperatingSystemMXBean extendedBean =
+                    (com.sun.management.OperatingSystemMXBean) osMXBean;
+
+            // Both figures are reported as a negative value when they are not available, so
+            // only non-negative results are accepted.
+            double processLoad = extendedBean.getProcessCpuLoad();
+            if (processLoad >= 0) {
+                return processLoad;
+            }
+
+            double systemLoad = extendedBean.getCpuLoad();
+            if (systemLoad >= 0) {
+                return systemLoad;
+            }
+        }
+
+        log.warn("Could not get CPU load, assuming moderate load (0.5)");
+        return 0.5; // Default to moderate load
+    }
+
+    /**
+     * Calculates the dynamic job queue capacity based on current resource usage.
+     *
+     * @param baseCapacity The base capacity when system is under minimal load
+     * @param minCapacity The minimum capacity to maintain even under high load
+     * @return The calculated job queue capacity
+     */
+    public int calculateDynamicQueueCapacity(int baseCapacity, int minCapacity) {
+        ResourceMetrics metrics = latestMetrics.get();
+        ResourceStatus status = currentStatus.get();
+
+        // Step-wise reduction driven by the overall resource status
+        double capacityFactor =
+                switch (status) {
+                    case OK -> 1.0;
+                    case WARNING -> 0.6;
+                    case CRITICAL -> 0.3;
+                };
+
+        // Apply additional reduction based on specific memory pressure
+        if (metrics.memoryUsage > 0.8) {
+            capacityFactor *= 0.5; // Further reduce capacity under memory pressure
+        }
+
+        // Round up, then never go below the configured minimum
+        int capacity = (int) Math.max(minCapacity, Math.ceil(baseCapacity * capacityFactor));
+
+        // SLF4J substitutes only plain "{}" anchors, so the factor is formatted explicitly; the
+        // guard avoids the String.format cost when debug logging is off.
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Dynamic queue capacity: {} (base: {}, factor: {}, status: {})",
+                    capacity,
+                    baseCapacity,
+                    String.format("%.2f", capacityFactor),
+                    status);
+        }
+
+        return capacity;
+    }
+
+    /**
+     * Checks if a job with the given weight can be executed immediately or should be queued based
+     * on current resource availability.
+     *
+     * @param resourceWeight The resource weight of the job (1-100)
+     * @return true if the job should be queued, false if it can run immediately
+     */
+    public boolean shouldQueueJob(int resourceWeight) {
+        ResourceStatus status = currentStatus.get();
+
+        // Lightweight jobs (weight < 20) skip the queue in every state except CRITICAL
+        boolean lightJobMayRun = resourceWeight < 20 && status != ResourceStatus.CRITICAL;
+
+        // Jobs below weight 60 skip the queue only while the status is OK
+        boolean mediumJobMayRun = resourceWeight < 60 && status == ResourceStatus.OK;
+
+        // Everything else is queued: weight >= 60 in any state, weight >= 20 outside OK,
+        // and every job while the status is CRITICAL
+        return !(lightJobMayRun || mediumJobMayRun);
+    }
+}
diff --git a/common/src/test/java/stirling/software/common/annotations/AutoJobPostMappingIntegrationTest.java b/common/src/test/java/stirling/software/common/annotations/AutoJobPostMappingIntegrationTest.java
new file mode 100644
index 000000000..40bfd5e4f
--- /dev/null
+++ b/common/src/test/java/stirling/software/common/annotations/AutoJobPostMappingIntegrationTest.java
@@ -0,0 +1,214 @@
+package stirling.software.common.annotations;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.junit.jupiter.api.Test;
+import 
org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.ResponseEntity; +import org.springframework.web.multipart.MultipartFile; + +import jakarta.servlet.http.HttpServletRequest; + +import stirling.software.common.aop.AutoJobAspect; +import stirling.software.common.controller.WebSocketProgressController; +import stirling.software.common.model.api.PDFFile; +import stirling.software.common.service.FileOrUploadService; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.JobExecutorService; +import stirling.software.common.service.JobQueue; +import stirling.software.common.service.ResourceMonitor; + +@ExtendWith(MockitoExtension.class) +class AutoJobPostMappingIntegrationTest { + + private AutoJobAspect autoJobAspect; + + @Mock + private JobExecutorService jobExecutorService; + + @Mock + private HttpServletRequest request; + + @Mock + private FileOrUploadService fileOrUploadService; + + @Mock + private FileStorage fileStorage; + + @Mock + private WebSocketProgressController webSocketSender; + + @Mock + private ResourceMonitor resourceMonitor; + + @Mock + private JobQueue jobQueue; + + @BeforeEach + void setUp() { + autoJobAspect = new AutoJobAspect( + jobExecutorService, + request, + fileOrUploadService, + fileStorage, + webSocketSender + ); + } + + @Mock + private ProceedingJoinPoint joinPoint; + + @Mock + private AutoJobPostMapping autoJobPostMapping; + + @Captor + private ArgumentCaptor> workCaptor; + + @Captor + private ArgumentCaptor asyncCaptor; + + @Captor + private ArgumentCaptor timeoutCaptor; + + @Captor + private ArgumentCaptor queueableCaptor; + + @Captor + private ArgumentCaptor resourceWeightCaptor; + + @Test + void shouldExecuteWithCustomParameters() throws Throwable { + // Given + PDFFile pdfFile = new PDFFile(); + 
pdfFile.setFileId("test-file-id"); + Object[] args = new Object[] { pdfFile }; + + when(joinPoint.getArgs()).thenReturn(args); + when(request.getParameter("async")).thenReturn("true"); + when(autoJobPostMapping.timeout()).thenReturn(60000L); + when(autoJobPostMapping.retryCount()).thenReturn(3); + when(autoJobPostMapping.trackProgress()).thenReturn(true); + when(autoJobPostMapping.queueable()).thenReturn(true); + when(autoJobPostMapping.resourceWeight()).thenReturn(75); + + MultipartFile mockFile = mock(MultipartFile.class); + when(fileStorage.retrieveFile("test-file-id")).thenReturn(mockFile); + + + when(jobExecutorService.runJobGeneric( + anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt())) + .thenReturn(ResponseEntity.ok("success")); + + // When + Object result = autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping); + + // Then + assertEquals(ResponseEntity.ok("success"), result); + + verify(jobExecutorService).runJobGeneric( + asyncCaptor.capture(), + workCaptor.capture(), + timeoutCaptor.capture(), + queueableCaptor.capture(), + resourceWeightCaptor.capture()); + + assertTrue(asyncCaptor.getValue(), "Async should be true"); + assertEquals(60000L, timeoutCaptor.getValue(), "Timeout should be 60000ms"); + assertTrue(queueableCaptor.getValue(), "Queueable should be true"); + assertEquals(75, resourceWeightCaptor.getValue(), "Resource weight should be 75"); + + // Test that file was resolved + assertNotNull(pdfFile.getFileInput(), "File input should be set"); + } + + @Test + void shouldRetryOnError() throws Throwable { + // Given + when(joinPoint.getArgs()).thenReturn(new Object[0]); + when(request.getParameter("async")).thenReturn("false"); + when(autoJobPostMapping.timeout()).thenReturn(-1L); + when(autoJobPostMapping.retryCount()).thenReturn(2); + when(autoJobPostMapping.trackProgress()).thenReturn(false); + when(autoJobPostMapping.queueable()).thenReturn(false); + when(autoJobPostMapping.resourceWeight()).thenReturn(50); + + // 
First call throws exception, second succeeds + when(joinPoint.proceed(any())) + .thenThrow(new RuntimeException("First attempt failed")) + .thenReturn(ResponseEntity.ok("retry succeeded")); + + // Mock jobExecutorService to execute the work immediately + when(jobExecutorService.runJobGeneric( + anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt())) + .thenAnswer(invocation -> { + Supplier work = invocation.getArgument(1); + return work.get(); + }); + + // When + Object result = autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping); + + // Then + assertEquals(ResponseEntity.ok("retry succeeded"), result); + + // Verify that proceed was called twice (initial attempt + 1 retry) + verify(joinPoint, times(2)).proceed(any()); + } + + @Test + void shouldHandlePDFFileWithAsyncRequests() throws Throwable { + // Given + PDFFile pdfFile = new PDFFile(); + pdfFile.setFileInput(mock(MultipartFile.class)); + Object[] args = new Object[] { pdfFile }; + + when(joinPoint.getArgs()).thenReturn(args); + when(request.getParameter("async")).thenReturn("true"); + when(autoJobPostMapping.retryCount()).thenReturn(1); + + when(fileStorage.storeFile(any(MultipartFile.class))).thenReturn("stored-file-id"); + when(fileStorage.retrieveFile("stored-file-id")).thenReturn(mock(MultipartFile.class)); + + // Mock job executor to return a successful response + when(jobExecutorService.runJobGeneric( + anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt())) + .thenReturn(ResponseEntity.ok("success")); + + // When + autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping); + + // Then + assertEquals("stored-file-id", pdfFile.getFileId(), + "FileId should be set to the stored file id"); + assertNotNull(pdfFile.getFileInput(), "FileInput should be replaced with persistent file"); + + // Verify storage operations + verify(fileStorage).storeFile(any(MultipartFile.class)); + verify(fileStorage).retrieveFile("stored-file-id"); + } +} + +// Move PDFFileTest 
to its own class file if needed \ No newline at end of file diff --git a/common/src/test/java/stirling/software/common/service/JobExecutorServiceTest.java b/common/src/test/java/stirling/software/common/service/JobExecutorServiceTest.java new file mode 100644 index 000000000..c5be7dc24 --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/JobExecutorServiceTest.java @@ -0,0 +1,208 @@ +package stirling.software.common.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.test.util.ReflectionTestUtils; + +import jakarta.servlet.http.HttpServletRequest; + +import stirling.software.common.controller.WebSocketProgressController; +import stirling.software.common.model.job.JobProgress; +import stirling.software.common.model.job.JobResponse; 
+ +@ExtendWith(MockitoExtension.class) +class JobExecutorServiceTest { + + private JobExecutorService jobExecutorService; + + @Mock + private TaskManager taskManager; + + @Mock + private WebSocketProgressController webSocketSender; + + @Mock + private FileStorage fileStorage; + + @Mock + private HttpServletRequest request; + + @Mock + private ResourceMonitor resourceMonitor; + + @Mock + private JobQueue jobQueue; + + @Captor + private ArgumentCaptor jobIdCaptor; + + @BeforeEach + void setUp() { + // Initialize the service manually with all its dependencies + jobExecutorService = new JobExecutorService( + taskManager, + webSocketSender, + fileStorage, + request, + resourceMonitor, + jobQueue, + 30000L, // asyncRequestTimeoutMs + "30m" // sessionTimeout + ); + } + + @Test + void shouldRunSyncJobSuccessfully() throws Exception { + // Given + Supplier work = () -> "test-result"; + + // When + ResponseEntity response = jobExecutorService.runJobGeneric(false, work); + + // Then + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals("test-result", response.getBody()); + + // Verify request attribute was set with jobId + verify(request).setAttribute(eq("jobId"), anyString()); + } + + @Test + void shouldRunAsyncJobSuccessfully() throws Exception { + // Given + Supplier work = () -> "test-result"; + + // When + ResponseEntity response = jobExecutorService.runJobGeneric(true, work); + + // Then + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertTrue(response.getBody() instanceof JobResponse); + JobResponse jobResponse = (JobResponse) response.getBody(); + assertTrue(jobResponse.isAsync()); + assertNotNull(jobResponse.getJobId()); + + // Verify task manager was called + verify(taskManager).createTask(jobIdCaptor.capture()); + verify(webSocketSender).sendProgress(eq(jobIdCaptor.getValue()), any(JobProgress.class)); + } + + + @Test + void shouldHandleSyncJobError() { + // Given + Supplier work = () -> { + throw new RuntimeException("Test error"); + 
}; + + // When + ResponseEntity response = jobExecutorService.runJobGeneric(false, work); + + // Then + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); + + @SuppressWarnings("unchecked") + Map errorMap = (Map) response.getBody(); + assertEquals("Job failed: Test error", errorMap.get("error")); + } + + @Test + void shouldQueueJobWhenResourcesLimited() { + // Given + Supplier work = () -> "test-result"; + CompletableFuture> future = new CompletableFuture<>(); + + // Configure resourceMonitor to indicate job should be queued + when(resourceMonitor.shouldQueueJob(80)).thenReturn(true); + + // Configure jobQueue to return our future + when(jobQueue.queueJob(anyString(), eq(80), any(), anyLong())).thenReturn(future); + + // When + ResponseEntity response = jobExecutorService.runJobGeneric( + true, work, 5000, true, 80); + + // Then + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertTrue(response.getBody() instanceof JobResponse); + + // Verify job was queued + verify(jobQueue).queueJob(anyString(), eq(80), any(), eq(5000L)); + verify(taskManager).createTask(anyString()); + } + + @Test + void shouldUseCustomTimeoutWhenProvided() throws Exception { + // Given + Supplier work = () -> "test-result"; + long customTimeout = 60000L; + + // Use reflection to access the private executeWithTimeout method + java.lang.reflect.Method executeMethod = JobExecutorService.class + .getDeclaredMethod("executeWithTimeout", Supplier.class, long.class); + executeMethod.setAccessible(true); + + // Create a spy on the JobExecutorService to verify method calls + JobExecutorService spy = Mockito.spy(jobExecutorService); + + // When + spy.runJobGeneric(false, work, customTimeout); + + // Then + verify(spy).runJobGeneric(eq(false), any(Supplier.class), eq(customTimeout)); + } + + @Test + void shouldHandleTimeout() throws Exception { + // Given + Supplier work = () -> { + try { + Thread.sleep(100); // Simulate long-running job + return "test-result"; + } catch 
(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + + // Use reflection to access the private executeWithTimeout method + java.lang.reflect.Method executeMethod = JobExecutorService.class + .getDeclaredMethod("executeWithTimeout", Supplier.class, long.class); + executeMethod.setAccessible(true); + + // When/Then + try { + executeMethod.invoke(jobExecutorService, work, 1L); // Very short timeout + } catch (Exception e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + } +} \ No newline at end of file diff --git a/common/src/test/java/stirling/software/common/service/JobQueueTest.java b/common/src/test/java/stirling/software/common/service/JobQueueTest.java new file mode 100644 index 000000000..810c33f59 --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/JobQueueTest.java @@ -0,0 +1,107 @@ +package stirling.software.common.service; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import stirling.software.common.controller.WebSocketProgressController; +import stirling.software.common.model.job.JobProgress; +import stirling.software.common.service.ResourceMonitor.ResourceStatus; + +@ExtendWith(MockitoExtension.class) +class JobQueueTest { + + private JobQueue jobQueue; + + @Mock + private ResourceMonitor resourceMonitor; + + @Mock + private WebSocketProgressController webSocketSender; + + private final AtomicReference 
statusRef = new AtomicReference<>(ResourceStatus.OK); + + @BeforeEach + void setUp() { + // Mark stubbing as lenient to avoid UnnecessaryStubbingException + lenient().when(resourceMonitor.calculateDynamicQueueCapacity(anyInt(), anyInt())).thenReturn(10); + lenient().when(resourceMonitor.getCurrentStatus()).thenReturn(statusRef); + + jobQueue = new JobQueue(resourceMonitor, webSocketSender); + } + + @Test + void shouldQueueJob() { + String jobId = "test-job-1"; + int resourceWeight = 50; + Supplier work = () -> "test-result"; + long timeoutMs = 1000; + + jobQueue.queueJob(jobId, resourceWeight, work, timeoutMs); + + verify(webSocketSender).sendProgress( + org.mockito.ArgumentMatchers.eq(jobId), + org.mockito.ArgumentMatchers.any(JobProgress.class)); + + assertTrue(jobQueue.isJobQueued(jobId)); + assertEquals(1, jobQueue.getTotalQueuedJobs()); + } + + @Test + void shouldCancelJob() { + String jobId = "test-job-2"; + Supplier work = () -> "test-result"; + + jobQueue.queueJob(jobId, 50, work, 1000); + boolean cancelled = jobQueue.cancelJob(jobId); + + assertTrue(cancelled); + assertFalse(jobQueue.isJobQueued(jobId)); + } + + @Test + void shouldGetQueueStats() { + when(resourceMonitor.getCurrentStatus()).thenReturn(statusRef); + + jobQueue.queueJob("job1", 50, () -> "ok", 1000); + jobQueue.queueJob("job2", 50, () -> "ok", 1000); + jobQueue.cancelJob("job2"); + + Map stats = jobQueue.getQueueStats(); + + assertEquals(2, stats.get("totalQueuedJobs")); + assertTrue(stats.containsKey("queuedJobs")); + assertTrue(stats.containsKey("resourceStatus")); + } + + @Test + void shouldCalculateQueueCapacity() { + when(resourceMonitor.calculateDynamicQueueCapacity(5, 2)).thenReturn(8); + int capacity = resourceMonitor.calculateDynamicQueueCapacity(5, 2); + assertEquals(8, capacity); + } + + @Test + void shouldCheckIfJobIsQueued() { + String jobId = "job-123"; + Supplier work = () -> "hello"; + + jobQueue.queueJob(jobId, 40, work, 500); + + assertTrue(jobQueue.isJobQueued(jobId)); + 
assertFalse(jobQueue.isJobQueued("nonexistent")); + } +} diff --git a/common/src/test/java/stirling/software/common/service/ResourceMonitorTest.java b/common/src/test/java/stirling/software/common/service/ResourceMonitorTest.java new file mode 100644 index 000000000..9bf439af0 --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/ResourceMonitorTest.java @@ -0,0 +1,137 @@ +package stirling.software.common.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.OperatingSystemMXBean; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; + +import stirling.software.common.service.ResourceMonitor.ResourceMetrics; +import stirling.software.common.service.ResourceMonitor.ResourceStatus; + +@ExtendWith(MockitoExtension.class) +class ResourceMonitorTest { + + @InjectMocks + private ResourceMonitor resourceMonitor; + + @Mock + private OperatingSystemMXBean osMXBean; + + @Mock + private MemoryMXBean memoryMXBean; + + @Spy + private AtomicReference currentStatus = new AtomicReference<>(ResourceStatus.OK); + + @Spy + private AtomicReference latestMetrics = new AtomicReference<>(new ResourceMetrics()); + + @BeforeEach + 
void setUp() { + // Set thresholds for testing + ReflectionTestUtils.setField(resourceMonitor, "memoryCriticalThreshold", 0.9); + ReflectionTestUtils.setField(resourceMonitor, "memoryHighThreshold", 0.75); + ReflectionTestUtils.setField(resourceMonitor, "cpuCriticalThreshold", 0.9); + ReflectionTestUtils.setField(resourceMonitor, "cpuHighThreshold", 0.75); + ReflectionTestUtils.setField(resourceMonitor, "osMXBean", osMXBean); + ReflectionTestUtils.setField(resourceMonitor, "memoryMXBean", memoryMXBean); + ReflectionTestUtils.setField(resourceMonitor, "currentStatus", currentStatus); + ReflectionTestUtils.setField(resourceMonitor, "latestMetrics", latestMetrics); + } + + @Test + void shouldCalculateDynamicQueueCapacity() { + // Given + int baseCapacity = 10; + int minCapacity = 2; + + // Mock current status as OK + currentStatus.set(ResourceStatus.OK); + + // When + int capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity); + + // Then + assertEquals(baseCapacity, capacity, "With OK status, capacity should equal base capacity"); + + // Given + currentStatus.set(ResourceStatus.WARNING); + + // When + capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity); + + // Then + assertEquals(6, capacity, "With WARNING status, capacity should be reduced to 60%"); + + // Given + currentStatus.set(ResourceStatus.CRITICAL); + + // When + capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity); + + // Then + assertEquals(3, capacity, "With CRITICAL status, capacity should be reduced to 30%"); + + // Test minimum capacity enforcement + assertEquals(minCapacity, resourceMonitor.calculateDynamicQueueCapacity(1, minCapacity), + "Should never go below minimum capacity"); + } + + @ParameterizedTest + @CsvSource({ + "10, OK, false", // Light job, OK status + "10, WARNING, false", // Light job, WARNING status + "10, CRITICAL, true", // Light job, CRITICAL status + "30, OK, false", // Medium job, OK 
status + "30, WARNING, true", // Medium job, WARNING status + "30, CRITICAL, true", // Medium job, CRITICAL status + "80, OK, true", // Heavy job, OK status + "80, WARNING, true", // Heavy job, WARNING status + "80, CRITICAL, true" // Heavy job, CRITICAL status + }) + void shouldQueueJobBasedOnWeightAndStatus(int weight, ResourceStatus status, boolean shouldQueue) { + // Given + currentStatus.set(status); + + // When + boolean result = resourceMonitor.shouldQueueJob(weight); + + // Then + assertEquals(shouldQueue, result, + String.format("For weight %d and status %s, shouldQueue should be %s", + weight, status, shouldQueue)); + } + + @Test + void resourceMetricsShouldDetectStaleState() { + // Given + Instant now = Instant.now(); + Instant pastInstant = now.minusMillis(6000); + + ResourceMetrics staleMetrics = new ResourceMetrics(0.5, 0.5, 1024, 2048, 4096, pastInstant); + ResourceMetrics freshMetrics = new ResourceMetrics(0.5, 0.5, 1024, 2048, 4096, now); + + // When/Then + assertTrue(staleMetrics.isStale(5000), "Metrics from 6 seconds ago should be stale with 5s threshold"); + assertFalse(freshMetrics.isStale(5000), "Fresh metrics should not be stale"); + } +} \ No newline at end of file diff --git a/docs/AutoJobPostMapping.md b/docs/AutoJobPostMapping.md new file mode 100644 index 000000000..a210514f9 --- /dev/null +++ b/docs/AutoJobPostMapping.md @@ -0,0 +1,168 @@ +# AutoJobPostMapping Annotation + +The `AutoJobPostMapping` annotation simplifies the creation of job-based REST endpoints in Stirling-PDF. It automatically handles job execution, file persistence, error handling, retries, and progress tracking. 
+
+## Features
+
+- Wraps endpoint methods with job execution logic
+- Supports both synchronous and asynchronous execution (via `?async=true` query parameter)
+- Custom timeout configuration per endpoint
+- Automatic retries with configurable retry count
+- WebSocket-based progress tracking
+- Consistent error handling and reporting
+- Automatic persistence of uploaded files for async processing
+
+## Usage
+
+```java
+@AutoJobPostMapping("/api/v1/security/remove-password")
+public ResponseEntity removePassword(@ModelAttribute PDFPasswordRequest request)
+        throws IOException {
+    MultipartFile fileInput = request.getFileInput();
+    String password = request.getPassword();
+    PDDocument document = pdfDocumentFactory.load(fileInput, password);
+    document.setAllSecurityToBeRemoved(true);
+    return WebResponseUtils.pdfDocToWebResponse(
+            document,
+            Filenames.toSimpleFileName(fileInput.getOriginalFilename())
+                            .replaceFirst("[.][^.]+$", "")
+                    + "_password_removed.pdf");
+}
+```
+
+## Parameters
+
+The `AutoJobPostMapping` annotation accepts the following parameters:
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `value` | String[] | `{}` | The path mapping URIs (e.g., "/api/v1/security/add-password") |
+| `consumes` | String[] | `{"multipart/form-data"}` | Supported media types for requests |
+| `timeout` | long | `-1` (use system default) | Custom timeout in milliseconds for this job |
+| `retryCount` | int | `1` (no retries) | Maximum number of execution attempts, counting the initial one (`2` means one retry after a failure) |
+| `trackProgress` | boolean | `true` | Enable WebSocket progress tracking for async jobs |
+| `queueable` | boolean | `false` | Whether this job can be queued when system resources are limited |
+| `resourceWeight` | int | `50` | Resource weight of this job (1-100), higher values indicate more resource-intensive jobs |
+
+## Examples
+
+### Basic Usage
+```java
+@AutoJobPostMapping("/api/v1/security/remove-password")
+public ResponseEntity 
removePassword(@ModelAttribute PDFPasswordRequest request) {
+    // Implementation
+}
+```
+
+### With Custom Timeout
+```java
+// Set a 5-minute timeout for this operation
+@AutoJobPostMapping(value = "/api/v1/misc/ocr-pdf", timeout = 300000)
+public ResponseEntity ocrPdf(@ModelAttribute OCRRequest request) {
+    // OCR implementation
+}
+```
+
+### With Retries
+```java
+// Allow up to 3 attempts in total (the initial call plus 2 retries) for external API calls
+@AutoJobPostMapping(value = "/api/v1/convert/url-to-pdf", retryCount = 3)
+public ResponseEntity convertUrlToPdf(@ModelAttribute WebsiteToPDFRequest request) {
+    // Implementation
+}
+```
+
+### Disable Progress Tracking
+```java
+// Simple, fast operation that doesn't need progress tracking
+@AutoJobPostMapping(value = "/api/v1/misc/flatten", trackProgress = false)
+public ResponseEntity flattenPdf(@ModelAttribute FlattenRequest request) {
+    // Implementation
+}
+```
+
+### Enable Job Queueing for Resource-Intensive Operations
+```java
+// Resource-intensive operation that can be queued during high system load
+@AutoJobPostMapping(
+    value = "/api/v1/misc/ocr-pdf",
+    queueable = true,
+    resourceWeight = 80, // High resource usage
+    timeout = 600000 // 10 minutes
+)
+public ResponseEntity ocrPdf(@ModelAttribute OCRRequest request) {
+    // OCR implementation
+}
+```
+
+### Lightweight Operation
+```java
+// Very lightweight operation with low resource requirements
+@AutoJobPostMapping(
+    value = "/api/v1/misc/get-page-count",
+    queueable = false,
+    resourceWeight = 10 // Very low resource usage
+)
+public ResponseEntity getPageCount(@ModelAttribute PDFFile request) {
+    // Simple page count implementation
+}
+```
+
+## Client-Side Integration
+
+For asynchronous jobs, clients can:
+1. Submit the job with `?async=true` parameter
+2. Receive a job ID in the response
+3. Connect to the WebSocket at `/ws/progress/{jobId}` to receive progress updates
+4. 
Fetch the completed result from `/api/v1/general/job/{jobId}/result` when done + +Example WebSocket message: +```json +{ + "jobId": "b4c9a31d-4b7e-42b2-8ab9-3cbe99d5b94f", + "status": "Processing", + "progress": 65, + "message": "OCR processing page 13/20" +} +``` + +## Resource-Aware Job Queueing + +The `queueable` parameter enables intelligent resource-aware job queueing for heavy operations. When enabled: + +1. Jobs are automatically queued when system resources (CPU, memory) are constrained +2. Queue capacity dynamically adjusts based on available resources +3. Queue position and status updates are sent via WebSocket +4. Jobs with high `resourceWeight` values have stricter queueing conditions +5. Long-running jobs don't block the system from handling other requests + +### Resource Weight Guidelines + +When setting the `resourceWeight` parameter, use these guidelines: + +| Weight Range | Appropriate For | +|--------------|----------------| +| 1-20 | Lightweight operations: metadata reads, simple transforms, etc. | +| 21-50 | Medium operations: basic PDF manipulation, simple image operations | +| 51-80 | Heavy operations: PDF merging, image conversions, medium OCR | +| 81-100 | Very intensive operations: large OCR jobs, complex transformations | + +### Example Queue Status Messages + +```json +{ + "jobId": "b4c9a31d-4b7e-42b2-8ab9-3cbe99d5b94f", + "status": "Queued", + "progress": 0, + "message": "Waiting in queue for resources (position 3)" +} +``` + +```json +{ + "jobId": "b4c9a31d-4b7e-42b2-8ab9-3cbe99d5b94f", + "status": "Starting", + "progress": 10, + "message": "Resources available, starting job execution" +} +``` \ No newline at end of file