Queue, timeouts, retries, tests!

This commit is contained in:
Anthony Stirling 2025-06-03 01:18:07 +01:00
parent 2ee43b2070
commit 8d4f5f16cb
11 changed files with 1847 additions and 21 deletions

View File

@ -16,4 +16,32 @@ public @interface AutoJobPostMapping {
@AliasFor(annotation = RequestMapping.class, attribute = "consumes")
String[] consumes() default {"multipart/form-data"};
/**
* Custom timeout in milliseconds for this specific job. If not specified, the default system
* timeout will be used.
*/
long timeout() default -1;
/** Maximum number of times to retry the job on failure. Default is 1 (no retries). */
int retryCount() default 1;
/**
* Whether to track and report progress for this job. If enabled, the job will send progress
* updates through WebSocket.
*/
boolean trackProgress() default true;
/**
* Whether this job can be queued when system resources are limited. If enabled, jobs will be
* queued instead of rejected when the system is under high load. The queue size is dynamically
* adjusted based on available memory and CPU resources.
*/
boolean queueable() default false;
/**
* Optional resource weight of this job (1-100). Higher values indicate more resource-intensive
* jobs that may need stricter queuing. Default is 50 (medium weight).
*/
int resourceWeight() default 50;
}

View File

@ -1,6 +1,8 @@
package stirling.software.common.aop;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
@ -13,7 +15,9 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.annotations.AutoJobPostMapping;
import stirling.software.common.controller.WebSocketProgressController;
import stirling.software.common.model.api.PDFFile;
import stirling.software.common.model.job.JobProgress;
import stirling.software.common.service.FileOrUploadService;
import stirling.software.common.service.FileStorage;
import stirling.software.common.service.JobExecutorService;
@ -28,15 +32,26 @@ public class AutoJobAspect {
private final HttpServletRequest request;
private final FileOrUploadService fileOrUploadService;
private final FileStorage fileStorage;
private final WebSocketProgressController webSocketSender;
@Around("@annotation(autoJobPostMapping)")
public Object wrapWithJobExecution(
ProceedingJoinPoint joinPoint, AutoJobPostMapping autoJobPostMapping) {
// Extract parameters from the request and annotation
boolean async = Boolean.parseBoolean(request.getParameter("async"));
long timeout = autoJobPostMapping.timeout();
int retryCount = autoJobPostMapping.retryCount();
boolean trackProgress = autoJobPostMapping.trackProgress();
log.debug(
"AutoJobPostMapping execution with async={}, timeout={}, retryCount={}, trackProgress={}",
async,
timeout > 0 ? timeout : "default",
retryCount,
trackProgress);
// Inspect and possibly mutate arguments
Object[] args = joinPoint.getArgs();
boolean isAsyncRequest = async;
for (int i = 0; i < args.length; i++) {
Object arg = args[i];
@ -54,7 +69,7 @@ public class AutoJobAspect {
}
}
// Case 2: For async requests, we need to make a copy of the MultipartFile
else if (isAsyncRequest && pdfFile.getFileInput() != null) {
else if (async && pdfFile.getFileInput() != null) {
try {
log.debug("Making persistent copy of uploaded file for async processing");
MultipartFile originalFile = pdfFile.getFileInput();
@ -76,20 +91,159 @@ public class AutoJobAspect {
}
}
// Wrap job execution
// Extract queueable and resourceWeight parameters
boolean queueable = autoJobPostMapping.queueable();
int resourceWeight = autoJobPostMapping.resourceWeight();
// Integrate with the enhanced JobExecutorService
if (retryCount <= 1) {
// No retries needed, simple execution
return jobExecutorService.runJobGeneric(
async,
() -> {
try {
if (trackProgress && async) {
String jobId = (String) request.getAttribute("jobId");
if (jobId != null) {
webSocketSender.sendProgress(
jobId,
new JobProgress(
jobId,
"Processing",
50,
"Executing operation"));
}
}
return joinPoint.proceed(args);
} catch (Throwable ex) {
log.error(
"AutoJobAspect caught exception during job execution: {}",
ex.getMessage(),
ex);
throw new RuntimeException(ex);
}
},
timeout,
queueable,
resourceWeight);
} else {
// Use retry logic
return executeWithRetries(
joinPoint,
args,
async,
timeout,
retryCount,
trackProgress,
queueable,
resourceWeight);
}
}
private Object executeWithRetries(
ProceedingJoinPoint joinPoint,
Object[] args,
boolean async,
long timeout,
int maxRetries,
boolean trackProgress,
boolean queueable,
int resourceWeight) {
AtomicInteger attempts = new AtomicInteger(0);
// Use AtomicReference to make the jobId effectively final for lambda usage
AtomicReference<String> jobIdRef = new AtomicReference<>();
return jobExecutorService.runJobGeneric(
async,
() -> {
int currentAttempt = attempts.incrementAndGet();
try {
if (trackProgress && async) {
// Only get the jobId once and store it in the AtomicReference
if (jobIdRef.get() == null) {
jobIdRef.set(getJobIdFromContext());
}
String jobId = jobIdRef.get();
if (jobId != null) {
webSocketSender.sendProgress(
jobId,
new JobProgress(
jobId,
"Started",
0,
"Executing (attempt "
+ currentAttempt
+ " of "
+ Math.max(1, maxRetries)
+ ")"));
}
}
return joinPoint.proceed(args);
} catch (Throwable ex) {
log.error(
"AutoJobAspect caught exception during job execution: {}",
"AutoJobAspect caught exception during job execution (attempt {}/{}): {}",
currentAttempt,
Math.max(1, maxRetries),
ex.getMessage(),
ex);
// Ensure we wrap the exception but preserve the original message
throw new RuntimeException(ex);
// Check if we should retry
if (currentAttempt < maxRetries) {
log.info(
"Retrying operation, attempt {}/{}",
currentAttempt + 1,
maxRetries);
String jobId = jobIdRef.get();
if (trackProgress && async && jobId != null) {
webSocketSender.sendProgress(
jobId,
new JobProgress(
jobId,
"Retrying",
(int) (currentAttempt * 100.0 / maxRetries),
"Retry attempt "
+ (currentAttempt + 1)
+ " of "
+ maxRetries));
}
try {
// Simple exponential backoff
Thread.sleep(100 * currentAttempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
// Recursive call to retry
return executeWithRetries(
joinPoint,
args,
async,
timeout,
maxRetries,
trackProgress,
queueable,
resourceWeight);
}
// No more retries, throw the exception
throw new RuntimeException("Job failed: " + ex.getMessage(), ex);
}
});
},
timeout,
queueable,
resourceWeight);
}
// Try to get the job ID from the context (if this is an async job)
private String getJobIdFromContext() {
try {
return (String) request.getAttribute("jobId");
} catch (Exception e) {
log.debug("Could not retrieve job ID from context: {}", e.getMessage());
return null;
}
}
}

View File

@ -11,7 +11,6 @@ import org.springframework.web.bind.annotation.RestController;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.annotations.AutoJobPostMapping;
import stirling.software.common.model.job.JobResult;
import stirling.software.common.model.job.JobStats;
import stirling.software.common.service.FileStorage;

View File

@ -3,6 +3,7 @@ package stirling.software.common.service;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -16,6 +17,8 @@ import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.controller.WebSocketProgressController;
@ -30,6 +33,9 @@ public class JobExecutorService {
private final TaskManager taskManager;
private final WebSocketProgressController webSocketSender;
private final FileStorage fileStorage;
private final HttpServletRequest request;
private final ResourceMonitor resourceMonitor;
private final JobQueue jobQueue;
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final long effectiveTimeoutMs;
@ -37,11 +43,17 @@ public class JobExecutorService {
TaskManager taskManager,
WebSocketProgressController webSocketSender,
FileStorage fileStorage,
HttpServletRequest request,
ResourceMonitor resourceMonitor,
JobQueue jobQueue,
@Value("${spring.mvc.async.request-timeout:1200000}") long asyncRequestTimeoutMs,
@Value("${server.servlet.session.timeout:30m}") String sessionTimeout) {
this.taskManager = taskManager;
this.webSocketSender = webSocketSender;
this.fileStorage = fileStorage;
this.request = request;
this.resourceMonitor = resourceMonitor;
this.jobQueue = jobQueue;
// Parse session timeout and calculate effective timeout once during initialization
long sessionTimeoutMs = parseSessionTimeout(sessionTimeout);
@ -58,10 +70,94 @@ public class JobExecutorService {
* @return The response
*/
public ResponseEntity<?> runJobGeneric(boolean async, Supplier<Object> work) {
String jobId = UUID.randomUUID().toString();
log.debug("Running job with ID: {}, async: {}", jobId, async);
return runJobGeneric(async, work, -1);
}
if (async) {
/**
* Run a job either asynchronously or synchronously with a custom timeout
*
* @param async Whether to run the job asynchronously
* @param work The work to be done
* @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default
* @return The response
*/
public ResponseEntity<?> runJobGeneric(
boolean async, Supplier<Object> work, long customTimeoutMs) {
return runJobGeneric(async, work, customTimeoutMs, false, 50);
}
/**
* Run a job either asynchronously or synchronously with custom parameters
*
* @param async Whether to run the job asynchronously
* @param work The work to be done
* @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default
* @param queueable Whether this job can be queued when system resources are limited
* @param resourceWeight The resource weight of this job (1-100)
* @return The response
*/
public ResponseEntity<?> runJobGeneric(
boolean async,
Supplier<Object> work,
long customTimeoutMs,
boolean queueable,
int resourceWeight) {
String jobId = UUID.randomUUID().toString();
// Store the job ID in the request for potential use by other components
if (request != null) {
request.setAttribute("jobId", jobId);
}
// Determine which timeout to use
long timeoutToUse = customTimeoutMs > 0 ? customTimeoutMs : effectiveTimeoutMs;
log.debug(
"Running job with ID: {}, async: {}, timeout: {}ms, queueable: {}, weight: {}",
jobId,
async,
timeoutToUse,
queueable,
resourceWeight);
// Check if we need to queue this job based on resource availability
boolean shouldQueue =
queueable
&& async
&& // Only async jobs can be queued
resourceMonitor.shouldQueueJob(resourceWeight);
if (shouldQueue) {
// Queue the job instead of executing immediately
log.debug(
"Queueing job {} due to resource constraints (weight: {})",
jobId,
resourceWeight);
taskManager.createTask(jobId);
// Create a specialized wrapper that updates the TaskManager
Supplier<Object> wrappedWork =
() -> {
try {
Object result = work.get();
processJobResult(jobId, result);
return result;
} catch (Exception e) {
log.error(
"Error executing queued job {}: {}", jobId, e.getMessage(), e);
taskManager.setError(jobId, e.getMessage());
throw e;
}
};
// Queue the job and get the future
CompletableFuture<ResponseEntity<?>> future =
jobQueue.queueJob(jobId, resourceWeight, wrappedWork, timeoutToUse);
// Return immediately with job ID
return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null));
} else if (async) {
taskManager.createTask(jobId);
webSocketSender.sendProgress(jobId, new JobProgress(jobId, "Started", 0, "Running"));
@ -69,18 +165,15 @@ public class JobExecutorService {
() -> {
try {
log.debug(
"Running async job {} with timeout {} ms",
jobId,
effectiveTimeoutMs);
"Running async job {} with timeout {} ms", jobId, timeoutToUse);
// Execute with timeout
Object result =
executeWithTimeout(() -> work.get(), effectiveTimeoutMs);
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
processJobResult(jobId, result);
webSocketSender.sendProgress(
jobId, new JobProgress(jobId, "Done", 100, "Complete"));
} catch (TimeoutException te) {
log.error("Job {} timed out after {} ms", jobId, effectiveTimeoutMs);
log.error("Job {} timed out after {} ms", jobId, timeoutToUse);
taskManager.setError(jobId, "Job timed out");
webSocketSender.sendProgress(
jobId, new JobProgress(jobId, "Error", 100, "Job timed out"));
@ -95,10 +188,10 @@ public class JobExecutorService {
return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null));
} else {
try {
log.debug("Running sync job with timeout {} ms", effectiveTimeoutMs);
log.debug("Running sync job with timeout {} ms", timeoutToUse);
// Execute with timeout
Object result = executeWithTimeout(() -> work.get(), effectiveTimeoutMs);
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
// If the result is already a ResponseEntity, return it directly
if (result instanceof ResponseEntity) {
@ -108,9 +201,9 @@ public class JobExecutorService {
// Process different result types
return handleResultForSyncJob(result);
} catch (TimeoutException te) {
log.error("Synchronous job timed out after {} ms", effectiveTimeoutMs);
log.error("Synchronous job timed out after {} ms", timeoutToUse);
return ResponseEntity.internalServerError()
.body("Job timed out after " + effectiveTimeoutMs + " ms");
.body(Map.of("error", "Job timed out after " + timeoutToUse + " ms"));
} catch (Exception e) {
log.error("Error executing synchronous job: {}", e.getMessage(), e);
// Construct a JSON error response

View File

@ -0,0 +1,441 @@
package stirling.software.common.service;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.controller.WebSocketProgressController;
import stirling.software.common.model.job.JobProgress;
/**
* Manages a queue of jobs with dynamic sizing based on system resources. Used when system resources
* are limited to prevent overloading.
*/
@Service
@Slf4j
public class JobQueue {
private final ResourceMonitor resourceMonitor;
private final WebSocketProgressController webSocketSender;
@Value("${stirling.job.queue.base-capacity:10}")
private int baseQueueCapacity = 10;
@Value("${stirling.job.queue.min-capacity:2}")
private int minQueueCapacity = 2;
@Value("${stirling.job.queue.check-interval-ms:1000}")
private long queueCheckIntervalMs = 1000;
@Value("${stirling.job.queue.max-wait-time-ms:600000}")
private long maxWaitTimeMs = 600000; // 10 minutes
private BlockingQueue<QueuedJob> jobQueue;
private final Map<String, QueuedJob> jobMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ExecutorService jobExecutor;
// Initialize executor based on Java version
{
ExecutorService executor;
try {
// Try to use Virtual Threads (Java 21+)
executor = Executors.newVirtualThreadPerTaskExecutor();
log.info("Using Virtual Thread executor (Java 21+)");
} catch (NoSuchMethodError e) {
// Fall back to thread pool for Java < 21
executor = Executors.newCachedThreadPool();
log.info("Using cached thread pool executor (Java < 21)");
}
jobExecutor = executor;
}
private boolean shuttingDown = false;
@Getter private int rejectedJobs = 0;
@Getter private int totalQueuedJobs = 0;
@Getter private int currentQueueSize = 0;
/** Represents a job waiting in the queue. */
@Data
@AllArgsConstructor
private static class QueuedJob {
private final String jobId;
private final int resourceWeight;
private final Supplier<Object> work;
private final long timeoutMs;
private final Instant queuedAt;
private CompletableFuture<ResponseEntity<?>> future;
private volatile boolean cancelled = false;
}
public JobQueue(ResourceMonitor resourceMonitor, WebSocketProgressController webSocketSender) {
this.resourceMonitor = resourceMonitor;
this.webSocketSender = webSocketSender;
// Initialize with dynamic capacity
int capacity =
resourceMonitor.calculateDynamicQueueCapacity(baseQueueCapacity, minQueueCapacity);
this.jobQueue = new LinkedBlockingQueue<>(capacity);
}
@PostConstruct
public void initialize() {
log.info(
"Starting job queue with base capacity {}, min capacity {}",
baseQueueCapacity,
minQueueCapacity);
// Periodically process the job queue
scheduler.scheduleWithFixedDelay(
this::processQueue, 0, queueCheckIntervalMs, TimeUnit.MILLISECONDS);
// Periodically update queue capacity based on resource usage
scheduler.scheduleWithFixedDelay(
this::updateQueueCapacity,
10000, // Initial delay
30000, // 30 second interval
TimeUnit.MILLISECONDS);
}
@PreDestroy
public void shutdown() {
log.info("Shutting down job queue");
shuttingDown = true;
// Complete any futures that are still waiting
jobMap.forEach(
(id, job) -> {
if (!job.future.isDone()) {
job.future.completeExceptionally(
new RuntimeException("Server shutting down, job cancelled"));
}
});
scheduler.shutdownNow();
jobExecutor.shutdownNow();
log.info(
"Job queue shutdown complete. Stats: total={}, rejected={}",
totalQueuedJobs,
rejectedJobs);
}
/**
* Queues a job for execution when resources permit.
*
* @param jobId The job ID
* @param resourceWeight The resource weight of the job (1-100)
* @param work The work to be done
* @param timeoutMs The timeout in milliseconds
* @return A CompletableFuture that will complete when the job is executed
*/
public CompletableFuture<ResponseEntity<?>> queueJob(
String jobId, int resourceWeight, Supplier<Object> work, long timeoutMs) {
// Create a CompletableFuture to track this job's completion
CompletableFuture<ResponseEntity<?>> future = new CompletableFuture<>();
// Create the queued job
QueuedJob job =
new QueuedJob(jobId, resourceWeight, work, timeoutMs, Instant.now(), future, false);
// Store in our map for lookup
jobMap.put(jobId, job);
// Update stats
totalQueuedJobs++;
currentQueueSize = jobQueue.size();
// Try to add to the queue
try {
boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS);
if (!added) {
log.warn("Queue full, rejecting job {}", jobId);
rejectedJobs++;
future.completeExceptionally(
new RuntimeException("Job queue full, please try again later"));
jobMap.remove(jobId);
return future;
}
log.debug(
"Job {} queued for execution (weight: {}, queue size: {})",
jobId,
resourceWeight,
jobQueue.size());
// Notify client via WebSocket that job is queued
webSocketSender.sendProgress(
jobId, new JobProgress(jobId, "Queued", 0, "Waiting in queue for resources"));
return future;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.completeExceptionally(new RuntimeException("Job queue interrupted"));
jobMap.remove(jobId);
return future;
}
}
/**
* Gets the current capacity of the job queue.
*
* @return The current capacity
*/
public int getQueueCapacity() {
return ((LinkedBlockingQueue<QueuedJob>) jobQueue).remainingCapacity() + jobQueue.size();
}
/** Updates the capacity of the job queue based on available system resources. */
private void updateQueueCapacity() {
try {
// Calculate new capacity
int newCapacity =
resourceMonitor.calculateDynamicQueueCapacity(
baseQueueCapacity, minQueueCapacity);
int currentCapacity = getQueueCapacity();
if (newCapacity != currentCapacity) {
log.debug(
"Updating job queue capacity from {} to {}", currentCapacity, newCapacity);
// Create new queue with updated capacity
BlockingQueue<QueuedJob> newQueue = new LinkedBlockingQueue<>(newCapacity);
// Transfer jobs from old queue to new queue
jobQueue.drainTo(newQueue);
jobQueue = newQueue;
currentQueueSize = jobQueue.size();
}
} catch (Exception e) {
log.error("Error updating queue capacity: {}", e.getMessage(), e);
}
}
/** Processes jobs in the queue, executing them when resources permit. */
private void processQueue() {
if (shuttingDown || jobQueue.isEmpty()) {
return;
}
try {
// Get current resource status
ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get();
// Check if we should execute any jobs
boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL);
if (!canExecuteJobs) {
// Under critical load, don't execute any jobs
log.debug("System under critical load, delaying job execution");
return;
}
// Get jobs from the queue, up to a limit based on resource availability
int jobsToProcess =
Math.max(
1,
switch (status) {
case OK -> 3;
case WARNING -> 1;
case CRITICAL -> 0;
});
for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) {
QueuedJob job = jobQueue.poll();
if (job == null) break;
// Check if it's been waiting too long
long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli();
if (waitTimeMs > maxWaitTimeMs) {
log.warn(
"Job {} exceeded maximum wait time ({} ms), executing anyway",
job.jobId,
waitTimeMs);
}
// Remove from our map
jobMap.remove(job.jobId);
currentQueueSize = jobQueue.size();
// Notify via WebSocket
webSocketSender.sendProgress(
job.jobId,
new JobProgress(job.jobId, "Processing", 10, "Starting job execution"));
// Execute the job
executeJob(job);
}
} catch (Exception e) {
log.error("Error processing job queue: {}", e.getMessage(), e);
}
}
/**
* Executes a job from the queue.
*
* @param job The job to execute
*/
private void executeJob(QueuedJob job) {
if (job.cancelled) {
log.debug("Job {} was cancelled, not executing", job.jobId);
return;
}
jobExecutor.execute(
() -> {
log.debug("Executing queued job {} (queued at {})", job.jobId, job.queuedAt);
try {
// Execute with timeout
Object result = executeWithTimeout(job.work, job.timeoutMs);
// Process the result
if (result instanceof ResponseEntity) {
job.future.complete((ResponseEntity<?>) result);
} else {
job.future.complete(ResponseEntity.ok(result));
}
// Update WebSocket
webSocketSender.sendProgress(
job.jobId,
new JobProgress(
job.jobId, "Complete", 100, "Job completed successfully"));
} catch (Exception e) {
log.error(
"Error executing queued job {}: {}", job.jobId, e.getMessage(), e);
job.future.completeExceptionally(e);
// Update WebSocket
webSocketSender.sendProgress(
job.jobId,
new JobProgress(
job.jobId, "Error", 100, "Job failed: " + e.getMessage()));
}
});
}
/**
* Execute a supplier with a timeout.
*
* @param supplier The supplier to execute
* @param timeoutMs The timeout in milliseconds
* @return The result from the supplier
* @throws Exception If there is an execution error
*/
private <T> T executeWithTimeout(Supplier<T> supplier, long timeoutMs) throws Exception {
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
try {
if (timeoutMs <= 0) {
// No timeout
return future.join();
} else {
// With timeout
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
}
} catch (TimeoutException e) {
future.cancel(true);
throw new TimeoutException("Job timed out after " + timeoutMs + "ms");
} catch (ExecutionException e) {
throw (Exception) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedException("Job was interrupted");
}
}
/**
* Checks if a job is queued.
*
* @param jobId The job ID
* @return true if the job is queued
*/
public boolean isJobQueued(String jobId) {
return jobMap.containsKey(jobId);
}
/**
* Gets the current position of a job in the queue.
*
* @param jobId The job ID
* @return The position (0-based) or -1 if not found
*/
public int getJobPosition(String jobId) {
if (!jobMap.containsKey(jobId)) {
return -1;
}
// Count positions
int position = 0;
for (QueuedJob job : jobQueue) {
if (job.jobId.equals(jobId)) {
return position;
}
position++;
}
// If we didn't find it in the queue but it's in the map,
// it might be executing already
return -1;
}
/**
* Cancels a queued job.
*
* @param jobId The job ID
* @return true if the job was cancelled, false if not found
*/
public boolean cancelJob(String jobId) {
QueuedJob job = jobMap.remove(jobId);
if (job != null) {
job.cancelled = true;
job.future.completeExceptionally(new RuntimeException("Job cancelled by user"));
// Try to remove from queue if it's still there
jobQueue.remove(job);
currentQueueSize = jobQueue.size();
log.debug("Job {} cancelled", jobId);
// Update WebSocket
webSocketSender.sendProgress(
jobId, new JobProgress(jobId, "Cancelled", 100, "Job cancelled by user"));
return true;
}
return false;
}
/**
* Get queue statistics.
*
* @return A map containing queue statistics
*/
public Map<String, Object> getQueueStats() {
return Map.of(
"queuedJobs", jobQueue.size(),
"queueCapacity", getQueueCapacity(),
"totalQueuedJobs", totalQueuedJobs,
"rejectedJobs", rejectedJobs,
"resourceStatus", resourceMonitor.getCurrentStatus().get().name());
}
}

View File

@ -0,0 +1,277 @@
package stirling.software.common.service;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* Monitors system resources (CPU, memory) to inform job scheduling decisions. Provides information
* about available resources to prevent overloading the system.
*/
@Service
@Slf4j
public class ResourceMonitor {
@Value("${stirling.resource.memory.critical-threshold:0.9}")
private double memoryCriticalThreshold = 0.9; // 90% usage is critical
@Value("${stirling.resource.memory.high-threshold:0.75}")
private double memoryHighThreshold = 0.75; // 75% usage is high
@Value("${stirling.resource.cpu.critical-threshold:0.9}")
private double cpuCriticalThreshold = 0.9; // 90% usage is critical
@Value("${stirling.resource.cpu.high-threshold:0.75}")
private double cpuHighThreshold = 0.75; // 75% usage is high
@Value("${stirling.resource.monitor.interval-ms:5000}")
private long monitorIntervalMs = 5000; // 5 seconds
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
@Getter
private final AtomicReference<ResourceStatus> currentStatus =
new AtomicReference<>(ResourceStatus.OK);
@Getter
private final AtomicReference<ResourceMetrics> latestMetrics =
new AtomicReference<>(new ResourceMetrics());
/** Represents the current status of system resources. */
public enum ResourceStatus {
/** Resources are available, normal operations can proceed */
OK,
/** Resources are under strain, consider queueing high-resource operations */
WARNING,
/** Resources are critically low, queue all operations */
CRITICAL
}
/** Detailed metrics about system resources. */
@Getter
public static class ResourceMetrics {
private final double cpuUsage;
private final double memoryUsage;
private final long freeMemoryBytes;
private final long totalMemoryBytes;
private final long maxMemoryBytes;
private final Instant timestamp;
public ResourceMetrics() {
this(0, 0, 0, 0, 0, Instant.now());
}
public ResourceMetrics(
double cpuUsage,
double memoryUsage,
long freeMemoryBytes,
long totalMemoryBytes,
long maxMemoryBytes,
Instant timestamp) {
this.cpuUsage = cpuUsage;
this.memoryUsage = memoryUsage;
this.freeMemoryBytes = freeMemoryBytes;
this.totalMemoryBytes = totalMemoryBytes;
this.maxMemoryBytes = maxMemoryBytes;
this.timestamp = timestamp;
}
/**
* Gets the age of these metrics.
*
* @return Duration since these metrics were collected
*/
public Duration getAge() {
return Duration.between(timestamp, Instant.now());
}
/**
* Check if these metrics are stale (older than threshold).
*
* @param thresholdMs Staleness threshold in milliseconds
* @return true if metrics are stale
*/
public boolean isStale(long thresholdMs) {
return getAge().toMillis() > thresholdMs;
}
}
@PostConstruct
public void initialize() {
log.info("Starting resource monitoring with interval of {}ms", monitorIntervalMs);
scheduler.scheduleAtFixedRate(
this::updateResourceMetrics, 0, monitorIntervalMs, TimeUnit.MILLISECONDS);
}
@PreDestroy
public void shutdown() {
log.info("Shutting down resource monitoring");
scheduler.shutdownNow();
}
/** Updates the resource metrics by sampling current system state. */
private void updateResourceMetrics() {
try {
// Get CPU usage
double cpuUsage = osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors();
if (cpuUsage < 0) cpuUsage = getAlternativeCpuLoad(); // Fallback if not available
// Get memory usage
long heapUsed = memoryMXBean.getHeapMemoryUsage().getUsed();
long nonHeapUsed = memoryMXBean.getNonHeapMemoryUsage().getUsed();
long totalUsed = heapUsed + nonHeapUsed;
long maxMemory = Runtime.getRuntime().maxMemory();
long totalMemory = Runtime.getRuntime().totalMemory();
long freeMemory = Runtime.getRuntime().freeMemory();
double memoryUsage = (double) totalUsed / maxMemory;
// Create new metrics
ResourceMetrics metrics =
new ResourceMetrics(
cpuUsage,
memoryUsage,
freeMemory,
totalMemory,
maxMemory,
Instant.now());
latestMetrics.set(metrics);
// Determine system status
ResourceStatus newStatus;
if (cpuUsage > cpuCriticalThreshold || memoryUsage > memoryCriticalThreshold) {
newStatus = ResourceStatus.CRITICAL;
} else if (cpuUsage > cpuHighThreshold || memoryUsage > memoryHighThreshold) {
newStatus = ResourceStatus.WARNING;
} else {
newStatus = ResourceStatus.OK;
}
// Update status if it changed
ResourceStatus oldStatus = currentStatus.getAndSet(newStatus);
if (oldStatus != newStatus) {
log.info("System resource status changed from {} to {}", oldStatus, newStatus);
log.info(
"Current metrics - CPU: {:.1f}%, Memory: {:.1f}%, Free Memory: {} MB",
cpuUsage * 100, memoryUsage * 100, freeMemory / (1024 * 1024));
}
} catch (Exception e) {
log.error("Error updating resource metrics: {}", e.getMessage(), e);
}
}
/**
* Alternative method to estimate CPU load if getSystemLoadAverage() is not available. This is a
* fallback and less accurate than the official JMX method.
*
* @return Estimated CPU load as a value between 0.0 and 1.0
*/
private double getAlternativeCpuLoad() {
try {
// Try to get CPU time if available through reflection
// This is a fallback since we can't directly cast to platform-specific classes
try {
java.lang.reflect.Method m =
osMXBean.getClass().getDeclaredMethod("getProcessCpuLoad");
m.setAccessible(true);
return (double) m.invoke(osMXBean);
} catch (Exception e) {
// Try the older method
try {
java.lang.reflect.Method m =
osMXBean.getClass().getDeclaredMethod("getSystemCpuLoad");
m.setAccessible(true);
return (double) m.invoke(osMXBean);
} catch (Exception e2) {
log.warn(
"Could not get CPU load through reflection, assuming moderate load (0.5)");
return 0.5;
}
}
} catch (Exception e) {
log.warn("Could not get CPU load, assuming moderate load (0.5)");
return 0.5; // Default to moderate load
}
}
/**
* Calculates the dynamic job queue capacity based on current resource usage.
*
* @param baseCapacity The base capacity when system is under minimal load
* @param minCapacity The minimum capacity to maintain even under high load
* @return The calculated job queue capacity
*/
public int calculateDynamicQueueCapacity(int baseCapacity, int minCapacity) {
ResourceMetrics metrics = latestMetrics.get();
ResourceStatus status = currentStatus.get();
// Simple linear reduction based on memory and CPU load
double capacityFactor =
switch (status) {
case OK -> 1.0;
case WARNING -> 0.6;
case CRITICAL -> 0.3;
};
// Apply additional reduction based on specific memory pressure
if (metrics.memoryUsage > 0.8) {
capacityFactor *= 0.5; // Further reduce capacity under memory pressure
}
// Calculate capacity with minimum safeguard
int capacity = (int) Math.max(minCapacity, Math.ceil(baseCapacity * capacityFactor));
log.debug(
"Dynamic queue capacity: {} (base: {}, factor: {:.2f}, status: {})",
capacity,
baseCapacity,
capacityFactor,
status);
return capacity;
}
/**
* Checks if a job with the given weight can be executed immediately or should be queued based
* on current resource availability.
*
* @param resourceWeight The resource weight of the job (1-100)
* @return true if the job should be queued, false if it can run immediately
*/
public boolean shouldQueueJob(int resourceWeight) {
ResourceStatus status = currentStatus.get();
// Always run lightweight jobs (weight < 20) unless critical
if (resourceWeight < 20 && status != ResourceStatus.CRITICAL) {
return false;
}
// Medium weight jobs run immediately if resources are OK
if (resourceWeight < 60 && status == ResourceStatus.OK) {
return false;
}
// Heavy jobs (weight >= 60) and any job during WARNING/CRITICAL should be queued
return true;
}
}

View File

@ -0,0 +1,214 @@
package stirling.software.common.annotations;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.jupiter.api.BeforeEach;
import java.util.Arrays;
import java.util.function.Supplier;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.http.ResponseEntity;
import org.springframework.web.multipart.MultipartFile;
import jakarta.servlet.http.HttpServletRequest;
import stirling.software.common.aop.AutoJobAspect;
import stirling.software.common.controller.WebSocketProgressController;
import stirling.software.common.model.api.PDFFile;
import stirling.software.common.service.FileOrUploadService;
import stirling.software.common.service.FileStorage;
import stirling.software.common.service.JobExecutorService;
import stirling.software.common.service.JobQueue;
import stirling.software.common.service.ResourceMonitor;
@ExtendWith(MockitoExtension.class)
class AutoJobPostMappingIntegrationTest {
private AutoJobAspect autoJobAspect;
@Mock
private JobExecutorService jobExecutorService;
@Mock
private HttpServletRequest request;
@Mock
private FileOrUploadService fileOrUploadService;
@Mock
private FileStorage fileStorage;
@Mock
private WebSocketProgressController webSocketSender;
@Mock
private ResourceMonitor resourceMonitor;
@Mock
private JobQueue jobQueue;
@BeforeEach
void setUp() {
autoJobAspect = new AutoJobAspect(
jobExecutorService,
request,
fileOrUploadService,
fileStorage,
webSocketSender
);
}
@Mock
private ProceedingJoinPoint joinPoint;
@Mock
private AutoJobPostMapping autoJobPostMapping;
@Captor
private ArgumentCaptor<Supplier<Object>> workCaptor;
@Captor
private ArgumentCaptor<Boolean> asyncCaptor;
@Captor
private ArgumentCaptor<Long> timeoutCaptor;
@Captor
private ArgumentCaptor<Boolean> queueableCaptor;
@Captor
private ArgumentCaptor<Integer> resourceWeightCaptor;
@Test
void shouldExecuteWithCustomParameters() throws Throwable {
// Given
PDFFile pdfFile = new PDFFile();
pdfFile.setFileId("test-file-id");
Object[] args = new Object[] { pdfFile };
when(joinPoint.getArgs()).thenReturn(args);
when(request.getParameter("async")).thenReturn("true");
when(autoJobPostMapping.timeout()).thenReturn(60000L);
when(autoJobPostMapping.retryCount()).thenReturn(3);
when(autoJobPostMapping.trackProgress()).thenReturn(true);
when(autoJobPostMapping.queueable()).thenReturn(true);
when(autoJobPostMapping.resourceWeight()).thenReturn(75);
MultipartFile mockFile = mock(MultipartFile.class);
when(fileStorage.retrieveFile("test-file-id")).thenReturn(mockFile);
when(jobExecutorService.runJobGeneric(
anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt()))
.thenReturn(ResponseEntity.ok("success"));
// When
Object result = autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping);
// Then
assertEquals(ResponseEntity.ok("success"), result);
verify(jobExecutorService).runJobGeneric(
asyncCaptor.capture(),
workCaptor.capture(),
timeoutCaptor.capture(),
queueableCaptor.capture(),
resourceWeightCaptor.capture());
assertTrue(asyncCaptor.getValue(), "Async should be true");
assertEquals(60000L, timeoutCaptor.getValue(), "Timeout should be 60000ms");
assertTrue(queueableCaptor.getValue(), "Queueable should be true");
assertEquals(75, resourceWeightCaptor.getValue(), "Resource weight should be 75");
// Test that file was resolved
assertNotNull(pdfFile.getFileInput(), "File input should be set");
}
@Test
void shouldRetryOnError() throws Throwable {
// Given
when(joinPoint.getArgs()).thenReturn(new Object[0]);
when(request.getParameter("async")).thenReturn("false");
when(autoJobPostMapping.timeout()).thenReturn(-1L);
when(autoJobPostMapping.retryCount()).thenReturn(2);
when(autoJobPostMapping.trackProgress()).thenReturn(false);
when(autoJobPostMapping.queueable()).thenReturn(false);
when(autoJobPostMapping.resourceWeight()).thenReturn(50);
// First call throws exception, second succeeds
when(joinPoint.proceed(any()))
.thenThrow(new RuntimeException("First attempt failed"))
.thenReturn(ResponseEntity.ok("retry succeeded"));
// Mock jobExecutorService to execute the work immediately
when(jobExecutorService.runJobGeneric(
anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt()))
.thenAnswer(invocation -> {
Supplier<Object> work = invocation.getArgument(1);
return work.get();
});
// When
Object result = autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping);
// Then
assertEquals(ResponseEntity.ok("retry succeeded"), result);
// Verify that proceed was called twice (initial attempt + 1 retry)
verify(joinPoint, times(2)).proceed(any());
}
@Test
void shouldHandlePDFFileWithAsyncRequests() throws Throwable {
// Given
PDFFile pdfFile = new PDFFile();
pdfFile.setFileInput(mock(MultipartFile.class));
Object[] args = new Object[] { pdfFile };
when(joinPoint.getArgs()).thenReturn(args);
when(request.getParameter("async")).thenReturn("true");
when(autoJobPostMapping.retryCount()).thenReturn(1);
when(fileStorage.storeFile(any(MultipartFile.class))).thenReturn("stored-file-id");
when(fileStorage.retrieveFile("stored-file-id")).thenReturn(mock(MultipartFile.class));
// Mock job executor to return a successful response
when(jobExecutorService.runJobGeneric(
anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt()))
.thenReturn(ResponseEntity.ok("success"));
// When
autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping);
// Then
assertEquals("stored-file-id", pdfFile.getFileId(),
"FileId should be set to the stored file id");
assertNotNull(pdfFile.getFileInput(), "FileInput should be replaced with persistent file");
// Verify storage operations
verify(fileStorage).storeFile(any(MultipartFile.class));
verify(fileStorage).retrieveFile("stored-file-id");
}
}
// Move PDFFileTest to its own class file if needed

View File

@ -0,0 +1,208 @@
package stirling.software.common.service;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.util.ReflectionTestUtils;
import jakarta.servlet.http.HttpServletRequest;
import stirling.software.common.controller.WebSocketProgressController;
import stirling.software.common.model.job.JobProgress;
import stirling.software.common.model.job.JobResponse;
@ExtendWith(MockitoExtension.class)
class JobExecutorServiceTest {
private JobExecutorService jobExecutorService;
@Mock
private TaskManager taskManager;
@Mock
private WebSocketProgressController webSocketSender;
@Mock
private FileStorage fileStorage;
@Mock
private HttpServletRequest request;
@Mock
private ResourceMonitor resourceMonitor;
@Mock
private JobQueue jobQueue;
@Captor
private ArgumentCaptor<String> jobIdCaptor;
@BeforeEach
void setUp() {
// Initialize the service manually with all its dependencies
jobExecutorService = new JobExecutorService(
taskManager,
webSocketSender,
fileStorage,
request,
resourceMonitor,
jobQueue,
30000L, // asyncRequestTimeoutMs
"30m" // sessionTimeout
);
}
@Test
void shouldRunSyncJobSuccessfully() throws Exception {
// Given
Supplier<Object> work = () -> "test-result";
// When
ResponseEntity<?> response = jobExecutorService.runJobGeneric(false, work);
// Then
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals("test-result", response.getBody());
// Verify request attribute was set with jobId
verify(request).setAttribute(eq("jobId"), anyString());
}
@Test
void shouldRunAsyncJobSuccessfully() throws Exception {
// Given
Supplier<Object> work = () -> "test-result";
// When
ResponseEntity<?> response = jobExecutorService.runJobGeneric(true, work);
// Then
assertEquals(HttpStatus.OK, response.getStatusCode());
assertTrue(response.getBody() instanceof JobResponse);
JobResponse<?> jobResponse = (JobResponse<?>) response.getBody();
assertTrue(jobResponse.isAsync());
assertNotNull(jobResponse.getJobId());
// Verify task manager was called
verify(taskManager).createTask(jobIdCaptor.capture());
verify(webSocketSender).sendProgress(eq(jobIdCaptor.getValue()), any(JobProgress.class));
}
@Test
void shouldHandleSyncJobError() {
// Given
Supplier<Object> work = () -> {
throw new RuntimeException("Test error");
};
// When
ResponseEntity<?> response = jobExecutorService.runJobGeneric(false, work);
// Then
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode());
@SuppressWarnings("unchecked")
Map<String, String> errorMap = (Map<String, String>) response.getBody();
assertEquals("Job failed: Test error", errorMap.get("error"));
}
@Test
void shouldQueueJobWhenResourcesLimited() {
// Given
Supplier<Object> work = () -> "test-result";
CompletableFuture<ResponseEntity<?>> future = new CompletableFuture<>();
// Configure resourceMonitor to indicate job should be queued
when(resourceMonitor.shouldQueueJob(80)).thenReturn(true);
// Configure jobQueue to return our future
when(jobQueue.queueJob(anyString(), eq(80), any(), anyLong())).thenReturn(future);
// When
ResponseEntity<?> response = jobExecutorService.runJobGeneric(
true, work, 5000, true, 80);
// Then
assertEquals(HttpStatus.OK, response.getStatusCode());
assertTrue(response.getBody() instanceof JobResponse);
// Verify job was queued
verify(jobQueue).queueJob(anyString(), eq(80), any(), eq(5000L));
verify(taskManager).createTask(anyString());
}
@Test
void shouldUseCustomTimeoutWhenProvided() throws Exception {
// Given
Supplier<Object> work = () -> "test-result";
long customTimeout = 60000L;
// Use reflection to access the private executeWithTimeout method
java.lang.reflect.Method executeMethod = JobExecutorService.class
.getDeclaredMethod("executeWithTimeout", Supplier.class, long.class);
executeMethod.setAccessible(true);
// Create a spy on the JobExecutorService to verify method calls
JobExecutorService spy = Mockito.spy(jobExecutorService);
// When
spy.runJobGeneric(false, work, customTimeout);
// Then
verify(spy).runJobGeneric(eq(false), any(Supplier.class), eq(customTimeout));
}
@Test
void shouldHandleTimeout() throws Exception {
// Given
Supplier<Object> work = () -> {
try {
Thread.sleep(100); // Simulate long-running job
return "test-result";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
// Use reflection to access the private executeWithTimeout method
java.lang.reflect.Method executeMethod = JobExecutorService.class
.getDeclaredMethod("executeWithTimeout", Supplier.class, long.class);
executeMethod.setAccessible(true);
// When/Then
try {
executeMethod.invoke(jobExecutorService, work, 1L); // Very short timeout
} catch (Exception e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
}
}

View File

@ -0,0 +1,107 @@
package stirling.software.common.service;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import stirling.software.common.controller.WebSocketProgressController;
import stirling.software.common.model.job.JobProgress;
import stirling.software.common.service.ResourceMonitor.ResourceStatus;
@ExtendWith(MockitoExtension.class)
class JobQueueTest {
private JobQueue jobQueue;
@Mock
private ResourceMonitor resourceMonitor;
@Mock
private WebSocketProgressController webSocketSender;
private final AtomicReference<ResourceStatus> statusRef = new AtomicReference<>(ResourceStatus.OK);
@BeforeEach
void setUp() {
// Mark stubbing as lenient to avoid UnnecessaryStubbingException
lenient().when(resourceMonitor.calculateDynamicQueueCapacity(anyInt(), anyInt())).thenReturn(10);
lenient().when(resourceMonitor.getCurrentStatus()).thenReturn(statusRef);
jobQueue = new JobQueue(resourceMonitor, webSocketSender);
}
@Test
void shouldQueueJob() {
String jobId = "test-job-1";
int resourceWeight = 50;
Supplier<Object> work = () -> "test-result";
long timeoutMs = 1000;
jobQueue.queueJob(jobId, resourceWeight, work, timeoutMs);
verify(webSocketSender).sendProgress(
org.mockito.ArgumentMatchers.eq(jobId),
org.mockito.ArgumentMatchers.any(JobProgress.class));
assertTrue(jobQueue.isJobQueued(jobId));
assertEquals(1, jobQueue.getTotalQueuedJobs());
}
@Test
void shouldCancelJob() {
String jobId = "test-job-2";
Supplier<Object> work = () -> "test-result";
jobQueue.queueJob(jobId, 50, work, 1000);
boolean cancelled = jobQueue.cancelJob(jobId);
assertTrue(cancelled);
assertFalse(jobQueue.isJobQueued(jobId));
}
@Test
void shouldGetQueueStats() {
when(resourceMonitor.getCurrentStatus()).thenReturn(statusRef);
jobQueue.queueJob("job1", 50, () -> "ok", 1000);
jobQueue.queueJob("job2", 50, () -> "ok", 1000);
jobQueue.cancelJob("job2");
Map<String, Object> stats = jobQueue.getQueueStats();
assertEquals(2, stats.get("totalQueuedJobs"));
assertTrue(stats.containsKey("queuedJobs"));
assertTrue(stats.containsKey("resourceStatus"));
}
@Test
void shouldCalculateQueueCapacity() {
when(resourceMonitor.calculateDynamicQueueCapacity(5, 2)).thenReturn(8);
int capacity = resourceMonitor.calculateDynamicQueueCapacity(5, 2);
assertEquals(8, capacity);
}
@Test
void shouldCheckIfJobIsQueued() {
String jobId = "job-123";
Supplier<Object> work = () -> "hello";
jobQueue.queueJob(jobId, 40, work, 500);
assertTrue(jobQueue.isJobQueued(jobId));
assertFalse(jobQueue.isJobQueued("nonexistent"));
}
}

View File

@ -0,0 +1,137 @@
package stirling.software.common.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import stirling.software.common.service.ResourceMonitor.ResourceMetrics;
import stirling.software.common.service.ResourceMonitor.ResourceStatus;
@ExtendWith(MockitoExtension.class)
class ResourceMonitorTest {
@InjectMocks
private ResourceMonitor resourceMonitor;
@Mock
private OperatingSystemMXBean osMXBean;
@Mock
private MemoryMXBean memoryMXBean;
@Spy
private AtomicReference<ResourceStatus> currentStatus = new AtomicReference<>(ResourceStatus.OK);
@Spy
private AtomicReference<ResourceMetrics> latestMetrics = new AtomicReference<>(new ResourceMetrics());
@BeforeEach
void setUp() {
// Set thresholds for testing
ReflectionTestUtils.setField(resourceMonitor, "memoryCriticalThreshold", 0.9);
ReflectionTestUtils.setField(resourceMonitor, "memoryHighThreshold", 0.75);
ReflectionTestUtils.setField(resourceMonitor, "cpuCriticalThreshold", 0.9);
ReflectionTestUtils.setField(resourceMonitor, "cpuHighThreshold", 0.75);
ReflectionTestUtils.setField(resourceMonitor, "osMXBean", osMXBean);
ReflectionTestUtils.setField(resourceMonitor, "memoryMXBean", memoryMXBean);
ReflectionTestUtils.setField(resourceMonitor, "currentStatus", currentStatus);
ReflectionTestUtils.setField(resourceMonitor, "latestMetrics", latestMetrics);
}
@Test
void shouldCalculateDynamicQueueCapacity() {
// Given
int baseCapacity = 10;
int minCapacity = 2;
// Mock current status as OK
currentStatus.set(ResourceStatus.OK);
// When
int capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity);
// Then
assertEquals(baseCapacity, capacity, "With OK status, capacity should equal base capacity");
// Given
currentStatus.set(ResourceStatus.WARNING);
// When
capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity);
// Then
assertEquals(6, capacity, "With WARNING status, capacity should be reduced to 60%");
// Given
currentStatus.set(ResourceStatus.CRITICAL);
// When
capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity);
// Then
assertEquals(3, capacity, "With CRITICAL status, capacity should be reduced to 30%");
// Test minimum capacity enforcement
assertEquals(minCapacity, resourceMonitor.calculateDynamicQueueCapacity(1, minCapacity),
"Should never go below minimum capacity");
}
@ParameterizedTest
@CsvSource({
"10, OK, false", // Light job, OK status
"10, WARNING, false", // Light job, WARNING status
"10, CRITICAL, true", // Light job, CRITICAL status
"30, OK, false", // Medium job, OK status
"30, WARNING, true", // Medium job, WARNING status
"30, CRITICAL, true", // Medium job, CRITICAL status
"80, OK, true", // Heavy job, OK status
"80, WARNING, true", // Heavy job, WARNING status
"80, CRITICAL, true" // Heavy job, CRITICAL status
})
void shouldQueueJobBasedOnWeightAndStatus(int weight, ResourceStatus status, boolean shouldQueue) {
// Given
currentStatus.set(status);
// When
boolean result = resourceMonitor.shouldQueueJob(weight);
// Then
assertEquals(shouldQueue, result,
String.format("For weight %d and status %s, shouldQueue should be %s",
weight, status, shouldQueue));
}
@Test
void resourceMetricsShouldDetectStaleState() {
// Given
Instant now = Instant.now();
Instant pastInstant = now.minusMillis(6000);
ResourceMetrics staleMetrics = new ResourceMetrics(0.5, 0.5, 1024, 2048, 4096, pastInstant);
ResourceMetrics freshMetrics = new ResourceMetrics(0.5, 0.5, 1024, 2048, 4096, now);
// When/Then
assertTrue(staleMetrics.isStale(5000), "Metrics from 6 seconds ago should be stale with 5s threshold");
assertFalse(freshMetrics.isStale(5000), "Fresh metrics should not be stale");
}
}

168
docs/AutoJobPostMapping.md Normal file
View File

@ -0,0 +1,168 @@
# AutoJobPostMapping Annotation
The `AutoJobPostMapping` annotation simplifies the creation of job-based REST endpoints in Stirling-PDF. It automatically handles job execution, file persistence, error handling, retries, and progress tracking.
## Features
- Wraps endpoint methods with job execution logic
- Supports both synchronous and asynchronous execution (via `?async=true` query parameter)
- Custom timeout configuration per endpoint
- Automatic retries with configurable retry count
- WebSocket-based progress tracking
- Consistent error handling and reporting
- Automatic persistence of uploaded files for async processing
## Usage
```java
@AutoJobPostMapping("/api/v1/security/remove-password")
public ResponseEntity<byte[]> removePassword(@ModelAttribute PDFPasswordRequest request)
throws IOException {
MultipartFile fileInput = request.getFileInput();
String password = request.getPassword();
PDDocument document = pdfDocumentFactory.load(fileInput, password);
document.setAllSecurityToBeRemoved(true);
return WebResponseUtils.pdfDocToWebResponse(
document,
Filenames.toSimpleFileName(fileInput.getOriginalFilename())
.replaceFirst("[.][^.]+$", "")
+ "_password_removed.pdf");
}
```
## Parameters
The `AutoJobPostMapping` annotation accepts the following parameters:
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `value` | String[] | `{}` | The path mapping URIs (e.g., "/api/v1/security/add-password") |
| `consumes` | String[] | `{"multipart/form-data"}` | Supported media types for requests |
| `timeout` | long | `-1` (use system default) | Custom timeout in milliseconds for this job |
| `retryCount` | int | `1` (no retries) | Maximum number of retry attempts on failure |
| `trackProgress` | boolean | `true` | Enable WebSocket progress tracking for async jobs |
| `queueable` | boolean | `false` | Whether this job can be queued when system resources are limited |
| `resourceWeight` | int | `50` | Resource weight of this job (1-100), higher values indicate more resource-intensive jobs |
## Examples
### Basic Usage
```java
@AutoJobPostMapping("/api/v1/security/remove-password")
public ResponseEntity<byte[]> removePassword(@ModelAttribute PDFPasswordRequest request) {
// Implementation
}
```
### With Custom Timeout
```java
// Set a 5-minute timeout for this operation
@AutoJobPostMapping(value = "/api/v1/misc/ocr-pdf", timeout = 300000)
public ResponseEntity<byte[]> ocrPdf(@ModelAttribute OCRRequest request) {
// OCR implementation
}
```
### With Retries
```java
// Allow up to 3 retry attempts for external API calls
@AutoJobPostMapping(value = "/api/v1/convert/url-to-pdf", retryCount = 3)
public ResponseEntity<byte[]> convertUrlToPdf(@ModelAttribute WebsiteToPDFRequest request) {
// Implementation
}
```
### Disable Progress Tracking
```java
// Simple, fast operation that doesn't need progress tracking
@AutoJobPostMapping(value = "/api/v1/misc/flatten", trackProgress = false)
public ResponseEntity<byte[]> flattenPdf(@ModelAttribute FlattenRequest request) {
// Implementation
}
```
### Enable Job Queueing for Resource-Intensive Operations
```java
// Resource-intensive operation that can be queued during high system load
@AutoJobPostMapping(
value = "/api/v1/misc/ocr-pdf",
queueable = true,
resourceWeight = 80, // High resource usage
timeout = 600000 // 10 minutes
)
public ResponseEntity<byte[]> ocrPdf(@ModelAttribute OCRRequest request) {
// OCR implementation
}
```
### Lightweight Operation
```java
// Very lightweight operation with low resource requirements
@AutoJobPostMapping(
value = "/api/v1/misc/get-page-count",
queueable = false,
resourceWeight = 10 // Very low resource usage
)
public ResponseEntity<Integer> getPageCount(@ModelAttribute PDFFile request) {
// Simple page count implementation
}
```
## Client-Side Integration
For asynchronous jobs, clients can:
1. Submit the job with `?async=true` parameter
2. Receive a job ID in the response
3. Connect to the WebSocket at `/ws/progress/{jobId}` to receive progress updates
4. Fetch the completed result from `/api/v1/general/job/{jobId}/result` when done
Example WebSocket message:
```json
{
"jobId": "b4c9a31d-4b7e-42b2-8ab9-3cbe99d5b94f",
"status": "Processing",
"progress": 65,
"message": "OCR processing page 13/20"
}
```
## Resource-Aware Job Queueing
The `queueable` parameter enables intelligent resource-aware job queueing for heavy operations. When enabled:
1. Jobs are automatically queued when system resources (CPU, memory) are constrained
2. Queue capacity dynamically adjusts based on available resources
3. Queue position and status updates are sent via WebSocket
4. Jobs with high `resourceWeight` values have stricter queueing conditions
5. Long-running jobs don't block the system from handling other requests
### Resource Weight Guidelines
When setting the `resourceWeight` parameter, use these guidelines:
| Weight Range | Appropriate For |
|--------------|----------------|
| 1-20 | Lightweight operations: metadata reads, simple transforms, etc. |
| 21-50 | Medium operations: basic PDF manipulation, simple image operations |
| 51-80 | Heavy operations: PDF merging, image conversions, medium OCR |
| 81-100 | Very intensive operations: large OCR jobs, complex transformations |
### Example Queue Status Messages
```json
{
"jobId": "b4c9a31d-4b7e-42b2-8ab9-3cbe99d5b94f",
"status": "Queued",
"progress": 0,
"message": "Waiting in queue for resources (position 3)"
}
```
```json
{
"jobId": "b4c9a31d-4b7e-42b2-8ab9-3cbe99d5b94f",
"status": "Starting",
"progress": 10,
"message": "Resources available, starting job execution"
}
```