2022-12-05 10:32:38 +01:00
import { PassThrough , Stream } from 'stream-chain' ;
import * as path from 'path' ;
2022-11-28 16:54:36 +01:00
import { isEmpty , uniq } from 'lodash/fp' ;
2022-12-05 10:32:38 +01:00
import type { Schema } from '@strapi/strapi' ;
2022-10-24 17:11:59 +02:00
import type {
2022-11-14 17:35:21 +01:00
Diff ,
2022-12-05 10:32:38 +01:00
IAsset ,
2022-10-13 11:01:35 +02:00
IDestinationProvider ,
2022-12-05 10:32:38 +01:00
IEntity ,
2022-11-15 09:48:51 +01:00
IMetadata ,
2022-10-13 11:01:35 +02:00
ISourceProvider ,
ITransferEngine ,
ITransferEngineOptions ,
2022-11-08 14:26:48 +01:00
ITransferResults ,
2022-11-11 14:24:30 +01:00
TransferStage ,
2022-10-13 11:01:35 +02:00
} from '../../types' ;
2022-11-15 14:15:25 +01:00
import compareSchemas from '../strategies' ;
2022-11-28 16:54:36 +01:00
// eslint-disable-next-line @typescript-eslint/no-var-requires
const semverDiff = require ( 'semver/functions/diff' ) ;
2022-10-13 11:01:35 +02:00
2022-11-11 13:41:01 +01:00
/**
 * Progress counters recorded by the transfer engine, keyed by transfer stage.
 *
 * A stage only has an entry once it has been tracked. For each stage:
 * - `count` is the number of items seen so far,
 * - `bytes` is the accumulated size of those items,
 * - `aggregates` optionally breaks the same two counters down by an
 *   arbitrary string key.
 */
type TransferProgress = Partial<
  Record<
    TransferStage,
    {
      count: number;
      bytes: number;
      aggregates?: Record<string, { count: number; bytes: number }>;
    }
  >
>;
2022-11-08 14:26:48 +01:00
/**
 * Drives a data transfer from a source provider to a destination provider.
 *
 * The engine bootstraps both providers, verifies that they are compatible
 * (Strapi version and schemas), then pipes every stage (schemas, entities,
 * assets, links, configuration) from the source into the destination.
 * Per-stage counters are kept in `progress.data`, and `start`, `progress`
 * and `complete` events are emitted on `progress.stream`.
 */
class TransferEngine<
  S extends ISourceProvider = ISourceProvider,
  D extends IDestinationProvider = IDestinationProvider
