2022-12-16 16:34:48 +01:00
import { PassThrough , Transform , Readable , Writable , Stream } from 'stream' ;
2022-12-12 20:37:41 +01:00
import { extname } from 'path' ;
2023-01-13 16:43:20 +01:00
import { EOL } from 'os' ;
import { isEmpty , uniq , last } from 'lodash/fp' ;
2022-12-12 20:36:26 +01:00
import { diff as semverDiff } from 'semver' ;
2022-12-12 20:37:41 +01:00
import type { Schema } from '@strapi/strapi' ;
2022-12-05 10:32:38 +01:00
2022-10-24 17:11:59 +02:00
import type {
2022-12-05 10:32:38 +01:00
IAsset ,
2022-10-13 11:01:35 +02:00
IDestinationProvider ,
2022-12-05 10:32:38 +01:00
IEntity ,
2022-11-15 09:48:51 +01:00
IMetadata ,
2022-10-13 11:01:35 +02:00
ISourceProvider ,
ITransferEngine ,
ITransferEngineOptions ,
2022-12-14 09:59:38 +01:00
TransferProgress ,
2022-11-08 14:26:48 +01:00
ITransferResults ,
2022-11-11 14:24:30 +01:00
TransferStage ,
2022-12-12 15:04:21 +01:00
TransferTransform ,
2023-01-13 16:43:20 +01:00
IProvider ,
2023-01-16 18:44:14 +01:00
TransferFilters ,
TransferFilterPreset ,
2022-10-13 11:01:35 +02:00
} from '../../types' ;
2022-12-12 15:04:21 +01:00
import type { Diff } from '../utils/json' ;
2022-10-13 11:01:35 +02:00
2023-01-13 16:43:20 +01:00
import { compareSchemas , validateProvider } from './validation' ;
2022-12-12 15:04:21 +01:00
import { filter , map } from '../utils/stream' ;
2022-10-13 11:01:35 +02:00
2023-01-13 16:43:20 +01:00
import { TransferEngineValidationError } from './errors' ;
import {
createDiagnosticReporter ,
IDiagnosticReporter ,
ErrorDiagnosticSeverity ,
} from './diagnostic' ;
2023-01-20 16:36:10 +01:00
import { DataTransferError } from '../errors' ;
2023-01-13 16:43:20 +01:00
2022-12-13 15:11:00 +01:00
/**
 * Names of every stage handled by the transfer engine.
 *
 * The list is frozen so that consumers cannot alter it at runtime.
 */
const transferStageNames: TransferStage[] = [
  'entities',
  'links',
  'assets',
  'schemas',
  'configuration',
];

export const TRANSFER_STAGES: ReadonlyArray<TransferStage> = Object.freeze(transferStageNames);
2022-11-11 13:41:01 +01:00
2023-01-16 18:44:14 +01:00
export type TransferGroupFilter = Record<TransferFilterPreset, TransferFilters>;

// "content" preset: the whole "links" and "entities" stages are included
// TODO: If we need to implement filtering on a running stage, it would be done like this, but we still need to implement it
// entities: [
//   // Example: content processes the entities stage, but filters individual entities
//   {
//     filter(data) {
//       return shouldIncludeThisData(data);
//     },
//   },
// ],
const contentPreset: TransferFilters = {
  links: true,
  entities: true,
};

// "files" preset: the "assets" stage and the "links" stage
const filesPreset: TransferFilters = {
  assets: true,
  links: true,
};

// "config" preset: only the "configuration" stage
const configPreset: TransferFilters = {
  configuration: true,
};

/**
 * Preset filters for the only/exclude options
 */
