2022-10-19 15:43:52 +02:00
|
|
|
import fs from 'fs';
|
|
|
|
import path from 'path';
|
|
|
|
import zip from 'zlib';
|
|
|
|
import { Duplex } from 'stream';
|
2022-10-19 15:49:34 +02:00
|
|
|
import { chain, Writable } from 'stream-chain';
|
2022-10-19 15:43:52 +02:00
|
|
|
import { stringer } from 'stream-json/jsonl/Stringer';
|
2022-10-31 10:13:32 +01:00
|
|
|
import type { Cipher } from 'crypto';
|
2022-10-19 15:43:52 +02:00
|
|
|
|
2022-10-24 17:11:59 +02:00
|
|
|
import type { IDestinationProvider, ProviderType, Stream } from '../../types';
|
2022-10-31 10:13:32 +01:00
|
|
|
import { createCipher } from '../encryption/encrypt';
|
2022-10-19 15:43:52 +02:00
|
|
|
|
|
|
|
/**
 * Settings accepted by the local-file destination provider.
 */
export interface ILocalFileDestinationProviderOptions {
  /**
   * Encryption settings.
   * When `enabled` is true, `key` is handed to `createCipher` to build the
   * cipher stream the data is piped through before being written.
   */
  encryption: {
    enabled: boolean;
    key: string;
  };

  /**
   * Compression settings.
   * When `enabled` is true, the data is piped through a gzip stream.
   */
  compression: {
    enabled: boolean;
  };

  /**
   * Output location settings.
   * - `path`: root directory of the backup (it is wiped and re-created on bootstrap).
   * - `maxSize`: optional upper bound for a single output file, compared against
   *   the cumulated length of the written chunks; when omitted, each stream is
   *   written to a single file.
   */
  file: {
    path: string;
    maxSize?: number;
  };
}
|
|
|
|
|
2022-10-24 17:11:59 +02:00
|
|
|
/**
 * Factory for the local-file destination provider.
 *
 * @param options - Encryption, compression and file settings forwarded to the provider.
 * @returns A new `LocalFileDestinationProvider` instance configured with `options`.
 */
export const createLocalFileDestinationProvider = (
  options: ILocalFileDestinationProviderOptions
) => new LocalFileDestinationProvider(options);
|
|
|
|
|
|
|
|
/**
 * Destination provider that writes the transferred data as JSON Lines files
 * inside a directory of the local file system.
 *
 * `bootstrap()` creates the following sub-directories under `options.file.path`:
 * `entities/`, `links/`, `media/` and `configuration/`.
 */
class LocalFileDestinationProvider implements IDestinationProvider {
  name: string = 'destination::local-file';

  type: ProviderType = 'destination';

  options: ILocalFileDestinationProviderOptions;

  constructor(options: ILocalFileDestinationProviderOptions) {
    this.options = options;
  }

  /**
   * Prepare an empty backup directory.
   *
   * Anything already present at `options.file.path` is removed first, then the
   * root directory and its sub-directories are (re)created.
   */
  bootstrap(): void | Promise<void> {
    const rootDir = this.options.file.path;

    if (fs.existsSync(rootDir)) {
      fs.rmSync(rootDir, { force: true, recursive: true });
    }

    fs.mkdirSync(rootDir, { recursive: true });

    for (const directory of ['entities', 'links', 'media', 'configuration']) {
      fs.mkdirSync(path.join(rootDir, directory));
    }
  }

  /**
   * Undo the work done by the provider by deleting the whole backup directory.
   */
  rollback(): void | Promise<void> {
    fs.rmSync(this.options.file.path, { force: true, recursive: true });
  }

  /**
   * This provider does not expose any metadata.
   */
  getMetadata() {
    return null;
  }

  /**
   * Build the stream receiving entities.
   * Output goes to `<path>/entities/entities_00000.jsonl` (index incremented per file).
   */
  getEntitiesStream(): Duplex {
    return this.createJsonlStream('entities');
  }

