implement rollbacks using transactions

This commit is contained in:
Bassel 2023-01-18 14:59:10 +02:00
parent 9fecc309e7
commit 9dc8e9a25c
7 changed files with 224 additions and 109 deletions

View File

@ -25,6 +25,10 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
strapi?: Strapi.Strapi; strapi?: Strapi.Strapi;
transaction?: any;
endTransaction?: any;
/** /**
* The entities mapper is used to map old entities to their new IDs * The entities mapper is used to map old entities to their new IDs
*/ */
@ -42,6 +46,7 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
async close(): Promise<void> { async close(): Promise<void> {
const { autoDestroy } = this.options; const { autoDestroy } = this.options;
this.endTransaction();
// Basically `!== false` but more deterministic // Basically `!== false` but more deterministic
if (autoDestroy === undefined || autoDestroy === true) { if (autoDestroy === undefined || autoDestroy === true) {
@ -64,9 +69,23 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
} }
async beforeTransfer() { async beforeTransfer() {
if (this.options.strategy === 'restore') { if (!this.strapi) {
await this.#deleteAll(); throw new Error('Strapi instance not found');
} }
const { transaction, endTransaction } = await utils.transaction.createTransaction(this.strapi);
this.transaction = transaction;
this.endTransaction = endTransaction;
await this.transaction(async () => {
try {
if (this.options.strategy === 'restore') {
await this.#deleteAll();
}
} catch (error) {
throw new Error(`restore failed ${error}`);
}
});
} }
getMetadata(): IMetadata { getMetadata(): IMetadata {
@ -120,6 +139,7 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
return restore.createEntitiesWriteStream({ return restore.createEntitiesWriteStream({
strapi: this.strapi, strapi: this.strapi,
updateMappingTable, updateMappingTable,
transaction: this.transaction,
}); });
} }
@ -183,7 +203,7 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
const { strategy } = this.options; const { strategy } = this.options;
if (strategy === 'restore') { if (strategy === 'restore') {
return restore.createConfigurationWriteStream(this.strapi); return restore.createConfigurationWriteStream(this.strapi, this.transaction);
} }
throw new Error(`Invalid strategy supplied: "${strategy}"`); throw new Error(`Invalid strategy supplied: "${strategy}"`);
@ -198,7 +218,7 @@ class LocalStrapiDestinationProvider implements IDestinationProvider {
const mapID = (uid: string, id: number): number | undefined => this.#entitiesMapper[uid]?.[id]; const mapID = (uid: string, id: number): number | undefined => this.#entitiesMapper[uid]?.[id];
if (strategy === 'restore') { if (strategy === 'restore') {
return restore.createLinksWriteStream(mapID, this.strapi); return restore.createLinksWriteStream(mapID, this.strapi, this.transaction);
} }
throw new Error(`Invalid strategy supplied: "${strategy}"`); throw new Error(`Invalid strategy supplied: "${strategy}"`);

View File

@ -1,8 +1,10 @@
import _ from 'lodash';
import { Writable } from 'stream'; import { Writable } from 'stream';
import chalk from 'chalk'; import chalk from 'chalk';
import { IConfiguration } from '../../../../../../types'; import { IConfiguration } from '../../../../../../types';
const restoreCoreStore = async <T extends { value: unknown }>(strapi: Strapi.Strapi, data: T) => { const restoreCoreStore = async <T extends { value: unknown }>(strapi: Strapi.Strapi, values: T) => {
const data = _.omit(values, ['id']);
return strapi.db.query('strapi::core-store').create({ return strapi.db.query('strapi::core-store').create({
data: { data: {
...data, ...data,
@ -11,7 +13,8 @@ const restoreCoreStore = async <T extends { value: unknown }>(strapi: Strapi.Str
}); });
}; };
const restoreWebhooks = async (strapi: Strapi.Strapi, data: unknown) => { const restoreWebhooks = async <T extends { value: unknown }>(strapi: Strapi.Strapi, values: T) => {
const data = _.omit(values, ['id']);
return strapi.db.query('webhook').create({ data }); return strapi.db.query('webhook').create({ data });
}; };
@ -21,11 +24,11 @@ export const restoreConfigs = async (strapi: Strapi.Strapi, config: IConfigurati
} }
if (config.type === 'webhook') { if (config.type === 'webhook') {
return restoreWebhooks(strapi, config.value); return restoreWebhooks(strapi, config.value as { value: unknown });
} }
}; };
export const createConfigurationWriteStream = async (strapi: Strapi.Strapi) => { export const createConfigurationWriteStream = async (strapi: Strapi.Strapi, transaction: any) => {
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
async write<T extends { id: number }>( async write<T extends { id: number }>(
@ -33,18 +36,18 @@ export const createConfigurationWriteStream = async (strapi: Strapi.Strapi) => {
_encoding: BufferEncoding, _encoding: BufferEncoding,
callback: (error?: Error | null) => void callback: (error?: Error | null) => void
) { ) {
try { return transaction(async () => {
await restoreConfigs(strapi, config); try {
} catch (error) { await restoreConfigs(strapi, config);
return callback( } catch (error) {
new Error( return callback(
`Failed to import ${chalk.yellowBright(config.type)} (${chalk.greenBright( new Error(
config.value.id `Failed to import ${chalk.yellowBright(config.type)} (${chalk.greenBright(error)}`
)}` )
) );
); }
} callback();
callback(); });
}, },
}); });
}; };

