This commit is contained in:
Anthony Stirling 2025-06-19 00:27:53 +01:00
parent c080158b1c
commit 2d18413676
16 changed files with 2228 additions and 1 deletions

View File

@ -28,4 +28,5 @@ dependencies {
api 'org.snakeyaml:snakeyaml-engine:2.9' api 'org.snakeyaml:snakeyaml-engine:2.9'
api "org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.9" api "org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.9"
api 'jakarta.mail:jakarta.mail-api:2.1.3' api 'jakarta.mail:jakarta.mail-api:2.1.3'
api 'org.springframework.boot:spring-boot-starter-aop'
} }

View File

@ -0,0 +1,47 @@
package stirling.software.common.annotations;
import java.lang.annotation.*;
import org.springframework.core.annotation.AliasFor;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@RequestMapping(method = RequestMethod.POST)
public @interface AutoJobPostMapping {
@AliasFor(annotation = RequestMapping.class, attribute = "value")
String[] value() default {};
@AliasFor(annotation = RequestMapping.class, attribute = "consumes")
String[] consumes() default {"multipart/form-data"};
/**
* Custom timeout in milliseconds for this specific job. If not specified, the default system
* timeout will be used.
*/
long timeout() default -1;
/** Maximum number of times to retry the job on failure. Default is 1 (no retries). */
int retryCount() default 1;
/**
* Whether to track and report progress for this job. If enabled, the job will send progress
* updates through WebSocket.
*/
boolean trackProgress() default true;
/**
* Whether this job can be queued when system resources are limited. If enabled, jobs will be
* queued instead of rejected when the system is under high load. The queue size is dynamically
* adjusted based on available memory and CPU resources.
*/
boolean queueable() default false;
/**
* Optional resource weight of this job (1-100). Higher values indicate more resource-intensive
* jobs that may need stricter queuing. Default is 50 (medium weight).
*/
int resourceWeight() default 50;
}

View File

