import type { Readable } from 'stream';

import { createReadStream } from 'fs-extra';
import zip from 'zlib';
import tar from 'tar';
import { keyBy } from 'lodash/fp';
import { chain } from 'stream-chain';
import { pipeline, PassThrough } from 'stream';
import { parser } from 'stream-json/jsonl/Parser';

import type { IMetadata, ISourceProvider, ProviderType } from '../../types';

import { collect } from '../utils';
import { createDecryptionCipher } from '../encryption';

/**
 * Type of the array of items accepted by `chain()` (its first parameter)
 */
type StreamItemArray = Parameters<typeof chain>[0];

/**
 * Constant for the metadata file path
 */
const METADATA_FILE_PATH = 'metadata.json';

/**
 * Provider options
 */
export interface ILocalFileSourceProviderOptions {
  /** Location of the backup file to read */
  file: {
    path: string;
  };

  /** Decryption settings; `key` is required when `enabled` is true */
  encryption: {
    enabled: boolean;
    key?: string;
  };

  /** When enabled, the backup stream is passed through gunzip */
  compression: {
    enabled: boolean;
  };
}

/**
 * Factory for the local file source provider.
 *
 * @throws Error when encryption is enabled but no key is given (thrown by the constructor)
 */
export const createLocalFileSourceProvider = (options: ILocalFileSourceProviderOptions) => {
  return new LocalFileSourceProvider(options);
};

/**
 * Source provider reading its data from a local backup file.
 *
 * The file is read as a tar archive, optionally decrypted and/or gunzipped first
 * (depending on the options). `bootstrap()` must be called before any of the
 * `get*` / `stream*` methods.
 */
class LocalFileSourceProvider implements ISourceProvider {
  type: ProviderType = 'source';

  name = 'source::local-file';

  options: ILocalFileSourceProviderOptions;

  /** Read stream opened by `bootstrap()`; handed to the first consumer only */
  #filestream?: Readable;

  /** Whether the stream opened by `bootstrap()` has already been handed out */
  #filestreamConsumed = false;

  /**
   * @throws Error('Missing encryption key') when encryption is enabled without a key
   */
  constructor(options: ILocalFileSourceProviderOptions) {
    this.options = options;

    const { encryption } = this.options;

    if (encryption.enabled && encryption.key === undefined) {
      throw new Error('Missing encryption key');
    }
  }

  /**
   * Pre flight checks regarding the provided options, opening the file
   */
  async bootstrap() {
    try {
      this.#filestream = createReadStream(this.options.file.path);
    } catch (e) {
      throw new Error(`Could not read backup file path provided at "${this.options.file.path}"`);
    }
  }

  /**
   * Read the metadata JSON file from the backup archive.
   *
   * @returns a promise resolving with the parsed content of the metadata file
   */
  getMetadata() {
    // TODO: need to read the file & extract the metadata json file
    // => we might also need to read the schema.jsonl files & implements a custom stream-check
    const backupStream = this.#getBackupStream();

    return this.#parseJSONFile<IMetadata>(backupStream, METADATA_FILE_PATH);
  }

  /**
   * Collect every schema from the schemas stream and index them by their `uid`.
   */
  async getSchemas() {
    const schemas = await collect(this.streamSchemas() as Readable);

    return keyBy('uid', schemas);
  }

  streamEntities(): NodeJS.ReadableStream {
    return this.#streamJsonlDirectory('entities');
  }

  streamSchemas(): NodeJS.ReadableStream | Promise<NodeJS.ReadableStream> {
    return this.#streamJsonlDirectory('schemas');
  }

  streamLinks(): NodeJS.ReadableStream {
    return this.#streamJsonlDirectory('links');
  }

  streamConfiguration(): NodeJS.ReadableStream {
    // NOTE: TBD
    return this.#streamJsonlDirectory('configuration');
  }

  /**
   * Provide a readable stream positioned at the start of the backup file.
   *
   * A readable stream can only be consumed once, so the stream opened by
   * `bootstrap()` is handed to the first caller and every later caller gets
   * a freshly opened stream on the same path.
   *
   * @throws Error('Could not read file stream') when `bootstrap()` was not called first
   */
  #openFileStream(): Readable {
    // This should be impossible as long as bootstrap was called first
    if (!this.#filestream) {
      throw new Error('Could not read file stream');
    }