View File

@ -10,90 +10,93 @@ import * as queries from '../../../../queries';
interface IEntitiesRestoreStreamOptions { interface IEntitiesRestoreStreamOptions {
strapi: Strapi.Strapi; strapi: Strapi.Strapi;
updateMappingTable<T extends SchemaUID | string>(type: T, oldID: number, newID: number): void; updateMappingTable<T extends SchemaUID | string>(type: T, oldID: number, newID: number): void;
transaction: any;
} }
const createEntitiesWriteStream = (options: IEntitiesRestoreStreamOptions) => { const createEntitiesWriteStream = (options: IEntitiesRestoreStreamOptions) => {
const { strapi, updateMappingTable } = options; const { strapi, updateMappingTable, transaction } = options;
const query = queries.entity.createEntityQuery(strapi); const query = queries.entity.createEntityQuery(strapi);
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
async write(entity: IEntity, _encoding, callback) { async write(entity: IEntity, _encoding, callback) {
const { type, id, data } = entity; return transaction(async () => {
const { create, getDeepPopulateComponentLikeQuery } = query(type); const { type, id, data } = entity;
const contentType = strapi.getModel(type); const { create, getDeepPopulateComponentLikeQuery } = query(type);
const contentType = strapi.getModel(type);
/** /**
* Resolve the component UID of an entity's attribute based * Resolve the component UID of an entity's attribute based
* on a given path (components & dynamic zones only) * on a given path (components & dynamic zones only)
*/ */
const resolveType = (paths: string[]): string | undefined => { const resolveType = (paths: string[]): string | undefined => {
let cType = contentType; let cType = contentType;
let value: unknown = data; let value: unknown = data;
for (const path of paths) { for (const path of paths) {
value = get(path, value); value = get(path, value);
// Needed when the value of cType should be computed // Needed when the value of cType should be computed
// based on the next value (eg: dynamic zones) // based on the next value (eg: dynamic zones)
if (typeof cType === 'function') { if (typeof cType === 'function') {
cType = cType(value); cType = cType(value);
}
if (path in cType.attributes) {
const attribute = cType.attributes[path];
if (attribute.type === 'component') {
cType = strapi.getModel(attribute.component);
} }
if (attribute.type === 'dynamiczone') { if (path in cType.attributes) {
cType = ({ __component }: { __component: string }) => strapi.getModel(__component); const attribute = cType.attributes[path];
if (attribute.type === 'component') {
cType = strapi.getModel(attribute.component);
}
if (attribute.type === 'dynamiczone') {
cType = ({ __component }: { __component: string }) => strapi.getModel(__component);
}
} }
} }
return cType?.uid;
};
try {
const created = await create({
data,
populate: getDeepPopulateComponentLikeQuery(contentType, { select: 'id' }),
select: 'id',
});
// Compute differences between original & new entities
const diffs = json.diff(data, created);
updateMappingTable(type, id, created.id);
// For each difference found on an ID attribute,
// update the mapping the table accordingly
diffs.forEach((diff) => {
if (diff.kind === 'modified' && last(diff.path) === 'id') {
const target = resolveType(diff.path);
// If no type is found for the given path, then ignore the diff
if (!target) {
return;
}
const [oldID, newID] = diff.values as [number, number];
updateMappingTable(target, oldID, newID);
}
});
} catch (e) {
if (e instanceof Error) {
return callback(e);
}
return callback(new Error(`Failed to create "${type}" (${id})`));
} }
return cType?.uid; return callback(null);
}; });
try {
const created = await create({
data,
populate: getDeepPopulateComponentLikeQuery(contentType, { select: 'id' }),
select: 'id',
});
// Compute differences between original & new entities
const diffs = json.diff(data, created);
updateMappingTable(type, id, created.id);
// For each difference found on an ID attribute,
// update the mapping the table accordingly
diffs.forEach((diff) => {
if (diff.kind === 'modified' && last(diff.path) === 'id') {
const target = resolveType(diff.path);
// If no type is found for the given path, then ignore the diff
if (!target) {
return;
}
const [oldID, newID] = diff.values as [number, number];
updateMappingTable(target, oldID, newID);
}
});
} catch (e) {
if (e instanceof Error) {
return callback(e);
}
return callback(new Error(`Failed to create "${type}" (${id})`));
}
return callback(null);
}, },
}); });
}; };

View File