@ -0,0 +1,231 @@
package stirling.software.common.aop;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.annotations.AutoJobPostMapping;
import stirling.software.common.model.api.PDFFile;
import stirling.software.common.service.FileOrUploadService;
import stirling.software.common.service.FileStorage;
import stirling.software.common.service.JobExecutorService;
@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class AutoJobAspect {
private final JobExecutorService jobExecutorService;
private final HttpServletRequest request;
private final FileOrUploadService fileOrUploadService;
private final FileStorage fileStorage;
@Around("@annotation(autoJobPostMapping)")
public Object wrapWithJobExecution(
ProceedingJoinPoint joinPoint, AutoJobPostMapping autoJobPostMapping) {
// Extract parameters from the request and annotation
boolean async = Boolean.parseBoolean(request.getParameter("async"));
long timeout = autoJobPostMapping.timeout();
int retryCount = autoJobPostMapping.retryCount();
boolean trackProgress = autoJobPostMapping.trackProgress();
log.debug(
"AutoJobPostMapping execution with async={}, timeout={}, retryCount={}, trackProgress={}",
async,
timeout > 0 ? timeout : "default",
retryCount,
trackProgress);
// Inspect and possibly mutate arguments
Object[] args = joinPoint.getArgs();
for (int i = 0; i < args.length; i++) {
Object arg = args[i];
if (arg instanceof PDFFile pdfFile) {
// Case 1: fileId is provided but no fileInput
if (pdfFile.getFileInput() == null && pdfFile.getFileId() != null) {
try {
log.debug("Using fileId {} to get file content", pdfFile.getFileId());
MultipartFile file = fileStorage.retrieveFile(pdfFile.getFileId());
pdfFile.setFileInput(file);
} catch (Exception e) {
throw new RuntimeException(
"Failed to resolve file by ID: " + pdfFile.getFileId(), e);
}
}
// Case 2: For async requests, we need to make a copy of the MultipartFile
else if (async && pdfFile.getFileInput() != null) {
try {
log.debug("Making persistent copy of uploaded file for async processing");
MultipartFile originalFile = pdfFile.getFileInput();
String fileId = fileStorage.storeFile(originalFile);
// Store the fileId for later reference
pdfFile.setFileId(fileId);
// Replace the original MultipartFile with our persistent copy
MultipartFile persistentFile = fileStorage.retrieveFile(fileId);
pdfFile.setFileInput(persistentFile);
log.debug("Created persistent file copy with fileId: {}", fileId);
} catch (IOException e) {
throw new RuntimeException(
"Failed to create persistent copy of uploaded file", e);
}
}
}
}
// Extract queueable and resourceWeight parameters
boolean queueable = autoJobPostMapping.queueable();
int resourceWeight = autoJobPostMapping.resourceWeight();
// Integrate with the JobExecutorService
if (retryCount <= 1) {
// No retries needed, simple execution
return jobExecutorService.runJobGeneric(
async,
() -> {
try {
// Note: Progress tracking is handled in TaskManager/JobExecutorService
// The trackProgress flag controls whether detailed progress is stored
// for REST API queries, not WebSocket notifications
return joinPoint.proceed(args);
} catch (Throwable ex) {
log.error(
"AutoJobAspect caught exception during job execution: {}",
ex.getMessage(),
ex);
throw new RuntimeException(ex);
}
},
timeout,
queueable,
resourceWeight);
} else {
// Use retry logic
return executeWithRetries(
joinPoint,
args,
async,
timeout,
retryCount,
trackProgress,
queueable,
resourceWeight);
}
}
private Object executeWithRetries(
ProceedingJoinPoint joinPoint,
Object[] args,
boolean async,
long timeout,
int maxRetries,
boolean trackProgress,
boolean queueable,
int resourceWeight) {
AtomicInteger attempts = new AtomicInteger(0);
// Keep jobId reference for progress tracking in TaskManager
AtomicReference<String> jobIdRef = new AtomicReference<>();
return jobExecutorService.runJobGeneric(
async,
() -> {
int currentAttempt = attempts.incrementAndGet();
try {
if (trackProgress && async) {
// Get jobId for progress tracking in TaskManager
// This enables REST API progress queries, not WebSocket
if (jobIdRef.get() == null) {
jobIdRef.set(getJobIdFromContext());
}
String jobId = jobIdRef.get();
if (jobId != null) {
log.debug(
"Tracking progress for job {} (attempt {}/{})",
jobId,
currentAttempt,
maxRetries);
// Progress is tracked in TaskManager for REST API access
// No WebSocket notifications sent here
}
}
return joinPoint.proceed(args);
} catch (Throwable ex) {
log.error(
"AutoJobAspect caught exception during job execution (attempt {}/{}): {}",
currentAttempt,
Math.max(1, maxRetries),
ex.getMessage(),
ex);
// Check if we should retry
if (currentAttempt < maxRetries) {
log.info(
"Retrying operation, attempt {}/{}",
currentAttempt + 1,
maxRetries);
if (trackProgress && async) {
String jobId = jobIdRef.get();
if (jobId != null) {
log.debug(
"Recording retry attempt for job {} in TaskManager",
jobId);
// Retry info is tracked in TaskManager for REST API access
}
}
try {
// Simple exponential backoff
Thread.sleep(100 * currentAttempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
// Recursive call to retry
return executeWithRetries(
joinPoint,
args,
async,
timeout,
maxRetries,
trackProgress,
queueable,
resourceWeight);
}
// No more retries, throw the exception
throw new RuntimeException("Job failed: " + ex.getMessage(), ex);
}
},
timeout,
queueable,
resourceWeight);
}
// Get the job ID from the context for progress tracking in TaskManager
private String getJobIdFromContext() {
try {
return (String) request.getAttribute("jobId");
} catch (Exception e) {
log.debug("Could not retrieve job ID from context: {}", e.getMessage());
return null;
}
}
}

View File

@ -14,8 +14,12 @@ import lombok.NoArgsConstructor;
public class PDFFile { public class PDFFile {
@Schema( @Schema(
description = "The input PDF file", description = "The input PDF file",
requiredMode = Schema.RequiredMode.REQUIRED,
contentMediaType = "application/pdf", contentMediaType = "application/pdf",
format = "binary") format = "binary")
private MultipartFile fileInput; private MultipartFile fileInput;
@Schema(
description = "File ID for server-side files (can be used instead of fileInput)",
example = "a1b2c3d4-5678-90ab-cdef-ghijklmnopqr")
private String fileId;
} }

View File

@ -0,0 +1,15 @@
package stirling.software.common.model.job;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class JobProgress {
private String jobId;
private String status;
private int percentComplete;
private String message;
}

View File

@ -0,0 +1,14 @@
package stirling.software.common.model.job;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class JobResponse<T> {
private boolean async;
private String jobId;
private T result;
}

View File

@ -0,0 +1,94 @@
package stirling.software.common.model.job;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/** Represents the result of a job execution. Used by the TaskManager to store job results. */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class JobResult {
/** The job ID */
private String jobId;
/** Flag indicating if the job is complete */
private boolean complete;
/** Error message if the job failed */
private String error;
/** The file ID of the result file, if applicable */
private String fileId;
/** Original file name, if applicable */
private String originalFileName;
/** MIME type of the result, if applicable */
private String contentType;
/** Time when the job was created */
private LocalDateTime createdAt;
/** Time when the job was completed */
private LocalDateTime completedAt;
/** The actual result object, if not a file */
private Object result;
/**
* Create a new JobResult with the given job ID
*
* @param jobId The job ID
* @return A new JobResult
*/
public static JobResult createNew(String jobId) {
return JobResult.builder()
.jobId(jobId)
.complete(false)
.createdAt(LocalDateTime.now())
.build();
}
/**
* Mark this job as complete with a file result
*
* @param fileId The file ID of the result
* @param originalFileName The original file name
* @param contentType The content type of the file
*/
public void completeWithFile(String fileId, String originalFileName, String contentType) {
this.complete = true;
this.fileId = fileId;
this.originalFileName = originalFileName;
this.contentType = contentType;
this.completedAt = LocalDateTime.now();
}
/**
* Mark this job as complete with a general result
*
* @param result The result object
*/
public void completeWithResult(Object result) {
this.complete = true;
this.result = result;
this.completedAt = LocalDateTime.now();
}
/**
* Mark this job as failed with an error message
*
* @param error The error message
*/
public void failWithError(String error) {
this.complete = true;
this.error = error;
this.completedAt = LocalDateTime.now();
}
}

View File

@ -0,0 +1,43 @@
package stirling.software.common.model.job;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/** Represents statistics about jobs in the system */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class JobStats {
/** Total number of jobs (active and completed) */
private int totalJobs;
/** Number of active (incomplete) jobs */
private int activeJobs;
/** Number of completed jobs */
private int completedJobs;
/** Number of failed jobs */
private int failedJobs;
/** Number of successful jobs */
private int successfulJobs;
/** Number of jobs with file results */
private int fileResultJobs;
/** The oldest active job's creation timestamp */
private LocalDateTime oldestActiveJobTime;
/** The newest active job's creation timestamp */
private LocalDateTime newestActiveJobTime;
/** The average processing time for completed jobs in milliseconds */
private long averageProcessingTimeMs;
}

View File

@ -0,0 +1,78 @@
package stirling.software.common.service;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class FileOrUploadService {
@Value("${stirling.tempDir:/tmp/stirling-files}")
private String tempDirPath;
public Path resolveFilePath(String fileId) {
return Path.of(tempDirPath).resolve(fileId);
}
public MultipartFile toMockMultipartFile(String name, byte[] data) throws IOException {
return new CustomMultipartFile(name, data);
}
// Custom implementation of MultipartFile
private static class CustomMultipartFile implements MultipartFile {
private final String name;
private final byte[] content;
public CustomMultipartFile(String name, byte[] content) {
this.name = name;
this.content = content;
}
@Override
public String getName() {
return name;
}
@Override
public String getOriginalFilename() {
return name;
}
@Override
public String getContentType() {
return "application/pdf";
}
@Override
public boolean isEmpty() {
return content == null || content.length == 0;
}
@Override
public long getSize() {
return content.length;
}
@Override
public byte[] getBytes() throws IOException {
return content;
}
@Override
public java.io.InputStream getInputStream() throws IOException {
return new ByteArrayInputStream(content);
}
@Override
public void transferTo(java.io.File dest) throws IOException, IllegalStateException {
Files.write(dest.toPath(), content);
}
}
}

View File

@ -0,0 +1,152 @@
package stirling.software.common.service;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Service for storing and retrieving files with unique file IDs. Used by the AutoJobPostMapping
* system to handle file references.
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class FileStorage {
@Value("${stirling.tempDir:/tmp/stirling-files}")
private String tempDirPath;
private final FileOrUploadService fileOrUploadService;
/**
* Store a file and return its unique ID
*
* @param file The file to store
* @return The unique ID assigned to the file
* @throws IOException If there is an error storing the file
*/
public String storeFile(MultipartFile file) throws IOException {
String fileId = generateFileId();
Path filePath = getFilePath(fileId);
// Ensure the directory exists
Files.createDirectories(filePath.getParent());
// Transfer the file to the storage location
file.transferTo(filePath.toFile());
log.debug("Stored file with ID: {}", fileId);
return fileId;
}
/**
* Store a byte array as a file and return its unique ID
*
* @param bytes The byte array to store
* @param originalName The original name of the file (for extension)
* @return The unique ID assigned to the file
* @throws IOException If there is an error storing the file
*/
public String storeBytes(byte[] bytes, String originalName) throws IOException {
String fileId = generateFileId();
Path filePath = getFilePath(fileId);
// Ensure the directory exists
Files.createDirectories(filePath.getParent());
// Write the bytes to the file
Files.write(filePath, bytes);
log.debug("Stored byte array with ID: {}", fileId);
return fileId;
}
/**
* Retrieve a file by its ID as a MultipartFile
*
* @param fileId The ID of the file to retrieve
* @return The file as a MultipartFile
* @throws IOException If the file doesn't exist or can't be read
*/
public MultipartFile retrieveFile(String fileId) throws IOException {
Path filePath = getFilePath(fileId);
if (!Files.exists(filePath)) {
throw new IOException("File not found with ID: " + fileId);
}
byte[] fileData = Files.readAllBytes(filePath);
return fileOrUploadService.toMockMultipartFile(fileId, fileData);
}
/**
* Retrieve a file by its ID as a byte array
*
* @param fileId The ID of the file to retrieve
* @return The file as a byte array
* @throws IOException If the file doesn't exist or can't be read
*/
public byte[] retrieveBytes(String fileId) throws IOException {
Path filePath = getFilePath(fileId);
if (!Files.exists(filePath)) {
throw new IOException("File not found with ID: " + fileId);
}
return Files.readAllBytes(filePath);
}
/**
* Delete a file by its ID
*
* @param fileId The ID of the file to delete
* @return true if the file was deleted, false otherwise
*/
public boolean deleteFile(String fileId) {
try {
Path filePath = getFilePath(fileId);
return Files.deleteIfExists(filePath);
} catch (IOException e) {
log.error("Error deleting file with ID: {}", fileId, e);
return false;
}
}
/**
* Check if a file exists by its ID
*
* @param fileId The ID of the file to check
* @return true if the file exists, false otherwise
*/
public boolean fileExists(String fileId) {
Path filePath = getFilePath(fileId);
return Files.exists(filePath);
}
/**
* Get the path for a file ID
*
* @param fileId The ID of the file
* @return The path to the file
*/
private Path getFilePath(String fileId) {
return Path.of(tempDirPath).resolve(fileId);
}
/**
* Generate a unique file ID
*
* @return A unique file ID
*/
private String generateFileId() {
return UUID.randomUUID().toString();
}
}

View File

@ -0,0 +1,452 @@
package stirling.software.common.service;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.model.job.JobResponse;
import stirling.software.common.util.ExecutorFactory;
/** Service for executing jobs asynchronously or synchronously */
@Service
@Slf4j
public class JobExecutorService {
private final TaskManager taskManager;
private final FileStorage fileStorage;
private final HttpServletRequest request;
private final ResourceMonitor resourceMonitor;
private final JobQueue jobQueue;
private final ExecutorService executor = ExecutorFactory.newVirtualOrCachedThreadExecutor();
private final long effectiveTimeoutMs;
public JobExecutorService(
TaskManager taskManager,
FileStorage fileStorage,
HttpServletRequest request,
ResourceMonitor resourceMonitor,
JobQueue jobQueue,
@Value("${spring.mvc.async.request-timeout:1200000}") long asyncRequestTimeoutMs,
@Value("${server.servlet.session.timeout:30m}") String sessionTimeout) {
this.taskManager = taskManager;
this.fileStorage = fileStorage;
this.request = request;
this.resourceMonitor = resourceMonitor;
this.jobQueue = jobQueue;
// Parse session timeout and calculate effective timeout once during initialization
long sessionTimeoutMs = parseSessionTimeout(sessionTimeout);
this.effectiveTimeoutMs = Math.min(asyncRequestTimeoutMs, sessionTimeoutMs);
log.debug(
"Job executor configured with effective timeout of {} ms", this.effectiveTimeoutMs);
}
/**
* Run a job either asynchronously or synchronously
*
* @param async Whether to run the job asynchronously
* @param work The work to be done
* @return The response
*/
public ResponseEntity<?> runJobGeneric(boolean async, Supplier<Object> work) {
return runJobGeneric(async, work, -1);
}
/**
* Run a job either asynchronously or synchronously with a custom timeout
*
* @param async Whether to run the job asynchronously
* @param work The work to be done
* @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default
* @return The response
*/
public ResponseEntity<?> runJobGeneric(
boolean async, Supplier<Object> work, long customTimeoutMs) {
return runJobGeneric(async, work, customTimeoutMs, false, 50);
}
/**
* Run a job either asynchronously or synchronously with custom parameters
*
* @param async Whether to run the job asynchronously
* @param work The work to be done
* @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default
* @param queueable Whether this job can be queued when system resources are limited
* @param resourceWeight The resource weight of this job (1-100)
* @return The response
*/
public ResponseEntity<?> runJobGeneric(
boolean async,
Supplier<Object> work,
long customTimeoutMs,
boolean queueable,
int resourceWeight) {
String jobId = UUID.randomUUID().toString();
// Store the job ID in the request for potential use by other components
if (request != null) {
request.setAttribute("jobId", jobId);
}
// Determine which timeout to use
long timeoutToUse = customTimeoutMs > 0 ? customTimeoutMs : effectiveTimeoutMs;
log.debug(
"Running job with ID: {}, async: {}, timeout: {}ms, queueable: {}, weight: {}",
jobId,
async,
timeoutToUse,
queueable,
resourceWeight);
// Check if we need to queue this job based on resource availability
boolean shouldQueue =
queueable
&& async
&& // Only async jobs can be queued
resourceMonitor.shouldQueueJob(resourceWeight);
if (shouldQueue) {
// Queue the job instead of executing immediately
log.debug(
"Queueing job {} due to resource constraints (weight: {})",
jobId,
resourceWeight);
taskManager.createTask(jobId);
// Create a specialized wrapper that updates the TaskManager
Supplier<Object> wrappedWork =
() -> {
try {
Object result = work.get();
processJobResult(jobId, result);
return result;
} catch (Exception e) {
log.error(
"Error executing queued job {}: {}", jobId, e.getMessage(), e);
taskManager.setError(jobId, e.getMessage());
throw e;
}
};
// Queue the job and get the future
CompletableFuture<ResponseEntity<?>> future =
jobQueue.queueJob(jobId, resourceWeight, wrappedWork, timeoutToUse);
// Return immediately with job ID
return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null));
} else if (async) {
taskManager.createTask(jobId);
executor.execute(
() -> {
try {
log.debug(
"Running async job {} with timeout {} ms", jobId, timeoutToUse);
// Execute with timeout
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
processJobResult(jobId, result);
} catch (TimeoutException te) {
log.error("Job {} timed out after {} ms", jobId, timeoutToUse);
taskManager.setError(jobId, "Job timed out");
} catch (Exception e) {
log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
taskManager.setError(jobId, e.getMessage());
}
});
return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null));
} else {
try {
log.debug("Running sync job with timeout {} ms", timeoutToUse);
// Execute with timeout
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
// If the result is already a ResponseEntity, return it directly
if (result instanceof ResponseEntity) {
return (ResponseEntity<?>) result;
}
// Process different result types
return handleResultForSyncJob(result);
} catch (TimeoutException te) {
log.error("Synchronous job timed out after {} ms", timeoutToUse);
return ResponseEntity.internalServerError()
.body(Map.of("error", "Job timed out after " + timeoutToUse + " ms"));
} catch (Exception e) {
log.error("Error executing synchronous job: {}", e.getMessage(), e);
// Construct a JSON error response
return ResponseEntity.internalServerError()
.body(Map.of("error", "Job failed: " + e.getMessage()));
}
}
}
/**
* Process the result of an asynchronous job
*
* @param jobId The job ID
* @param result The result
*/
private void processJobResult(String jobId, Object result) {
try {
if (result instanceof byte[]) {
// Store byte array as a file
String fileId = fileStorage.storeBytes((byte[]) result, "result.pdf");
taskManager.setFileResult(jobId, fileId, "result.pdf", "application/pdf");
log.debug("Stored byte[] result with fileId: {}", fileId);
} else if (result instanceof ResponseEntity) {
ResponseEntity<?> response = (ResponseEntity<?>) result;
Object body = response.getBody();
if (body instanceof byte[]) {
// Extract filename from content-disposition header if available
String filename = "result.pdf";
String contentType = "application/pdf";
if (response.getHeaders().getContentDisposition() != null) {
String disposition =
response.getHeaders().getContentDisposition().toString();
if (disposition.contains("filename=")) {
filename =
disposition.substring(
disposition.indexOf("filename=") + 9,
disposition.lastIndexOf("\""));
}
}
if (response.getHeaders().getContentType() != null) {
contentType = response.getHeaders().getContentType().toString();
}
String fileId = fileStorage.storeBytes((byte[]) body, filename);
taskManager.setFileResult(jobId, fileId, filename, contentType);
log.debug("Stored ResponseEntity<byte[]> result with fileId: {}", fileId);
} else {
// Check if the response body contains a fileId
if (body != null && body.toString().contains("fileId")) {
try {
// Try to extract fileId using reflection
java.lang.reflect.Method getFileId =
body.getClass().getMethod("getFileId");
String fileId = (String) getFileId.invoke(body);
if (fileId != null && !fileId.isEmpty()) {
// Try to get filename and content type
String filename = "result.pdf";
String contentType = "application/pdf";
try {
java.lang.reflect.Method getOriginalFileName =
body.getClass().getMethod("getOriginalFilename");
String origName = (String) getOriginalFileName.invoke(body);
if (origName != null && !origName.isEmpty()) {
filename = origName;
}
} catch (Exception e) {
log.debug(
"Could not get original filename: {}", e.getMessage());
}
try {
java.lang.reflect.Method getContentType =
body.getClass().getMethod("getContentType");
String ct = (String) getContentType.invoke(body);
if (ct != null && !ct.isEmpty()) {
contentType = ct;
}
} catch (Exception e) {
log.debug("Could not get content type: {}", e.getMessage());
}
taskManager.setFileResult(jobId, fileId, filename, contentType);
log.debug("Extracted fileId from response body: {}", fileId);
taskManager.setComplete(jobId);
return;
}
} catch (Exception e) {
log.debug(
"Failed to extract fileId from response body: {}",
e.getMessage());
}
}
// Store generic result
taskManager.setResult(jobId, body);
}
} else if (result instanceof MultipartFile) {
MultipartFile file = (MultipartFile) result;
String fileId = fileStorage.storeFile(file);
taskManager.setFileResult(
jobId, fileId, file.getOriginalFilename(), file.getContentType());
log.debug("Stored MultipartFile result with fileId: {}", fileId);
} else {
// Check if result has a fileId field
if (result != null) {
try {
// Try to extract fileId using reflection
java.lang.reflect.Method getFileId =
result.getClass().getMethod("getFileId");
String fileId = (String) getFileId.invoke(result);
if (fileId != null && !fileId.isEmpty()) {
// Try to get filename and content type
String filename = "result.pdf";
String contentType = "application/pdf";
try {
java.lang.reflect.Method getOriginalFileName =
result.getClass().getMethod("getOriginalFilename");
String origName = (String) getOriginalFileName.invoke(result);
if (origName != null && !origName.isEmpty()) {
filename = origName;
}
} catch (Exception e) {
log.debug("Could not get original filename: {}", e.getMessage());
}
try {
java.lang.reflect.Method getContentType =
result.getClass().getMethod("getContentType");
String ct = (String) getContentType.invoke(result);
if (ct != null && !ct.isEmpty()) {
contentType = ct;
}
} catch (Exception e) {
log.debug("Could not get content type: {}", e.getMessage());
}
taskManager.setFileResult(jobId, fileId, filename, contentType);
log.debug("Extracted fileId from result object: {}", fileId);
taskManager.setComplete(jobId);
return;
}
} catch (Exception e) {
log.debug(
"Failed to extract fileId from result object: {}", e.getMessage());
}
}
// Default case: store the result as is
taskManager.setResult(jobId, result);
}
taskManager.setComplete(jobId);
} catch (Exception e) {
log.error("Error processing job result: {}", e.getMessage(), e);
taskManager.setError(jobId, "Error processing result: " + e.getMessage());
}
}
/**
* Handle different result types for synchronous jobs
*
* @param result The result object
* @return The appropriate ResponseEntity
* @throws IOException If there is an error processing the result
*/
private ResponseEntity<?> handleResultForSyncJob(Object result) throws IOException {
if (result instanceof byte[]) {
// Return byte array as PDF
return ResponseEntity.ok()
.contentType(MediaType.APPLICATION_PDF)
.header(
HttpHeaders.CONTENT_DISPOSITION,
"form-data; name=\"attachment\"; filename=\"result.pdf\"")
.body(result);
} else if (result instanceof MultipartFile) {
// Return MultipartFile content
MultipartFile file = (MultipartFile) result;
return ResponseEntity.ok()
.contentType(MediaType.parseMediaType(file.getContentType()))
.header(
HttpHeaders.CONTENT_DISPOSITION,
"form-data; name=\"attachment\"; filename=\""
+ file.getOriginalFilename()
+ "\"")
.body(file.getBytes());
} else {
// Default case: return as JSON
return ResponseEntity.ok(result);
}
}
/**
* Parse session timeout string (e.g., "30m", "1h") to milliseconds
*
* @param timeout The timeout string
* @return The timeout in milliseconds
*/
private long parseSessionTimeout(String timeout) {
if (timeout == null || timeout.isEmpty()) {
return 30 * 60 * 1000; // Default: 30 minutes
}
try {
String value = timeout.replaceAll("[^\\d.]", "");
String unit = timeout.replaceAll("[\\d.]", "");
double numericValue = Double.parseDouble(value);
return switch (unit.toLowerCase()) {
case "s" -> (long) (numericValue * 1000);
case "m" -> (long) (numericValue * 60 * 1000);
case "h" -> (long) (numericValue * 60 * 60 * 1000);
case "d" -> (long) (numericValue * 24 * 60 * 60 * 1000);
default -> (long) (numericValue * 60 * 1000); // Default to minutes
};
} catch (Exception e) {
log.warn("Could not parse session timeout '{}', using default", timeout);
return 30 * 60 * 1000; // Default: 30 minutes
}
}
/**
* Execute a supplier with a timeout
*
* @param supplier The supplier to execute
* @param timeoutMs The timeout in milliseconds
* @return The result from the supplier
* @throws TimeoutException If the execution times out
* @throws Exception If the supplier throws an exception
*/
private <T> T executeWithTimeout(Supplier<T> supplier, long timeoutMs)
throws TimeoutException, Exception {
java.util.concurrent.CompletableFuture<T> future =
java.util.concurrent.CompletableFuture.supplyAsync(supplier);
try {
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (java.util.concurrent.TimeoutException e) {
future.cancel(true);
throw new TimeoutException("Execution timed out after " + timeoutMs + " ms");
} catch (java.util.concurrent.ExecutionException e) {
throw (Exception) e.getCause();
} catch (java.util.concurrent.CancellationException e) {
throw new Exception("Execution was cancelled", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new Exception("Execution was interrupted", e);
}
}
}

View File

@ -0,0 +1,400 @@
package stirling.software.common.service;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.util.ExecutorFactory;
/**
* Manages a queue of jobs with dynamic sizing based on system resources. Used when system resources
* are limited to prevent overloading.
*/
@Service
@Slf4j
public class JobQueue {
private final ResourceMonitor resourceMonitor;
@Value("${stirling.job.queue.base-capacity:10}")
private int baseQueueCapacity = 10;
@Value("${stirling.job.queue.min-capacity:2}")
private int minQueueCapacity = 2;
@Value("${stirling.job.queue.check-interval-ms:1000}")
private long queueCheckIntervalMs = 1000;
@Value("${stirling.job.queue.max-wait-time-ms:600000}")
private long maxWaitTimeMs = 600000; // 10 minutes
private BlockingQueue<QueuedJob> jobQueue;
private final Map<String, QueuedJob> jobMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ExecutorService jobExecutor = ExecutorFactory.newVirtualOrCachedThreadExecutor();
private boolean shuttingDown = false;
@Getter private int rejectedJobs = 0;
@Getter private int totalQueuedJobs = 0;
@Getter private int currentQueueSize = 0;
/** Represents a job waiting in the queue. */
@Data
@AllArgsConstructor
private static class QueuedJob {
private final String jobId;
private final int resourceWeight;
private final Supplier<Object> work;
private final long timeoutMs;
private final Instant queuedAt;
private CompletableFuture<ResponseEntity<?>> future;
private volatile boolean cancelled = false;
}
public JobQueue(ResourceMonitor resourceMonitor) {
this.resourceMonitor = resourceMonitor;
// Initialize with dynamic capacity
int capacity =
resourceMonitor.calculateDynamicQueueCapacity(baseQueueCapacity, minQueueCapacity);
this.jobQueue = new LinkedBlockingQueue<>(capacity);
}
@PostConstruct
public void initialize() {
log.debug(
"Starting job queue with base capacity {}, min capacity {}",
baseQueueCapacity,
minQueueCapacity);
// Periodically process the job queue
scheduler.scheduleWithFixedDelay(
this::processQueue, 0, queueCheckIntervalMs, TimeUnit.MILLISECONDS);
// Periodically update queue capacity based on resource usage
scheduler.scheduleWithFixedDelay(
this::updateQueueCapacity,
10000, // Initial delay
30000, // 30 second interval
TimeUnit.MILLISECONDS);
}
@PreDestroy
public void shutdown() {
log.info("Shutting down job queue");
shuttingDown = true;
// Complete any futures that are still waiting
jobMap.forEach(
(id, job) -> {
if (!job.future.isDone()) {
job.future.completeExceptionally(
new RuntimeException("Server shutting down, job cancelled"));
}
});
scheduler.shutdownNow();
jobExecutor.shutdownNow();
log.info(
"Job queue shutdown complete. Stats: total={}, rejected={}",
totalQueuedJobs,
rejectedJobs);
}
/**
* Queues a job for execution when resources permit.
*
* @param jobId The job ID
* @param resourceWeight The resource weight of the job (1-100)
* @param work The work to be done
* @param timeoutMs The timeout in milliseconds
* @return A CompletableFuture that will complete when the job is executed
*/
public CompletableFuture<ResponseEntity<?>> queueJob(
String jobId, int resourceWeight, Supplier<Object> work, long timeoutMs) {
// Create a CompletableFuture to track this job's completion
CompletableFuture<ResponseEntity<?>> future = new CompletableFuture<>();
// Create the queued job
QueuedJob job =
new QueuedJob(jobId, resourceWeight, work, timeoutMs, Instant.now(), future, false);
// Store in our map for lookup
jobMap.put(jobId, job);
// Update stats
totalQueuedJobs++;
currentQueueSize = jobQueue.size();
// Try to add to the queue
try {
boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS);
if (!added) {
log.warn("Queue full, rejecting job {}", jobId);
rejectedJobs++;
future.completeExceptionally(
new RuntimeException("Job queue full, please try again later"));
jobMap.remove(jobId);
return future;
}
log.debug(
"Job {} queued for execution (weight: {}, queue size: {})",
jobId,
resourceWeight,
jobQueue.size());
return future;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.completeExceptionally(new RuntimeException("Job queue interrupted"));
jobMap.remove(jobId);
return future;
}
}
/**
* Gets the current capacity of the job queue.
*
* @return The current capacity
*/
public int getQueueCapacity() {
return ((LinkedBlockingQueue<QueuedJob>) jobQueue).remainingCapacity() + jobQueue.size();
}
/** Updates the capacity of the job queue based on available system resources. */
private void updateQueueCapacity() {
try {
// Calculate new capacity
int newCapacity =
resourceMonitor.calculateDynamicQueueCapacity(
baseQueueCapacity, minQueueCapacity);
int currentCapacity = getQueueCapacity();
if (newCapacity != currentCapacity) {
log.debug(
"Updating job queue capacity from {} to {}", currentCapacity, newCapacity);
// Create new queue with updated capacity
BlockingQueue<QueuedJob> newQueue = new LinkedBlockingQueue<>(newCapacity);
// Transfer jobs from old queue to new queue
jobQueue.drainTo(newQueue);
jobQueue = newQueue;
currentQueueSize = jobQueue.size();
}
} catch (Exception e) {
log.error("Error updating queue capacity: {}", e.getMessage(), e);
}
}
/** Processes jobs in the queue, executing them when resources permit. */
private void processQueue() {
if (shuttingDown || jobQueue.isEmpty()) {
return;
}
try {
// Get current resource status
ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get();
// Check if we should execute any jobs
boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL);
if (!canExecuteJobs) {
// Under critical load, don't execute any jobs
log.debug("System under critical load, delaying job execution");
return;
}
// Get jobs from the queue, up to a limit based on resource availability
int jobsToProcess =
Math.max(
1,
switch (status) {
case OK -> 3;
case WARNING -> 1;
case CRITICAL -> 0;
});
for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) {
QueuedJob job = jobQueue.poll();
if (job == null) break;
// Check if it's been waiting too long
long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli();
if (waitTimeMs > maxWaitTimeMs) {
log.warn(
"Job {} exceeded maximum wait time ({} ms), executing anyway",
job.jobId,
waitTimeMs);
}
// Remove from our map
jobMap.remove(job.jobId);
currentQueueSize = jobQueue.size();
// Execute the job
executeJob(job);
}
} catch (Exception e) {
log.error("Error processing job queue: {}", e.getMessage(), e);
}
}
/**
* Executes a job from the queue.
*
* @param job The job to execute
*/
private void executeJob(QueuedJob job) {
if (job.cancelled) {
log.debug("Job {} was cancelled, not executing", job.jobId);
return;
}
jobExecutor.execute(
() -> {
log.debug("Executing queued job {} (queued at {})", job.jobId, job.queuedAt);
try {
// Execute with timeout
Object result = executeWithTimeout(job.work, job.timeoutMs);
// Process the result
if (result instanceof ResponseEntity) {
job.future.complete((ResponseEntity<?>) result);
} else {
job.future.complete(ResponseEntity.ok(result));
}
} catch (Exception e) {
log.error(
"Error executing queued job {}: {}", job.jobId, e.getMessage(), e);
job.future.completeExceptionally(e);
}
});
}
/**
* Execute a supplier with a timeout.
*
* @param supplier The supplier to execute
* @param timeoutMs The timeout in milliseconds
* @return The result from the supplier
* @throws Exception If there is an execution error
*/
private <T> T executeWithTimeout(Supplier<T> supplier, long timeoutMs) throws Exception {
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
try {
if (timeoutMs <= 0) {
// No timeout
return future.join();
} else {
// With timeout
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
}
} catch (TimeoutException e) {
future.cancel(true);
throw new TimeoutException("Job timed out after " + timeoutMs + "ms");
} catch (ExecutionException e) {
throw (Exception) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedException("Job was interrupted");
}
}
/**
* Checks if a job is queued.
*
* @param jobId The job ID
* @return true if the job is queued
*/
public boolean isJobQueued(String jobId) {
return jobMap.containsKey(jobId);
}
/**
* Gets the current position of a job in the queue.
*
* @param jobId The job ID
* @return The position (0-based) or -1 if not found
*/
public int getJobPosition(String jobId) {
if (!jobMap.containsKey(jobId)) {
return -1;
}
// Count positions
int position = 0;
for (QueuedJob job : jobQueue) {
if (job.jobId.equals(jobId)) {
return position;
}
position++;
}
// If we didn't find it in the queue but it's in the map,
// it might be executing already
return -1;
}
/**
* Cancels a queued job.
*
* @param jobId The job ID
* @return true if the job was cancelled, false if not found
*/
public boolean cancelJob(String jobId) {
QueuedJob job = jobMap.remove(jobId);
if (job != null) {
job.cancelled = true;
job.future.completeExceptionally(new RuntimeException("Job cancelled by user"));
// Try to remove from queue if it's still there
jobQueue.remove(job);
currentQueueSize = jobQueue.size();
log.debug("Job {} cancelled", jobId);
return true;
}
return false;
}
/**
* Get queue statistics.
*
* @return A map containing queue statistics
*/
public Map<String, Object> getQueueStats() {
return Map.of(
"queuedJobs", jobQueue.size(),
"queueCapacity", getQueueCapacity(),
"totalQueuedJobs", totalQueuedJobs,
"rejectedJobs", rejectedJobs,
"resourceStatus", resourceMonitor.getCurrentStatus().get().name());
}
}

