From 2d1841367625ce61be894606e46f25352fc81ae7 Mon Sep 17 00:00:00 2001 From: Anthony Stirling <77850077+Frooodle@users.noreply.github.com.> Date: Thu, 19 Jun 2025 00:27:53 +0100 Subject: [PATCH] init --- common/build.gradle | 1 + .../annotations/AutoJobPostMapping.java | 47 ++ .../software/common/aop/AutoJobAspect.java | 231 +++++++++ .../software/common/model/api/PDFFile.java | 6 +- .../common/model/job/JobProgress.java | 15 + .../common/model/job/JobResponse.java | 14 + .../software/common/model/job/JobResult.java | 94 ++++ .../software/common/model/job/JobStats.java | 43 ++ .../common/service/FileOrUploadService.java | 78 +++ .../software/common/service/FileStorage.java | 152 ++++++ .../common/service/JobExecutorService.java | 452 ++++++++++++++++++ .../software/common/service/JobQueue.java | 400 ++++++++++++++++ .../common/service/ResourceMonitor.java | 277 +++++++++++ .../software/common/service/TaskManager.java | 274 +++++++++++ .../software/common/util/ExecutorFactory.java | 31 ++ .../common/controller/JobController.java | 114 +++++ 16 files changed, 2228 insertions(+), 1 deletion(-) create mode 100644 common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java create mode 100644 common/src/main/java/stirling/software/common/aop/AutoJobAspect.java create mode 100644 common/src/main/java/stirling/software/common/model/job/JobProgress.java create mode 100644 common/src/main/java/stirling/software/common/model/job/JobResponse.java create mode 100644 common/src/main/java/stirling/software/common/model/job/JobResult.java create mode 100644 common/src/main/java/stirling/software/common/model/job/JobStats.java create mode 100644 common/src/main/java/stirling/software/common/service/FileOrUploadService.java create mode 100644 common/src/main/java/stirling/software/common/service/FileStorage.java create mode 100644 common/src/main/java/stirling/software/common/service/JobExecutorService.java create mode 100644 common/src/main/java/stirling/software/common/service/JobQueue.java create mode 100644 common/src/main/java/stirling/software/common/service/ResourceMonitor.java create mode 100644 common/src/main/java/stirling/software/common/service/TaskManager.java create mode 100644 common/src/main/java/stirling/software/common/util/ExecutorFactory.java create mode 100644 stirling-pdf/src/main/java/stirling/software/common/controller/JobController.java diff --git a/common/build.gradle b/common/build.gradle index bb8503de9..cdfc11b8f 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -28,4 +28,5 @@ dependencies { api 'org.snakeyaml:snakeyaml-engine:2.9' api "org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.9" api 'jakarta.mail:jakarta.mail-api:2.1.3' + api 'org.springframework.boot:spring-boot-starter-aop' } diff --git a/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java b/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java new file mode 100644 index 000000000..755f143aa --- /dev/null +++ b/common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java @@ -0,0 +1,47 @@ +package stirling.software.common.annotations; + +import java.lang.annotation.*; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; + +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@RequestMapping(method = RequestMethod.POST) +public @interface 
AutoJobPostMapping { + @AliasFor(annotation = RequestMapping.class, attribute = "value") + String[] value() default {}; + + @AliasFor(annotation = RequestMapping.class, attribute = "consumes") + String[] consumes() default {"multipart/form-data"}; + + /** + * Custom timeout in milliseconds for this specific job. If not specified, the default system + * timeout will be used. + */ + long timeout() default -1; + + /** Maximum number of times to retry the job on failure. Default is 1 (no retries). */ + int retryCount() default 1; + + /** + * Whether to track and report progress for this job. If enabled, the job will send progress + * updates through WebSocket. + */ + boolean trackProgress() default true; + + /** + * Whether this job can be queued when system resources are limited. If enabled, jobs will be + * queued instead of rejected when the system is under high load. The queue size is dynamically + * adjusted based on available memory and CPU resources. + */ + boolean queueable() default false; + + /** + * Optional resource weight of this job (1-100). Higher values indicate more resource-intensive + * jobs that may need stricter queuing. Default is 50 (medium weight). + */ + int resourceWeight() default 50; +} diff --git a/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java b/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java new file mode 100644 index 000000000..4bcad47b9 --- /dev/null +++ b/common/src/main/java/stirling/software/common/aop/AutoJobAspect.java @@ -0,0 +1,231 @@ +package stirling.software.common.aop; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.*; +import org.springframework.stereotype.Component; +import org.springframework.web.multipart.MultipartFile; + +import jakarta.servlet.http.HttpServletRequest; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.annotations.AutoJobPostMapping; +import stirling.software.common.model.api.PDFFile; +import stirling.software.common.service.FileOrUploadService; +import stirling.software.common.service.FileStorage; +import stirling.software.common.service.JobExecutorService; + +@Aspect +@Component +@RequiredArgsConstructor +@Slf4j +public class AutoJobAspect { + + private final JobExecutorService jobExecutorService; + private final HttpServletRequest request; + private final FileOrUploadService fileOrUploadService; + private final FileStorage fileStorage; + + @Around("@annotation(autoJobPostMapping)") + public Object wrapWithJobExecution( + ProceedingJoinPoint joinPoint, AutoJobPostMapping autoJobPostMapping) { + // Extract parameters from the request and annotation + boolean async = Boolean.parseBoolean(request.getParameter("async")); + long timeout = autoJobPostMapping.timeout(); + int retryCount = autoJobPostMapping.retryCount(); + boolean trackProgress = autoJobPostMapping.trackProgress(); + + log.debug( + "AutoJobPostMapping execution with async={}, timeout={}, retryCount={}, trackProgress={}", + async, + timeout > 0 ? 
timeout : "default", + retryCount, + trackProgress); + + // Inspect and possibly mutate arguments + Object[] args = joinPoint.getArgs(); + + for (int i = 0; i < args.length; i++) { + Object arg = args[i]; + + if (arg instanceof PDFFile pdfFile) { + // Case 1: fileId is provided but no fileInput + if (pdfFile.getFileInput() == null && pdfFile.getFileId() != null) { + try { + log.debug("Using fileId {} to get file content", pdfFile.getFileId()); + MultipartFile file = fileStorage.retrieveFile(pdfFile.getFileId()); + pdfFile.setFileInput(file); + } catch (Exception e) { + throw new RuntimeException( + "Failed to resolve file by ID: " + pdfFile.getFileId(), e); + } + } + // Case 2: For async requests, we need to make a copy of the MultipartFile + else if (async && pdfFile.getFileInput() != null) { + try { + log.debug("Making persistent copy of uploaded file for async processing"); + MultipartFile originalFile = pdfFile.getFileInput(); + String fileId = fileStorage.storeFile(originalFile); + + // Store the fileId for later reference + pdfFile.setFileId(fileId); + + // Replace the original MultipartFile with our persistent copy + MultipartFile persistentFile = fileStorage.retrieveFile(fileId); + pdfFile.setFileInput(persistentFile); + + log.debug("Created persistent file copy with fileId: {}", fileId); + } catch (IOException e) { + throw new RuntimeException( + "Failed to create persistent copy of uploaded file", e); + } + } + } + } + + // Extract queueable and resourceWeight parameters + boolean queueable = autoJobPostMapping.queueable(); + int resourceWeight = autoJobPostMapping.resourceWeight(); + + // Integrate with the JobExecutorService + if (retryCount <= 1) { + // No retries needed, simple execution + return jobExecutorService.runJobGeneric( + async, + () -> { + try { + // Note: Progress tracking is handled in TaskManager/JobExecutorService + // The trackProgress flag controls whether detailed progress is stored + // for REST API queries, not WebSocket notifications + return joinPoint.proceed(args); + } catch (Throwable ex) { + log.error( + "AutoJobAspect caught exception during job execution: {}", + ex.getMessage(), + ex); + throw new RuntimeException(ex); + } + }, + timeout, + queueable, + resourceWeight); + } else { + // Use retry logic + return executeWithRetries( + joinPoint, + args, + async, + timeout, + retryCount, + trackProgress, + queueable, + resourceWeight); + } + } + + private Object executeWithRetries( + ProceedingJoinPoint joinPoint, + Object[] args, + boolean async, + long timeout, + int maxRetries, + boolean trackProgress, + boolean queueable, + int resourceWeight) { + + AtomicInteger attempts = new AtomicInteger(0); + // Keep jobId reference for progress tracking in TaskManager + AtomicReference jobIdRef = new AtomicReference<>(); + + return jobExecutorService.runJobGeneric( + async, + () -> { + int currentAttempt = attempts.incrementAndGet(); + try { + if (trackProgress && async) { + // Get jobId for progress tracking in TaskManager + // This enables REST API progress queries, not WebSocket + if (jobIdRef.get() == null) { + jobIdRef.set(getJobIdFromContext()); + } + String jobId = jobIdRef.get(); + if (jobId != null) { + log.debug( + "Tracking progress for job {} (attempt {}/{})", + jobId, + currentAttempt, + maxRetries); + // Progress is tracked in TaskManager for REST API access + // No WebSocket notifications sent here + } + } + + return joinPoint.proceed(args); + } catch (Throwable ex) { + log.error( + "AutoJobAspect caught exception during job execution (attempt 
{}/{}): {}", + currentAttempt, + Math.max(1, maxRetries), + ex.getMessage(), + ex); + + // Check if we should retry + if (currentAttempt < maxRetries) { + log.info( + "Retrying operation, attempt {}/{}", + currentAttempt + 1, + maxRetries); + + if (trackProgress && async) { + String jobId = jobIdRef.get(); + if (jobId != null) { + log.debug( + "Recording retry attempt for job {} in TaskManager", + jobId); + // Retry info is tracked in TaskManager for REST API access + } + } + + try { + // Simple exponential backoff + Thread.sleep(100 * currentAttempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + + // Recursive call to retry + return executeWithRetries( + joinPoint, + args, + async, + timeout, + maxRetries, + trackProgress, + queueable, + resourceWeight); + } + + // No more retries, throw the exception + throw new RuntimeException("Job failed: " + ex.getMessage(), ex); + } + }, + timeout, + queueable, + resourceWeight); + } + + // Get the job ID from the context for progress tracking in TaskManager + private String getJobIdFromContext() { + try { + return (String) request.getAttribute("jobId"); + } catch (Exception e) { + log.debug("Could not retrieve job ID from context: {}", e.getMessage()); + return null; + } + } +} diff --git a/common/src/main/java/stirling/software/common/model/api/PDFFile.java b/common/src/main/java/stirling/software/common/model/api/PDFFile.java index 8ea3f0456..cc564f81e 100644 --- a/common/src/main/java/stirling/software/common/model/api/PDFFile.java +++ b/common/src/main/java/stirling/software/common/model/api/PDFFile.java @@ -14,8 +14,12 @@ import lombok.NoArgsConstructor; public class PDFFile { @Schema( description = "The input PDF file", - requiredMode = Schema.RequiredMode.REQUIRED, contentMediaType = "application/pdf", format = "binary") private MultipartFile fileInput; + + @Schema( + description = "File ID for server-side files (can be used instead of fileInput)", + example = "a1b2c3d4-5678-90ab-cdef-ghijklmnopqr") + private String fileId; } diff --git a/common/src/main/java/stirling/software/common/model/job/JobProgress.java b/common/src/main/java/stirling/software/common/model/job/JobProgress.java new file mode 100644 index 000000000..e8cbdb6ca --- /dev/null +++ b/common/src/main/java/stirling/software/common/model/job/JobProgress.java @@ -0,0 +1,15 @@ +package stirling.software.common.model.job; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class JobProgress { + private String jobId; + private String status; + private int percentComplete; + private String message; +} diff --git a/common/src/main/java/stirling/software/common/model/job/JobResponse.java b/common/src/main/java/stirling/software/common/model/job/JobResponse.java new file mode 100644 index 000000000..bd98955f0 --- /dev/null +++ b/common/src/main/java/stirling/software/common/model/job/JobResponse.java @@ -0,0 +1,14 @@ +package stirling.software.common.model.job; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class JobResponse { + private boolean async; + private String jobId; + private T result; +} diff --git a/common/src/main/java/stirling/software/common/model/job/JobResult.java b/common/src/main/java/stirling/software/common/model/job/JobResult.java new file mode 100644 index 000000000..214e92c1a --- /dev/null +++ 
b/common/src/main/java/stirling/software/common/model/job/JobResult.java @@ -0,0 +1,94 @@ +package stirling.software.common.model.job; + +import java.time.LocalDateTime; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Represents the result of a job execution. Used by the TaskManager to store job results. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class JobResult { + + /** The job ID */ + private String jobId; + + /** Flag indicating if the job is complete */ + private boolean complete; + + /** Error message if the job failed */ + private String error; + + /** The file ID of the result file, if applicable */ + private String fileId; + + /** Original file name, if applicable */ + private String originalFileName; + + /** MIME type of the result, if applicable */ + private String contentType; + + /** Time when the job was created */ + private LocalDateTime createdAt; + + /** Time when the job was completed */ + private LocalDateTime completedAt; + + /** The actual result object, if not a file */ + private Object result; + + /** + * Create a new JobResult with the given job ID + * + * @param jobId The job ID + * @return A new JobResult + */ + public static JobResult createNew(String jobId) { + return JobResult.builder() + .jobId(jobId) + .complete(false) + .createdAt(LocalDateTime.now()) + .build(); + } + + /** + * Mark this job as complete with a file result + * + * @param fileId The file ID of the result + * @param originalFileName The original file name + * @param contentType The content type of the file + */ + public void completeWithFile(String fileId, String originalFileName, String contentType) { + this.complete = true; + this.fileId = fileId; + this.originalFileName = originalFileName; + this.contentType = contentType; + this.completedAt = LocalDateTime.now(); + } + + /** + * Mark this job as complete with a general result + * + * @param result The result object + */ + public void completeWithResult(Object result) { + this.complete = true; + this.result = result; + this.completedAt = LocalDateTime.now(); + } + + /** + * Mark this job as failed with an error message + * + * @param error The error message + */ + public void failWithError(String error) { + this.complete = true; + this.error = error; + this.completedAt = LocalDateTime.now(); + } +} diff --git a/common/src/main/java/stirling/software/common/model/job/JobStats.java b/common/src/main/java/stirling/software/common/model/job/JobStats.java new file mode 100644 index 000000000..d336b95d4 --- /dev/null +++ b/common/src/main/java/stirling/software/common/model/job/JobStats.java @@ -0,0 +1,43 @@ +package stirling.software.common.model.job; + +import java.time.LocalDateTime; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Represents statistics about jobs in the system */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class JobStats { + + /** Total number of jobs (active and completed) */ + private int totalJobs; + + /** Number of active (incomplete) jobs */ + private int activeJobs; + + /** Number of completed jobs */ + private int completedJobs; + + /** Number of failed jobs */ + private int failedJobs; + + /** Number of successful jobs */ + private int successfulJobs; + + /** Number of jobs with file results */ + private int fileResultJobs; + + /** The oldest active job's creation timestamp */ + private LocalDateTime oldestActiveJobTime; + + 
/** The newest active job's creation timestamp */ + private LocalDateTime newestActiveJobTime; + + /** The average processing time for completed jobs in milliseconds */ + private long averageProcessingTimeMs; +} diff --git a/common/src/main/java/stirling/software/common/service/FileOrUploadService.java b/common/src/main/java/stirling/software/common/service/FileOrUploadService.java new file mode 100644 index 000000000..0b72d3dc8 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/FileOrUploadService.java @@ -0,0 +1,78 @@ +package stirling.software.common.service; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.*; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class FileOrUploadService { + + @Value("${stirling.tempDir:/tmp/stirling-files}") + private String tempDirPath; + + public Path resolveFilePath(String fileId) { + return Path.of(tempDirPath).resolve(fileId); + } + + public MultipartFile toMockMultipartFile(String name, byte[] data) throws IOException { + return new CustomMultipartFile(name, data); + } + + // Custom implementation of MultipartFile + private static class CustomMultipartFile implements MultipartFile { + private final String name; + private final byte[] content; + + public CustomMultipartFile(String name, byte[] content) { + this.name = name; + this.content = content; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getOriginalFilename() { + return name; + } + + @Override + public String getContentType() { + return "application/pdf"; + } + + @Override + public boolean isEmpty() { + return content == null || content.length == 0; + } + + @Override + public long getSize() { + return content.length; + } + + @Override + public byte[] getBytes() throws IOException { + return content; + } + + @Override + public java.io.InputStream getInputStream() throws IOException { + return new ByteArrayInputStream(content); + } + + @Override + public void transferTo(java.io.File dest) throws IOException, IllegalStateException { + Files.write(dest.toPath(), content); + } + } +} diff --git a/common/src/main/java/stirling/software/common/service/FileStorage.java b/common/src/main/java/stirling/software/common/service/FileStorage.java new file mode 100644 index 000000000..e200ded8a --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/FileStorage.java @@ -0,0 +1,152 @@ +package stirling.software.common.service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * Service for storing and retrieving files with unique file IDs. Used by the AutoJobPostMapping + * system to handle file references. 
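+ *
+ * <p>Illustrative usage (a sketch, not part of this patch; {@code upload} stands in for any
+ * incoming multipart file, and the calls are the methods defined on this class):
+ *
+ * <pre>{@code
+ * String fileId = fileStorage.storeFile(upload);          // persist and receive an id
+ * MultipartFile copy = fileStorage.retrieveFile(fileId);  // re-materialise as a MultipartFile
+ * byte[] bytes = fileStorage.retrieveBytes(fileId);       // or read back the raw bytes
+ * fileStorage.deleteFile(fileId);                         // remove it when finished
+ * }</pre>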
+ */ +@Service +@RequiredArgsConstructor +@Slf4j +public class FileStorage { + + @Value("${stirling.tempDir:/tmp/stirling-files}") + private String tempDirPath; + + private final FileOrUploadService fileOrUploadService; + + /** + * Store a file and return its unique ID + * + * @param file The file to store + * @return The unique ID assigned to the file + * @throws IOException If there is an error storing the file + */ + public String storeFile(MultipartFile file) throws IOException { + String fileId = generateFileId(); + Path filePath = getFilePath(fileId); + + // Ensure the directory exists + Files.createDirectories(filePath.getParent()); + + // Transfer the file to the storage location + file.transferTo(filePath.toFile()); + + log.debug("Stored file with ID: {}", fileId); + return fileId; + } + + /** + * Store a byte array as a file and return its unique ID + * + * @param bytes The byte array to store + * @param originalName The original name of the file (for extension) + * @return The unique ID assigned to the file + * @throws IOException If there is an error storing the file + */ + public String storeBytes(byte[] bytes, String originalName) throws IOException { + String fileId = generateFileId(); + Path filePath = getFilePath(fileId); + + // Ensure the directory exists + Files.createDirectories(filePath.getParent()); + + // Write the bytes to the file + Files.write(filePath, bytes); + + log.debug("Stored byte array with ID: {}", fileId); + return fileId; + } + + /** + * Retrieve a file by its ID as a MultipartFile + * + * @param fileId The ID of the file to retrieve + * @return The file as a MultipartFile + * @throws IOException If the file doesn't exist or can't be read + */ + public MultipartFile retrieveFile(String fileId) throws IOException { + Path filePath = getFilePath(fileId); + + if (!Files.exists(filePath)) { + throw new IOException("File not found with ID: " + fileId); + } + + byte[] fileData = Files.readAllBytes(filePath); + return fileOrUploadService.toMockMultipartFile(fileId, fileData); + } + + /** + * Retrieve a file by its ID as a byte array + * + * @param fileId The ID of the file to retrieve + * @return The file as a byte array + * @throws IOException If the file doesn't exist or can't be read + */ + public byte[] retrieveBytes(String fileId) throws IOException { + Path filePath = getFilePath(fileId); + + if (!Files.exists(filePath)) { + throw new IOException("File not found with ID: " + fileId); + } + + return Files.readAllBytes(filePath); + } + + /** + * Delete a file by its ID + * + * @param fileId The ID of the file to delete + * @return true if the file was deleted, false otherwise + */ + public boolean deleteFile(String fileId) { + try { + Path filePath = getFilePath(fileId); + return Files.deleteIfExists(filePath); + } catch (IOException e) { + log.error("Error deleting file with ID: {}", fileId, e); + return false; + } + } + + /** + * Check if a file exists by its ID + * + * @param fileId The ID of the file to check + * @return true if the file exists, false otherwise + */ + public boolean fileExists(String fileId) { + Path filePath = getFilePath(fileId); + return Files.exists(filePath); + } + + /** + * Get the path for a file ID + * + * @param fileId The ID of the file + * @return The path to the file + */ + private Path getFilePath(String fileId) { + return Path.of(tempDirPath).resolve(fileId); + } + + /** + * Generate a unique file ID + * + * @return A unique file ID + */ + private String generateFileId() { + return UUID.randomUUID().toString(); + } +} 
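Illustrative sketch (not part of the patch): how a controller endpoint might adopt the new annotation. The controller name, route, and doCompression helper below are hypothetical; the annotation attributes, the PDFFile model, and the async flow (a POST with ?async=true is handed to JobExecutorService, which immediately returns a JobResponse carrying the jobId, with the stored result presumably fetched later through the JobController added elsewhere in this patch) come from the changes in this commit.

    import java.io.IOException;

    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.ModelAttribute;
    import org.springframework.web.bind.annotation.RestController;

    import stirling.software.common.annotations.AutoJobPostMapping;
    import stirling.software.common.model.api.PDFFile;

    @RestController
    public class ExampleCompressController {

        // Hypothetical route; the attribute values are the options introduced by AutoJobPostMapping.
        @AutoJobPostMapping(
                value = "/api/v1/example/compress",
                timeout = 300_000,      // per-job timeout override in milliseconds
                retryCount = 2,         // two attempts in total (one retry on failure)
                queueable = true,       // allow async requests to be queued by JobQueue
                resourceWeight = 70)    // heavier than the default weight of 50
        public ResponseEntity<byte[]> compress(@ModelAttribute PDFFile request) throws IOException {
            byte[] input = request.getFileInput().getBytes();
            // byte[] results of async jobs are stored via FileStorage and exposed as file results
            return ResponseEntity.ok(doCompression(input));
        }

        // Placeholder for the real processing logic.
        private byte[] doCompression(byte[] input) {
            return input;
        }
    }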
diff --git a/common/src/main/java/stirling/software/common/service/JobExecutorService.java b/common/src/main/java/stirling/software/common/service/JobExecutorService.java new file mode 100644 index 000000000..f6f7db929 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/JobExecutorService.java @@ -0,0 +1,452 @@ +package stirling.software.common.service; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import jakarta.servlet.http.HttpServletRequest; + +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.model.job.JobResponse; +import stirling.software.common.util.ExecutorFactory; + +/** Service for executing jobs asynchronously or synchronously */ +@Service +@Slf4j +public class JobExecutorService { + + private final TaskManager taskManager; + private final FileStorage fileStorage; + private final HttpServletRequest request; + private final ResourceMonitor resourceMonitor; + private final JobQueue jobQueue; + private final ExecutorService executor = ExecutorFactory.newVirtualOrCachedThreadExecutor(); + private final long effectiveTimeoutMs; + + public JobExecutorService( + TaskManager taskManager, + FileStorage fileStorage, + HttpServletRequest request, + ResourceMonitor resourceMonitor, + JobQueue jobQueue, + @Value("${spring.mvc.async.request-timeout:1200000}") long asyncRequestTimeoutMs, + @Value("${server.servlet.session.timeout:30m}") String sessionTimeout) { + this.taskManager = taskManager; + this.fileStorage = fileStorage; + this.request = request; + this.resourceMonitor = resourceMonitor; + this.jobQueue = jobQueue; + + // Parse session timeout and calculate effective timeout once during initialization + long sessionTimeoutMs = parseSessionTimeout(sessionTimeout); + this.effectiveTimeoutMs = Math.min(asyncRequestTimeoutMs, sessionTimeoutMs); + log.debug( + "Job executor configured with effective timeout of {} ms", this.effectiveTimeoutMs); + } + + /** + * Run a job either asynchronously or synchronously + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @return The response + */ + public ResponseEntity runJobGeneric(boolean async, Supplier work) { + return runJobGeneric(async, work, -1); + } + + /** + * Run a job either asynchronously or synchronously with a custom timeout + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default + * @return The response + */ + public ResponseEntity runJobGeneric( + boolean async, Supplier work, long customTimeoutMs) { + return runJobGeneric(async, work, customTimeoutMs, false, 50); + } + + /** + * Run a job either asynchronously or synchronously with custom parameters + * + * @param async Whether to run the job asynchronously + * @param work The work to be done + * @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default + * @param queueable Whether this job can be queued when 
system resources are limited + * @param resourceWeight The resource weight of this job (1-100) + * @return The response + */ + public ResponseEntity runJobGeneric( + boolean async, + Supplier work, + long customTimeoutMs, + boolean queueable, + int resourceWeight) { + String jobId = UUID.randomUUID().toString(); + + // Store the job ID in the request for potential use by other components + if (request != null) { + request.setAttribute("jobId", jobId); + } + + // Determine which timeout to use + long timeoutToUse = customTimeoutMs > 0 ? customTimeoutMs : effectiveTimeoutMs; + + log.debug( + "Running job with ID: {}, async: {}, timeout: {}ms, queueable: {}, weight: {}", + jobId, + async, + timeoutToUse, + queueable, + resourceWeight); + + // Check if we need to queue this job based on resource availability + boolean shouldQueue = + queueable + && async + && // Only async jobs can be queued + resourceMonitor.shouldQueueJob(resourceWeight); + + if (shouldQueue) { + // Queue the job instead of executing immediately + log.debug( + "Queueing job {} due to resource constraints (weight: {})", + jobId, + resourceWeight); + + taskManager.createTask(jobId); + + // Create a specialized wrapper that updates the TaskManager + Supplier wrappedWork = + () -> { + try { + Object result = work.get(); + processJobResult(jobId, result); + return result; + } catch (Exception e) { + log.error( + "Error executing queued job {}: {}", jobId, e.getMessage(), e); + taskManager.setError(jobId, e.getMessage()); + throw e; + } + }; + + // Queue the job and get the future + CompletableFuture> future = + jobQueue.queueJob(jobId, resourceWeight, wrappedWork, timeoutToUse); + + // Return immediately with job ID + return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null)); + } else if (async) { + taskManager.createTask(jobId); + executor.execute( + () -> { + try { + log.debug( + "Running async job {} with timeout {} ms", jobId, timeoutToUse); + + // Execute with timeout + Object result = executeWithTimeout(() -> work.get(), timeoutToUse); + processJobResult(jobId, result); + } catch (TimeoutException te) { + log.error("Job {} timed out after {} ms", jobId, timeoutToUse); + taskManager.setError(jobId, "Job timed out"); + } catch (Exception e) { + log.error("Error executing job {}: {}", jobId, e.getMessage(), e); + taskManager.setError(jobId, e.getMessage()); + } + }); + + return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null)); + } else { + try { + log.debug("Running sync job with timeout {} ms", timeoutToUse); + + // Execute with timeout + Object result = executeWithTimeout(() -> work.get(), timeoutToUse); + + // If the result is already a ResponseEntity, return it directly + if (result instanceof ResponseEntity) { + return (ResponseEntity) result; + } + + // Process different result types + return handleResultForSyncJob(result); + } catch (TimeoutException te) { + log.error("Synchronous job timed out after {} ms", timeoutToUse); + return ResponseEntity.internalServerError() + .body(Map.of("error", "Job timed out after " + timeoutToUse + " ms")); + } catch (Exception e) { + log.error("Error executing synchronous job: {}", e.getMessage(), e); + // Construct a JSON error response + return ResponseEntity.internalServerError() + .body(Map.of("error", "Job failed: " + e.getMessage())); + } + } + } + + /** + * Process the result of an asynchronous job + * + * @param jobId The job ID + * @param result The result + */ + private void processJobResult(String jobId, Object result) { + try { + if (result 
instanceof byte[]) { + // Store byte array as a file + String fileId = fileStorage.storeBytes((byte[]) result, "result.pdf"); + taskManager.setFileResult(jobId, fileId, "result.pdf", "application/pdf"); + log.debug("Stored byte[] result with fileId: {}", fileId); + } else if (result instanceof ResponseEntity) { + ResponseEntity response = (ResponseEntity) result; + Object body = response.getBody(); + + if (body instanceof byte[]) { + // Extract filename from content-disposition header if available + String filename = "result.pdf"; + String contentType = "application/pdf"; + + if (response.getHeaders().getContentDisposition() != null) { + String disposition = + response.getHeaders().getContentDisposition().toString(); + if (disposition.contains("filename=")) { + filename = + disposition.substring( + disposition.indexOf("filename=") + 9, + disposition.lastIndexOf("\"")); + } + } + + if (response.getHeaders().getContentType() != null) { + contentType = response.getHeaders().getContentType().toString(); + } + + String fileId = fileStorage.storeBytes((byte[]) body, filename); + taskManager.setFileResult(jobId, fileId, filename, contentType); + log.debug("Stored ResponseEntity result with fileId: {}", fileId); + } else { + // Check if the response body contains a fileId + if (body != null && body.toString().contains("fileId")) { + try { + // Try to extract fileId using reflection + java.lang.reflect.Method getFileId = + body.getClass().getMethod("getFileId"); + String fileId = (String) getFileId.invoke(body); + + if (fileId != null && !fileId.isEmpty()) { + // Try to get filename and content type + String filename = "result.pdf"; + String contentType = "application/pdf"; + + try { + java.lang.reflect.Method getOriginalFileName = + body.getClass().getMethod("getOriginalFilename"); + String origName = (String) getOriginalFileName.invoke(body); + if (origName != null && !origName.isEmpty()) { + filename = origName; + } + } catch (Exception e) { + log.debug( + "Could not get original filename: {}", e.getMessage()); + } + + try { + java.lang.reflect.Method getContentType = + body.getClass().getMethod("getContentType"); + String ct = (String) getContentType.invoke(body); + if (ct != null && !ct.isEmpty()) { + contentType = ct; + } + } catch (Exception e) { + log.debug("Could not get content type: {}", e.getMessage()); + } + + taskManager.setFileResult(jobId, fileId, filename, contentType); + log.debug("Extracted fileId from response body: {}", fileId); + + taskManager.setComplete(jobId); + return; + } + } catch (Exception e) { + log.debug( + "Failed to extract fileId from response body: {}", + e.getMessage()); + } + } + + // Store generic result + taskManager.setResult(jobId, body); + } + } else if (result instanceof MultipartFile) { + MultipartFile file = (MultipartFile) result; + String fileId = fileStorage.storeFile(file); + taskManager.setFileResult( + jobId, fileId, file.getOriginalFilename(), file.getContentType()); + log.debug("Stored MultipartFile result with fileId: {}", fileId); + } else { + // Check if result has a fileId field + if (result != null) { + try { + // Try to extract fileId using reflection + java.lang.reflect.Method getFileId = + result.getClass().getMethod("getFileId"); + String fileId = (String) getFileId.invoke(result); + + if (fileId != null && !fileId.isEmpty()) { + // Try to get filename and content type + String filename = "result.pdf"; + String contentType = "application/pdf"; + + try { + java.lang.reflect.Method getOriginalFileName = + 
result.getClass().getMethod("getOriginalFilename"); + String origName = (String) getOriginalFileName.invoke(result); + if (origName != null && !origName.isEmpty()) { + filename = origName; + } + } catch (Exception e) { + log.debug("Could not get original filename: {}", e.getMessage()); + } + + try { + java.lang.reflect.Method getContentType = + result.getClass().getMethod("getContentType"); + String ct = (String) getContentType.invoke(result); + if (ct != null && !ct.isEmpty()) { + contentType = ct; + } + } catch (Exception e) { + log.debug("Could not get content type: {}", e.getMessage()); + } + + taskManager.setFileResult(jobId, fileId, filename, contentType); + log.debug("Extracted fileId from result object: {}", fileId); + + taskManager.setComplete(jobId); + return; + } + } catch (Exception e) { + log.debug( + "Failed to extract fileId from result object: {}", e.getMessage()); + } + } + + // Default case: store the result as is + taskManager.setResult(jobId, result); + } + + taskManager.setComplete(jobId); + } catch (Exception e) { + log.error("Error processing job result: {}", e.getMessage(), e); + taskManager.setError(jobId, "Error processing result: " + e.getMessage()); + } + } + + /** + * Handle different result types for synchronous jobs + * + * @param result The result object + * @return The appropriate ResponseEntity + * @throws IOException If there is an error processing the result + */ + private ResponseEntity handleResultForSyncJob(Object result) throws IOException { + if (result instanceof byte[]) { + // Return byte array as PDF + return ResponseEntity.ok() + .contentType(MediaType.APPLICATION_PDF) + .header( + HttpHeaders.CONTENT_DISPOSITION, + "form-data; name=\"attachment\"; filename=\"result.pdf\"") + .body(result); + } else if (result instanceof MultipartFile) { + // Return MultipartFile content + MultipartFile file = (MultipartFile) result; + return ResponseEntity.ok() + .contentType(MediaType.parseMediaType(file.getContentType())) + .header( + HttpHeaders.CONTENT_DISPOSITION, + "form-data; name=\"attachment\"; filename=\"" + + file.getOriginalFilename() + + "\"") + .body(file.getBytes()); + } else { + // Default case: return as JSON + return ResponseEntity.ok(result); + } + } + + /** + * Parse session timeout string (e.g., "30m", "1h") to milliseconds + * + * @param timeout The timeout string + * @return The timeout in milliseconds + */ + private long parseSessionTimeout(String timeout) { + if (timeout == null || timeout.isEmpty()) { + return 30 * 60 * 1000; // Default: 30 minutes + } + + try { + String value = timeout.replaceAll("[^\\d.]", ""); + String unit = timeout.replaceAll("[\\d.]", ""); + + double numericValue = Double.parseDouble(value); + + return switch (unit.toLowerCase()) { + case "s" -> (long) (numericValue * 1000); + case "m" -> (long) (numericValue * 60 * 1000); + case "h" -> (long) (numericValue * 60 * 60 * 1000); + case "d" -> (long) (numericValue * 24 * 60 * 60 * 1000); + default -> (long) (numericValue * 60 * 1000); // Default to minutes + }; + } catch (Exception e) { + log.warn("Could not parse session timeout '{}', using default", timeout); + return 30 * 60 * 1000; // Default: 30 minutes + } + } + + /** + * Execute a supplier with a timeout + * + * @param supplier The supplier to execute + * @param timeoutMs The timeout in milliseconds + * @return The result from the supplier + * @throws TimeoutException If the execution times out + * @throws Exception If the supplier throws an exception + */ + private T executeWithTimeout(Supplier supplier, long 
timeoutMs) + throws TimeoutException, Exception { + java.util.concurrent.CompletableFuture future = + java.util.concurrent.CompletableFuture.supplyAsync(supplier); + + try { + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (java.util.concurrent.TimeoutException e) { + future.cancel(true); + throw new TimeoutException("Execution timed out after " + timeoutMs + " ms"); + } catch (java.util.concurrent.ExecutionException e) { + throw (Exception) e.getCause(); + } catch (java.util.concurrent.CancellationException e) { + throw new Exception("Execution was cancelled", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new Exception("Execution was interrupted", e); + } + } +} diff --git a/common/src/main/java/stirling/software/common/service/JobQueue.java b/common/src/main/java/stirling/software/common/service/JobQueue.java new file mode 100644 index 000000000..af3a494c3 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/JobQueue.java @@ -0,0 +1,400 @@ +package stirling.software.common.service; + +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.Supplier; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import stirling.software.common.util.ExecutorFactory; + +/** + * Manages a queue of jobs with dynamic sizing based on system resources. Used when system resources + * are limited to prevent overloading. + */ +@Service +@Slf4j +public class JobQueue { + + private final ResourceMonitor resourceMonitor; + + @Value("${stirling.job.queue.base-capacity:10}") + private int baseQueueCapacity = 10; + + @Value("${stirling.job.queue.min-capacity:2}") + private int minQueueCapacity = 2; + + @Value("${stirling.job.queue.check-interval-ms:1000}") + private long queueCheckIntervalMs = 1000; + + @Value("${stirling.job.queue.max-wait-time-ms:600000}") + private long maxWaitTimeMs = 600000; // 10 minutes + + private BlockingQueue jobQueue; + private final Map jobMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ExecutorService jobExecutor = ExecutorFactory.newVirtualOrCachedThreadExecutor(); + + private boolean shuttingDown = false; + + @Getter private int rejectedJobs = 0; + + @Getter private int totalQueuedJobs = 0; + + @Getter private int currentQueueSize = 0; + + /** Represents a job waiting in the queue. 
*/ + @Data + @AllArgsConstructor + private static class QueuedJob { + private final String jobId; + private final int resourceWeight; + private final Supplier work; + private final long timeoutMs; + private final Instant queuedAt; + private CompletableFuture> future; + private volatile boolean cancelled = false; + } + + public JobQueue(ResourceMonitor resourceMonitor) { + this.resourceMonitor = resourceMonitor; + + // Initialize with dynamic capacity + int capacity = + resourceMonitor.calculateDynamicQueueCapacity(baseQueueCapacity, minQueueCapacity); + this.jobQueue = new LinkedBlockingQueue<>(capacity); + } + + @PostConstruct + public void initialize() { + log.debug( + "Starting job queue with base capacity {}, min capacity {}", + baseQueueCapacity, + minQueueCapacity); + + // Periodically process the job queue + scheduler.scheduleWithFixedDelay( + this::processQueue, 0, queueCheckIntervalMs, TimeUnit.MILLISECONDS); + + // Periodically update queue capacity based on resource usage + scheduler.scheduleWithFixedDelay( + this::updateQueueCapacity, + 10000, // Initial delay + 30000, // 30 second interval + TimeUnit.MILLISECONDS); + } + + @PreDestroy + public void shutdown() { + log.info("Shutting down job queue"); + shuttingDown = true; + + // Complete any futures that are still waiting + jobMap.forEach( + (id, job) -> { + if (!job.future.isDone()) { + job.future.completeExceptionally( + new RuntimeException("Server shutting down, job cancelled")); + } + }); + + scheduler.shutdownNow(); + jobExecutor.shutdownNow(); + + log.info( + "Job queue shutdown complete. Stats: total={}, rejected={}", + totalQueuedJobs, + rejectedJobs); + } + + /** + * Queues a job for execution when resources permit. + * + * @param jobId The job ID + * @param resourceWeight The resource weight of the job (1-100) + * @param work The work to be done + * @param timeoutMs The timeout in milliseconds + * @return A CompletableFuture that will complete when the job is executed + */ + public CompletableFuture> queueJob( + String jobId, int resourceWeight, Supplier work, long timeoutMs) { + + // Create a CompletableFuture to track this job's completion + CompletableFuture> future = new CompletableFuture<>(); + + // Create the queued job + QueuedJob job = + new QueuedJob(jobId, resourceWeight, work, timeoutMs, Instant.now(), future, false); + + // Store in our map for lookup + jobMap.put(jobId, job); + + // Update stats + totalQueuedJobs++; + currentQueueSize = jobQueue.size(); + + // Try to add to the queue + try { + boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS); + if (!added) { + log.warn("Queue full, rejecting job {}", jobId); + rejectedJobs++; + future.completeExceptionally( + new RuntimeException("Job queue full, please try again later")); + jobMap.remove(jobId); + return future; + } + + log.debug( + "Job {} queued for execution (weight: {}, queue size: {})", + jobId, + resourceWeight, + jobQueue.size()); + + return future; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + future.completeExceptionally(new RuntimeException("Job queue interrupted")); + jobMap.remove(jobId); + return future; + } + } + + /** + * Gets the current capacity of the job queue. + * + * @return The current capacity + */ + public int getQueueCapacity() { + return ((LinkedBlockingQueue) jobQueue).remainingCapacity() + jobQueue.size(); + } + + /** Updates the capacity of the job queue based on available system resources. 
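+ * If the computed capacity differs from the current one, any waiting jobs are drained into a
+ * new bounded queue of the new size (see the implementation below).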
*/ + private void updateQueueCapacity() { + try { + // Calculate new capacity + int newCapacity = + resourceMonitor.calculateDynamicQueueCapacity( + baseQueueCapacity, minQueueCapacity); + + int currentCapacity = getQueueCapacity(); + if (newCapacity != currentCapacity) { + log.debug( + "Updating job queue capacity from {} to {}", currentCapacity, newCapacity); + + // Create new queue with updated capacity + BlockingQueue newQueue = new LinkedBlockingQueue<>(newCapacity); + + // Transfer jobs from old queue to new queue + jobQueue.drainTo(newQueue); + jobQueue = newQueue; + + currentQueueSize = jobQueue.size(); + } + } catch (Exception e) { + log.error("Error updating queue capacity: {}", e.getMessage(), e); + } + } + + /** Processes jobs in the queue, executing them when resources permit. */ + private void processQueue() { + if (shuttingDown || jobQueue.isEmpty()) { + return; + } + + try { + // Get current resource status + ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get(); + + // Check if we should execute any jobs + boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL); + + if (!canExecuteJobs) { + // Under critical load, don't execute any jobs + log.debug("System under critical load, delaying job execution"); + return; + } + + // Get jobs from the queue, up to a limit based on resource availability + int jobsToProcess = + Math.max( + 1, + switch (status) { + case OK -> 3; + case WARNING -> 1; + case CRITICAL -> 0; + }); + + for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) { + QueuedJob job = jobQueue.poll(); + if (job == null) break; + + // Check if it's been waiting too long + long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli(); + if (waitTimeMs > maxWaitTimeMs) { + log.warn( + "Job {} exceeded maximum wait time ({} ms), executing anyway", + job.jobId, + waitTimeMs); + } + + // Remove from our map + jobMap.remove(job.jobId); + currentQueueSize = jobQueue.size(); + + // Execute the job + executeJob(job); + } + } catch (Exception e) { + log.error("Error processing job queue: {}", e.getMessage(), e); + } + } + + /** + * Executes a job from the queue. + * + * @param job The job to execute + */ + private void executeJob(QueuedJob job) { + if (job.cancelled) { + log.debug("Job {} was cancelled, not executing", job.jobId); + return; + } + + jobExecutor.execute( + () -> { + log.debug("Executing queued job {} (queued at {})", job.jobId, job.queuedAt); + + try { + // Execute with timeout + Object result = executeWithTimeout(job.work, job.timeoutMs); + + // Process the result + if (result instanceof ResponseEntity) { + job.future.complete((ResponseEntity) result); + } else { + job.future.complete(ResponseEntity.ok(result)); + } + + } catch (Exception e) { + log.error( + "Error executing queued job {}: {}", job.jobId, e.getMessage(), e); + job.future.completeExceptionally(e); + } + }); + } + + /** + * Execute a supplier with a timeout. 
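+ * A non-positive {@code timeoutMs} is treated as "no timeout": the call simply blocks until
+ * the supplier completes.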
+ * + * @param supplier The supplier to execute + * @param timeoutMs The timeout in milliseconds + * @return The result from the supplier + * @throws Exception If there is an execution error + */ + private T executeWithTimeout(Supplier supplier, long timeoutMs) throws Exception { + CompletableFuture future = CompletableFuture.supplyAsync(supplier); + + try { + if (timeoutMs <= 0) { + // No timeout + return future.join(); + } else { + // With timeout + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } + } catch (TimeoutException e) { + future.cancel(true); + throw new TimeoutException("Job timed out after " + timeoutMs + "ms"); + } catch (ExecutionException e) { + throw (Exception) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedException("Job was interrupted"); + } + } + + /** + * Checks if a job is queued. + * + * @param jobId The job ID + * @return true if the job is queued + */ + public boolean isJobQueued(String jobId) { + return jobMap.containsKey(jobId); + } + + /** + * Gets the current position of a job in the queue. + * + * @param jobId The job ID + * @return The position (0-based) or -1 if not found + */ + public int getJobPosition(String jobId) { + if (!jobMap.containsKey(jobId)) { + return -1; + } + + // Count positions + int position = 0; + for (QueuedJob job : jobQueue) { + if (job.jobId.equals(jobId)) { + return position; + } + position++; + } + + // If we didn't find it in the queue but it's in the map, + // it might be executing already + return -1; + } + + /** + * Cancels a queued job. + * + * @param jobId The job ID + * @return true if the job was cancelled, false if not found + */ + public boolean cancelJob(String jobId) { + QueuedJob job = jobMap.remove(jobId); + if (job != null) { + job.cancelled = true; + job.future.completeExceptionally(new RuntimeException("Job cancelled by user")); + + // Try to remove from queue if it's still there + jobQueue.remove(job); + currentQueueSize = jobQueue.size(); + + log.debug("Job {} cancelled", jobId); + + return true; + } + + return false; + } + + /** + * Get queue statistics. 
+ * + * @return A map containing queue statistics + */ + public Map getQueueStats() { + return Map.of( + "queuedJobs", jobQueue.size(), + "queueCapacity", getQueueCapacity(), + "totalQueuedJobs", totalQueuedJobs, + "rejectedJobs", rejectedJobs, + "resourceStatus", resourceMonitor.getCurrentStatus().get().name()); + } +} diff --git a/common/src/main/java/stirling/software/common/service/ResourceMonitor.java b/common/src/main/java/stirling/software/common/service/ResourceMonitor.java new file mode 100644 index 000000000..cd1e766d1 --- /dev/null +++ b/common/src/main/java/stirling/software/common/service/ResourceMonitor.java @@ -0,0 +1,277 @@ +package stirling.software.common.service; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.OperatingSystemMXBean; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * Monitors system resources (CPU, memory) to inform job scheduling decisions. Provides information + * about available resources to prevent overloading the system. + */ +@Service +@Slf4j +public class ResourceMonitor { + + @Value("${stirling.resource.memory.critical-threshold:0.9}") + private double memoryCriticalThreshold = 0.9; // 90% usage is critical + + @Value("${stirling.resource.memory.high-threshold:0.75}") + private double memoryHighThreshold = 0.75; // 75% usage is high + + @Value("${stirling.resource.cpu.critical-threshold:0.9}") + private double cpuCriticalThreshold = 0.9; // 90% usage is critical + + @Value("${stirling.resource.cpu.high-threshold:0.75}") + private double cpuHighThreshold = 0.75; // 75% usage is high + + @Value("${stirling.resource.monitor.interval-ms:60000}") + private long monitorIntervalMs = 60000; // 60 seconds + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + private final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean(); + + @Getter + private final AtomicReference currentStatus = + new AtomicReference<>(ResourceStatus.OK); + + @Getter + private final AtomicReference latestMetrics = + new AtomicReference<>(new ResourceMetrics()); + + /** Represents the current status of system resources. */ + public enum ResourceStatus { + /** Resources are available, normal operations can proceed */ + OK, + + /** Resources are under strain, consider queueing high-resource operations */ + WARNING, + + /** Resources are critically low, queue all operations */ + CRITICAL + } + + /** Detailed metrics about system resources. 
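+ * As sampled by the monitor, {@code cpuUsage} is the system load average divided by the number
+ * of available processors (with a reflective fallback when the load average is unavailable),
+ * and {@code memoryUsage} is combined heap and non-heap usage relative to the JVM's maximum heap.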
*/ + @Getter + public static class ResourceMetrics { + private final double cpuUsage; + private final double memoryUsage; + private final long freeMemoryBytes; + private final long totalMemoryBytes; + private final long maxMemoryBytes; + private final Instant timestamp; + + public ResourceMetrics() { + this(0, 0, 0, 0, 0, Instant.now()); + } + + public ResourceMetrics( + double cpuUsage, + double memoryUsage, + long freeMemoryBytes, + long totalMemoryBytes, + long maxMemoryBytes, + Instant timestamp) { + this.cpuUsage = cpuUsage; + this.memoryUsage = memoryUsage; + this.freeMemoryBytes = freeMemoryBytes; + this.totalMemoryBytes = totalMemoryBytes; + this.maxMemoryBytes = maxMemoryBytes; + this.timestamp = timestamp; + } + + /** + * Gets the age of these metrics. + * + * @return Duration since these metrics were collected + */ + public Duration getAge() { + return Duration.between(timestamp, Instant.now()); + } + + /** + * Check if these metrics are stale (older than threshold). + * + * @param thresholdMs Staleness threshold in milliseconds + * @return true if metrics are stale + */ + public boolean isStale(long thresholdMs) { + return getAge().toMillis() > thresholdMs; + } + } + + @PostConstruct + public void initialize() { + log.debug("Starting resource monitoring with interval of {}ms", monitorIntervalMs); + scheduler.scheduleAtFixedRate( + this::updateResourceMetrics, 0, monitorIntervalMs, TimeUnit.MILLISECONDS); + } + + @PreDestroy + public void shutdown() { + log.info("Shutting down resource monitoring"); + scheduler.shutdownNow(); + } + + /** Updates the resource metrics by sampling current system state. */ + private void updateResourceMetrics() { + try { + // Get CPU usage + double cpuUsage = osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors(); + if (cpuUsage < 0) cpuUsage = getAlternativeCpuLoad(); // Fallback if not available + + // Get memory usage + long heapUsed = memoryMXBean.getHeapMemoryUsage().getUsed(); + long nonHeapUsed = memoryMXBean.getNonHeapMemoryUsage().getUsed(); + long totalUsed = heapUsed + nonHeapUsed; + + long maxMemory = Runtime.getRuntime().maxMemory(); + long totalMemory = Runtime.getRuntime().totalMemory(); + long freeMemory = Runtime.getRuntime().freeMemory(); + + double memoryUsage = (double) totalUsed / maxMemory; + + // Create new metrics + ResourceMetrics metrics = + new ResourceMetrics( + cpuUsage, + memoryUsage, + freeMemory, + totalMemory, + maxMemory, + Instant.now()); + latestMetrics.set(metrics); + + // Determine system status + ResourceStatus newStatus; + if (cpuUsage > cpuCriticalThreshold || memoryUsage > memoryCriticalThreshold) { + newStatus = ResourceStatus.CRITICAL; + } else if (cpuUsage > cpuHighThreshold || memoryUsage > memoryHighThreshold) { + newStatus = ResourceStatus.WARNING; + } else { + newStatus = ResourceStatus.OK; + } + + // Update status if it changed + ResourceStatus oldStatus = currentStatus.getAndSet(newStatus); + if (oldStatus != newStatus) { + log.info("System resource status changed from {} to {}", oldStatus, newStatus); + log.info( + "Current metrics - CPU: {:.1f}%, Memory: {:.1f}%, Free Memory: {} MB", + cpuUsage * 100, memoryUsage * 100, freeMemory / (1024 * 1024)); + } + } catch (Exception e) { + log.error("Error updating resource metrics: {}", e.getMessage(), e); + } + } + + /** + * Alternative method to estimate CPU load if getSystemLoadAverage() is not available. This is a + * fallback and less accurate than the official JMX method. 
+ * + * @return Estimated CPU load as a value between 0.0 and 1.0 + */ + private double getAlternativeCpuLoad() { + try { + // Try to get CPU time if available through reflection + // This is a fallback since we can't directly cast to platform-specific classes + try { + java.lang.reflect.Method m = + osMXBean.getClass().getDeclaredMethod("getProcessCpuLoad"); + m.setAccessible(true); + return (double) m.invoke(osMXBean); + } catch (Exception e) { + // Try the older method + try { + java.lang.reflect.Method m = + osMXBean.getClass().getDeclaredMethod("getSystemCpuLoad"); + m.setAccessible(true); + return (double) m.invoke(osMXBean); + } catch (Exception e2) { + log.trace( + "Could not get CPU load through reflection, assuming moderate load (0.5)"); + return 0.5; + } + } + } catch (Exception e) { + log.trace("Could not get CPU load, assuming moderate load (0.5)"); + return 0.5; // Default to moderate load + } + } + + /** + * Calculates the dynamic job queue capacity based on current resource usage. + * + * @param baseCapacity The base capacity when system is under minimal load + * @param minCapacity The minimum capacity to maintain even under high load + * @return The calculated job queue capacity + */ + public int calculateDynamicQueueCapacity(int baseCapacity, int minCapacity) { + ResourceMetrics metrics = latestMetrics.get(); + ResourceStatus status = currentStatus.get(); + + // Simple linear reduction based on memory and CPU load + double capacityFactor = + switch (status) { + case OK -> 1.0; + case WARNING -> 0.6; + case CRITICAL -> 0.3; + }; + + // Apply additional reduction based on specific memory pressure + if (metrics.memoryUsage > 0.8) { + capacityFactor *= 0.5; // Further reduce capacity under memory pressure + } + + // Calculate capacity with minimum safeguard + int capacity = (int) Math.max(minCapacity, Math.ceil(baseCapacity * capacityFactor)); + + log.debug( + "Dynamic queue capacity: {} (base: {}, factor: {:.2f}, status: {})", + capacity, + baseCapacity, + capacityFactor, + status); + + return capacity; + } + + /** + * Checks if a job with the given weight can be executed immediately or should be queued based + * on current resource availability. 
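+ * In short: jobs with a weight below 20 run immediately unless the status is CRITICAL, jobs
+ * with a weight below 60 run immediately only while the status is OK, and all other
+ * combinations are queued.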
+     *
+     * @param resourceWeight The resource weight of the job (1-100)
+     * @return true if the job should be queued, false if it can run immediately
+     */
+    public boolean shouldQueueJob(int resourceWeight) {
+        ResourceStatus status = currentStatus.get();
+
+        // Always run lightweight jobs (weight < 20) unless critical
+        if (resourceWeight < 20 && status != ResourceStatus.CRITICAL) {
+            return false;
+        }
+
+        // Medium weight jobs run immediately if resources are OK
+        if (resourceWeight < 60 && status == ResourceStatus.OK) {
+            return false;
+        }
+
+        // Heavy jobs (weight >= 60) and any job during WARNING/CRITICAL should be queued
+        return true;
+    }
+}
diff --git a/common/src/main/java/stirling/software/common/service/TaskManager.java b/common/src/main/java/stirling/software/common/service/TaskManager.java
new file mode 100644
index 000000000..1bfd0b47f
--- /dev/null
+++ b/common/src/main/java/stirling/software/common/service/TaskManager.java
@@ -0,0 +1,274 @@
+package stirling.software.common.service;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import jakarta.annotation.PreDestroy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import stirling.software.common.model.job.JobResult;
+import stirling.software.common.model.job.JobStats;
+
+/** Manages async tasks and their results */
+@Service
+@Slf4j
+public class TaskManager {
+    private final Map<String, JobResult> jobResults = new ConcurrentHashMap<>();
+
+    @Value("${stirling.jobResultExpiryMinutes:30}")
+    private int jobResultExpiryMinutes = 30;
+
+    private final FileStorage fileStorage;
+    private final ScheduledExecutorService cleanupExecutor =
+            Executors.newSingleThreadScheduledExecutor();
+
+    /** Initialize the task manager and start the cleanup scheduler */
+    public TaskManager(FileStorage fileStorage) {
+        this.fileStorage = fileStorage;
+
+        // Schedule periodic cleanup of old job results
+        cleanupExecutor.scheduleAtFixedRate(
+                this::cleanupOldJobs,
+                10, // Initial delay
+                10, // Interval
+                TimeUnit.MINUTES);
+
+        log.debug(
+                "Task manager initialized with job result expiry of {} minutes",
+                jobResultExpiryMinutes);
+    }
+
+    /**
+     * Create a new task with the given job ID
+     *
+     * @param jobId The job ID
+     */
+    public void createTask(String jobId) {
+        jobResults.put(jobId, JobResult.createNew(jobId));
+        log.debug("Created task with job ID: {}", jobId);
+    }
+
+    /**
+     * Set the result of a task as a general object
+     *
+     * @param jobId The job ID
+     * @param result The result object
+     */
+    public void setResult(String jobId, Object result) {
+        JobResult jobResult = getOrCreateJobResult(jobId);
+        jobResult.completeWithResult(result);
+        log.debug("Set result for job ID: {}", jobId);
+    }
+
+    /**
+     * Set the result of a task as a file
+     *
+     * @param jobId The job ID
+     * @param fileId The file ID
+     * @param originalFileName The original file name
+     * @param contentType The content type of the file
+     */
+    public void setFileResult(
+            String jobId, String fileId, String originalFileName, String contentType) {
+        JobResult jobResult = getOrCreateJobResult(jobId);
+        jobResult.completeWithFile(fileId, originalFileName, contentType);
+        log.debug("Set file result for job ID: {} with file ID: {}", jobId, fileId);
+    }
+
+    /**
+     * Set an error for a task
+     *
+     * @param jobId The job ID
+     * @param error The error message
+     */
+    public void setError(String jobId, String error) {
+        JobResult jobResult = getOrCreateJobResult(jobId);
+        jobResult.failWithError(error);
+        log.debug("Set error for job ID: {}: {}", jobId, error);
+    }
+
+    /**
+     * Mark a task as complete
+     *
+     * @param jobId The job ID
+     */
+    public void setComplete(String jobId) {
+        JobResult jobResult = getOrCreateJobResult(jobId);
+        if (jobResult.getResult() == null
+                && jobResult.getFileId() == null
+                && jobResult.getError() == null) {
+            // If no result or error has been set, mark it as complete with an empty result
+            jobResult.completeWithResult("Task completed successfully");
+        }
+        log.debug("Marked job ID: {} as complete", jobId);
+    }
+
+    /**
+     * Check if a task is complete
+     *
+     * @param jobId The job ID
+     * @return true if the task is complete, false otherwise
+     */
+    public boolean isComplete(String jobId) {
+        JobResult result = jobResults.get(jobId);
+        return result != null && result.isComplete();
+    }
+
+    /**
+     * Get the result of a task
+     *
+     * @param jobId The job ID
+     * @return The JobResult for the given job ID, or null if the task doesn't exist
+     */
+    public JobResult getJobResult(String jobId) {
+        return jobResults.get(jobId);
+    }
+
+    /**
+     * Get statistics about all jobs in the system
+     *
+     * @return Job statistics
+     */
+    public JobStats getJobStats() {
+        int totalJobs = jobResults.size();
+        int activeJobs = 0;
+        int completedJobs = 0;
+        int failedJobs = 0;
+        int successfulJobs = 0;
+        int fileResultJobs = 0;
+
+        LocalDateTime oldestActiveJobTime = null;
+        LocalDateTime newestActiveJobTime = null;
+        long totalProcessingTimeMs = 0;
+
+        for (JobResult result : jobResults.values()) {
+            if (result.isComplete()) {
+                completedJobs++;
+
+                // Calculate processing time for completed jobs
+                if (result.getCreatedAt() != null && result.getCompletedAt() != null) {
+                    long processingTimeMs =
+                            java.time.Duration.between(
+                                            result.getCreatedAt(), result.getCompletedAt())
+                                    .toMillis();
+                    totalProcessingTimeMs += processingTimeMs;
+                }
+
+                if (result.getError() != null) {
+                    failedJobs++;
+                } else {
+                    successfulJobs++;
+                    if (result.getFileId() != null) {
+                        fileResultJobs++;
+                    }
+                }
+            } else {
+                activeJobs++;
+
+                // Track oldest and newest active jobs
+                if (result.getCreatedAt() != null) {
+                    if (oldestActiveJobTime == null
+                            || result.getCreatedAt().isBefore(oldestActiveJobTime)) {
+                        oldestActiveJobTime = result.getCreatedAt();
+                    }
+
+                    if (newestActiveJobTime == null
+                            || result.getCreatedAt().isAfter(newestActiveJobTime)) {
+                        newestActiveJobTime = result.getCreatedAt();
+                    }
+                }
+            }
+        }
+
+        // Calculate average processing time
+        long averageProcessingTimeMs =
+                completedJobs > 0 ? totalProcessingTimeMs / completedJobs : 0;
+
+        return JobStats.builder()
+                .totalJobs(totalJobs)
+                .activeJobs(activeJobs)
+                .completedJobs(completedJobs)
+                .failedJobs(failedJobs)
+                .successfulJobs(successfulJobs)
+                .fileResultJobs(fileResultJobs)
+                .oldestActiveJobTime(oldestActiveJobTime)
+                .newestActiveJobTime(newestActiveJobTime)
+                .averageProcessingTimeMs(averageProcessingTimeMs)
+                .build();
+    }
+
+    /**
+     * Get or create a job result
+     *
+     * @param jobId The job ID
+     * @return The job result
+     */
+    private JobResult getOrCreateJobResult(String jobId) {
+        return jobResults.computeIfAbsent(jobId, JobResult::createNew);
+    }
+
+    /** Clean up old completed job results */
+    public void cleanupOldJobs() {
+        LocalDateTime expiryThreshold =
+                LocalDateTime.now().minus(jobResultExpiryMinutes, ChronoUnit.MINUTES);
+        int removedCount = 0;
+
+        try {
+            for (Map.Entry<String, JobResult> entry : jobResults.entrySet()) {
+                JobResult result = entry.getValue();
+
+                // Remove completed jobs that are older than the expiry threshold
+                if (result.isComplete()
+                        && result.getCompletedAt() != null
+                        && result.getCompletedAt().isBefore(expiryThreshold)) {
+
+                    // If the job has a file result, delete the file
+                    if (result.getFileId() != null) {
+                        try {
+                            fileStorage.deleteFile(result.getFileId());
+                        } catch (Exception e) {
+                            log.warn(
+                                    "Failed to delete file for job {}: {}",
+                                    entry.getKey(),
+                                    e.getMessage());
+                        }
+                    }
+
+                    // Remove the job result
+                    jobResults.remove(entry.getKey());
+                    removedCount++;
+                }
+            }
+
+            if (removedCount > 0) {
+                log.info("Cleaned up {} expired job results", removedCount);
+            }
+        } catch (Exception e) {
+            log.error("Error during job cleanup: {}", e.getMessage(), e);
+        }
+    }
+
+    /** Shutdown the cleanup executor */
+    @PreDestroy
+    public void shutdown() {
+        try {
+            log.info("Shutting down job result cleanup executor");
+            cleanupExecutor.shutdown();
+            if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                cleanupExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            cleanupExecutor.shutdownNow();
+        }
+    }
+}
diff --git a/common/src/main/java/stirling/software/common/util/ExecutorFactory.java b/common/src/main/java/stirling/software/common/util/ExecutorFactory.java
new file mode 100644
index 000000000..13cca386e
--- /dev/null
+++ b/common/src/main/java/stirling/software/common/util/ExecutorFactory.java
@@ -0,0 +1,31 @@
+package stirling.software.common.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ExecutorFactory {
+
+    /**
+     * Creates an ExecutorService using virtual threads if available (Java 21+), or falls back to a
+     * cached thread pool on older Java versions.
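+     * <p>A typical (assumed) call site would be {@code ExecutorService pool =
+     * ExecutorFactory.newVirtualOrCachedThreadExecutor();} followed by an explicit
+     * {@code pool.shutdown()} when the owner is disposed; the factory itself does not manage the
+     * executor's lifecycle.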
+     */
+    public static ExecutorService newVirtualOrCachedThreadExecutor() {
+        try {
+            ExecutorService executor =
+                    (ExecutorService)
+                            Executors.class
+                                    .getMethod("newVirtualThreadPerTaskExecutor")
+                                    .invoke(null);
+            return executor;
+        } catch (NoSuchMethodException e) {
+            log.debug("Virtual threads not available; falling back to cached thread pool.");
+        } catch (Exception e) {
+            log.debug("Error initializing virtual thread executor: {}", e.getMessage(), e);
+        }
+
+        return Executors.newCachedThreadPool();
+    }
+}
diff --git a/stirling-pdf/src/main/java/stirling/software/common/controller/JobController.java b/stirling-pdf/src/main/java/stirling/software/common/controller/JobController.java
new file mode 100644
index 000000000..a08d97575
--- /dev/null
+++ b/stirling-pdf/src/main/java/stirling/software/common/controller/JobController.java
@@ -0,0 +1,114 @@
+package stirling.software.common.controller;
+
+import java.util.Map;
+
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import stirling.software.common.model.job.JobResult;
+import stirling.software.common.model.job.JobStats;
+import stirling.software.common.service.FileStorage;
+import stirling.software.common.service.TaskManager;
+
+/** REST controller for job-related endpoints */
+@RestController
+@RequiredArgsConstructor
+@Slf4j
+public class JobController {
+
+    private final TaskManager taskManager;
+    private final FileStorage fileStorage;
+
+    /**
+     * Get the status of a job
+     *
+     * @param jobId The job ID
+     * @return The job result
+     */
+    @GetMapping("/api/v1/general/job/{jobId}")
+    public ResponseEntity<?> getJobStatus(@PathVariable("jobId") String jobId) {
+        JobResult result = taskManager.getJobResult(jobId);
+        if (result == null) {
+            return ResponseEntity.notFound().build();
+        }
+        return ResponseEntity.ok(result);
+    }
+
+    /**
+     * Get the result of a job
+     *
+     * @param jobId The job ID
+     * @return The job result
+     */
+    @GetMapping("/api/v1/general/job/{jobId}/result")
+    public ResponseEntity<?> getJobResult(@PathVariable("jobId") String jobId) {
+        JobResult result = taskManager.getJobResult(jobId);
+        if (result == null) {
+            return ResponseEntity.notFound().build();
+        }
+
+        if (!result.isComplete()) {
+            return ResponseEntity.badRequest().body("Job is not complete yet");
+        }
+
+        if (result.getError() != null) {
+            return ResponseEntity.badRequest().body("Job failed: " + result.getError());
+        }
+
+        if (result.getFileId() != null) {
+            try {
+                byte[] fileContent = fileStorage.retrieveBytes(result.getFileId());
+                return ResponseEntity.ok()
+                        .header("Content-Type", result.getContentType())
+                        .header(
+                                "Content-Disposition",
+                                "form-data; name=\"attachment\"; filename=\""
+                                        + result.getOriginalFileName()
+                                        + "\"")
+                        .body(fileContent);
+            } catch (Exception e) {
+                log.error("Error retrieving file for job {}: {}", jobId, e.getMessage(), e);
+                return ResponseEntity.internalServerError()
+                        .body("Error retrieving file: " + e.getMessage());
+            }
+        }
+
+        return ResponseEntity.ok(result.getResult());
+    }
+
+    /**
+     * Get statistics about jobs in the system
+     *
+     * @return Job statistics
+     */
+    @GetMapping("/api/v1/general/job/stats")
+    public ResponseEntity<?> getJobStats() {
+        JobStats stats = taskManager.getJobStats();
+        return ResponseEntity.ok(stats);
+    }
+
+    /**
+     * Manually trigger cleanup of old jobs
+     *
+     * @return A response indicating how many jobs were cleaned up
+     */
+    @PostMapping("/api/v1/general/job/cleanup")
+    public ResponseEntity<?> cleanupOldJobs() {
+        int beforeCount = taskManager.getJobStats().getTotalJobs();
+        taskManager.cleanupOldJobs();
+        int afterCount = taskManager.getJobStats().getTotalJobs();
+        int removedCount = beforeCount - afterCount;
+
+        return ResponseEntity.ok(
+                Map.of(
+                        "message", "Cleanup complete",
+                        "removedJobs", removedCount,
+                        "remainingJobs", afterCount));
+    }
+}
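
A minimal client-side sketch of how the job endpoints introduced by JobController are intended to be consumed. The endpoint paths come from the controller above; the host (http://localhost:8080), the class name JobStatusPoller, and the assumption that an async submission returns a job ID are illustrative only, since the submission response shape is defined elsewhere in this patch.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class JobStatusPoller {
        public static void main(String[] args) throws Exception {
            // Job ID assumed to come from an @AutoJobPostMapping endpoint called with async=true
            String jobId = args[0];
            String base = "http://localhost:8080/api/v1/general/job/" + jobId;
            HttpClient client = HttpClient.newHttpClient();

            // Check the job status; a real client would repeat this until the job is complete
            HttpRequest status = HttpRequest.newBuilder(URI.create(base)).GET().build();
            String statusBody = client.send(status, HttpResponse.BodyHandlers.ofString()).body();
            System.out.println("Job status payload: " + statusBody);

            // Fetch the result, which is either JSON or raw file bytes depending on the job
            HttpRequest result = HttpRequest.newBuilder(URI.create(base + "/result")).GET().build();
            HttpResponse<byte[]> response =
                    client.send(result, HttpResponse.BodyHandlers.ofByteArray());
            System.out.println("Result size in bytes: " + response.body().length);
        }
    }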