@ -4,31 +4,34 @@ import { createLinkQuery } from '../../../../queries/link';
export const createLinksWriteStream = ( export const createLinksWriteStream = (
mapID: (uid: string, id: number) => number | undefined, mapID: (uid: string, id: number) => number | undefined,
strapi: Strapi.Strapi strapi: Strapi.Strapi,
transaction: any
) => { ) => {
return new Writable({ return new Writable({
objectMode: true, objectMode: true,
async write(link: ILink, _encoding, callback) { async write(link: ILink, _encoding, callback) {
const { left, right } = link; return transaction(async (trx: any) => {
const query = createLinkQuery(strapi); const { left, right } = link;
const query = createLinkQuery(strapi, trx);
// Map IDs if needed // Map IDs if needed
left.ref = mapID(left.type, left.ref) ?? left.ref; left.ref = mapID(left.type, left.ref) ?? left.ref;
right.ref = mapID(right.type, right.ref) ?? right.ref; right.ref = mapID(right.type, right.ref) ?? right.ref;
try { try {
await query().insert(link); await query().insert(link);
} catch (e) { } catch (e) {
if (e instanceof Error) { if (e instanceof Error) {
return callback(e); return callback(e);
}
return callback(
new Error(`An error happened while trying to import a ${left.type} link. ${e}`)
);
} }
return callback( callback(null);
new Error(`An error happened while trying to import a ${left.type} link. ${e}`) });
);
}
callback(null);
}, },
}); });
}; };

View File

@ -4,7 +4,7 @@ import { ILink } from '../../../types';
// TODO: Remove any types when we'll have types for DB metadata // TODO: Remove any types when we'll have types for DB metadata
export const createLinkQuery = (strapi: Strapi.Strapi) => { export const createLinkQuery = (strapi: Strapi.Strapi, trx?: any) => {
const query = () => { const query = () => {
const { connection } = strapi.db; const { connection } = strapi.db;
@ -33,6 +33,10 @@ export const createLinkQuery = (strapi: Strapi.Strapi) => {
const qb = connection.queryBuilder().select('id', joinColumnName).from(metadata.tableName); const qb = connection.queryBuilder().select('id', joinColumnName).from(metadata.tableName);
if (trx) {
qb.transacting(trx);
}
// TODO: stream the query to improve performances // TODO: stream the query to improve performances
const entries = await qb; const entries = await qb;
@ -114,6 +118,10 @@ export const createLinkQuery = (strapi: Strapi.Strapi) => {
qb.select(validColumns); qb.select(validColumns);
if (trx) {
qb.transacting(trx);
}
// TODO: stream the query to improve performances // TODO: stream the query to improve performances
const entries = await qb; const entries = await qb;
@ -180,10 +188,16 @@ export const createLinkQuery = (strapi: Strapi.Strapi) => {
if (attribute.joinColumn) { if (attribute.joinColumn) {
const joinColumnName = attribute.joinColumn.name; const joinColumnName = attribute.joinColumn.name;
if (trx) {
await connection(metadata.tableName) await connection(metadata.tableName)
.where('id', left.ref) .where('id', left.ref)
.update({ [joinColumnName]: right.ref }); .transacting(trx)
.update({ [joinColumnName]: right.ref });
} else {
await connection(metadata.tableName)
.where('id', left.ref)
.update({ [joinColumnName]: right.ref });
}
} }
if (attribute.joinTable) { if (attribute.joinTable) {
@ -242,7 +256,11 @@ export const createLinkQuery = (strapi: Strapi.Strapi) => {
assignOrderColumns(); assignOrderColumns();
await connection.insert(payload).into(name); if (trx) {
await connection.insert(payload).transacting(trx).into(name);
} else {
await connection.insert(payload).into(name);
}
} }
}; };

View File

@ -2,3 +2,4 @@ export * as encryption from './encryption';
export * as stream from './stream'; export * as stream from './stream';
export * as json from './json'; export * as json from './json';
export * as schema from './schema'; export * as schema from './schema';
export * as transaction from './transaction';

View File

@ -0,0 +1,67 @@
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';
import { Strapi } from '@strapi/strapi';
export const createTransaction = async (strapi: Strapi) => {
const fns: { fn: (trx?: any) => Promise<void>; uuid: string }[] = [];
let done = false;
let resume = () => {};
const e = new EventEmitter();
e.on('spawn', (uuid, cb) => {
fns.push({ fn: cb, uuid });
resume();
return uuid;
});
e.on('close', () => {
done = true;
resume();
});
strapi.db.transaction(async (trx) => {
while (!done) {
while (fns.length) {
const item = fns.shift();
if (item) {
const { fn, uuid } = item;
try {
const res = await fn(trx);
e.emit(uuid, { data: res });
} catch (error) {
e.emit(uuid, { error });
}
}
}
if (!done && !fns.length) {
// eslint-disable-next-line @typescript-eslint/no-loop-func
await new Promise<void>((resolve) => {
resume = resolve;
});
}
}
});
return {
async transaction<T = undefined>(callback: any): Promise<T | undefined> {
const uuid = randomUUID();
e.emit('spawn', uuid, callback);
return new Promise<T | undefined>((resolve, reject) => {
e.on(uuid, ({ data, error }) => {
if (data) {
resolve(data);
}
if (error) {
reject(data);
}
resolve(undefined);
});
});
},
endTransaction() {
return e.emit('close');
},
};
};