export const TransferGroupPresets: TransferGroupFilter = {
  content: contentPreset,
  files: filesPreset,
  config: configPreset,
};
2022-12-14 12:18:25 +01:00
// Version strategy applied by the engine when `options.versionStrategy` is not set
export const DEFAULT_VERSION_STRATEGY = 'ignore' ;
// Schema strategy applied by the engine when `options.schemaStrategy` is not set
export const DEFAULT_SCHEMA_STRATEGY = 'strict' ;
2022-12-12 16:51:23 +01:00
// Schemas indexed by a string key (the key is reported as the schema "uid" when diffs are formatted)
type SchemaMap = Record < string , Schema > ;
2022-11-11 13:41:01 +01:00
2022-11-08 14:26:48 +01:00
class TransferEngine <
S extends ISourceProvider = ISourceProvider ,
D extends IDestinationProvider = IDestinationProvider
> implements ITransferEngine
{
2022-10-13 11:01:35 +02:00
/** Provider the engine reads from */
sourceProvider: ISourceProvider;

/** Provider the engine writes to */
destinationProvider: IDestinationProvider;

/** Options given to the engine at construction time */
options: ITransferEngineOptions;

/** Metadata resolved from the providers (see #resolveProviderResource) */
#metadata: { source?: IMetadata; destination?: IMetadata } = {};

/** Progress counters (data) and the stream used to emit transfer/stage events */
progress: {
  data: TransferProgress;
  stream: PassThrough;
};

/** Reporter collecting the error, warning and info diagnostics of the engine */
diagnostics: IDiagnosticReporter;

/**
 * Build an engine for the given pair of providers.
 *
 * Both providers are validated (source first, then destination) before they are stored.
 */
constructor(sourceProvider: S, destinationProvider: D, options: ITransferEngineOptions) {
  // The reporter is created before the validation step
  this.diagnostics = createDiagnosticReporter();

  validateProvider('source', sourceProvider);
  validateProvider('destination', destinationProvider);

  this.options = options;
  this.sourceProvider = sourceProvider;
  this.destinationProvider = destinationProvider;

  const progressStream = new PassThrough({ objectMode: true });
  this.progress = { data: {}, stream: progressStream };
}
2023-01-13 16:43:20 +01:00
/**
 * Register the given error as a fatal diagnostic, then throw it
 */
#panic(error: Error) {
  const severity: ErrorDiagnosticSeverity = 'fatal';

  this.#reportError(error, severity);

  throw error;
}
/**
 * Send an error diagnostic (with the given severity) to the diagnostic reporter
 */
#reportError(error: Error, severity: ErrorDiagnosticSeverity) {
  const { name, message } = error;

  this.diagnostics.report({
    kind: 'error',
    details: { severity, createdAt: new Date(), name, message, error },
  });
}
/**
 * Send a warning diagnostic to the diagnostic reporter
 */
#reportWarning(message: string, origin?: string) {
  const details = { createdAt: new Date(), message, origin };

  this.diagnostics.report({ kind: 'warning', details });
}
/**
 * Send an info diagnostic to the diagnostic reporter
 */
#reportInfo(message: string, params?: unknown) {
  const details = { createdAt: new Date(), message, params };

  this.diagnostics.report({ kind: 'info', details });
}
/**
 * Create and return a transform stream based on the given stage and options.
 *
 * Allowed transformations include 'filter' and 'map'. Global transforms (when enabled) are
 * applied first, then the transforms registered for the stage.
 *
 * The returned stream is a single duplex: what is written to it goes through every
 * transform, in order, and what comes out of the last transform can be read from it.
 * Returning the result of the last `pipe()` call instead (as was done previously) would
 * hand the caller the LAST stream of the chain only, so piping a source into it would
 * bypass every transform but the last one.
 */
