2022-11-09 14:29:18 +01:00
|
|
|
import * as fs from 'fs';
|
2022-10-19 15:43:52 +02:00
|
|
|
import path from 'path';
|
|
|
|
import zip from 'zlib';
|
2022-11-02 17:26:32 +01:00
|
|
|
import { Writable } from 'stream';
|
|
|
|
import { chain } from 'stream-chain';
|
2022-10-19 15:43:52 +02:00
|
|
|
import { stringer } from 'stream-json/jsonl/Stringer';
|
|
|
|
|
2022-11-08 14:26:48 +01:00
|
|
|
import type {
|
|
|
|
IDestinationProvider,
|
|
|
|
IDestinationProviderTransferResults,
|
|
|
|
ProviderType,
|
2022-11-08 16:04:43 +01:00
|
|
|
TransferStage,
|
2022-11-08 14:26:48 +01:00
|
|
|
} from '../../types';
|
2022-10-31 10:13:32 +01:00
|
|
|
import { createCipher } from '../encryption/encrypt';
|
2022-11-08 16:04:43 +01:00
|
|
|
import { providerResultsCounter } from './util';
|
2022-10-19 15:43:52 +02:00
|
|
|
|
|
|
|
/**
 * Configuration accepted by the local file destination provider.
 */
export interface ILocalFileDestinationProviderOptions {
  /** Encryption settings for the written data. */
  encryption: {
    /** When true, a cipher stream is appended to every stage pipeline. */
    enabled: boolean;
    /** Key handed to the cipher; mandatory whenever `enabled` is true. */
    key?: string;
  };

  /** Compression settings for the written data. */
  compression: {
    /** When true, data is piped through a gzip stream before being written. */
    enabled: boolean;
  };

  /** Output location and size limits. */
  file: {
    /** Root directory of the backup. It is removed and recreated on bootstrap. */
    path: string;
    /** Optional size limit. NOTE(review): confirm which outputs this is meant to limit. */
    maxSize?: number;
    /** Optional per-file size limit for the JSONL outputs (compared against written bytes); larger output is split into numbered files. */
    maxSizeJsonl?: number;
  };
}
|
|
|
|
|
2022-11-08 14:26:48 +01:00
|
|
|
/**
 * Results reported by the local file destination provider: the generic
 * destination results, extended with information about the written backup.
 */
export interface ILocalFileDestinationProviderTransferResults
  extends IDestinationProviderTransferResults {
  /** Backup output details, populated by the provider (in `bootstrap` and `close`). */
  file?: {
    /** Root path the backup is written to (mirrors `options.file.path`). */
    path?: string;
  };
}
|
|
|
|
|
2022-10-24 17:11:59 +02:00
|
|
|
/**
 * Factory for the local file destination provider.
 *
 * @param options - Output path, size limits, compression and encryption settings.
 * @returns A new provider instance bound to `options`.
 */
export const createLocalFileDestinationProvider = (options: ILocalFileDestinationProviderOptions) =>
  new LocalFileDestinationProvider(options);
|
|
|
|
|
|
|
|
/**
 * Destination provider that writes every transfer stage to JSONL files located
 * under `options.file.path/{stage}/`, optionally gzip-compressed and encrypted.
 */
class LocalFileDestinationProvider implements IDestinationProvider {
  name: string = 'destination::local-file';

  type: ProviderType = 'destination';

  options: ILocalFileDestinationProviderOptions;

  results: ILocalFileDestinationProviderTransferResults = {};

  constructor(options: ILocalFileDestinationProviderOptions) {
    this.options = options;
  }

  /**
   * Transform streams applied, in order, to a stage's data before it reaches disk:
   * JSONL stringification, the results counter for `stage`, then optional gzip
   * compression and optional encryption (encryption runs after compression).
   *
   * @throws {Error} When encryption is enabled but no key was provided.
   */
  #getDataTransformers(stage: TransferStage) {
    const transforms = [];

    // Convert to stringified JSON lines
    transforms.push(stringer());

    // Update `this.results` for this stage (counter from ./util)
    transforms.push(providerResultsCounter(this.results, stage));

    // Compression
    if (this.options.compression.enabled) {
      transforms.push(zip.createGzip());
    }

    // Encryption
    if (this.options.encryption.enabled) {
      if (!this.options.encryption.key) {
        throw new Error("Can't encrypt without a key");
      }

      const cipher = createCipher(this.options.encryption.key);
      transforms.push(cipher);
    }

