From fa8d4a58019666bdb538017ef0f680eace7599bc Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Wed, 23 Nov 2022 13:41:51 +0100 Subject: [PATCH 1/6] Add tests for local file destination provider --- .../__tests__/index.test.ts | 174 +++++++++++++++ .../__tests__/utils.test.ts | 76 +++++++ .../local-file-destination-provider/index.ts | 211 ++++++++++++++++++ .../local-file-destination-provider/utils.ts | 76 +++++++ 4 files changed, 537 insertions(+) create mode 100644 packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts create mode 100644 packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts create mode 100644 packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts create mode 100644 packages/core/data-transfer/lib/providers/local-file-destination-provider/utils.ts diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts new file mode 100644 index 0000000000..f97de6d007 --- /dev/null +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts @@ -0,0 +1,174 @@ +import { createLocalFileDestinationProvider, ILocalFileDestinationProviderOptions } from '../'; +import * as encryption from '../../../encryption/encrypt'; +import { + createFilePathFactory, + createTarEntryStream, +} from '../../local-file-destination-provider/utils'; + +const filePath = './test-file'; + +jest.mock('../../../encryption/encrypt', () => { + return { + __esModule: true, + createEncryptionCipher: (key: string) => {}, + }; +}); + +jest.mock('../../local-file-destination-provider/utils'); + +describe('Local File Destination Provider', () => { + (createFilePathFactory as jest.Mock).mockImplementation(jest.fn()); + + afterEach(() => { + jest.resetAllMocks(); + }); + + describe('Bootstrap', () => { + it('Throws an error if encryption is enabled and the key is not provided', () => { + const providerOptions = { + encryption: { enabled: true }, + compression: { enabled: false }, + file: { path: './test-file' }, + }; + const provider = createLocalFileDestinationProvider(providerOptions); + + expect(() => provider.bootstrap()).toThrowError("Can't encrypt without a key"); + }); + + it('Adds .gz extension to the archive path when compression is enabled', async () => { + const providerOptions = { + encryption: { enabled: false }, + compression: { enabled: true }, + file: { path: filePath }, + }; + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + + expect(provider.results.file.path).toEqual(`${filePath}.tar.gz`); + }); + + it('Adds .gpg extension to the archive path when encryption is enabled', async () => { + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: true, key: 'key' }, + compression: { enabled: false }, + file: { path: filePath }, + }; + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + + expect(provider.results.file.path).toEqual(`${filePath}.tar.gpg`); + }); + + it('Adds .gz.gpg extension to the archive path when encryption and compression are enabled', async () => { + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: true, key: 'key' }, + compression: { enabled: true }, + file: { path: filePath }, + }; + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + + expect(provider.results.file.path).toEqual(`${filePath}.tar.gz.gpg`); + }); + + it('Adds the compression step to the stream chain when compression is enabled', async () => { + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: true, key: 'key' }, + compression: { enabled: true }, + file: { path: filePath }, + }; + const provider = createLocalFileDestinationProvider(providerOptions); + jest.spyOn(provider, 'createGzip'); + + await provider.bootstrap(); + + expect(provider.createGzip).toHaveBeenCalled(); + }); + + it('Adds the encryption step to the stream chain when encryption is enabled', async () => { + jest.spyOn(encryption, 'createEncryptionCipher'); + const key = 'key'; + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: true, key }, + compression: { enabled: true }, + file: { path: filePath }, + }; + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + + expect(encryption.createEncryptionCipher).toHaveBeenCalledWith(key); + }); + }); + + describe('Streaming entities', () => { + it('Creates a tar entry stream', () => { + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + + const provider = createLocalFileDestinationProvider(providerOptions); + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + provider.getEntitiesStream(); + + expect(createTarEntryStream).toHaveBeenCalled(); + expect(createFilePathFactory).toHaveBeenCalledWith('entities'); + }); + }); + + describe('Streaming schemas', () => { + it('Creates a tar entry stream for schemas', () => { + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + + const provider = createLocalFileDestinationProvider(providerOptions); + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + provider.getSchemasStream(); + + expect(createTarEntryStream).toHaveBeenCalled(); + expect(createFilePathFactory).toHaveBeenCalledWith('schemas'); + }); + }); + + describe('Streaming links', () => { + it('Creates a tar entry stream for links', () => { + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + + const provider = createLocalFileDestinationProvider(providerOptions); + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + provider.getLinksStream(); + + expect(createTarEntryStream).toHaveBeenCalled(); + expect(createFilePathFactory).toHaveBeenCalledWith('links'); + }); + }); + + describe('Streaming configuration', () => { + it('Creates a tar entry stream for configuration', () => { + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + + const provider = createLocalFileDestinationProvider(providerOptions); + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + provider.getConfigurationStream(); + + expect(createTarEntryStream).toHaveBeenCalled(); + expect(createFilePathFactory).toHaveBeenCalledWith('configuration'); + }); + }); +}); diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts new file mode 100644 index 0000000000..7c1c60a07e --- /dev/null +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts @@ -0,0 +1,76 @@ +import tar from 'tar-stream'; +import { createFilePathFactory, createTarEntryStream } from '../utils'; + +describe('Local File Destination Provider - Utils', () => { + describe('Create File Path Factory', () => { + it('returns a function', () => { + const filePathFactory = createFilePathFactory('entities'); + expect(typeof filePathFactory).toBe('function'); + }); + it('returns a file path when calling a function', () => { + const type = 'entities'; + const fileIndex = 0; + const filePathFactory = createFilePathFactory(type); + + const path = filePathFactory(fileIndex); + + expect(path).toBe(`${type}/${type}_0000${fileIndex}.jsonl`); + }); + + describe('returns file paths when calling the factory', () => { + const cases = [ + ['schemas', 0, 'schemas/schemas_00000.jsonl'], + ['entities', 5, 'entities/entities_00005.jsonl'], + ['links', 11, 'links/links_00011.jsonl'], + ['schemas', 543, 'schemas/schemas_00543.jsonl'], + ['entities', 5213, 'entities/entities_05213.jsonl'], + ['links', 33231, 'links/links_33231.jsonl'], + ]; + + test.each(cases)( + 'Given type: %s and fileIndex: %d, returns the right file path: %s', + (type: string, fileIndex: number, filePath) => { + const filePathFactory = createFilePathFactory(type); + + const path = filePathFactory(fileIndex); + + expect(path).toBe(filePath); + } + ); + }); + }); + describe('Create Tar Entry Stream', () => { + it('Throws an error when the paylod is too large', async () => { + const maxSize = 3; + const chunk = 'test'; + const archive = tar.pack(); + const pathFactory = createFilePathFactory('entries'); + const tarEntryStream = createTarEntryStream(archive, pathFactory, maxSize); + + const write = async () => + await new Promise((resolve, reject) => { + tarEntryStream.on('finish', resolve); + tarEntryStream.on('error', reject); + tarEntryStream.write(chunk); + }); + + await expect(write).rejects.toThrow(`payload too large: ${chunk.length}>${maxSize}`); + }); + it('Resolves when the paylod is smaller than the max size', async () => { + const maxSize = 30; + const chunk = 'test'; + const archive = tar.pack(); + const pathFactory = createFilePathFactory('entries'); + const tarEntryStream = createTarEntryStream(archive, pathFactory, maxSize); + + const write = async () => + await new Promise((resolve, reject) => { + tarEntryStream.on('finish', resolve); + tarEntryStream.on('error', reject); + tarEntryStream.write(chunk); + }); + + expect(write).resolves; + }); + }); +}); diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts new file mode 100644 index 0000000000..f7df8fc28a --- /dev/null +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts @@ -0,0 +1,211 @@ +import type { + IDestinationProvider, + IDestinationProviderTransferResults, + IMetadata, + ProviderType, +} from '../../../types'; + +import { Readable } from 'stream'; +import fs from 'fs-extra'; +import tar from 'tar-stream'; +import zlib from 'zlib'; +import { stringer } from 'stream-json/jsonl/Stringer'; +import { chain } from 'stream-chain'; + +import { createEncryptionCipher } from '../../encryption/encrypt'; +import { createFilePathFactory, createTarEntryStream } from './utils'; + +export interface ILocalFileDestinationProviderOptions { + // Encryption + encryption: { + enabled: boolean; + key?: string; + }; + + // Compression + compression: { + enabled: boolean; + }; + + // File + file: { + path: string; + maxSize?: number; + maxSizeJsonl?: number; + }; +} + +export interface ILocalFileDestinationProviderTransferResults + extends IDestinationProviderTransferResults { + file?: { + path?: string; + }; +} + +export const createLocalFileDestinationProvider = ( + options: ILocalFileDestinationProviderOptions +) => { + return new LocalFileDestinationProvider(options); +}; + +class LocalFileDestinationProvider implements IDestinationProvider { + name: string = 'destination::local-file'; + type: ProviderType = 'destination'; + options: ILocalFileDestinationProviderOptions; + results: ILocalFileDestinationProviderTransferResults = {}; + + #providersMetadata: { source?: IMetadata; destination?: IMetadata } = {}; + #archive?: tar.Pack; + + constructor(options: ILocalFileDestinationProviderOptions) { + this.options = options; + } + + setMetadata(target: ProviderType, metadata: IMetadata): IDestinationProvider { + this.#providersMetadata[target] = metadata; + + return this; + } + + #getDataTransformers(options: { jsonl?: boolean } = {}) { + const { jsonl = true } = options; + const transforms = []; + + if (jsonl) { + // Convert to stringified JSON lines + transforms.push(stringer()); + } + + return transforms; + } + + createGzip(): zlib.Gzip { + return zlib.createGzip(); + } + + bootstrap(): void | Promise { + const { compression, encryption } = this.options; + + if (encryption.enabled && !encryption.key) { + throw new Error("Can't encrypt without a key"); + } + + this.#archive = tar.pack(); + + const outStream = fs.createWriteStream(this.#archivePath); + + const archiveTransforms = []; + + if (compression.enabled) { + archiveTransforms.push(this.createGzip()); + } + + if (encryption.enabled && encryption.key) { + archiveTransforms.push(createEncryptionCipher(encryption.key)); + } + + chain([this.#archive, ...archiveTransforms, outStream]); + this.results.file = { path: this.#archivePath }; + } + + async close() { + await this.#writeMetadata(); + this.#archive?.finalize(); + } + + async rollback(): Promise { + await this.close(); + fs.rmSync(this.#archivePath, { force: true }); + } + + getMetadata() { + return null; + } + + get #archivePath() { + const { encryption, compression, file } = this.options; + + let path = `${file.path}.tar`; + + if (compression.enabled) { + path += '.gz'; + } + + if (encryption.enabled) { + path += '.gpg'; + } + + return path; + } + + async #writeMetadata(): Promise { + const metadata = this.#providersMetadata.source; + + if (metadata) { + await new Promise((resolve) => { + const outStream = this.#getMetadataStream(); + const data = JSON.stringify(metadata, null, 2); + + Readable.from(data).pipe(outStream).on('close', resolve); + }); + } + } + + #getMetadataStream() { + return createTarEntryStream(this.#archive!, () => 'metadata.json'); + } + + getSchemasStream() { + // const filePathFactory = createFilePathFactory(this.options.file.path, 'schemas'); + const filePathFactory = createFilePathFactory('schemas'); + + // FS write stream + const entryStream = createTarEntryStream( + this.#archive!, + filePathFactory, + this.options.file.maxSize + ); + + return chain([stringer(), entryStream]); + } + + getEntitiesStream(): NodeJS.WritableStream { + // const filePathFactory = createFilePathFactory(this.options.file.path, 'entities'); + const filePathFactory = createFilePathFactory('entities'); + + // FS write stream + const entryStream = createTarEntryStream( + this.#archive!, + filePathFactory, + this.options.file.maxSize + ); + + return chain([stringer(), entryStream]); + } + + getLinksStream(): NodeJS.WritableStream { + const filePathFactory = createFilePathFactory('links'); + + // FS write stream + const entryStream = createTarEntryStream( + this.#archive!, + filePathFactory, + this.options.file.maxSize + ); + + return chain([stringer(), entryStream]); + } + + getConfigurationStream(): NodeJS.WritableStream { + const filePathFactory = createFilePathFactory('configuration'); + + // FS write stream + const entryStream = createTarEntryStream( + this.#archive!, + filePathFactory, + this.options.file.maxSize + ); + + return chain([stringer(), entryStream]); + } +} diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/utils.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/utils.ts new file mode 100644 index 0000000000..2920087f32 --- /dev/null +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/utils.ts @@ -0,0 +1,76 @@ +import { Writable } from 'stream'; +import path from 'path'; +import tar from 'tar-stream'; + +/** + * Create a file path factory for a given path & prefix. + * Upon being called, the factory will return a file path for a given index + */ +export const createFilePathFactory = + (type: string) => + (fileIndex: number = 0): string => { + return path.join( + // "{type}" directory + type, + // "${type}_XXXXX.jsonl" file + `${type}_${String(fileIndex).padStart(5, '0')}.jsonl` + ); + }; + +export const createTarEntryStream = ( + archive: tar.Pack, + pathFactory: (index?: number) => string, + maxSize: number = 2.56e8 +) => { + let fileIndex = 0; + let buffer = ''; + + const flush = async () => { + if (!buffer) { + return; + } + + const name = pathFactory(fileIndex++); + const size = buffer.length; + + await new Promise((resolve, reject) => { + archive.entry({ name, size }, buffer, (err) => { + if (err) { + reject(err); + } + + resolve(); + }); + }); + + buffer = ''; + }; + + const push = (chunk: string | Buffer) => { + buffer += chunk; + }; + + return new Writable({ + async destroy(err, callback) { + await flush(); + callback(err); + }, + + async write(chunk, _encoding, callback) { + const size = chunk.length; + + if (chunk.length > maxSize) { + callback(new Error(`payload too large: ${chunk.length}>${maxSize}`)); + return; + } + + if (buffer.length + size > maxSize) { + await flush(); + } + + push(chunk); + + callback(null); + }, + }); +}; From 25eca9a709f1b83477160cf5803bac57b36b83fe Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Wed, 23 Nov 2022 13:48:48 +0100 Subject: [PATCH 2/6] Change jest config to use swc --- packages/core/data-transfer/jest.config.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/core/data-transfer/jest.config.js b/packages/core/data-transfer/jest.config.js index 70770ce9f4..fbb0e238e8 100644 --- a/packages/core/data-transfer/jest.config.js +++ b/packages/core/data-transfer/jest.config.js @@ -5,8 +5,10 @@ const pkg = require('./package.json'); module.exports = { ...baseConfig, - preset: 'ts-jest', displayName: (pkg.strapi && pkg.strapi.name) || pkg.name, roots: [__dirname], testMatch: ['**/__tests__/**/*.test.ts'], + transform: { + '^.+\\.(t|j)sx?$': ['@swc/jest'], + }, }; From 693e403416a5bda837424968bec8f20c0c75a76c Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Wed, 23 Nov 2022 14:18:41 +0100 Subject: [PATCH 3/6] Adapt tests to the last implementation changes --- .../__tests__/index.test.ts | 26 +++-- .../local-file-destination-provider/index.ts | 101 ++++++++++++------ 2 files changed, 83 insertions(+), 44 deletions(-) diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts index f97de6d007..ec0bd708d3 100644 --- a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts @@ -45,10 +45,10 @@ describe('Local File Destination Provider', () => { await provider.bootstrap(); - expect(provider.results.file.path).toEqual(`${filePath}.tar.gz`); + expect(provider.results.file!.path).toEqual(`${filePath}.tar.gz`); }); - it('Adds .gpg extension to the archive path when encryption is enabled', async () => { + it('Adds .enc extension to the archive path when encryption is enabled', async () => { const providerOptions: ILocalFileDestinationProviderOptions = { encryption: { enabled: true, key: 'key' }, compression: { enabled: false }, @@ -58,10 +58,10 @@ describe('Local File Destination Provider', () => { await provider.bootstrap(); - expect(provider.results.file.path).toEqual(`${filePath}.tar.gpg`); + expect(provider.results.file!.path).toEqual(`${filePath}.tar.enc`); }); - it('Adds .gz.gpg extension to the archive path when encryption and compression are enabled', async () => { + it('Adds .gz.enc extension to the archive path when encryption and compression are enabled', async () => { const providerOptions: ILocalFileDestinationProviderOptions = { encryption: { enabled: true, key: 'key' }, compression: { enabled: true }, @@ -71,7 +71,7 @@ describe('Local File Destination Provider', () => { await provider.bootstrap(); - expect(provider.results.file.path).toEqual(`${filePath}.tar.gz.gpg`); + expect(provider.results.file!.path).toEqual(`${filePath}.tar.gz.enc`); }); it('Adds the compression step to the stream chain when compression is enabled', async () => { @@ -105,7 +105,7 @@ describe('Local File Destination Provider', () => { }); describe('Streaming entities', () => { - it('Creates a tar entry stream', () => { + it('Creates a tar entry stream', async () => { const providerOptions: ILocalFileDestinationProviderOptions = { encryption: { enabled: false }, compression: { enabled: false }, @@ -114,6 +114,8 @@ describe('Local File Destination Provider', () => { const provider = createLocalFileDestinationProvider(providerOptions); (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + + await provider.bootstrap(); provider.getEntitiesStream(); expect(createTarEntryStream).toHaveBeenCalled(); @@ -122,7 +124,7 @@ describe('Local File Destination Provider', () => { }); describe('Streaming schemas', () => { - it('Creates a tar entry stream for schemas', () => { + it('Creates a tar entry stream for schemas', async () => { const providerOptions: ILocalFileDestinationProviderOptions = { encryption: { enabled: false }, compression: { enabled: false }, @@ -131,6 +133,8 @@ describe('Local File Destination Provider', () => { const provider = createLocalFileDestinationProvider(providerOptions); (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + + await provider.bootstrap(); provider.getSchemasStream(); expect(createTarEntryStream).toHaveBeenCalled(); @@ -139,7 +143,7 @@ describe('Local File Destination Provider', () => { }); describe('Streaming links', () => { - it('Creates a tar entry stream for links', () => { + it('Creates a tar entry stream for links', async () => { const providerOptions: ILocalFileDestinationProviderOptions = { encryption: { enabled: false }, compression: { enabled: false }, @@ -148,6 +152,8 @@ describe('Local File Destination Provider', () => { const provider = createLocalFileDestinationProvider(providerOptions); (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + + await provider.bootstrap(); provider.getLinksStream(); expect(createTarEntryStream).toHaveBeenCalled(); @@ -156,7 +162,7 @@ describe('Local File Destination Provider', () => { }); describe('Streaming configuration', () => { - it('Creates a tar entry stream for configuration', () => { + it('Creates a tar entry stream for configuration', async () => { const providerOptions: ILocalFileDestinationProviderOptions = { encryption: { enabled: false }, compression: { enabled: false }, @@ -165,6 +171,8 @@ describe('Local File Destination Provider', () => { const provider = createLocalFileDestinationProvider(providerOptions); (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + + await provider.bootstrap(); provider.getConfigurationStream(); expect(createTarEntryStream).toHaveBeenCalled(); diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts index f7df8fc28a..1f62c63e34 100644 --- a/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts @@ -3,18 +3,19 @@ import type { IDestinationProviderTransferResults, IMetadata, ProviderType, + Stream, } from '../../../types'; -import { Readable } from 'stream'; import fs from 'fs-extra'; +import path from 'path'; import tar from 'tar-stream'; import zlib from 'zlib'; +import { Writable, Readable } from 'stream'; import { stringer } from 'stream-json/jsonl/Stringer'; import { chain } from 'stream-chain'; import { createEncryptionCipher } from '../../encryption/encrypt'; import { createFilePathFactory, createTarEntryStream } from './utils'; - export interface ILocalFileDestinationProviderOptions { // Encryption encryption: { @@ -55,12 +56,28 @@ class LocalFileDestinationProvider implements IDestinationProvider { results: ILocalFileDestinationProviderTransferResults = {}; #providersMetadata: { source?: IMetadata; destination?: IMetadata } = {}; - #archive?: tar.Pack; + #archive: { stream?: tar.Pack } = {}; constructor(options: ILocalFileDestinationProviderOptions) { this.options = options; } + get #archivePath() { + const { encryption, compression, file } = this.options; + + let path = `${file.path}.tar`; + + if (compression.enabled) { + path += '.gz'; + } + + if (encryption.enabled) { + path += '.enc'; + } + + return path; + } + setMetadata(target: ProviderType, metadata: IMetadata): IDestinationProvider { this.#providersMetadata[target] = metadata; @@ -69,7 +86,7 @@ class LocalFileDestinationProvider implements IDestinationProvider { #getDataTransformers(options: { jsonl?: boolean } = {}) { const { jsonl = true } = options; - const transforms = []; + const transforms: Stream[] = []; if (jsonl) { // Convert to stringified JSON lines @@ -90,11 +107,11 @@ class LocalFileDestinationProvider implements IDestinationProvider { throw new Error("Can't encrypt without a key"); } - this.#archive = tar.pack(); + this.#archive.stream = tar.pack(); const outStream = fs.createWriteStream(this.#archivePath); - const archiveTransforms = []; + const archiveTransforms: Stream[] = []; if (compression.enabled) { archiveTransforms.push(this.createGzip()); @@ -104,13 +121,27 @@ class LocalFileDestinationProvider implements IDestinationProvider { archiveTransforms.push(createEncryptionCipher(encryption.key)); } - chain([this.#archive, ...archiveTransforms, outStream]); + chain([this.#archive.stream, ...archiveTransforms, outStream]); + this.results.file = { path: this.#archivePath }; } async close() { + const { stream } = this.#archive; + + if (!stream) { + return; + } + await this.#writeMetadata(); - this.#archive?.finalize(); + + stream.finalize(); + + if (!stream.closed) { + await new Promise((resolve, reject) => { + stream.on('close', resolve).on('error', reject); + }); + } } async rollback(): Promise { @@ -122,22 +153,6 @@ class LocalFileDestinationProvider implements IDestinationProvider { return null; } - get #archivePath() { - const { encryption, compression, file } = this.options; - - let path = `${file.path}.tar`; - - if (compression.enabled) { - path += '.gz'; - } - - if (encryption.enabled) { - path += '.gpg'; - } - - return path; - } - async #writeMetadata(): Promise { const metadata = this.#providersMetadata.source; @@ -152,16 +167,24 @@ class LocalFileDestinationProvider implements IDestinationProvider { } #getMetadataStream() { - return createTarEntryStream(this.#archive!, () => 'metadata.json'); + const { stream } = this.#archive; + + if (!stream) { + throw new Error('Archive stream is unavailable'); + } + + return createTarEntryStream(stream, () => 'metadata.json'); } getSchemasStream() { - // const filePathFactory = createFilePathFactory(this.options.file.path, 'schemas'); + if (!this.#archive.stream) { + throw new Error('Archive stream is unavailable'); + } + const filePathFactory = createFilePathFactory('schemas'); - // FS write stream const entryStream = createTarEntryStream( - this.#archive!, + this.#archive.stream, filePathFactory, this.options.file.maxSize ); @@ -170,12 +193,14 @@ class LocalFileDestinationProvider implements IDestinationProvider { } getEntitiesStream(): NodeJS.WritableStream { - // const filePathFactory = createFilePathFactory(this.options.file.path, 'entities'); + if (!this.#archive.stream) { + throw new Error('Archive stream is unavailable'); + } + const filePathFactory = createFilePathFactory('entities'); - // FS write stream const entryStream = createTarEntryStream( - this.#archive!, + this.#archive.stream, filePathFactory, this.options.file.maxSize ); @@ -184,11 +209,14 @@ class LocalFileDestinationProvider implements IDestinationProvider { } getLinksStream(): NodeJS.WritableStream { + if (!this.#archive.stream) { + throw new Error('Archive stream is unavailable'); + } + const filePathFactory = createFilePathFactory('links'); - // FS write stream const entryStream = createTarEntryStream( - this.#archive!, + this.#archive.stream, filePathFactory, this.options.file.maxSize ); @@ -197,11 +225,14 @@ class LocalFileDestinationProvider implements IDestinationProvider { } getConfigurationStream(): NodeJS.WritableStream { + if (!this.#archive.stream) { + throw new Error('Archive stream is unavailable'); + } + const filePathFactory = createFilePathFactory('configuration'); - // FS write stream const entryStream = createTarEntryStream( - this.#archive!, + this.#archive.stream, filePathFactory, this.options.file.maxSize ); From 83d4e6ce2a2770408d5c612f032e5ddea05ec3e0 Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Wed, 23 Nov 2022 14:19:48 +0100 Subject: [PATCH 4/6] Delete unnecessary file --- .../local-file-destination-provider.ts | 311 ------------------ 1 file changed, 311 deletions(-) delete mode 100644 packages/core/data-transfer/lib/providers/local-file-destination-provider.ts diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider.ts deleted file mode 100644 index e684e53304..0000000000 --- a/packages/core/data-transfer/lib/providers/local-file-destination-provider.ts +++ /dev/null @@ -1,311 +0,0 @@ -import type { - IDestinationProvider, - IDestinationProviderTransferResults, - IMetadata, - ProviderType, - Stream, -} from '../../types'; - -import fs from 'fs-extra'; -import path from 'path'; -import tar from 'tar-stream'; -import zlib from 'zlib'; -import { Writable, Readable } from 'stream'; -import { stringer } from 'stream-json/jsonl/Stringer'; -import { chain } from 'stream-chain'; - -import { createEncryptionCipher } from '../encryption/encrypt'; - -export interface ILocalFileDestinationProviderOptions { - // Encryption - encryption: { - enabled: boolean; - key?: string; - }; - - // Compression - compression: { - enabled: boolean; - }; - - // File - file: { - path: string; - maxSize?: number; - maxSizeJsonl?: number; - }; -} - -export interface ILocalFileDestinationProviderTransferResults - extends IDestinationProviderTransferResults { - file?: { - path?: string; - }; -} - -export const createLocalFileDestinationProvider = ( - options: ILocalFileDestinationProviderOptions -) => { - return new LocalFileDestinationProvider(options); -}; - -class LocalFileDestinationProvider implements IDestinationProvider { - name: string = 'destination::local-file'; - type: ProviderType = 'destination'; - options: ILocalFileDestinationProviderOptions; - results: ILocalFileDestinationProviderTransferResults = {}; - - #providersMetadata: { source?: IMetadata; destination?: IMetadata } = {}; - #archive: { stream?: tar.Pack } = {}; - - constructor(options: ILocalFileDestinationProviderOptions) { - this.options = options; - } - - get #archivePath() { - const { encryption, compression, file } = this.options; - - let path = `${file.path}.tar`; - - if (compression.enabled) { - path += '.gz'; - } - - if (encryption.enabled) { - path += '.enc'; - } - - return path; - } - - setMetadata(target: ProviderType, metadata: IMetadata): IDestinationProvider { - this.#providersMetadata[target] = metadata; - - return this; - } - - #getDataTransformers(options: { jsonl?: boolean } = {}) { - const { jsonl = true } = options; - const transforms: Stream[] = []; - - if (jsonl) { - // Convert to stringified JSON lines - transforms.push(stringer()); - } - - return transforms; - } - - bootstrap(): void | Promise { - const { compression, encryption } = this.options; - - if (encryption.enabled && !encryption.key) { - throw new Error("Can't encrypt without a key"); - } - - this.#archive.stream = tar.pack(); - - const outStream = fs.createWriteStream(this.#archivePath); - - const archiveTransforms: Stream[] = []; - - if (compression.enabled) { - archiveTransforms.push(zlib.createGzip()); - } - - if (encryption.enabled && encryption.key) { - archiveTransforms.push(createEncryptionCipher(encryption.key)); - } - - chain([this.#archive.stream, ...archiveTransforms, outStream]); - - this.results.file = { path: this.#archivePath }; - } - - async close() { - const { stream } = this.#archive; - - if (!stream) { - return; - } - - await this.#writeMetadata(); - - stream.finalize(); - - if (!stream.closed) { - await new Promise((resolve, reject) => { - stream.on('close', resolve).on('error', reject); - }); - } - } - - async rollback(): Promise { - await this.close(); - fs.rmSync(this.#archivePath, { force: true }); - } - - getMetadata() { - return null; - } - - async #writeMetadata(): Promise { - const metadata = this.#providersMetadata.source; - - if (metadata) { - await new Promise((resolve) => { - const outStream = this.#getMetadataStream(); - const data = JSON.stringify(metadata, null, 2); - - Readable.from(data).pipe(outStream).on('close', resolve); - }); - } - } - - #getMetadataStream() { - const { stream } = this.#archive; - - if (!stream) { - throw new Error('Archive stream is unavailable'); - } - - return createTarEntryStream(stream, () => 'metadata.json'); - } - - getSchemasStream() { - if (!this.#archive.stream) { - throw new Error('Archive stream is unavailable'); - } - - const filePathFactory = createFilePathFactory('schemas'); - - const entryStream = createTarEntryStream( - this.#archive.stream, - filePathFactory, - this.options.file.maxSize - ); - - return chain([stringer(), entryStream]); - } - - getEntitiesStream(): NodeJS.WritableStream { - if (!this.#archive.stream) { - throw new Error('Archive stream is unavailable'); - } - - const filePathFactory = createFilePathFactory('entities'); - - const entryStream = createTarEntryStream( - this.#archive.stream, - filePathFactory, - this.options.file.maxSize - ); - - return chain([stringer(), entryStream]); - } - - getLinksStream(): NodeJS.WritableStream { - if (!this.#archive.stream) { - throw new Error('Archive stream is unavailable'); - } - - const filePathFactory = createFilePathFactory('links'); - - const entryStream = createTarEntryStream( - this.#archive.stream, - filePathFactory, - this.options.file.maxSize - ); - - return chain([stringer(), entryStream]); - } - - getConfigurationStream(): NodeJS.WritableStream { - if (!this.#archive.stream) { - throw new Error('Archive stream is unavailable'); - } - - const filePathFactory = createFilePathFactory('configuration'); - - const entryStream = createTarEntryStream( - this.#archive.stream, - filePathFactory, - this.options.file.maxSize - ); - - return chain([stringer(), entryStream]); - } -} - -/** - * Create a file path factory for a given path & prefix. - * Upon being called, the factory will return a file path for a given index - */ -const createFilePathFactory = - (type: string) => - (fileIndex: number = 0): string => { - return path.join( - // "{type}" directory - type, - // "${type}_XXXXX.jsonl" file - `${type}_${String(fileIndex).padStart(5, '0')}.jsonl` - ); - }; - -const createTarEntryStream = ( - archive: tar.Pack, - pathFactory: (index?: number) => string, - maxSize: number = 2.56e8 -) => { - let fileIndex = 0; - let buffer = ''; - - const flush = async () => { - if (!buffer) { - return; - } - - const name = pathFactory(fileIndex++); - const size = buffer.length; - - await new Promise((resolve, reject) => { - archive.entry({ name, size }, buffer, (err) => { - if (err) { - reject(err); - } - - resolve(); - }); - }); - - buffer = ''; - }; - - const push = (chunk: string | Buffer) => { - buffer += chunk; - }; - - return new Writable({ - async destroy(err, callback) { - await flush(); - callback(err); - }, - - async write(chunk, _encoding, callback) { - const size = chunk.length; - - if (chunk.length > maxSize) { - callback(new Error(`payload too large: ${chunk.length}>${maxSize}`)); - return; - } - - if (buffer.length + size > maxSize) { - await flush(); - } - - push(chunk); - - callback(null); - }, - }); -}; From 3eaccb5f38bc88114105faf6e48f7118eddb3330 Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Thu, 24 Nov 2022 11:39:16 +0100 Subject: [PATCH 5/6] Add tests and refactor --- .../__tests__/index.test.ts | 93 +++++++++++++------ .../__tests__/utils.test.ts | 6 +- .../local-file-destination-provider/index.ts | 3 +- 3 files changed, 69 insertions(+), 33 deletions(-) diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts index ec0bd708d3..c81b2ad6d0 100644 --- a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/index.test.ts @@ -1,3 +1,5 @@ +import stream from 'stream'; + import { createLocalFileDestinationProvider, ILocalFileDestinationProviderOptions } from '../'; import * as encryption from '../../../encryption/encrypt'; import { @@ -105,15 +107,15 @@ describe('Local File Destination Provider', () => { }); describe('Streaming entities', () => { - it('Creates a tar entry stream', async () => { - const providerOptions: ILocalFileDestinationProviderOptions = { - encryption: { enabled: false }, - compression: { enabled: false }, - file: { path: filePath }, - }; + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + it('Creates a tar entry stream', async () => { const provider = createLocalFileDestinationProvider(providerOptions); - (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); await provider.bootstrap(); provider.getEntitiesStream(); @@ -121,18 +123,26 @@ describe('Local File Destination Provider', () => { expect(createTarEntryStream).toHaveBeenCalled(); expect(createFilePathFactory).toHaveBeenCalledWith('entities'); }); + it('Returns a stream', async () => { + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + const entitiesStream = provider.getEntitiesStream(); + + expect(entitiesStream instanceof stream.Writable).toBeTruthy(); + }); }); describe('Streaming schemas', () => { - it('Creates a tar entry stream for schemas', async () => { - const providerOptions: ILocalFileDestinationProviderOptions = { - encryption: { enabled: false }, - compression: { enabled: false }, - file: { path: filePath }, - }; + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + it('Creates a tar entry stream for schemas', async () => { const provider = createLocalFileDestinationProvider(providerOptions); - (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); await provider.bootstrap(); provider.getSchemasStream(); @@ -140,18 +150,27 @@ describe('Local File Destination Provider', () => { expect(createTarEntryStream).toHaveBeenCalled(); expect(createFilePathFactory).toHaveBeenCalledWith('schemas'); }); + + it('Returns a stream', async () => { + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + const schemasStream = provider.getSchemasStream(); + + expect(schemasStream instanceof stream.Writable).toBeTruthy(); + }); }); describe('Streaming links', () => { - it('Creates a tar entry stream for links', async () => { - const providerOptions: ILocalFileDestinationProviderOptions = { - encryption: { enabled: false }, - compression: { enabled: false }, - file: { path: filePath }, - }; + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + it('Creates a tar entry stream for links', async () => { const provider = createLocalFileDestinationProvider(providerOptions); - (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); await provider.bootstrap(); provider.getLinksStream(); @@ -159,18 +178,27 @@ describe('Local File Destination Provider', () => { expect(createTarEntryStream).toHaveBeenCalled(); expect(createFilePathFactory).toHaveBeenCalledWith('links'); }); + + it('Returns a stream', async () => { + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + const linksStream = provider.getLinksStream(); + + expect(linksStream instanceof stream.Writable).toBeTruthy(); + }); }); describe('Streaming configuration', () => { - it('Creates a tar entry stream for configuration', async () => { - const providerOptions: ILocalFileDestinationProviderOptions = { - encryption: { enabled: false }, - compression: { enabled: false }, - file: { path: filePath }, - }; + const providerOptions: ILocalFileDestinationProviderOptions = { + encryption: { enabled: false }, + compression: { enabled: false }, + file: { path: filePath }, + }; + (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); + it('Creates a tar entry stream for configuration', async () => { const provider = createLocalFileDestinationProvider(providerOptions); - (createTarEntryStream as jest.Mock).mockImplementation(jest.fn()); await provider.bootstrap(); provider.getConfigurationStream(); @@ -178,5 +206,14 @@ describe('Local File Destination Provider', () => { expect(createTarEntryStream).toHaveBeenCalled(); expect(createFilePathFactory).toHaveBeenCalledWith('configuration'); }); + + it('Returns a stream', async () => { + const provider = createLocalFileDestinationProvider(providerOptions); + + await provider.bootstrap(); + const configurationStream = provider.getConfigurationStream(); + + expect(configurationStream instanceof stream.Writable).toBeTruthy(); + }); }); }); diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts index 7c1c60a07e..974778991d 100644 --- a/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/__tests__/utils.test.ts @@ -29,7 +29,7 @@ describe('Local File Destination Provider - Utils', () => { test.each(cases)( 'Given type: %s and fileIndex: %d, returns the right file path: %s', - (type: string, fileIndex: number, filePath) => { + (type: any, fileIndex: any, filePath: any) => { const filePathFactory = createFilePathFactory(type); const path = filePathFactory(fileIndex); @@ -40,7 +40,7 @@ describe('Local File Destination Provider - Utils', () => { }); }); describe('Create Tar Entry Stream', () => { - it('Throws an error when the paylod is too large', async () => { + it('Throws an error when the payload is too large', async () => { const maxSize = 3; const chunk = 'test'; const archive = tar.pack(); @@ -56,7 +56,7 @@ describe('Local File Destination Provider - Utils', () => { await expect(write).rejects.toThrow(`payload too large: ${chunk.length}>${maxSize}`); }); - it('Resolves when the paylod is smaller than the max size', async () => { + it('Resolves when the payload is smaller than the max size', async () => { const maxSize = 30; const chunk = 'test'; const archive = tar.pack(); diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts index 1f62c63e34..5ebd688c95 100644 --- a/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts @@ -7,10 +7,9 @@ import type { } from '../../../types'; import fs from 'fs-extra'; -import path from 'path'; import tar from 'tar-stream'; import zlib from 'zlib'; -import { Writable, Readable } from 'stream'; +import { Readable } from 'stream'; import { stringer } from 'stream-json/jsonl/Stringer'; import { chain } from 'stream-chain'; From 3c3e3b6ca8e86e1db6ec28ae2435c25f0753047f Mon Sep 17 00:00:00 2001 From: Christian Capeans Date: Wed, 30 Nov 2022 13:05:42 +0100 Subject: [PATCH 6/6] Add missing dependencies --- .../lib/providers/local-file-destination-provider/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts index 4bdfe1675d..2dff2909cd 100644 --- a/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts +++ b/packages/core/data-transfer/lib/providers/local-file-destination-provider/index.ts @@ -8,10 +8,11 @@ import type { import fs from 'fs-extra'; import tar from 'tar-stream'; +import path from 'path'; import zlib from 'zlib'; import { Readable } from 'stream'; import { stringer } from 'stream-json/jsonl/Stringer'; -import { chain } from 'stream-chain'; +import { chain, Writable } from 'stream-chain'; import { createEncryptionCipher } from '../../encryption/encrypt'; import { createFilePathFactory, createTarEntryStream } from './utils';