#createStageTransformStream<T extends TransferStage>(
  key: T,
  options: { includeGlobal?: boolean } = {}
): PassThrough | Transform {
  const { includeGlobal = true } = options;
  const { global: globalTransforms, [key]: stageTransforms } = this.options?.transforms ?? {};

  const head = new PassThrough({ objectMode: true });
  const steps: Transform[] = [];

  const collectTransforms = <U>(transforms: TransferTransform<U>[] = []) => {
    for (const transform of transforms) {
      if ('filter' in transform) {
        steps.push(filter(transform.filter));
      }

      if ('map' in transform) {
        steps.push(map(transform.map));
      }
    }
  };

  if (includeGlobal) {
    collectTransforms(globalTransforms);
  }

  collectTransforms(stageTransforms as TransferTransform<unknown>[]);

  // Nothing to apply: a plain pass-through is enough
  if (steps.length === 0) {
    return head;
  }

  // head -> step 1 -> step 2 -> ... -> tail
  const tail = steps.reduce<Transform>((upstream, step) => upstream.pipe(step), head);
  const chain = [head, ...steps];

  const composed = new Transform({
    objectMode: true,

    // Incoming data enters the chain through its head; the write callback is only
    // invoked once the head has accepted the chunk, which propagates back-pressure
    transform(chunk, encoding, callback) {
      head.write(chunk, encoding, callback);
    },

    // Close the chain and wait until everything has come out of its tail
    flush(callback) {
      tail.once('end', () => callback());
      head.end();
    },

    // Release the inner streams when the composed stream is destroyed
    destroy(error, callback) {
      for (const stream of chain) {
        stream.destroy();
      }

      callback(error);
    },
  });

  // Data leaving the chain is exposed on the readable side of the composed stream;
  // stop draining the tail while the consumer is not keeping up
  tail.on('data', (chunk) => {
    if (!composed.push(chunk)) {
      tail.pause();
    }
  });

  // Start draining the tail again as soon as the consumer asks for more data
  const pull = composed._read.bind(composed);
  composed._read = (size: number) => {
    tail.resume();
    pull(size);
  };

  // An error raised anywhere in the chain is surfaced on the composed stream
  for (const stream of chain) {
    stream.on('error', (error) => composed.destroy(error));
  }

  return composed;
}
2023-01-13 16:43:20 +01:00
/**
 * Update the Engine's transfer progress data for a given stage.
 *
 * The aggregate options allow custom computation of the size (bytes) of the data and of
 * the aggregate key it should be counted under. Without a custom size, the length of the
 * JSON representation of the data is used.
 */
#updateTransferProgress<T = unknown>(
  stage: TransferStage,
  data: T,
  aggregate?: {
    size?: (value: T) => number;
    key?: (value: T) => string;
  }
) {
  const progressData = this.progress.data;

  // Lazily create the counters of the stage
  if (!progressData[stage]) {
    progressData[stage] = { count: 0, bytes: 0 };
  }

  const stageCounters = progressData[stage];

  if (!stageCounters) {
    return;
  }

  const size = aggregate?.size?.(data) ?? JSON.stringify(data).length;
  const key = aggregate?.key?.(data);

  stageCounters.count += 1;
  stageCounters.bytes += size;

  // No (or empty) key: there is no aggregate to update
  if (!key) {
    return;
  }

  if (!stageCounters.aggregates) {
    stageCounters.aggregates = {};
  }

  const { aggregates } = stageCounters;

  if (!aggregates[key]) {
    aggregates[key] = { count: 0, bytes: 0 };
  }

  const bucket = aggregates[key];

  bucket.count += 1;
  bucket.bytes += size;
}
2023-01-13 16:43:20 +01:00
/**
 * Create and return a PassThrough stream (object mode).
 *
 * Each chunk going through it updates the Engine's transfer progress data and triggers
 * a stage "progress" event; the chunk itself is forwarded unchanged.
 */
#progressTracker(
  stage: TransferStage,
  aggregate?: {
    size?(value: unknown): number;
    key?(value: unknown): string;
  }
) {
  const tracker = new PassThrough({
    objectMode: true,
    transform: (chunk, _encoding, done) => {
      this.#updateTransferProgress(stage, chunk, aggregate);
      this.#emitStageUpdate('progress', stage);

      done(null, chunk);
    },
  });

  return tracker;
}
2022-11-11 13:41:01 +01:00
2023-01-13 16:43:20 +01:00
/**
 * Shorthand method used to emit a "transfer::<type>" event on the progress stream
 */
