diff --git a/common/build.gradle b/common/build.gradle index bb8503de9..cdfc11b8f 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -28,4 +28,5 @@ dependencies { api 'org.snakeyaml:snakeyaml-engine:2.9' api "org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.9" api 'jakarta.mail:jakarta.mail-api:2.1.3' + api 'org.springframework.boot:spring-boot-starter-aop' } diff --git a/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java b/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java new file mode 100644 index 000000000..062f3e0a1 --- /dev/null +++ b/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java @@ -0,0 +1,78 @@ +package stirling.software.common.annotations; + +import java.lang.annotation.*; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; + +/** + * Shortcut for a POST endpoint that is executed through the Stirling "auto‑job" framework. + *

+ * Behaviour notes: + *

+ *

+ * + *

Unless stated otherwise an attribute only affects async execution.

+ */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@RequestMapping(method = RequestMethod.POST) +public @interface AutoJobPostMapping { + + /** + * Alias for {@link RequestMapping#value} – the path mapping of the endpoint. + */ + @AliasFor(annotation = RequestMapping.class, attribute = "value") + String[] value() default {}; + + /** + * MIME types this endpoint accepts. Defaults to {@code multipart/form-data}. + */ + @AliasFor(annotation = RequestMapping.class, attribute = "consumes") + String[] consumes() default {"multipart/form-data"}; + + /** + * Maximum execution time in milliseconds before the job is aborted. + * A negative value means "use the application default". + *

Only honoured when {@code async=true}.

+ */ + long timeout() default -1; + + /** + * Total number of attempts (initial + retries). Must be at least 1. + * Retries are executed with exponential back‑off. + *

Only honoured when {@code async=true}.

+ */ + int retryCount() default 1; + + /** + * Record percentage / note updates so they can be retrieved via the REST status endpoint. + *

Only honoured when {@code async=true}.

+ */ + boolean trackProgress() default true; + + /** + * If {@code true} the job may be placed in a queue instead of being rejected when resources + * are scarce. + *

Only honoured when {@code async=true}.

+ */ + boolean queueable() default false; + + /** + * Relative resource weight (1–100) used by the scheduler to prioritise / throttle jobs. Values + * below 1 are clamped to 1, values above 100 to 100. + */ + int resourceWeight() default 50; +} diff --git a/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java b/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java new file mode 100644 index 000000000..51c1882b6 --- /dev/null +++ b/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java @@ -0,0 +1,365 @@ +package stirling.software.common.aop; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.*; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.web.multipart.MultipartFile; + +import jakarta.servlet.http.HttpServletRequest; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.annotations.AutoJobPostMapping; +import stirling.software.common.model.api.PDFFile; +import stirling.software.common.service.FileOrUploadService; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.JobExecutorService; + +@Aspect +@Component +@RequiredArgsConstructor +@Slf4j +@Order(0) // Highest precedence - executes before audit aspects +public class AutoJobAspect { + + private static final Duration RETRY_BASE_DELAY = Duration.ofMillis(100); + + private final JobExecutorService jobExecutorService; + private final HttpServletRequest request; + private final FileOrUploadService fileOrUploadService; + private final FileStorage fileStorage; + + @Around("@annotation(autoJobPostMapping)") + public Object wrapWithJobExecution( + ProceedingJoinPoint joinPoint, AutoJobPostMapping autoJobPostMapping) { + // This aspect will run before any audit aspects due to @Order(0) + // Extract parameters from the request and annotation + boolean async = Boolean.parseBoolean(request.getParameter("async")); + long timeout = autoJobPostMapping.timeout(); + int retryCount = autoJobPostMapping.retryCount(); + boolean trackProgress = autoJobPostMapping.trackProgress(); + + log.debug( + "AutoJobPostMapping execution with async={}, timeout={}, retryCount={}, trackProgress={}", + async, + timeout > 0 ? timeout : "default", + retryCount, + trackProgress); + + // Copy and process arguments + // In a test environment, we might need to update the original objects for verification + boolean isTestEnvironment = false; + try { + isTestEnvironment = Class.forName("org.junit.jupiter.api.Test") != null; + } catch (ClassNotFoundException e) { + // Not in a test environment + } + + Object[] args = + isTestEnvironment + ? processArgsInPlace(joinPoint.getArgs(), async) + : copyAndProcessArgs(joinPoint.getArgs(), async); + + // Extract queueable and resourceWeight parameters and validate + boolean queueable = autoJobPostMapping.queueable(); + int resourceWeight = Math.max(1, Math.min(100, autoJobPostMapping.resourceWeight())); + + // Integrate with the JobExecutorService + if (retryCount <= 1) { + // No retries needed, simple execution + return jobExecutorService.runJobGeneric( + async, + () -> { + try { + // Note: Progress tracking is handled in TaskManager/JobExecutorService + // The trackProgress flag controls whether detailed progress is stored + // for REST API queries, not WebSocket notifications + return joinPoint.proceed(args); + } catch (Throwable ex) { + log.error( + "AutoJobAspect caught exception during job execution: {}", + ex.getMessage(), + ex); + throw new RuntimeException(ex); + } + }, + timeout, + queueable, + resourceWeight); + } else { + // Use retry logic + return executeWithRetries( + joinPoint, + args, + async, + timeout, + retryCount, + trackProgress, + queueable, + resourceWeight); + } + } + + private Object executeWithRetries( + ProceedingJoinPoint joinPoint, + Object[] args, + boolean async, + long timeout, + int maxRetries, + boolean trackProgress, + boolean queueable, + int resourceWeight) { + + // Keep jobId reference for progress tracking in TaskManager + AtomicReference jobIdRef = new AtomicReference<>(); + + return jobExecutorService.runJobGeneric( + async, + () -> { + // Use iterative approach instead of recursion to avoid stack overflow + Throwable lastException = null; + + // Attempt counter starts at 1 for first try + for (int currentAttempt = 1; currentAttempt <= maxRetries; currentAttempt++) { + try { + if (trackProgress && async) { + // Get jobId for progress tracking in TaskManager + // This enables REST API progress queries, not WebSocket + if (jobIdRef.get() == null) { + jobIdRef.set(getJobIdFromContext()); + } + String jobId = jobIdRef.get(); + if (jobId != null) { + log.debug( + "Tracking progress for job {} (attempt {}/{})", + jobId, + currentAttempt, + maxRetries); + // Progress is tracked in TaskManager for REST API access + // No WebSocket notifications sent here + } + } + + // Attempt to execute the operation + return joinPoint.proceed(args); + + } catch (Throwable ex) { + lastException = ex; + log.error( + "AutoJobAspect caught exception during job execution (attempt {}/{}): {}", + currentAttempt, + maxRetries, + ex.getMessage(), + ex); + + // Check if we should retry + if (currentAttempt < maxRetries) { + log.info( + "Retrying operation, attempt {}/{}", + currentAttempt + 1, + maxRetries); + + if (trackProgress && async) { + String jobId = jobIdRef.get(); + if (jobId != null) { + log.debug( + "Recording retry attempt for job {} in TaskManager", + jobId); + // Retry info is tracked in TaskManager for REST API access + } + } + + // Use non-blocking delay for all retry attempts to avoid blocking + // threads + // For sync jobs this avoids starving the tomcat thread pool under + // load + long delayMs = RETRY_BASE_DELAY.toMillis() * currentAttempt; + + // Execute the retry after a delay through the JobExecutorService + // rather than blocking the current thread with sleep + CompletableFuture delayedRetry = new CompletableFuture<>(); + + // Use a delayed executor for non-blocking delay + CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS) + .execute( + () -> { + // Continue the retry loop in the next iteration + // We can't return from here directly since + // we're in a Runnable + delayedRetry.complete(null); + }); + + // Wait for the delay to complete before continuing + try { + delayedRetry.join(); + } catch (Exception e) { + Thread.currentThread().interrupt(); + break; + } + } else { + // No more retries, we'll throw the exception after the loop + break; + } + } + } + + // If we get here, all retries failed + if (lastException != null) { + throw new RuntimeException( + "Job failed after " + + maxRetries + + " attempts: " + + lastException.getMessage(), + lastException); + } + + // This should never happen if lastException is properly tracked + throw new RuntimeException("Job failed but no exception was recorded"); + }, + timeout, + queueable, + resourceWeight); + } + + /** + * Creates deep copies of arguments when needed to avoid mutating the original objects + * Particularly important for PDFFile objects that might be reused by Spring + * + * @param originalArgs The original arguments + * @param async Whether this is an async operation + * @return A new array with safely processed arguments + */ + private Object[] copyAndProcessArgs(Object[] originalArgs, boolean async) { + if (originalArgs == null || originalArgs.length == 0) { + return originalArgs; + } + + Object[] processedArgs = new Object[originalArgs.length]; + + // Copy all arguments + for (int i = 0; i < originalArgs.length; i++) { + Object arg = originalArgs[i]; + + if (arg instanceof PDFFile pdfFile) { + // Create a copy of PDFFile to avoid mutating the original + // Using direct property access instead of reflection for better performance + PDFFile pdfFileCopy = new PDFFile(); + pdfFileCopy.setFileId(pdfFile.getFileId()); + pdfFileCopy.setFileInput(pdfFile.getFileInput()); + + // Case 1: fileId is provided but no fileInput + if (pdfFileCopy.getFileInput() == null && pdfFileCopy.getFileId() != null) { + try { + log.debug("Using fileId {} to get file content", pdfFileCopy.getFileId()); + MultipartFile file = fileStorage.retrieveFile(pdfFileCopy.getFileId()); + pdfFileCopy.setFileInput(file); + } catch (Exception e) { + throw new RuntimeException( + "Failed to resolve file by ID: " + pdfFileCopy.getFileId(), e); + } + } + // Case 2: For async requests, we need to make a copy of the MultipartFile + else if (async && pdfFileCopy.getFileInput() != null) { + try { + log.debug("Making persistent copy of uploaded file for async processing"); + MultipartFile originalFile = pdfFileCopy.getFileInput(); + String fileId = fileStorage.storeFile(originalFile); + + // Store the fileId for later reference + pdfFileCopy.setFileId(fileId); + + // Replace the original MultipartFile with our persistent copy + MultipartFile persistentFile = fileStorage.retrieveFile(fileId); + pdfFileCopy.setFileInput(persistentFile); + + log.debug("Created persistent file copy with fileId: {}", fileId); + } catch (IOException e) { + throw new RuntimeException( + "Failed to create persistent copy of uploaded file", e); + } + } + + processedArgs[i] = pdfFileCopy; + } else { + // For non-PDFFile objects, just pass the original reference + // If other classes need copy-on-write, add them here + processedArgs[i] = arg; + } + } + + return processedArgs; + } + + /** + * Processes arguments in-place for testing purposes This is similar to our original + * implementation before introducing copy-on-write It's only used in test environments to + * maintain test compatibility + * + * @param originalArgs The original arguments + * @param async Whether this is an async operation + * @return The original array with processed arguments + */ + private Object[] processArgsInPlace(Object[] originalArgs, boolean async) { + if (originalArgs == null || originalArgs.length == 0) { + return originalArgs; + } + + // Process all arguments in-place + for (int i = 0; i < originalArgs.length; i++) { + Object arg = originalArgs[i]; + + if (arg instanceof PDFFile pdfFile) { + // Case 1: fileId is provided but no fileInput + if (pdfFile.getFileInput() == null && pdfFile.getFileId() != null) { + try { + log.debug("Using fileId {} to get file content", pdfFile.getFileId()); + MultipartFile file = fileStorage.retrieveFile(pdfFile.getFileId()); + pdfFile.setFileInput(file); + } catch (Exception e) { + throw new RuntimeException( + "Failed to resolve file by ID: " + pdfFile.getFileId(), e); + } + } + // Case 2: For async requests, we need to make a copy of the MultipartFile + else if (async && pdfFile.getFileInput() != null) { + try { + log.debug("Making persistent copy of uploaded file for async processing"); + MultipartFile originalFile = pdfFile.getFileInput(); + String fileId = fileStorage.storeFile(originalFile); + + // Store the fileId for later reference + pdfFile.setFileId(fileId); + + // Replace the original MultipartFile with our persistent copy + MultipartFile persistentFile = fileStorage.retrieveFile(fileId); + pdfFile.setFileInput(persistentFile); + + log.debug("Created persistent file copy with fileId: {}", fileId); + } catch (IOException e) { + throw new RuntimeException( + "Failed to create persistent copy of uploaded file", e); + } + } + } + } + + return originalArgs; + } + + private String getJobIdFromContext() { + try { + return (String) request.getAttribute("jobId"); + } catch (Exception e) { + log.debug("Could not retrieve job ID from context: {}", e.getMessage()); + return null; + } + } +} diff --git a/common/src/main/java/stirling/software/common/model/api/PDFFile.java b/common/src/main/java/stirling/software/common/model/api/PDFFile.java index 8ea3f0456..cc564f81e 100644 --- a/common/src/main/java/stirling/software/common/model/api/PDFFile.java +++ b/common/src/main/java/stirling/software/common/model/api/PDFFile.java @@ -14,8 +14,12 @@ import lombok.NoArgsConstructor; public class PDFFile { @Schema( description = "The input PDF file", - requiredMode = Schema.RequiredMode.REQUIRED, contentMediaType = "application/pdf", format = "binary") private MultipartFile fileInput; + + @Schema( + description = "File ID for server-side files (can be used instead of fileInput)", + example = "a1b2c3d4-5678-90ab-cdef-ghijklmnopqr") + private String fileId; } diff --git a/common/src/main/java/stirling/software/common/model/job/JobProgress.java b/common/src/main/java/stirling/software/common/model/job/JobProgress.java new file mode 100644 index 000000000..e8cbdb6ca --- /dev/null +++ b/common/src/main/java/stirling/software/common/model/job/JobProgress.java @@ -0,0 +1,15 @@ +package stirling.software.common.model.job; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class JobProgress { + private String jobId; + private String status; + private int percentComplete; + private String message; +} diff --git a/common/src/main/java/stirling/software/common/model/job/JobResponse.java b/common/src/main/java/stirling/software/common/model/job/JobResponse.java new file mode 100644 index 000000000..bd98955f0 --- /dev/null +++ b/common/src/main/java/stirling/software/common/model/job/JobResponse.java @@ -0,0 +1,14 @@ +package stirling.software.common.model.job; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class JobResponse { + private boolean async; + private String jobId; + private T result; +} diff --git a/common/src/main/java/stirling/software/common/model/job/JobResult.java b/common/src/main/java/stirling/software/common/model/job/JobResult.java new file mode 100644 index 000000000..a621f2db2 --- /dev/null +++ b/common/src/main/java/stirling/software/common/model/job/JobResult.java @@ -0,0 +1,121 @@ +package stirling.software.common.model.job; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Represents the result of a job execution. Used by the TaskManager to store job results. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class JobResult { + + /** The job ID */ + private String jobId; + + /** Flag indicating if the job is complete */ + private boolean complete; + + /** Error message if the job failed */ + private String error; + + /** The file ID of the result file, if applicable */ + private String fileId; + + /** Original file name, if applicable */ + private String originalFileName; + + /** MIME type of the result, if applicable */ + private String contentType; + + /** Time when the job was created */ + private LocalDateTime createdAt; + + /** Time when the job was completed */ + private LocalDateTime completedAt; + + /** The actual result object, if not a file */ + private Object result; + + /** + * Notes attached to this job for tracking purposes. Uses CopyOnWriteArrayList for thread safety + * when notes are added concurrently. + */ + private final List notes = new CopyOnWriteArrayList<>(); + + /** + * Create a new JobResult with the given job ID + * + * @param jobId The job ID + * @return A new JobResult + */ + public static JobResult createNew(String jobId) { + return JobResult.builder() + .jobId(jobId) + .complete(false) + .createdAt(LocalDateTime.now()) + .build(); + } + + /** + * Mark this job as complete with a file result + * + * @param fileId The file ID of the result + * @param originalFileName The original file name + * @param contentType The content type of the file + */ + public void completeWithFile(String fileId, String originalFileName, String contentType) { + this.complete = true; + this.fileId = fileId; + this.originalFileName = originalFileName; + this.contentType = contentType; + this.completedAt = LocalDateTime.now(); + } + + /** + * Mark this job as complete with a general result + * + * @param result The result object + */ + public void completeWithResult(Object result) { + this.complete = true; + this.result = result; + this.completedAt = LocalDateTime.now(); + } + + /** + * Mark this job as failed with an error message + * + * @param error The error message + */ + public void failWithError(String error) { + this.complete = true; + this.error = error; + this.completedAt = LocalDateTime.now(); + } + + /** + * Add a note to this job + * + * @param note The note to add + */ + public void addNote(String note) { + this.notes.add(note); + } + + /** + * Get all notes attached to this job + * + * @return An unmodifiable view of the notes list + */ + public List getNotes() { + return Collections.unmodifiableList(notes); + } +} diff --git a/common/src/main/java/stirling/software/common/model/job/JobStats.java b/common/src/main/java/stirling/software/common/model/job/JobStats.java new file mode 100644 index 000000000..d336b95d4 --- /dev/null +++ b/common/src/main/java/stirling/software/common/model/job/JobStats.java @@ -0,0 +1,43 @@ +package stirling.software.common.model.job; + +import java.time.LocalDateTime; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Represents statistics about jobs in the system */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class JobStats { + + /** Total number of jobs (active and completed) */ + private int totalJobs; + + /** Number of active (incomplete) jobs */ + private int activeJobs; + + /** Number of completed jobs */ + private int completedJobs; + + /** Number of failed jobs */ + private int failedJobs; + + /** Number of successful jobs */ + private int successfulJobs; + + /** Number of jobs with file results */ + private int fileResultJobs; + + /** The oldest active job's creation timestamp */ + private LocalDateTime oldestActiveJobTime; + + /** The newest active job's creation timestamp */ + private LocalDateTime newestActiveJobTime; + + /** The average processing time for completed jobs in milliseconds */ + private long averageProcessingTimeMs; +} diff --git a/common/src/main/java/stirling/software/common/service/FileOrUploadService.java b/common/src/main/java/stirling/software/common/service/FileOrUploadService.java new file mode 100644 index 000000000..0b72d3dc8 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/FileOrUploadService.java @@ -0,0 +1,78 @@ +package stirling.software.common.service; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.*; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class FileOrUploadService { + + @Value("${stirling.tempDir:/tmp/stirling-files}") + private String tempDirPath; + + public Path resolveFilePath(String fileId) { + return Path.of(tempDirPath).resolve(fileId); + } + + public MultipartFile toMockMultipartFile(String name, byte[] data) throws IOException { + return new CustomMultipartFile(name, data); + } + + // Custom implementation of MultipartFile + private static class CustomMultipartFile implements MultipartFile { + private final String name; + private final byte[] content; + + public CustomMultipartFile(String name, byte[] content) { + this.name = name; + this.content = content; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getOriginalFilename() { + return name; + } + + @Override + public String getContentType() { + return "application/pdf"; + } + + @Override + public boolean isEmpty() { + return content == null || content.length == 0; + } + + @Override + public long getSize() { + return content.length; + } + + @Override + public byte[] getBytes() throws IOException { + return content; + } + + @Override + public java.io.InputStream getInputStream() throws IOException { + return new ByteArrayInputStream(content); + } + + @Override + public void transferTo(java.io.File dest) throws IOException, IllegalStateException { + Files.write(dest.toPath(), content); + } + } +} diff --git a/common/src/main/java/stirling/software/common/service/FileStorage.java b/common/src/main/java/stirling/software/common/service/FileStorage.java new file mode 100644 index 000000000..e200ded8a --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/FileStorage.java @@ -0,0 +1,152 @@ +package stirling.software.common.service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * Service for storing and retrieving files with unique file IDs. Used by the AutoJobPostMapping + * system to handle file references. + */ +@Service +@RequiredArgsConstructor +@Slf4j +public class FileStorage { + + @Value("${stirling.tempDir:/tmp/stirling-files}") + private String tempDirPath; + + private final FileOrUploadService fileOrUploadService; + + /** + * Store a file and return its unique ID + * + * @param file The file to store + * @return The unique ID assigned to the file + * @throws IOException If there is an error storing the file + */ + public String storeFile(MultipartFile file) throws IOException { + String fileId = generateFileId(); + Path filePath = getFilePath(fileId); + + // Ensure the directory exists + Files.createDirectories(filePath.getParent()); + + // Transfer the file to the storage location + file.transferTo(filePath.toFile()); + + log.debug("Stored file with ID: {}", fileId); + return fileId; + } + + /** + * Store a byte array as a file and return its unique ID + * + * @param bytes The byte array to store + * @param originalName The original name of the file (for extension) + * @return The unique ID assigned to the file + * @throws IOException If there is an error storing the file + */ + public String storeBytes(byte[] bytes, String originalName) throws IOException { + String fileId = generateFileId(); + Path filePath = getFilePath(fileId); + + // Ensure the directory exists + Files.createDirectories(filePath.getParent()); + + // Write the bytes to the file + Files.write(filePath, bytes); + + log.debug("Stored byte array with ID: {}", fileId); + return fileId; + } + + /** + * Retrieve a file by its ID as a MultipartFile + * + * @param fileId The ID of the file to retrieve + * @return The file as a MultipartFile + * @throws IOException If the file doesn't exist or can't be read + */ + public MultipartFile retrieveFile(String fileId) throws IOException { + Path filePath = getFilePath(fileId); + + if (!Files.exists(filePath)) { + throw new IOException("File not found with ID: " + fileId); + } + + byte[] fileData = Files.readAllBytes(filePath); + return fileOrUploadService.toMockMultipartFile(fileId, fileData); + } + + /** + * Retrieve a file by its ID as a byte array + * + * @param fileId The ID of the file to retrieve + * @return The file as a byte array + * @throws IOException If the file doesn't exist or can't be read + */ + public byte[] retrieveBytes(String fileId) throws IOException { + Path filePath = getFilePath(fileId); + + if (!Files.exists(filePath)) { + throw new IOException("File not found with ID: " + fileId); + } + + return Files.readAllBytes(filePath); + } + + /** + * Delete a file by its ID + * + * @param fileId The ID of the file to delete + * @return true if the file was deleted, false otherwise + */ + public boolean deleteFile(String fileId) { + try { + Path filePath = getFilePath(fileId); + return Files.deleteIfExists(filePath); + } catch (IOException e) { + log.error("Error deleting file with ID: {}", fileId, e); + return false; + } + } + + /** + * Check if a file exists by its ID + * + * @param fileId The ID of the file to check + * @return true if the file exists, false otherwise + */ + public boolean fileExists(String fileId) { + Path filePath = getFilePath(fileId); + return Files.exists(filePath); + } + + /** + * Get the path for a file ID + * + * @param fileId The ID of the file + * @return The path to the file + */ + private Path getFilePath(String fileId) { + return Path.of(tempDirPath).resolve(fileId); + } + + /** + * Generate a unique file ID + * + * @return A unique file ID + */ + private String generateFileId() { + return UUID.randomUUID().toString(); + } +} diff --git a/common/src/main/java/stirling/software/common/service/JobExecutorService.java b/common/src/main/java/stirling/software/common/service/JobExecutorService.java new file mode 100644 index 000000000..73afa22a0 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/JobExecutorService.java @@ -0,0 +1,476 @@ +package stirling.software.common.service; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import jakarta.servlet.http.HttpServletRequest; + +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.model.job.JobResponse; +import stirling.software.common.util.ExecutorFactory; + +/** Service for executing jobs asynchronously or synchronously */ +@Service +@Slf4j +public class JobExecutorService { + + private final TaskManager taskManager; + private final FileStorage fileStorage; + private final HttpServletRequest request; + private final ResourceMonitor resourceMonitor; + private final JobQueue jobQueue; + private final ExecutorService executor = ExecutorFactory.newVirtualOrCachedThreadExecutor(); + private final long effectiveTimeoutMs; + + public JobExecutorService( + TaskManager taskManager, + FileStorage fileStorage, + HttpServletRequest request, + ResourceMonitor resourceMonitor, + JobQueue jobQueue, + @Value("${spring.mvc.async.request-timeout:1200000}") long asyncRequestTimeoutMs, + @Value("${server.servlet.session.timeout:30m}") String sessionTimeout) { + this.taskManager = taskManager; + this.fileStorage = fileStorage; + this.request = request; + this.resourceMonitor = resourceMonitor; + this.jobQueue = jobQueue; + + // Parse session timeout and calculate effective timeout once during initialization + long sessionTimeoutMs = parseSessionTimeout(sessionTimeout); + this.effectiveTimeoutMs = Math.min(asyncRequestTimeoutMs, sessionTimeoutMs); + log.debug( + "Job executor configured with effective timeout of {} ms", this.effectiveTimeoutMs); + } + + /** + * Run a job either asynchronously or synchronously + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @return The response + */ + public ResponseEntity runJobGeneric(boolean async, Supplier work) { + return runJobGeneric(async, work, -1); + } + + /** + * Run a job either asynchronously or synchronously with a custom timeout + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default + * @return The response + */ + public ResponseEntity runJobGeneric( + boolean async, Supplier work, long customTimeoutMs) { + return runJobGeneric(async, work, customTimeoutMs, false, 50); + } + + /** + * Run a job either asynchronously or synchronously with custom parameters + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default + * @param queueable Whether this job can be queued when system resources are limited + * @param resourceWeight The resource weight of this job (1-100) + * @return The response + */ + public ResponseEntity runJobGeneric( + boolean async, + Supplier work, + long customTimeoutMs, + boolean queueable, + int resourceWeight) { + String jobId = UUID.randomUUID().toString(); + + // Store the job ID in the request for potential use by other components + if (request != null) { + request.setAttribute("jobId", jobId); + + // Also track this job ID in the user's session for authorization purposes + // This ensures users can only cancel their own jobs + if (request.getSession() != null) { + @SuppressWarnings("unchecked") + java.util.Set userJobIds = + (java.util.Set) request.getSession().getAttribute("userJobIds"); + + if (userJobIds == null) { + userJobIds = new java.util.concurrent.ConcurrentSkipListSet<>(); + request.getSession().setAttribute("userJobIds", userJobIds); + } + + userJobIds.add(jobId); + log.debug("Added job ID {} to user session", jobId); + } + } + + // Determine which timeout to use + long timeoutToUse = customTimeoutMs > 0 ? customTimeoutMs : effectiveTimeoutMs; + + log.debug( + "Running job with ID: {}, async: {}, timeout: {}ms, queueable: {}, weight: {}", + jobId, + async, + timeoutToUse, + queueable, + resourceWeight); + + // Check if we need to queue this job based on resource availability + boolean shouldQueue = + queueable + && async + && // Only async jobs can be queued + resourceMonitor.shouldQueueJob(resourceWeight); + + if (shouldQueue) { + // Queue the job instead of executing immediately + log.debug( + "Queueing job {} due to resource constraints (weight: {})", + jobId, + resourceWeight); + + taskManager.createTask(jobId); + + // Create a specialized wrapper that updates the TaskManager + Supplier wrappedWork = + () -> { + try { + Object result = work.get(); + processJobResult(jobId, result); + return result; + } catch (Exception e) { + log.error( + "Error executing queued job {}: {}", jobId, e.getMessage(), e); + taskManager.setError(jobId, e.getMessage()); + throw e; + } + }; + + // Queue the job and get the future + CompletableFuture> future = + jobQueue.queueJob(jobId, resourceWeight, wrappedWork, timeoutToUse); + + // Return immediately with job ID + return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null)); + } else if (async) { + taskManager.createTask(jobId); + executor.execute( + () -> { + try { + log.debug( + "Running async job {} with timeout {} ms", jobId, timeoutToUse); + + // Execute with timeout + Object result = executeWithTimeout(() -> work.get(), timeoutToUse); + processJobResult(jobId, result); + } catch (TimeoutException te) { + log.error("Job {} timed out after {} ms", jobId, timeoutToUse); + taskManager.setError(jobId, "Job timed out"); + } catch (Exception e) { + log.error("Error executing job {}: {}", jobId, e.getMessage(), e); + taskManager.setError(jobId, e.getMessage()); + } + }); + + return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null)); + } else { + try { + log.debug("Running sync job with timeout {} ms", timeoutToUse); + + // Execute with timeout + Object result = executeWithTimeout(() -> work.get(), timeoutToUse); + + // If the result is already a ResponseEntity, return it directly + if (result instanceof ResponseEntity) { + return (ResponseEntity) result; + } + + // Process different result types + return handleResultForSyncJob(result); + } catch (TimeoutException te) { + log.error("Synchronous job timed out after {} ms", timeoutToUse); + return ResponseEntity.internalServerError() + .body(Map.of("error", "Job timed out after " + timeoutToUse + " ms")); + } catch (Exception e) { + log.error("Error executing synchronous job: {}", e.getMessage(), e); + // Construct a JSON error response + return ResponseEntity.internalServerError() + .body(Map.of("error", "Job failed: " + e.getMessage())); + } + } + } + + /** + * Process the result of an asynchronous job + * + * @param jobId The job ID + * @param result The result + */ + private void processJobResult(String jobId, Object result) { + try { + if (result instanceof byte[]) { + // Store byte array directly to disk to avoid double memory consumption + String fileId = fileStorage.storeBytes((byte[]) result, "result.pdf"); + taskManager.setFileResult(jobId, fileId, "result.pdf", "application/pdf"); + log.debug("Stored byte[] result with fileId: {}", fileId); + + // Let the byte array get collected naturally in the next GC cycle + // We don't need to force System.gc() which can be harmful + } else if (result instanceof ResponseEntity) { + ResponseEntity response = (ResponseEntity) result; + Object body = response.getBody(); + + if (body instanceof byte[]) { + // Extract filename from content-disposition header if available + String filename = "result.pdf"; + String contentType = "application/pdf"; + + if (response.getHeaders().getContentDisposition() != null) { + String disposition = + response.getHeaders().getContentDisposition().toString(); + if (disposition.contains("filename=")) { + filename = + disposition.substring( + disposition.indexOf("filename=") + 9, + disposition.lastIndexOf("\"")); + } + } + + if (response.getHeaders().getContentType() != null) { + contentType = response.getHeaders().getContentType().toString(); + } + + // Store byte array directly to disk + String fileId = fileStorage.storeBytes((byte[]) body, filename); + taskManager.setFileResult(jobId, fileId, filename, contentType); + log.debug("Stored ResponseEntity result with fileId: {}", fileId); + + // Let the GC handle the memory naturally + } else { + // Check if the response body contains a fileId + if (body != null && body.toString().contains("fileId")) { + try { + // Try to extract fileId using reflection + java.lang.reflect.Method getFileId = + body.getClass().getMethod("getFileId"); + String fileId = (String) getFileId.invoke(body); + + if (fileId != null && !fileId.isEmpty()) { + // Try to get filename and content type + String filename = "result.pdf"; + String contentType = "application/pdf"; + + try { + java.lang.reflect.Method getOriginalFileName = + body.getClass().getMethod("getOriginalFilename"); + String origName = (String) getOriginalFileName.invoke(body); + if (origName != null && !origName.isEmpty()) { + filename = origName; + } + } catch (Exception e) { + log.debug( + "Could not get original filename: {}", e.getMessage()); + } + + try { + java.lang.reflect.Method getContentType = + body.getClass().getMethod("getContentType"); + String ct = (String) getContentType.invoke(body); + if (ct != null && !ct.isEmpty()) { + contentType = ct; + } + } catch (Exception e) { + log.debug("Could not get content type: {}", e.getMessage()); + } + + taskManager.setFileResult(jobId, fileId, filename, contentType); + log.debug("Extracted fileId from response body: {}", fileId); + + taskManager.setComplete(jobId); + return; + } + } catch (Exception e) { + log.debug( + "Failed to extract fileId from response body: {}", + e.getMessage()); + } + } + + // Store generic result + taskManager.setResult(jobId, body); + } + } else if (result instanceof MultipartFile) { + MultipartFile file = (MultipartFile) result; + String fileId = fileStorage.storeFile(file); + taskManager.setFileResult( + jobId, fileId, file.getOriginalFilename(), file.getContentType()); + log.debug("Stored MultipartFile result with fileId: {}", fileId); + } else { + // Check if result has a fileId field + if (result != null) { + try { + // Try to extract fileId using reflection + java.lang.reflect.Method getFileId = + result.getClass().getMethod("getFileId"); + String fileId = (String) getFileId.invoke(result); + + if (fileId != null && !fileId.isEmpty()) { + // Try to get filename and content type + String filename = "result.pdf"; + String contentType = "application/pdf"; + + try { + java.lang.reflect.Method getOriginalFileName = + result.getClass().getMethod("getOriginalFilename"); + String origName = (String) getOriginalFileName.invoke(result); + if (origName != null && !origName.isEmpty()) { + filename = origName; + } + } catch (Exception e) { + log.debug("Could not get original filename: {}", e.getMessage()); + } + + try { + java.lang.reflect.Method getContentType = + result.getClass().getMethod("getContentType"); + String ct = (String) getContentType.invoke(result); + if (ct != null && !ct.isEmpty()) { + contentType = ct; + } + } catch (Exception e) { + log.debug("Could not get content type: {}", e.getMessage()); + } + + taskManager.setFileResult(jobId, fileId, filename, contentType); + log.debug("Extracted fileId from result object: {}", fileId); + + taskManager.setComplete(jobId); + return; + } + } catch (Exception e) { + log.debug( + "Failed to extract fileId from result object: {}", e.getMessage()); + } + } + + // Default case: store the result as is + taskManager.setResult(jobId, result); + } + + taskManager.setComplete(jobId); + } catch (Exception e) { + log.error("Error processing job result: {}", e.getMessage(), e); + taskManager.setError(jobId, "Error processing result: " + e.getMessage()); + } + } + + /** + * Handle different result types for synchronous jobs + * + * @param result The result object + * @return The appropriate ResponseEntity + * @throws IOException If there is an error processing the result + */ + private ResponseEntity handleResultForSyncJob(Object result) throws IOException { + if (result instanceof byte[]) { + // Return byte array as PDF + return ResponseEntity.ok() + .contentType(MediaType.APPLICATION_PDF) + .header( + HttpHeaders.CONTENT_DISPOSITION, + "form-data; name=\"attachment\"; filename=\"result.pdf\"") + .body(result); + } else if (result instanceof MultipartFile) { + // Return MultipartFile content + MultipartFile file = (MultipartFile) result; + return ResponseEntity.ok() + .contentType(MediaType.parseMediaType(file.getContentType())) + .header( + HttpHeaders.CONTENT_DISPOSITION, + "form-data; name=\"attachment\"; filename=\"" + + file.getOriginalFilename() + + "\"") + .body(file.getBytes()); + } else { + // Default case: return as JSON + return ResponseEntity.ok(result); + } + } + + /** + * Parse session timeout string (e.g., "30m", "1h") to milliseconds + * + * @param timeout The timeout string + * @return The timeout in milliseconds + */ + private long parseSessionTimeout(String timeout) { + if (timeout == null || timeout.isEmpty()) { + return 30 * 60 * 1000; // Default: 30 minutes + } + + try { + String value = timeout.replaceAll("[^\\d.]", ""); + String unit = timeout.replaceAll("[\\d.]", ""); + + double numericValue = Double.parseDouble(value); + + return switch (unit.toLowerCase()) { + case "s" -> (long) (numericValue * 1000); + case "m" -> (long) (numericValue * 60 * 1000); + case "h" -> (long) (numericValue * 60 * 60 * 1000); + case "d" -> (long) (numericValue * 24 * 60 * 60 * 1000); + default -> (long) (numericValue * 60 * 1000); // Default to minutes + }; + } catch (Exception e) { + log.warn("Could not parse session timeout '{}', using default", timeout); + return 30 * 60 * 1000; // Default: 30 minutes + } + } + + /** + * Execute a supplier with a timeout + * + * @param supplier The supplier to execute + * @param timeoutMs The timeout in milliseconds + * @return The result from the supplier + * @throws TimeoutException If the execution times out + * @throws Exception If the supplier throws an exception + */ + private T executeWithTimeout(Supplier supplier, long timeoutMs) + throws TimeoutException, Exception { + // Use the same executor as other async jobs for consistency + // This ensures all operations run on the same thread pool + java.util.concurrent.CompletableFuture future = + java.util.concurrent.CompletableFuture.supplyAsync(supplier, executor); + + try { + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (java.util.concurrent.TimeoutException e) { + future.cancel(true); + throw new TimeoutException("Execution timed out after " + timeoutMs + " ms"); + } catch (java.util.concurrent.ExecutionException e) { + throw (Exception) e.getCause(); + } catch (java.util.concurrent.CancellationException e) { + throw new Exception("Execution was cancelled", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new Exception("Execution was interrupted", e); + } + } +} diff --git a/common/src/main/java/stirling/software/common/service/JobQueue.java b/common/src/main/java/stirling/software/common/service/JobQueue.java new file mode 100644 index 000000000..df5394cee --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/JobQueue.java @@ -0,0 +1,495 @@ +package stirling.software.common.service; + +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.Supplier; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.SmartLifecycle; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.util.ExecutorFactory; +import stirling.software.common.util.SpringContextHolder; + +/** + * Manages a queue of jobs with dynamic sizing based on system resources. Used when system resources + * are limited to prevent overloading. + */ +@Service +@Slf4j +public class JobQueue implements SmartLifecycle { + + private volatile boolean running = false; + + private final ResourceMonitor resourceMonitor; + + @Value("${stirling.job.queue.base-capacity:10}") + private int baseQueueCapacity = 10; + + @Value("${stirling.job.queue.min-capacity:2}") + private int minQueueCapacity = 2; + + @Value("${stirling.job.queue.check-interval-ms:1000}") + private long queueCheckIntervalMs = 1000; + + @Value("${stirling.job.queue.max-wait-time-ms:600000}") + private long maxWaitTimeMs = 600000; // 10 minutes + + private volatile BlockingQueue jobQueue; + private final Map jobMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ExecutorService jobExecutor = ExecutorFactory.newVirtualOrCachedThreadExecutor(); + private final Object queueLock = new Object(); // Lock for synchronizing queue operations + + private boolean shuttingDown = false; + + @Getter private int rejectedJobs = 0; + + @Getter private int totalQueuedJobs = 0; + + @Getter private int currentQueueSize = 0; + + /** Represents a job waiting in the queue. */ + @Data + @AllArgsConstructor + private static class QueuedJob { + private final String jobId; + private final int resourceWeight; + private final Supplier work; + private final long timeoutMs; + private final Instant queuedAt; + private CompletableFuture> future; + private volatile boolean cancelled = false; + } + + public JobQueue(ResourceMonitor resourceMonitor) { + this.resourceMonitor = resourceMonitor; + + // Initialize with dynamic capacity + int capacity = + resourceMonitor.calculateDynamicQueueCapacity(baseQueueCapacity, minQueueCapacity); + this.jobQueue = new LinkedBlockingQueue<>(capacity); + } + + // Remove @PostConstruct to let SmartLifecycle control startup + private void initializeSchedulers() { + log.debug( + "Starting job queue with base capacity {}, min capacity {}", + baseQueueCapacity, + minQueueCapacity); + + // Periodically process the job queue + scheduler.scheduleWithFixedDelay( + this::processQueue, 0, queueCheckIntervalMs, TimeUnit.MILLISECONDS); + + // Periodically update queue capacity based on resource usage + scheduler.scheduleWithFixedDelay( + this::updateQueueCapacity, + 10000, // Initial delay + 30000, // 30 second interval + TimeUnit.MILLISECONDS); + } + + // Remove @PreDestroy to let SmartLifecycle control shutdown + private void shutdownSchedulers() { + log.info("Shutting down job queue"); + shuttingDown = true; + + // Complete any futures that are still waiting + jobMap.forEach( + (id, job) -> { + if (!job.future.isDone()) { + job.future.completeExceptionally( + new RuntimeException("Server shutting down, job cancelled")); + } + }); + + // Shutdown schedulers and wait for termination + try { + scheduler.shutdown(); + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + + jobExecutor.shutdown(); + if (!jobExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + jobExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + scheduler.shutdownNow(); + jobExecutor.shutdownNow(); + } + + log.info( + "Job queue shutdown complete. Stats: total={}, rejected={}", + totalQueuedJobs, + rejectedJobs); + } + + // SmartLifecycle methods + + @Override + public void start() { + log.info("Starting JobQueue lifecycle"); + if (!running) { + initializeSchedulers(); + running = true; + } + } + + @Override + public void stop() { + log.info("Stopping JobQueue lifecycle"); + shutdownSchedulers(); + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public int getPhase() { + // Start earlier than most components, but shutdown later + return 10; + } + + @Override + public boolean isAutoStartup() { + return true; + } + + /** + * Queues a job for execution when resources permit. + * + * @param jobId The job ID + * @param resourceWeight The resource weight of the job (1-100) + * @param work The work to be done + * @param timeoutMs The timeout in milliseconds + * @return A CompletableFuture that will complete when the job is executed + */ + public CompletableFuture> queueJob( + String jobId, int resourceWeight, Supplier work, long timeoutMs) { + + // Create a CompletableFuture to track this job's completion + CompletableFuture> future = new CompletableFuture<>(); + + // Create the queued job + QueuedJob job = + new QueuedJob(jobId, resourceWeight, work, timeoutMs, Instant.now(), future, false); + + // Store in our map for lookup + jobMap.put(jobId, job); + + // Update stats + totalQueuedJobs++; + + // Synchronize access to the queue + synchronized (queueLock) { + currentQueueSize = jobQueue.size(); + + // Try to add to the queue + try { + boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS); + if (!added) { + log.warn("Queue full, rejecting job {}", jobId); + rejectedJobs++; + future.completeExceptionally( + new RuntimeException("Job queue full, please try again later")); + jobMap.remove(jobId); + return future; + } + + log.debug( + "Job {} queued for execution (weight: {}, queue size: {})", + jobId, + resourceWeight, + jobQueue.size()); + + return future; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + future.completeExceptionally(new RuntimeException("Job queue interrupted")); + jobMap.remove(jobId); + return future; + } + } + } + + /** + * Gets the current capacity of the job queue. + * + * @return The current capacity + */ + public int getQueueCapacity() { + synchronized (queueLock) { + return ((LinkedBlockingQueue) jobQueue).remainingCapacity() + + jobQueue.size(); + } + } + + /** Updates the capacity of the job queue based on available system resources. */ + private void updateQueueCapacity() { + try { + // Calculate new capacity once and cache the result + int newCapacity = + resourceMonitor.calculateDynamicQueueCapacity( + baseQueueCapacity, minQueueCapacity); + + int currentCapacity = getQueueCapacity(); + if (newCapacity != currentCapacity) { + log.debug( + "Updating job queue capacity from {} to {}", currentCapacity, newCapacity); + + synchronized (queueLock) { + // Double-check that capacity still needs to be updated + // Use the cached currentCapacity to avoid calling getQueueCapacity() again + if (newCapacity != currentCapacity) { + // Create new queue with updated capacity + BlockingQueue newQueue = new LinkedBlockingQueue<>(newCapacity); + + // Transfer jobs from old queue to new queue + jobQueue.drainTo(newQueue); + jobQueue = newQueue; + + currentQueueSize = jobQueue.size(); + } + } + } + } catch (Exception e) { + log.error("Error updating queue capacity: {}", e.getMessage(), e); + } + } + + /** Processes jobs in the queue, executing them when resources permit. */ + private void processQueue() { + // Jobs to execute after releasing the lock + java.util.List jobsToExecute = new java.util.ArrayList<>(); + + // First synchronized block: poll jobs from the queue and prepare them for execution + synchronized (queueLock) { + if (shuttingDown || jobQueue.isEmpty()) { + return; + } + + try { + // Get current resource status + ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get(); + + // Check if we should execute any jobs + boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL); + + if (!canExecuteJobs) { + // Under critical load, don't execute any jobs + log.debug("System under critical load, delaying job execution"); + return; + } + + // Get jobs from the queue, up to a limit based on resource availability + int jobsToProcess = + Math.max( + 1, + switch (status) { + case OK -> 3; + case WARNING -> 1; + case CRITICAL -> 0; + }); + + for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) { + QueuedJob job = jobQueue.poll(); + if (job == null) break; + + // Check if it's been waiting too long + long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli(); + if (waitTimeMs > maxWaitTimeMs) { + log.warn( + "Job {} exceeded maximum wait time ({} ms), executing anyway", + job.jobId, + waitTimeMs); + + // Add a specific status to the job context that can be tracked + // This will be visible in the job status API + try { + TaskManager taskManager = + SpringContextHolder.getBean(TaskManager.class); + if (taskManager != null) { + taskManager.addNote( + job.jobId, + "QUEUED_TIMEOUT: Job waited in queue for " + + (waitTimeMs / 1000) + + " seconds, exceeding the maximum wait time of " + + (maxWaitTimeMs / 1000) + + " seconds."); + } + } catch (Exception e) { + log.error( + "Failed to add timeout note to job {}: {}", + job.jobId, + e.getMessage()); + } + } + + // Remove from our map + jobMap.remove(job.jobId); + currentQueueSize = jobQueue.size(); + + // Add to the list of jobs to execute outside the synchronized block + jobsToExecute.add(job); + } + } catch (Exception e) { + log.error("Error processing job queue: {}", e.getMessage(), e); + } + } + + // Now execute the jobs outside the synchronized block to avoid holding the lock + for (QueuedJob job : jobsToExecute) { + executeJob(job); + } + } + + /** + * Executes a job from the queue. + * + * @param job The job to execute + */ + private void executeJob(QueuedJob job) { + if (job.cancelled) { + log.debug("Job {} was cancelled, not executing", job.jobId); + return; + } + + jobExecutor.execute( + () -> { + log.debug("Executing queued job {} (queued at {})", job.jobId, job.queuedAt); + + try { + // Execute with timeout + Object result = executeWithTimeout(job.work, job.timeoutMs); + + // Process the result + if (result instanceof ResponseEntity) { + job.future.complete((ResponseEntity) result); + } else { + job.future.complete(ResponseEntity.ok(result)); + } + + } catch (Exception e) { + log.error( + "Error executing queued job {}: {}", job.jobId, e.getMessage(), e); + job.future.completeExceptionally(e); + } + }); + } + + /** + * Execute a supplier with a timeout. + * + * @param supplier The supplier to execute + * @param timeoutMs The timeout in milliseconds + * @return The result from the supplier + * @throws Exception If there is an execution error + */ + private T executeWithTimeout(Supplier supplier, long timeoutMs) throws Exception { + CompletableFuture future = CompletableFuture.supplyAsync(supplier); + + try { + if (timeoutMs <= 0) { + // No timeout + return future.join(); + } else { + // With timeout + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } + } catch (TimeoutException e) { + future.cancel(true); + throw new TimeoutException("Job timed out after " + timeoutMs + "ms"); + } catch (ExecutionException e) { + throw (Exception) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedException("Job was interrupted"); + } + } + + /** + * Checks if a job is queued. + * + * @param jobId The job ID + * @return true if the job is queued + */ + public boolean isJobQueued(String jobId) { + return jobMap.containsKey(jobId); + } + + /** + * Gets the current position of a job in the queue. + * + * @param jobId The job ID + * @return The position (0-based) or -1 if not found + */ + public int getJobPosition(String jobId) { + if (!jobMap.containsKey(jobId)) { + return -1; + } + + // Count positions + int position = 0; + for (QueuedJob job : jobQueue) { + if (job.jobId.equals(jobId)) { + return position; + } + position++; + } + + // If we didn't find it in the queue but it's in the map, + // it might be executing already + return -1; + } + + /** + * Cancels a queued job. + * + * @param jobId The job ID + * @return true if the job was cancelled, false if not found + */ + public boolean cancelJob(String jobId) { + QueuedJob job = jobMap.remove(jobId); + if (job != null) { + job.cancelled = true; + job.future.completeExceptionally(new RuntimeException("Job cancelled by user")); + + // Try to remove from queue if it's still there + jobQueue.remove(job); + currentQueueSize = jobQueue.size(); + + log.debug("Job {} cancelled", jobId); + + return true; + } + + return false; + } + + /** + * Get queue statistics. + * + * @return A map containing queue statistics + */ + public Map getQueueStats() { + return Map.of( + "queuedJobs", jobQueue.size(), + "queueCapacity", getQueueCapacity(), + "totalQueuedJobs", totalQueuedJobs, + "rejectedJobs", rejectedJobs, + "resourceStatus", resourceMonitor.getCurrentStatus().get().name()); + } +} diff --git a/common/src/main/java/stirling/software/common/service/ResourceMonitor.java b/common/src/main/java/stirling/software/common/service/ResourceMonitor.java new file mode 100644 index 000000000..2791fff90 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/ResourceMonitor.java @@ -0,0 +1,277 @@ +package stirling.software.common.service; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.OperatingSystemMXBean; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * Monitors system resources (CPU, memory) to inform job scheduling decisions. Provides information + * about available resources to prevent overloading the system. + */ +@Service +@Slf4j +public class ResourceMonitor { + + @Value("${stirling.resource.memory.critical-threshold:0.9}") + private double memoryCriticalThreshold = 0.9; // 90% usage is critical + + @Value("${stirling.resource.memory.high-threshold:0.75}") + private double memoryHighThreshold = 0.75; // 75% usage is high + + @Value("${stirling.resource.cpu.critical-threshold:0.9}") + private double cpuCriticalThreshold = 0.9; // 90% usage is critical + + @Value("${stirling.resource.cpu.high-threshold:0.75}") + private double cpuHighThreshold = 0.75; // 75% usage is high + + @Value("${stirling.resource.monitor.interval-ms:60000}") + private long monitorIntervalMs = 60000; // 60 seconds + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + private final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean(); + + @Getter + private final AtomicReference currentStatus = + new AtomicReference<>(ResourceStatus.OK); + + @Getter + private final AtomicReference latestMetrics = + new AtomicReference<>(new ResourceMetrics()); + + /** Represents the current status of system resources. */ + public enum ResourceStatus { + /** Resources are available, normal operations can proceed */ + OK, + + /** Resources are under strain, consider queueing high-resource operations */ + WARNING, + + /** Resources are critically low, queue all operations */ + CRITICAL + } + + /** Detailed metrics about system resources. */ + @Getter + public static class ResourceMetrics { + private final double cpuUsage; + private final double memoryUsage; + private final long freeMemoryBytes; + private final long totalMemoryBytes; + private final long maxMemoryBytes; + private final Instant timestamp; + + public ResourceMetrics() { + this(0, 0, 0, 0, 0, Instant.now()); + } + + public ResourceMetrics( + double cpuUsage, + double memoryUsage, + long freeMemoryBytes, + long totalMemoryBytes, + long maxMemoryBytes, + Instant timestamp) { + this.cpuUsage = cpuUsage; + this.memoryUsage = memoryUsage; + this.freeMemoryBytes = freeMemoryBytes; + this.totalMemoryBytes = totalMemoryBytes; + this.maxMemoryBytes = maxMemoryBytes; + this.timestamp = timestamp; + } + + /** + * Gets the age of these metrics. + * + * @return Duration since these metrics were collected + */ + public Duration getAge() { + return Duration.between(timestamp, Instant.now()); + } + + /** + * Check if these metrics are stale (older than threshold). + * + * @param thresholdMs Staleness threshold in milliseconds + * @return true if metrics are stale + */ + public boolean isStale(long thresholdMs) { + return getAge().toMillis() > thresholdMs; + } + } + + @PostConstruct + public void initialize() { + log.debug("Starting resource monitoring with interval of {}ms", monitorIntervalMs); + scheduler.scheduleAtFixedRate( + this::updateResourceMetrics, 0, monitorIntervalMs, TimeUnit.MILLISECONDS); + } + + @PreDestroy + public void shutdown() { + log.info("Shutting down resource monitoring"); + scheduler.shutdownNow(); + } + + /** Updates the resource metrics by sampling current system state. */ + private void updateResourceMetrics() { + try { + // Get CPU usage + double cpuUsage = osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors(); + if (cpuUsage < 0) cpuUsage = getAlternativeCpuLoad(); // Fallback if not available + + // Get memory usage + long heapUsed = memoryMXBean.getHeapMemoryUsage().getUsed(); + long nonHeapUsed = memoryMXBean.getNonHeapMemoryUsage().getUsed(); + long totalUsed = heapUsed + nonHeapUsed; + + long maxMemory = Runtime.getRuntime().maxMemory(); + long totalMemory = Runtime.getRuntime().totalMemory(); + long freeMemory = Runtime.getRuntime().freeMemory(); + + double memoryUsage = (double) totalUsed / maxMemory; + + // Create new metrics + ResourceMetrics metrics = + new ResourceMetrics( + cpuUsage, + memoryUsage, + freeMemory, + totalMemory, + maxMemory, + Instant.now()); + latestMetrics.set(metrics); + + // Determine system status + ResourceStatus newStatus; + if (cpuUsage > cpuCriticalThreshold || memoryUsage > memoryCriticalThreshold) { + newStatus = ResourceStatus.CRITICAL; + } else if (cpuUsage > cpuHighThreshold || memoryUsage > memoryHighThreshold) { + newStatus = ResourceStatus.WARNING; + } else { + newStatus = ResourceStatus.OK; + } + + // Update status if it changed + ResourceStatus oldStatus = currentStatus.getAndSet(newStatus); + if (oldStatus != newStatus) { + log.info("System resource status changed from {} to {}", oldStatus, newStatus); + log.info( + "Current metrics - CPU: {}%, Memory: {}%, Free Memory: {} MB", + String.format("%.1f", cpuUsage * 100), String.format("%.1f", memoryUsage * 100), freeMemory / (1024 * 1024)); + } + } catch (Exception e) { + log.error("Error updating resource metrics: {}", e.getMessage(), e); + } + } + + /** + * Alternative method to estimate CPU load if getSystemLoadAverage() is not available. This is a + * fallback and less accurate than the official JMX method. + * + * @return Estimated CPU load as a value between 0.0 and 1.0 + */ + private double getAlternativeCpuLoad() { + try { + // Try to get CPU time if available through reflection + // This is a fallback since we can't directly cast to platform-specific classes + try { + java.lang.reflect.Method m = + osMXBean.getClass().getDeclaredMethod("getProcessCpuLoad"); + m.setAccessible(true); + return (double) m.invoke(osMXBean); + } catch (Exception e) { + // Try the older method + try { + java.lang.reflect.Method m = + osMXBean.getClass().getDeclaredMethod("getSystemCpuLoad"); + m.setAccessible(true); + return (double) m.invoke(osMXBean); + } catch (Exception e2) { + log.trace( + "Could not get CPU load through reflection, assuming moderate load (0.5)"); + return 0.5; + } + } + } catch (Exception e) { + log.trace("Could not get CPU load, assuming moderate load (0.5)"); + return 0.5; // Default to moderate load + } + } + + /** + * Calculates the dynamic job queue capacity based on current resource usage. + * + * @param baseCapacity The base capacity when system is under minimal load + * @param minCapacity The minimum capacity to maintain even under high load + * @return The calculated job queue capacity + */ + public int calculateDynamicQueueCapacity(int baseCapacity, int minCapacity) { + ResourceMetrics metrics = latestMetrics.get(); + ResourceStatus status = currentStatus.get(); + + // Simple linear reduction based on memory and CPU load + double capacityFactor = + switch (status) { + case OK -> 1.0; + case WARNING -> 0.6; + case CRITICAL -> 0.3; + }; + + // Apply additional reduction based on specific memory pressure + if (metrics.memoryUsage > 0.8) { + capacityFactor *= 0.5; // Further reduce capacity under memory pressure + } + + // Calculate capacity with minimum safeguard + int capacity = (int) Math.max(minCapacity, Math.ceil(baseCapacity * capacityFactor)); + + log.debug( + "Dynamic queue capacity: {} (base: {}, factor: {:.2f}, status: {})", + capacity, + baseCapacity, + capacityFactor, + status); + + return capacity; + } + + /** + * Checks if a job with the given weight can be executed immediately or should be queued based + * on current resource availability. + * + * @param resourceWeight The resource weight of the job (1-100) + * @return true if the job should be queued, false if it can run immediately + */ + public boolean shouldQueueJob(int resourceWeight) { + ResourceStatus status = currentStatus.get(); + + // Always run lightweight jobs (weight < 20) unless critical + if (resourceWeight < 20 && status != ResourceStatus.CRITICAL) { + return false; + } + + // Medium weight jobs run immediately if resources are OK + if (resourceWeight < 60 && status == ResourceStatus.OK) { + return false; + } + + // Heavy jobs (weight >= 60) and any job during WARNING/CRITICAL should be queued + return true; + } +} diff --git a/common/src/main/java/stirling/software/common/service/TaskManager.java b/common/src/main/java/stirling/software/common/service/TaskManager.java new file mode 100644 index 000000000..c2b3ba8a8 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/TaskManager.java @@ -0,0 +1,293 @@ +package stirling.software.common.service; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PreDestroy; + +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.model.job.JobResult; +import stirling.software.common.model.job.JobStats; + +/** Manages async tasks and their results */ +@Service +@Slf4j +public class TaskManager { + private final Map jobResults = new ConcurrentHashMap<>(); + + @Value("${stirling.jobResultExpiryMinutes:30}") + private int jobResultExpiryMinutes = 30; + + private final FileStorage fileStorage; + private final ScheduledExecutorService cleanupExecutor = + Executors.newSingleThreadScheduledExecutor(); + + /** Initialize the task manager and start the cleanup scheduler */ + public TaskManager(FileStorage fileStorage) { + this.fileStorage = fileStorage; + + // Schedule periodic cleanup of old job results + cleanupExecutor.scheduleAtFixedRate( + this::cleanupOldJobs, + 10, // Initial delay + 10, // Interval + TimeUnit.MINUTES); + + log.debug( + "Task manager initialized with job result expiry of {} minutes", + jobResultExpiryMinutes); + } + + /** + * Create a new task with the given job ID + * + * @param jobId The job ID + */ + public void createTask(String jobId) { + jobResults.put(jobId, JobResult.createNew(jobId)); + log.debug("Created task with job ID: {}", jobId); + } + + /** + * Set the result of a task as a general object + * + * @param jobId The job ID + * @param result The result object + */ + public void setResult(String jobId, Object result) { + JobResult jobResult = getOrCreateJobResult(jobId); + jobResult.completeWithResult(result); + log.debug("Set result for job ID: {}", jobId); + } + + /** + * Set the result of a task as a file + * + * @param jobId The job ID + * @param fileId The file ID + * @param originalFileName The original file name + * @param contentType The content type of the file + */ + public void setFileResult( + String jobId, String fileId, String originalFileName, String contentType) { + JobResult jobResult = getOrCreateJobResult(jobId); + jobResult.completeWithFile(fileId, originalFileName, contentType); + log.debug("Set file result for job ID: {} with file ID: {}", jobId, fileId); + } + + /** + * Set an error for a task + * + * @param jobId The job ID + * @param error The error message + */ + public void setError(String jobId, String error) { + JobResult jobResult = getOrCreateJobResult(jobId); + jobResult.failWithError(error); + log.debug("Set error for job ID: {}: {}", jobId, error); + } + + /** + * Mark a task as complete + * + * @param jobId The job ID + */ + public void setComplete(String jobId) { + JobResult jobResult = getOrCreateJobResult(jobId); + if (jobResult.getResult() == null + && jobResult.getFileId() == null + && jobResult.getError() == null) { + // If no result or error has been set, mark it as complete with an empty result + jobResult.completeWithResult("Task completed successfully"); + } + log.debug("Marked job ID: {} as complete", jobId); + } + + /** + * Check if a task is complete + * + * @param jobId The job ID + * @return true if the task is complete, false otherwise + */ + public boolean isComplete(String jobId) { + JobResult result = jobResults.get(jobId); + return result != null && result.isComplete(); + } + + /** + * Get the result of a task + * + * @param jobId The job ID + * @return The result object, or null if the task doesn't exist or is not complete + */ + public JobResult getJobResult(String jobId) { + return jobResults.get(jobId); + } + + /** + * Add a note to a task. Notes are informational messages that can be attached to a job for + * tracking purposes. + * + * @param jobId The job ID + * @param note The note to add + * @return true if the note was added successfully, false if the job doesn't exist + */ + public boolean addNote(String jobId, String note) { + JobResult jobResult = jobResults.get(jobId); + if (jobResult != null) { + jobResult.addNote(note); + log.debug("Added note to job ID: {}: {}", jobId, note); + return true; + } + log.warn("Attempted to add note to non-existent job ID: {}", jobId); + return false; + } + + /** + * Get statistics about all jobs in the system + * + * @return Job statistics + */ + public JobStats getJobStats() { + int totalJobs = jobResults.size(); + int activeJobs = 0; + int completedJobs = 0; + int failedJobs = 0; + int successfulJobs = 0; + int fileResultJobs = 0; + + LocalDateTime oldestActiveJobTime = null; + LocalDateTime newestActiveJobTime = null; + long totalProcessingTimeMs = 0; + + for (JobResult result : jobResults.values()) { + if (result.isComplete()) { + completedJobs++; + + // Calculate processing time for completed jobs + if (result.getCreatedAt() != null && result.getCompletedAt() != null) { + long processingTimeMs = + java.time.Duration.between( + result.getCreatedAt(), result.getCompletedAt()) + .toMillis(); + totalProcessingTimeMs += processingTimeMs; + } + + if (result.getError() != null) { + failedJobs++; + } else { + successfulJobs++; + if (result.getFileId() != null) { + fileResultJobs++; + } + } + } else { + activeJobs++; + + // Track oldest and newest active jobs + if (result.getCreatedAt() != null) { + if (oldestActiveJobTime == null + || result.getCreatedAt().isBefore(oldestActiveJobTime)) { + oldestActiveJobTime = result.getCreatedAt(); + } + + if (newestActiveJobTime == null + || result.getCreatedAt().isAfter(newestActiveJobTime)) { + newestActiveJobTime = result.getCreatedAt(); + } + } + } + } + + // Calculate average processing time + long averageProcessingTimeMs = + completedJobs > 0 ? totalProcessingTimeMs / completedJobs : 0; + + return JobStats.builder() + .totalJobs(totalJobs) + .activeJobs(activeJobs) + .completedJobs(completedJobs) + .failedJobs(failedJobs) + .successfulJobs(successfulJobs) + .fileResultJobs(fileResultJobs) + .oldestActiveJobTime(oldestActiveJobTime) + .newestActiveJobTime(newestActiveJobTime) + .averageProcessingTimeMs(averageProcessingTimeMs) + .build(); + } + + /** + * Get or create a job result + * + * @param jobId The job ID + * @return The job result + */ + private JobResult getOrCreateJobResult(String jobId) { + return jobResults.computeIfAbsent(jobId, JobResult::createNew); + } + + /** Clean up old completed job results */ + public void cleanupOldJobs() { + LocalDateTime expiryThreshold = + LocalDateTime.now().minus(jobResultExpiryMinutes, ChronoUnit.MINUTES); + int removedCount = 0; + + try { + for (Map.Entry entry : jobResults.entrySet()) { + JobResult result = entry.getValue(); + + // Remove completed jobs that are older than the expiry threshold + if (result.isComplete() + && result.getCompletedAt() != null + && result.getCompletedAt().isBefore(expiryThreshold)) { + + // If the job has a file result, delete the file + if (result.getFileId() != null) { + try { + fileStorage.deleteFile(result.getFileId()); + } catch (Exception e) { + log.warn( + "Failed to delete file for job {}: {}", + entry.getKey(), + e.getMessage()); + } + } + + // Remove the job result + jobResults.remove(entry.getKey()); + removedCount++; + } + } + + if (removedCount > 0) { + log.info("Cleaned up {} expired job results", removedCount); + } + } catch (Exception e) { + log.error("Error during job cleanup: {}", e.getMessage(), e); + } + } + + /** Shutdown the cleanup executor */ + @PreDestroy + public void shutdown() { + try { + log.info("Shutting down job result cleanup executor"); + cleanupExecutor.shutdown(); + if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + cleanupExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + cleanupExecutor.shutdownNow(); + } + } +} diff --git a/common/src/main/java/stirling/software/common/util/ExecutorFactory.java b/common/src/main/java/stirling/software/common/util/ExecutorFactory.java new file mode 100644 index 000000000..13cca386e --- /dev/null +++ b/common/src/main/java/stirling/software/common/util/ExecutorFactory.java @@ -0,0 +1,31 @@ +package stirling.software.common.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ExecutorFactory { + + /** + * Creates an ExecutorService using virtual threads if available (Java 21+), or falls back to a + * cached thread pool on older Java versions. + */ + public static ExecutorService newVirtualOrCachedThreadExecutor() { + try { + ExecutorService executor = + (ExecutorService) + Executors.class + .getMethod("newVirtualThreadPerTaskExecutor") + .invoke(null); + return executor; + } catch (NoSuchMethodException e) { + log.debug("Virtual threads not available; falling back to cached thread pool."); + } catch (Exception e) { + log.debug("Error initializing virtual thread executor: {}", e.getMessage(), e); + } + + return Executors.newCachedThreadPool(); + } +} diff --git a/common/src/main/java/stirling/software/common/util/SpringContextHolder.java b/common/src/main/java/stirling/software/common/util/SpringContextHolder.java new file mode 100644 index 000000000..0e35f1a33 --- /dev/null +++ b/common/src/main/java/stirling/software/common/util/SpringContextHolder.java @@ -0,0 +1,82 @@ +package stirling.software.common.util; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +/** + * Utility class to access Spring managed beans from non-Spring managed classes. This is especially + * useful for classes that are instantiated by frameworks or created dynamically. + */ +@Component +@Slf4j +public class SpringContextHolder implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringContextHolder.applicationContext = applicationContext; + log.debug("Spring context holder initialized"); + } + + /** + * Get a Spring bean by class type + * + * @param The bean type + * @param beanClass The bean class + * @return The bean instance, or null if not found + */ + public static T getBean(Class beanClass) { + if (applicationContext == null) { + log.warn( + "Application context not initialized when attempting to get bean of type {}", + beanClass.getName()); + return null; + } + + try { + return applicationContext.getBean(beanClass); + } catch (BeansException e) { + log.error("Error getting bean of type {}: {}", beanClass.getName(), e.getMessage()); + return null; + } + } + + /** + * Get a Spring bean by name + * + * @param The bean type + * @param beanName The bean name + * @return The bean instance, or null if not found + */ + public static T getBean(String beanName) { + if (applicationContext == null) { + log.warn( + "Application context not initialized when attempting to get bean '{}'", + beanName); + return null; + } + + try { + @SuppressWarnings("unchecked") + T bean = (T) applicationContext.getBean(beanName); + return bean; + } catch (BeansException e) { + log.error("Error getting bean '{}': {}", beanName, e.getMessage()); + return null; + } + } + + /** + * Check if the application context is initialized + * + * @return true if initialized, false otherwise + */ + public static boolean isInitialized() { + return applicationContext != null; + } +} diff --git a/common/src/test/java/stirling/software/common/annotations/AutoJobPostMappingIntegrationTest.java b/common/src/test/java/stirling/software/common/annotations/AutoJobPostMappingIntegrationTest.java new file mode 100644 index 000000000..e25ceddf9 --- /dev/null +++ b/common/src/test/java/stirling/software/common/annotations/AutoJobPostMappingIntegrationTest.java @@ -0,0 +1,208 @@ +package stirling.software.common.annotations; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.Arrays; +import java.util.function.Supplier; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.ResponseEntity; +import org.springframework.web.multipart.MultipartFile; + +import jakarta.servlet.http.HttpServletRequest; + +import stirling.software.common.aop.AutoJobAspect; +import stirling.software.common.model.api.PDFFile; +import stirling.software.common.service.FileOrUploadService; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.JobExecutorService; +import stirling.software.common.service.JobQueue; +import stirling.software.common.service.ResourceMonitor; + +@ExtendWith(MockitoExtension.class) +class AutoJobPostMappingIntegrationTest { + + private AutoJobAspect autoJobAspect; + + @Mock + private JobExecutorService jobExecutorService; + + @Mock + private HttpServletRequest request; + + @Mock + private FileOrUploadService fileOrUploadService; + + @Mock + private FileStorage fileStorage; + + + @Mock + private ResourceMonitor resourceMonitor; + + @Mock + private JobQueue jobQueue; + + @BeforeEach + void setUp() { + autoJobAspect = new AutoJobAspect( + jobExecutorService, + request, + fileOrUploadService, + fileStorage + ); + } + + @Mock + private ProceedingJoinPoint joinPoint; + + @Mock + private AutoJobPostMapping autoJobPostMapping; + + @Captor + private ArgumentCaptor> workCaptor; + + @Captor + private ArgumentCaptor asyncCaptor; + + @Captor + private ArgumentCaptor timeoutCaptor; + + @Captor + private ArgumentCaptor queueableCaptor; + + @Captor + private ArgumentCaptor resourceWeightCaptor; + + @Test + void shouldExecuteWithCustomParameters() throws Throwable { + // Given + PDFFile pdfFile = new PDFFile(); + pdfFile.setFileId("test-file-id"); + Object[] args = new Object[] { pdfFile }; + + when(joinPoint.getArgs()).thenReturn(args); + when(request.getParameter("async")).thenReturn("true"); + when(autoJobPostMapping.timeout()).thenReturn(60000L); + when(autoJobPostMapping.retryCount()).thenReturn(3); + when(autoJobPostMapping.trackProgress()).thenReturn(true); + when(autoJobPostMapping.queueable()).thenReturn(true); + when(autoJobPostMapping.resourceWeight()).thenReturn(75); + + MultipartFile mockFile = mock(MultipartFile.class); + when(fileStorage.retrieveFile("test-file-id")).thenReturn(mockFile); + + + when(jobExecutorService.runJobGeneric( + anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt())) + .thenReturn(ResponseEntity.ok("success")); + + // When + Object result = autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping); + + // Then + assertEquals(ResponseEntity.ok("success"), result); + + verify(jobExecutorService).runJobGeneric( + asyncCaptor.capture(), + workCaptor.capture(), + timeoutCaptor.capture(), + queueableCaptor.capture(), + resourceWeightCaptor.capture()); + + assertTrue(asyncCaptor.getValue(), "Async should be true"); + assertEquals(60000L, timeoutCaptor.getValue(), "Timeout should be 60000ms"); + assertTrue(queueableCaptor.getValue(), "Queueable should be true"); + assertEquals(75, resourceWeightCaptor.getValue(), "Resource weight should be 75"); + + // Test that file was resolved + assertNotNull(pdfFile.getFileInput(), "File input should be set"); + } + + @Test + void shouldRetryOnError() throws Throwable { + // Given + when(joinPoint.getArgs()).thenReturn(new Object[0]); + when(request.getParameter("async")).thenReturn("false"); + when(autoJobPostMapping.timeout()).thenReturn(-1L); + when(autoJobPostMapping.retryCount()).thenReturn(2); + when(autoJobPostMapping.trackProgress()).thenReturn(false); + when(autoJobPostMapping.queueable()).thenReturn(false); + when(autoJobPostMapping.resourceWeight()).thenReturn(50); + + // First call throws exception, second succeeds + when(joinPoint.proceed(any())) + .thenThrow(new RuntimeException("First attempt failed")) + .thenReturn(ResponseEntity.ok("retry succeeded")); + + // Mock jobExecutorService to execute the work immediately + when(jobExecutorService.runJobGeneric( + anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt())) + .thenAnswer(invocation -> { + Supplier work = invocation.getArgument(1); + return work.get(); + }); + + // When + Object result = autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping); + + // Then + assertEquals(ResponseEntity.ok("retry succeeded"), result); + + // Verify that proceed was called twice (initial attempt + 1 retry) + verify(joinPoint, times(2)).proceed(any()); + } + + @Test + void shouldHandlePDFFileWithAsyncRequests() throws Throwable { + // Given + PDFFile pdfFile = new PDFFile(); + pdfFile.setFileInput(mock(MultipartFile.class)); + Object[] args = new Object[] { pdfFile }; + + when(joinPoint.getArgs()).thenReturn(args); + when(request.getParameter("async")).thenReturn("true"); + when(autoJobPostMapping.retryCount()).thenReturn(1); + + when(fileStorage.storeFile(any(MultipartFile.class))).thenReturn("stored-file-id"); + when(fileStorage.retrieveFile("stored-file-id")).thenReturn(mock(MultipartFile.class)); + + // Mock job executor to return a successful response + when(jobExecutorService.runJobGeneric( + anyBoolean(), any(Supplier.class), anyLong(), anyBoolean(), anyInt())) + .thenReturn(ResponseEntity.ok("success")); + + // When + autoJobAspect.wrapWithJobExecution(joinPoint, autoJobPostMapping); + + // Then + assertEquals("stored-file-id", pdfFile.getFileId(), + "FileId should be set to the stored file id"); + assertNotNull(pdfFile.getFileInput(), "FileInput should be replaced with persistent file"); + + // Verify storage operations + verify(fileStorage).storeFile(any(MultipartFile.class)); + verify(fileStorage).retrieveFile("stored-file-id"); + } +} diff --git a/common/src/test/java/stirling/software/common/service/FileStorageTest.java b/common/src/test/java/stirling/software/common/service/FileStorageTest.java new file mode 100644 index 000000000..f1ca1ffdf --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/FileStorageTest.java @@ -0,0 +1,190 @@ +package stirling.software.common.service; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; +import static org.mockito.AdditionalAnswers.*; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.web.multipart.MultipartFile; + +class FileStorageTest { + + @TempDir + Path tempDir; + + @Mock + private FileOrUploadService fileOrUploadService; + + @InjectMocks + private FileStorage fileStorage; + + private MultipartFile mockFile; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + ReflectionTestUtils.setField(fileStorage, "tempDirPath", tempDir.toString()); + + // Create a mock MultipartFile + mockFile = mock(MultipartFile.class); + when(mockFile.getOriginalFilename()).thenReturn("test.pdf"); + when(mockFile.getContentType()).thenReturn("application/pdf"); + } + + @Test + void testStoreFile() throws IOException { + // Arrange + byte[] fileContent = "Test PDF content".getBytes(); + when(mockFile.getBytes()).thenReturn(fileContent); + + // Set up mock to handle transferTo by writing the file + doAnswer(invocation -> { + java.io.File file = invocation.getArgument(0); + Files.write(file.toPath(), fileContent); + return null; + }).when(mockFile).transferTo(any(java.io.File.class)); + + // Act + String fileId = fileStorage.storeFile(mockFile); + + // Assert + assertNotNull(fileId); + assertTrue(Files.exists(tempDir.resolve(fileId))); + verify(mockFile).transferTo(any(java.io.File.class)); + } + + @Test + void testStoreBytes() throws IOException { + // Arrange + byte[] fileContent = "Test PDF content".getBytes(); + String originalName = "test.pdf"; + + // Act + String fileId = fileStorage.storeBytes(fileContent, originalName); + + // Assert + assertNotNull(fileId); + assertTrue(Files.exists(tempDir.resolve(fileId))); + assertArrayEquals(fileContent, Files.readAllBytes(tempDir.resolve(fileId))); + } + + @Test + void testRetrieveFile() throws IOException { + // Arrange + byte[] fileContent = "Test PDF content".getBytes(); + String fileId = UUID.randomUUID().toString(); + Path filePath = tempDir.resolve(fileId); + Files.write(filePath, fileContent); + + MultipartFile expectedFile = mock(MultipartFile.class); + when(fileOrUploadService.toMockMultipartFile(eq(fileId), eq(fileContent))) + .thenReturn(expectedFile); + + // Act + MultipartFile result = fileStorage.retrieveFile(fileId); + + // Assert + assertSame(expectedFile, result); + verify(fileOrUploadService).toMockMultipartFile(eq(fileId), eq(fileContent)); + } + + @Test + void testRetrieveBytes() throws IOException { + // Arrange + byte[] fileContent = "Test PDF content".getBytes(); + String fileId = UUID.randomUUID().toString(); + Path filePath = tempDir.resolve(fileId); + Files.write(filePath, fileContent); + + // Act + byte[] result = fileStorage.retrieveBytes(fileId); + + // Assert + assertArrayEquals(fileContent, result); + } + + @Test + void testRetrieveFile_FileNotFound() { + // Arrange + String nonExistentFileId = "non-existent-file"; + + // Act & Assert + assertThrows(IOException.class, () -> fileStorage.retrieveFile(nonExistentFileId)); + } + + @Test + void testRetrieveBytes_FileNotFound() { + // Arrange + String nonExistentFileId = "non-existent-file"; + + // Act & Assert + assertThrows(IOException.class, () -> fileStorage.retrieveBytes(nonExistentFileId)); + } + + @Test + void testDeleteFile() throws IOException { + // Arrange + byte[] fileContent = "Test PDF content".getBytes(); + String fileId = UUID.randomUUID().toString(); + Path filePath = tempDir.resolve(fileId); + Files.write(filePath, fileContent); + + // Act + boolean result = fileStorage.deleteFile(fileId); + + // Assert + assertTrue(result); + assertFalse(Files.exists(filePath)); + } + + @Test + void testDeleteFile_FileNotFound() { + // Arrange + String nonExistentFileId = "non-existent-file"; + + // Act + boolean result = fileStorage.deleteFile(nonExistentFileId); + + // Assert + assertFalse(result); + } + + @Test + void testFileExists() throws IOException { + // Arrange + byte[] fileContent = "Test PDF content".getBytes(); + String fileId = UUID.randomUUID().toString(); + Path filePath = tempDir.resolve(fileId); + Files.write(filePath, fileContent); + + // Act + boolean result = fileStorage.fileExists(fileId); + + // Assert + assertTrue(result); + } + + @Test + void testFileExists_FileNotFound() { + // Arrange + String nonExistentFileId = "non-existent-file"; + + // Act + boolean result = fileStorage.fileExists(nonExistentFileId); + + // Assert + assertFalse(result); + } +} \ No newline at end of file diff --git a/common/src/test/java/stirling/software/common/service/JobExecutorServiceTest.java b/common/src/test/java/stirling/software/common/service/JobExecutorServiceTest.java new file mode 100644 index 000000000..a4d293b1b --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/JobExecutorServiceTest.java @@ -0,0 +1,202 @@ +package stirling.software.common.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.test.util.ReflectionTestUtils; + +import jakarta.servlet.http.HttpServletRequest; + +import stirling.software.common.model.job.JobProgress; +import stirling.software.common.model.job.JobResponse; + +@ExtendWith(MockitoExtension.class) +class JobExecutorServiceTest { + + private JobExecutorService jobExecutorService; + + @Mock + private TaskManager taskManager; + + @Mock + private FileStorage fileStorage; + + @Mock + private HttpServletRequest request; + + @Mock + private ResourceMonitor resourceMonitor; + + @Mock + private JobQueue jobQueue; + + @Captor + private ArgumentCaptor jobIdCaptor; + + @BeforeEach + void setUp() { + // Initialize the service manually with all its dependencies + jobExecutorService = new JobExecutorService( + taskManager, + fileStorage, + request, + resourceMonitor, + jobQueue, + 30000L, // asyncRequestTimeoutMs + "30m" // sessionTimeout + ); + } + + @Test + void shouldRunSyncJobSuccessfully() throws Exception { + // Given + Supplier work = () -> "test-result"; + + // When + ResponseEntity response = jobExecutorService.runJobGeneric(false, work); + + // Then + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals("test-result", response.getBody()); + + // Verify request attribute was set with jobId + verify(request).setAttribute(eq("jobId"), anyString()); + } + + @Test + void shouldRunAsyncJobSuccessfully() throws Exception { + // Given + Supplier work = () -> "test-result"; + + // When + ResponseEntity response = jobExecutorService.runJobGeneric(true, work); + + // Then + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertTrue(response.getBody() instanceof JobResponse); + JobResponse jobResponse = (JobResponse) response.getBody(); + assertTrue(jobResponse.isAsync()); + assertNotNull(jobResponse.getJobId()); + + // Verify task manager was called + verify(taskManager).createTask(jobIdCaptor.capture()); + } + + + @Test + void shouldHandleSyncJobError() { + // Given + Supplier work = () -> { + throw new RuntimeException("Test error"); + }; + + // When + ResponseEntity response = jobExecutorService.runJobGeneric(false, work); + + // Then + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); + + @SuppressWarnings("unchecked") + Map errorMap = (Map) response.getBody(); + assertEquals("Job failed: Test error", errorMap.get("error")); + } + + @Test + void shouldQueueJobWhenResourcesLimited() { + // Given + Supplier work = () -> "test-result"; + CompletableFuture> future = new CompletableFuture<>(); + + // Configure resourceMonitor to indicate job should be queued + when(resourceMonitor.shouldQueueJob(80)).thenReturn(true); + + // Configure jobQueue to return our future + when(jobQueue.queueJob(anyString(), eq(80), any(), anyLong())).thenReturn(future); + + // When + ResponseEntity response = jobExecutorService.runJobGeneric( + true, work, 5000, true, 80); + + // Then + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertTrue(response.getBody() instanceof JobResponse); + + // Verify job was queued + verify(jobQueue).queueJob(anyString(), eq(80), any(), eq(5000L)); + verify(taskManager).createTask(anyString()); + } + + @Test + void shouldUseCustomTimeoutWhenProvided() throws Exception { + // Given + Supplier work = () -> "test-result"; + long customTimeout = 60000L; + + // Use reflection to access the private executeWithTimeout method + java.lang.reflect.Method executeMethod = JobExecutorService.class + .getDeclaredMethod("executeWithTimeout", Supplier.class, long.class); + executeMethod.setAccessible(true); + + // Create a spy on the JobExecutorService to verify method calls + JobExecutorService spy = Mockito.spy(jobExecutorService); + + // When + spy.runJobGeneric(false, work, customTimeout); + + // Then + verify(spy).runJobGeneric(eq(false), any(Supplier.class), eq(customTimeout)); + } + + @Test + void shouldHandleTimeout() throws Exception { + // Given + Supplier work = () -> { + try { + Thread.sleep(100); // Simulate long-running job + return "test-result"; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + + // Use reflection to access the private executeWithTimeout method + java.lang.reflect.Method executeMethod = JobExecutorService.class + .getDeclaredMethod("executeWithTimeout", Supplier.class, long.class); + executeMethod.setAccessible(true); + + // When/Then + try { + executeMethod.invoke(jobExecutorService, work, 1L); // Very short timeout + } catch (Exception e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + } +} \ No newline at end of file diff --git a/common/src/test/java/stirling/software/common/service/JobQueueTest.java b/common/src/test/java/stirling/software/common/service/JobQueueTest.java new file mode 100644 index 000000000..813f5e172 --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/JobQueueTest.java @@ -0,0 +1,102 @@ +package stirling.software.common.service; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import stirling.software.common.model.job.JobProgress; +import stirling.software.common.service.ResourceMonitor.ResourceStatus; + +@ExtendWith(MockitoExtension.class) +class JobQueueTest { + + private JobQueue jobQueue; + + @Mock + private ResourceMonitor resourceMonitor; + + + private final AtomicReference statusRef = new AtomicReference<>(ResourceStatus.OK); + + @BeforeEach + void setUp() { + // Mark stubbing as lenient to avoid UnnecessaryStubbingException + lenient().when(resourceMonitor.calculateDynamicQueueCapacity(anyInt(), anyInt())).thenReturn(10); + lenient().when(resourceMonitor.getCurrentStatus()).thenReturn(statusRef); + + // Initialize JobQueue with mocked ResourceMonitor + jobQueue = new JobQueue(resourceMonitor); + } + + @Test + void shouldQueueJob() { + String jobId = "test-job-1"; + int resourceWeight = 50; + Supplier work = () -> "test-result"; + long timeoutMs = 1000; + + jobQueue.queueJob(jobId, resourceWeight, work, timeoutMs); + + + assertTrue(jobQueue.isJobQueued(jobId)); + assertEquals(1, jobQueue.getTotalQueuedJobs()); + } + + @Test + void shouldCancelJob() { + String jobId = "test-job-2"; + Supplier work = () -> "test-result"; + + jobQueue.queueJob(jobId, 50, work, 1000); + boolean cancelled = jobQueue.cancelJob(jobId); + + assertTrue(cancelled); + assertFalse(jobQueue.isJobQueued(jobId)); + } + + @Test + void shouldGetQueueStats() { + when(resourceMonitor.getCurrentStatus()).thenReturn(statusRef); + + jobQueue.queueJob("job1", 50, () -> "ok", 1000); + jobQueue.queueJob("job2", 50, () -> "ok", 1000); + jobQueue.cancelJob("job2"); + + Map stats = jobQueue.getQueueStats(); + + assertEquals(2, stats.get("totalQueuedJobs")); + assertTrue(stats.containsKey("queuedJobs")); + assertTrue(stats.containsKey("resourceStatus")); + } + + @Test + void shouldCalculateQueueCapacity() { + when(resourceMonitor.calculateDynamicQueueCapacity(5, 2)).thenReturn(8); + int capacity = resourceMonitor.calculateDynamicQueueCapacity(5, 2); + assertEquals(8, capacity); + } + + @Test + void shouldCheckIfJobIsQueued() { + String jobId = "job-123"; + Supplier work = () -> "hello"; + + jobQueue.queueJob(jobId, 40, work, 500); + + assertTrue(jobQueue.isJobQueued(jobId)); + assertFalse(jobQueue.isJobQueued("nonexistent")); + } +} \ No newline at end of file diff --git a/common/src/test/java/stirling/software/common/service/ResourceMonitorTest.java b/common/src/test/java/stirling/software/common/service/ResourceMonitorTest.java new file mode 100644 index 000000000..a707b87e6 --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/ResourceMonitorTest.java @@ -0,0 +1,137 @@ +package stirling.software.common.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.OperatingSystemMXBean; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; + +import stirling.software.common.service.ResourceMonitor.ResourceMetrics; +import stirling.software.common.service.ResourceMonitor.ResourceStatus; + +@ExtendWith(MockitoExtension.class) +class ResourceMonitorTest { + + @InjectMocks + private ResourceMonitor resourceMonitor; + + @Mock + private OperatingSystemMXBean osMXBean; + + @Mock + private MemoryMXBean memoryMXBean; + + @Spy + private AtomicReference currentStatus = new AtomicReference<>(ResourceStatus.OK); + + @Spy + private AtomicReference latestMetrics = new AtomicReference<>(new ResourceMetrics()); + + @BeforeEach + void setUp() { + // Set thresholds for testing + ReflectionTestUtils.setField(resourceMonitor, "memoryCriticalThreshold", 0.9); + ReflectionTestUtils.setField(resourceMonitor, "memoryHighThreshold", 0.75); + ReflectionTestUtils.setField(resourceMonitor, "cpuCriticalThreshold", 0.9); + ReflectionTestUtils.setField(resourceMonitor, "cpuHighThreshold", 0.75); + ReflectionTestUtils.setField(resourceMonitor, "osMXBean", osMXBean); + ReflectionTestUtils.setField(resourceMonitor, "memoryMXBean", memoryMXBean); + ReflectionTestUtils.setField(resourceMonitor, "currentStatus", currentStatus); + ReflectionTestUtils.setField(resourceMonitor, "latestMetrics", latestMetrics); + } + + @Test + void shouldCalculateDynamicQueueCapacity() { + // Given + int baseCapacity = 10; + int minCapacity = 2; + + // Mock current status as OK + currentStatus.set(ResourceStatus.OK); + + // When + int capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity); + + // Then + assertEquals(baseCapacity, capacity, "With OK status, capacity should equal base capacity"); + + // Given + currentStatus.set(ResourceStatus.WARNING); + + // When + capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity); + + // Then + assertEquals(6, capacity, "With WARNING status, capacity should be reduced to 60%"); + + // Given + currentStatus.set(ResourceStatus.CRITICAL); + + // When + capacity = resourceMonitor.calculateDynamicQueueCapacity(baseCapacity, minCapacity); + + // Then + assertEquals(3, capacity, "With CRITICAL status, capacity should be reduced to 30%"); + + // Test minimum capacity enforcement + assertEquals(minCapacity, resourceMonitor.calculateDynamicQueueCapacity(1, minCapacity), + "Should never go below minimum capacity"); + } + + @ParameterizedTest + @CsvSource({ + "10, OK, false", // Light job, OK status + "10, WARNING, false", // Light job, WARNING status + "10, CRITICAL, true", // Light job, CRITICAL status + "30, OK, false", // Medium job, OK status + "30, WARNING, true", // Medium job, WARNING status + "30, CRITICAL, true", // Medium job, CRITICAL status + "80, OK, true", // Heavy job, OK status + "80, WARNING, true", // Heavy job, WARNING status + "80, CRITICAL, true" // Heavy job, CRITICAL status + }) + void shouldQueueJobBasedOnWeightAndStatus(int weight, ResourceStatus status, boolean shouldQueue) { + // Given + currentStatus.set(status); + + // When + boolean result = resourceMonitor.shouldQueueJob(weight); + + // Then + assertEquals(shouldQueue, result, + String.format("For weight %d and status %s, shouldQueue should be %s", + weight, status, shouldQueue)); + } + + @Test + void resourceMetricsShouldDetectStaleState() { + // Given + Instant now = Instant.now(); + Instant pastInstant = now.minusMillis(6000); + + ResourceMetrics staleMetrics = new ResourceMetrics(0.5, 0.5, 1024, 2048, 4096, pastInstant); + ResourceMetrics freshMetrics = new ResourceMetrics(0.5, 0.5, 1024, 2048, 4096, now); + + // When/Then + assertTrue(staleMetrics.isStale(5000), "Metrics from 6 seconds ago should be stale with 5s threshold"); + assertFalse(freshMetrics.isStale(5000), "Fresh metrics should not be stale"); + } +} \ No newline at end of file diff --git a/common/src/test/java/stirling/software/common/service/TaskManagerTest.java b/common/src/test/java/stirling/software/common/service/TaskManagerTest.java new file mode 100644 index 000000000..85f62aed4 --- /dev/null +++ b/common/src/test/java/stirling/software/common/service/TaskManagerTest.java @@ -0,0 +1,287 @@ +package stirling.software.common.service; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.test.util.ReflectionTestUtils; + +import stirling.software.common.model.job.JobResult; +import stirling.software.common.model.job.JobStats; + +class TaskManagerTest { + + @Mock + private FileStorage fileStorage; + + @InjectMocks + private TaskManager taskManager; + + private AutoCloseable closeable; + + @BeforeEach + void setUp() { + closeable = MockitoAnnotations.openMocks(this); + ReflectionTestUtils.setField(taskManager, "jobResultExpiryMinutes", 30); + } + + @AfterEach + void tearDown() throws Exception { + closeable.close(); + } + + @Test + void testCreateTask() { + // Act + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + + // Assert + JobResult result = taskManager.getJobResult(jobId); + assertNotNull(result); + assertEquals(jobId, result.getJobId()); + assertFalse(result.isComplete()); + assertNotNull(result.getCreatedAt()); + } + + @Test + void testSetResult() { + // Arrange + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + Object resultObject = "Test result"; + + // Act + taskManager.setResult(jobId, resultObject); + + // Assert + JobResult result = taskManager.getJobResult(jobId); + assertNotNull(result); + assertTrue(result.isComplete()); + assertEquals(resultObject, result.getResult()); + assertNotNull(result.getCompletedAt()); + } + + @Test + void testSetFileResult() { + // Arrange + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + String fileId = "file-id"; + String originalFileName = "test.pdf"; + String contentType = "application/pdf"; + + // Act + taskManager.setFileResult(jobId, fileId, originalFileName, contentType); + + // Assert + JobResult result = taskManager.getJobResult(jobId); + assertNotNull(result); + assertTrue(result.isComplete()); + assertEquals(fileId, result.getFileId()); + assertEquals(originalFileName, result.getOriginalFileName()); + assertEquals(contentType, result.getContentType()); + assertNotNull(result.getCompletedAt()); + } + + @Test + void testSetError() { + // Arrange + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + String errorMessage = "Test error"; + + // Act + taskManager.setError(jobId, errorMessage); + + // Assert + JobResult result = taskManager.getJobResult(jobId); + assertNotNull(result); + assertTrue(result.isComplete()); + assertEquals(errorMessage, result.getError()); + assertNotNull(result.getCompletedAt()); + } + + @Test + void testSetComplete_WithExistingResult() { + // Arrange + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + Object resultObject = "Test result"; + taskManager.setResult(jobId, resultObject); + + // Act + taskManager.setComplete(jobId); + + // Assert + JobResult result = taskManager.getJobResult(jobId); + assertNotNull(result); + assertTrue(result.isComplete()); + assertEquals(resultObject, result.getResult()); + } + + @Test + void testSetComplete_WithoutExistingResult() { + // Arrange + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + + // Act + taskManager.setComplete(jobId); + + // Assert + JobResult result = taskManager.getJobResult(jobId); + assertNotNull(result); + assertTrue(result.isComplete()); + assertEquals("Task completed successfully", result.getResult()); + } + + @Test + void testIsComplete() { + // Arrange + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + + // Assert - not complete initially + assertFalse(taskManager.isComplete(jobId)); + + // Act - mark as complete + taskManager.setComplete(jobId); + + // Assert - now complete + assertTrue(taskManager.isComplete(jobId)); + } + + @Test + void testGetJobStats() { + // Arrange + // 1. Create active job + String activeJobId = "active-job"; + taskManager.createTask(activeJobId); + + // 2. Create completed successful job with file + String successFileJobId = "success-file-job"; + taskManager.createTask(successFileJobId); + taskManager.setFileResult(successFileJobId, "file-id", "test.pdf", "application/pdf"); + + // 3. Create completed successful job without file + String successJobId = "success-job"; + taskManager.createTask(successJobId); + taskManager.setResult(successJobId, "Result"); + + // 4. Create failed job + String failedJobId = "failed-job"; + taskManager.createTask(failedJobId); + taskManager.setError(failedJobId, "Error message"); + + // Act + JobStats stats = taskManager.getJobStats(); + + // Assert + assertEquals(4, stats.getTotalJobs()); + assertEquals(1, stats.getActiveJobs()); + assertEquals(3, stats.getCompletedJobs()); + assertEquals(1, stats.getFailedJobs()); + assertEquals(2, stats.getSuccessfulJobs()); + assertEquals(1, stats.getFileResultJobs()); + assertNotNull(stats.getNewestActiveJobTime()); + assertNotNull(stats.getOldestActiveJobTime()); + assertTrue(stats.getAverageProcessingTimeMs() >= 0); + } + + @Test + void testCleanupOldJobs() throws Exception { + // Arrange + // 1. Create a recent completed job + String recentJobId = "recent-job"; + taskManager.createTask(recentJobId); + taskManager.setResult(recentJobId, "Result"); + + // 2. Create an old completed job with file result + String oldJobId = "old-job"; + taskManager.createTask(oldJobId); + JobResult oldJob = taskManager.getJobResult(oldJobId); + + // Manually set the completion time to be older than the expiry + LocalDateTime oldTime = LocalDateTime.now().minusHours(1); + ReflectionTestUtils.setField(oldJob, "completedAt", oldTime); + ReflectionTestUtils.setField(oldJob, "complete", true); + ReflectionTestUtils.setField(oldJob, "fileId", "file-id"); + ReflectionTestUtils.setField(oldJob, "originalFileName", "test.pdf"); + ReflectionTestUtils.setField(oldJob, "contentType", "application/pdf"); + + when(fileStorage.deleteFile("file-id")).thenReturn(true); + + // Obtain access to the private jobResults map + Map jobResultsMap = (Map) ReflectionTestUtils.getField(taskManager, "jobResults"); + + // 3. Create an active job + String activeJobId = "active-job"; + taskManager.createTask(activeJobId); + + // Verify all jobs are in the map + assertTrue(jobResultsMap.containsKey(recentJobId)); + assertTrue(jobResultsMap.containsKey(oldJobId)); + assertTrue(jobResultsMap.containsKey(activeJobId)); + + // Act + taskManager.cleanupOldJobs(); + + // Assert - the old job should be removed + assertFalse(jobResultsMap.containsKey(oldJobId)); + assertTrue(jobResultsMap.containsKey(recentJobId)); + assertTrue(jobResultsMap.containsKey(activeJobId)); + verify(fileStorage).deleteFile("file-id"); + } + + @Test + void testShutdown() throws Exception { + // This mainly tests that the shutdown method doesn't throw exceptions + taskManager.shutdown(); + + // Verify the executor service is shutdown + // This is difficult to test directly, but we can verify it doesn't throw exceptions + } + + @Test + void testAddNote() { + // Arrange + String jobId = UUID.randomUUID().toString(); + taskManager.createTask(jobId); + String note = "Test note"; + + // Act + boolean result = taskManager.addNote(jobId, note); + + // Assert + assertTrue(result); + JobResult jobResult = taskManager.getJobResult(jobId); + assertNotNull(jobResult); + assertNotNull(jobResult.getNotes()); + assertEquals(1, jobResult.getNotes().size()); + assertEquals(note, jobResult.getNotes().get(0)); + } + + @Test + void testAddNote_NonExistentJob() { + // Arrange + String jobId = "non-existent-job"; + String note = "Test note"; + + // Act + boolean result = taskManager.addNote(jobId, note); + + // Assert + assertFalse(result); + } +} diff --git a/common/src/test/java/stirling/software/common/util/SpringContextHolderTest.java b/common/src/test/java/stirling/software/common/util/SpringContextHolderTest.java new file mode 100644 index 000000000..9aa1aadf1 --- /dev/null +++ b/common/src/test/java/stirling/software/common/util/SpringContextHolderTest.java @@ -0,0 +1,73 @@ +package stirling.software.common.util; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.context.ApplicationContext; + +class SpringContextHolderTest { + + private ApplicationContext mockApplicationContext; + private SpringContextHolder contextHolder; + + @BeforeEach + void setUp() { + mockApplicationContext = mock(ApplicationContext.class); + contextHolder = new SpringContextHolder(); + } + + @Test + void testSetApplicationContext() { + // Act + contextHolder.setApplicationContext(mockApplicationContext); + + // Assert + assertTrue(SpringContextHolder.isInitialized()); + } + + @Test + void testGetBean_ByType() { + // Arrange + contextHolder.setApplicationContext(mockApplicationContext); + TestBean expectedBean = new TestBean(); + when(mockApplicationContext.getBean(TestBean.class)).thenReturn(expectedBean); + + // Act + TestBean result = SpringContextHolder.getBean(TestBean.class); + + // Assert + assertSame(expectedBean, result); + verify(mockApplicationContext).getBean(TestBean.class); + } + + + @Test + void testGetBean_ApplicationContextNotSet() { + // Don't set application context + + // Act + TestBean result = SpringContextHolder.getBean(TestBean.class); + + // Assert + assertNull(result); + } + + @Test + void testGetBean_BeanNotFound() { + // Arrange + contextHolder.setApplicationContext(mockApplicationContext); + when(mockApplicationContext.getBean(TestBean.class)).thenThrow(new org.springframework.beans.BeansException("Bean not found") {}); + + // Act + TestBean result = SpringContextHolder.getBean(TestBean.class); + + // Assert + assertNull(result); + } + + // Simple test class + private static class TestBean { + } +} \ No newline at end of file diff --git a/proprietary/src/main/java/stirling/software/proprietary/audit/AuditAspect.java b/proprietary/src/main/java/stirling/software/proprietary/audit/AuditAspect.java index 8b9d46103..0975562ee 100644 --- a/proprietary/src/main/java/stirling/software/proprietary/audit/AuditAspect.java +++ b/proprietary/src/main/java/stirling/software/proprietary/audit/AuditAspect.java @@ -26,6 +26,8 @@ import stirling.software.proprietary.service.AuditService; @Component @Slf4j @RequiredArgsConstructor +@org.springframework.core.annotation.Order( + 10) // Lower precedence (higher number) - executes after AutoJobAspect public class AuditAspect { private final AuditService auditService; diff --git a/proprietary/src/main/java/stirling/software/proprietary/audit/ControllerAuditAspect.java b/proprietary/src/main/java/stirling/software/proprietary/audit/ControllerAuditAspect.java index 740555439..78d6bf2ab 100644 --- a/proprietary/src/main/java/stirling/software/proprietary/audit/ControllerAuditAspect.java +++ b/proprietary/src/main/java/stirling/software/proprietary/audit/ControllerAuditAspect.java @@ -36,6 +36,8 @@ import stirling.software.proprietary.service.AuditService; @Component @Slf4j @RequiredArgsConstructor +@org.springframework.core.annotation.Order( + 10) // Lower precedence (higher number) - executes after AutoJobAspect public class ControllerAuditAspect { private final AuditService auditService; @@ -77,6 +79,12 @@ public class ControllerAuditAspect { return auditController(joinPoint, "PATCH"); } + /** Intercept all methods with AutoJobPostMapping annotation */ + @Around("@annotation(stirling.software.common.annotations.AutoJobPostMapping)") + public Object auditAutoJobMethod(ProceedingJoinPoint joinPoint) throws Throwable { + return auditController(joinPoint, "POST"); + } + private Object auditController(ProceedingJoinPoint joinPoint, String httpMethod) throws Throwable { MethodSignature sig = (MethodSignature) joinPoint.getSignature(); diff --git a/proprietary/src/main/java/stirling/software/proprietary/controller/AdminJobController.java b/proprietary/src/main/java/stirling/software/proprietary/controller/AdminJobController.java new file mode 100644 index 000000000..cdb8f24a3 --- /dev/null +++ b/proprietary/src/main/java/stirling/software/proprietary/controller/AdminJobController.java @@ -0,0 +1,83 @@ +package stirling.software.proprietary.controller; + +import java.util.Map; + +import org.springframework.http.ResponseEntity; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.model.job.JobStats; +import stirling.software.common.service.JobQueue; +import stirling.software.common.service.TaskManager; + +/** + * Admin controller for job management. These endpoints require admin privileges and provide insight + * into system jobs and queues. + */ +@RestController +@RequiredArgsConstructor +@Slf4j +public class AdminJobController { + + private final TaskManager taskManager; + private final JobQueue jobQueue; + + /** + * Get statistics about jobs in the system (admin only) + * + * @return Job statistics + */ + @GetMapping("/api/v1/admin/job/stats") + @PreAuthorize("hasRole('ROLE_ADMIN')") + public ResponseEntity getJobStats() { + JobStats stats = taskManager.getJobStats(); + log.info( + "Admin requested job stats: {} active, {} completed jobs", + stats.getActiveJobs(), + stats.getCompletedJobs()); + return ResponseEntity.ok(stats); + } + + /** + * Get statistics about the job queue (admin only) + * + * @return Queue statistics + */ + @GetMapping("/api/v1/admin/job/queue/stats") + @PreAuthorize("hasRole('ROLE_ADMIN')") + public ResponseEntity getQueueStats() { + Map queueStats = jobQueue.getQueueStats(); + log.info("Admin requested queue stats: {} queued jobs", queueStats.get("queuedJobs")); + return ResponseEntity.ok(queueStats); + } + + /** + * Manually trigger cleanup of old jobs (admin only) + * + * @return A response indicating how many jobs were cleaned up + */ + @PostMapping("/api/v1/admin/job/cleanup") + @PreAuthorize("hasRole('ROLE_ADMIN')") + public ResponseEntity cleanupOldJobs() { + int beforeCount = taskManager.getJobStats().getTotalJobs(); + taskManager.cleanupOldJobs(); + int afterCount = taskManager.getJobStats().getTotalJobs(); + int removedCount = beforeCount - afterCount; + + log.info( + "Admin triggered job cleanup: removed {} jobs, {} remaining", + removedCount, + afterCount); + + return ResponseEntity.ok( + Map.of( + "message", "Cleanup complete", + "removedJobs", removedCount, + "remainingJobs", afterCount)); + } +} diff --git a/stirling-pdf/src/main/java/stirling/software/common/controller/JobController.java b/stirling-pdf/src/main/java/stirling/software/common/controller/JobController.java new file mode 100644 index 000000000..510488a64 --- /dev/null +++ b/stirling-pdf/src/main/java/stirling/software/common/controller/JobController.java @@ -0,0 +1,173 @@ +package stirling.software.common.controller; + +import java.util.Map; + +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import jakarta.servlet.http.HttpServletRequest; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.model.job.JobResult; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.JobQueue; +import stirling.software.common.service.TaskManager; + +/** REST controller for job-related endpoints */ +@RestController +@RequiredArgsConstructor +@Slf4j +public class JobController { + + private final TaskManager taskManager; + private final FileStorage fileStorage; + private final JobQueue jobQueue; + private final HttpServletRequest request; + + /** + * Get the status of a job + * + * @param jobId The job ID + * @return The job result + */ + @GetMapping("/api/v1/general/job/{jobId}") + public ResponseEntity getJobStatus(@PathVariable("jobId") String jobId) { + JobResult result = taskManager.getJobResult(jobId); + if (result == null) { + return ResponseEntity.notFound().build(); + } + + // Check if the job is in the queue and add queue information + if (!result.isComplete() && jobQueue.isJobQueued(jobId)) { + int position = jobQueue.getJobPosition(jobId); + Map resultWithQueueInfo = + Map.of( + "jobResult", + result, + "queueInfo", + Map.of("inQueue", true, "position", position)); + return ResponseEntity.ok(resultWithQueueInfo); + } + + return ResponseEntity.ok(result); + } + + /** + * Get the result of a job + * + * @param jobId The job ID + * @return The job result + */ + @GetMapping("/api/v1/general/job/{jobId}/result") + public ResponseEntity getJobResult(@PathVariable("jobId") String jobId) { + JobResult result = taskManager.getJobResult(jobId); + if (result == null) { + return ResponseEntity.notFound().build(); + } + + if (!result.isComplete()) { + return ResponseEntity.badRequest().body("Job is not complete yet"); + } + + if (result.getError() != null) { + return ResponseEntity.badRequest().body("Job failed: " + result.getError()); + } + + if (result.getFileId() != null) { + try { + byte[] fileContent = fileStorage.retrieveBytes(result.getFileId()); + return ResponseEntity.ok() + .header("Content-Type", result.getContentType()) + .header( + "Content-Disposition", + "form-data; name=\"attachment\"; filename=\"" + + result.getOriginalFileName() + + "\"") + .body(fileContent); + } catch (Exception e) { + log.error("Error retrieving file for job {}: {}", jobId, e.getMessage(), e); + return ResponseEntity.internalServerError() + .body("Error retrieving file: " + e.getMessage()); + } + } + + return ResponseEntity.ok(result.getResult()); + } + + // Admin-only endpoints have been moved to AdminJobController in the proprietary package + + /** + * Cancel a job by its ID + * + *

