FIX: the race condition in bulk import around websocket and restAPI (#22277)

* fix the race condition in bulk import around websocket and restAPI

* Fix for Race condition

* remove the ui race condition fix

* re-switch the function to old place to avoid unwanted file change

* added ui support to tackle race condition as per backend changes

* fix the localization keys and unsave the data after restAPI call, since we are getting it from websocket started response

* fix bulk edit table flakyness

* minor code fixes

* fix the bulk action being affected if multiple tab open for same page and increase the database test time to avoid flakyness

* Fix Failing Test

---------

Co-authored-by: mohitdeuex <mohit.y@deuexsolutions.com>
Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
Ashish Gupta 2025-07-15 17:10:58 +05:30 committed by GitHub
parent ea63d7e184
commit f93e6758a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 127 additions and 33 deletions

View File

@ -699,10 +699,14 @@ public abstract class EntityResource<T extends EntityInterface, K extends Entity
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
String jobId = UUID.randomUUID().toString();
CSVImportResponse responseEntity = new CSVImportResponse(jobId, "Import is in progress.");
Response response =
Response.ok().entity(responseEntity).type(MediaType.APPLICATION_JSON).build();
ExecutorService executorService = AsyncService.getInstance().getExecutorService();
executorService.submit(
() -> {
try {
WebsocketNotificationHandler.sendCsvImportStartedNotification(jobId, securityContext);
CsvImportResult result =
importCsvInternal(securityContext, name, csv, dryRun, recursive);
WebsocketNotificationHandler.sendCsvImportCompleteNotification(
@ -713,8 +717,8 @@ public abstract class EntityResource<T extends EntityInterface, K extends Entity
jobId, securityContext, e.getMessage());
}
});
CSVImportResponse response = new CSVImportResponse(jobId, "Import is in progress.");
return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
return response;
}
public String exportCsvInternal(SecurityContext securityContext, String name, boolean recursive)

View File

