// The file scanner incrementally updates an sqlite database with file
// stats. Additionally, it runs "processors" on files, which precompute
// expensive data such as running `ffprobe` on all media to get the
// duration.
//
// Processors are also used to derive compressed and optimized assets,
// which is how automatic JXL / AV1 encoding is done. Derived files are
// uploaded to the clover NAS to be pulled by VPS instances for hosting.
//
// This is the third iteration of the scanner, hence its name "scan3".
// Remember that any software you want to be maintainable and high
// quality cannot be written with AI.
const root = path.resolve("/Volumes/clover/Published");
const workDir = path.resolve(".clover/derived");

export async function main() {
  const start = performance.now();
  const timerSpinner = new Spinner({
    text: () =>
      `paper clover's scan3 [${((performance.now() - start) / 1000).toFixed(
        1,
      )}s]`,
    fps: 10,
  });
  using _endTimerSpinner = { [Symbol.dispose]: () => timerSpinner.stop() };

  // Read a directory or file stat and queue up changed files.
  using qList = new async.Queue({
    name: "Discover Tree",
    async fn(absPath: string) {
      const stat = await fs.stat(absPath);
      const publicPath = toPublicPath(absPath);
      const mediaFile = MediaFile.getByPath(publicPath);
      if (stat.isDirectory()) {
        const items = await fs.readdir(absPath);
        qList.addMany(items.map((subPath) => path.join(absPath, subPath)));

        if (mediaFile) {
          const deleted = mediaFile
            .getChildren()
            .filter((child) => !items.includes(child.basename))
            .flatMap((child) =>
              child.kind === MediaFileKind.directory
                ? child.getRecursiveFileChildren()
                : child,
            );

          qMeta.addMany(
            deleted.map((mediaFile) => ({
              absPath: path.join(root, mediaFile.path),
              publicPath: mediaFile.path,
              stat: null,
              mediaFile,
            })),
          );
        }

        return;
      }

      // All processors must be re-run if there is no database entry for the file.
      if (
        !mediaFile ||
        stat.size !== mediaFile.size ||
        stat.mtime.getTime() !== mediaFile.date.getTime()
      ) {
        qMeta.add({ absPath, publicPath, stat, mediaFile });
        return;
      }

      // If the set of processors changed, it may mean more processors should be run.
      queueProcessors({ absPath, stat, mediaFile });
    },
    maxJobs: 24,
  });
  using qMeta = new async.Queue({
    name: "Update Metadata",
    async fn({ absPath, publicPath, stat, mediaFile }: UpdateMetadataJob) {
      if (!stat) {
        // File was deleted.
        await runUndoProcessors(UNWRAP(mediaFile));
        return;
      }
      // TODO: run scrubLocationMetadata first

      const hash = await new Promise<string>((resolve, reject) => {
        const reader = fs.createReadStream(absPath);
        reader.on("error", reject);

        const hasher = crypto.createHash("sha1").setEncoding("hex");
        hasher.on("error", reject);
        hasher.on("readable", () => resolve(hasher.read()));

        reader.pipe(hasher);
      });
      let date = stat.mtime;
      if (
        mediaFile &&
        mediaFile.date.getTime() < stat.mtime.getTime() &&
        Date.now() - stat.mtime.getTime() < monthMilliseconds
      ) {
        date = mediaFile.date;
        console.warn(
          `M-time on ${publicPath} was likely corrupted. ${formatDate(
            mediaFile.date,
          )} -> ${formatDate(stat.mtime)}`,
        );
      }
      mediaFile = MediaFile.createFile({
        path: publicPath,
        date,
        hash,
        size: stat.size,
        duration: mediaFile?.duration ?? 0,
        dimensions: mediaFile?.dimensions ?? "",
        contents: mediaFile?.contents ?? "",
      });
      await queueProcessors({ absPath, stat, mediaFile });
    },
    getItemText: (job) =>
      job.publicPath.slice(1) + (job.stat ? "" : " (deleted)"),
    maxJobs: 10,
  });
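
  // Jobs flow through three queues: "Discover Tree" walks the filesystem,
  // "Update Metadata" re-hashes changed files and upserts their rows, and
  // "Process Contents" below runs the processors. Each ProcessJob carries a
  // `needs` counter of unfinished dependencies; it is decremented as each
  // dependency finishes, and the job is only enqueued once it reaches zero.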
"" : " (deleted)"), maxJobs: 10, }); using qProcess = new async.Queue({ name: "Process Contents", async fn( { absPath, stat, mediaFile, processor, index, after }: ProcessJob, spin, ) { await processor.run({ absPath, stat, mediaFile, spin }); mediaFile.setProcessed(mediaFile.processed | (1 << (16 + index))); for (const dependantJob of after) { ASSERT( dependantJob.needs > 0, `dependantJob.needs > 0, ${dependantJob.needs}`, ); dependantJob.needs -= 1; if (dependantJob.needs == 0) qProcess.add(dependantJob); } }, getItemText: ({ mediaFile, processor }) => `${mediaFile.path.slice(1)} - ${processor.name}`, maxJobs: 4, }); function decodeProcessors(input: string) { return input .split(";") .filter(Boolean) .map(([a, b, c]) => ({ id: a, hash: (b.charCodeAt(0) << 8) + c.charCodeAt(0), })); } async function queueProcessors({ absPath, stat, mediaFile, }: Omit) { const ext = mediaFile.extensionNonEmpty.toLowerCase(); let possible = processors.filter((p) => p.include ? p.include.has(ext) : !p.exclude?.has(ext), ); if (possible.length === 0) return; const hash = possible.reduce((a, b) => a ^ b.hash, 0) | 1; ASSERT(hash <= 0xffff, `${hash.toString(16)} has no bits above 16 set`); let processed = mediaFile.processed; // If the hash has changed, migrate the bitfield over. // This also runs when the processor hash is in it's initial 0 state. const order = decodeProcessors(mediaFile.processors); if ((processed & 0xffff) !== hash) { const previous = order.filter( (_, i) => (processed & (1 << (16 + i))) !== 0, ); processed = hash; for (const { id, hash } of previous) { const p = processors.find((p) => p.id === id); if (!p) continue; const index = possible.indexOf(p); if (index !== -1 && p.hash === hash) { processed |= 1 << (16 + index); } else { if (p.undo) await p.undo(mediaFile); } } mediaFile.setProcessors( processed, possible .map((p) => p.id + String.fromCharCode(p.hash >> 8, p.hash & 0xff)) .join(";"), ); } else { possible = order.map(({ id }) => UNWRAP(possible.find((p) => p.id === id)), ); } // Queue needed processors. const jobs: ProcessJob[] = []; for (let i = 0, { length } = possible; i < length; i += 1) { if ((processed & (1 << (16 + i))) === 0) { const job: ProcessJob = { absPath, stat, mediaFile, processor: possible[i], index: i, after: [], needs: possible[i].depends.length, }; jobs.push(job); if (job.needs === 0) qProcess.add(job); } } for (const job of jobs) { for (const dependId of job.processor.depends) { const dependJob = jobs.find((j) => j.processor.id === dependId); if (dependJob) { dependJob.after.push(job); } else { ASSERT(job.needs > 0, `job.needs !== 0, ${job.needs}`); job.needs -= 1; if (job.needs === 0) qProcess.add(job); } } } } async function runUndoProcessors(mediaFile: MediaFile) { const { processed } = mediaFile; const previous = decodeProcessors(mediaFile.processors).filter( (_, i) => (processed & (1 << (16 + i))) !== 0, ); for (const { id } of previous) { const p = processors.find((p) => p.id === id); if (!p) continue; if (p.undo) { await p.undo(mediaFile); } } mediaFile.delete(); } // Add the root & recursively iterate! qList.add(root); await qList.done(); await qMeta.done(); await qProcess.done(); // Update directory metadata const dirs = MediaFile.getDirectoriesToReindex().sort( (a, b) => b.path.length - a.path.length, ); for (const dir of dirs) { const children = dir.getChildren(); // readme.txt const readmeContent = children.find((x) => x.basename === "readme.txt")?.contents ?? 
""; // dirsort let dirsort: string[] | null = null; const dirSortRaw = children.find((x) => x.basename === ".dirsort")?.contents ?? ""; if (dirSortRaw) { dirsort = dirSortRaw .split("\n") .map((x) => x.trim()) .filter(Boolean); } // Permissions if (children.some((x) => x.basename === ".friends")) { FilePermissions.setPermissions(dir.path, 1); } else { FilePermissions.setPermissions(dir.path, 0); } // Recursive stats. let totalSize = 0; let newestDate = new Date(0); let allHashes = ""; for (const child of children) { totalSize += child.size; allHashes += child.hash; if (child.basename !== "/readme.txt" && child.date > newestDate) { newestDate = child.date; } } const dirHash = crypto .createHash("sha1") .update(dir.path + allHashes) .digest("hex"); MediaFile.markDirectoryProcessed({ id: dir.id, timestamp: newestDate, contents: readmeContent, size: totalSize, hash: dirHash, dirsort, }); } // Sync to remote if ((await fs.readdir(workDir)).length > 0) { await rsync.spawn({ args: [ "--links", "--recursive", "--times", "--partial", "--progress", "--remove-source-files", "--delay-updates", workDir + "/", "clo@zenith:/mnt/storage1/clover/Documents/Config/clover_file/derived/", ], title: "Uploading Derived Assets", cwd: process.cwd(), }); await fs.removeEmptyDirectories(workDir); } else { console.info("No new derived assets"); } console.info( "Updated file viewer index in \x1b[1m" + ((performance.now() - start) / 1000).toFixed(1) + "s\x1b[0m", ); MediaFile.db.prepare("VACUUM").run(); const { duration, count } = MediaFile.db .prepare<[], { count: number; duration: number }>( ` select count(*) as count, sum(duration) as duration from media_files `, ) .getNonNull(); console.info(); console.info( "Global Stats:\n" + `- File Count: \x1b[1m${count}\x1b[0m\n` + `- Canonical Size: \x1b[1m${formatSize(MediaFile.getByPath("/")!.size)}\x1b[0m\n` + `- Media Duration: \x1b[1m${formatDurationLong(duration)}\x1b[0m\n`, ); } interface Process { name: string; enable?: boolean; include?: Set; exclude?: Set; depends?: string[]; version?: number; /* Perform an action. */ run(args: ProcessFileArgs): Promise; /* Should detect if `run` was never even run before before undoing state */ undo?(mediaFile: MediaFile): Promise; } const execFileRaw = util.promisify(child_process.execFile); const execFile: typeof execFileRaw = (( ...args: Parameters ) => execFileRaw(...args).catch((e: any) => { if (e?.message?.startsWith?.("Command failed")) { if (e.code > 2 ** 31) e.code |= 0; const code = e.signal ? `signal ${e.signal}` : `code ${e.code}`; e.message = `${e.cmd.split(" ")[0]} failed with ${code}`; } throw e; })) as any; const ffprobeBin = testProgram("ffprobe", "--help"); const ffmpegBin = testProgram("ffmpeg", "--help"); const ffmpegOptions = ["-hide_banner", "-loglevel", "warning"]; const procDuration: Process = { name: "calculate duration", enable: ffprobeBin !== null, include: rules.extsDuration, async run({ absPath, mediaFile }) { const { stdout } = await execFile(ffprobeBin!, [ "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", absPath, ]); const duration = parseFloat(stdout.trim()); if (Number.isNaN(duration)) { throw new Error("Could not extract duration from " + stdout); } mediaFile.setDuration(Math.ceil(duration)); }, }; // NOTE: Never re-order the processors. Add new ones at the end. 
const procDimensions: Process = {
  name: "calculate dimensions",
  enable: ffprobeBin != null,
  include: rules.extsDimensions,
  async run({ absPath, mediaFile }) {
    const ext = path.extname(absPath);

    let dimensions;

    if (ext === ".svg") {
      // Parse out of text data
      const content = await fs.readFile(absPath, "utf8");
      const widthMatch = content.match(/width="(\d+)"/);
      const heightMatch = content.match(/height="(\d+)"/);

      if (widthMatch && heightMatch) {
        dimensions = `${widthMatch[1]}x${heightMatch[1]}`;
      }
    } else {
      // Use ffprobe to observe streams
      const { stdout } = await execFile("ffprobe", [
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=width,height",
        "-of",
        "csv=s=x:p=0",
        absPath,
      ]);
      if (stdout.includes("x")) {
        dimensions = stdout.trim();
      }
    }

    mediaFile.setDimensions(dimensions ?? "");
  },
};

const procLoadTextContents: Process = {
  name: "load text content",
  include: rules.extsReadContents,
  async run({ absPath, mediaFile, stat }) {
    if (stat.size > 1_000_000) return;
    const text = await fs.readFile(absPath, "utf-8");
    console.log({ text });
    mediaFile.setContents(text);
  },
};

const procHighlightCode: Process = {
  name: "highlight source code",
  include: new Set(rules.extsCode.keys()),
  async run({ absPath, mediaFile, stat }) {
    const language = UNWRAP(
      rules.extsCode.get(path.extname(absPath).toLowerCase()),
    );
    // An issue is that .ts is an overloaded extension, shared between
    // 'transport stream' and 'typescript'.
    //
    // Filter used here is:
    // - more than 1mb
    // - invalid UTF-8
    if (stat.size > 1_000_000) return;
    let code;
    const buf = await fs.readFile(absPath);
    try {
      code = new TextDecoder("utf-8", { fatal: true }).decode(buf);
    } catch (error) {
      mediaFile.setContents("");
      return;
    }
    const content = await highlight.highlightCode(code, language);
    mediaFile.setContents(content);
  },
};

const procImageSubsets: Process = {
  name: "encode image subsets",
  include: rules.extsImage,
  depends: ["calculate dimensions"],
  version: 2,
  async run({ absPath, mediaFile, spin }) {
    const { width, height } = UNWRAP(mediaFile.parseDimensions());
    const targetSizes = transcodeRules.imageSizes.filter((w) => w < width);
    const baseStatus = spin.text;

    using stack = new DisposableStack();
    for (const size of targetSizes) {
      const { w, h } = resizeDimensions(width, height, size);
      for (const { ext, args } of transcodeRules.imagePresets) {
        spin.text = baseStatus +
          ` (${w}x${h}, ${ext.slice(1).toUpperCase()})`;

        stack.use(
          await produceAsset(`${mediaFile.hash}/${size}${ext}`, async (out) => {
            await fs.mkdir(path.dirname(out));
            await fs.rm(out, { force: true });
            await execFile(ffmpegBin!, [
              ...ffmpegOptions,
              "-i",
              absPath,
              "-vf",
              `scale=${w}:${h}:force_original_aspect_ratio=increase,crop=${w}:${h}`,
              ...args,
              out,
            ]);
            return [out];
          }),
        );
      }
    }

    stack.move();
  },
  async undo(mediaFile) {
    const { width } = UNWRAP(mediaFile.parseDimensions());
    const targetSizes = transcodeRules.imageSizes.filter((w) => w < width);
    for (const size of targetSizes) {
      for (const { ext } of transcodeRules.imagePresets) {
        unproduceAsset(`${mediaFile.hash}/${size}${ext}`);
      }
    }
  },
};

const qualityMap: Record<string, string> = {
  u: "ultra-high",
  h: "high",
  m: "medium",
  l: "low",
  d: "data-saving",
};
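
// One processor is generated per video format preset. The second character
// of a preset id is its quality letter, looked up in `qualityMap` above to
// build the processor name (an id ending in "h" reads as "high").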
const procVideos = transcodeRules.videoFormats.map((preset) => ({
  name: `encode ${preset.codec} ${UNWRAP(qualityMap[preset.id[1]])}`,
  include: rules.extsVideo,
  enable: ffmpegBin != null,
  async run({ absPath, mediaFile, spin }) {
    if ((mediaFile.duration ?? 0) < 10) return;
    await produceAsset(`${mediaFile.hash}/${preset.id}`, async (base) => {
      base = path.dirname(base);
      await fs.mkdir(base);

      let inputArgs = ["-i", absPath];
      try {
        const config = await fs.readJson(
          path.join(
            path.dirname(absPath),
            path.basename(absPath, path.extname(absPath)) + ".json",
          ),
        );
        if (config.encoder && typeof config.encoder.videoSrc === "string") {
          const { videoSrc, audioSrc, rate } = config.encoder;
          inputArgs = [
            ...(rate ? ["-r", String(rate)] : []),
            "-i",
            videoSrc,
            ...(audioSrc ? ["-i", audioSrc] : []),
          ];
        }
      } catch (err: any) {
        if (err?.code !== "ENOENT") throw err;
      }

      const args = transcodeRules.getVideoArgs(preset, base, inputArgs);
      try {
        const fakeProgress = new Progress({ text: spin.text, spinner: null });
        fakeProgress.stop();
        spin.format = (now: number) => fakeProgress.format(now);
        // @ts-expect-error
        fakeProgress.redraw = () => spin.redraw();

        await ffmpeg.spawn({
          ffmpeg: ffmpegBin!,
          title: fakeProgress.text,
          progress: fakeProgress,
          args,
          cwd: base,
        });
        return await collectFiles();
      } catch (err) {
        for (const file of await collectFiles()) {
          try {
            await fs.rm(file);
          } catch {}
        }
        throw err;
      }

      async function collectFiles(): Promise<string[]> {
        return (await fs.readdir(base))
          .filter((basename) => basename.startsWith(preset.id))
          .map((basename) => path.join(base, basename));
      }
    });
  },
}));

const procCompression = [
  { name: "gzip", fn: () => zlib.createGzip({ level: 9 }) },
  { name: "zstd", fn: () => zlib.createZstdCompress() },
].map(
  ({ name, fn }) =>
    ({
      name: `compress ${name}`,
      exclude: rules.extsPreCompressed,
      async run({ absPath, mediaFile }) {
        if ((mediaFile.size ?? 0) < 10) return;
        await produceAsset(`${mediaFile.hash}/${name}`, async (base) => {
          fs.mkdirSync(path.dirname(base));
          await stream.promises.pipeline(
            fs.createReadStream(absPath),
            fn(),
            fs.createWriteStream(base),
          );
          return [base];
        });
      },
    }) satisfies Process as Process,
);
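
// Each processor gets a stable one-letter id from its position in this
// array ("a", "b", "c", ...) and a 16-bit hash folded (XOR of 16-bit words)
// out of the SHA-1 of its `run` source plus `version`. Per file,
// `MediaFile.processors` records the applicable processors as `id` followed
// by the hash packed into two characters, joined with ";" -- e.g. id "a"
// with hash 0x4142 is stored as "aAB" -- while completion is tracked in the
// `processed` bitfield (low 16 bits: combined hash; bit 16+i: processor i
// done). This is why the array order must never change: ids are positional.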
const processors = [
  procDimensions,
  procDuration,
  procLoadTextContents,
  procHighlightCode,
  procImageSubsets,
  ...procVideos,
  ...procCompression,
].map((process, id, all) => {
  const strIndex = (id: number) =>
    String.fromCharCode("a".charCodeAt(0) + id);
  return {
    ...(process as Process),
    id: strIndex(id),
    // Create a unique key.
    hash: new Uint16Array(
      crypto
        .createHash("sha1")
        .update(
          process.run.toString() +
            (process.version ? String(process.version) : ""),
        )
        .digest().buffer,
    ).reduce((a, b) => a ^ b),
    depends: (process.depends ?? []).map((depend) => {
      const index = all.findIndex((p) => p.name === depend);
      if (index === -1) throw new Error(`Cannot find depend '${depend}'`);
      if (index === id) throw new Error(`Cannot depend on self: '${depend}'`);
      return strIndex(index);
    }),
  };
});

function resizeDimensions(w: number, h: number, desiredWidth: number) {
  ASSERT(desiredWidth < w, `${desiredWidth} < ${w}`);
  return { w: desiredWidth, h: Math.floor((h / w) * desiredWidth) };
}

async function produceAsset(
  key: string,
  builder: (prefix: string) => Promise<string[]>,
) {
  const asset = AssetRef.putOrIncrement(key);
  try {
    if (asset.refs === 1) {
      const paths = await builder(path.join(workDir, key));
      asset.addFiles(
        paths.map((file) =>
          path.relative(workDir, file).replaceAll("\\", "/"),
        ),
      );
    }
    return {
      [Symbol.dispose]: () => asset.unref(),
    };
  } catch (err: any) {
    if (err && typeof err === "object") err.assetKey = key;
    asset.unref();
    throw err;
  }
}

async function unproduceAsset(key: string) {
  const ref = AssetRef.get(key);
  if (ref) {
    ref.unref();
    console.log(`unref ${key}`);
    // TODO: remove associated files from target
  }
}

interface UpdateMetadataJob {
  absPath: string;
  publicPath: string;
  stat: fs.Stats | null;
  mediaFile: MediaFile | null;
}

interface ProcessFileArgs {
  absPath: string;
  stat: fs.Stats;
  mediaFile: MediaFile;
  spin: Spinner;
}

interface ProcessJob {
  absPath: string;
  stat: fs.Stats;
  mediaFile: MediaFile;
  processor: (typeof processors)[0];
  index: number;
  after: ProcessJob[];
  needs: number;
}

export function skipBasename(basename: string): boolean {
  // dot files must be incrementally tracked
  if (basename === ".dirsort") return true;
  if (basename === ".friends") return true;

  return (
    basename.startsWith(".") ||
    basename.startsWith("._") ||
    basename.startsWith(".tmp") ||
    basename === ".DS_Store" ||
    basename.toLowerCase() === "thumbs.db" ||
    basename.toLowerCase() === "desktop.ini"
  );
}

export function toPublicPath(absPath: string) {
  ASSERT(path.isAbsolute(absPath), "non-absolute " + absPath);
  if (absPath === root) return "/";
  return "/" + path.relative(root, absPath).replaceAll("\\", "/");
}

export function testProgram(name: string, helpArgument: string) {
  try {
    child_process.spawnSync(name, [helpArgument]);
    return name;
  } catch (err) {
    console.warn(`Missing or corrupt executable '${name}'`);
  }
  return null;
}

const monthMilliseconds = 30 * 24 * 60 * 60 * 1000;

import { Progress } from "@paperclover/console/Progress";
import { Spinner } from "@paperclover/console/Spinner";
import * as async from "#sitegen/async";
import * as fs from "#sitegen/fs";

import * as path from "node:path";
import * as zlib from "node:zlib";
import * as child_process from "node:child_process";
import * as util from "node:util";
import * as crypto from "node:crypto";
import * as stream from "node:stream";

import { MediaFile, MediaFileKind } from "@/file-viewer/models/MediaFile.ts";
import { AssetRef } from "@/file-viewer/models/AssetRef.ts";
import { FilePermissions } from "@/file-viewer/models/FilePermissions.ts";
import {
  formatDate,
  formatDurationLong,
  formatSize,
} from "@/file-viewer/format.ts";
import * as rules from "@/file-viewer/rules.ts";
import * as highlight from "@/file-viewer/highlight.ts";
import * as ffmpeg from "@/file-viewer/ffmpeg.ts";
import * as rsync from "@/file-viewer/rsync.ts";
import * as transcodeRules from "@/file-viewer/transcode-rules.ts";