import { Agent, get } from "node:https";
import * as fs from "node:fs";
import * as path from "node:path";
import { Buffer } from "node:buffer";
import type { ClientRequest } from "node:http";
import LRUCache from "lru-cache";
import { open } from "node:fs/promises";
import { createHash } from "node:crypto";
import { scoped } from "@paperclover/console";
import { escapeUri } from "./format.ts";

declare const Bun: any;
declare const Deno: any;

const sourceOfTruth = "https://nas.paperclover.net:43250";
const caCert = fs.readFileSync("src/file-viewer/cert.pem");

const diskCacheRoot = path.join(import.meta.dirname, "../.clover/filecache/");
const diskCacheMaxSize = 14 * 1024 * 1024 * 1024; // 14GB
const ramCacheMaxSize = 1 * 1024 * 1024 * 1024; // 1GB

const loadInProgress = new Map<
  string,
  Promise<{ stream: ReadableStream }> | { stream: ReadableStream }
>();

// Disk cache state persists entry sizes and access times.
const diskCacheState: Record<string, [size: number, lastAccess: number]> =
  loadDiskCacheState();
const diskCache = new LRUCache<string, number>({
  maxSize: diskCacheMaxSize,
  ttl: 0,
  sizeCalculation: (value) => value,
  dispose: (_, key) => {
    delete diskCacheState[key];
  },
  onInsert: (size, key) => {
    diskCacheState[key] = [size, Date.now()];
  },
});
const ramCache = new LRUCache<string, Buffer>({
  maxSize: ramCacheMaxSize,
  ttl: 0,
  sizeCalculation: (value) => value.byteLength,
});
let diskCacheFlush: NodeJS.Timeout | undefined;

{
  // Initialize the disk cache by validating all files exist, then re-inserting
  // entries ordered by last access time. State is repaired pessimistically.
  const toDelete = new Set(Object.keys(diskCacheState));
  fs.mkdirSync(diskCacheRoot, { recursive: true });
  for (
    const file of fs.readdirSync(diskCacheRoot, {
      recursive: true,
      encoding: "utf-8",
    })
  ) {
    const key = file.split("/").pop()!;
    if (key.length !== 40) continue;
    const entry = diskCacheState[key];
    if (!entry) {
      fs.rmSync(path.join(diskCacheRoot, file), {
        recursive: true,
        force: true,
      });
      delete diskCacheState[key];
      continue;
    }
    toDelete.delete(key);
  }
  for (const key of toDelete) {
    delete diskCacheState[key];
  }
  saveDiskCacheState();
  const sorted = Object.keys(diskCacheState).sort((a, b) =>
    diskCacheState[b][1] - diskCacheState[a][1]
  );
  for (const key of sorted) {
    diskCache.set(key, diskCacheState[key][0]);
  }
}

export type CacheSource = "ram" | "disk" | "miss" | "lan" | "flight";
export type CompressionFormat = "gzip" | "zstd" | "raw";
const compressionFormatMap = {
  gzip: "gz",
  zstd: "zstd",
  raw: "file",
} as const;

const log = scoped("file_cache");

const lanMount = "/Volumes/clover/Published";
const hasLanMount = fs.existsSync(lanMount);

/**
 * Fetches a file with the given compression format.
 * Uncompressed files are never persisted to disk.
 *
 * Returns a promise to either:
 * - Buffer: the data is from RAM cache
 * - ReadableStream: the data is being streamed in from disk/server
 *
 * Additionally, returns a string indicating the source of the data, for
 * debugging.
 *
 * Callers must be able to consume both output types.
 */
export async function fetchFile(
  pathname: string,
  format: CompressionFormat = "raw",
): Promise<
  [data: Buffer | ReadableStream, encoding: CompressionFormat, src: CacheSource]
> {
  // 1. RAM cache
  const cacheKey = hashKey(`${pathname}:${format}`);
  const ramCacheHit = ramCache.get(cacheKey);
  if (ramCacheHit) {
    log(`ram hit: ${format}${pathname}`);
    return [ramCacheHit, format, "ram"];
  }

  // 2. Tee an existing loading stream.
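  // If another caller is already downloading the same key, split its in-flight
  // stream with `tee()` so both consumers receive the full body without
  // issuing a second request.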
  const inProgress = loadInProgress.get(cacheKey);
  if (inProgress) {
    const stream = await inProgress;
    const [stream1, stream2] = stream.stream.tee();
    loadInProgress.set(cacheKey, { stream: stream2 });
    log(`in-flight copy: ${format}${pathname}`);
    return [stream1, format, "flight"];
  }

  // 3. Disk cache + load into RAM cache.
  if (format !== "raw") {
    const diskCacheHit = diskCache.get(cacheKey);
    if (diskCacheHit) {
      diskCacheState[cacheKey] = [diskCacheHit, Date.now()];
      saveDiskCacheStateLater();
      log(`disk hit: ${format}/${pathname}`);
      return [
        startInProgress(
          cacheKey,
          new ReadableStream({
            start: async (controller) => {
              const stream = fs.createReadStream(
                path.join(diskCacheRoot, cacheKey),
              );
              const chunks: Buffer[] = [];
              stream.on("data", (chunk) => {
                controller.enqueue(chunk);
                chunks.push(chunk as Buffer);
              });
              stream.on("end", () => {
                controller.close();
                ramCache.set(cacheKey, Buffer.concat(chunks));
                finishInProgress(cacheKey);
              });
              stream.on("error", (error) => {
                controller.error(error);
              });
            },
          }),
        ),
        format,
        "disk",
      ];
    }
  }

  // 4. LAN mount (access files that prod may not have)
  if (hasLanMount) {
    log(`lan hit: ${format}/${pathname}`);
    return [
      startInProgress(
        cacheKey,
        new ReadableStream({
          start: async (controller) => {
            const stream = fs.createReadStream(
              path.join(lanMount, pathname),
            );
            const chunks: Buffer[] = [];
            stream.on("data", (chunk) => {
              controller.enqueue(chunk);
              chunks.push(chunk as Buffer);
            });
            stream.on("end", () => {
              controller.close();
              ramCache.set(cacheKey, Buffer.concat(chunks));
              finishInProgress(cacheKey);
            });
            stream.on("error", (error) => {
              controller.error(error);
            });
          },
        }),
      ),
      "raw",
      "lan",
    ];
  }

  // 5. Fetch from server
  const url = `${compressionFormatMap[format]}${escapeUri(pathname)}`;
  log(`miss: ${format}${pathname}`);
  const response = await startInProgress(cacheKey, fetchFileUncached(url));
  const [stream1, stream2] = response.tee();
  handleDownload(cacheKey, format, stream2);
  return [stream1, format, "miss"];
}

export async function prefetchFile(
  pathname: string,
  format: CompressionFormat = "zstd",
) {
  const cacheKey = hashKey(`${pathname}:${format}`);
  const ramCacheHit = ramCache.get(cacheKey);
  if (ramCacheHit) {
    return;
  }
  if (hasLanMount) return;
  const url = `${compressionFormatMap[format]}${pathname}`;
  log(`prefetch: ${format}${pathname}`);
  const stream2 = await startInProgress(cacheKey, fetchFileUncached(url));
  handleDownload(cacheKey, format, stream2);
}

async function handleDownload(
  cacheKey: string,
  format: CompressionFormat,
  stream2: ReadableStream,
) {
  let chunks: Buffer[] = [];
  if (format !== "raw") {
    const file = await open(path.join(diskCacheRoot, cacheKey), "w");
    try {
      for await (const chunk of stream2) {
        await file.write(chunk);
        chunks.push(chunk);
      }
    } finally {
      file.close();
    }
  } else {
    for await (const chunk of stream2) {
      chunks.push(chunk);
    }
  }
  const final = Buffer.concat(chunks);
  chunks.length = 0;
  ramCache.set(cacheKey, final);
  if (format !== "raw") {
    diskCache.set(cacheKey, final.byteLength);
  }
  finishInProgress(cacheKey);
}

function hashKey(key: string): string {
  return createHash("sha1").update(key).digest("hex");
}

function startInProgress<T extends Promise<ReadableStream> | ReadableStream>(
  cacheKey: string,
  promise: T,
): T {
  if (promise instanceof Promise) {
    let resolve2: (stream: { stream: ReadableStream }) => void;
    let reject2: (error: Error) => void;
    const stream2Promise = new Promise<{ stream: ReadableStream }>(
      (resolve, reject) => {
        resolve2 = resolve;
        reject2 = reject;
      },
    );
    const stream1Promise = new Promise<ReadableStream>((resolve, reject) => {
      promise.then((stream) => {
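        // When the upstream fetch resolves, tee it: one branch resolves the
        // promise returned to the original caller, the other replaces the map
        // entry so later callers can tee from it again.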
        const [stream1, stream2] = stream.tee();
        const stream2Obj = { stream: stream2 };
        resolve2(stream2Obj);
        loadInProgress.set(cacheKey, stream2Obj);
        resolve(stream1);
      }, reject);
    });
    loadInProgress.set(cacheKey, stream2Promise);
    return stream1Promise as T;
  } else {
    const [stream1, stream2] = promise.tee();
    loadInProgress.set(cacheKey, { stream: stream2 });
    return stream1 as T;
  }
}

function loadDiskCacheState(): Record<
  string,
  [size: number, lastAccess: number]
> {
  try {
    const state = JSON.parse(
      fs.readFileSync(path.join(diskCacheRoot, "state.json"), "utf-8"),
    );
    return state;
  } catch (error) {
    return {};
  }
}

function saveDiskCacheStateLater() {
  if (diskCacheFlush) {
    return;
  }
  diskCacheFlush = setTimeout(() => {
    saveDiskCacheState();
  }, 60_000) as NodeJS.Timeout;
  if (diskCacheFlush.unref) {
    diskCacheFlush.unref();
  }
}

process.on("exit", () => {
  saveDiskCacheState();
});

function saveDiskCacheState() {
  fs.writeFileSync(
    path.join(diskCacheRoot, "state.json"),
    JSON.stringify(diskCacheState),
  );
}

function finishInProgress(cacheKey: string) {
  loadInProgress.delete(cacheKey);
}

// The self-signed certificate must be trusted to be able to request the above
// URL.
//
// Unfortunately, Bun and Deno are not fully Node.js compatible here, so those
// two runtimes need fallback implementations. The fallback implementation
// calls `fetch` with the `agent` value as the RequestInit. Since `fetch`
// decompresses the body for you, that must be disabled.
const agent: any = typeof Bun !== "undefined"
  ? {
    // Bun has two non-standard fetch extensions
    decompress: false,
    tls: {
      ca: caCert,
    },
  }
  // TODO: https://github.com/denoland/deno/issues/12291
  // : typeof Deno !== "undefined"
  // ? {
  //   // Deno configures through the non-standard `client` extension
  //   client: Deno.createHttpClient({
  //     caCerts: [caCert.toString()],
  //   }),
  // }
  // Node.js supports node:http
  : new Agent({
    ca: caCert,
  });

function fetchFileNode(pathname: string): Promise<ReadableStream> {
  return new Promise((resolve, reject) => {
    const request: ClientRequest = get(`${sourceOfTruth}/${pathname}`, {
      agent,
    });
    request.on("response", (response) => {
      if (response.statusCode !== 200) {
        reject(new Error(`Failed to fetch ${pathname}`));
        return;
      }
      const stream = new ReadableStream({
        start(controller) {
          response.on("data", (chunk) => {
            controller.enqueue(chunk);
          });
          response.on("end", () => {
            controller.close();
          });
          response.on("error", (error) => {
            controller.error(error);
            reject(error);
          });
        },
      });
      resolve(stream);
    });
    request.on("error", (error) => {
      reject(error);
    });
  });
}

async function fetchFileDenoBun(pathname: string): Promise<ReadableStream> {
  const req = await fetch(`${sourceOfTruth}/${pathname}`, agent);
  if (!req.ok) {
    throw new Error(`Failed to fetch ${pathname}`);
  }
  return req.body!;
}

const fetchFileUncached =
  typeof Bun !== "undefined" || typeof Deno !== "undefined"
    ? fetchFileDenoBun
    : fetchFileNode;

export async function toBuffer(
  stream: ReadableStream | Buffer,
): Promise<Buffer> {
  if (!(stream instanceof ReadableStream)) {
    return stream;
  }
  const chunks: Buffer[] = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}
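
// Usage sketch (illustrative only, not part of this module's API). A caller
// must handle both return shapes of `fetchFile`: a Buffer on a RAM hit, or a
// ReadableStream when data is streamed from disk, the LAN mount, or the
// network. `pathname` below is a hypothetical value from a route handler.
//
//   const [data, encoding, src] = await fetchFile(pathname, "gzip");
//   const body = await toBuffer(data);
//   // `encoding` is "raw" when the bytes are uncompressed (e.g. a LAN hit);
//   // otherwise it matches the requested compression format.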