View File

@ -0,0 +1,277 @@
package stirling.software.common.service;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* Monitors system resources (CPU, memory) to inform job scheduling decisions. Provides information
* about available resources to prevent overloading the system.
*/
@Service
@Slf4j
public class ResourceMonitor {
@Value("${stirling.resource.memory.critical-threshold:0.9}")
private double memoryCriticalThreshold = 0.9; // 90% usage is critical
@Value("${stirling.resource.memory.high-threshold:0.75}")
private double memoryHighThreshold = 0.75; // 75% usage is high
@Value("${stirling.resource.cpu.critical-threshold:0.9}")
private double cpuCriticalThreshold = 0.9; // 90% usage is critical
@Value("${stirling.resource.cpu.high-threshold:0.75}")
private double cpuHighThreshold = 0.75; // 75% usage is high
@Value("${stirling.resource.monitor.interval-ms:60000}")
private long monitorIntervalMs = 60000; // 60 seconds
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
@Getter
private final AtomicReference<ResourceStatus> currentStatus =
new AtomicReference<>(ResourceStatus.OK);
@Getter
private final AtomicReference<ResourceMetrics> latestMetrics =
new AtomicReference<>(new ResourceMetrics());
/** Represents the current status of system resources. */
public enum ResourceStatus {
/** Resources are available, normal operations can proceed */
OK,
/** Resources are under strain, consider queueing high-resource operations */
WARNING,
/** Resources are critically low, queue all operations */
CRITICAL
}
/** Detailed metrics about system resources. */
@Getter
public static class ResourceMetrics {
private final double cpuUsage;
private final double memoryUsage;
private final long freeMemoryBytes;
private final long totalMemoryBytes;
private final long maxMemoryBytes;
private final Instant timestamp;
public ResourceMetrics() {
this(0, 0, 0, 0, 0, Instant.now());
}
public ResourceMetrics(
double cpuUsage,
double memoryUsage,
long freeMemoryBytes,
long totalMemoryBytes,
long maxMemoryBytes,
Instant timestamp) {
this.cpuUsage = cpuUsage;
this.memoryUsage = memoryUsage;
this.freeMemoryBytes = freeMemoryBytes;
this.totalMemoryBytes = totalMemoryBytes;
this.maxMemoryBytes = maxMemoryBytes;
this.timestamp = timestamp;
}
/**
* Gets the age of these metrics.
*
* @return Duration since these metrics were collected
*/
public Duration getAge() {
return Duration.between(timestamp, Instant.now());
}
/**
* Check if these metrics are stale (older than threshold).
*
* @param thresholdMs Staleness threshold in milliseconds
* @return true if metrics are stale
*/
public boolean isStale(long thresholdMs) {
return getAge().toMillis() > thresholdMs;
}
}
@PostConstruct
public void initialize() {
log.debug("Starting resource monitoring with interval of {}ms", monitorIntervalMs);
scheduler.scheduleAtFixedRate(
this::updateResourceMetrics, 0, monitorIntervalMs, TimeUnit.MILLISECONDS);
}
@PreDestroy
public void shutdown() {
log.info("Shutting down resource monitoring");
scheduler.shutdownNow();
}
/** Updates the resource metrics by sampling current system state. */
private void updateResourceMetrics() {
try {
// Get CPU usage
double cpuUsage = osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors();
if (cpuUsage < 0) cpuUsage = getAlternativeCpuLoad(); // Fallback if not available
// Get memory usage
long heapUsed = memoryMXBean.getHeapMemoryUsage().getUsed();
long nonHeapUsed = memoryMXBean.getNonHeapMemoryUsage().getUsed();
long totalUsed = heapUsed + nonHeapUsed;
long maxMemory = Runtime.getRuntime().maxMemory();
long totalMemory = Runtime.getRuntime().totalMemory();
long freeMemory = Runtime.getRuntime().freeMemory();
double memoryUsage = (double) totalUsed / maxMemory;
// Create new metrics
ResourceMetrics metrics =
new ResourceMetrics(
cpuUsage,
memoryUsage,
freeMemory,
totalMemory,
maxMemory,
Instant.now());
latestMetrics.set(metrics);
// Determine system status
ResourceStatus newStatus;
if (cpuUsage > cpuCriticalThreshold || memoryUsage > memoryCriticalThreshold) {
newStatus = ResourceStatus.CRITICAL;
} else if (cpuUsage > cpuHighThreshold || memoryUsage > memoryHighThreshold) {
newStatus = ResourceStatus.WARNING;
} else {
newStatus = ResourceStatus.OK;
}
// Update status if it changed
ResourceStatus oldStatus = currentStatus.getAndSet(newStatus);
if (oldStatus != newStatus) {
log.info("System resource status changed from {} to {}", oldStatus, newStatus);
log.info(
"Current metrics - CPU: {:.1f}%, Memory: {:.1f}%, Free Memory: {} MB",
cpuUsage * 100, memoryUsage * 100, freeMemory / (1024 * 1024));
}
} catch (Exception e) {
log.error("Error updating resource metrics: {}", e.getMessage(), e);
}
}
/**
* Alternative method to estimate CPU load if getSystemLoadAverage() is not available. This is a
* fallback and less accurate than the official JMX method.
*
* @return Estimated CPU load as a value between 0.0 and 1.0
*/
private double getAlternativeCpuLoad() {
try {
// Try to get CPU time if available through reflection
// This is a fallback since we can't directly cast to platform-specific classes
try {
java.lang.reflect.Method m =
osMXBean.getClass().getDeclaredMethod("getProcessCpuLoad");
m.setAccessible(true);
return (double) m.invoke(osMXBean);
} catch (Exception e) {
// Try the older method
try {
java.lang.reflect.Method m =
osMXBean.getClass().getDeclaredMethod("getSystemCpuLoad");
m.setAccessible(true);
return (double) m.invoke(osMXBean);
} catch (Exception e2) {
log.trace(
"Could not get CPU load through reflection, assuming moderate load (0.5)");
return 0.5;
}
}
} catch (Exception e) {
log.trace("Could not get CPU load, assuming moderate load (0.5)");
return 0.5; // Default to moderate load
}
}
/**
* Calculates the dynamic job queue capacity based on current resource usage.
*
* @param baseCapacity The base capacity when system is under minimal load
* @param minCapacity The minimum capacity to maintain even under high load
* @return The calculated job queue capacity
*/
public int calculateDynamicQueueCapacity(int baseCapacity, int minCapacity) {
ResourceMetrics metrics = latestMetrics.get();
ResourceStatus status = currentStatus.get();
// Simple linear reduction based on memory and CPU load
double capacityFactor =
switch (status) {
case OK -> 1.0;
case WARNING -> 0.6;
case CRITICAL -> 0.3;
};
// Apply additional reduction based on specific memory pressure
if (metrics.memoryUsage > 0.8) {
capacityFactor *= 0.5; // Further reduce capacity under memory pressure
}
// Calculate capacity with minimum safeguard
int capacity = (int) Math.max(minCapacity, Math.ceil(baseCapacity * capacityFactor));
log.debug(
"Dynamic queue capacity: {} (base: {}, factor: {:.2f}, status: {})",
capacity,
baseCapacity,
capacityFactor,
status);
return capacity;
}
/**
* Checks if a job with the given weight can be executed immediately or should be queued based
* on current resource availability.
*
* @param resourceWeight The resource weight of the job (1-100)
* @return true if the job should be queued, false if it can run immediately
*/
public boolean shouldQueueJob(int resourceWeight) {
ResourceStatus status = currentStatus.get();
// Always run lightweight jobs (weight < 20) unless critical
if (resourceWeight < 20 && status != ResourceStatus.CRITICAL) {
return false;
}
// Medium weight jobs run immediately if resources are OK
if (resourceWeight < 60 && status == ResourceStatus.OK) {
return false;
}
// Heavy jobs (weight >= 60) and any job during WARNING/CRITICAL should be queued
return true;
}
}

