2022-11-11 13:41:01 +01:00
import { PassThrough } from 'stream-chain' ;
2022-11-28 16:54:36 +01:00
import { isEmpty , uniq } from 'lodash/fp' ;
2022-10-24 17:11:59 +02:00
import type {
2022-11-14 17:35:21 +01:00
Diff ,
2022-10-13 11:01:35 +02:00
IDestinationProvider ,
2022-11-15 09:48:51 +01:00
IMetadata ,
2022-10-13 11:01:35 +02:00
ISourceProvider ,
ITransferEngine ,
ITransferEngineOptions ,
2022-11-08 14:26:48 +01:00
ITransferResults ,
2022-11-11 14:24:30 +01:00
TransferStage ,
2022-10-13 11:01:35 +02:00
} from '../../types' ;
2022-11-15 14:15:25 +01:00
import compareSchemas from '../strategies' ;
2022-11-28 16:54:36 +01:00
// eslint-disable-next-line @typescript-eslint/no-var-requires
const semverDiff = require ( 'semver/functions/diff' ) ;
2022-10-13 11:01:35 +02:00
2022-11-11 13:41:01 +01:00
/**
 * Running totals kept by the engine, indexed by transfer stage.
 *
 * Every stage entry is optional. An entry holds the number of chunks seen
 * (`count`) and their accumulated size (`bytes`); `aggregates`, when present,
 * holds the same two counters broken down by an arbitrary string key.
 */
type TransferProgress = Partial<
  Record<
    TransferStage,
    {
      count: number;
      bytes: number;
      aggregates?: Record<string, { count: number; bytes: number }>;
    }
  >
>;
2022-11-15 18:42:34 +01:00
/**
 * Shape of the value returned by the engine's `progress` getter.
 */
type TransferEngineProgress = {
  /** Stream on which the engine emits its stage events. */
  stream: PassThrough;
  /** Progress counters accumulated so far. */
  data: any;
};
2022-11-08 14:26:48 +01:00
/**
 * Orchestrates a data transfer between a source provider and a destination provider.
 *
 * The engine bootstraps both providers, checks that they are compatible
 * (Strapi version and schemas), then pipes each stage (schemas, entities,
 * assets, links, configuration) from the source into the destination while
 * recording progress counters and emitting `start` / `progress` / `complete`
 * events on its progress stream.
 */
class TransferEngine<
  S extends ISourceProvider = ISourceProvider,
  D extends IDestinationProvider = IDestinationProvider