#emitTransferUpdate(type: 'init' | 'start' | 'finish' | 'error', payload?: object) {
  const eventName = `transfer::${type}`;

  this.progress.stream.emit(eventName, payload);
}
2023-01-13 16:43:20 +01:00
/**
 * Shorthand method used to emit a "stage::<type>" event on the progress stream.
 *
 * The event payload holds the current progress data and the stage concerned.
 */
#emitStageUpdate(type: 'start' | 'finish' | 'progress' | 'skip', transferStage: TransferStage) {
  const { data, stream } = this.progress;

  stream.emit(`stage::${type}`, { data, stage: transferStage });
}
2022-11-11 13:41:01 +01:00
2023-01-13 16:43:20 +01:00
/**
 * Run a version check between two Strapi versions (source and destination) using the
 * strategy given to the engine during initialization.
 *
 * The check is skipped when a version is missing, when the strategy is 'ignore' or when
 * both versions are strictly equal. Otherwise the kind of difference computed by semver
 * must be allowed by the strategy:
 * - 'patch' tolerates prerelease/build differences
 * - 'minor' also tolerates patch/prepatch differences
 * - 'major' also tolerates minor/preminor differences
 *
 * @throws {TransferEngineValidationError} if the versions are incompatible or can't be compared
 */
#assertStrapiVersionIntegrity(sourceVersion?: string, destinationVersion?: string) {
  const strategy = this.options.versionStrategy || DEFAULT_VERSION_STRATEGY;

  const reject = () => {
    throw new TransferEngineValidationError(
      `The source and destination provide are targeting incompatible Strapi versions (using the "${strategy}" strategy). The source (${this.sourceProvider.name}) version is ${sourceVersion} and the destination (${this.destinationProvider.name}) version is ${destinationVersion}`,
      {
        check: 'strapi.version',
        strategy,
        versions: { source: sourceVersion, destination: destinationVersion },
      }
    );
  };

  if (
    !sourceVersion ||
    !destinationVersion ||
    strategy === 'ignore' ||
    destinationVersion === sourceVersion
  ) {
    return;
  }

  let diff: string | null = null;

  try {
    diff = semverDiff(sourceVersion, destinationVersion);
  } catch {
    // Versions that can't be parsed are considered incompatible
    reject();
  }

  // No difference according to semver
  if (!diff) {
    return;
  }

  // semver reports pre-release differences as 'prerelease' (this entry used to be
  // misspelled 'prelease', which made every pre-release difference a mismatch)
  const validPatch = ['prerelease', 'build'];
  const validMinor = [...validPatch, 'patch', 'prepatch'];
  const validMajor = [...validMinor, 'minor', 'preminor'];

  if (strategy === 'patch' && validPatch.includes(diff)) {
    return;
  }

  if (strategy === 'minor' && validMinor.includes(diff)) {
    return;
  }

  if (strategy === 'major' && validMajor.includes(diff)) {
    return;
  }

  reject();
}
2023-01-13 16:43:20 +01:00
/**
 * Run a check between two sets of schemas (source and destination) using the strategy
 * given to the engine during initialization.
 *
 * Nothing is checked with the 'ignore' strategy. Otherwise every schema found on either
 * side is compared and, if at least one difference is reported, a validation error
 * holding a human-readable summary of the differences is thrown.
 */