  /**
   * Build the stream receiving links.
   * Output goes to `<path>/links/links_00000.jsonl` (index incremented per file).
   *
   * Uses the provider options exactly like `getEntitiesStream()`; it previously
   * relied on hard-coded placeholder settings and never applied encryption.
   */
  getLinksStream(): Duplex | Promise<Duplex> {
    return this.createJsonlStream('links');
  }

  /**
   * Assemble the processing chain shared by every JSON Lines output:
   * stringify -> (gzip) -> (cipher) -> file(s) on disk.
   *
   * @param kind - Used both as the sub-directory name and as the file name prefix.
   */
  private createJsonlStream(kind: 'entities' | 'links'): Duplex {
    const { compression, encryption, file } = this.options;

    const filePathFactory = (fileIndex: number = 0) => {
      return path.join(
        // Backup path
        file.path,
        // "<kind>/" directory
        kind,
        // "<kind>_00000.jsonl" file
        `${kind}_${String(fileIndex).padStart(5, '0')}.jsonl`
      );
    };

    const streams: any[] = [
      // create jsonl strings from objects
      stringer(),
    ];

    // Compression
    if (compression.enabled) {
      streams.push(zip.createGzip());
    }

    // Encryption
    if (encryption?.enabled) {
      streams.push(createCipher(encryption.key));
    }

    // FS write stream
    streams.push(createMultiFilesWriteStream(filePathFactory, file.maxSize));

    return chain(streams);
  }
}
|
|
|
|
|
|
|
|
/**
 * Create a writable stream that can split the streamed data into
 * multiple files based on a provided maximum file size value.
 *
 * @param filePathFactory - Returns the path of the file to write for a given file index.
 * @param maxFileSize - Maximum cumulated chunk length written to a single file.
 *   When `undefined`, a plain fs write stream targeting the first file is returned.
 * @returns The stream to write the data to.
 * @throws Error when `maxFileSize` is provided but is not a positive number.
 */
const createMultiFilesWriteStream = (
  filePathFactory: (index?: number) => string,
  maxFileSize?: number
): Stream => {
  let fileIndex = 0;
  let fileSize = 0;
  let writeStream: fs.WriteStream | undefined;

  const createIndexedWriteStream = () => fs.createWriteStream(filePathFactory(fileIndex));

  // If no maximum file size is provided, then return a basic fs write stream
  if (maxFileSize === undefined) {
    return createIndexedWriteStream();
  }

  // Written as a negated comparison so that NaN is rejected as well
  if (!(maxFileSize > 0)) {
    throw new Error('Max file size must be a positive number');
  }

  const maxSize: number = maxFileSize;

  // NOTE(review): `Writable` is imported from 'stream-chain' in this file; this code
  // assumes it is Node's `stream.Writable` (or a compatible class) - confirm.
  return new Writable({
    write(chunk, encoding, callback) {
      const chunkSize: number = chunk.length;

      if (writeStream === undefined) {
        // Lazily open the first file on the first write
        writeStream = createIndexedWriteStream();
      } else if (fileSize > 0 && fileSize + chunkSize > maxSize) {
        // Adding this chunk would exceed the limit: move on to the next file.
        // The `fileSize > 0` guard avoids leaving an empty file behind when a
        // single chunk is larger than the limit.
        writeStream.end();

        fileIndex += 1;
        fileSize = 0;
        writeStream = createIndexedWriteStream();
      }

      // Update the actual file size
      fileSize += chunkSize;

      // Transfer the data to the up-to-date write stream
      writeStream.write(chunk, encoding, callback);
    },

    final(callback) {
      // Nothing was ever written, so no file was opened
      if (writeStream === undefined) {
        callback();
        return;
      }

      // Flush and close the last file before reporting completion
      writeStream.end(callback);
    },

    destroy(error, callback) {
      // Release the underlying file when the stream is torn down
      writeStream?.destroy();
      callback(error);
    },
  });
};
|