2022-11-11 13:41:01 +01:00
import _ from 'lodash/fp' ;
import { PassThrough } from 'stream-chain' ;
2022-10-24 17:11:59 +02:00
import type {
2022-10-13 11:01:35 +02:00
IDestinationProvider ,
2022-11-15 09:48:51 +01:00
IMetadata ,
2022-10-13 11:01:35 +02:00
ISourceProvider ,
ITransferEngine ,
ITransferEngineOptions ,
2022-11-08 14:26:48 +01:00
ITransferResults ,
2022-11-11 14:24:30 +01:00
TransferStage ,
2022-10-13 11:01:35 +02:00
} from '../../types' ;
2022-11-11 13:41:01 +01:00
/**
 * A pair of running totals: how many items were seen and their cumulated size.
 */
type TransferCounters = {
  count: number;
  bytes: number;
};

/**
 * Progress of a transfer, keyed by transfer stage.
 *
 * A stage only appears once it has an entry recorded for it. Besides its own totals,
 * a stage may carry `aggregates`: the same totals broken down by an arbitrary string key.
 */
type TransferProgress = Partial<
  Record<
    TransferStage,
    TransferCounters & {
      aggregates?: Record<string, TransferCounters>;
    }
  >
>;
2022-11-15 18:42:34 +01:00
/**
 * Value returned by the engine's `progress` accessor.
 */
type TransferEngineProgress = {
  /** Progress totals collected so far (loosely typed on purpose for consumers). */
  data: any;
  /** Stream on which the engine emits its progress events. */
  stream: PassThrough;
};
2022-11-08 14:26:48 +01:00
/**
 * Drives a data transfer from a source provider to a destination provider.
 *
 * A transfer bootstraps both providers, reads their metadata, checks that their
 * Strapi versions are compatible, then runs the stages one after the other
 * (schemas, entities, media, links, configuration). While data flows, per-stage
 * totals are kept and 'start' / 'progress' / 'complete' events are emitted on the
 * stream exposed through `progress`.
 */
class TransferEngine<
  S extends ISourceProvider = ISourceProvider,
  D extends IDestinationProvider = IDestinationProvider
