diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 65d43bf7666..134fe998ec1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -338,9 +338,12 @@ public class LineageRepository { baseRow.put("toDomain", getDomainFQN(toEntity.path("domain"))); JsonNode columns = edge.path("columns"); - if (columns.isArray() && !columns.isEmpty()) { - List explicitColumnMappings = extractColumnMappingsFromEdge(columns); - for (ColumnMapping mapping : explicitColumnMappings) { + JsonNode pipeline = edge.path("pipeline"); + + if (columns.isArray() && columns.size() > 0) { + // Process column mappings + List columnMappings = extractColumnMappingsFromEdge(columns); + for (ColumnMapping mapping : columnMappings) { writeCsvRow( csvWriter, baseRow, @@ -354,34 +357,14 @@ public class LineageRepository { "", ""); LOG.debug( - "Exported explicit ColumnMapping: from='{}', to='{}'", + "Exported ColumnMapping: from='{}', to='{}'", mapping.getFromChildFQN(), mapping.getToChildFQN()); } - } - - JsonNode pipeline = edge.path("pipeline"); - if (!pipeline.isMissingNode() && !pipeline.isNull()) { - String pipelineName = getText(pipeline, "name"); - String pipelineType = getText(pipeline, "serviceType"); - String pipelineDescription = getText(pipeline, "description"); - String pipelineOwners = getOwners(pipeline.path("owners")); - String pipelineServiceName = getText(pipeline.path("service"), "name"); - String pipelineServiceType = getText(pipeline, "serviceType"); - String pipelineDomain = getDomainFQN(pipeline.path("domain")); - writeCsvRow( - csvWriter, - baseRow, - "", - "", - pipelineName, - pipelineType, - pipelineDescription, - pipelineOwners, - pipelineDomain, - pipelineServiceName, - pipelineServiceType); - LOG.debug("Exported Pipeline Information: {}", pipelineName); + } else if (!pipeline.isMissingNode() && !pipeline.isNull()) { + writePipelineRow(csvWriter, baseRow, pipeline); + } else { + writeCsvRow(csvWriter, baseRow, "", "", "", "", "", "", "", "", ""); } } csvWriter.close(); @@ -391,6 +374,31 @@ public class LineageRepository { } } + private void writePipelineRow( + CSVWriter csvWriter, Map baseRow, JsonNode pipeline) { + String pipelineName = getText(pipeline, "name"); + String pipelineType = getText(pipeline, "serviceType"); + String pipelineDescription = getText(pipeline, "description"); + String pipelineOwners = getOwners(pipeline.path("owners")); + String pipelineServiceName = getText(pipeline.path("service"), "name"); + String pipelineServiceType = getText(pipeline, "serviceType"); + String pipelineDomain = getDomainFQN(pipeline.path("domain")); + + writeCsvRow( + csvWriter, + baseRow, + "", + "", + pipelineName, + pipelineType, + pipelineDescription, + pipelineOwners, + pipelineDomain, + pipelineServiceName, + pipelineServiceType); + LOG.debug("Exported Pipeline Information: {}", pipelineName); + } + private static void writeCsvRow( CSVWriter csvWriter, Map baseRow, diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/Lineage.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/Lineage.spec.ts index 24f29ec4746..9213690a8a7 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/Lineage.spec.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/Lineage.spec.ts @@ -43,6 +43,8 @@ import { setupEntitiesForLineage, verifyColumnLayerActive, verifyColumnLayerInactive, + verifyColumnLineageInCSV, + verifyExportLineageCSV, verifyNodePresent, visitLineageTab, } from '../../utils/lineage'; @@ -122,6 +124,13 @@ for (const EntityClass of entities) { } }); + await test.step('Verify Lineage Export CSV', async () => { + await redirectToHomePage(page); + await currentEntity.visitEntityPage(page); + await visitLineageTab(page); + await verifyExportLineageCSV(page, currentEntity, entities, pipeline); + }); + await test.step('Remove lineage between nodes for the entity', async () => { await redirectToHomePage(page); await currentEntity.visitEntityPage(page); @@ -199,6 +208,13 @@ test('Verify column lineage between table and topic', async ({ browser }) => { // Add column lineage await addColumnLineage(page, sourceCol, targetCol); + + // Verify column lineage + await redirectToHomePage(page); + await table.visitEntityPage(page); + await visitLineageTab(page); + await verifyColumnLineageInCSV(page, table, topic, sourceCol, targetCol); + await page.click('[data-testid="edit-lineage"]'); await removeColumnLineage(page, sourceCol, targetCol); @@ -275,7 +291,6 @@ test('Verify column lineage between table and api endpoint', async ({ // Add column lineage await addColumnLineage(page, sourceCol, targetCol); await page.click('[data-testid="edit-lineage"]'); - await removeColumnLineage(page, sourceCol, targetCol); await page.click('[data-testid="edit-lineage"]'); diff --git a/openmetadata-ui/src/main/resources/ui/playwright/utils/lineage.ts b/openmetadata-ui/src/main/resources/ui/playwright/utils/lineage.ts index 369d0aeda63..a41c7a16793 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/utils/lineage.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/utils/lineage.ts @@ -12,6 +12,7 @@ */ import { expect, Page } from '@playwright/test'; import { get } from 'lodash'; +import { parseCSV } from '../../src/utils/EntityImport/EntityImportUtils'; import { ApiEndpointClass } from '../support/entity/ApiEndpointClass'; import { ContainerClass } from '../support/entity/ContainerClass'; import { DashboardClass } from '../support/entity/DashboardClass'; @@ -28,6 +29,38 @@ import { toastNotification, } from './common'; +type LineageCSVRecord = { + fromEntityFQN: string; + fromServiceName: string; + fromServiceType: string; + toEntityFQN: string; + toServiceName: string; + toServiceType: string; + pipelineName: string; +}; + +export const LINEAGE_CSV_HEADERS = [ + 'fromEntityFQN', + 'fromServiceName', + 'fromServiceType', + 'fromOwners', + 'fromDomain', + 'toEntityFQN', + 'toServiceName', + 'toServiceType', + 'toOwners', + 'toDomain', + 'fromChildEntityFQN', + 'toChildEntityFQN', + 'pipelineName', + 'pipelineType', + 'pipelineDescription', + 'pipelineOwners', + 'pipelineDomain', + 'pipelineServiceName', + 'pipelineServiceType', +]; + export const verifyColumnLayerInactive = async (page: Page) => { await page.click('[data-testid="lineage-layer-btn"]'); // Open Layer popover await page.waitForSelector( @@ -473,3 +506,122 @@ export const verifyColumnLayerActive = async (page: Page) => { await page.waitForSelector('[data-testid="lineage-layer-column-btn"].active'); await page.click('[data-testid="lineage-layer-btn"]'); // Close Layer popover }; + +export const verifyCSVHeaders = async (page: Page, headers: string[]) => { + LINEAGE_CSV_HEADERS.forEach((expectedHeader) => { + expect(headers).toContain(expectedHeader); + }); +}; + +export const getLineageCSVData = async (page: Page) => { + await page.getByTestId('lineage-export').click(); + + await expect(page.getByRole('dialog', { name: 'Export' })).toBeVisible(); + + const [download] = await Promise.all([ + page.waitForEvent('download'), + page.click('button#submit-button'), + ]); + + const filePath = await download.path(); + + expect(filePath).not.toBeNull(); + + const fileContent = await download.createReadStream(); + + let fileData = ''; + for await (const item of fileContent) { + fileData += item.toString(); + } + + const csvRows = fileData + .split('\n') + .map((row) => row.split(',').map((cell) => cell.replace(/"/g, '').trim())); + + const headers = csvRows[0]; + await verifyCSVHeaders(page, headers); + + return parseCSV(csvRows); +}; + +export const verifyExportLineageCSV = async ( + page: Page, + currentEntity: EntityClass, + entities: readonly [ + TableClass, + DashboardClass, + TopicClass, + MlModelClass, + ContainerClass, + SearchIndexClass, + ApiEndpointClass, + MetricClass + ], + pipeline: PipelineClass +) => { + const parsedData = await getLineageCSVData(page); + const currentEntityFQN = get( + currentEntity, + 'entityResponseData.fullyQualifiedName' + ); + + const arr = []; + for (let i = 0; i < entities.length; i++) { + arr.push({ + fromEntityFQN: currentEntityFQN, + fromServiceName: get( + currentEntity, + 'entityResponseData.service.name', + '' + ), + fromServiceType: get(currentEntity, 'entityResponseData.serviceType', ''), + toEntityFQN: get( + entities[i], + 'entityResponseData.fullyQualifiedName', + '' + ), + toServiceName: get(entities[i], 'entityResponseData.service.name', ''), + toServiceType: get(entities[i], 'entityResponseData.serviceType', ''), + pipelineName: get(pipeline, 'entityResponseData.name', ''), + }); + } + + arr.forEach((expectedRow: LineageCSVRecord) => { + const matchingRow = parsedData.find((row) => + Object.keys(expectedRow).every( + (key) => row[key] === expectedRow[key as keyof LineageCSVRecord] + ) + ); + + expect(matchingRow).toBeDefined(); // Ensure a matching row exists + }); +}; + +export const verifyColumnLineageInCSV = async ( + page: Page, + sourceEntity: EntityClass, + targetEntity: EntityClass, + sourceColFqn: string, + targetColFqn: string +) => { + const parsedData = await getLineageCSVData(page); + const expectedRow = { + fromEntityFQN: get(sourceEntity, 'entityResponseData.fullyQualifiedName'), + fromServiceName: get(sourceEntity, 'entityResponseData.service.name', ''), + fromServiceType: get(sourceEntity, 'entityResponseData.serviceType', ''), + toEntityFQN: get(targetEntity, 'entityResponseData.fullyQualifiedName', ''), + toServiceName: get(targetEntity, 'entityResponseData.service.name', ''), + toServiceType: get(targetEntity, 'entityResponseData.serviceType', ''), + fromChildEntityFQN: sourceColFqn, + toChildEntityFQN: targetColFqn, + pipelineName: '', + }; + + const matchingRow = parsedData.find((row) => + Object.keys(expectedRow).every( + (key) => row[key] === expectedRow[key as keyof LineageCSVRecord] + ) + ); + + expect(matchingRow).toBeDefined(); // Ensure a matching row exists +};