View File

@ -0,0 +1,274 @@
package stirling.software.common.service;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.model.job.JobResult;
import stirling.software.common.model.job.JobStats;
/** Manages async tasks and their results */
@Service
@Slf4j
public class TaskManager {
private final Map<String, JobResult> jobResults = new ConcurrentHashMap<>();
@Value("${stirling.jobResultExpiryMinutes:30}")
private int jobResultExpiryMinutes = 30;
private final FileStorage fileStorage;
private final ScheduledExecutorService cleanupExecutor =
Executors.newSingleThreadScheduledExecutor();
/** Initialize the task manager and start the cleanup scheduler */
public TaskManager(FileStorage fileStorage) {
this.fileStorage = fileStorage;
// Schedule periodic cleanup of old job results
cleanupExecutor.scheduleAtFixedRate(
this::cleanupOldJobs,
10, // Initial delay
10, // Interval
TimeUnit.MINUTES);
log.debug(
"Task manager initialized with job result expiry of {} minutes",
jobResultExpiryMinutes);
}
/**
* Create a new task with the given job ID
*
* @param jobId The job ID
*/
public void createTask(String jobId) {
jobResults.put(jobId, JobResult.createNew(jobId));
log.debug("Created task with job ID: {}", jobId);
}
/**
* Set the result of a task as a general object
*
* @param jobId The job ID
* @param result The result object
*/
public void setResult(String jobId, Object result) {
JobResult jobResult = getOrCreateJobResult(jobId);
jobResult.completeWithResult(result);
log.debug("Set result for job ID: {}", jobId);
}
/**
* Set the result of a task as a file
*
* @param jobId The job ID
* @param fileId The file ID
* @param originalFileName The original file name
* @param contentType The content type of the file
*/
public void setFileResult(
String jobId, String fileId, String originalFileName, String contentType) {
JobResult jobResult = getOrCreateJobResult(jobId);
jobResult.completeWithFile(fileId, originalFileName, contentType);
log.debug("Set file result for job ID: {} with file ID: {}", jobId, fileId);
}
/**
* Set an error for a task
*
* @param jobId The job ID
* @param error The error message
*/
public void setError(String jobId, String error) {
JobResult jobResult = getOrCreateJobResult(jobId);
jobResult.failWithError(error);
log.debug("Set error for job ID: {}: {}", jobId, error);
}
/**
* Mark a task as complete
*
* @param jobId The job ID
*/
public void setComplete(String jobId) {
JobResult jobResult = getOrCreateJobResult(jobId);
if (jobResult.getResult() == null
&& jobResult.getFileId() == null
&& jobResult.getError() == null) {
// If no result or error has been set, mark it as complete with an empty result
jobResult.completeWithResult("Task completed successfully");
}
log.debug("Marked job ID: {} as complete", jobId);
}
/**
* Check if a task is complete
*
* @param jobId The job ID
* @return true if the task is complete, false otherwise
*/
public boolean isComplete(String jobId) {
JobResult result = jobResults.get(jobId);
return result != null && result.isComplete();
}
/**
* Get the result of a task
*
* @param jobId The job ID
* @return The result object, or null if the task doesn't exist or is not complete
*/
public JobResult getJobResult(String jobId) {
return jobResults.get(jobId);
}
/**
* Get statistics about all jobs in the system
*
* @return Job statistics
*/
public JobStats getJobStats() {
int totalJobs = jobResults.size();
int activeJobs = 0;
int completedJobs = 0;
int failedJobs = 0;
int successfulJobs = 0;
int fileResultJobs = 0;
LocalDateTime oldestActiveJobTime = null;
LocalDateTime newestActiveJobTime = null;
long totalProcessingTimeMs = 0;
for (JobResult result : jobResults.values()) {
if (result.isComplete()) {
completedJobs++;
// Calculate processing time for completed jobs
if (result.getCreatedAt() != null && result.getCompletedAt() != null) {
long processingTimeMs =
java.time.Duration.between(
result.getCreatedAt(), result.getCompletedAt())
.toMillis();
totalProcessingTimeMs += processingTimeMs;
}
if (result.getError() != null) {
failedJobs++;
} else {
successfulJobs++;
if (result.getFileId() != null) {
fileResultJobs++;
}
}
} else {
activeJobs++;
// Track oldest and newest active jobs
if (result.getCreatedAt() != null) {
if (oldestActiveJobTime == null
|| result.getCreatedAt().isBefore(oldestActiveJobTime)) {
oldestActiveJobTime = result.getCreatedAt();
}
if (newestActiveJobTime == null
|| result.getCreatedAt().isAfter(newestActiveJobTime)) {
newestActiveJobTime = result.getCreatedAt();
}
}
}
}
// Calculate average processing time
long averageProcessingTimeMs =
completedJobs > 0 ? totalProcessingTimeMs / completedJobs : 0;
return JobStats.builder()
.totalJobs(totalJobs)
.activeJobs(activeJobs)
.completedJobs(completedJobs)
.failedJobs(failedJobs)
.successfulJobs(successfulJobs)
.fileResultJobs(fileResultJobs)
.oldestActiveJobTime(oldestActiveJobTime)
.newestActiveJobTime(newestActiveJobTime)
.averageProcessingTimeMs(averageProcessingTimeMs)
.build();
}
/**
* Get or create a job result
*
* @param jobId The job ID
* @return The job result
*/
private JobResult getOrCreateJobResult(String jobId) {
return jobResults.computeIfAbsent(jobId, JobResult::createNew);
}
/** Clean up old completed job results */
public void cleanupOldJobs() {
LocalDateTime expiryThreshold =
LocalDateTime.now().minus(jobResultExpiryMinutes, ChronoUnit.MINUTES);
int removedCount = 0;
try {
for (Map.Entry<String, JobResult> entry : jobResults.entrySet()) {
JobResult result = entry.getValue();
// Remove completed jobs that are older than the expiry threshold
if (result.isComplete()
&& result.getCompletedAt() != null
&& result.getCompletedAt().isBefore(expiryThreshold)) {
// If the job has a file result, delete the file
if (result.getFileId() != null) {
try {
fileStorage.deleteFile(result.getFileId());
} catch (Exception e) {
log.warn(
"Failed to delete file for job {}: {}",
entry.getKey(),
e.getMessage());
}
}
// Remove the job result
jobResults.remove(entry.getKey());
removedCount++;
}
}
if (removedCount > 0) {
log.info("Cleaned up {} expired job results", removedCount);
}
} catch (Exception e) {
log.error("Error during job cleanup: {}", e.getMessage(), e);
}
}
/** Shutdown the cleanup executor */
@PreDestroy
public void shutdown() {
try {
log.info("Shutting down job result cleanup executor");
cleanupExecutor.shutdown();
if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
cleanupExecutor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
cleanupExecutor.shutdownNow();
}
}
}