> implements ITransferEngine
{
  /** Provider the data is read from. */
  sourceProvider: ISourceProvider;

  /** Provider the data is written to. */
  destinationProvider: IDestinationProvider;

  /** Options given at construction time (version / schema matching strategies, ...). */
  options: ITransferEngineOptions;

  /** Metadata resolved from both providers during `init()`. */
  #metadata: { source?: IMetadata; destination?: IMetadata } = {};

  /** Progress counters (`data`) and the emitter used to publish stage events (`stream`). */
  progress: {
    data: TransferProgress;
    stream: PassThrough;
  };

  /**
   * @param sourceProvider - Provider to read from; its `type` must be `'source'`.
   * @param destinationProvider - Provider to write to; its `type` must be `'destination'`.
   * @param options - Engine options.
   * @throws Error when either provider reports an unexpected `type`.
   */
  constructor(
    sourceProvider: ISourceProvider,
    destinationProvider: IDestinationProvider,
    options: ITransferEngineOptions
  ) {
    if (sourceProvider.type !== 'source') {
      throw new Error("SourceProvider does not have type 'source'");
    }

    if (destinationProvider.type !== 'destination') {
      throw new Error("DestinationProvider does not have type 'destination'");
    }

    this.sourceProvider = sourceProvider;
    this.destinationProvider = destinationProvider;
    this.options = options;

    this.progress = { data: {}, stream: new PassThrough() };
  }

  /**
   * Record one more item for the given stage.
   *
   * @param stage - Stage the item belongs to.
   * @param data - The item flowing through the stage.
   * @param aggregate - Optional helpers: `size` computes the item's size
   *   (defaults to the length of its JSON serialization) and `key` returns the
   *   aggregate bucket the item should also be counted in.
   */
  #updateTransferProgress<T = unknown>(
    stage: TransferStage,
    data: T,
    aggregate?: {
      size?: (value: T) => number;
      key?: (value: T) => string;
    }
  ) {
    // Lazily create the counters the first time a stage is seen
    const stageProgress = (this.progress.data[stage] ??= { count: 0, bytes: 0 });

    const size = aggregate?.size?.(data) ?? JSON.stringify(data).length;
    const key = aggregate?.key?.(data);

    stageProgress.count += 1;
    stageProgress.bytes += size;

    // Handle aggregate updates if necessary (a missing or empty key means "no bucket")
    if (key) {
      const aggregates = (stageProgress.aggregates ??= {});
      const bucket = (aggregates[key] ??= { count: 0, bytes: 0 });

      bucket.count += 1;
      bucket.bytes += size;
    }
  }

  /**
   * Create an object-mode pass-through stream that updates the progress
   * counters and emits a `progress` event for every chunk, then forwards the
   * chunk unchanged.
   *
   * @param stage - Stage the tracker is attached to.
   * @param aggregate - Optional `size` / `key` helpers forwarded to the progress update.
   */
  #progressTracker(
    stage: TransferStage,
    aggregate?: {
      size?(value: unknown): number;
      key?(value: unknown): string;
    }
  ) {
    return new PassThrough({
      objectMode: true,
      transform: (data, _encoding, callback) => {
        this.#updateTransferProgress(stage, data, aggregate);
        this.#emitStageUpdate('progress', stage);
        callback(null, data);
      },
    });
  }

  /**
   * Emit a stage event on `progress.stream`, carrying the current progress
   * data and the stage name.
   */
  #emitStageUpdate(type: 'start' | 'complete' | 'progress', transferStage: TransferStage) {
    this.progress.stream.emit(type, {
      data: this.progress.data,
      stage: transferStage,
    });
  }

  /**
   * Check that the source and destination Strapi versions are compatible
   * under the configured `versionMatching` strategy.
   *
   * The check is skipped when either version is missing, when the strategy is
   * `'ignore'`, or when both versions are strictly equal.
   *
   * @throws Error when the versions cannot be compared or when their
   *   difference is not allowed by the strategy.
   */
  #assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
    const strategy = this.options.versionMatching;

    if (
      !sourceVersion ||
      !destinationVersion ||
      strategy === 'ignore' ||
      destinationVersion === sourceVersion
    ) {
      return;
    }

    const versionMismatch = () =>
      new Error(
        `Strapi versions doesn't match (${strategy} check): ${sourceVersion} does not match with ${destinationVersion}`
      );

    let diff: string | null;
    try {
      diff = semverDiff(sourceVersion, destinationVersion);
    } catch {
      // Any failure to compute the diff is reported as a version mismatch
      throw versionMismatch();
    }

    // No difference reported: nothing to validate
    if (!diff) {
      return;
    }

    // Each strategy tolerates differences strictly below its own level.
    // Note: semver reports prerelease-only differences as 'prerelease'.
    const validPatch = ['prerelease', 'build'];
    const validMinor = [...validPatch, 'patch', 'prepatch'];
    const validMajor = [...validMinor, 'minor', 'preminor'];

    if (strategy === 'patch' && validPatch.includes(diff)) {
      return;
    }

    if (strategy === 'minor' && validMinor.includes(diff)) {
      return;
    }

    if (strategy === 'major' && validMajor.includes(diff)) {
      return;
    }

    throw versionMismatch();
  }

  /**
   * Compare every schema found on either side using the configured
   * `schemasMatching` strategy (defaults to `'strict'`).
   *
   * @throws Error listing the differences (as JSON) when at least one schema differs.
   */
  #assertSchemasMatching(sourceSchemas: any, destinationSchemas: any) {
    const strategy = this.options.schemasMatching || 'strict';
    const keys = uniq(Object.keys(sourceSchemas).concat(Object.keys(destinationSchemas)));
    const diffs: { [key: string]: Diff[] } = {};

    keys.forEach((key) => {
      const sourceSchema = sourceSchemas[key];
      const destinationSchema = destinationSchemas[key];
      const schemaDiffs = compareSchemas(sourceSchema, destinationSchema, strategy);

      if (schemaDiffs.length) {
        diffs[key] = schemaDiffs;
      }
    });

    if (!isEmpty(diffs)) {
      throw new Error(
        `Import process failed because the project doesn't have a matching data structure 
        ${JSON.stringify(diffs, null, 2)}
        `
      );
    }
  }

  /**
   * Resolve the providers' metadata into the engine state and, when the
   * source exposes metadata, hand it over to the destination provider.
   */
  async init(): Promise<void> {
    // Resolve providers' resource and store
    // them in the engine's internal state
    await this.#resolveProviderResource();

    // Update the destination provider's source metadata
    const { source: sourceMetadata } = this.#metadata;

    if (sourceMetadata) {
      this.destinationProvider.setMetadata?.('source', sourceMetadata);
    }
  }

  /** Run the optional `bootstrap` hook of both providers concurrently. */
  async bootstrap(): Promise<void> {
    await Promise.all([this.sourceProvider.bootstrap?.(), this.destinationProvider.bootstrap?.()]);
  }

  /** Run the optional `close` hook of both providers concurrently. */
  async close(): Promise<void> {
    await Promise.all([this.sourceProvider.close?.(), this.destinationProvider.close?.()]);
  }

  /** Fetch the metadata of both providers and store whatever is returned. */
  async #resolveProviderResource() {
    const sourceMetadata = await this.sourceProvider.getMetadata();
    const destinationMetadata = await this.destinationProvider.getMetadata();

    if (sourceMetadata) {
      this.#metadata.source = sourceMetadata;
    }

    if (destinationMetadata) {
      this.#metadata.destination = destinationMetadata;
    }
  }

  /**
   * Check that the two providers are compatible (Strapi versions, then schemas).
   *
   * @returns `true` when every available check passes, `false` as soon as
   *   anything in the check throws.
   */
  async integrityCheck(): Promise<boolean> {
    try {
      const sourceMetadata = await this.sourceProvider.getMetadata();
      const destinationMetadata = await this.destinationProvider.getMetadata();

      if (sourceMetadata && destinationMetadata) {
        this.#assertStrapiVersionIntegrity(
          sourceMetadata?.strapi?.version,
          destinationMetadata?.strapi?.version
        );
      }

      const sourceSchemas = await this.sourceProvider.getSchemas?.();
      const destinationSchemas = await this.destinationProvider.getSchemas?.();

      if (sourceSchemas && destinationSchemas) {
        this.#assertSchemasMatching(sourceSchemas, destinationSchemas);
      }

      return true;
    } catch {
      // NOTE(review): the failure reason is dropped here, so callers only
      // learn that the check failed, not why.
      return false;
    }
  }

  /**
   * Run the whole transfer: bootstrap, init, integrity check, the
   * `beforeTransfer` hooks, every stage in order, then close the providers.
   *
   * If anything throws, the destination provider's optional `rollback` hook
   * is awaited with the error before it is re-thrown.
   *
   * @returns The results exposed by both providers plus the engine's progress data.
   * @throws Error when the integrity check fails, or whatever a step rejected with.
   */
  async transfer(): Promise<ITransferResults<S, D>> {
    try {
      await this.bootstrap();
      await this.init();

      const isValidTransfer = await this.integrityCheck();

      if (!isValidTransfer) {
        // TODO: provide the log from the integrity check
        throw new Error(
          `Unable to transfer the data between ${this.sourceProvider.name} and ${this.destinationProvider.name}.\nPlease refer to the log above for more information.`
        );
      }

      await this.beforeTransfer();

      // Run the transfer stages
      await this.transferSchemas();
      await this.transferEntities();
      await this.transferAssets();
      await this.transferLinks();
      await this.transferConfiguration();

      // Gracefully close the providers
      await this.close();
    } catch (e: unknown) {
      // Rollback the destination provider if an exception is thrown during the transfer
      // Note: This will be configurable in the future
      await this.destinationProvider.rollback?.(e as Error);
      throw e;
    }

    return {
      source: this.sourceProvider.results,
      destination: this.destinationProvider.results,
      engine: this.progress.data,
    };
  }

  /** Run the optional `beforeTransfer` hook of the source, then of the destination. */
  async beforeTransfer(): Promise<void> {
    await this.sourceProvider.beforeTransfer?.();
    await this.destinationProvider.beforeTransfer?.();
  }

  /**
   * Pipe the source's schemas into the destination, aggregating progress by
   * `modelType`. Does nothing when either side provides no stream.
   */
  async transferSchemas(): Promise<void> {
    const stageName: TransferStage = 'schemas';

    const inStream = await this.sourceProvider.streamSchemas?.();
    if (!inStream) {
      return;
    }

    const outStream = await this.destinationProvider.getSchemasStream?.();
    if (!outStream) {
      return;
    }

    this.#emitStageUpdate('start', stageName);

    return new Promise((resolve, reject) => {
      inStream
        // Throw on error in the source
        .on('error', reject);

      outStream
        // Throw on error in the destination
        .on('error', reject)
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', () => {
          this.#emitStageUpdate('complete', stageName);
          resolve();
        });

      inStream
        .pipe(this.#progressTracker(stageName, { key: (value: Schema) => value.modelType }))
        .pipe(outStream);
    });
  }

  /**
   * Pipe the source's entities into the destination, aggregating progress by
   * entity `type`. Does nothing when either side provides no stream.
   */
  async transferEntities(): Promise<void> {
    const stageName: TransferStage = 'entities';

    const inStream = await this.sourceProvider.streamEntities?.();
    if (!inStream) {
      return;
    }

    const outStream = await this.destinationProvider.getEntitiesStream?.();
    if (!outStream) {
      return;
    }

    this.#emitStageUpdate('start', stageName);

    return new Promise((resolve, reject) => {
      inStream
        // Throw on error in the source
        .on('error', (e) => {
          reject(e);
        });

      outStream
        // Throw on error in the destination
        .on('error', (e) => {
          reject(e);
        })
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', () => {
          this.#emitStageUpdate('complete', stageName);
          resolve();
        });

      inStream
        .pipe(this.#progressTracker(stageName, { key: (value: IEntity) => value.type }))
        .pipe(outStream);
    });
  }

  /**
   * Pipe the source's links into the destination.
   * Does nothing when either side provides no stream.
   */
  async transferLinks(): Promise<void> {
    const stageName: TransferStage = 'links';

    const inStream = await this.sourceProvider.streamLinks?.();
    if (!inStream) {
      return;
    }

    const outStream = await this.destinationProvider.getLinksStream?.();
    if (!outStream) {
      return;
    }

    this.#emitStageUpdate('start', stageName);

    return new Promise((resolve, reject) => {
      inStream
        // Throw on error in the source
        .on('error', reject);

      outStream
        // Throw on error in the destination
        .on('error', reject)
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', () => {
          this.#emitStageUpdate('complete', stageName);
          resolve();
        });

      inStream.pipe(this.#progressTracker(stageName)).pipe(outStream);
    });
  }

  /**
   * Pipe the source's assets into the destination. Progress uses the asset's
   * `stats.size` as its size and is aggregated by file extension.
   * Does nothing when either side provides no stream.
   */
  async transferAssets(): Promise<void> {
    const stageName: TransferStage = 'assets';

    const inStream = await this.sourceProvider.streamAssets?.();
    if (!inStream) {
      return;
    }

    const outStream = await this.destinationProvider.getAssetsStream?.();
    if (!outStream) {
      return;
    }

    this.#emitStageUpdate('start', stageName);

    return new Promise((resolve, reject) => {
      inStream
        // Throw on error in the source
        .on('error', reject);

      outStream
        // Throw on error in the destination
        .on('error', reject)
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', () => {
          this.#emitStageUpdate('complete', stageName);
          resolve();
        });

      inStream
        .pipe(
          this.#progressTracker(stageName, {
            size: (value: IAsset) => value.stats.size,
            key: (value: IAsset) => path.extname(value.filename),
          })
        )
        .pipe(outStream);
    });
  }

  /**
   * Pipe the source's configuration into the destination.
   * Does nothing when either side provides no stream.
   */
  async transferConfiguration(): Promise<void> {
    const stageName: TransferStage = 'configuration';

    const inStream = await this.sourceProvider.streamConfiguration?.();
    if (!inStream) {
      return;
    }

    const outStream = await this.destinationProvider.getConfigurationStream?.();
    if (!outStream) {
      return;
    }

    this.#emitStageUpdate('start', stageName);

    return new Promise((resolve, reject) => {
      inStream
        // Throw on error in the source
        .on('error', reject);

      outStream
        // Throw on error in the destination
        .on('error', reject)
        // Resolve the promise when the destination has finished reading all the data from the source
        .on('close', () => {
          this.#emitStageUpdate('complete', stageName);
          resolve();
        });

      inStream.pipe(this.#progressTracker(stageName)).pipe(outStream);
    });
  }
}
2022-10-13 16:52:16 +02:00
2022-11-15 21:15:16 +01:00
/**
 * Factory for {@link TransferEngine}.
 *
 * Keeps the concrete provider types (`S`, `D`) so the returned engine is
 * typed against the exact providers it was built with.
 *
 * @param sourceProvider - Provider the data is read from.
 * @param destinationProvider - Provider the data is written to.
 * @param options - Engine options forwarded to the constructor.
 * @returns A new engine wired to the given providers.
 */
export const createTransferEngine = <
  S extends ISourceProvider = ISourceProvider,
  D extends IDestinationProvider = IDestinationProvider
>(
  sourceProvider: S,
  destinationProvider: D,
  options: ITransferEngineOptions
): TransferEngine<S, D> => new TransferEngine<S, D>(sourceProvider, destinationProvider, options);