fix aggregate data recording

This commit is contained in:
Ben Irvin 2022-11-11 16:14:21 +01:00
parent 16f8d98731
commit af1a2372d6

View File

@ -9,10 +9,19 @@ import type {
TransferStage,
} from '../../types';
type CategoryProgress = {
[key: string]: {};
};
type TransferProgress = {
[key in TransferStage]?: {
count: number;
bytes?: number;
aggregates?: {
[key: string]: {
count: number;
bytes?: number;
};
};
};
};
@ -25,7 +34,6 @@ class TransferEngine<
destinationProvider: IDestinationProvider;
options: ITransferEngineOptions;
transferProgress: TransferProgress = {};
#progressStream: PassThrough = new PassThrough({ objectMode: true });
get progressStream() {
return this.#progressStream;
@ -41,19 +49,32 @@ class TransferEngine<
this.options = options;
}
incrementTransferProgress(name: TransferStage, data: any) {
#incrementTransferProgress(name: TransferStage, data: any, aggregateKey?: string) {
if (!_.has(name, this.transferProgress)) {
this.transferProgress[name] = { count: 0, bytes: 0 };
}
this.transferProgress[name]!.count += 1;
this.transferProgress[name]!.bytes! += JSON.stringify(data).length;
const size = JSON.stringify(data).length;
this.transferProgress[name]!.bytes! += size;
if (aggregateKey && _.has(aggregateKey, data)) {
const aggKeyValue = data[aggregateKey];
if (!_.has('aggregates', this.transferProgress[name])) {
this.transferProgress[name]!.aggregates = {};
}
if (!_.has(aggKeyValue, this.transferProgress[name]!.aggregates)) {
this.transferProgress[name]!.aggregates![aggKeyValue] = { count: 0, bytes: 0 };
}
this.transferProgress[name]!.aggregates![aggKeyValue].count += 1;
this.transferProgress[name]!.aggregates![aggKeyValue].bytes! += size;
}
}
countRecorder = (name: TransferStage) => {
#countRecorder = (name: TransferStage, aggregateKey?: string) => {
return new PassThrough({
objectMode: true,
transform: (data, encoding, callback) => {
this.incrementTransferProgress(name, data);
this.#incrementTransferProgress(name, data, aggregateKey);
this.#progressStream.write({
type: 'progress',
name,
@ -208,7 +229,7 @@ class TransferEngine<
resolve();
});
inStream.pipe(this.countRecorder(stepName)).pipe(outStream);
inStream.pipe(this.#countRecorder(stepName)).pipe(outStream);
});
}
@ -245,7 +266,7 @@ class TransferEngine<
resolve();
});
inStream.pipe(this.countRecorder(stepName)).pipe(outStream);
inStream.pipe(this.#countRecorder(stepName, 'type')).pipe(outStream);
});
}
@ -278,7 +299,7 @@ class TransferEngine<
resolve();
});
inStream.pipe(this.countRecorder(stepName)).pipe(outStream);
inStream.pipe(this.#countRecorder(stepName)).pipe(outStream);
});
}
@ -323,7 +344,7 @@ class TransferEngine<
resolve();
});
inStream.pipe(this.countRecorder(stepName)).pipe(outStream);
inStream.pipe(this.#countRecorder(stepName)).pipe(outStream);
});
}
}