    return transforms;
  }

  /**
   * Build the full write pipeline for one stage: the data transforms followed by a
   * (possibly multi-file) writer targeting `{file.path}/{stage}/{stage}_XXXXX.jsonl`.
   *
   * Every stage produces JSONL files, so all of them share the `file.maxSizeJsonl`
   * limit (previously entities/configuration used `file.maxSize` by mistake).
   */
  #createStageStream(stage: TransferStage) {
    const filePathFactory = createFilePathFactory(this.options.file.path, stage);

    // Transform streams
    const transforms: Writable[] = this.#getDataTransformers(stage);

    // FS write stream
    const fileStream = createMultiFilesWriteStream(filePathFactory, this.options.file.maxSizeJsonl);

    // Full pipeline
    return chain(transforms.concat(fileStream));
  }

  /**
   * Prepare an empty backup directory with one sub-directory per stage.
   * Any existing content at `options.file.path` is deleted first.
   */
  bootstrap(): void | Promise<void> {
    const rootDir = this.options.file.path;

    // `force` makes this a no-op when the directory does not exist yet
    fs.rmSync(rootDir, { force: true, recursive: true });

    fs.mkdirSync(rootDir, { recursive: true });

    for (const directory of ['schemas', 'entities', 'links', 'media', 'configuration']) {
      fs.mkdirSync(path.join(rootDir, directory));
    }

    this.results.file = { path: this.options.file.path };
  }

  /** Remove everything written under `options.file.path`. */
  rollback(): void | Promise<void> {
    fs.rmSync(this.options.file.path, { force: true, recursive: true });
  }

  getMetadata() {
    return null;
  }

  getSchemasStream() {
    return this.#createStageStream('schemas');
  }

  getEntitiesStream(): NodeJS.WritableStream {
    return this.#createStageStream('entities');
  }

  getLinksStream(): NodeJS.WritableStream {
    return this.#createStageStream('links');
  }

  getConfigurationStream(): NodeJS.WritableStream {
    return this.#createStageStream('configuration');
  }

  close = async () => {
    // TODO: this will need to be updated with extension
    this.results.file = { path: this.options.file.path };
  };
}
|
|
|
|
|
|
|
|
/**
 * Create a writable stream that can split the streamed data into
 * multiple files based on a provided maximum file size value.
 *
 * Files are named by `filePathFactory(index)`, starting at index 0. A new file is
 * started when appending the next chunk would push the current file past the
 * limit (compared against `chunk.length`). Chunks are never split: a chunk larger
 * than the limit is written whole into a file that is still empty, so no empty
 * files are left behind.
 *
 * The returned stream only emits 'finish' once the last file has been ended and
 * flushed. Errors raised by the underlying file streams are forwarded to it.
 *
 * @param filePathFactory - Maps a file index to the path of that file.
 * @param maxFileSize - Optional per-file size limit. When omitted, a single plain
 *   `fs.WriteStream` for index 0 is returned.
 * @throws {Error} When `maxFileSize` is zero or negative.
 */
const createMultiFilesWriteStream = (
  filePathFactory: (index?: number) => string,
  maxFileSize?: number
): Writable => {
  let fileIndex = 0;
  let fileSize = 0;
  let writeStream: fs.WriteStream | undefined;

  const createIndexedWriteStream = () => fs.createWriteStream(filePathFactory(fileIndex));

  // If no maximum file size is provided, then return a basic fs write stream
  if (maxFileSize === undefined) {
    return createIndexedWriteStream();
  }

  if (maxFileSize <= 0) {
    throw new Error('Max file size must be a positive number');
  }

  // Captured as a const so the narrowed `number` type is visible inside the callbacks
  const maxSize: number = maxFileSize;

  const output: Writable = new Writable({
    write(chunk, encoding, callback) {
      // Rotate when this chunk would exceed the limit, unless the current file is
      // still empty (otherwise an oversized chunk would leave an empty file behind)
      if (writeStream && fileSize > 0 && fileSize + chunk.length > maxSize) {
        // Gracefully close the current file (flush pending data, release the fd)
        writeStream.end();
        writeStream = undefined;
        fileIndex++;
        fileSize = 0;
      }

      // Lazily open the file for the current index
      if (!writeStream) {
        writeStream = createIndexedWriteStream();
        // Without a listener, a file error would be an unhandled 'error' event
        writeStream.on('error', (error) => output.destroy(error));
      }

      // Update the actual file size
      fileSize += chunk.length;

      // Transfer the data to the up-to-date write stream
      writeStream.write(chunk, encoding, callback);
    },

    final(callback) {
      // Close the last file; 'finish' is only emitted once it has been flushed
      if (!writeStream) {
        callback();
        return;
      }

      writeStream.end(callback);
    },
  });

  return output;
};
|
2022-11-02 17:26:32 +01:00
|
|
|
|
|
|
|
/**
 * Build a function mapping a file index to the path of a JSONL chunk file.
 *
 * Generated paths look like `{src}/{directory}/{prefix}_{index}.jsonl`, where the
 * index is zero-padded to 5 digits, `prefix` defaults to `directory`, and the
 * index defaults to 0.
 */
const createFilePathFactory = (src: string, directory: string, prefix: string = directory) => {
  return (fileIndex: number = 0): string => {
    const paddedIndex = `${fileIndex}`.padStart(5, '0');
    const fileName = `${prefix}_${paddedIndex}.jsonl`;

    // Backup root, then the stage directory, then the numbered file
    return path.join(src, directory, fileName);
  };
};
|
2022-11-08 14:26:48 +01:00
|
|
|
|
|
|
|
/** Public type of the provider instances returned by `createLocalFileDestinationProvider`. */
export type ILocalFileDestinationProvider = LocalFileDestinationProvider;
|