#assertSchemasMatching(sourceSchemas: SchemaMap, destinationSchemas: SchemaMap) {
  const strategy = this.options.schemaStrategy || DEFAULT_SCHEMA_STRATEGY;

  if (strategy === 'ignore') {
    return;
  }

  const diffs: { [key: string]: Diff[] } = {};
  const uids = uniq([...Object.keys(sourceSchemas), ...Object.keys(destinationSchemas)]);

  for (const uid of uids) {
    const sourceSchema = sourceSchemas[uid];
    const destinationSchema = destinationSchemas[uid];
    const schemaDiffs = compareSchemas(destinationSchema, sourceSchema, strategy);

    if (schemaDiffs.length) {
      diffs[uid] = schemaDiffs;
    }
  }

  if (isEmpty(diffs)) {
    return;
  }

  // Describe a single difference found on the schema identified by uid
  const describeDiff = (uid: string, diff: Diff): string => {
    const path = diff.path.join('.');

    switch (diff.kind) {
      case 'added':
        return `Added "${path}": "${diff.value}" (${diff.type})`;
      case 'deleted':
        return `Removed "${path}"`;
      case 'modified':
        return `Modified "${path}": "${diff.values[0]}" (${diff.types[0]}) => "${diff.values[1]}" (${diff.types[1]})`;
      default:
        throw new TransferEngineValidationError(`Invalid diff found for "${uid}"`, {
          check: `schema on ${uid}`,
        });
    }
  };

  const sections: string[] = [];

  for (const [uid, ctDiffs] of Object.entries(diffs)) {
    // The diffs of a schema are sorted in place, by kind, before being listed
    const lines = ctDiffs
      .sort((a, b) => (a.kind > b.kind ? -1 : 1))
      .map((diff) => `  - ${describeDiff(uid, diff)}`);

    sections.push(`- ${uid}:${EOL}${lines.join(EOL)}`);
  }

  const formattedDiffs = sections.join(EOL);

  throw new TransferEngineValidationError(
    `Invalid schema changes detected during integrity checks (using the ${strategy} strategy). Please find a summary of the changes below:\n${formattedDiffs}`,
    {
      check: 'schema.changes',
      strategy,
      diffs,
    }
  );
}
2023-01-16 18:44:14 +01:00
/**
 * Tell whether the given stage must be skipped according to the "only" and "exclude" options.
 *
 * - the 'schemas' stage is never skipped
 * - without "only", every stage is selected; with it, a stage is selected when at least
 *   one of the listed presets includes it
 * - a selected stage is skipped when at least one of the "exclude" presets includes it
 */
shouldSkipStage(stage: TransferStage) {
  const { exclude, only } = this.options;

  // schemas must always be included
  if (stage === 'schemas') {
    return false;
  }

  const presetIncludesStage = (transferGroup: TransferFilterPreset) => {
    return TransferGroupPresets[transferGroup][stage];
  };

  // everything is selected by default unless 'only' has been set
  const selected = only?.length > 0 ? only.some(presetIncludesStage) : isEmpty(only);

  if (!selected) {
    return true;
  }

  return exclude?.length > 0 ? exclude.some(presetIncludesStage) : false;
}
2022-12-12 15:04:21 +01:00
/**
 * Run a single transfer stage: source -> (transform) -> (tracker) -> destination.
 *
 * When the source or the destination is missing, or when the stage must be skipped, the
 * provided streams are destroyed and a 'skip' stage event is emitted instead. Errors
 * raised while destroying the streams are only reported as warnings.
 *
 * Otherwise a 'start' event is emitted, the streams are piped together and the returned
 * promise resolves once the destination is closed ('finish' event). If any stream of the
 * pipeline fails, the error is reported, the destination is destroyed and the promise
 * rejects with that error.
 */
