Minor: fix Lineage export when there is no column / pipeline edge (#18737)

* Minor: fix Lineage export when there is no column / pipeline edge

* add test for lineage export (#18709)

Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>

* add lineage export tests

---------

Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>
Co-authored-by: Sweta Agarwalla <105535990+sweta1308@users.noreply.github.com>
Co-authored-by: karanh37 <karanh37@gmail.com>
This commit is contained in:
Sriharsha Chintalapani 2024-11-22 08:21:19 -08:00 committed by GitHub
parent b220bdb891
commit 9484d838a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 204 additions and 29 deletions

View File

@ -338,9 +338,12 @@ public class LineageRepository {
baseRow.put("toDomain", getDomainFQN(toEntity.path("domain")));
JsonNode columns = edge.path("columns");
if (columns.isArray() && !columns.isEmpty()) {
List<ColumnMapping> explicitColumnMappings = extractColumnMappingsFromEdge(columns);
for (ColumnMapping mapping : explicitColumnMappings) {
JsonNode pipeline = edge.path("pipeline");
if (columns.isArray() && columns.size() > 0) {
// Process column mappings
List<ColumnMapping> columnMappings = extractColumnMappingsFromEdge(columns);
for (ColumnMapping mapping : columnMappings) {
writeCsvRow(
csvWriter,
baseRow,
@ -354,34 +357,14 @@ public class LineageRepository {
"",
"");
LOG.debug(
"Exported explicit ColumnMapping: from='{}', to='{}'",
"Exported ColumnMapping: from='{}', to='{}'",
mapping.getFromChildFQN(),
mapping.getToChildFQN());
}
}
JsonNode pipeline = edge.path("pipeline");
if (!pipeline.isMissingNode() && !pipeline.isNull()) {
String pipelineName = getText(pipeline, "name");
String pipelineType = getText(pipeline, "serviceType");
String pipelineDescription = getText(pipeline, "description");
String pipelineOwners = getOwners(pipeline.path("owners"));
String pipelineServiceName = getText(pipeline.path("service"), "name");
String pipelineServiceType = getText(pipeline, "serviceType");
String pipelineDomain = getDomainFQN(pipeline.path("domain"));
writeCsvRow(
csvWriter,
baseRow,
"",
"",
pipelineName,
pipelineType,
pipelineDescription,
pipelineOwners,
pipelineDomain,
pipelineServiceName,
pipelineServiceType);
LOG.debug("Exported Pipeline Information: {}", pipelineName);
} else if (!pipeline.isMissingNode() && !pipeline.isNull()) {
writePipelineRow(csvWriter, baseRow, pipeline);
} else {
writeCsvRow(csvWriter, baseRow, "", "", "", "", "", "", "", "", "");
}
}
csvWriter.close();
@ -391,6 +374,31 @@ public class LineageRepository {
}
}
private void writePipelineRow(
CSVWriter csvWriter, Map<String, String> baseRow, JsonNode pipeline) {
String pipelineName = getText(pipeline, "name");
String pipelineType = getText(pipeline, "serviceType");
String pipelineDescription = getText(pipeline, "description");
String pipelineOwners = getOwners(pipeline.path("owners"));
String pipelineServiceName = getText(pipeline.path("service"), "name");
String pipelineServiceType = getText(pipeline, "serviceType");
String pipelineDomain = getDomainFQN(pipeline.path("domain"));
writeCsvRow(
csvWriter,
baseRow,
"",
"",
pipelineName,
pipelineType,
pipelineDescription,
pipelineOwners,
pipelineDomain,
pipelineServiceName,
pipelineServiceType);
LOG.debug("Exported Pipeline Information: {}", pipelineName);
}
private static void writeCsvRow(
CSVWriter csvWriter,
Map<String, String> baseRow,

View File

@ -43,6 +43,8 @@ import {
setupEntitiesForLineage,
verifyColumnLayerActive,
verifyColumnLayerInactive,
verifyColumnLineageInCSV,
verifyExportLineageCSV,
verifyNodePresent,
visitLineageTab,
} from '../../utils/lineage';
@ -122,6 +124,13 @@ for (const EntityClass of entities) {
}
});
await test.step('Verify Lineage Export CSV', async () => {
await redirectToHomePage(page);
await currentEntity.visitEntityPage(page);
await visitLineageTab(page);
await verifyExportLineageCSV(page, currentEntity, entities, pipeline);
});
await test.step('Remove lineage between nodes for the entity', async () => {
await redirectToHomePage(page);
await currentEntity.visitEntityPage(page);
@ -199,6 +208,13 @@ test('Verify column lineage between table and topic', async ({ browser }) => {
// Add column lineage
await addColumnLineage(page, sourceCol, targetCol);
// Verify column lineage
await redirectToHomePage(page);
await table.visitEntityPage(page);
await visitLineageTab(page);
await verifyColumnLineageInCSV(page, table, topic, sourceCol, targetCol);
await page.click('[data-testid="edit-lineage"]');
await removeColumnLineage(page, sourceCol, targetCol);
@ -275,7 +291,6 @@ test('Verify column lineage between table and api endpoint', async ({
// Add column lineage
await addColumnLineage(page, sourceCol, targetCol);
await page.click('[data-testid="edit-lineage"]');
await removeColumnLineage(page, sourceCol, targetCol);
await page.click('[data-testid="edit-lineage"]');

View File

@ -12,6 +12,7 @@
*/
import { expect, Page } from '@playwright/test';
import { get } from 'lodash';
import { parseCSV } from '../../src/utils/EntityImport/EntityImportUtils';
import { ApiEndpointClass } from '../support/entity/ApiEndpointClass';
import { ContainerClass } from '../support/entity/ContainerClass';
import { DashboardClass } from '../support/entity/DashboardClass';
@ -28,6 +29,38 @@ import {
toastNotification,
} from './common';
type LineageCSVRecord = {
fromEntityFQN: string;
fromServiceName: string;
fromServiceType: string;
toEntityFQN: string;
toServiceName: string;
toServiceType: string;
pipelineName: string;
};
export const LINEAGE_CSV_HEADERS = [
'fromEntityFQN',
'fromServiceName',
'fromServiceType',
'fromOwners',
'fromDomain',
'toEntityFQN',
'toServiceName',
'toServiceType',
'toOwners',
'toDomain',
'fromChildEntityFQN',
'toChildEntityFQN',
'pipelineName',
'pipelineType',
'pipelineDescription',
'pipelineOwners',
'pipelineDomain',
'pipelineServiceName',
'pipelineServiceType',
];
export const verifyColumnLayerInactive = async (page: Page) => {
await page.click('[data-testid="lineage-layer-btn"]'); // Open Layer popover
await page.waitForSelector(
@ -473,3 +506,122 @@ export const verifyColumnLayerActive = async (page: Page) => {
await page.waitForSelector('[data-testid="lineage-layer-column-btn"].active');
await page.click('[data-testid="lineage-layer-btn"]'); // Close Layer popover
};
export const verifyCSVHeaders = async (page: Page, headers: string[]) => {
LINEAGE_CSV_HEADERS.forEach((expectedHeader) => {
expect(headers).toContain(expectedHeader);
});
};
export const getLineageCSVData = async (page: Page) => {
await page.getByTestId('lineage-export').click();
await expect(page.getByRole('dialog', { name: 'Export' })).toBeVisible();
const [download] = await Promise.all([
page.waitForEvent('download'),
page.click('button#submit-button'),
]);
const filePath = await download.path();
expect(filePath).not.toBeNull();
const fileContent = await download.createReadStream();
let fileData = '';
for await (const item of fileContent) {
fileData += item.toString();
}
const csvRows = fileData
.split('\n')
.map((row) => row.split(',').map((cell) => cell.replace(/"/g, '').trim()));
const headers = csvRows[0];
await verifyCSVHeaders(page, headers);
return parseCSV(csvRows);
};
export const verifyExportLineageCSV = async (
page: Page,
currentEntity: EntityClass,
entities: readonly [
TableClass,
DashboardClass,
TopicClass,
MlModelClass,
ContainerClass,
SearchIndexClass,
ApiEndpointClass,
MetricClass
],
pipeline: PipelineClass
) => {
const parsedData = await getLineageCSVData(page);
const currentEntityFQN = get(
currentEntity,
'entityResponseData.fullyQualifiedName'
);
const arr = [];
for (let i = 0; i < entities.length; i++) {
arr.push({
fromEntityFQN: currentEntityFQN,
fromServiceName: get(
currentEntity,
'entityResponseData.service.name',
''
),
fromServiceType: get(currentEntity, 'entityResponseData.serviceType', ''),
toEntityFQN: get(
entities[i],
'entityResponseData.fullyQualifiedName',
''
),
toServiceName: get(entities[i], 'entityResponseData.service.name', ''),
toServiceType: get(entities[i], 'entityResponseData.serviceType', ''),
pipelineName: get(pipeline, 'entityResponseData.name', ''),
});
}
arr.forEach((expectedRow: LineageCSVRecord) => {
const matchingRow = parsedData.find((row) =>
Object.keys(expectedRow).every(
(key) => row[key] === expectedRow[key as keyof LineageCSVRecord]
)
);
expect(matchingRow).toBeDefined(); // Ensure a matching row exists
});
};
export const verifyColumnLineageInCSV = async (
page: Page,
sourceEntity: EntityClass,
targetEntity: EntityClass,
sourceColFqn: string,
targetColFqn: string
) => {
const parsedData = await getLineageCSVData(page);
const expectedRow = {
fromEntityFQN: get(sourceEntity, 'entityResponseData.fullyQualifiedName'),
fromServiceName: get(sourceEntity, 'entityResponseData.service.name', ''),
fromServiceType: get(sourceEntity, 'entityResponseData.serviceType', ''),
toEntityFQN: get(targetEntity, 'entityResponseData.fullyQualifiedName', ''),
toServiceName: get(targetEntity, 'entityResponseData.service.name', ''),
toServiceType: get(targetEntity, 'entityResponseData.serviceType', ''),
fromChildEntityFQN: sourceColFqn,
toChildEntityFQN: targetColFqn,
pipelineName: '',
};
const matchingRow = parsedData.find((row) =>
Object.keys(expectedRow).every(
(key) => row[key] === expectedRow[key as keyof LineageCSVRecord]
)
);
expect(matchingRow).toBeDefined(); // Ensure a matching row exists
};