sitegen/framework/lib/async.ts
clover caruso f1d4be2553 feat: dynamic page regeneration (#24)
the asset system is reworked to support "dynamic" entries, where each
entry is a separate file on disk containing the latest generation's
headers+raw+gzip+zstd. when calling view.regenerate, it will look for
pages that had "export const regenerate" during generation, and render
those pages using the view system, but then store the results as assets
instead of sending as a response.

pages configured as regenerable are also bundled as views, using the
non-aliasing key "page:${page.id}". this cannot alias because file
paths may not contain a colon.
2025-08-11 22:43:27 -07:00

307 lines
7.4 KiB
TypeScript

const five_minutes = 5 * 60 * 1000;
interface QueueOptions<T, R> {
name: string;
fn: (item: T, spin: Spinner) => Promise<R>;
getItemText?: (item: T) => string;
maxJobs?: number;
passive?: boolean;
}
// Process multiple items in parallel, queue up as many.
export class Queue<T, R> {
#name: string;
#fn: (item: T, spin: Spinner) => Promise<R>;
#maxJobs: number;
#getItemText: (item: T) => string;
#passive: boolean;
#active: Spinner[] = [];
#queue: Array<[T] | [T, (result: R) => void, (err: unknown) => void]> = [];
#cachedProgress: Progress<{ active: Spinner[] }> | null = null;
#done: number = 0;
#total: number = 0;
#onComplete: (() => void) | null = null;
#estimate: number | null = null;
#errors: unknown[] = [];
constructor(options: QueueOptions<T, R>) {
this.#name = options.name;
this.#fn = options.fn;
this.#maxJobs = options.maxJobs ?? 5;
this.#getItemText = options.getItemText ?? defaultGetItemText;
this.#passive = options.passive ?? false;
}
cancel() {
const bar = this.#cachedProgress;
bar?.stop();
this.#queue = [];
}
get bar() {
const cached = this.#cachedProgress;
if (!cached) {
const bar = (this.#cachedProgress = new Progress({
spinner: null,
text: ({ active }) => {
const now = performance.now();
let text = `[${this.#done}/${this.#total}] ${this.#name}`;
let n = 0;
for (const item of active) {
let itemText = "- " + item.format(now);
text += `\n` +
itemText.slice(0, Math.max(0, process.stdout.columns - 1));
if (n > 10) {
text += `\n ... + ${active.length - n} more`;
break;
}
n++;
}
return text;
},
props: {
active: [] as Spinner[],
},
}));
bar.value = 0;
return bar;
}
return cached;
}
addReturn(args: T) {
this.#total += 1;
this.updateTotal();
if (this.#active.length >= this.#maxJobs) {
const { promise, resolve, reject } = Promise.withResolvers<R>();
this.#queue.push([args, resolve, reject]);
return promise;
}
return this.#run(args);
}
add(args: T) {
return this.addReturn(args).then(
() => {},
() => {},
);
}
addMany(items: T[]) {
this.#total += items.length;
this.updateTotal();
const runNowCount = this.#maxJobs - this.#active.length;
const runNow = items.slice(0, runNowCount);
const runLater = items.slice(runNowCount);
this.#queue.push(...runLater.reverse().map<[T]>((x) => [x]));
runNow.map((item) => this.#run(item).catch(() => {}));
}
async #run(args: T): Promise<R> {
const bar = this.bar;
const itemText = this.#getItemText(args);
const spinner = new Spinner(itemText);
spinner.stop();
(spinner as any).redraw = () => (bar as any).redraw();
const active = this.#active;
try {
active.unshift(spinner);
bar.props = { active };
// console.log(this.#name + ": " + itemText);
const result = await this.#fn(args, spinner);
this.#done++;
return result;
} catch (err) {
if (err && typeof err === "object") {
(err as any).job = itemText;
}
this.#errors.push(err);
console.error(util.inspect(err, false, Infinity, true));
throw err;
} finally {
active.splice(active.indexOf(spinner), 1);
bar.props = { active };
bar.value = this.#done;
// Process next item
const next = this.#queue.shift();
if (next) {
const args = next[0];
this.#run(args)
.then((result) => next[1]?.(result))
.catch((err) => next[2]?.(err));
} else if (this.#active.length === 0) {
if (this.#passive) {
this.bar.stop();
this.#cachedProgress = null;
}
this.#onComplete?.();
}
}
}
updateTotal() {
const bar = this.bar;
bar.total = Math.max(this.#total, this.#estimate ?? 0);
}
set estimate(e: number) {
this.#estimate = e;
if (this.#cachedProgress) {
this.updateTotal();
}
}
async done(o?: { method: "success" | "stop" }) {
if (this.#active.length === 0) {
this.#end(o);
return;
}
const { promise, resolve } = Promise.withResolvers<void>();
this.#onComplete = resolve;
await promise;
this.#end(o);
}
#end({
method = this.#passive ? "stop" : "success",
}: {
method?: "success" | "stop";
} = {}) {
const bar = this.#cachedProgress;
if (this.#errors.length > 0) {
if (bar) bar.stop();
throw new AggregateError(
this.#errors,
this.#errors.length + " jobs failed in '" + this.#name + "'",
);
}
if (bar) bar[method]();
}
get active(): boolean {
return this.#active.length !== 0;
}
[Symbol.dispose]() {
if (this.active) {
this.cancel();
}
}
}
const cwd = process.cwd();
function defaultGetItemText(item: unknown) {
let itemText = "";
if (typeof item === "string") {
itemText = item;
} else if (typeof item === "object" && item !== null) {
const { path, label, id } = item as any;
itemText = label ?? path ?? id ?? JSON.stringify(item);
} else {
itemText = JSON.stringify(item);
}
if (itemText.startsWith(cwd)) {
itemText = path.relative(cwd, itemText);
}
return itemText;
}
export class OnceMap<T> {
private ongoing = new Map<string, Promise<T>>();
get(key: string, compute: () => Promise<T>) {
if (this.ongoing.has(key)) {
return this.ongoing.get(key)!;
}
const result = compute();
this.ongoing.set(key, result);
return result;
}
}
interface ARCEValue<T> {
value: T;
[Symbol.dispose]: () => void;
}
export function RefCountedExpirable<T>(
init: () => Promise<T>,
deinit: (value: T) => void,
expire: number = five_minutes,
): () => Promise<ARCEValue<T>> {
let refs = 0;
let item: ARCEValue<T> | null = null;
let loading: Promise<ARCEValue<T>> | null = null;
let timer: ReturnType<typeof setTimeout> | null = null;
function deref() {
ASSERT(item !== null);
if (--refs !== 0) return;
ASSERT(timer === null);
timer = setTimeout(() => {
ASSERT(refs === 0);
ASSERT(loading === null);
ASSERT(item !== null);
deinit(item.value);
item = null;
timer = null;
}, expire);
}
return async function () {
if (timer !== null) {
clearTimeout(timer);
timer = null;
}
if (item !== null) {
refs++;
return item;
}
if (loading !== null) {
refs++;
return loading;
}
const p = Promise.withResolvers<ARCEValue<T>>();
loading = p.promise;
try {
const value = await init();
item = { value, [Symbol.dispose]: deref };
refs++;
p.resolve(item);
return item;
} catch (e) {
p.reject(e);
throw e;
} finally {
loading = null;
}
};
}
export function once<T>(fn: () => Promise<T>): () => Promise<T> {
let result: T | Promise<T> | null = null;
return async () => {
if (result) return result;
try {
result = await (result = fn());
} catch (err) {
result = null;
throw err;
}
return result;
};
}
import { Progress } from "@paperclover/console/Progress";
import { Spinner } from "@paperclover/console/Spinner";
import * as path from "node:path";
import process from "node:process";
import * as util from "node:util";