This commit is contained in:
Anthony Stirling 2025-07-21 13:08:21 +01:00
parent fe4a452a54
commit 73a340cbe2
2 changed files with 137 additions and 114 deletions

View File

@ -25,50 +25,57 @@ public class CleanupAsyncConfig {
exec.setThreadNamePrefix("cleanup-"); exec.setThreadNamePrefix("cleanup-");
// Set custom rejection handler to log when queue is full // Set custom rejection handler to log when queue is full
exec.setRejectedExecutionHandler(
        new RejectedExecutionHandler() {
            // Wall-clock time (ms) of the last rejection that was logged; used to
            // rate-limit the warning below.
            private final java.util.concurrent.atomic.AtomicLong lastRejectionTime =
                    new java.util.concurrent.atomic.AtomicLong(0);

            // Number of rejections seen so far. Atomic because rejectedExecution may be
            // entered by several submitting threads at once and a volatile ++ loses updates.
            private final java.util.concurrent.atomic.AtomicInteger rejectionCount =
                    new java.util.concurrent.atomic.AtomicInteger(0);

            /**
             * Handles a task the executor refused: logs (rate-limited), then tries to make
             * room by dropping the oldest queued task, and finally runs the task on the
             * calling thread if it still cannot be queued.
             *
             * @param r the rejected task
             * @param executor the executor that rejected it
             */
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                long currentTime = System.currentTimeMillis();
                int rejectionNumber = rejectionCount.incrementAndGet();

                // Rate-limit logging to at most once per minute. The compare-and-set
                // ensures only one of several concurrent callers logs per window.
                long previousLogTime = lastRejectionTime.get();
                if (currentTime - previousLogTime > 60000
                        && lastRejectionTime.compareAndSet(previousLogTime, currentTime)) {
                    log.warn(
                            "Cleanup task rejected #{} - queue full! Active: {}, Queue size: {}, Pool size: {}",
                            rejectionNumber,
                            executor.getActiveCount(),
                            executor.getQueue().size(),
                            executor.getPoolSize());
                }

                // A shut-down executor rejects every submission. Re-submitting would
                // re-enter this handler and discard queued tasks one by one, so drop the
                // task instead (same rule the JDK's built-in policies apply).
                if (executor.isShutdown()) {
                    log.debug(
                            "Dropping cleanup task #{} because the executor is shut down",
                            rejectionNumber);
                    return;
                }

                // Try to discard the oldest queued task and submit this one in its place
                if (executor.getQueue().poll() != null) {
                    log.debug("Discarded oldest queued cleanup task to make room");
                    try {
                        executor.execute(r);
                        return;
                    } catch (Exception resubmitFailure) {
                        // Still not accepted: fall through to caller-runs below
                        log.debug(
                                "Re-submitting cleanup task #{} failed, falling back to caller-runs",
                                rejectionNumber,
                                resubmitFailure);
                    }
                }

                // Last resort: run on the calling thread and report how long it was blocked
                log.warn(
                        "Executing cleanup task #{} on scheduler thread as last resort",
                        rejectionNumber);
                long startTime = System.currentTimeMillis();
                try {
                    r.run();
                    long duration = System.currentTimeMillis() - startTime;
                    if (duration > 30000) { // Warn if cleanup blocks the caller for >30s
                        log.warn(
                                "Cleanup task on scheduler thread took {}ms - consider tuning",
                                duration);
                    }
                } catch (Exception e) {
                    log.error("Cleanup task failed on scheduler thread", e);
                }
            }
        });
exec.initialize(); exec.initialize();
return exec; return exec;

View File