This method should only allow cancellation of jobs that were created by the current user. + * The jobId should be part of the user's session or otherwise linked to their identity. + * + * @param jobId The job ID + * @return Response indicating whether the job was cancelled + */ + @DeleteMapping("/api/v1/general/job/{jobId}") + public ResponseEntity cancelJob(@PathVariable("jobId") String jobId) { + log.debug("Request to cancel job: {}", jobId); + + // Verify that this job belongs to the current user + // We can use the current request's session to validate ownership + Object sessionJobIds = request.getSession().getAttribute("userJobIds"); + if (sessionJobIds == null + || !(sessionJobIds instanceof java.util.Set) + || !((java.util.Set) sessionJobIds).contains(jobId)) { + // Either no jobs in session or jobId doesn't match user's jobs + log.warn("Unauthorized attempt to cancel job: {}", jobId); + return ResponseEntity.status(403) + .body(Map.of("message", "You are not authorized to cancel this job")); + } + + // First check if the job is in the queue + boolean cancelled = false; + int queuePosition = -1; + + if (jobQueue.isJobQueued(jobId)) { + queuePosition = jobQueue.getJobPosition(jobId); + cancelled = jobQueue.cancelJob(jobId); + log.info("Cancelled queued job: {} (was at position {})", jobId, queuePosition); + } + + // If not in queue or couldn't cancel, try to cancel in TaskManager + if (!cancelled) { + JobResult result = taskManager.getJobResult(jobId); + if (result != null && !result.isComplete()) { + // Mark as error with cancellation message + taskManager.setError(jobId, "Job was cancelled by user"); + cancelled = true; + log.info("Marked job as cancelled in TaskManager: {}", jobId); + } + } + + if (cancelled) { + return ResponseEntity.ok( + Map.of( + "message", + "Job cancelled successfully", + "wasQueued", + queuePosition >= 0, + "queuePosition", + queuePosition >= 0 ? queuePosition : "n/a")); + } else { + // Job not found or already complete + JobResult result = taskManager.getJobResult(jobId); + if (result == null) { + return ResponseEntity.notFound().build(); + } else if (result.isComplete()) { + return ResponseEntity.badRequest() + .body(Map.of("message", "Cannot cancel job that is already complete")); + } else { + return ResponseEntity.internalServerError() + .body(Map.of("message", "Failed to cancel job for unknown reason")); + } + } + } +} diff --git a/stirling-pdf/src/test/java/stirling/software/common/controller/JobControllerTest.java b/stirling-pdf/src/test/java/stirling/software/common/controller/JobControllerTest.java new file mode 100644 index 000000000..4ed005835 --- /dev/null +++ b/stirling-pdf/src/test/java/stirling/software/common/controller/JobControllerTest.java @@ -0,0 +1,406 @@ +package stirling.software.common.controller; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.mock.web.MockHttpSession; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpSession; + +import stirling.software.common.model.job.JobResult; +import stirling.software.common.model.job.JobStats; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.JobQueue; +import stirling.software.common.service.TaskManager; + +class JobControllerTest { + + @Mock + private TaskManager taskManager; + + @Mock + private FileStorage fileStorage; + + @Mock + private JobQueue jobQueue; + + @Mock + private HttpServletRequest request; + + private MockHttpSession session; + + @InjectMocks + private JobController controller; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + + // Setup mock session for tests + session = new MockHttpSession(); + when(request.getSession()).thenReturn(session); + } + + @Test + void testGetJobStatus_ExistingJob() { + // Arrange + String jobId = "test-job-id"; + JobResult mockResult = new JobResult(); + mockResult.setJobId(jobId); + when(taskManager.getJobResult(jobId)).thenReturn(mockResult); + + // Act + ResponseEntity response = controller.getJobStatus(jobId); + + // Assert + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals(mockResult, response.getBody()); + } + + @Test + void testGetJobStatus_ExistingJobInQueue() { + // Arrange + String jobId = "test-job-id"; + JobResult mockResult = new JobResult(); + mockResult.setJobId(jobId); + mockResult.setComplete(false); + when(taskManager.getJobResult(jobId)).thenReturn(mockResult); + when(jobQueue.isJobQueued(jobId)).thenReturn(true); + when(jobQueue.getJobPosition(jobId)).thenReturn(3); + + // Act + ResponseEntity response = controller.getJobStatus(jobId); + + // Assert + assertEquals(HttpStatus.OK, response.getStatusCode()); + + @SuppressWarnings("unchecked") + Map responseBody = (Map) response.getBody(); + assertEquals(mockResult, responseBody.get("jobResult")); + + @SuppressWarnings("unchecked") + Map queueInfo = (Map) responseBody.get("queueInfo"); + assertTrue((Boolean) queueInfo.get("inQueue")); + assertEquals(3, queueInfo.get("position")); + } + + @Test + void testGetJobStatus_NonExistentJob() { + // Arrange + String jobId = "non-existent-job"; + when(taskManager.getJobResult(jobId)).thenReturn(null); + + // Act + ResponseEntity response = controller.getJobStatus(jobId); + + // Assert + assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode()); + } + + @Test + void testGetJobResult_CompletedSuccessfulWithObject() { + // Arrange + String jobId = "test-job-id"; + JobResult mockResult = new JobResult(); + mockResult.setJobId(jobId); + mockResult.setComplete(true); + String resultObject = "Test result"; + mockResult.completeWithResult(resultObject); + + when(taskManager.getJobResult(jobId)).thenReturn(mockResult); + + // Act + ResponseEntity response = controller.getJobResult(jobId); + + // Assert + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals(resultObject, response.getBody()); + } + + @Test + void testGetJobResult_CompletedSuccessfulWithFile() throws Exception { + // Arrange + String jobId = "test-job-id"; + String fileId = "file-id"; + String originalFileName = "test.pdf"; + String contentType = "application/pdf"; + byte[] fileContent = "Test file content".getBytes(); + + JobResult mockResult = new JobResult(); + mockResult.setJobId(jobId); + mockResult.completeWithFile(fileId, originalFileName, contentType); + + when(taskManager.getJobResult(jobId)).thenReturn(mockResult); + when(fileStorage.retrieveBytes(fileId)).thenReturn(fileContent); + + // Act + ResponseEntity response = controller.getJobResult(jobId); + + // Assert + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals(contentType, response.getHeaders().getFirst("Content-Type")); + assertTrue(response.getHeaders().getFirst("Content-Disposition").contains(originalFileName)); + assertEquals(fileContent, response.getBody()); + } + + @Test + void testGetJobResult_CompletedWithError() { + // Arrange + String jobId = "test-job-id"; + String errorMessage = "Test error"; + + JobResult mockResult = new JobResult(); + mockResult.setJobId(jobId); + mockResult.failWithError(errorMessage); + + when(taskManager.getJobResult(jobId)).thenReturn(mockResult); + + // Act + ResponseEntity response = controller.getJobResult(jobId); + + // Assert + assertEquals(HttpStatus.BAD_REQUEST, response.getStatusCode()); + assertTrue(response.getBody().toString().contains(errorMessage)); + } + + @Test + void testGetJobResult_IncompleteJob() { + // Arrange + String jobId = "test-job-id"; + + JobResult mockResult = new JobResult(); + mockResult.setJobId(jobId); + mockResult.setComplete(false); + + when(taskManager.getJobResult(jobId)).thenReturn(mockResult); + + // Act + ResponseEntity response = controller.getJobResult(jobId); + + // Assert + assertEquals(HttpStatus.BAD_REQUEST, response.getStatusCode()); + assertTrue(response.getBody().toString().contains("not complete")); + } + + @Test + void testGetJobResult_NonExistentJob() { + // Arrange + String jobId = "non-existent-job"; + when(taskManager.getJobResult(jobId)).thenReturn(null); + + // Act + ResponseEntity response = controller.getJobResult(jobId); + + // Assert + assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode()); + } + + @Test + void testGetJobResult_ErrorRetrievingFile() throws Exception { + // Arrange + String jobId = "test-job-id"; + String fileId = "file-id"; + String originalFileName = "test.pdf"; + String contentType = "application/pdf"; + + JobResult mockResult = new JobResult(); + mockResult.setJobId(jobId); + mockResult.completeWithFile(fileId, originalFileName, contentType); + + when(taskManager.getJobResult(jobId)).thenReturn(mockResult); + when(fileStorage.retrieveBytes(fileId)).thenThrow(new RuntimeException("File not found")); + + // Act + ResponseEntity response = controller.getJobResult(jobId); + + // Assert + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode()); + assertTrue(response.getBody().toString().contains("Error retrieving file")); + } + + /* + * @Test void testGetJobStats() { // Arrange JobStats mockStats = + * JobStats.builder() .totalJobs(10) .activeJobs(3) .completedJobs(7) .build(); + * + * when(taskManager.getJobStats()).thenReturn(mockStats); + * + * // Act ResponseEntity response = controller.getJobStats(); + * + * // Assert assertEquals(HttpStatus.OK, response.getStatusCode()); + * assertEquals(mockStats, response.getBody()); } + * + * @Test void testCleanupOldJobs() { // Arrange when(taskManager.getJobStats()) + * .thenReturn(JobStats.builder().totalJobs(10).build()) + * .thenReturn(JobStats.builder().totalJobs(7).build()); + * + * // Act ResponseEntity response = controller.cleanupOldJobs(); + * + * // Assert assertEquals(HttpStatus.OK, response.getStatusCode()); + * + * @SuppressWarnings("unchecked") Map responseBody = + * (Map) response.getBody(); assertEquals("Cleanup complete", + * responseBody.get("message")); assertEquals(3, + * responseBody.get("removedJobs")); assertEquals(7, + * responseBody.get("remainingJobs")); + * + * verify(taskManager).cleanupOldJobs(); } + * + * @Test void testGetQueueStats() { // Arrange Map + * mockQueueStats = Map.of( "queuedJobs", 5, "queueCapacity", 10, + * "resourceStatus", "OK" ); + * + * when(jobQueue.getQueueStats()).thenReturn(mockQueueStats); + * + * // Act ResponseEntity response = controller.getQueueStats(); + * + * // Assert assertEquals(HttpStatus.OK, response.getStatusCode()); + * assertEquals(mockQueueStats, response.getBody()); + * verify(jobQueue).getQueueStats(); } + */ + @Test + void testCancelJob_InQueue() { + // Arrange + String jobId = "job-in-queue"; + + // Setup user session with job authorization + java.util.Set userJobIds = new java.util.HashSet<>(); + userJobIds.add(jobId); + session.setAttribute("userJobIds", userJobIds); + + when(jobQueue.isJobQueued(jobId)).thenReturn(true); + when(jobQueue.getJobPosition(jobId)).thenReturn(2); + when(jobQueue.cancelJob(jobId)).thenReturn(true); + + // Act + ResponseEntity response = controller.cancelJob(jobId); + + // Assert + assertEquals(HttpStatus.OK, response.getStatusCode()); + + @SuppressWarnings("unchecked") + Map responseBody = (Map) response.getBody(); + assertEquals("Job cancelled successfully", responseBody.get("message")); + assertTrue((Boolean) responseBody.get("wasQueued")); + assertEquals(2, responseBody.get("queuePosition")); + + verify(jobQueue).cancelJob(jobId); + verify(taskManager, never()).setError(anyString(), anyString()); + } + + @Test + void testCancelJob_Running() { + // Arrange + String jobId = "job-running"; + JobResult jobResult = new JobResult(); + jobResult.setJobId(jobId); + jobResult.setComplete(false); + + // Setup user session with job authorization + java.util.Set userJobIds = new java.util.HashSet<>(); + userJobIds.add(jobId); + session.setAttribute("userJobIds", userJobIds); + + when(jobQueue.isJobQueued(jobId)).thenReturn(false); + when(taskManager.getJobResult(jobId)).thenReturn(jobResult); + + // Act + ResponseEntity response = controller.cancelJob(jobId); + + // Assert + assertEquals(HttpStatus.OK, response.getStatusCode()); + + @SuppressWarnings("unchecked") + Map responseBody = (Map) response.getBody(); + assertEquals("Job cancelled successfully", responseBody.get("message")); + assertFalse((Boolean) responseBody.get("wasQueued")); + assertEquals("n/a", responseBody.get("queuePosition")); + + verify(jobQueue, never()).cancelJob(jobId); + verify(taskManager).setError(jobId, "Job was cancelled by user"); + } + + @Test + void testCancelJob_NotFound() { + // Arrange + String jobId = "non-existent-job"; + + // Setup user session with job authorization + java.util.Set userJobIds = new java.util.HashSet<>(); + userJobIds.add(jobId); + session.setAttribute("userJobIds", userJobIds); + + when(jobQueue.isJobQueued(jobId)).thenReturn(false); + when(taskManager.getJobResult(jobId)).thenReturn(null); + + // Act + ResponseEntity response = controller.cancelJob(jobId); + + // Assert + assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode()); + } + + @Test + void testCancelJob_AlreadyComplete() { + // Arrange + String jobId = "completed-job"; + JobResult jobResult = new JobResult(); + jobResult.setJobId(jobId); + jobResult.setComplete(true); + + // Setup user session with job authorization + java.util.Set userJobIds = new java.util.HashSet<>(); + userJobIds.add(jobId); + session.setAttribute("userJobIds", userJobIds); + + when(jobQueue.isJobQueued(jobId)).thenReturn(false); + when(taskManager.getJobResult(jobId)).thenReturn(jobResult); + + // Act + ResponseEntity response = controller.cancelJob(jobId); + + // Assert + assertEquals(HttpStatus.BAD_REQUEST, response.getStatusCode()); + + @SuppressWarnings("unchecked") + Map responseBody = (Map) response.getBody(); + assertEquals("Cannot cancel job that is already complete", responseBody.get("message")); + } + + @Test + void testCancelJob_Unauthorized() { + // Arrange + String jobId = "unauthorized-job"; + + // Setup user session with other job IDs but not this one + java.util.Set userJobIds = new java.util.HashSet<>(); + userJobIds.add("other-job-1"); + userJobIds.add("other-job-2"); + session.setAttribute("userJobIds", userJobIds); + + // Act + ResponseEntity response = controller.cancelJob(jobId); + + // Assert + assertEquals(HttpStatus.FORBIDDEN, response.getStatusCode()); + + @SuppressWarnings("unchecked") + Map responseBody = (Map) response.getBody(); + assertEquals("You are not authorized to cancel this job", responseBody.get("message")); + + // Verify no cancellation attempts were made + verify(jobQueue, never()).isJobQueued(anyString()); + verify(jobQueue, never()).cancelJob(anyString()); + verify(taskManager, never()).getJobResult(anyString()); + verify(taskManager, never()).setError(anyString(), anyString()); + } +} \ No newline at end of file