mirror of
https://github.com/Stirling-Tools/Stirling-PDF.git
synced 2025-06-22 07:25:04 +00:00
fixes and adjustment
This commit is contained in:
parent
45e18b2611
commit
96aa5c024d
@ -1,9 +1,13 @@
|
||||
package stirling.software.common.aop;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.springframework.beans.BeanUtils;
|
||||
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.*;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -26,6 +30,8 @@ import stirling.software.common.service.JobExecutorService;
|
||||
@Slf4j
|
||||
public class AutoJobAspect {
|
||||
|
||||
private static final Duration RETRY_BASE_DELAY = Duration.ofMillis(100);
|
||||
|
||||
private final JobExecutorService jobExecutorService;
|
||||
private final HttpServletRequest request;
|
||||
private final FileOrUploadService fileOrUploadService;
|
||||
@ -47,50 +53,12 @@ public class AutoJobAspect {
|
||||
retryCount,
|
||||
trackProgress);
|
||||
|
||||
// Inspect and possibly mutate arguments
|
||||
Object[] args = joinPoint.getArgs();
|
||||
// Copy and process arguments to avoid mutating the original objects
|
||||
Object[] args = copyAndProcessArgs(joinPoint.getArgs(), async);
|
||||
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
Object arg = args[i];
|
||||
|
||||
if (arg instanceof PDFFile pdfFile) {
|
||||
// Case 1: fileId is provided but no fileInput
|
||||
if (pdfFile.getFileInput() == null && pdfFile.getFileId() != null) {
|
||||
try {
|
||||
log.debug("Using fileId {} to get file content", pdfFile.getFileId());
|
||||
MultipartFile file = fileStorage.retrieveFile(pdfFile.getFileId());
|
||||
pdfFile.setFileInput(file);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to resolve file by ID: " + pdfFile.getFileId(), e);
|
||||
}
|
||||
}
|
||||
// Case 2: For async requests, we need to make a copy of the MultipartFile
|
||||
else if (async && pdfFile.getFileInput() != null) {
|
||||
try {
|
||||
log.debug("Making persistent copy of uploaded file for async processing");
|
||||
MultipartFile originalFile = pdfFile.getFileInput();
|
||||
String fileId = fileStorage.storeFile(originalFile);
|
||||
|
||||
// Store the fileId for later reference
|
||||
pdfFile.setFileId(fileId);
|
||||
|
||||
// Replace the original MultipartFile with our persistent copy
|
||||
MultipartFile persistentFile = fileStorage.retrieveFile(fileId);
|
||||
pdfFile.setFileInput(persistentFile);
|
||||
|
||||
log.debug("Created persistent file copy with fileId: {}", fileId);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to create persistent copy of uploaded file", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract queueable and resourceWeight parameters
|
||||
// Extract queueable and resourceWeight parameters and validate
|
||||
boolean queueable = autoJobPostMapping.queueable();
|
||||
int resourceWeight = autoJobPostMapping.resourceWeight();
|
||||
int resourceWeight = Math.max(1, Math.min(100, autoJobPostMapping.resourceWeight()));
|
||||
|
||||
// Integrate with the JobExecutorService
|
||||
if (retryCount <= 1) {
|
||||
@ -138,87 +106,178 @@ public class AutoJobAspect {
|
||||
boolean queueable,
|
||||
int resourceWeight) {
|
||||
|
||||
AtomicInteger attempts = new AtomicInteger(0);
|
||||
// Keep jobId reference for progress tracking in TaskManager
|
||||
AtomicReference<String> jobIdRef = new AtomicReference<>();
|
||||
|
||||
return jobExecutorService.runJobGeneric(
|
||||
async,
|
||||
() -> {
|
||||
int currentAttempt = attempts.incrementAndGet();
|
||||
try {
|
||||
if (trackProgress && async) {
|
||||
// Get jobId for progress tracking in TaskManager
|
||||
// This enables REST API progress queries, not WebSocket
|
||||
if (jobIdRef.get() == null) {
|
||||
jobIdRef.set(getJobIdFromContext());
|
||||
}
|
||||
String jobId = jobIdRef.get();
|
||||
if (jobId != null) {
|
||||
log.debug(
|
||||
"Tracking progress for job {} (attempt {}/{})",
|
||||
jobId,
|
||||
currentAttempt,
|
||||
maxRetries);
|
||||
// Progress is tracked in TaskManager for REST API access
|
||||
// No WebSocket notifications sent here
|
||||
}
|
||||
}
|
||||
|
||||
return joinPoint.proceed(args);
|
||||
} catch (Throwable ex) {
|
||||
log.error(
|
||||
"AutoJobAspect caught exception during job execution (attempt {}/{}): {}",
|
||||
currentAttempt,
|
||||
Math.max(1, maxRetries),
|
||||
ex.getMessage(),
|
||||
ex);
|
||||
|
||||
// Check if we should retry
|
||||
if (currentAttempt < maxRetries) {
|
||||
log.info(
|
||||
"Retrying operation, attempt {}/{}",
|
||||
currentAttempt + 1,
|
||||
maxRetries);
|
||||
// Use iterative approach instead of recursion to avoid stack overflow
|
||||
Throwable lastException = null;
|
||||
|
||||
// Attempt counter starts at 1 for first try
|
||||
for (int currentAttempt = 1; currentAttempt <= maxRetries; currentAttempt++) {
|
||||
try {
|
||||
if (trackProgress && async) {
|
||||
// Get jobId for progress tracking in TaskManager
|
||||
// This enables REST API progress queries, not WebSocket
|
||||
if (jobIdRef.get() == null) {
|
||||
jobIdRef.set(getJobIdFromContext());
|
||||
}
|
||||
String jobId = jobIdRef.get();
|
||||
if (jobId != null) {
|
||||
log.debug(
|
||||
"Recording retry attempt for job {} in TaskManager",
|
||||
jobId);
|
||||
// Retry info is tracked in TaskManager for REST API access
|
||||
"Tracking progress for job {} (attempt {}/{})",
|
||||
jobId,
|
||||
currentAttempt,
|
||||
maxRetries);
|
||||
// Progress is tracked in TaskManager for REST API access
|
||||
// No WebSocket notifications sent here
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// Simple exponential backoff
|
||||
Thread.sleep(100 * currentAttempt);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
// Attempt to execute the operation
|
||||
return joinPoint.proceed(args);
|
||||
|
||||
// Recursive call to retry
|
||||
return executeWithRetries(
|
||||
joinPoint,
|
||||
args,
|
||||
async,
|
||||
timeout,
|
||||
} catch (Throwable ex) {
|
||||
lastException = ex;
|
||||
log.error(
|
||||
"AutoJobAspect caught exception during job execution (attempt {}/{}): {}",
|
||||
currentAttempt,
|
||||
maxRetries,
|
||||
trackProgress,
|
||||
queueable,
|
||||
resourceWeight);
|
||||
}
|
||||
ex.getMessage(),
|
||||
ex);
|
||||
|
||||
// No more retries, throw the exception
|
||||
throw new RuntimeException("Job failed: " + ex.getMessage(), ex);
|
||||
// Check if we should retry
|
||||
if (currentAttempt < maxRetries) {
|
||||
log.info(
|
||||
"Retrying operation, attempt {}/{}",
|
||||
currentAttempt + 1,
|
||||
maxRetries);
|
||||
|
||||
if (trackProgress && async) {
|
||||
String jobId = jobIdRef.get();
|
||||
if (jobId != null) {
|
||||
log.debug(
|
||||
"Recording retry attempt for job {} in TaskManager",
|
||||
jobId);
|
||||
// Retry info is tracked in TaskManager for REST API access
|
||||
}
|
||||
}
|
||||
|
||||
// Use non-blocking delay for all retry attempts to avoid blocking threads
|
||||
// For sync jobs this avoids starving the tomcat thread pool under load
|
||||
long delayMs = RETRY_BASE_DELAY.toMillis() * currentAttempt;
|
||||
|
||||
// Execute the retry after a delay through the JobExecutorService
|
||||
// rather than blocking the current thread with sleep
|
||||
CompletableFuture<Object> delayedRetry = new CompletableFuture<>();
|
||||
|
||||
// Use a delayed executor for non-blocking delay
|
||||
CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)
|
||||
.execute(() -> {
|
||||
// Continue the retry loop in the next iteration
|
||||
// We can't return from here directly since we're in a Runnable
|
||||
delayedRetry.complete(null);
|
||||
});
|
||||
|
||||
// Wait for the delay to complete before continuing
|
||||
try {
|
||||
delayedRetry.join();
|
||||
} catch (Exception e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// No more retries, we'll throw the exception after the loop
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we get here, all retries failed
|
||||
if (lastException != null) {
|
||||
throw new RuntimeException("Job failed after " + maxRetries + " attempts: "
|
||||
+ lastException.getMessage(), lastException);
|
||||
}
|
||||
|
||||
// This should never happen if lastException is properly tracked
|
||||
throw new RuntimeException("Job failed but no exception was recorded");
|
||||
},
|
||||
timeout,
|
||||
queueable,
|
||||
resourceWeight);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates deep copies of arguments when needed to avoid mutating the original objects
|
||||
* Particularly important for PDFFile objects that might be reused by Spring
|
||||
*
|
||||
* @param originalArgs The original arguments
|
||||
* @param async Whether this is an async operation
|
||||
* @return A new array with safely processed arguments
|
||||
*/
|
||||
private Object[] copyAndProcessArgs(Object[] originalArgs, boolean async) {
|
||||
if (originalArgs == null || originalArgs.length == 0) {
|
||||
return originalArgs;
|
||||
}
|
||||
|
||||
Object[] processedArgs = new Object[originalArgs.length];
|
||||
|
||||
// Copy all arguments
|
||||
for (int i = 0; i < originalArgs.length; i++) {
|
||||
Object arg = originalArgs[i];
|
||||
|
||||
if (arg instanceof PDFFile pdfFile) {
|
||||
// Create a copy of PDFFile to avoid mutating the original
|
||||
PDFFile pdfFileCopy = new PDFFile();
|
||||
|
||||
// Use Spring's BeanUtils to copy all properties, avoiding missed fields if PDFFile grows
|
||||
BeanUtils.copyProperties(pdfFile, pdfFileCopy);
|
||||
|
||||
// Case 1: fileId is provided but no fileInput
|
||||
if (pdfFileCopy.getFileInput() == null && pdfFileCopy.getFileId() != null) {
|
||||
try {
|
||||
log.debug("Using fileId {} to get file content", pdfFileCopy.getFileId());
|
||||
MultipartFile file = fileStorage.retrieveFile(pdfFileCopy.getFileId());
|
||||
pdfFileCopy.setFileInput(file);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to resolve file by ID: " + pdfFileCopy.getFileId(), e);
|
||||
}
|
||||
}
|
||||
// Case 2: For async requests, we need to make a copy of the MultipartFile
|
||||
else if (async && pdfFileCopy.getFileInput() != null) {
|
||||
try {
|
||||
log.debug("Making persistent copy of uploaded file for async processing");
|
||||
MultipartFile originalFile = pdfFileCopy.getFileInput();
|
||||
String fileId = fileStorage.storeFile(originalFile);
|
||||
|
||||
// Store the fileId for later reference
|
||||
pdfFileCopy.setFileId(fileId);
|
||||
|
||||
// Replace the original MultipartFile with our persistent copy
|
||||
MultipartFile persistentFile = fileStorage.retrieveFile(fileId);
|
||||
pdfFileCopy.setFileInput(persistentFile);
|
||||
|
||||
log.debug("Created persistent file copy with fileId: {}", fileId);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to create persistent copy of uploaded file", e);
|
||||
}
|
||||
}
|
||||
|
||||
processedArgs[i] = pdfFileCopy;
|
||||
} else {
|
||||
// For non-PDFFile objects, just pass the original reference
|
||||
// If other classes need copy-on-write, add them here
|
||||
processedArgs[i] = arg;
|
||||
}
|
||||
}
|
||||
|
||||
return processedArgs;
|
||||
}
|
||||
|
||||
// Get the job ID from the context for progress tracking in TaskManager
|
||||
private String getJobIdFromContext() {
|
||||
try {
|
||||
|
@ -1,6 +1,9 @@
|
||||
package stirling.software.common.model.job;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Collections;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
@ -41,6 +44,12 @@ public class JobResult {
|
||||
/** The actual result object, if not a file */
|
||||
private Object result;
|
||||
|
||||
/**
|
||||
* Notes attached to this job for tracking purposes.
|
||||
* Uses CopyOnWriteArrayList for thread safety when notes are added concurrently.
|
||||
*/
|
||||
private final List<String> notes = new CopyOnWriteArrayList<>();
|
||||
|
||||
/**
|
||||
* Create a new JobResult with the given job ID
|
||||
*
|
||||
@ -91,4 +100,22 @@ public class JobResult {
|
||||
this.error = error;
|
||||
this.completedAt = LocalDateTime.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a note to this job
|
||||
*
|
||||
* @param note The note to add
|
||||
*/
|
||||
public void addNote(String note) {
|
||||
this.notes.add(note);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all notes attached to this job
|
||||
*
|
||||
* @return An unmodifiable view of the notes list
|
||||
*/
|
||||
public List<String> getNotes() {
|
||||
return Collections.unmodifiableList(notes);
|
||||
}
|
||||
}
|
||||
|
@ -209,10 +209,13 @@ public class JobExecutorService {
|
||||
private void processJobResult(String jobId, Object result) {
|
||||
try {
|
||||
if (result instanceof byte[]) {
|
||||
// Store byte array as a file
|
||||
// Store byte array directly to disk to avoid double memory consumption
|
||||
String fileId = fileStorage.storeBytes((byte[]) result, "result.pdf");
|
||||
taskManager.setFileResult(jobId, fileId, "result.pdf", "application/pdf");
|
||||
log.debug("Stored byte[] result with fileId: {}", fileId);
|
||||
|
||||
// Let the byte array get collected naturally in the next GC cycle
|
||||
// We don't need to force System.gc() which can be harmful
|
||||
} else if (result instanceof ResponseEntity) {
|
||||
ResponseEntity<?> response = (ResponseEntity<?>) result;
|
||||
Object body = response.getBody();
|
||||
@ -237,9 +240,12 @@ public class JobExecutorService {
|
||||
contentType = response.getHeaders().getContentType().toString();
|
||||
}
|
||||
|
||||
// Store byte array directly to disk
|
||||
String fileId = fileStorage.storeBytes((byte[]) body, filename);
|
||||
taskManager.setFileResult(jobId, fileId, filename, contentType);
|
||||
log.debug("Stored ResponseEntity<byte[]> result with fileId: {}", fileId);
|
||||
|
||||
// Let the GC handle the memory naturally
|
||||
} else {
|
||||
// Check if the response body contains a fileId
|
||||
if (body != null && body.toString().contains("fileId")) {
|
||||
@ -432,8 +438,10 @@ public class JobExecutorService {
|
||||
*/
|
||||
private <T> T executeWithTimeout(Supplier<T> supplier, long timeoutMs)
|
||||
throws TimeoutException, Exception {
|
||||
// Use the same executor as other async jobs for consistency
|
||||
// This ensures all operations run on the same thread pool
|
||||
java.util.concurrent.CompletableFuture<T> future =
|
||||
java.util.concurrent.CompletableFuture.supplyAsync(supplier);
|
||||
java.util.concurrent.CompletableFuture.supplyAsync(supplier, executor);
|
||||
|
||||
try {
|
||||
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
|
@ -6,9 +6,13 @@ import java.util.concurrent.*;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import stirling.software.common.service.TaskManager;
|
||||
import stirling.software.common.util.SpringContextHolder;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
|
||||
@ -25,7 +29,9 @@ import stirling.software.common.util.ExecutorFactory;
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class JobQueue {
|
||||
public class JobQueue implements SmartLifecycle {
|
||||
|
||||
private volatile boolean running = false;
|
||||
|
||||
private final ResourceMonitor resourceMonitor;
|
||||
|
||||
@ -41,10 +47,11 @@ public class JobQueue {
|
||||
@Value("${stirling.job.queue.max-wait-time-ms:600000}")
|
||||
private long maxWaitTimeMs = 600000; // 10 minutes
|
||||
|
||||
private BlockingQueue<QueuedJob> jobQueue;
|
||||
private volatile BlockingQueue<QueuedJob> jobQueue;
|
||||
private final Map<String, QueuedJob> jobMap = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ExecutorService jobExecutor = ExecutorFactory.newVirtualOrCachedThreadExecutor();
|
||||
private final Object queueLock = new Object(); // Lock for synchronizing queue operations
|
||||
|
||||
private boolean shuttingDown = false;
|
||||
|
||||
@ -76,8 +83,8 @@ public class JobQueue {
|
||||
this.jobQueue = new LinkedBlockingQueue<>(capacity);
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void initialize() {
|
||||
// Remove @PostConstruct to let SmartLifecycle control startup
|
||||
private void initializeSchedulers() {
|
||||
log.debug(
|
||||
"Starting job queue with base capacity {}, min capacity {}",
|
||||
baseQueueCapacity,
|
||||
@ -95,8 +102,8 @@ public class JobQueue {
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
// Remove @PreDestroy to let SmartLifecycle control shutdown
|
||||
private void shutdownSchedulers() {
|
||||
log.info("Shutting down job queue");
|
||||
shuttingDown = true;
|
||||
|
||||
@ -109,8 +116,22 @@ public class JobQueue {
|
||||
}
|
||||
});
|
||||
|
||||
scheduler.shutdownNow();
|
||||
jobExecutor.shutdownNow();
|
||||
// Shutdown schedulers and wait for termination
|
||||
try {
|
||||
scheduler.shutdown();
|
||||
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
|
||||
jobExecutor.shutdown();
|
||||
if (!jobExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
jobExecutor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
scheduler.shutdownNow();
|
||||
jobExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
log.info(
|
||||
"Job queue shutdown complete. Stats: total={}, rejected={}",
|
||||
@ -118,6 +139,40 @@ public class JobQueue {
|
||||
rejectedJobs);
|
||||
}
|
||||
|
||||
// SmartLifecycle methods
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
log.info("Starting JobQueue lifecycle");
|
||||
if (!running) {
|
||||
initializeSchedulers();
|
||||
running = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
log.info("Stopping JobQueue lifecycle");
|
||||
shutdownSchedulers();
|
||||
running = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
// Start earlier than most components, but shutdown later
|
||||
return 10;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a job for execution when resources permit.
|
||||
*
|
||||
@ -142,32 +197,36 @@ public class JobQueue {
|
||||
|
||||
// Update stats
|
||||
totalQueuedJobs++;
|
||||
currentQueueSize = jobQueue.size();
|
||||
|
||||
// Try to add to the queue
|
||||
try {
|
||||
boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS);
|
||||
if (!added) {
|
||||
log.warn("Queue full, rejecting job {}", jobId);
|
||||
rejectedJobs++;
|
||||
future.completeExceptionally(
|
||||
new RuntimeException("Job queue full, please try again later"));
|
||||
// Synchronize access to the queue
|
||||
synchronized (queueLock) {
|
||||
currentQueueSize = jobQueue.size();
|
||||
|
||||
// Try to add to the queue
|
||||
try {
|
||||
boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS);
|
||||
if (!added) {
|
||||
log.warn("Queue full, rejecting job {}", jobId);
|
||||
rejectedJobs++;
|
||||
future.completeExceptionally(
|
||||
new RuntimeException("Job queue full, please try again later"));
|
||||
jobMap.remove(jobId);
|
||||
return future;
|
||||
}
|
||||
|
||||
log.debug(
|
||||
"Job {} queued for execution (weight: {}, queue size: {})",
|
||||
jobId,
|
||||
resourceWeight,
|
||||
jobQueue.size());
|
||||
|
||||
return future;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
future.completeExceptionally(new RuntimeException("Job queue interrupted"));
|
||||
jobMap.remove(jobId);
|
||||
return future;
|
||||
}
|
||||
|
||||
log.debug(
|
||||
"Job {} queued for execution (weight: {}, queue size: {})",
|
||||
jobId,
|
||||
resourceWeight,
|
||||
jobQueue.size());
|
||||
|
||||
return future;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
future.completeExceptionally(new RuntimeException("Job queue interrupted"));
|
||||
jobMap.remove(jobId);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
@ -177,13 +236,15 @@ public class JobQueue {
|
||||
* @return The current capacity
|
||||
*/
|
||||
public int getQueueCapacity() {
|
||||
return ((LinkedBlockingQueue<QueuedJob>) jobQueue).remainingCapacity() + jobQueue.size();
|
||||
synchronized (queueLock) {
|
||||
return ((LinkedBlockingQueue<QueuedJob>) jobQueue).remainingCapacity() + jobQueue.size();
|
||||
}
|
||||
}
|
||||
|
||||
/** Updates the capacity of the job queue based on available system resources. */
|
||||
private void updateQueueCapacity() {
|
||||
try {
|
||||
// Calculate new capacity
|
||||
// Calculate new capacity once and cache the result
|
||||
int newCapacity =
|
||||
resourceMonitor.calculateDynamicQueueCapacity(
|
||||
baseQueueCapacity, minQueueCapacity);
|
||||
@ -193,14 +254,20 @@ public class JobQueue {
|
||||
log.debug(
|
||||
"Updating job queue capacity from {} to {}", currentCapacity, newCapacity);
|
||||
|
||||
// Create new queue with updated capacity
|
||||
BlockingQueue<QueuedJob> newQueue = new LinkedBlockingQueue<>(newCapacity);
|
||||
synchronized (queueLock) {
|
||||
// Double-check that capacity still needs to be updated
|
||||
// Use the cached currentCapacity to avoid calling getQueueCapacity() again
|
||||
if (newCapacity != currentCapacity) {
|
||||
// Create new queue with updated capacity
|
||||
BlockingQueue<QueuedJob> newQueue = new LinkedBlockingQueue<>(newCapacity);
|
||||
|
||||
// Transfer jobs from old queue to new queue
|
||||
jobQueue.drainTo(newQueue);
|
||||
jobQueue = newQueue;
|
||||
// Transfer jobs from old queue to new queue
|
||||
jobQueue.drainTo(newQueue);
|
||||
jobQueue = newQueue;
|
||||
|
||||
currentQueueSize = jobQueue.size();
|
||||
currentQueueSize = jobQueue.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error updating queue capacity: {}", e.getMessage(), e);
|
||||
@ -209,55 +276,81 @@ public class JobQueue {
|
||||
|
||||
/** Processes jobs in the queue, executing them when resources permit. */
|
||||
private void processQueue() {
|
||||
if (shuttingDown || jobQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// Jobs to execute after releasing the lock
|
||||
java.util.List<QueuedJob> jobsToExecute = new java.util.ArrayList<>();
|
||||
|
||||
try {
|
||||
// Get current resource status
|
||||
ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get();
|
||||
|
||||
// Check if we should execute any jobs
|
||||
boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL);
|
||||
|
||||
if (!canExecuteJobs) {
|
||||
// Under critical load, don't execute any jobs
|
||||
log.debug("System under critical load, delaying job execution");
|
||||
// First synchronized block: poll jobs from the queue and prepare them for execution
|
||||
synchronized (queueLock) {
|
||||
if (shuttingDown || jobQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get jobs from the queue, up to a limit based on resource availability
|
||||
int jobsToProcess =
|
||||
Math.max(
|
||||
1,
|
||||
switch (status) {
|
||||
case OK -> 3;
|
||||
case WARNING -> 1;
|
||||
case CRITICAL -> 0;
|
||||
});
|
||||
try {
|
||||
// Get current resource status
|
||||
ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get();
|
||||
|
||||
for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) {
|
||||
QueuedJob job = jobQueue.poll();
|
||||
if (job == null) break;
|
||||
// Check if we should execute any jobs
|
||||
boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL);
|
||||
|
||||
// Check if it's been waiting too long
|
||||
long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli();
|
||||
if (waitTimeMs > maxWaitTimeMs) {
|
||||
log.warn(
|
||||
"Job {} exceeded maximum wait time ({} ms), executing anyway",
|
||||
job.jobId,
|
||||
waitTimeMs);
|
||||
if (!canExecuteJobs) {
|
||||
// Under critical load, don't execute any jobs
|
||||
log.debug("System under critical load, delaying job execution");
|
||||
return;
|
||||
}
|
||||
|
||||
// Remove from our map
|
||||
jobMap.remove(job.jobId);
|
||||
currentQueueSize = jobQueue.size();
|
||||
// Get jobs from the queue, up to a limit based on resource availability
|
||||
int jobsToProcess =
|
||||
Math.max(
|
||||
1,
|
||||
switch (status) {
|
||||
case OK -> 3;
|
||||
case WARNING -> 1;
|
||||
case CRITICAL -> 0;
|
||||
});
|
||||
|
||||
// Execute the job
|
||||
executeJob(job);
|
||||
for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) {
|
||||
QueuedJob job = jobQueue.poll();
|
||||
if (job == null) break;
|
||||
|
||||
// Check if it's been waiting too long
|
||||
long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli();
|
||||
if (waitTimeMs > maxWaitTimeMs) {
|
||||
log.warn(
|
||||
"Job {} exceeded maximum wait time ({} ms), executing anyway",
|
||||
job.jobId,
|
||||
waitTimeMs);
|
||||
|
||||
// Add a specific status to the job context that can be tracked
|
||||
// This will be visible in the job status API
|
||||
try {
|
||||
TaskManager taskManager = SpringContextHolder.getBean(TaskManager.class);
|
||||
if (taskManager != null) {
|
||||
taskManager.addNote(
|
||||
job.jobId,
|
||||
"QUEUED_TIMEOUT: Job waited in queue for " +
|
||||
(waitTimeMs/1000) + " seconds, exceeding the maximum wait time of " +
|
||||
(maxWaitTimeMs/1000) + " seconds.");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to add timeout note to job {}: {}", job.jobId, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from our map
|
||||
jobMap.remove(job.jobId);
|
||||
currentQueueSize = jobQueue.size();
|
||||
|
||||
// Add to the list of jobs to execute outside the synchronized block
|
||||
jobsToExecute.add(job);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error processing job queue: {}", e.getMessage(), e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error processing job queue: {}", e.getMessage(), e);
|
||||
}
|
||||
|
||||
// Now execute the jobs outside the synchronized block to avoid holding the lock
|
||||
for (QueuedJob job : jobsToExecute) {
|
||||
executeJob(job);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,6 +133,25 @@ public class TaskManager {
|
||||
return jobResults.get(jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a note to a task. Notes are informational messages that can be attached to a job
|
||||
* for tracking purposes.
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @param note The note to add
|
||||
* @return true if the note was added successfully, false if the job doesn't exist
|
||||
*/
|
||||
public boolean addNote(String jobId, String note) {
|
||||
JobResult jobResult = jobResults.get(jobId);
|
||||
if (jobResult != null) {
|
||||
jobResult.addNote(note);
|
||||
log.debug("Added note to job ID: {}: {}", jobId, note);
|
||||
return true;
|
||||
}
|
||||
log.warn("Attempted to add note to non-existent job ID: {}", jobId);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics about all jobs in the system
|
||||
*
|
||||
|
@ -0,0 +1,80 @@
|
||||
package stirling.software.common.util;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Utility class to access Spring managed beans from non-Spring managed classes.
|
||||
* This is especially useful for classes that are instantiated by frameworks
|
||||
* or created dynamically.
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class SpringContextHolder implements ApplicationContextAware {
|
||||
|
||||
private static ApplicationContext applicationContext;
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
SpringContextHolder.applicationContext = applicationContext;
|
||||
log.debug("Spring context holder initialized");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Spring bean by class type
|
||||
*
|
||||
* @param <T> The bean type
|
||||
* @param beanClass The bean class
|
||||
* @return The bean instance, or null if not found
|
||||
*/
|
||||
public static <T> T getBean(Class<T> beanClass) {
|
||||
if (applicationContext == null) {
|
||||
log.warn("Application context not initialized when attempting to get bean of type {}",
|
||||
beanClass.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return applicationContext.getBean(beanClass);
|
||||
} catch (BeansException e) {
|
||||
log.error("Error getting bean of type {}: {}", beanClass.getName(), e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Spring bean by name
|
||||
*
|
||||
* @param <T> The bean type
|
||||
* @param beanName The bean name
|
||||
* @return The bean instance, or null if not found
|
||||
*/
|
||||
public static <T> T getBean(String beanName) {
|
||||
if (applicationContext == null) {
|
||||
log.warn("Application context not initialized when attempting to get bean '{}'", beanName);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
T bean = (T) applicationContext.getBean(beanName);
|
||||
return bean;
|
||||
} catch (BeansException e) {
|
||||
log.error("Error getting bean '{}': {}", beanName, e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the application context is initialized
|
||||
*
|
||||
* @return true if initialized, false otherwise
|
||||
*/
|
||||
public static boolean isInitialized() {
|
||||
return applicationContext != null;
|
||||
}
|
||||
}
|
@ -252,4 +252,36 @@ class TaskManagerTest {
|
||||
// Verify the executor service is shutdown
|
||||
// This is difficult to test directly, but we can verify it doesn't throw exceptions
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAddNote() {
|
||||
// Arrange
|
||||
String jobId = UUID.randomUUID().toString();
|
||||
taskManager.createTask(jobId);
|
||||
String note = "Test note";
|
||||
|
||||
// Act
|
||||
boolean result = taskManager.addNote(jobId, note);
|
||||
|
||||
// Assert
|
||||
assertTrue(result);
|
||||
JobResult jobResult = taskManager.getJobResult(jobId);
|
||||
assertNotNull(jobResult);
|
||||
assertNotNull(jobResult.getNotes());
|
||||
assertEquals(1, jobResult.getNotes().size());
|
||||
assertEquals(note, jobResult.getNotes().get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAddNote_NonExistentJob() {
|
||||
// Arrange
|
||||
String jobId = "non-existent-job";
|
||||
String note = "Test note";
|
||||
|
||||
// Act
|
||||
boolean result = taskManager.addNote(jobId, note);
|
||||
|
||||
// Assert
|
||||
assertFalse(result);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,73 @@
|
||||
package stirling.software.common.util;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
class SpringContextHolderTest {
|
||||
|
||||
private ApplicationContext mockApplicationContext;
|
||||
private SpringContextHolder contextHolder;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
mockApplicationContext = mock(ApplicationContext.class);
|
||||
contextHolder = new SpringContextHolder();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSetApplicationContext() {
|
||||
// Act
|
||||
contextHolder.setApplicationContext(mockApplicationContext);
|
||||
|
||||
// Assert
|
||||
assertTrue(SpringContextHolder.isInitialized());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetBean_ByType() {
|
||||
// Arrange
|
||||
contextHolder.setApplicationContext(mockApplicationContext);
|
||||
TestBean expectedBean = new TestBean();
|
||||
when(mockApplicationContext.getBean(TestBean.class)).thenReturn(expectedBean);
|
||||
|
||||
// Act
|
||||
TestBean result = SpringContextHolder.getBean(TestBean.class);
|
||||
|
||||
// Assert
|
||||
assertSame(expectedBean, result);
|
||||
verify(mockApplicationContext).getBean(TestBean.class);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void testGetBean_ApplicationContextNotSet() {
|
||||
// Don't set application context
|
||||
|
||||
// Act
|
||||
TestBean result = SpringContextHolder.getBean(TestBean.class);
|
||||
|
||||
// Assert
|
||||
assertNull(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetBean_BeanNotFound() {
|
||||
// Arrange
|
||||
contextHolder.setApplicationContext(mockApplicationContext);
|
||||
when(mockApplicationContext.getBean(TestBean.class)).thenThrow(new org.springframework.beans.BeansException("Bean not found") {});
|
||||
|
||||
// Act
|
||||
TestBean result = SpringContextHolder.getBean(TestBean.class);
|
||||
|
||||
// Assert
|
||||
assertNull(result);
|
||||
}
|
||||
|
||||
// Simple test class
|
||||
private static class TestBean {
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user