View File

@ -0,0 +1,31 @@
package stirling.software.common.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ExecutorFactory {
/**
* Creates an ExecutorService using virtual threads if available (Java 21+), or falls back to a
* cached thread pool on older Java versions.
*/
public static ExecutorService newVirtualOrCachedThreadExecutor() {
try {
ExecutorService executor =
(ExecutorService)
Executors.class
.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(null);
return executor;
} catch (NoSuchMethodException e) {
log.debug("Virtual threads not available; falling back to cached thread pool.");
} catch (Exception e) {
log.debug("Error initializing virtual thread executor: {}", e.getMessage(), e);
}
return Executors.newCachedThreadPool();
}
}

View File

@ -0,0 +1,114 @@
package stirling.software.common.controller;
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.model.job.JobResult;
import stirling.software.common.model.job.JobStats;
import stirling.software.common.service.FileStorage;
import stirling.software.common.service.TaskManager;
/** REST controller for job-related endpoints */
@RestController
@RequiredArgsConstructor
@Slf4j
public class JobController {
private final TaskManager taskManager;
private final FileStorage fileStorage;
/**
* Get the status of a job
*
* @param jobId The job ID
* @return The job result
*/
@GetMapping("/api/v1/general/job/{jobId}")
public ResponseEntity<?> getJobStatus(@PathVariable("jobId") String jobId) {
JobResult result = taskManager.getJobResult(jobId);
if (result == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(result);
}
/**
* Get the result of a job
*
* @param jobId The job ID
* @return The job result
*/
@GetMapping("/api/v1/general/job/{jobId}/result")
public ResponseEntity<?> getJobResult(@PathVariable("jobId") String jobId) {
JobResult result = taskManager.getJobResult(jobId);
if (result == null) {
return ResponseEntity.notFound().build();
}
if (!result.isComplete()) {
return ResponseEntity.badRequest().body("Job is not complete yet");
}
if (result.getError() != null) {
return ResponseEntity.badRequest().body("Job failed: " + result.getError());
}
if (result.getFileId() != null) {
try {
byte[] fileContent = fileStorage.retrieveBytes(result.getFileId());
return ResponseEntity.ok()
.header("Content-Type", result.getContentType())
.header(
"Content-Disposition",
"form-data; name=\"attachment\"; filename=\""
+ result.getOriginalFileName()
+ "\"")
.body(fileContent);
} catch (Exception e) {
log.error("Error retrieving file for job {}: {}", jobId, e.getMessage(), e);
return ResponseEntity.internalServerError()
.body("Error retrieving file: " + e.getMessage());
}
}
return ResponseEntity.ok(result.getResult());
}
/**
* Get statistics about jobs in the system
*
* @return Job statistics
*/
@GetMapping("/api/v1/general/job/stats")
public ResponseEntity<JobStats> getJobStats() {
JobStats stats = taskManager.getJobStats();
return ResponseEntity.ok(stats);
}
/**
* Manually trigger cleanup of old jobs
*
* @return A response indicating how many jobs were cleaned up
*/
@PostMapping("/api/v1/general/job/cleanup")
public ResponseEntity<?> cleanupOldJobs() {
int beforeCount = taskManager.getJobStats().getTotalJobs();
taskManager.cleanupOldJobs();
int afterCount = taskManager.getJobStats().getTotalJobs();
int removedCount = beforeCount - afterCount;
return ResponseEntity.ok(
Map.of(
"message", "Cleanup complete",
"removedJobs", removedCount,
"remainingJobs", afterCount));
}
}