    if (!this.#filestreamConsumed) {
      this.#filestreamConsumed = true;
      return this.#filestream;
    }

    return createReadStream(this.options.file.path);
  }

  /**
   * Build the stream chain for the backup file:
   * file stream -> decryption (if enabled and a key is set) -> gunzip (if enabled).
   */
  #getBackupStream() {
    const { encryption, compression } = this.options;

    const streams: StreamItemArray = [this.#openFileStream()];

    if (encryption.enabled && encryption.key) {
      streams.push(createDecryptionCipher(encryption.key));
    }

    if (compression.enabled) {
      streams.push(zip.createGunzip());
    }

    return chain(streams);
  }

  /**
   * Stream, in object mode, the parsed lines of every JSONL file located
   * directly inside the given directory of the backup archive.
   *
   * @param directory - name of the top-level archive directory to read from
   * @returns an object-mode stream emitting the `value` of each parsed line
   */
  #streamJsonlDirectory(directory: string) {
    const inStream = this.#getBackupStream();

    const outStream = new PassThrough({ objectMode: true });

    pipeline(
      [
        inStream,
        new tar.Parse({
          // Only keep files that are direct children of the directory ("<directory>/<file>")
          filter(path, entry) {
            if (entry.type !== 'File') {
              return false;
            }

            const parts = path.split('/');

            if (parts.length !== 2) {
              return false;
            }

            return parts[0] === directory;
          },

          onentry(entry) {
            const transforms = [
              // JSONL parser to read the data chunks one by one (line by line)
              parser(),
              // The JSONL parser returns each line as key/value
              (line: { key: string; value: any }) => line.value,
            ];

            entry
              // Pipe transforms
              .pipe(chain(transforms))
              // Pipe the out stream to the whole pipeline
              // DO NOT send the 'end' event when this entry has finished
              // emitting data, so that it doesn't close the out stream
              .pipe(outStream, { end: false });
          },
        }),
      ],
      () => {
        // Manually send the 'end' event to the out stream
        // once every entry has finished streaming its content
        // NOTE(review): a pipeline error is not forwarded to the out stream here,
        // it just ends; confirm consumers handle 'error' before changing this
        outStream.end();
      }
    );

    return outStream;
  }

  /**
   * Extract a single file from the backup archive and parse it as JSON.
   *
   * @param fileStream - stream of the (decrypted / decompressed) tar archive
   * @param filePath - exact path of the file inside the archive
   * @returns a promise resolving with the parsed JSON content
   * @throws (rejects) with the underlying error when the archive stream, the entry
   * collection or the JSON parsing fails, or with `File "<filePath>" not found`
   * when the archive has been fully read without finding the file
   */
  async #parseJSONFile<T extends Record<string, any> = any>(
    fileStream: NodeJS.ReadableStream,
    filePath: string
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      pipeline(
        [
          fileStream,
          // Custom backup archive parsing
          new tar.Parse({
            /**
             * Filter the parsed entries to only keep the one that matches the given filepath
             */
            filter(path, entry) {
              return path === filePath && entry.type === 'File';
            },

            /**
             * Whenever an entry passes the filter method, process it
             */
            async onentry(entry) {
              try {
                // Collect all the content of the entry file
                const content = await entry.collect();
                // Parse from buffer to string to JSON
                const parsedContent = JSON.parse(content.toString());

                // Resolve the Promise with the parsed content
                resolve(parsedContent);
              } catch (e) {
                // Report collection/parsing failures through the returned promise
                // instead of leaving them as an unhandled rejection of this callback
                reject(e instanceof Error ? e : new Error(String(e)));
              } finally {
                // Cleanup (close the stream associated to the entry)
                entry.destroy();
              }
            },
          }),
        ],
        (error) => {
          // Settling twice is a no-op, so this only has an effect when the promise
          // hasn't been resolved yet. On a stream failure, report the real error;
          // otherwise we've parsed all the archive entries and the file doesn't exist
          reject(error ?? new Error(`File "${filePath}" not found`));
        }
      );
    });
  }
}