strapi/packages/core/data-transfer/lib/providers/local-file-source-provider.ts

import type { Readable } from 'stream';

import fs from 'fs-extra';
import zip from 'zlib';
import tar from 'tar';
import { keyBy } from 'lodash/fp';
import { chain } from 'stream-chain';
import { pipeline, PassThrough } from 'stream';
import { parser } from 'stream-json/jsonl/Parser';

import type { IMetadata, ISourceProvider, ProviderType } from '../../types';

import { collect } from '../utils';
import { createDecryptionCipher } from '../encryption';

type StreamItemArray = Parameters<typeof chain>[0];

/**
 * Constant for the metadata file path
 */
const METADATA_FILE_PATH = 'metadata.json';
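
// Implied archive layout, reconstructed from the tar filters used further down
// (the file names inside each directory are illustrative, not taken from this
// module — only the directory names and metadata.json appear in the code):
//
//   metadata.json
//   entities/entities_00001.jsonl
//   schemas/schemas_00001.jsonl
//   links/links_00001.jsonl
//   configuration/configuration_00001.jsonl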

/**
 * Provider options
 */
export interface ILocalFileSourceProviderOptions {
  file: {
    path: string;
  };

  encryption: {
    enabled: boolean;
    key?: string;
  };

  compression: {
    enabled: boolean;
  };
}

export const createLocalFileSourceProvider = (options: ILocalFileSourceProviderOptions) => {
  return new LocalFileSourceProvider(options);
};
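
// Usage sketch (illustrative; the archive path and key below are assumptions,
// not values from this module):
//
//   const provider = createLocalFileSourceProvider({
//     file: { path: '/backups/export_20221213.tar.gz.enc' },
//     encryption: { enabled: true, key: 'my-secret-key' },
//     compression: { enabled: true },
//   });
//
//   await provider.bootstrap(); // throws early if the file isn't readable
//   const metadata = await provider.getMetadata();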

class LocalFileSourceProvider implements ISourceProvider {
  type: ProviderType = 'source';

  name = 'source::local-file';

  options: ILocalFileSourceProviderOptions;

  constructor(options: ILocalFileSourceProviderOptions) {
    this.options = options;

    const { encryption } = this.options;

    if (encryption.enabled && encryption.key === undefined) {
      throw new Error('Missing encryption key');
    }
  }

  /**
   * Pre-flight checks on the provided options (e.g. making sure the provided path is readable)
   */
  async bootstrap() {
    const { path } = this.options.file;

    try {
      // This only surfaces a nicer error; it doesn't guarantee the file will still exist when we open it later
      await fs.access(path, fs.constants.R_OK);
    } catch (e) {
      throw new Error(`Can't access file "${path}".`);
    }
  }

  getMetadata() {
    // TODO: we need to read the archive & extract the metadata JSON file
    // => we might also need to read the schema.jsonl files & implement a custom stream check
    const backupStream = this.#getBackupStream();

    return this.#parseJSONFile<IMetadata>(backupStream, METADATA_FILE_PATH);
  }

  async getSchemas() {
    const schemas = await collect(this.streamSchemas() as Readable);

    return keyBy('uid', schemas);
  }
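
  // Illustration of the keyBy('uid', schemas) result shape (the uids below are
  // hypothetical):
  //
  //   [{ uid: 'api::article.article', ... }, { uid: 'admin::user', ... }]
  //
  // becomes
  //
  //   {
  //     'api::article.article': { uid: 'api::article.article', ... },
  //     'admin::user': { uid: 'admin::user', ... },
  //   }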

  streamEntities(): NodeJS.ReadableStream {
    return this.#streamJsonlDirectory('entities');
  }

  streamSchemas(): NodeJS.ReadableStream | Promise<NodeJS.ReadableStream> {
    return this.#streamJsonlDirectory('schemas');
  }

  streamLinks(): NodeJS.ReadableStream {
    return this.#streamJsonlDirectory('links');
  }

  streamConfiguration(): NodeJS.ReadableStream {
    // NOTE: TBD
    return this.#streamJsonlDirectory('configuration');
  }

  #getBackupStream() {
    const { file, encryption, compression } = this.options;

    const streams: StreamItemArray = [];

    try {
      streams.push(fs.createReadStream(file.path));
    } catch (e) {
      throw new Error(`Could not read the backup file at the provided path "${this.options.file.path}"`);
    }

    if (encryption.enabled && encryption.key) {
      streams.push(createDecryptionCipher(encryption.key));
    }

    if (compression.enabled) {
      streams.push(zip.createGunzip());
    }

    return chain(streams);
  }
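
  // Note on ordering: decryption is applied before decompression, which implies
  // the backup was gzipped first and encrypted second when it was created. With
  // both options enabled, the chain built above is equivalent to this
  // hand-rolled sketch:
  //
  //   chain([
  //     fs.createReadStream(file.path),
  //     createDecryptionCipher(encryption.key),
  //     zip.createGunzip(),
  //   ]);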

  #streamJsonlDirectory(directory: string) {
    const inStream = this.#getBackupStream();

    const outStream = new PassThrough({ objectMode: true });

    pipeline(
      [
        inStream,
        new tar.Parse({
          filter(path, entry) {
            if (entry.type !== 'File') {
              return false;
            }

            const parts = path.split('/');

            if (parts.length !== 2) {
              return false;
            }

            return parts[0] === directory;
          },

          onentry(entry) {
            const transforms = [
              // JSONL parser to read the data chunks one by one (line by line)
              parser(),
              // The JSONL parser returns each line as a key/value pair
              (line: { key: string; value: any }) => line.value,
            ];

            entry
              // Pipe the transforms
              .pipe(chain(transforms))
              // Pipe into the shared out stream, but
              // DO NOT send the 'end' event when this entry has finished
              // emitting data, so that it doesn't close the out stream
              .pipe(outStream, { end: false });
          },
        }),
      ],
      () => {
        // Manually send the 'end' event to the out stream
        // once every entry has finished streaming its content
        outStream.end();
      }
    );

    return outStream;
  }
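
  // Consumption sketch (illustrative): the returned PassThrough is in object
  // mode and emits one already-parsed value per JSONL line, across every file
  // in the given directory, so callers can iterate it asynchronously:
  //
  //   for await (const entity of provider.streamEntities()) {
  //     // `entity` is a plain object parsed from one line of an entities/*.jsonl file
  //   }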

  async #parseJSONFile<T extends Record<string, any> = any>(
    fileStream: NodeJS.ReadableStream,
    filePath: string
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      pipeline(
        [
          fileStream,
          // Custom backup archive parsing
          new tar.Parse({
            /**
             * Filter the parsed entries to keep only the one that matches the given file path
             */
            filter(path, entry) {
              return path === filePath && entry.type === 'File';
            },

            /**
             * Whenever an entry passes the filter method, process it
             */
            async onentry(entry) {
              // Collect the whole content of the entry file
              const content = await entry.collect();

              // Parse from buffer to string to JSON
              const parsedContent = JSON.parse(content.toString());

              // Resolve the promise with the parsed content
              resolve(parsedContent);

              // Cleanup (close the stream associated with the entry)
              entry.destroy();
            },
          }),
        ],
        () => {
          // If the promise hasn't been resolved by the time we've parsed every
          // archive entry, then the file doesn't exist
          reject(new Error(`File "${filePath}" not found`));
        }
      );
    });
  }
}