async #transferStage(options: {
  stage: TransferStage;
  source?: Readable;
  destination?: Writable;
  transform?: PassThrough;
  tracker?: PassThrough;
}) {
  const { stage, source, destination, transform, tracker } = options;

  if (!source || !destination || this.shouldSkipStage(stage)) {
    // Wait until source and destination are closed
    const results = await Promise.allSettled(
      [source, destination].map((stream) => {
        // if stream is undefined or already closed, resolve immediately
        if (!stream || stream.destroyed) {
          return Promise.resolve();
        }

        // Destroy the stream, then wait until the close event is produced
        return new Promise((resolve, reject) => {
          stream.on('close', resolve).on('error', reject).destroy();
        });
      })
    );

    results.forEach((state) => {
      if (state.status === 'rejected') {
        // The rejection reason is whatever the stream emitted (usually an Error),
        // whereas a warning diagnostic expects a plain text message
        const { reason } = state;
        const message = reason instanceof Error ? reason.message : String(reason);

        this.#reportWarning(message, `transfer(${stage})`);
      }
    });

    this.#emitStageUpdate('skip', stage);

    return;
  }

  this.#emitStageUpdate('start', stage);

  await new Promise<void>((resolve, reject) => {
    let failed = false;

    // pipe() doesn't forward errors from one stream to the next: every stream of the
    // pipeline needs its own listener, otherwise an upstream failure would never settle
    // this promise. Only the first failure is reported.
    const fail = (e: Error) => {
      if (failed) {
        return;
      }

      failed = true;

      this.#reportError(e, 'error');
      destination.destroy(e);
      reject(e);
    };

    let stream: Stream = source.on('error', fail);

    if (transform) {
      stream = stream.pipe(transform).on('error', fail);
    }

    if (tracker) {
      stream = stream.pipe(tracker).on('error', fail);
    }

    stream.pipe(destination).on('error', fail).on('close', resolve);
  });

  this.#emitStageUpdate('finish', stage);
}
2022-11-16 11:16:14 +01:00
/**
 * Resolve the providers' metadata, store it in the engine's internal state, then hand
 * the source metadata (if any) over to the destination provider.
 */
async init(): Promise<void> {
  await this.#resolveProviderResource();

  const sourceMetadata = this.#metadata.source;

  if (!sourceMetadata) {
    return;
  }

  // Update the destination provider's source metadata
  this.destinationProvider.setMetadata?.('source', sourceMetadata);
}
2023-01-13 16:43:20 +01:00
/**
 * Run the bootstrap method of both the source and destination providers.
 *
 * Both calls are awaited; the first rejection found (source first) is reported as a
 * fatal error and thrown.
 */
async bootstrap(): Promise<void> {
  const outcomes = await Promise.allSettled([
    this.sourceProvider.bootstrap?.(),
    this.destinationProvider.bootstrap?.(),
  ]);

  for (const outcome of outcomes) {
    if (outcome.status === 'rejected') {
      this.#panic(outcome.reason);
    }
  }
}
2023-01-13 16:43:20 +01:00
/**
 * Run the close method of both the source and destination providers.
 *
 * Both calls are awaited; the first rejection found (source first) is reported as a
 * fatal error and thrown.
 */
async close(): Promise<void> {
  const outcomes = await Promise.allSettled([
    this.sourceProvider.close?.(),
    this.destinationProvider.close?.(),
  ]);

  for (const outcome of outcomes) {
    if (outcome.status === 'rejected') {
      this.#panic(outcome.reason);
    }
  }
}
2022-11-15 09:48:51 +01:00
/**
 * Fetch the metadata of both providers (source first, then destination) and keep the
 * ones that are defined in the engine's internal state.
 */
async #resolveProviderResource() {
  const source = await this.sourceProvider.getMetadata();
  const destination = await this.destinationProvider.getMetadata();

  if (source) {
    this.#metadata.source = source;
  }

  if (destination) {
    this.#metadata.destination = destination;
  }
}
2023-01-13 16:43:20 +01:00
/**
 * Make sure the source and the destination are compatible.
 *
 * The Strapi versions are compared first (when both providers expose metadata), then the
 * schemas (when both providers expose schemas). Any Error raised during the checks is
 * reported as fatal before being thrown; other thrown values are re-thrown as is.
 */