> implements ITransferEngine
{
  sourceProvider: ISourceProvider;

  destinationProvider: IDestinationProvider;

  options: ITransferEngineOptions;

  // Metadata fetched from each provider by init()
  #metadata: { source?: IMetadata; destination?: IMetadata } = {};

  // Running totals, filled in as chunks go through the stage recorders
  #transferProgress: TransferProgress = {};

  // TODO: Type the stream chunks. Doesn't seem trivial, especially since PassThrough doesn't provide a PassThroughOptions type
  #progressStream: PassThrough = new PassThrough({ objectMode: true });

  /**
   * Current progress totals together with the stream on which progress events are emitted.
   */
  get progress(): TransferEngineProgress {
    return {
      data: this.#transferProgress,
      stream: this.#progressStream,
    };
  }

  constructor(
    sourceProvider: ISourceProvider,
    destinationProvider: IDestinationProvider,
    options: ITransferEngineOptions
  ) {
    this.sourceProvider = sourceProvider;
    this.destinationProvider = destinationProvider;
    this.options = options;
  }

  /**
   * Adds one chunk to the totals of the given stage.
   *
   * @param transferStage - Stage the chunk belongs to
   * @param data - The chunk; its size is the UTF-8 byte length of its JSON serialization
   * @param aggregateKey - Optional property of `data` whose value selects an aggregate
   *   bucket that is incremented alongside the stage totals
   */
  #increaseTransferProgress(transferStage: TransferStage, data: any, aggregateKey?: string) {
    let stageProgress = this.#transferProgress[transferStage];

    if (!stageProgress) {
      stageProgress = { count: 0, bytes: 0 };
      this.#transferProgress[transferStage] = stageProgress;
    }

    // JSON.stringify yields undefined for values it cannot serialize (e.g. undefined);
    // count those as empty instead of crashing the stream.
    const serialized: string | undefined = JSON.stringify(data);
    // String#length counts UTF-16 code units, which is not a byte size for non-ASCII content
    const size = serialized === undefined ? 0 : Buffer.byteLength(serialized, 'utf8');

    stageProgress.count += 1;
    stageProgress.bytes += size;

    if (aggregateKey && _.has(aggregateKey, data)) {
      const aggKeyValue = data[aggregateKey];

      if (!stageProgress.aggregates) {
        stageProgress.aggregates = {};
      }

      const { aggregates } = stageProgress;

      // Own-property check on purpose: a path-based lookup would misread keys
      // that contain dots (e.g. "api::foo.foo") as nested paths.
      if (!Object.prototype.hasOwnProperty.call(aggregates, aggKeyValue)) {
        aggregates[aggKeyValue] = { count: 0, bytes: 0 };
      }

      aggregates[aggKeyValue].count += 1;
      aggregates[aggKeyValue].bytes += size;
    }
  }

  /**
   * Creates an object-mode pass-through that records every chunk in the stage totals,
   * emits a 'progress' event, and forwards the chunk unchanged.
   */
  #countRecorder(transferStage: TransferStage, aggregateKey?: string) {
    return new PassThrough({
      objectMode: true,
      transform: (data, _encoding, callback) => {
        this.#increaseTransferProgress(transferStage, data, aggregateKey);
        this.#updateStage('progress', transferStage);
        callback(null, data);
      },
    });
  }

  /**
   * Emits a stage event on the progress stream, carrying the current totals and the stage name.
   */
  #updateStage(type: 'start' | 'complete' | 'progress', transferStage: TransferStage) {
    this.#progressStream.emit(type, {
      data: this.#transferProgress,
      stage: transferStage,
    });
  }

  /**
   * Checks two Strapi versions against the `versionMatching` option.
   *
   * Returns silently when a version is missing, when the strategy is 'ignore', or when
   * the versions agree up to the level required by the strategy.
   *
   * @throws Error when the versions are not compatible for the configured strategy
   */
  #assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
    const strategy = this.options.versionMatching;

    // Nothing to compare, or the check is explicitly disabled
    if (!sourceVersion || !destinationVersion || strategy === 'ignore') {
      return;
    }

    if (strategy === 'exact' && sourceVersion === destinationVersion) {
      return;
    }

    // Compare the dot-separated tokens position by position
    const destinationTokens = destinationVersion.split('.');
    const [major, minor, patch] = sourceVersion
      .split('.')
      .map((value, index) => value === destinationTokens[index]);

    const isCompatible =
      (strategy === 'major' && major) ||
      (strategy === 'minor' && major && minor) ||
      (strategy === 'patch' && major && minor && patch);

    if (isCompatible) {
      return;
    }

    throw new Error(
      `Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}`
    );
  }

  /**
   * Loads the providers' metadata into the engine, then hands the source metadata
   * to the destination provider when it exposes `setMetadata`.
   */
  async init(): Promise<void> {
    // Resolve providers' resource and store
    // them in the engine's internal state
    await this.#resolveProviderResource();

    // Update the destination provider's source metadata
    const { source: sourceMetadata } = this.#metadata;

    if (sourceMetadata) {
      this.destinationProvider.setMetadata?.('source', sourceMetadata);
    }
  }

  /**
   * Bootstraps both providers concurrently (providers without a `bootstrap` hook are skipped).
   */
  async bootstrap(): Promise<void> {
    await Promise.all([this.sourceProvider.bootstrap?.(), this.destinationProvider.bootstrap?.()]);
  }

  /**
   * Closes both providers concurrently (providers without a `close` hook are skipped).
   */
  async close(): Promise<void> {
    await Promise.all([this.sourceProvider.close?.(), this.destinationProvider.close?.()]);
  }

  /**
   * Fetches the metadata of the source, then of the destination, and stores whichever is defined.
   */
  async #resolveProviderResource() {
    const sourceMetadata = await this.sourceProvider.getMetadata();
    const destinationMetadata = await this.destinationProvider.getMetadata();

    if (sourceMetadata) {
      this.#metadata.source = sourceMetadata;
    }

    if (destinationMetadata) {
      this.#metadata.destination = destinationMetadata;
    }
  }

  /**
   * Runs the integrity checks on the metadata stored by `init()`.
   *
   * @returns `true` when the checks pass or when metadata is missing on either side,
   *   `false` (after logging the reason) when a check fails
   */
  async integrityCheck(): Promise<boolean> {
    const { source: sourceMetadata, destination: destinationMetadata } = this.#metadata;

    if (!sourceMetadata || !destinationMetadata) {
      return true;
    }

    try {
      // Version check
      this.#assertStrapiVersionIntegrity(
        sourceMetadata.strapi?.version,
        destinationMetadata.strapi?.version
      );

      return true;
    } catch (error) {
      if (error instanceof Error) {
        console.error('Integrity checks failed:', error.message);
      }

      return false;
    }
  }

  /**
   * Runs the whole transfer: bootstrap, init, integrity check, every stage, then close.
   *
   * @returns The `results` exposed by the source and destination providers
   * @throws Error when the integrity check fails, or whatever a provider or stage throws.
   *   On failure the providers are still closed (best effort) before the error is rethrown.
   */
  async transfer(): Promise<ITransferResults<S, D>> {
    try {
      await this.bootstrap();
      await this.init();

      const isValidTransfer = await this.integrityCheck();

      if (!isValidTransfer) {
        throw new Error(
          `Unable to transfer the data between ${this.sourceProvider.name} and ${this.destinationProvider.name}.\nPlease refer to the log above for more information.`
        );
      }

      // Run the transfer stages
      await this.transferSchemas();
      await this.transferEntities();
      await this.transferMedia();
      await this.transferLinks();
      await this.transferConfiguration();
    } catch (error: unknown) {
      // Release the providers' resources even though the transfer failed.
      // A failure while closing must not hide the error that caused the abort.
      try {
        await this.close();
      } catch {
        // intentionally ignored, the original error is rethrown below
      }

      // Note: rolling back the destination provider on failure is planned to be
      // configurable in the future (destinationProvider.rollback).
      throw error;
    }

    // Gracefully close the providers
    await this.close();

    return {
      source: this.sourceProvider.results,
      destination: this.destinationProvider.results,
    };
  }

  /**
   * Shared implementation of the stream-based stages: validates both streams, emits
   * 'start', pipes source -> recorder -> destination, and settles once the destination
   * emits 'close' (emitting 'complete') or either stream emits 'error'.
   *
   * @param stageName - Stage being transferred, also used in the error messages
   * @param inStream - Stream obtained from the source provider, if any
   * @param outStream - Stream obtained from the destination provider, if any
   * @param aggregateKey - Optional chunk property used to break the totals down
   * @throws Error when one of the streams is missing
   */
  async #runStreamStage(
    stageName: TransferStage,
    inStream: NodeJS.ReadableStream | undefined,
    outStream: NodeJS.WritableStream | undefined,
    aggregateKey?: string
  ): Promise<void> {
    if (!inStream) {
      throw new Error(`Unable to transfer ${stageName}, source stream is missing`);
    }

    if (!outStream) {
      throw new Error(`Unable to transfer ${stageName}, destination stream is missing`);
    }

    this.#updateStage('start', stageName);

    await new Promise<void>((resolve, reject) => {
      // Throw on error in the source
      inStream.on('error', reject);

      outStream
        // Throw on error in the destination
        .on('error', reject)
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', () => {
          this.#updateStage('complete', stageName);
          resolve();
        });

      inStream.pipe(this.#countRecorder(stageName, aggregateKey)).pipe(outStream);
    });
  }

  /**
   * Transfers the schemas from the source to the destination.
   *
   * @throws Error when either provider does not supply a schemas stream
   */
  async transferSchemas(): Promise<void> {
    const inStream = await this.sourceProvider.streamSchemas?.();
    const outStream = await this.destinationProvider.getSchemasStream?.();

    return this.#runStreamStage('schemas', inStream, outStream);
  }

  /**
   * Transfers the entities from the source to the destination; totals are also
   * aggregated by each entity's `type` property.
   *
   * @throws Error when either provider does not supply an entities stream
   */
  async transferEntities(): Promise<void> {
    const inStream = await this.sourceProvider.streamEntities?.();
    const outStream = await this.destinationProvider.getEntitiesStream?.();

    return this.#runStreamStage('entities', inStream, outStream, 'type');
  }

  /**
   * Transfers the links from the source to the destination.
   *
   * @throws Error when either provider does not supply a links stream
   */
  async transferLinks(): Promise<void> {
    const inStream = await this.sourceProvider.streamLinks?.();
    const outStream = await this.destinationProvider.getLinksStream?.();

    return this.#runStreamStage('links', inStream, outStream);
  }

  /**
   * Placeholder for the media stage: emits 'start' and 'complete' and logs a warning,
   * no data is transferred.
   */
  async transferMedia(): Promise<void> {
    const stageName: TransferStage = 'media';

    this.#updateStage('start', stageName);
    console.warn('transferMedia not yet implemented');
    this.#updateStage('complete', stageName);
  }

  /**
   * Transfers the configuration from the source to the destination.
   *
   * @throws Error when either provider does not supply a configuration stream
   */
  async transferConfiguration(): Promise<void> {
    const inStream = await this.sourceProvider.streamConfiguration?.();
    const outStream = await this.destinationProvider.getConfigurationStream?.();

    return this.#runStreamStage('configuration', inStream, outStream);
  }
}
2022-10-13 16:52:16 +02:00
2022-11-08 14:26:48 +01:00
/**
 * Builds a transfer engine for the given pair of providers.
 *
 * @param sourceProvider - Provider the data is read from
 * @param destinationProvider - Provider the data is written to
 * @param options - Engine options, forwarded as-is to the engine
 * @returns A new `TransferEngine` typed after the two providers
 */
export const createTransferEngine = <S extends ISourceProvider, D extends IDestinationProvider>(
  sourceProvider: S,
  destinationProvider: D,
  options: ITransferEngineOptions
): TransferEngine<S, D> => new TransferEngine<S, D>(sourceProvider, destinationProvider, options);