> implements ITransferEngine
{
  sourceProvider: ISourceProvider;

  destinationProvider: IDestinationProvider;

  options: ITransferEngineOptions;

  // Metadata fetched from each provider by init()
  #metadata: { source?: IMetadata; destination?: IMetadata } = {};

  // Per-stage counters, updated for every chunk going through a stage
  #transferProgress: TransferProgress = {};

  // TODO: Type the stream chunks. Doesn't seem trivial, especially since PassThrough doesn't provide a PassThroughOptions type
  #progressStream: PassThrough = new PassThrough({ objectMode: true });

  // Reason of the last failed integrity check (undefined when the last check passed)
  #integrityError: Error | undefined = undefined;

  /**
   * Current progress counters along with the stream on which stage events are emitted.
   */
  get progress(): TransferEngineProgress {
    return {
      data: this.#transferProgress,
      stream: this.#progressStream,
    };
  }

  /**
   * @param sourceProvider - Provider the data is read from; its `type` must be `'source'`
   * @param destinationProvider - Provider the data is written to; its `type` must be `'destination'`
   * @param options - Engine options (version and schema matching strategies)
   * @throws Error when one of the providers does not have the expected `type`
   */
  constructor(
    sourceProvider: ISourceProvider,
    destinationProvider: IDestinationProvider,
    options: ITransferEngineOptions
  ) {
    if (sourceProvider.type !== 'source') {
      throw new Error("SourceProvider does not have type 'source'");
    }

    if (destinationProvider.type !== 'destination') {
      throw new Error("DestinationProvider does not have type 'destination'");
    }

    this.sourceProvider = sourceProvider;
    this.destinationProvider = destinationProvider;
    this.options = options;
  }

  /**
   * Size of a chunk, measured as the length of its JSON representation.
   *
   * Returns 0 when the chunk has no JSON representation (`JSON.stringify`
   * returns `undefined`) or cannot be serialized at all (`JSON.stringify`
   * throws): a progress metric must never interrupt the transfer itself.
   */
  #measureSize(data: unknown): number {
    try {
      const serialized: string | undefined = JSON.stringify(data);

      return serialized === undefined ? 0 : serialized.length;
    } catch {
      return 0;
    }
  }

  /**
   * Record one chunk in the counters of the given stage.
   *
   * @param transferStage - Stage the chunk belongs to
   * @param data - The chunk itself
   * @param aggregateKey - Optional property of `data` whose value is used to
   *   additionally group the counters (skipped when the value is falsy)
   */
  #increaseTransferProgress(transferStage: TransferStage, data: any, aggregateKey?: string) {
    const size = this.#measureSize(data);

    let stageProgress = this.#transferProgress[transferStage];

    if (!stageProgress) {
      stageProgress = { count: 0, bytes: 0 };
      this.#transferProgress[transferStage] = stageProgress;
    }

    stageProgress.count += 1;
    stageProgress.bytes += size;

    if (!aggregateKey || !data || !data[aggregateKey]) {
      return;
    }

    const aggKeyValue = data[aggregateKey];
    const aggregates = stageProgress.aggregates ?? (stageProgress.aggregates = {});
    const aggregate = aggregates[aggKeyValue] ?? (aggregates[aggKeyValue] = { count: 0, bytes: 0 });

    aggregate.count += 1;
    aggregate.bytes += size;
  }

  /**
   * Create an object-mode pass-through stream that records every chunk in the
   * progress counters, emits a `progress` event, and forwards the chunk untouched.
   */
  #countRecorder(transferStage: TransferStage, aggregateKey?: string) {
    return new PassThrough({
      objectMode: true,
      transform: (data, _encoding, callback) => {
        this.#increaseTransferProgress(transferStage, data, aggregateKey);
        this.#updateStage('progress', transferStage);
        callback(null, data);
      },
    });
  }

  /**
   * Emit a stage event on the progress stream, carrying the current counters
   * and the name of the stage.
   */
  #updateStage(type: 'start' | 'complete' | 'progress', transferStage: TransferStage) {
    this.#progressStream.emit(type, {
      data: this.#transferProgress,
      stage: transferStage,
    });
  }

  /**
   * Make sure the Strapi versions of both providers are compatible according
   * to the `versionMatching` option.
   *
   * Nothing is checked when a version is missing, when the strategy is
   * `'ignore'` or when both versions are strictly equal. Otherwise the kind of
   * difference reported by semver must be tolerated by the strategy:
   * - `'patch'`: only prerelease / build differences
   * - `'minor'`: the above, plus patch differences
   * - `'major'`: the above, plus minor differences
   *
   * @throws Error when the versions can't be compared or when the difference
   *   is not tolerated by the strategy
   */
  #assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
    const strategy = this.options.versionMatching;

    if (
      !sourceVersion ||
      !destinationVersion ||
      strategy === 'ignore' ||
      destinationVersion === sourceVersion
    ) {
      return;
    }

    const mismatch = () =>
      new Error(
        `Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}`
      );

    let diff: string | null;

    try {
      diff = semverDiff(sourceVersion, destinationVersion);
    } catch {
      // Versions that can't be compared are reported as a mismatch
      throw mismatch();
    }

    // No difference reported
    if (!diff) {
      return;
    }

    // 'prerelease' is the release type reported by semver for prerelease-only differences
    const validPatch = ['prerelease', 'build'];
    const validMinor = [...validPatch, 'patch', 'prepatch'];
    const validMajor = [...validMinor, 'minor', 'preminor'];

    const isTolerated =
      (strategy === 'patch' && validPatch.includes(diff)) ||
      (strategy === 'minor' && validMinor.includes(diff)) ||
      (strategy === 'major' && validMajor.includes(diff));

    if (!isTolerated) {
      throw mismatch();
    }
  }

  /**
   * Compare every schema found on either side using the `schemasMatching`
   * strategy (`'strict'` by default).
   *
   * @throws Error listing the differences when at least one schema doesn't match
   */
  #assertSchemasMatching(sourceSchemas: any, destinationSchemas: any) {
    const strategy = this.options.schemasMatching || 'strict';
    const keys = uniq(Object.keys(sourceSchemas).concat(Object.keys(destinationSchemas)));
    const diffs: { [key: string]: Diff[] } = {};

    for (const key of keys) {
      const schemaDiffs = compareSchemas(sourceSchemas[key], destinationSchemas[key], strategy);

      if (schemaDiffs.length) {
        diffs[key] = schemaDiffs;
      }
    }

    if (!isEmpty(diffs)) {
      throw new Error(
        `Import process failed because the project doesn't have a matching data structure\n${JSON.stringify(
          diffs,
          null,
          2
        )}\n`
      );
    }
  }

  /**
   * Fetch the metadata of both providers and hand the source metadata over to
   * the destination provider (when it implements `setMetadata`).
   */
  async init(): Promise<void> {
    // Resolve providers' resource and store
    // them in the engine's internal state
    await this.#resolveProviderResource();

    // Update the destination provider's source metadata
    const { source: sourceMetadata } = this.#metadata;

    if (sourceMetadata) {
      this.destinationProvider.setMetadata?.('source', sourceMetadata);
    }
  }

  /**
   * Run the optional `bootstrap` hook of both providers concurrently.
   */
  async bootstrap(): Promise<void> {
    await Promise.all([this.sourceProvider.bootstrap?.(), this.destinationProvider.bootstrap?.()]);
  }

  /**
   * Run the optional `close` hook of both providers concurrently.
   */
  async close(): Promise<void> {
    await Promise.all([this.sourceProvider.close?.(), this.destinationProvider.close?.()]);
  }

  /**
   * Read the metadata of both providers and keep the ones that are defined.
   */
  async #resolveProviderResource() {
    const sourceMetadata = await this.sourceProvider.getMetadata();
    const destinationMetadata = await this.destinationProvider.getMetadata();

    if (sourceMetadata) {
      this.#metadata.source = sourceMetadata;
    }

    if (destinationMetadata) {
      this.#metadata.destination = destinationMetadata;
    }
  }

  /**
   * Check that the source can be transferred into the destination.
   *
   * Versions are compared when both providers expose metadata, schemas are
   * compared when both providers expose schemas.
   *
   * @returns `true` when every check passed, `false` otherwise. The reason of
   *   a failure is kept so that `transfer()` can report it.
   */
  async integrityCheck(): Promise<boolean> {
    this.#integrityError = undefined;

    try {
      const sourceMetadata = await this.sourceProvider.getMetadata();
      const destinationMetadata = await this.destinationProvider.getMetadata();

      if (sourceMetadata && destinationMetadata) {
        this.#assertStrapiVersionIntegrity(
          sourceMetadata?.strapi?.version,
          destinationMetadata?.strapi?.version
        );
      }

      const sourceSchemas = await this.sourceProvider.getSchemas?.();
      const destinationSchemas = await this.destinationProvider.getSchemas?.();

      if (sourceSchemas && destinationSchemas) {
        this.#assertSchemasMatching(sourceSchemas, destinationSchemas);
      }

      return true;
    } catch (error: unknown) {
      // Keep the reason instead of silently dropping it
      this.#integrityError = error instanceof Error ? error : new Error(String(error));

      return false;
    }
  }

  /**
   * Run the whole transfer: bootstrap, init, integrity check, `beforeTransfer`
   * hook, every stage in order, then close the providers.
   *
   * @returns The results exposed by both providers and the engine's progress counters
   * @throws Whatever error interrupted the transfer, after the destination
   *   provider's optional `rollback` hook has been called with it
   */
  async transfer(): Promise<ITransferResults<S, D>> {
    try {
      await this.bootstrap();
      await this.init();

      const isValidTransfer = await this.integrityCheck();

      if (!isValidTransfer) {
        const summary = `Unable to transfer the data between ${this.sourceProvider.name} and ${this.destinationProvider.name}.`;
        // Report the recorded reason when there is one, fall back to the generic hint otherwise
        const details = this.#integrityError
          ? this.#integrityError.message
          : 'Please refer to the log above for more information.';

        throw new Error(`${summary}\n${details}`);
      }

      await this.beforeTransfer();

      // Run the transfer stages
      await this.transferSchemas();
      await this.transferEntities();
      await this.transferAssets();
      await this.transferLinks();
      await this.transferConfiguration();

      // Gracefully close the providers
      await this.close();
    } catch (e: unknown) {
      // Rollback the destination provider if an exception is thrown during the transfer
      // Note: This will be configurable in the future
      await this.destinationProvider.rollback?.(e as Error);
      throw e;
    }

    return {
      source: this.sourceProvider.results,
      destination: this.destinationProvider.results,
      engine: this.#transferProgress,
    };
  }

  /**
   * Run the optional `beforeStreaming` hook of the source, then of the destination.
   */
  async beforeTransfer(): Promise<void> {
    await this.sourceProvider.beforeStreaming?.();
    await this.destinationProvider.beforeStreaming?.();
  }

  /**
   * Shared implementation of every transfer stage.
   *
   * The source stream is requested first; the destination stream is only
   * requested when the source provided one. The stage is skipped (no event
   * emitted) when either stream is missing. Otherwise a `start` event is
   * emitted and the source is piped into the destination through a recorder
   * that updates the counters for every chunk.
   *
   * The returned promise resolves (after emitting `complete`) when the
   * destination stream emits `close`, and rejects on the first `error` emitted
   * by either stream.
   *
   * @param stageName - Stage being transferred
   * @param openSource - Returns the source stream of the stage, if any
   * @param openDestination - Returns the destination stream of the stage, if any
   * @param aggregateKey - Optional chunk property used to group the counters
   */
  async #transferStage(
    stageName: TransferStage,
    openSource: () => NodeJS.ReadableStream | Promise<NodeJS.ReadableStream> | undefined,
    openDestination: () => NodeJS.WritableStream | Promise<NodeJS.WritableStream> | undefined,
    aggregateKey?: string
  ): Promise<void> {
    const inStream = await openSource();

    if (!inStream) {
      return;
    }

    const outStream = await openDestination();

    if (!outStream) {
      return;
    }

    this.#updateStage('start', stageName);

    return new Promise<void>((resolve, reject) => {
      inStream
        // Throw on error in the source
        .on('error', reject);

      outStream
        // Throw on error in the destination
        .on('error', reject)
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', () => {
          this.#updateStage('complete', stageName);
          resolve();
        });

      inStream.pipe(this.#countRecorder(stageName, aggregateKey)).pipe(outStream);
    });
  }

  /**
   * Transfer the schemas from the source to the destination.
   */
  async transferSchemas(): Promise<void> {
    await this.#transferStage(
      'schemas',
      () => this.sourceProvider.streamSchemas?.(),
      () => this.destinationProvider.getSchemasStream?.()
    );
  }

  /**
   * Transfer the entities from the source to the destination.
   * Counters are additionally grouped by the `type` property of each entity.
   */
  async transferEntities(): Promise<void> {
    await this.#transferStage(
      'entities',
      () => this.sourceProvider.streamEntities?.(),
      () => this.destinationProvider.getEntitiesStream?.(),
      'type'
    );
  }

  /**
   * Transfer the links from the source to the destination.
   */
  async transferLinks(): Promise<void> {
    await this.#transferStage(
      'links',
      () => this.sourceProvider.streamLinks?.(),
      () => this.destinationProvider.getLinksStream?.()
    );
  }

  /**
   * Transfer the assets from the source to the destination.
   */
  async transferAssets(): Promise<void> {
    await this.#transferStage(
      'assets',
      () => this.sourceProvider.streamAssets?.(),
      () => this.destinationProvider.getAssetsStream?.()
    );
  }

  /**
   * Transfer the configuration from the source to the destination.
   */
  async transferConfiguration(): Promise<void> {
    await this.#transferStage(
      'configuration',
      () => this.sourceProvider.streamConfiguration?.(),
      () => this.destinationProvider.getConfigurationStream?.()
    );
  }
}
2022-10-13 16:52:16 +02:00
2022-11-15 21:15:16 +01:00
/**
 * Build a transfer engine for the given pair of providers.
 *
 * @param sourceProvider - Provider the data is read from
 * @param destinationProvider - Provider the data is written to
 * @param options - Options forwarded to the engine
 * @returns A new `TransferEngine` typed after the given providers
 */
export const createTransferEngine = <
  S extends ISourceProvider = ISourceProvider,
  D extends IDestinationProvider = IDestinationProvider
>(
  sourceProvider: S,
  destinationProvider: D,
  options: ITransferEngineOptions
): TransferEngine<S, D> => new TransferEngine<S, D>(sourceProvider, destinationProvider, options);