async integrityCheck() {
  const checkVersions = async () => {
    const sourceMetadata = await this.sourceProvider.getMetadata();
    const destinationMetadata = await this.destinationProvider.getMetadata();

    if (sourceMetadata && destinationMetadata) {
      this.#assertStrapiVersionIntegrity(
        sourceMetadata?.strapi?.version,
        destinationMetadata?.strapi?.version
      );
    }
  };

  const checkSchemas = async () => {
    const sourceSchemas = (await this.sourceProvider.getSchemas?.()) as SchemaMap;
    const destinationSchemas = (await this.destinationProvider.getSchemas?.()) as SchemaMap;

    if (sourceSchemas && destinationSchemas) {
      this.#assertSchemasMatching(sourceSchemas, destinationSchemas);
    }
  };

  try {
    await checkVersions();
    await checkSchemas();
  } catch (error) {
    if (!(error instanceof Error)) {
      throw error;
    }

    // Reports the error as fatal and throws it
    this.#panic(error);

    throw error;
  }
}
2022-11-08 14:26:48 +01:00
/**
 * Run a complete transfer from the source provider to the destination provider.
 *
 * Steps: bootstrap, init, integrity check, beforeTransfer, then the stages (schemas,
 * entities, assets, links, configuration) and finally close. "transfer::*" events are
 * emitted along the way.
 *
 * If anything throws, a "transfer::error" event is emitted, the error is reported (unless
 * the last diagnostic already reported it), the destination provider is asked to roll
 * back and the error is re-thrown.
 *
 * @returns the results of both providers along with the engine's progress data
 */
async transfer(): Promise<ITransferResults<S, D>> {
  // reset data between transfers
  this.progress.data = {};

  // The transfer stages, in the order they are run
  const stageRunners = [
    () => this.transferSchemas(),
    () => this.transferEntities(),
    () => this.transferAssets(),
    () => this.transferLinks(),
    () => this.transferConfiguration(),
  ];

  try {
    this.#emitTransferUpdate('init');
    await this.bootstrap();
    await this.init();
    await this.integrityCheck();

    this.#emitTransferUpdate('start');
    await this.beforeTransfer();

    for (const runStage of stageRunners) {
      await runStage();
    }

    // Gracefully close the providers
    await this.close();

    this.#emitTransferUpdate('finish');
  } catch (e: unknown) {
    this.#emitTransferUpdate('error', { error: e });

    const lastDiagnostic = last(this.diagnostics.stack.items);

    // True when the last diagnostic is an error diagnostic about this very error
    const alreadyReported = () =>
      lastDiagnostic?.kind === 'error' && lastDiagnostic.details.error === e;

    // Do not report an error diagnostic if the last one reported the same error
    if (e instanceof Error && !alreadyReported()) {
      this.#reportError(e, (e as DataTransferError).severity || 'fatal');
    }

    // Rollback the destination provider if an exception is thrown during the transfer
    // Note: This will be configurable in the future
    await this.destinationProvider.rollback?.(e as Error);

    throw e;
  }

  const { sourceProvider, destinationProvider, progress } = this;

  return {
    source: sourceProvider.results,
    destination: destinationProvider.results,
    engine: progress.data,
  };
}
2022-12-02 11:55:39 +02:00
/**
 * Run the beforeTransfer hook of the source provider, then of the destination provider.
 *
 * Anything thrown by a hook is considered fatal: it is reported as such and thrown. A
 * thrown value that is not an Error is replaced by a generic Error naming the side
 * (source or destination) whose hook failed.
 */
