Updated Workflows (callbacks, new Operator)

This commit is contained in:
Felix Kaspar 2023-11-20 21:45:49 +01:00
parent a5060f0fd3
commit a02169048d
2 changed files with 32 additions and 46 deletions

View File

@ -32,19 +32,9 @@ router.post("/:workflowUuid?", [
if(req.body.async === "false") {
console.log("Don't do async");
const traverse = traverseOperations(workflow.operations, inputs);
let pdfResults;
let iteration;
while (true) {
iteration = await traverse.next();
if (iteration.done) {
pdfResults = iteration.value;
console.log("Done");
break;
}
console.log(iteration.value);
}
let pdfResults = await traverseOperations(workflow.operations, inputs, (state) => {
console.log("State: ", state);
})
console.log("Download");
await respondWithPdfFiles(res, pdfResults, "workflow-results");
@ -73,23 +63,16 @@ router.post("/:workflowUuid?", [
}
});
const traverse = traverseOperations(workflow.operations, inputs);
let pdfResults = await traverseOperations(workflow.operations, inputs, (state) => {
console.log("State: ", state);
if(activeWorkflow.eventStream)
activeWorkflow.eventStream.write(`data: ${state}\n\n`);
})
let pdfResults;
let iteration;
while (true) {
iteration = await traverse.next();
if (iteration.done) {
pdfResults = iteration.value;
if(activeWorkflow.eventStream) {
activeWorkflow.eventStream.write(`data: processing done\n\n`);
activeWorkflow.eventStream.end();
}
break;
}
if(activeWorkflow.eventStream)
activeWorkflow.eventStream.write(`data: ${iteration.value}\n\n`);
}
activeWorkflow.result = pdfResults;
activeWorkflow.finished = true;

View File

@ -4,14 +4,24 @@ import { PdfFile } from "../wrappers/PdfFile";
import { Progress } from "../functions";
import { Impose } from "../functions/impose";
// TODO: Fix Operators Type
export async function * traverseOperations(operations: Action[], input: PdfFile[] | PdfFile): AsyncGenerator<string, PdfFile[], void> {
export async function traverseOperations(operations: Action[], input: PdfFile[], progressCallback: (state: Progress) => void): Promise<PdfFile[]> {
// TODO: Validate using inbuilt validators:
/*
validationResult = impose.validate()
if(validationResult.valid) {
// Check Next
}
else {
return validationResult.reason
}
*/
const waitOperations = organizeWaitOperations(operations);
let results: PdfFile[] = [];
yield* nextOperation(operations, input);
await nextOperation(operations, input, progressCallback);
return results;
async function * nextOperation(actions: Action[] | undefined, input: PdfFile[] | PdfFile): AsyncGenerator<string, void, void> {
async function nextOperation(actions: Action[] | undefined, input: PdfFile[], progressCallback: (state: Progress) => void): Promise<void> {
console.log("Next Operation");
if(actions === undefined || (Array.isArray(actions) && actions.length == 0)) { // isEmpty
console.log("Last Operation");
@ -21,21 +31,15 @@ export async function * traverseOperations(operations: Action[], input: PdfFile[
results = results.concat(input);
return;
}
else {
console.log("operation done: " + input.filename);
results.push(input);
return;
}
}
for (let i = 0; i < actions.length; i++) {
yield* computeOperation(actions[i], Object.assign([], input)); // structuredClone-like for ts TODO: test if this really works
await computeOperation(actions[i], Object.assign([], input), progressCallback); // structuredClone-like for ts TODO: test if this really works
}
}
async function * computeOperation(action: Action, input: PdfFile[]): AsyncGenerator<string, void, void> {
async function computeOperation(action: Action, input: PdfFile[], progressCallback: (state: Progress) => void): Promise<void> {
console.log("Input: ", input);
yield "Starting: " + action.type;
switch (action.type) {
case "done": // Skip this, because it is a valid node.
break;
@ -51,7 +55,7 @@ export async function * traverseOperations(operations: Action[], input: PdfFile[
waitOperation.waitCount--;
if(waitOperation.waitCount == 0 && waitOperation.doneOperation.actions) {
yield* nextOperation(waitOperation.doneOperation.actions, waitOperation.input);
await nextOperation(waitOperation.doneOperation.actions, waitOperation.input, progressCallback);
}
break;
/*case "extract":
@ -62,11 +66,10 @@ export async function * traverseOperations(operations: Action[], input: PdfFile[
break;*/
case "impose":
let impose = new Impose(action);
if(impose.validate().valid) {
impose.run(input, (state: Progress) => {
console.log(state);
input = await impose.run(input, (state: Progress) => {
progressCallback(state);
});
}
await nextOperation(action.actions, input, progressCallback);
break;
/*case "merge":
yield* nToOne(input, action, async (inputs) => {