2024-05-25 00:22:01 +08:00
|
|
|
package stirling.software.SPDF.utils;
|
|
|
|
|
|
|
|
import static java.nio.file.StandardWatchEventKinds.*;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.file.*;
|
|
|
|
import java.util.*;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.function.Predicate;
|
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
2024-12-17 10:26:18 +01:00
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
2024-05-25 00:22:01 +08:00
|
|
|
@Component
|
2024-12-17 10:26:18 +01:00
|
|
|
@Slf4j
|
2024-05-25 00:22:01 +08:00
|
|
|
public class FileMonitor {
|
2024-12-17 10:26:18 +01:00
|
|
|
|
2024-05-25 00:22:01 +08:00
|
|
|
private final Map<Path, WatchKey> path2KeyMapping;
|
|
|
|
private final Set<Path> newlyDiscoveredFiles;
|
|
|
|
private final ConcurrentHashMap.KeySetView<Path, Boolean> readyForProcessingFiles;
|
|
|
|
private final WatchService watchService;
|
|
|
|
private final Predicate<Path> pathFilter;
|
2024-05-29 23:01:53 +08:00
|
|
|
private final Path rootDir;
|
2024-05-25 00:22:01 +08:00
|
|
|
private Set<Path> stagingFiles;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @param rootDirectory the root directory to monitor
|
2024-06-01 13:55:28 +01:00
|
|
|
* @param pathFilter the filter to apply to the paths, return true if the path should be
|
|
|
|
* monitored, false otherwise
|
2024-05-25 00:22:01 +08:00
|
|
|
*/
|
|
|
|
@Autowired
|
|
|
|
public FileMonitor(
|
|
|
|
@Qualifier("watchedFoldersDir") String rootDirectory,
|
|
|
|
@Qualifier("directoryFilter") Predicate<Path> pathFilter)
|
|
|
|
throws IOException {
|
|
|
|
this.newlyDiscoveredFiles = new HashSet<>();
|
|
|
|
this.path2KeyMapping = new HashMap<>();
|
|
|
|
this.stagingFiles = new HashSet<>();
|
|
|
|
this.pathFilter = pathFilter;
|
|
|
|
this.readyForProcessingFiles = ConcurrentHashMap.newKeySet();
|
|
|
|
this.watchService = FileSystems.getDefault().newWatchService();
|
2024-05-29 23:01:53 +08:00
|
|
|
this.rootDir = Path.of(rootDirectory);
|
2024-05-25 00:22:01 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
private boolean shouldNotProcess(Path path) {
|
|
|
|
return !pathFilter.test(path);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void recursivelyRegisterEntry(Path dir) throws IOException {
|
|
|
|
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
|
|
|
|
path2KeyMapping.put(dir, key);
|
2024-12-17 10:26:18 +01:00
|
|
|
log.info("Registered directory: {}", dir);
|
2024-05-25 00:22:01 +08:00
|
|
|
|
|
|
|
try (Stream<Path> directoryVisitor = Files.walk(dir, 1)) {
|
|
|
|
final Iterator<Path> iterator = directoryVisitor.iterator();
|
|
|
|
while (iterator.hasNext()) {
|
|
|
|
Path path = iterator.next();
|
|
|
|
if (path.equals(dir) || shouldNotProcess(path)) continue;
|
|
|
|
|
|
|
|
if (Files.isDirectory(path)) {
|
|
|
|
recursivelyRegisterEntry(path);
|
|
|
|
} else if (Files.isRegularFile(path)) {
|
|
|
|
handleFileCreation(path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Scheduled(fixedRate = 5000)
|
|
|
|
public void trackFiles() {
|
|
|
|
/*
|
|
|
|
All files observed changes in the last iteration will be considered as staging files.
|
|
|
|
If those files are not modified in current iteration, they will be considered as ready for processing.
|
|
|
|
*/
|
|
|
|
stagingFiles = new HashSet<>(newlyDiscoveredFiles);
|
|
|
|
readyForProcessingFiles.clear();
|
2024-05-29 23:01:53 +08:00
|
|
|
|
|
|
|
if (path2KeyMapping.isEmpty()) {
|
2024-12-17 10:26:18 +01:00
|
|
|
log.warn("not monitoring any directory, even the root directory itself: {}", rootDir);
|
2024-05-29 23:01:53 +08:00
|
|
|
if (Files.exists(
|
|
|
|
rootDir)) { // if the root directory exists, re-register the root directory
|
|
|
|
try {
|
|
|
|
recursivelyRegisterEntry(rootDir);
|
|
|
|
} catch (IOException e) {
|
2024-12-17 10:26:18 +01:00
|
|
|
log.error("unable to register monitoring", e);
|
2024-05-29 23:01:53 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-05-25 00:22:01 +08:00
|
|
|
WatchKey key;
|
|
|
|
while ((key = watchService.poll()) != null) {
|
|
|
|
final Path watchingDir = (Path) key.watchable();
|
|
|
|
key.pollEvents()
|
|
|
|
.forEach(
|
|
|
|
(evt) -> {
|
|
|
|
final Path path = (Path) evt.context();
|
|
|
|
final WatchEvent.Kind<?> kind = evt.kind();
|
|
|
|
if (shouldNotProcess(path)) return;
|
|
|
|
|
|
|
|
try {
|
|
|
|
if (Files.isDirectory(path)) {
|
|
|
|
if (kind == ENTRY_CREATE) {
|
|
|
|
handleDirectoryCreation(path);
|
|
|
|
}
|
|
|
|
/*
|
|
|
|
we don't need to handle directory deletion or modification
|
|
|
|
- directory deletion will be handled by key.reset()
|
|
|
|
- directory modification indicates a new file creation or deletion, which is handled by below
|
|
|
|
*/
|
|
|
|
}
|
|
|
|
Path relativePathFromRoot = watchingDir.resolve(path);
|
|
|
|
if (kind == ENTRY_CREATE) {
|
|
|
|
handleFileCreation(relativePathFromRoot);
|
|
|
|
} else if (kind == ENTRY_DELETE) {
|
|
|
|
handleFileRemoval(relativePathFromRoot);
|
|
|
|
} else if (kind == ENTRY_MODIFY) {
|
|
|
|
handleFileModification(relativePathFromRoot);
|
|
|
|
}
|
|
|
|
} catch (Exception e) {
|
2024-12-17 10:26:18 +01:00
|
|
|
log.error("Error while processing file: {}", path, e);
|
2024-05-25 00:22:01 +08:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
boolean isKeyValid = key.reset();
|
|
|
|
if (!isKeyValid) { // key is invalid when the directory itself is no longer exists
|
|
|
|
path2KeyMapping.remove((Path) key.watchable());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
readyForProcessingFiles.addAll(stagingFiles);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void handleDirectoryCreation(Path dir) throws IOException {
|
|
|
|
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
|
|
|
|
path2KeyMapping.put(dir, key);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void handleFileRemoval(Path path) {
|
|
|
|
newlyDiscoveredFiles.remove(path);
|
|
|
|
stagingFiles.remove(path);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void handleFileCreation(Path path) {
|
|
|
|
newlyDiscoveredFiles.add(path);
|
|
|
|
stagingFiles.remove(path);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void handleFileModification(Path path) {
|
|
|
|
// the logic is the same
|
|
|
|
handleFileCreation(path);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Check if the file is ready for processing.
|
|
|
|
*
|
|
|
|
* <p>A file is ready for processing if it is not being modified for 5000ms.
|
|
|
|
*
|
|
|
|
* @param path the path of the file
|
|
|
|
* @return true if the file is ready for processing, false otherwise
|
|
|
|
*/
|
|
|
|
public boolean isFileReadyForProcessing(Path path) {
|
|
|
|
return readyForProcessingFiles.contains(path);
|
|
|
|
}
|
|
|
|
}
|