@ -215,6 +215,17 @@ public class WebsocketNotificationHandler {
return null;
}
public static void sendCsvImportStartedNotification(
String jobId, SecurityContext securityContext) {
CSVImportMessage message = new CSVImportMessage(jobId, "STARTED", null, null);
String jsonMessage = JsonUtils.pojoToJson(message);
UUID userId = getUserIdFromSecurityContext(securityContext);
if (userId != null) {
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.CSV_IMPORT_CHANNEL, jsonMessage);
}
}
public static void sendCsvImportCompleteNotification(
String jobId, SecurityContext securityContext, CsvImportResult result) {
CSVImportMessage message = new CSVImportMessage(jobId, "COMPLETED", result, null);

View File

@ -91,6 +91,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
@ -5170,10 +5171,17 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
.on(
"csvImportChannel",
args -> {
receivedMessage[0] = (String) args[0];
String[] msg = new String[1];
msg[0] = (String) args[0];
CSVImportMessage receivedCsvImportMessage =
JsonUtils.readValue(msg[0], CSVImportMessage.class);
System.out.println("Received message: " + receivedMessage[0]);
messageLatch.countDown();
socket.disconnect();
if (Objects.equals(receivedCsvImportMessage.getStatus(), "COMPLETED")
|| Objects.equals(receivedCsvImportMessage.getStatus(), "FAILED")) {
receivedMessage[0] = msg[0];
messageLatch.countDown();
socket.disconnect();
}
})
.on(
Socket.EVENT_CONNECT_ERROR,

View File

@ -387,7 +387,8 @@ test.describe('Bulk Import Export', () => {
});
test('Database', async ({ page }) => {
test.slow(true);
// 5 minutes to avoid test timeout happening some times in AUTs, since it add all the entities layer
test.setTimeout(300_000);
let customPropertyRecord: Record<string, string> = {};

View File

@ -152,7 +152,12 @@ export const fillGlossaryTermDetails = async (
await page.waitForSelector('[data-testid="loader"]', { state: 'detached' });
await page.click('[data-testid="tag-selector"]');
const searchResponse = page.waitForResponse(
`/api/v1/search/query?q=**&index=glossary_term_search_index&**`
);
await page.locator('[data-testid="tag-selector"] input').fill(glossary.name);
await searchResponse;
await page.waitForSelector('[data-testid="loader"]', { state: 'detached' });
await page.getByTestId(`tag-"${glossary.parent}"."${glossary.name}"`).click();
await page.click('[data-testid="saveAssociatedTag"]');
await page.click('.InovuaReactDataGrid__cell--cell-active');

View File

@ -22,7 +22,7 @@ import { showErrorToast } from '../../utils/ToastUtils';
import Loader from '../common/Loader/Loader';
import { UploadFileProps } from './UploadFile.interface';
export const UploadFile: FC<UploadFileProps> = ({
const UploadFile: FC<UploadFileProps> = ({
fileType,
beforeUpload,
onCSVUploaded,
@ -79,3 +79,5 @@ export const UploadFile: FC<UploadFileProps> = ({
</Dragger>
);
};
export default UploadFile;

View File

@ -29,7 +29,7 @@ import {
} from '../../../pages/EntityImport/BulkEntityImportPage/BulkEntityImportPage.interface';
import { showErrorToast } from '../../../utils/ToastUtils';
import Stepper from '../../Settings/Services/Ingestion/IngestionStepper/IngestionStepper.component';
import { UploadFile } from '../../UploadFile/UploadFile';
import UploadFile from '../../UploadFile/UploadFile';
import Banner from '../Banner/Banner';
import './entity-import.style.less';
import { EntityImportProps } from './EntityImport.interface';

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "Die Eingabe ist kein gültiger HEX-Code.",
"hi-user-welcome-to": "Hallo {{user}}, Willkommen bei",
"image-upload-error": "Das Hochladen von Bildern wird nicht unterstützt. Bitte verwenden Sie die Markdown-Syntax für Bilder, die über eine URL verfügbar sind.",
"import-data-in-progress": "Import läuft.",
"import-entity-help": "Sparen Sie Zeit und Aufwand, indem Sie eine CSV-Datei mit mehreren {{entity}} in einem Durchgang hochladen.",
"in-this-database": "In dieser Datenbank",
"include-assets-message": "Aktivieren Sie die Extraktion von {{assets}} aus der Datenquelle.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "Input is not valid HEX code.",
"hi-user-welcome-to": "Hi {{user}}, Welcome to",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "Import is in progress.",
"import-entity-help": "Save time & effort by uploading a CSV file with several {{entity}} in one go.",
"in-this-database": "In this Database",
"include-assets-message": "Enable extracting {{assets}} from the data source.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "La entrada no es un código HEX válido.",
"hi-user-welcome-to": "Hola {{user}}, Bienvenido a",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "La importación está en progreso.",
"import-entity-help": "Ahorre tiempo y esfuerzo cargando un archivo CSV con varios {{entity}} de una vez.",
"in-this-database": "En esta base de datos",
"include-assets-message": "Configuración opcional para activar la ingestión de {{assets}}.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "Code hexadécimal non valide.",
"hi-user-welcome-to": "Bonjour {{user}}, Bienvenue sur",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "L'importation est en cours.",
"import-entity-help": "Gagnez du temps et de l'effort en téléchargeant un fichier CSV contenant plusieurs {{entityPlural}} en une seule fois.",
"in-this-database": "Dans cette base de données",
"include-assets-message": "Activer l'extraction des {{assets}} de la source de données.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "A entrada non é un código HEX válido.",
"hi-user-welcome-to": "Ola {{user}}, benvido a",
"image-upload-error": "A carga de imaxes non está soportada. Usa a sintaxe Markdown para imaxes dispoñibles a través de URL.",
"import-data-in-progress": "L'importazione è in corso.",
"import-entity-help": "Aforra tempo e esforzo cargando un ficheiro CSV con varios {{entity}} á vez.",
"in-this-database": "Nesta base de datos",
"include-assets-message": "Activar a extracción de {{assets}} desde a fonte de datos.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "הקלט אינו קוד HEX תקין.",
"hi-user-welcome-to": "שלום {{user}}, ברוך הבא אל",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "ייבוא בתהליך.",
"import-entity-help": "חסוך זמן ומאמץ על ידי העלאת קובץ CSV עם מספר {{entity}} בפעם אחת.",
"in-this-database": "במסד נתונים זה",
"include-assets-message": "הפעל את החילוץ של {{assets}} ממקור הנתונים.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "Input is not valid HEX code.",
"hi-user-welcome-to": "Hi {{user}}, Welcome to",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "インポートが進行中です。",
"import-entity-help": "Save time & effort by uploading a CSV file with several {{entity}} in one go.",
"in-this-database": "In this Database",
"include-assets-message": "データソースから{{assets}}の抽出を有効にする。",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "입력이 유효한 HEX 코드가 아닙니다.",
"hi-user-welcome-to": "안녕하세요 {{user}}님, 환영합니다",
"image-upload-error": "이미지 업로드가 지원되지 않습니다. URL을 통해 사용 가능한 이미지에 대해 Markdown 구문을 사용하세요.",
"import-data-in-progress": "가져오기가 진행 중입니다.",
"import-entity-help": "CSV 파일을 업로드하여 한 번에 여러 {{entity}}을(를) 추가하여 시간과 노력을 절약하세요.",
"in-this-database": "이 데이터베이스에서",
"include-assets-message": "데이터 소스에서 {{assets}} 추출을 활성화합니다.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "इनपुट वैध HEX कोड नाही.",
"hi-user-welcome-to": "नमस्कार {{user}}, स्वागत आहे",
"image-upload-error": "प्रतिमा अपलोड समर्थित नाही. कृपया URL द्वारे उपलब्ध प्रतिमांसाठी मार्कडाउन सिंटॅक्स वापरा.",
"import-data-in-progress": "आयात सुरू आहे.",
"import-entity-help": "एकाच वेळी अनेक {{entity}} सह CSV फाइल अपलोड करून वेळ आणि प्रयत्न वाचवा.",
"in-this-database": "या डेटाबेसमध्ये",
"include-assets-message": "डेटा स्रोतातून {{assets}} काढणे सक्षम करा.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "Invoer is geen geldige HEX-code.",
"hi-user-welcome-to": "Hoi {{user}}, welkom bij",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "Import is bezig.",
"import-entity-help": "Bespaar tijd en moeite door een CSV-bestand te uploaden met verschillende {{entity}} in één keer.",
"in-this-database": "In deze database",
"include-assets-message": "Schakel het extraheren van {{assets}} uit de databron in.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "ورودی کد HEX معتبر نیست.",
"hi-user-welcome-to": "سلام {{user}}، خوش آمدید به",
"image-upload-error": "بارگذاری تصویر پشتیبانی نمی‌شود. لطفاً از نحو Markdown برای تصاویری که از طریق URL در دسترس هستند استفاده کنید.",
"import-data-in-progress": "در حال وارد کردن.",
"import-entity-help": "با بارگذاری یک فایل CSV که حاوی چندین {{entity}} است، در زمان و تلاش خود صرفه‌جویی کنید.",
"in-this-database": "در این پایگاه داده",
"include-assets-message": "استخراج {{assets}} از منبع داده را فعال کنید.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "A entrada não é um código HEX válido.",
"hi-user-welcome-to": "Olá {{user}}, Bem-vindo(a) ao",
"image-upload-error": "O upload de imagens não é suportado. Use a sintaxe Markdown para imagens disponíveis via URL.",
"import-data-in-progress": "A importação está em andamento.",
"import-entity-help": "Economize tempo e esforço fazendo upload de um arquivo CSV com vários {{entity}} de uma só vez.",
"in-this-database": "Neste Banco de Dados",
"include-assets-message": "Habilite a extração de {{assets}} da fonte de dados.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "A entrada não é um código HEX válido.",
"hi-user-welcome-to": "Olá {{user}}, Bem-vindo(a) ao",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "A importação está em andamento.",
"import-entity-help": "Economize tempo e esforço fazendo upload de um arquivo CSV com vários {{entity}} de uma só vez.",
"in-this-database": "Neste Banco de Dados",
"include-assets-message": "Habilite a extração de {{assets}} da fonte de dados.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "Input is not valid HEX code.",
"hi-user-welcome-to": "Привет, {{user}}! Добро пожаловать в",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "Импорт выполняется.",
"import-entity-help": "Сэкономьте время и усилия, загрузив CSV-файл с несколькими {{entity}} за один раз.",
"in-this-database": "В этой базе данных",
"include-assets-message": "Включите извлечение {{assets}} из источника данных.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "ข้อมูลนำเข้าคือรหัส HEX ที่ไม่ถูกต้อง",
"hi-user-welcome-to": "สวัสดี {{user}}, ยินดีต้อนรับสู่",
"image-upload-error": "ไม่สนับสนุนการอัปโหลดภาพ โปรดใช้ไวยากรณ์ Markdown สำหรับรูปภาพที่มีอยู่ผ่าน URL",
"import-data-in-progress": "กำลังนำเข้าข้อมูล",
"import-entity-help": "ประหยัดเวลาและความพยายามโดยการอัปโหลดไฟล์ CSV ที่มีหลาย {{entity}} ในครั้งเดียว",
"in-this-database": "ในฐานข้อมูลนี้",
"include-assets-message": "เปิดใช้งานการดึง {{assets}} จากแหล่งข้อมูล",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "Giriş geçerli bir HEX kodu değil.",
"hi-user-welcome-to": "Merhaba {{user}}, Hoş Geldiniz",
"image-upload-error": "Resim yükleme desteklenmiyor. Lütfen URL üzerinden kullanılabilen resimler için Markdown sözdizimini kullanın.",
"import-data-in-progress": "İçe aktarma devam ediyor.",
"import-entity-help": "Bir CSV dosyasını tek seferde birkaç {{entity}} ile yükleyerek zamandan ve emekten tasarruf edin.",
"in-this-database": "Bu Veritabanında",
"include-assets-message": "Veri kaynağından {{assets}} çıkarımını etkinleştirin.",

View File

@ -1925,6 +1925,7 @@
"hex-color-validation": "输入不是有效的 HEX 颜色代码",
"hi-user-welcome-to": "Hi {{user}}, 欢迎来到",
"image-upload-error": "Image upload is not supported. Please use Markdown syntax for images available via URL.",
"import-data-in-progress": "导入正在进行中。",
"import-entity-help": "通过上传 CSV 文件批量维护术语, 节约时间并提高效率。",
"in-this-database": "在此数据库中",
"include-assets-message": "启用从数据源提取{{assets}}",

View File

@ -19,7 +19,7 @@ export type CSVImportAsyncResponse = {
export type CSVImportAsyncWebsocketResponse = {
jobId: string;
status: 'COMPLETED' | 'FAILED';
status: 'COMPLETED' | 'FAILED' | 'STARTED';
result: CSVImportResult;
error: string | null;
};

View File

@ -39,7 +39,7 @@ import { TitleBreadcrumbProps } from '../../../components/common/TitleBreadcrumb
import { DataAssetsHeaderProps } from '../../../components/DataAssets/DataAssetsHeader/DataAssetsHeader.interface';
import PageLayoutV1 from '../../../components/PageLayoutV1/PageLayoutV1';
import Stepper from '../../../components/Settings/Services/Ingestion/IngestionStepper/IngestionStepper.component';
import { UploadFile } from '../../../components/UploadFile/UploadFile';
import UploadFile from '../../../components/UploadFile/UploadFile';
import {
ENTITY_IMPORT_STEPS,
VALIDATION_STEP,
@ -77,6 +77,14 @@ const BulkEntityImportPage = () => {
const [activeAsyncImportJob, setActiveAsyncImportJob] =
useState<CSVImportJobType>();
const activeAsyncImportJobRef = useRef<CSVImportJobType>();
// This ref is used to track the bulk action processing for the Current/Active Page or Tab
const isBulkActionProcessingRef = useRef<{
isProcessing: boolean;
entityType?: EntityType;
}>({
isProcessing: false,
entityType: undefined,
});
const [activeStep, setActiveStep] = useState<VALIDATION_STEP>(
VALIDATION_STEP.UPLOAD
@ -177,22 +185,21 @@ const BulkEntityImportPage = () => {
const handleLoadData = useCallback(
async (e: ProgressEvent<FileReader>) => {
try {
const result = e.target?.result as string;
const validationResponse = await validateCsvString(
result,
isBulkActionProcessingRef.current = {
isProcessing: true,
entityType,
fqn,
isBulkEdit
);
};
const result = e.target?.result as string;
const jobData: CSVImportJobType = {
...validationResponse,
const initialLoadJobData: CSVImportJobType = {
type: 'initialLoad',
initialResult: result,
};
setActiveAsyncImportJob(jobData);
activeAsyncImportJobRef.current = jobData;
setActiveAsyncImportJob(initialLoadJobData);
activeAsyncImportJobRef.current = initialLoadJobData;
await validateCsvString(result, entityType, fqn, isBulkEdit);
} catch (error) {
showErrorToast(error as AxiosError);
}
@ -226,21 +233,25 @@ const BulkEntityImportPage = () => {
const api = getImportValidateAPIEntityType(entityType);
const response = await api({
isBulkActionProcessingRef.current = {
isProcessing: true,
entityType,
};
const validateLoadData: CSVImportJobType = {
type: 'onValidate',
};
setActiveAsyncImportJob(validateLoadData);
activeAsyncImportJobRef.current = validateLoadData;
await api({
entityType,
name: fqn,
data: csvData,
dryRun: activeStep === VALIDATION_STEP.EDIT_VALIDATE,
recursive: !isBulkEdit,
});
const jobData: CSVImportJobType = {
...response,
type: 'onValidate',
};
setActiveAsyncImportJob(jobData);
activeAsyncImportJobRef.current = jobData;
} catch (error) {
showErrorToast(error as AxiosError);
setIsValidating(false);
@ -391,7 +402,6 @@ const BulkEntityImportPage = () => {
importedEntityType,
handleResetImportJob,
handleActiveStepChange,
history,
]
);
@ -401,8 +411,42 @@ const BulkEntityImportPage = () => {
return;
}
const activeImportJob = activeAsyncImportJobRef.current;
// If the job is started, then save the job data and message to the active job.
// This will help in case of restAPI response, didn't come in time.
if (
websocketResponse.status === 'STARTED' &&
isBulkActionProcessingRef.current.isProcessing &&
isBulkActionProcessingRef.current.entityType === entityType
) {
const processedStartedResponse = {
...websocketResponse,
message: t('message.import-data-in-progress'),
};
setActiveAsyncImportJob((job) => {
if (!job) {
return;
}
return {
...job,
...processedStartedResponse,
};
});
activeAsyncImportJobRef.current = {
...(activeAsyncImportJobRef.current as CSVImportJobType),
...processedStartedResponse,
};
isBulkActionProcessingRef.current = {
isProcessing: false,
entityType: undefined,
};
return;
}
const activeImportJob = activeAsyncImportJobRef.current;
if (websocketResponse.jobId === activeImportJob?.jobId) {
setActiveAsyncImportJob((job) => {
if (!job) {
@ -457,12 +501,13 @@ const BulkEntityImportPage = () => {
}
},
[
activeStepRef,
isBulkActionProcessingRef,
activeAsyncImportJobRef,
onCSVReadComplete,
setActiveAsyncImportJob,
handleResetImportJob,
handleActiveStepChange,
handleImportWebsocketResponseWithActiveStep,
]
);
@ -484,7 +529,8 @@ const BulkEntityImportPage = () => {
}
return () => {
socket && socket.off(SOCKET_EVENTS.CSV_IMPORT_CHANNEL);
socket?.off(SOCKET_EVENTS.CSV_IMPORT_CHANNEL);
handleResetImportJob();
};
}, [socket]);