mirror of
https://github.com/strapi/strapi.git
synced 2025-12-28 23:57:32 +00:00
add schemas streaming
This commit is contained in:
parent
c38d159565
commit
2a0470186a
@ -110,6 +110,7 @@ class TransferEngine implements ITransferEngine {
|
||||
);
|
||||
}
|
||||
|
||||
await this.transferSchemas();
|
||||
await this.transferEntities()
|
||||
// Temporary while we don't have the final API for streaming data from the database
|
||||
.catch((e) => {
|
||||
@ -128,6 +129,31 @@ class TransferEngine implements ITransferEngine {
|
||||
}
|
||||
}
|
||||
|
||||
async transferSchemas(): Promise<void> {
|
||||
const inStream = await this.sourceProvider.streamSchemas?.();
|
||||
const outStream = await this.destinationProvider.getSchemasStream?.();
|
||||
console.log(inStream);
|
||||
if (!inStream || !outStream) {
|
||||
throw new Error('Unable to transfer schemas, one of the streams is missing');
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
inStream
|
||||
// Throw on error in the source
|
||||
.on('error', reject);
|
||||
|
||||
outStream
|
||||
// Throw on error in the destination
|
||||
.on('error', (e) => {
|
||||
reject(e);
|
||||
})
|
||||
// Resolve the promise when the destination has finished reading all the data from the source
|
||||
.on('close', resolve);
|
||||
|
||||
inStream.pipe(outStream);
|
||||
});
|
||||
}
|
||||
|
||||
async transferEntities(): Promise<void> {
|
||||
const inStream = await this.sourceProvider.streamEntities?.();
|
||||
const outStream = await this.destinationProvider.getEntitiesStream?.();
|
||||
|
||||
@ -4,6 +4,7 @@ import zip from 'zlib';
|
||||
import { Duplex } from 'stream';
|
||||
import { chain, Writable } from 'stream-chain';
|
||||
import { stringer } from 'stream-json/jsonl/Stringer';
|
||||
import { createCipher } from '../encryption/encrypt';
|
||||
|
||||
import type { IDestinationProvider, ProviderType, Stream } from '../../types';
|
||||
// import { encrypt } from '../encryption';
|
||||
@ -51,6 +52,7 @@ class LocalFileDestinationProvider implements IDestinationProvider {
|
||||
}
|
||||
|
||||
fs.mkdirSync(rootDir, { recursive: true });
|
||||
fs.mkdirSync(path.join(rootDir, 'schemas'));
|
||||
fs.mkdirSync(path.join(rootDir, 'entities'));
|
||||
fs.mkdirSync(path.join(rootDir, 'links'));
|
||||
fs.mkdirSync(path.join(rootDir, 'media'));
|
||||
@ -65,6 +67,39 @@ class LocalFileDestinationProvider implements IDestinationProvider {
|
||||
return null;
|
||||
}
|
||||
|
||||
getSchemasStream() {
|
||||
const filePathFactory = (fileIndex: number = 0) => {
|
||||
return path.join(
|
||||
// Backup path
|
||||
this.options.file.path,
|
||||
// "schemas/" directory
|
||||
'schemas',
|
||||
// "schemas_00000.jsonl" file
|
||||
`schemas_${String(fileIndex).padStart(5, '0')}.jsonl`
|
||||
);
|
||||
};
|
||||
|
||||
const streams: any[] = [
|
||||
// create jsonl strings from object entities
|
||||
stringer(),
|
||||
];
|
||||
|
||||
// Compression
|
||||
if (this.options.compression?.enabled) {
|
||||
streams.push(zip.createGzip());
|
||||
}
|
||||
|
||||
// Encryption;
|
||||
if (this.options.encryption?.enabled) {
|
||||
streams.push(createCipher(this.options.encryption.key));
|
||||
}
|
||||
|
||||
// FS write stream
|
||||
streams.push(createMultiFilesWriteStream(filePathFactory, this.options.file?.maxSize));
|
||||
|
||||
return chain(streams);
|
||||
}
|
||||
|
||||
getEntitiesStream(): Duplex {
|
||||
const filePathFactory = (fileIndex: number = 0) => {
|
||||
return path.join(
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import type { ISourceProvider, ProviderType } from '../../../types';
|
||||
|
||||
import { chain } from 'stream-chain';
|
||||
|
||||
import { Readable } from 'stream';
|
||||
import { createEntitiesStream, createEntitiesTransformStream } from './entities';
|
||||
import { createLinksStream } from './links';
|
||||
|
||||
@ -64,4 +64,16 @@ class LocalStrapiSourceProvider implements ISourceProvider {
|
||||
|
||||
return createLinksStream(this.strapi);
|
||||
}
|
||||
|
||||
getSchemas() {
|
||||
if (!this.strapi) {
|
||||
throw new Error('Not able to get Schemas. Strapi instance not found');
|
||||
}
|
||||
|
||||
return [...Object.values(this.strapi.contentTypes), ...Object.values(this.strapi.components)];
|
||||
}
|
||||
|
||||
streamSchemas(): NodeJS.ReadableStream {
|
||||
return Readable.from(this.getSchemas());
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,8 @@ export interface ISourceProvider extends IProvider {
|
||||
streamLinks?(): NodeJS.ReadableStream | Promise<NodeJS.ReadableStream>;
|
||||
streamMedia?(): NodeJS.ReadableStream | Promise<NodeJS.ReadableStream>;
|
||||
streamConfiguration?(): NodeJS.ReadableStream | Promise<NodeJS.ReadableStream>;
|
||||
getSchemas?(): any;
|
||||
streamSchemas?(): NodeJS.ReadableStream;
|
||||
}
|
||||
|
||||
export interface IDestinationProvider extends IProvider {
|
||||
@ -32,4 +34,5 @@ export interface IDestinationProvider extends IProvider {
|
||||
getLinksStream?(): NodeJS.WritableStream | Promise<NodeJS.WritableStream>;
|
||||
getMediaStream?(): NodeJS.WritableStream | Promise<NodeJS.WritableStream>;
|
||||
getConfigurationStream?(): NodeJS.WritableStream | Promise<NodeJS.WritableStream>;
|
||||
getSchemasStream?(): NodeJS.WritableStream | Promise<NodeJS.WritableStream>;
|
||||
}
|
||||
|
||||
@ -48,6 +48,12 @@ export interface ITransferEngine {
|
||||
*/
|
||||
close(): Promise<void>;
|
||||
|
||||
/**
|
||||
* Start the schemas transfer by connecting the
|
||||
* related source and destination providers streams
|
||||
*/
|
||||
transferSchemas(): Promise<void>;
|
||||
|
||||
/**
|
||||
* Start the entities transfer by connecting the
|
||||
* related source and destination providers streams
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user