diff --git a/packages/core/data-transfer/lib/index.ts b/packages/core/data-transfer/lib/index.ts index 7a6a3286c8..95411a583a 100644 --- a/packages/core/data-transfer/lib/index.ts +++ b/packages/core/data-transfer/lib/index.ts @@ -1 +1,2 @@ export * from './engine'; +export * from './providers'; diff --git a/packages/core/data-transfer/lib/providers/index.ts b/packages/core/data-transfer/lib/providers/index.ts new file mode 100644 index 0000000000..7d22db5701 --- /dev/null +++ b/packages/core/data-transfer/lib/providers/index.ts @@ -0,0 +1 @@ +export * from './local-file-source-provider'; diff --git a/packages/core/data-transfer/lib/providers/local-file-source-provider.ts b/packages/core/data-transfer/lib/providers/local-file-source-provider.ts new file mode 100644 index 0000000000..eb45107585 --- /dev/null +++ b/packages/core/data-transfer/lib/providers/local-file-source-provider.ts @@ -0,0 +1,203 @@ +import fs from 'fs'; +import zip from 'zlib'; +import tar from 'tar'; +import { chain } from 'stream-chain'; +import { pipeline, Duplex } from 'stream'; +import { parser } from 'stream-json/jsonl/Parser'; + +import { IMetadata, ISourceProvider, ProviderType } from '../../types'; + +type StreamItemArray = Parameters[0]; + +/** + * Constant for the metadata file path + */ +const METADATA_FILE_PATH = 'metadata.json'; + +/** + * Provider options + */ +export interface ILocalFileSourceProviderOptions { + /** + * Path to the backup archive + */ + backupFilePath: string; + + /** + * Whether the backup data is encrypted or not + */ + encrypted?: boolean; + + /** + * Encryption key used to decrypt the encrypted data (if necessary) + */ + encryptionKey?: string; + + /** + * Whether the backup data is compressed or not + */ + compressed?: boolean; +} + +class LocalFileSourceProvider implements ISourceProvider { + type: ProviderType = 'source'; + name: string = 'source::local-file'; + + options: ILocalFileSourceProviderOptions; + + constructor(options: ILocalFileSourceProviderOptions) { + this.options = options; + + if (this.options.encrypted && this.options.encryptionKey === undefined) { + throw new Error('Missing encryption key'); + } + } + + /** + * Pre flight checks regarding the provided options (making sure that the provided path is correct, etc...) + */ + bootstrap() { + const path = this.options.backupFilePath; + const isValidBackupPath = fs.existsSync(path); + + // Check if the provided path exists + if (!isValidBackupPath) { + throw new Error( + `Invalid backup file path provided. "${path}" does not exist on the filesystem.` + ); + } + } + + getMetadata() { + // TODO: need to read the file & extract the metadata json file + // => we might also need to read the schema.jsonl files & implements a custom stream-check + const backupStream = this.#getBackupStream(); + + return this.#parseJSONFile(backupStream, METADATA_FILE_PATH); + } + + streamEntities(): NodeJS.ReadableStream { + return this.#streamJsonlDirectory('entities'); + } + + streamLinks(): NodeJS.ReadableStream { + return this.#streamJsonlDirectory('links'); + } + + streamConfiguration(): NodeJS.ReadableStream { + // NOTE: TBD + return this.#streamJsonlDirectory('configuration'); + } + + #getBackupStream(decompress: boolean = true) { + const path = this.options.backupFilePath; + const readStream = fs.createReadStream(path); + const streams: StreamItemArray = [readStream]; + + // Handle decompression + if (decompress) { + streams.push(zip.createGunzip()); + } + + return chain(streams); + } + + #streamJsonlDirectory(directory: string) { + const inStream = this.#getBackupStream(); + + const outStream = new Duplex(); + + pipeline( + [ + inStream, + new tar.Parse({ + filter(path, entry) { + if (entry.type !== 'File') { + return false; + } + + const parts = path.split('/'); + + if (parts.length !== 2) { + return false; + } + + return parts[0] === directory; + }, + + onentry(entry) { + const transforms = chain([ + // TODO: Add the decryption transform stream before parsing each line + // JSONL parser to read the data chunks one by one (line by line) + parser(), + // The JSONL parser returns each line as key/value + (line: { key: string; value: any }) => line.value, + ]); + + entry + // Pipe transforms + .pipe(transforms) + // Pipe the out stream to the whole pipeline + // DO NOT send the 'end' event when this entry has finished + // emitting data, so that it doesn't close the out stream + .pipe(outStream, { end: false }); + }, + }), + ], + () => { + // Manually send the 'end' event to the out stream + // once every entry has finished streaming its content + outStream.end(); + } + ); + + return outStream; + } + + async #parseJSONFile( + fileStream: NodeJS.ReadableStream, + filePath: string + ): Promise { + return new Promise((resolve, reject) => { + pipeline( + [ + fileStream, + // Custom backup archive parsing + new tar.Parse({ + /** + * Filter the parsed entries to only keep the one that matches the given filepath + */ + filter(path, entry) { + return path === filePath && entry.type === 'File'; + }, + + /** + * Whenever an entry passes the filter method, process it + */ + async onentry(entry) { + // Collect all the content of the entry file + const content = await entry.collect(); + // Parse from buffer to string to JSON + const parsedContent = JSON.parse(content.toString()); + + // Resolve the Promise with the parsed content + resolve(parsedContent); + + // Cleanup (close the stream associated to the entry) + entry.destroy(); + }, + }), + ], + () => { + // If the promise hasn't been resolved and we've parsed all + // the archive entries, then the file doesn't exist + reject(`${filePath} not found in the archive stream`); + } + ); + }); + } +} + +export const createLocalFileSourceProvider = (options: ILocalFileSourceProviderOptions) => { + return new LocalFileSourceProvider(options); +}; diff --git a/packages/core/data-transfer/package.json b/packages/core/data-transfer/package.json index 0ec16bec50..350f1a4f74 100644 --- a/packages/core/data-transfer/package.json +++ b/packages/core/data-transfer/package.json @@ -44,6 +44,8 @@ "devDependencies": { "@tsconfig/node16": "1.0.3", "@types/stream-chain": "2.0.1", + "@types/stream-json": "1.7.2", + "@types/tar": "6.1.3", "typescript": "4.8.4" }, "engines": { diff --git a/yarn.lock b/yarn.lock index e24fbaea14..dcab060a5a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6277,18 +6277,34 @@ resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-2.0.1.tgz#20f18294f797f2209b5f65c8e3b5c8e8261d127c" integrity sha512-Hl219/BT5fLAaz6NDkSuhzasy49dwQS/DSdu4MdggFB8zcXv7vflBI3xp7FEmkmdDkBUI2bPUNeMttp2knYdxw== -"@types/stream-chain@2.0.1": +"@types/stream-chain@*", "@types/stream-chain@2.0.1": version "2.0.1" resolved "https://registry.yarnpkg.com/@types/stream-chain/-/stream-chain-2.0.1.tgz#4d3cc47a32609878bc188de0bae420bcfd3bf1f5" integrity sha512-D+Id9XpcBpampptkegH7WMsEk6fUdf9LlCIX7UhLydILsqDin4L0QT7ryJR0oycwC7OqohIzdfcMHVZ34ezNGg== dependencies: "@types/node" "*" +"@types/stream-json@1.7.2": + version "1.7.2" + resolved "https://registry.yarnpkg.com/@types/stream-json/-/stream-json-1.7.2.tgz#8d7f1cc3a37a9a402a4af284348499cdf5e28248" + integrity sha512-i4LE2aWVb1R3p/Z6S6Sw9kmmOs4Drhg0SybZUyfM499I1c8p7MUKZHs4Sg9jL5eu4mDmcgfQ6eGIG3+rmfUWYw== + dependencies: + "@types/node" "*" + "@types/stream-chain" "*" + "@types/tapable@^1", "@types/tapable@^1.0.5": version "1.0.8" resolved "https://registry.yarnpkg.com/@types/tapable/-/tapable-1.0.8.tgz#b94a4391c85666c7b73299fd3ad79d4faa435310" integrity sha512-ipixuVrh2OdNmauvtT51o3d8z12p6LtFW9in7U79der/kwejjdNchQC5UMn5u/KxNoM7VHHOs/l8KS8uHxhODQ== +"@types/tar@6.1.3": + version "6.1.3" + resolved "https://registry.yarnpkg.com/@types/tar/-/tar-6.1.3.tgz#46a2ce7617950c4852dfd7e9cd41aa8161b9d750" + integrity sha512-YzDOr5kdAeqS8dcO6NTTHTMJ44MUCBDoLEIyPtwEn7PssKqUYL49R1iCVJPeiPzPlKi6DbH33eZkpeJ27e4vHg== + dependencies: + "@types/node" "*" + minipass "^3.3.5" + "@types/testing-library__jest-dom@^5.9.1": version "5.14.5" resolved "https://registry.yarnpkg.com/@types/testing-library__jest-dom/-/testing-library__jest-dom-5.14.5.tgz#d113709c90b3c75fdb127ec338dad7d5f86c974f" @@ -16323,6 +16339,13 @@ minipass@^3.0.0, minipass@^3.1.0, minipass@^3.1.1, minipass@^3.1.3, minipass@^3. dependencies: yallist "^4.0.0" +minipass@^3.3.5: + version "3.3.5" + resolved "https://registry.yarnpkg.com/minipass/-/minipass-3.3.5.tgz#6da7e53a48db8a856eeb9153d85b230a2119e819" + integrity sha512-rQ/p+KfKBkeNwo04U15i+hOwoVBVmekmm/HcfTkTN2t9pbQKCMm4eN5gFeqgrrSp/kH/7BYYhTIHOxGqzbBPaA== + dependencies: + yallist "^4.0.0" + minizlib@^2.0.0, minizlib@^2.1.1, minizlib@^2.1.2: version "2.1.2" resolved "https://registry.yarnpkg.com/minizlib/-/minizlib-2.1.2.tgz#e90d3466ba209b932451508a11ce3d3632145931"