sitegen/src/file-viewer/bin/scan3.ts

// The file scanner incrementally updates an sqlite database with file
// stats. Additionally, it runs "processors" on files, which precompute
// expensive data such as running `ffprobe` on all media to get the
// duration.
//
// Processors are also used to derive compressed and optimized assets,
// which is how automatic JXL / AV1 encoding is done. Derived files are
// uploaded to the clover NAS to be pulled by VPS instances for hosting.
//
// This is the third iteration of the scanner, hence its name "scan3".
// Remember that any software you want to be maintainable and high
// quality cannot be written with AI.
const workDir = path.resolve(".clover/derived");
const sotToken = UNWRAP(process.env.CLOVER_SOT_KEY);
export async function main() {
const start = performance.now();
const timerSpinner = new Spinner({
text: () =>
`paper clover's scan3 [${
((performance.now() - start) / 1000).toFixed(
1,
)
}s]`,
fps: 10,
});
using _endTimerSpinner = { [Symbol.dispose]: () => timerSpinner.stop() };
// Read a directory or file stat and queue up changed files.
using qList = new async.Queue({
name: "Discover Tree",
async fn(absPath: string) {
const stat = await fs.stat(absPath);
const publicPath = toPublicPath(absPath);
const mediaFile = MediaFile.getByPath(publicPath);
if (stat.isDirectory()) {
const items = await fs.readdir(absPath);
qList.addMany(items.map((subPath) => path.join(absPath, subPath)));
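// If this directory is already in the database, queue deletions for
// children that no longer exist on disk (recursively for
// subdirectories).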
if (mediaFile) {
const deleted = mediaFile
.getChildren()
.filter((child) => !items.includes(child.basename))
.flatMap((child) =>
child.kind === MediaFileKind.directory
? child.getRecursiveFileChildren()
: child
);
qMeta.addMany(
deleted.map((mediaFile) => ({
absPath: path.join(root, mediaFile.path),
publicPath: mediaFile.path,
stat: null,
mediaFile,
})),
);
}
return;
}
// If the file is new to the database, or its size or mtime changed,
// all metadata and processors must be recomputed.
if (
!mediaFile ||
stat.size !== mediaFile.size ||
stat.mtime.getTime() !== mediaFile.date.getTime()
) {
qMeta.add({ absPath, publicPath, stat, mediaFile });
return;
}
// The file itself is unchanged, but added or updated processors may
// still need to run.
queueProcessors({ absPath, stat, mediaFile });
},
maxJobs: 24,
});
using qMeta = new async.Queue({
name: "Update Metadata",
async fn({ absPath, publicPath, stat, mediaFile }: UpdateMetadataJob) {
if (!stat) {
// File was deleted.
await runUndoProcessors(UNWRAP(mediaFile));
return;
}
// TODO: run scrubLocationMetadata first
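// Stream the file through SHA-1 so large files are hashed without
// being buffered entirely in memory.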
const hash = await new Promise<string>((resolve, reject) => {
const reader = fs.createReadStream(absPath);
reader.on("error", reject);
const hasher = crypto.createHash("sha1").setEncoding("hex");
hasher.on("error", reject);
hasher.on("readable", () => resolve(hasher.read()));
reader.pipe(hasher);
});
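// Heuristic: if a tracked file's mtime jumped forward to a recent
// time, assume the timestamp was clobbered by a copy/sync rather than
// a real edit, and keep the recorded date.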
let date = stat.mtime;
if (
mediaFile &&
mediaFile.date.getTime() < stat.mtime.getTime() &&
Date.now() - stat.mtime.getTime() < monthMilliseconds
) {
date = mediaFile.date;
console.warn(
`M-time on ${publicPath} was likely corrupted. ${
formatDate(
mediaFile.date,
)
} -> ${formatDate(stat.mtime)}`,
);
}
mediaFile = MediaFile.createFile({
path: publicPath,
date,
hash,
size: stat.size,
duration: mediaFile?.duration ?? 0,
dimensions: mediaFile?.dimensions ?? "",
contents: mediaFile?.contents ?? "",
});
await queueProcessors({ absPath, stat, mediaFile });
},
getItemText: (job) =>
job.publicPath.slice(1) + (job.stat ? "" : " (deleted)"),
maxJobs: 10,
});
using qProcess = new async.Queue({
name: "Process Contents",
async fn(
{ absPath, stat, mediaFile, processor, index, after }: ProcessJob,
spin,
) {
await processor.run({ absPath, stat, mediaFile, spin });
mediaFile.setProcessed(mediaFile.processed | (1 << (16 + index)));
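// This processor finished; unblock any jobs that were waiting on it.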
for (const dependantJob of after) {
ASSERT(
dependantJob.needs > 0,
`dependantJob.needs > 0, ${dependantJob.needs}`,
);
dependantJob.needs -= 1;
if (dependantJob.needs == 0) qProcess.add(dependantJob);
}
},
getItemText: ({ mediaFile, processor }) =>
`${mediaFile.path.slice(1)} - ${processor.name}`,
maxJobs: 4,
});
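// The `processors` column stores one three-character entry per
// processor, joined by ';': the id character, then the 16-bit
// processor hash packed into two character codes. For example
// (hypothetical values), the entry "a\x12\x34" decodes to
// { id: "a", hash: 0x1234 }.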
function decodeProcessors(input: string) {
return input
.split(";")
.filter(Boolean)
.map(([a, b, c]) => ({
id: a,
hash: (b.charCodeAt(0) << 8) + c.charCodeAt(0),
}));
}
async function queueProcessors({
absPath,
stat,
mediaFile,
}: Omit<ProcessFileArgs, "spin">) {
const ext = mediaFile.extensionNonEmpty.toLowerCase();
let possible = processors.filter((p) =>
p.include ? p.include.has(ext) : !p.exclude?.has(ext)
);
if (possible.length === 0) return;
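// `processed` bitfield layout: the low 16 bits hold the combined hash
// of every applicable processor (the `| 1` below keeps it non-zero),
// and bit (16 + i) records that processor i has completed.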
const hash = possible.reduce((a, b) => a ^ b.hash, 0) | 1;
ASSERT(hash <= 0xffff, `${hash.toString(16)} has no bits above 16 set`);
let processed = mediaFile.processed;
// If the hash has changed, migrate the bitfield over.
// This also runs when the processor hash is in its initial 0 state.
const order = decodeProcessors(mediaFile.processors);
if ((processed & 0xffff) !== hash) {
const previous = order.filter(
(_, i) => (processed & (1 << (16 + i))) !== 0,
);
processed = hash;
for (const { id, hash } of previous) {
const p = processors.find((p) => p.id === id);
if (!p) continue;
const index = possible.indexOf(p);
if (index !== -1 && p.hash === hash) {
processed |= 1 << (16 + index);
} else {
if (p.undo) await p.undo(mediaFile);
}
}
mediaFile.setProcessors(
processed,
possible
.map((p) => p.id + String.fromCharCode(p.hash >> 8, p.hash & 0xff))
.join(";"),
);
} else {
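// Hash unchanged: reorder `possible` to match the stored order so the
// per-processor completion bits keep their meaning.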
possible = order.map(({ id }) =>
UNWRAP(possible.find((p) => p.id === id))
);
}
// Queue needed processors.
const jobs: ProcessJob[] = [];
for (let i = 0, { length } = possible; i < length; i += 1) {
if ((processed & (1 << (16 + i))) === 0) {
const job: ProcessJob = {
absPath,
stat,
mediaFile,
processor: possible[i],
index: i,
after: [],
needs: possible[i].depends.length,
};
jobs.push(job);
if (job.needs === 0) qProcess.add(job);
}
}
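// Wire up dependency edges. A dependency that is not in `jobs` already
// ran in a previous scan, so it counts as satisfied immediately.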
for (const job of jobs) {
for (const dependId of job.processor.depends) {
const dependJob = jobs.find((j) => j.processor.id === dependId);
if (dependJob) {
dependJob.after.push(job);
} else {
ASSERT(job.needs > 0, `job.needs > 0, ${job.needs}`);
job.needs -= 1;
if (job.needs === 0) qProcess.add(job);
}
}
}
}
async function runUndoProcessors(mediaFile: MediaFile) {
const { processed } = mediaFile;
const previous = decodeProcessors(mediaFile.processors).filter(
(_, i) => (processed & (1 << (16 + i))) !== 0,
);
for (const { id } of previous) {
const p = processors.find((p) => p.id === id);
if (!p) continue;
if (p.undo) {
await p.undo(mediaFile);
}
}
mediaFile.delete();
}
// Add the root & recursively iterate!
qList.add(root);
await qList.done();
await qMeta.done();
await qProcess.done();
// Update directory metadata
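// Deepest paths first, so a directory's stats are final before its
// parent aggregates them.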
const dirs = MediaFile.getDirectoriesToReindex().sort(
(a, b) => b.path.length - a.path.length,
);
for (const dir of dirs) {
const children = dir.getChildren();
// readme.txt
const readmeContent = children.find((x) =>
x.basename === "readme.txt"
)?.contents ?? "";
// dirsort
let dirsort: string[] | null = null;
const dirSortRaw =
children.find((x) => x.basename === ".dirsort")?.contents ?? "";
if (dirSortRaw) {
dirsort = dirSortRaw
.split("\n")
.map((x) => x.trim())
.filter(Boolean);
}
// Permissions
if (children.some((x) => x.basename === ".friends")) {
FilePermissions.setPermissions(dir.path, 1);
} else {
FilePermissions.setPermissions(dir.path, 0);
}
// Recursive stats.
let totalSize = 0;
let newestDate = new Date(0);
let allHashes = "";
for (const child of children) {
totalSize += child.size;
allHashes += child.hash;
if (child.basename !== "readme.txt" && child.date > newestDate) {
newestDate = child.date;
}
}
const dirHash = crypto
.createHash("sha1")
.update(dir.path + allHashes)
.digest("hex");
MediaFile.markDirectoryProcessed({
id: dir.id,
timestamp: newestDate,
contents: readmeContent,
size: totalSize,
hash: dirHash,
dirsort,
});
}
// Sync to remote
if ((await fs.readdir(workDir)).length > 0) {
await rsync.spawn({
args: [
"--links",
"--recursive",
"--times",
"--partial",
"--progress",
"--remove-source-files",
"--delay-updates",
workDir + "/",
"clo@zenith:/mnt/storage1/clover/Documents/Config/clover_file/derived/",
],
title: "Uploading Derived Assets",
cwd: process.cwd(),
});
await fs.removeEmptyDirectories(workDir);
} else {
console.info("No new derived assets");
}
MediaFile.db.prepare("VACUUM").run();
MediaFile.db.reload();
// TODO: reload prod web instance
await rsync.spawn({
args: [
MediaFile.db.file,
"clo@zenith:/mnt/storage1/clover/Documents/Config/paperclover/cache.sqlite",
],
title: "Uploading Database",
cwd: process.cwd(),
});
{
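// Ask the production server to reopen the database it just received.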
const res = await fetch("https://db.paperclover.net/reload", {
method: "post",
headers: {
Authorization: sotToken,
},
});
if (!res.ok) {
console.warn(
`Failed to reload remote database ${res.status} ${res.statusText}`,
);
}
}
console.info(
"Updated file viewer index in \x1b[1m" +
((performance.now() - start) / 1000).toFixed(1) +
"s\x1b[0m",
);
const { duration, count } = MediaFile.db
.prepare<[], { count: number; duration: number }>(
`
select
count(*) as count,
sum(duration) as duration
from media_files
`,
)
.getNonNull();
console.info();
console.info(
"Global Stats:\n" +
`- File Count: \x1b[1m${count}\x1b[0m\n` +
`- Canonical Size: \x1b[1m${
formatSize(MediaFile.getByPath("/")!.size)
}\x1b[0m\n` +
`- Media Duration: \x1b[1m${formatDurationLong(duration)}\x1b[0m\n`,
);
}
interface Process {
name: string;
enable?: boolean;
include?: Set<string>;
exclude?: Set<string>;
depends?: string[];
version?: number;
/* Perform an action. */
run(args: ProcessFileArgs): Promise<void>;
/* Should detect if `run` was never executed before undoing state. */
undo?(mediaFile: MediaFile): Promise<void>;
}
const ffprobeBin = testProgram("ffprobe", "--help");
const ffmpegBin = testProgram("ffmpeg", "--help");
const ffmpegOptions = ["-hide_banner", "-loglevel", "warning"];
const procDuration: Process = {
name: "calculate duration",
enable: ffprobeBin !== null,
include: rules.extsDuration,
async run({ absPath, mediaFile }) {
const { stdout } = await subprocess.exec(ffprobeBin!, [
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
absPath,
]);
const duration = parseFloat(stdout.trim());
if (Number.isNaN(duration)) {
throw new Error("Could not extract duration from " + stdout);
}
mediaFile.setDuration(Math.ceil(duration));
},
};
// NOTE: Never re-order the processors. Add new ones at the end.
const procDimensions: Process = {
name: "calculate dimensions",
enable: ffprobeBin != null,
include: rules.extsDimensions,
async run({ absPath, mediaFile }) {
const ext = path.extname(absPath);
let dimensions;
if (ext === ".svg") {
// Parse out of text data
const content = await fs.readFile(absPath, "utf8");
const widthMatch = content.match(/width="(\d+)"/);
const heightMatch = content.match(/height="(\d+)"/);
if (widthMatch && heightMatch) {
dimensions = `${widthMatch[1]}x${heightMatch[1]}`;
}
} else {
// Use ffprobe to observe streams
const { stdout } = await subprocess.exec(ffprobeBin!, [
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height",
"-of",
"csv=s=x:p=0",
absPath,
]);
if (stdout.includes("x")) {
dimensions = stdout.trim();
}
}
mediaFile.setDimensions(dimensions ?? "");
},
};
const procLoadTextContents: Process = {
name: "load text content",
include: rules.extsReadContents,
async run({ absPath, mediaFile, stat }) {
if (stat.size > 1_000_000) return;
const text = await fs.readFile(absPath, "utf-8");
mediaFile.setContents(text);
},
};
const procHighlightCode: Process = {
name: "highlight source code",
include: new Set(rules.extsCode.keys()),
async run({ absPath, mediaFile, stat }) {
const language = UNWRAP(
rules.extsCode.get(path.extname(absPath).toLowerCase()),
);
// `.ts` is an overloaded extension, shared between 'transport stream'
// and 'typescript'. Only treat the file as source code if it is:
// - 1 MB or smaller
// - valid UTF-8
if (stat.size > 1_000_000) return;
let code;
const buf = await fs.readFile(absPath);
try {
code = new TextDecoder("utf-8", { fatal: true }).decode(buf);
} catch (error) {
mediaFile.setContents("");
return;
}
const content = await highlight.highlightCode(code, language);
mediaFile.setContents(content);
},
};
const procImageSubsets: Process = {
name: "encode image subsets",
include: rules.extsImage,
depends: ["calculate dimensions"],
version: 2,
async run({ absPath, mediaFile, spin }) {
const { width, height } = UNWRAP(mediaFile.parseDimensions());
const targetSizes = transcodeRules.imageSizes.filter((w) => w < width);
const baseStatus = spin.text;
using stack = new DisposableStack();
for (const size of targetSizes) {
const { w, h } = resizeDimensions(width, height, size);
for (const { ext, args } of transcodeRules.imagePresets) {
spin.text = baseStatus + ` (${w}x${h}, ${ext.slice(1).toUpperCase()})`;
stack.use(
await produceAsset(`${mediaFile.hash}/${size}${ext}`, async (out) => {
await fs.mkdir(path.dirname(out));
await fs.rm(out, { force: true });
await subprocess.exec(ffmpegBin!, [
...ffmpegOptions,
"-i",
absPath,
"-vf",
`scale=${w}:${h}:force_original_aspect_ratio=increase,crop=${w}:${h}`,
...args,
out,
]);
return [out];
}),
);
}
}
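// Success: move the asset references out of the stack so they are not
// released; if anything above threw, `using` disposes the stack and
// unrefs every asset produced so far.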
stack.move();
},
async undo(mediaFile) {
const { width } = UNWRAP(mediaFile.parseDimensions());
const targetSizes = transcodeRules.imageSizes.filter((w) => w < width);
for (const size of targetSizes) {
for (const { ext } of transcodeRules.imagePresets) {
await unproduceAsset(`${mediaFile.hash}/${size}${ext}`);
}
}
},
};
const qualityMap: Record<string, string> = {
u: "ultra-high",
h: "high",
m: "medium",
l: "low",
d: "data-saving",
};
const procVideos = transcodeRules.videoFormats.map<Process>((preset) => ({
name: `encode ${preset.codec} ${UNWRAP(qualityMap[preset.id[1]])}`,
include: rules.extsVideo,
enable: ffmpegBin != null,
async run({ absPath, mediaFile, spin }) {
if ((mediaFile.duration ?? 0) < 10) return;
await produceAsset(`${mediaFile.hash}/${preset.id}`, async (base) => {
base = path.dirname(base);
await fs.mkdir(base);
let inputArgs = ["-i", absPath];
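// An optional sidecar "<basename>.json" next to the source can
// override encoder inputs (separate video/audio sources and a forced
// frame rate).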
try {
const config = await fs.readJson<any>(
path.join(
path.dirname(absPath),
path.basename(absPath, path.extname(absPath)) + ".json",
),
);
if (config.encoder && typeof config.encoder.videoSrc === "string") {
const { videoSrc, audioSrc, rate } = config.encoder;
inputArgs = [
...(rate ? ["-r", String(rate)] : []),
"-i",
videoSrc,
...(audioSrc ? ["-i", audioSrc] : []),
];
}
} catch (err: any) {
if (err?.code !== "ENOENT") throw err;
}
const args = transcodeRules.getVideoArgs(preset, base, inputArgs);
try {
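// Proxy ffmpeg's progress reporting through this queue's spinner line
// instead of letting it draw a second progress bar.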
const fakeProgress = new Progress({ text: spin.text, spinner: null });
fakeProgress.stop();
spin.format = (now: number) => fakeProgress.format(now);
// @ts-expect-error
fakeProgress.redraw = () => spin.redraw();
await ffmpeg.spawn({
ffmpeg: ffmpegBin!,
title: fakeProgress.text,
progress: fakeProgress,
args,
cwd: base,
});
return await collectFiles();
} catch (err) {
for (const file of await collectFiles()) {
try {
await fs.rm(file);
} catch {}
}
throw err;
}
async function collectFiles(): Promise<string[]> {
return (await fs.readdir(base))
.filter((basename) => basename.startsWith(preset.id))
.map((basename) => path.join(base, basename));
}
});
},
}));
const procCompression = [
{ name: "gzip", fn: () => zlib.createGzip({ level: 9 }) },
{ name: "zstd", fn: () => zlib.createZstdCompress() },
].map(
({ name, fn }) =>
({
name: `compress ${name}`,
exclude: rules.extsPreCompressed,
async run({ absPath, mediaFile }) {
if ((mediaFile.size ?? 0) < 10) return;
await produceAsset(`${mediaFile.hash}/${name}`, async (base) => {
fs.mkdirSync(path.dirname(base));
await stream.promises.pipeline(
fs.createReadStream(absPath),
fn(),
fs.createWriteStream(base),
);
return [base];
});
},
}) satisfies Process as Process,
);
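// Processor ids are single letters assigned by position, and each
// 16-bit hash is derived from the processor's `run` source text plus
// its `version`; editing a processor therefore re-queues it on every
// applicable file.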
const processors = [
procDimensions,
procDuration,
procLoadTextContents,
procHighlightCode,
procImageSubsets,
...procVideos,
...procCompression,
].map((process, id, all) => {
const strIndex = (id: number) => String.fromCharCode("a".charCodeAt(0) + id);
return {
...(process as Process),
id: strIndex(id),
// Create a unique key.
hash: new Uint16Array(
crypto
.createHash("sha1")
.update(
process.run.toString() +
(process.version ? String(process.version) : ""),
)
.digest().buffer,
).reduce((a, b) => a ^ b),
depends: (process.depends ?? []).map((depend) => {
const index = all.findIndex((p) => p.name === depend);
if (index === -1) throw new Error(`Cannot find depend '${depend}'`);
if (index === id) throw new Error(`Cannot depend on self: '${depend}'`);
return strIndex(index);
}),
};
});
function resizeDimensions(w: number, h: number, desiredWidth: number) {
ASSERT(desiredWidth < w, `${desiredWidth} < ${w}`);
return { w: desiredWidth, h: Math.floor((h / w) * desiredWidth) };
}
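// Derived assets are reference-counted by key. The first reference
// builds the files under `workDir`; later references reuse the build.
// Disposing the returned handle releases one reference.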
async function produceAsset(
key: string,
builder: (prefix: string) => Promise<string[]>,
) {
const asset = AssetRef.putOrIncrement(key);
try {
if (asset.refs === 1) {
const paths = await builder(path.join(workDir, key));
asset.addFiles(
paths.map((file) => path.relative(workDir, file).replaceAll("\\", "/")),
);
}
return {
[Symbol.dispose]: () => asset.unref(),
};
} catch (err: any) {
if (err && typeof err === "object") err.assetKey = key;
asset.unref();
throw err;
}
}
async function unproduceAsset(key: string) {
const ref = AssetRef.get(key);
if (ref) {
ref.unref();
console.warn(`TODO: unref ${key}`);
// TODO: remove associated files from target
}
}
interface UpdateMetadataJob {
absPath: string;
publicPath: string;
stat: fs.Stats | null;
mediaFile: MediaFile | null;
}
interface ProcessFileArgs {
absPath: string;
stat: fs.Stats;
mediaFile: MediaFile;
spin: Spinner;
}
interface ProcessJob {
absPath: string;
stat: fs.Stats;
mediaFile: MediaFile;
processor: (typeof processors)[0];
index: number;
after: ProcessJob[];
needs: number;
}
export function skipBasename(basename: string): boolean {
// dot files must be incrementally tracked
if (basename === ".dirsort") return true;
if (basename === ".friends") return true;
return (
basename.startsWith(".") ||
basename.startsWith("._") ||
basename.startsWith(".tmp") ||
basename === ".DS_Store" ||
basename.toLowerCase() === "thumbs.db" ||
basename.toLowerCase() === "desktop.ini"
);
}
export function toPublicPath(absPath: string) {
ASSERT(path.isAbsolute(absPath), "non-absolute " + absPath);
if (absPath === root) return "/";
return "/" + path.relative(root, absPath).replaceAll("\\", "/");
}
export function testProgram(name: string, helpArgument: string) {
try {
child_process.spawnSync(name, [helpArgument]);
return name;
} catch (err) {
console.warn(`Missing or corrupt executable '${name}'`);
}
return null;
}
const monthMilliseconds = 30 * 24 * 60 * 60 * 1000;
import { Progress } from "@paperclover/console/Progress";
import { Spinner } from "@paperclover/console/Spinner";
import * as async from "#sitegen/async";
import * as fs from "#sitegen/fs";
import * as subprocess from "#sitegen/subprocess";
import * as path from "node:path";
import * as zlib from "node:zlib";
import * as child_process from "node:child_process";
import * as crypto from "node:crypto";
import * as stream from "node:stream";
import { MediaFile, MediaFileKind } from "@/file-viewer/models/MediaFile.ts";
import { AssetRef } from "@/file-viewer/models/AssetRef.ts";
import { FilePermissions } from "@/file-viewer/models/FilePermissions.ts";
import {
formatDate,
formatDurationLong,
formatSize,
} from "@/file-viewer/format.ts";
import * as rules from "@/file-viewer/rules.ts";
import * as highlight from "@/file-viewer/highlight.ts";
import * as ffmpeg from "@/file-viewer/ffmpeg.ts";
import * as rsync from "@/file-viewer/rsync.ts";
import * as transcodeRules from "@/file-viewer/transcode-rules.ts";
import { rawFileRoot as root } from "../paths.ts";