async beforeTransfer(): Promise<void> {
  const runWithDiagnostic = async (provider: IProvider, origin: 'source' | 'destination') => {
    try {
      await provider.beforeTransfer?.();
    } catch (error) {
      // Error happening during the before transfer step should be considered fatal errors
      if (error instanceof Error) {
        this.#panic(error);
      } else {
        // "origin" is given by the caller: it used to be an undeclared identifier here,
        // which raised a ReferenceError instead of building the message
        this.#panic(
          new Error(`Unknown error when executing "beforeTransfer" on the ${origin} provider`)
        );
      }
    }
  };

  await runWithDiagnostic(this.sourceProvider, 'source');
  await runWithDiagnostic(this.destinationProvider, 'destination');
}
2022-11-03 10:12:16 +02:00
/**
 * Transfer the schemas from the source to the destination.
 *
 * Progress is aggregated by the model type of each schema.
 */
async transferSchemas(): Promise<void> {
  const stage: TransferStage = 'schemas';

  const source = await this.sourceProvider.createSchemasReadStream?.();
  const destination = await this.destinationProvider.createSchemasWriteStream?.();

  await this.#transferStage({
    stage,
    source,
    destination,
    transform: this.#createStageTransformStream(stage),
    tracker: this.#progressTracker(stage, { key: (value: Schema) => value.modelType }),
  });
}
2022-10-13 11:01:35 +02:00
/**
 * Transfer the entities from the source to the destination.
 *
 * Progress is aggregated by the type of each entity.
 */
async transferEntities(): Promise<void> {
  const stage: TransferStage = 'entities';

  const source = await this.sourceProvider.createEntitiesReadStream?.();
  const destination = await this.destinationProvider.createEntitiesWriteStream?.();

  await this.#transferStage({
    stage,
    source,
    destination,
    transform: this.#createStageTransformStream(stage),
    tracker: this.#progressTracker(stage, { key: (value: IEntity) => value.type }),
  });
}
/**
 * Transfer the links from the source to the destination.
 */
async transferLinks(): Promise<void> {
  const stage: TransferStage = 'links';

  const source = await this.sourceProvider.createLinksReadStream?.();
  const destination = await this.destinationProvider.createLinksWriteStream?.();

  await this.#transferStage({
    stage,
    source,
    destination,
    transform: this.#createStageTransformStream(stage),
    tracker: this.#progressTracker(stage),
  });
}
2022-11-24 16:07:07 +01:00
/**
 * Transfer the assets from the source to the destination.
 *
 * The size of an asset is read from its stats, and progress is aggregated by file
 * extension ('NA' when the filename has none).
 */
async transferAssets(): Promise<void> {
  const stage: TransferStage = 'assets';

  const source = await this.sourceProvider.createAssetsReadStream?.();
  const destination = await this.destinationProvider.createAssetsWriteStream?.();

  const transform = this.#createStageTransformStream(stage);
  const tracker = this.#progressTracker(stage, {
    size: (value: IAsset) => value.stats.size,
    // extname() returns an empty string (never null/undefined) when there is no
    // extension, so the fallback has to be applied with "||" rather than "??"
    key: (value: IAsset) => extname(value.filename) || 'NA',
  });

  await this.#transferStage({ stage, source, destination, transform, tracker });
}
/**
 * Transfer the configuration from the source to the destination.
 */
async transferConfiguration(): Promise<void> {
  const stage: TransferStage = 'configuration';

  const source = await this.sourceProvider.createConfigurationReadStream?.();
  const destination = await this.destinationProvider.createConfigurationWriteStream?.();

  await this.#transferStage({
    stage,
    source,
    destination,
    transform: this.#createStageTransformStream(stage),
    tracker: this.#progressTracker(stage),
  });
}
}
2022-10-13 16:52:16 +02:00
2023-01-13 16:43:20 +01:00
/**
 * Factory creating a transfer engine for the given source and destination providers.
 */
export const createTransferEngine = <S extends ISourceProvider, D extends IDestinationProvider>(
  sourceProvider: S,
  destinationProvider: D,
  options: ITransferEngineOptions
): TransferEngine<S, D> => new TransferEngine<S, D>(sourceProvider, destinationProvider, options);