@ -142,44 +142,66 @@ public class TempFileCleanupService {
public CompletableFuture<Void> scheduledCleanup() { public CompletableFuture<Void> scheduledCleanup() {
// Check if cleanup is already running // Check if cleanup is already running
if (!cleanupRunning.compareAndSet(false, true)) { if (!cleanupRunning.compareAndSet(false, true)) {
log.warn("Cleanup already in progress (running for {}ms), skipping this cycle", log.warn(
System.currentTimeMillis() - lastCleanupTimestamp.get()); "Cleanup already in progress (running for {}ms), skipping this cycle",
System.currentTimeMillis() - lastCleanupTimestamp.get());
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
// Calculate timeout as 2x cleanup interval // Calculate timeout as 2x cleanup interval
long timeoutMinutes = applicationProperties.getSystem().getTempFileManagement().getCleanupIntervalMinutes() * 2; long timeoutMinutes =
applicationProperties
.getSystem()
.getTempFileManagement()
.getCleanupIntervalMinutes()
* 2;
return CompletableFuture.supplyAsync(() -> { CompletableFuture<Void> cleanupFuture =
long startTime = System.currentTimeMillis(); CompletableFuture.runAsync(
lastCleanupTimestamp.set(startTime); () -> {
long cleanupNumber = cleanupCount.incrementAndGet(); long startTime = System.currentTimeMillis();
lastCleanupTimestamp.set(startTime);
long cleanupNumber = cleanupCount.incrementAndGet();
try { try {
log.info("Starting cleanup #{} with {}min timeout", cleanupNumber, timeoutMinutes); log.info(
doScheduledCleanup(); "Starting cleanup #{} with {}min timeout",
cleanupNumber,
timeoutMinutes);
doScheduledCleanup();
long duration = System.currentTimeMillis() - startTime; long duration = System.currentTimeMillis() - startTime;
lastCleanupDuration.set(duration); lastCleanupDuration.set(duration);
log.info("Cleanup #{} completed successfully in {}ms", cleanupNumber, duration); log.info(
return null; "Cleanup #{} completed successfully in {}ms",
} catch (Exception e) { cleanupNumber,
long duration = System.currentTimeMillis() - startTime; duration);
lastCleanupDuration.set(duration); } catch (Exception e) {
log.error("Cleanup #{} failed after {}ms", cleanupNumber, duration, e); long duration = System.currentTimeMillis() - startTime;
return null; lastCleanupDuration.set(duration);
} finally { log.error(
cleanupRunning.set(false); "Cleanup #{} failed after {}ms",
} cleanupNumber,
}).orTimeout(timeoutMinutes, TimeUnit.MINUTES) duration,
.exceptionally(throwable -> { e);
if (throwable.getCause() instanceof TimeoutException) { } finally {
log.error("Cleanup #{} timed out after {}min - forcing cleanup state reset", cleanupRunning.set(false);
cleanupCount.get(), timeoutMinutes); }
cleanupRunning.set(false); });
}
return null; return cleanupFuture
}); .orTimeout(timeoutMinutes, TimeUnit.MINUTES)
.exceptionally(
throwable -> {
if (throwable.getCause() instanceof TimeoutException) {
log.error(
"Cleanup #{} timed out after {}min - forcing cleanup state reset",
cleanupCount.get(),
timeoutMinutes);
cleanupRunning.set(false);
}
return null;
});
} }
/** Internal method that performs the actual cleanup work */ /** Internal method that performs the actual cleanup work */
@ -409,8 +431,10 @@ public class TempFileCleanupService {
} }
if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
log.error("Aborting directory cleanup after {} consecutive failures in: {}", log.error(
consecutiveFailures, directory); "Aborting directory cleanup after {} consecutive failures in: {}",
consecutiveFailures,
directory);
return; // Early exit from cleanup return; // Early exit from cleanup
} }
} }
@ -420,8 +444,10 @@ public class TempFileCleanupService {
log.warn("Error processing path: {}", path, e); log.warn("Error processing path: {}", path, e);
if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
log.error("Aborting directory cleanup after {} consecutive failures in: {}", log.error(
consecutiveFailures, directory); "Aborting directory cleanup after {} consecutive failures in: {}",
consecutiveFailures,
directory);
return; // Early exit from cleanup return; // Early exit from cleanup
} }
} }
@ -537,9 +563,7 @@ public class TempFileCleanupService {
} }
} }
/** /** Get cleanup status and metrics for monitoring */
* Get cleanup status and metrics for monitoring
*/
public String getCleanupStatus() { public String getCleanupStatus() {
if (cleanupRunning.get()) { if (cleanupRunning.get()) {
long runningTime = System.currentTimeMillis() - lastCleanupTimestamp.get(); long runningTime = System.currentTimeMillis() - lastCleanupTimestamp.get();
@ -549,38 +573,30 @@ public class TempFileCleanupService {
long lastTime = lastCleanupTimestamp.get(); long lastTime = lastCleanupTimestamp.get();
if (lastTime > 0) { if (lastTime > 0) {
long timeSinceLastRun = System.currentTimeMillis() - lastTime; long timeSinceLastRun = System.currentTimeMillis() - lastTime;
return String.format("Last cleanup #%d: %dms duration, %dms ago", return String.format(
cleanupCount.get(), lastDuration, timeSinceLastRun); "Last cleanup #%d: %dms duration, %dms ago",
cleanupCount.get(), lastDuration, timeSinceLastRun);
} else { } else {
return "No cleanup runs yet"; return "No cleanup runs yet";
} }
} }
} }
/** /** Check if cleanup is currently running */
* Check if cleanup is currently running
*/
public boolean isCleanupRunning() { public boolean isCleanupRunning() {
return cleanupRunning.get(); return cleanupRunning.get();
} }
/** /** Get cleanup metrics */
* Get cleanup metrics
*/
public CleanupMetrics getMetrics() { public CleanupMetrics getMetrics() {
return new CleanupMetrics( return new CleanupMetrics(
cleanupCount.get(), cleanupCount.get(),
lastCleanupDuration.get(), lastCleanupDuration.get(),
lastCleanupTimestamp.get(), lastCleanupTimestamp.get(),
cleanupRunning.get() cleanupRunning.get());
);
} }
/**
 * Immutable snapshot of cleanup statistics.
 *
 * @param totalRuns number of cleanup runs counted so far
 * @param lastDurationMs duration of the most recent run, in milliseconds
 * @param lastRunTimestamp start time of the most recent run, as epoch milliseconds
 * @param currentlyRunning whether a cleanup was in progress when the snapshot was taken
 */
public record CleanupMetrics(
        long totalRuns,
        long lastDurationMs,
        long lastRunTimestamp,
        boolean currentlyRunning) {}
} }