From 7cf54d069367ad5333dada2f2a7fe8dfc85a8596 Mon Sep 17 00:00:00 2001 From: Ben Irvin Date: Mon, 12 Dec 2022 14:24:39 +0100 Subject: [PATCH] add transfer events and telemetry --- .../lib/engine/__tests__/engine.test.ts | 11 +++-- .../core/data-transfer/lib/engine/index.ts | 47 +++++++++++-------- .../data-transfer/types/transfer-engine.d.ts | 13 +++++ .../strapi/lib/commands/transfer/export.js | 32 ++++++++++--- .../strapi/lib/commands/transfer/import.js | 21 +++++++++ 5 files changed, 94 insertions(+), 30 deletions(-) diff --git a/packages/core/data-transfer/lib/engine/__tests__/engine.test.ts b/packages/core/data-transfer/lib/engine/__tests__/engine.test.ts index e7c817e61e..fbedad9f19 100644 --- a/packages/core/data-transfer/lib/engine/__tests__/engine.test.ts +++ b/packages/core/data-transfer/lib/engine/__tests__/engine.test.ts @@ -2,7 +2,7 @@ import * as path from 'path'; import { cloneDeep } from 'lodash/fp'; import { Readable, Writable } from 'stream-chain'; import type { Schema } from '@strapi/strapi'; -import { createTransferEngine } from '..'; +import { createTransferEngine, transferStages } from '..'; import type { IAsset, IDestinationProvider, @@ -392,12 +392,13 @@ describe('Transfer engine', () => { }); describe('progressStream', () => { - test("emits 'progress' events", async () => { + test("emits 'stage::progress' events", async () => { const source = createSource(); const engine = createTransferEngine(source, completeDestination, defaultOptions); let calls = 0; - engine.progress.stream.on('progress', ({ stage, data }) => { + engine.progress.stream.on('stage::progress', ({ stage, data }) => { + expect(transferStages.includes(stage)).toBe(true); expect(data).toMatchObject(engine.progress.data); calls += 1; }); @@ -411,8 +412,8 @@ describe('Transfer engine', () => { }); // TODO: to implement these, the mocked streams need to be improved - test.todo("emits 'start' events"); - test.todo("emits 'complete' events"); + test.todo("emits 'stage::start' events"); + test.todo("emits 'stage::finish' events"); }); describe('integrity checks', () => { diff --git a/packages/core/data-transfer/lib/engine/index.ts b/packages/core/data-transfer/lib/engine/index.ts index da4ecd2113..29055e82b9 100644 --- a/packages/core/data-transfer/lib/engine/index.ts +++ b/packages/core/data-transfer/lib/engine/index.ts @@ -2,6 +2,7 @@ import { PassThrough } from 'stream-chain'; import * as path from 'path'; import { isEmpty, uniq } from 'lodash/fp'; import type { Schema } from '@strapi/strapi'; +import { randomUUID } from 'crypto'; import type { Diff, @@ -13,25 +14,22 @@ import type { ITransferEngine, ITransferEngineOptions, ITransferResults, + TransferProgress, TransferStage, } from '../../types'; import compareSchemas from '../strategies'; + // eslint-disable-next-line @typescript-eslint/no-var-requires const semverDiff = require('semver/functions/diff'); -type TransferProgress = { - [key in TransferStage]?: { - count: number; - bytes: number; - aggregates?: { - [key: string]: { - count: number; - bytes: number; - }; - }; - }; -}; +export const transferStages: TransferStage[] = [ + 'entities', + 'links', + 'assets', + 'schemas', + 'configuration', +]; class TransferEngine< S extends ISourceProvider = ISourceProvider, @@ -123,8 +121,12 @@ class TransferEngine< }); } - #emitStageUpdate(type: 'start' | 'complete' | 'progress', transferStage: TransferStage) { - this.progress.stream.emit(type, { + #emitTransferUpdate(type: 'start' | 'finish' | 'error', payload?: object) { + this.progress.stream.emit(type, payload); + } + + #emitStageUpdate(type: 'start' | 'finish' | 'progress', transferStage: TransferStage) { + this.progress.stream.emit(`stage::${type}`, { data: this.progress.data, stage: transferStage, }); @@ -256,6 +258,9 @@ class TransferEngine< } async transfer(): Promise> { + const transferId = randomUUID(); + this.#emitTransferUpdate('start', { transferId }); + try { await this.bootstrap(); await this.init(); @@ -277,9 +282,13 @@ class TransferEngine< await this.transferLinks(); await this.transferConfiguration(); + this.#emitTransferUpdate('finish', { transferId }); + // Gracefully close the providers await this.close(); } catch (e: unknown) { + this.#emitTransferUpdate('error', { error: e, transferId }); + // Rollback the destination provider if an exception is thrown during the transfer // Note: This will be configurable in the future await this.destinationProvider.rollback?.(e as Error); @@ -322,7 +331,7 @@ class TransferEngine< .on('error', reject) // Resolve the promise when the destination has finished reading all the data from the source .on('close', () => { - this.#emitStageUpdate('complete', stageName); + this.#emitStageUpdate('finish', stageName); resolve(); }); @@ -361,7 +370,7 @@ class TransferEngine< }) // Resolve the promise when the destination has finished reading all the data from the source .on('close', () => { - this.#emitStageUpdate('complete', stageName); + this.#emitStageUpdate('finish', stageName); resolve(); }); @@ -396,7 +405,7 @@ class TransferEngine< .on('error', reject) // Resolve the promise when the destination has finished reading all the data from the source .on('close', () => { - this.#emitStageUpdate('complete', stageName); + this.#emitStageUpdate('finish', stageName); resolve(); }); @@ -428,7 +437,7 @@ class TransferEngine< .on('error', reject) // Resolve the promise when the destination has finished reading all the data from the source .on('close', () => { - this.#emitStageUpdate('complete', stageName); + this.#emitStageUpdate('finish', stageName); resolve(); }); @@ -468,7 +477,7 @@ class TransferEngine< .on('error', reject) // Resolve the promise when the destination has finished reading all the data from the source .on('close', () => { - this.#emitStageUpdate('complete', stageName); + this.#emitStageUpdate('finish', stageName); resolve(); }); diff --git a/packages/core/data-transfer/types/transfer-engine.d.ts b/packages/core/data-transfer/types/transfer-engine.d.ts index 926c8d666f..460bd81f5f 100644 --- a/packages/core/data-transfer/types/transfer-engine.d.ts +++ b/packages/core/data-transfer/types/transfer-engine.d.ts @@ -3,6 +3,19 @@ import { IEntity, ILink } from './common-entities'; import { ITransferRule } from './utils'; import { ISourceProvider, IDestinationProvider } from './provider'; +export type TransferProgress = { + [key in TransferStage]?: { + count: number; + bytes: number; + aggregates?: { + [key: string]: { + count: number; + bytes: number; + }; + }; + }; +}; + /** * Defines the capabilities and properties of the transfer engine */ diff --git a/packages/core/strapi/lib/commands/transfer/export.js b/packages/core/strapi/lib/commands/transfer/export.js index e74d6432db..a37e954fb6 100644 --- a/packages/core/strapi/lib/commands/transfer/export.js +++ b/packages/core/strapi/lib/commands/transfer/export.js @@ -24,17 +24,15 @@ module.exports = async (opts) => { } const filename = opts.file; + // Load a local instance of Strapi for source and for engine to send telemetry + const strapiInstance = await strapi(await strapi.compile()).load(); + /** * From local Strapi instance */ const sourceOptions = { async getStrapi() { - const appContext = await strapi.compile(); - const app = strapi(appContext); - - app.log.level = 'error'; - - return app.load(); + return strapiInstance; }, }; const source = createLocalStrapiSourceProvider(sourceOptions); @@ -81,6 +79,28 @@ module.exports = async (opts) => { try { logger.log(`Starting export...`); + const progress = engine.progress.stream; + + const telemetryPayload = (payload) => { + return { + transferId: payload.transferId, + source: engine.sourceProvider.name, + destination: engine.destinationProvider.name, + }; + }; + + progress.on('start', (payload = undefined) => { + strapiInstance.telemetry.send('deitsStarted', telemetryPayload(payload)); + }); + + progress.on('finish', (payload = undefined) => { + strapiInstance.telemetry.send('deitsFinished', telemetryPayload(payload)); + }); + + progress.on('error', (payload = undefined) => { + strapiInstance.telemetry.send('deitsFailed', telemetryPayload(payload)); + }); + const results = await engine.transfer(); const table = buildTransferTable(results.engine); logger.log(table.toString()); diff --git a/packages/core/strapi/lib/commands/transfer/import.js b/packages/core/strapi/lib/commands/transfer/import.js index 864f5edd4b..6c428f5634 100644 --- a/packages/core/strapi/lib/commands/transfer/import.js +++ b/packages/core/strapi/lib/commands/transfer/import.js @@ -72,6 +72,27 @@ module.exports = async (opts) => { try { logger.info('Starting import...'); + const progress = engine.progress.stream; + const telemetryPayload = (payload) => { + return { + transferId: payload.transferId, + source: engine.sourceProvider.name, + destination: engine.destinationProvider.name, + }; + }; + + progress.on('start', (payload = undefined) => { + strapiInstance.telemetry.send('deitsStarted', telemetryPayload(payload)); + }); + + progress.on('finish', (payload = undefined) => { + strapiInstance.telemetry.send('deitsFinished', telemetryPayload(payload)); + }); + + progress.on('error', (payload = undefined) => { + strapiInstance.telemetry.send('deitsFailed', telemetryPayload(payload)); + }); + const results = await engine.transfer(); const table = buildTransferTable(results.engine); logger.info(table.toString());