This commit is contained in:
shrushti2000 2025-04-29 23:11:00 +05:30
commit 7cd3e40e9d
25 changed files with 174 additions and 49 deletions

View File

@ -71,6 +71,7 @@ from metadata.ingestion.source.database.postgres.utils import (
get_columns,
get_etable_owner,
get_foreign_keys,
get_schema_names,
get_table_comment,
get_table_owner,
get_view_definition,
@ -115,6 +116,7 @@ Inspector.get_table_ddl = get_table_ddl
Inspector.get_table_owner = get_etable_owner
PGDialect.get_foreign_keys = get_foreign_keys
PGDialect.get_schema_names = get_schema_names
class PostgresSource(CommonDbSourceService, MultiDBSource):
@ -281,7 +283,7 @@ class PostgresSource(CommonDbSourceService, MultiDBSource):
for row in results:
try:
stored_procedure = PostgresStoredProcedure.model_validate(
dict(row._mapping)
dict(row._mapping) # pylint: disable=protected-access
)
yield stored_procedure
except Exception as exc:

View File

@ -196,6 +196,13 @@ POSTGRES_GET_SERVER_VERSION = """
show server_version_num
"""
# pylint: disable=anomalous-backslash-in-string
POSTGRES_GET_SCHEMA_NAMES = """
SELECT nspname FROM pg_namespace
WHERE nspname NOT LIKE 'pg\_%'
ORDER BY nspname
"""
POSTGRES_FETCH_FK = """
SELECT r.conname,
pg_catalog.pg_get_constraintdef(r.oid, true) as condef,

View File

@ -26,6 +26,7 @@ from sqlalchemy.sql import sqltypes
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_COL_IDENTITY,
POSTGRES_FETCH_FK,
POSTGRES_GET_SCHEMA_NAMES,
POSTGRES_GET_SERVER_VERSION,
POSTGRES_SQL_COLUMNS,
POSTGRES_TABLE_COMMENTS,
@ -43,10 +44,8 @@ logger = utils_logger()
OLD_POSTGRES_VERSION = "130000"
def get_etable_owner(
self, connection, table_name=None, schema=None
): # pylint: disable=unused-argument
# pylint: disable=unused-argument,too-many-arguments,invalid-name,too-many-locals
def get_etable_owner(self, connection, table_name=None, schema=None):
"""Return all owners.
:param schema: Optional, retrieve names from a non-default schema.
@ -67,6 +66,16 @@ def get_etable_owner(
def get_foreign_keys(
self, connection, table_name, schema=None, postgresql_ignore_search_path=False, **kw
):
"""
Args:
connection (_type_): _description_
table_name (_type_): _description_
schema (_type_, optional): _description_. Defaults to None.
postgresql_ignore_search_path (bool, optional): _description_. Defaults to False.
Returns:
_type_: _description_
"""
preparer = self.identifier_preparer
table_oid = self.get_table_oid(
connection, table_name, schema, info_cache=kw.get("info_cache")
@ -87,7 +96,7 @@ def get_foreign_keys(
t = sql.text(POSTGRES_FETCH_FK).columns(
conname=sqltypes.Unicode, condef=sqltypes.Unicode, con_db_name=sqltypes.Unicode
)
c = connection.execute(t, dict(table=table_oid))
c = connection.execute(t, {"table": table_oid})
fkeys = []
for conname, condef, conschema, con_db_name in c.fetchall():
m = re.search(FK_REGEX, condef).groups()
@ -109,7 +118,7 @@ def get_foreign_keys(
) = m
if deferrable is not None:
deferrable = True if deferrable == "DEFERRABLE" else False
deferrable = deferrable == "DEFERRABLE"
constrained_columns = tuple(re.split(r"\s*,\s*", constrained_columns))
constrained_columns = [
preparer._unquote_identifier(x) for x in constrained_columns
@ -161,9 +170,7 @@ def get_foreign_keys(
@reflection.cache
def get_table_owner(
self, connection, table_name, schema=None, **kw
): # pylint: disable=unused-argument
def get_table_owner(self, connection, table_name, schema=None, **kw):
return get_table_owner_wrapper(
self,
connection=connection,
@ -174,9 +181,7 @@ def get_table_owner(
@reflection.cache
def get_table_comment(
self, connection, table_name, schema=None, **kw
): # pylint: disable=unused-argument
def get_table_comment(self, connection, table_name, schema=None, **kw):
return get_table_comment_wrapper(
self,
connection,
@ -187,9 +192,7 @@ def get_table_comment(
@reflection.cache
def get_columns( # pylint: disable=too-many-locals
self, connection, table_name, schema=None, **kw
):
def get_columns(self, connection, table_name, schema=None, **kw):
"""
Overriding the dialect method to add raw_data_type in response
"""
@ -368,7 +371,7 @@ def _handle_array_type(attype):
)
# pylint: disable=too-many-statements,too-many-branches,too-many-locals,too-many-arguments
# pylint: disable=too-many-statements,too-many-branches
def get_column_info(
self,
name,
@ -477,9 +480,7 @@ def get_column_info(
@reflection.cache
def get_view_definition(
self, connection, table_name, schema=None, **kw
): # pylint: disable=unused-argument
def get_view_definition(self, connection, table_name, schema=None, **kw):
return get_view_definition_wrapper(
self,
connection,
@ -515,3 +516,11 @@ def get_postgres_time_column_name(engine) -> str:
):
time_column_name = "total_time"
return time_column_name
@reflection.cache
def get_schema_names(self, connection, **kw):
result = connection.execute(
sql.text(POSTGRES_GET_SCHEMA_NAMES).columns(nspname=sqltypes.Unicode)
)
return [name for name, in result]

View File

@ -95,6 +95,7 @@ export const CustomEdge = ({
onAddPipelineClick,
onColumnEdgeRemove,
dataQualityLineage,
dqHighlightedEdges,
} = useLineageProvider();
const { theme } = useApplicationStore();
@ -124,14 +125,15 @@ export const CustomEdge = ({
// Compute if should show DQ tracing
const showDqTracing = useMemo(() => {
return (
(activeLayer.includes(LineageLayer.DataObservability) &&
dataQualityLineage?.edges?.some(
(dqEdge) => dqEdge?.docId === edge?.docId
)) ??
false
);
}, [activeLayer, dataQualityLineage?.edges, edge?.docId]);
if (
!activeLayer.includes(LineageLayer.DataObservability) ||
!dataQualityLineage?.nodes
) {
return false;
}
return dqHighlightedEdges?.has(id);
}, [activeLayer, dataQualityLineage?.nodes, id, dqHighlightedEdges]);
// Determine if column is highlighted based on traced columns
const isColumnHighlighted = useMemo(() => {

View File

@ -164,7 +164,9 @@ const Lineage = ({
ref={reactFlowWrapper}>
{entityLineage && (
<>
<CustomControlsComponent className="absolute top-1 right-1 p-xs" />
{isPlatformLineage ? null : (
<CustomControlsComponent className="absolute top-1 right-1 p-xs" />
)}
<LineageControlButtons
deleted={deleted}
entityType={entityType}

View File

@ -98,4 +98,5 @@ export interface LineageContextType {
) => void;
onUpdateLayerView: (layers: LineageLayer[]) => void;
redraw: () => Promise<void>;
dqHighlightedEdges?: Set<string>;
}

View File

@ -26,6 +26,7 @@ import React, {
useState,
} from 'react';
import { useTranslation } from 'react-i18next';
import { useHistory } from 'react-router-dom';
import {
Connection,
Edge,
@ -96,6 +97,7 @@ import {
createNewEdge,
createNodes,
decodeLineageHandles,
getAllDownstreamEdges,
getAllTracedColumnEdge,
getClassifiedEdge,
getConnectedNodesEdges,
@ -135,6 +137,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
const { t } = useTranslation();
const { fqn: decodedFqn } = useFqn();
const location = useCustomLocation();
const history = useHistory();
const { isTourOpen, isTourPage } = useTourProvider();
const { appPreferences } = useApplicationStore();
const defaultLineageConfig = appPreferences?.lineageConfig as LineageSettings;
@ -197,6 +200,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
const backspacePressed = useKeyPress('Backspace');
const { showModal } = useEntityExportModalProvider();
const [isPlatformLineage, setIsPlatformLineage] = useState(false);
const [dqHighlightedEdges, setDqHighlightedEdges] = useState<Set<string>>();
const lineageLayer = useMemo(() => {
const param = location.search;
@ -426,12 +430,29 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
[queryFilter, decodedFqn]
);
const onPlatformViewChange = useCallback((view: LineagePlatformView) => {
setPlatformView(view);
if (view !== LineagePlatformView.None) {
setActiveLayer([]);
}
}, []);
const onPlatformViewChange = useCallback(
(view: LineagePlatformView) => {
setPlatformView(view);
if (view !== LineagePlatformView.None) {
setActiveLayer([]);
}
if (isPlatformLineage) {
const searchData = QueryString.parse(
location.search.startsWith('?')
? location.search.substring(1)
: location.search
);
history.push({
search: QueryString.stringify({
...searchData,
platformView: view !== LineagePlatformView.None ? view : undefined,
}),
});
}
},
[isPlatformLineage, location.search]
);
const exportLineageData = useCallback(
async (_: string) => {
@ -596,7 +617,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
setEntityType(entityType);
setIsPlatformLineage(isPlatformLineage ?? false);
if (isPlatformLineage && !entity) {
setPlatformView(LineagePlatformView.Service);
onPlatformViewChange(LineagePlatformView.Service);
}
},
[]
@ -1527,6 +1548,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
dataQualityLineage,
redraw,
onPlatformViewChange,
dqHighlightedEdges,
};
}, [
dataQualityLineage,
@ -1575,6 +1597,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
onExportClick,
redraw,
onPlatformViewChange,
dqHighlightedEdges,
]);
useEffect(() => {
@ -1599,6 +1622,20 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
}
}, [activeLayer, decodedFqn, lineageConfig]);
useEffect(() => {
if (
dataQualityLineage?.nodes &&
!isUndefined(edges) &&
isUndefined(dqHighlightedEdges)
) {
const edgesToHighlight = dataQualityLineage.nodes
.flatMap((dqNode) => getAllDownstreamEdges(dqNode.id, edges ?? []))
.map((edge) => edge.id);
const edgesToHighlightSet = new Set(edgesToHighlight);
setDqHighlightedEdges(edgesToHighlightSet);
}
}, [dataQualityLineage, edges, dqHighlightedEdges]);
return (
<LineageContext.Provider value={activityFeedContextValues}>
<div

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "Pipeline-Status",
"platform": "Plattform",
"platform-type-lineage": "{{platformType}} Abstammung",
"play": "Abspielen",
"please-enter-value": "Bitte einen Wert für {{name}} eingeben",
"please-password-type-first": "Bitte zuerst das Passwort eingeben",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "Pipeline State",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} Lineage",
"play": "Play",
"please-enter-value": "Please enter {{name}} value",
"please-password-type-first": "Please type password first",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "Estado de la pipeline",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} Linaje",
"play": "Play",
"please-enter-value": "Ingrese el valor de {{name}}",
"please-password-type-first": "Ingrese primero la contraseña",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "État de la Pipeline",
"platform": "Plateforme",
"platform-type-lineage": "{{platformType}} Lignage",
"play": "Play",
"please-enter-value": "Merci d'entrer une valeur pour {{name}} ",
"please-password-type-first": "Merci d'entrer le mot de passe d'abord",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "Estado do pipeline",
"platform": "Plataforma",
"platform-type-lineage": "{{platformType}} Liñaxe",
"play": "Reproducir",
"please-enter-value": "Por favor, introduce o valor de {{name}}",
"please-password-type-first": "Por favor, introduce primeiro o contrasinal",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "תהליכי טעינה/עיבוד",
"pipeline-state": "מצב תהליך הטעינה/עיבוד",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} שורשים",
"play": "Play",
"please-enter-value": "נא להזין את ערך {{name}}",
"please-password-type-first": "נא להקליד סיסמה תחילה",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "パイプライン",
"pipeline-state": "パイプラインの状態",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} リネージ",
"play": "Play",
"please-enter-value": "{{name}}の値を入力してください",
"please-password-type-first": "パスワードを入力してください",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "파이프라인들",
"pipeline-state": "파이프라인 상태",
"platform": "플랫폼",
"platform-type-lineage": "{{platformType}} 계보",
"play": "재생",
"please-enter-value": "{{name}} 값을 입력해주세요",
"please-password-type-first": "먼저 비밀번호를 입력해주세요",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "पाइपलाइन",
"pipeline-state": "पाइपलाइन स्थिती",
"platform": "प्लॅटफॉर्म",
"platform-type-lineage": "{{platformType}} वंशावळ",
"play": "प्ले",
"please-enter-value": "कृपया {{name}} मूल्य प्रविष्ट करा",
"please-password-type-first": "कृपया प्रथम पासवर्ड टाइप करा",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "Pipelinestatus",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} Herkomst",
"play": "Play",
"please-enter-value": "Voer alstublieft de waarde voor {{name}} in",
"please-password-type-first": "Typ eerst het wachtwoord alstublieft",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "خطوط لوله",
"pipeline-state": "وضعیت خط لوله",
"platform": "پلتفرم",
"platform-type-lineage": "{{platformType}} شجره داده",
"play": "پخش",
"please-enter-value": "لطفاً مقدار {{name}} را وارد کنید",
"please-password-type-first": "لطفاً ابتدا رمز عبور را وارد کنید",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "Estado do Pipeline",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} Linhagem",
"play": "Play",
"please-enter-value": "Por favor, insira o valor de {{name}}",
"please-password-type-first": "Por favor, digite a senha primeiro",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Pipelines",
"pipeline-state": "Estado do Pipeline",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} Linhagem",
"play": "Play",
"please-enter-value": "Por favor, insira o valor de {{name}}",
"please-password-type-first": "Por favor, digite a senha primeiro",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "Пайплайны",
"pipeline-state": "Состояние",
"platform": "Platform",
"platform-type-lineage": "{{platformType}} Происхождение",
"play": "Play",
"please-enter-value": "Пожалуйста введите значение {{name}} ",
"please-password-type-first": "Пожалуйста, сначала введите пароль",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "ท่อหลายรายการ",
"pipeline-state": "สถานะท่อ",
"platform": "แพลตฟอร์ม",
"platform-type-lineage": "{{platformType}} ลำดับชั้น",
"play": "เล่น",
"please-enter-value": "กรุณากรอกค่าของ {{name}}",
"please-password-type-first": "กรุณาพิมพ์รหัสผ่านก่อน",

View File

@ -1000,6 +1000,7 @@
"pipeline-plural": "工作流",
"pipeline-state": "工作流状态",
"platform": "平台",
"platform-type-lineage": "{{platformType}} 血缘关系",
"play": "Play",
"please-enter-value": "请输入{{name}}值",
"please-password-type-first": "请先输入密码",

View File

@ -13,7 +13,7 @@
import { Col, Row, Select } from 'antd';
import { DefaultOptionType } from 'antd/lib/select';
import { AxiosError } from 'axios';
import { debounce } from 'lodash';
import { debounce, startCase } from 'lodash';
import React, { useCallback, useEffect, useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { useHistory, useParams } from 'react-router-dom';
@ -27,6 +27,7 @@ import { SourceType } from '../../components/SearchedData/SearchedData.interface
import { PAGE_SIZE_BASE } from '../../constants/constants';
import { PAGE_HEADERS } from '../../constants/PageHeaders.constant';
import LineageProvider from '../../context/LineageProvider/LineageProvider';
import { LineagePlatformView } from '../../context/LineageProvider/LineageProvider.interface';
import {
OperationPermission,
ResourceEntity,
@ -34,6 +35,7 @@ import {
import { EntityType } from '../../enums/entity.enum';
import { SearchIndex } from '../../enums/search.enum';
import { EntityReference } from '../../generated/entity/type';
import useCustomLocation from '../../hooks/useCustomLocation/useCustomLocation';
import { useFqn } from '../../hooks/useFqn';
import { getEntityPermissionByFqn } from '../../rest/permissionAPI';
import { searchQuery } from '../../rest/searchAPI';
@ -45,7 +47,11 @@ import './platform-lineage.less';
const PlatformLineage = () => {
const { t } = useTranslation();
const location = useCustomLocation();
const history = useHistory();
const queryParams = new URLSearchParams(location.search);
const platformView =
queryParams.get('platformView') ?? LineagePlatformView.Service;
const { entityType } = useParams<{ entityType: EntityType }>();
const { fqn: decodedFqn } = useFqn();
const [selectedEntity, setSelectedEntity] = useState<SourceType>();
@ -57,6 +63,16 @@ const PlatformLineage = () => {
);
const [permissions, setPermissions] = useState<OperationPermission>();
const handleEntitySelect = useCallback(
(value: EntityReference) => {
history.push(
`/lineage/${(value as SourceType).entityType}/${
value.fullyQualifiedName
}`
);
},
[history]
);
const debouncedSearch = useCallback(
debounce(async (value: string) => {
try {
@ -94,17 +110,6 @@ const PlatformLineage = () => {
[]
);
const handleEntitySelect = useCallback(
(value: EntityReference) => {
history.push(
`/lineage/${(value as SourceType).entityType}/${
value.fullyQualifiedName
}`
);
},
[history]
);
const init = useCallback(async () => {
if (!decodedFqn || !entityType) {
setDefaultValue(undefined);
@ -169,7 +174,14 @@ const PlatformLineage = () => {
<Col span={24}>
<Row className="">
<Col span={24}>
<PageHeader data={PAGE_HEADERS.PLATFORM_LINEAGE} />
<PageHeader
data={{
...PAGE_HEADERS.PLATFORM_LINEAGE,
header: t('label.platform-type-lineage', {
platformType: startCase(platformView),
}),
}}
/>
</Col>
<Col span={12}>
<div className="m-t-md w-full">

View File

@ -1791,3 +1791,40 @@ export const getEntityTypeFromPlatformView = (
return 'service';
}
};
/**
* Recursively finds all downstream edges from a given node in a graph.
* This function traverses the graph depth-first, collecting all edges that flow downstream
* from the specified node while avoiding cycles by tracking visited nodes.
*
* @param {string} nodeId - The ID of the starting node
* @param {Edge[]} edges - Array of all edges in the graph
* @param {Set<string>} [visitedNodes=new Set()] - Set of already visited node IDs to prevent cycles
* @returns {Edge[]} Array of all downstream edges from the starting node
*/
export const getAllDownstreamEdges = (
nodeId: string,
edges: Edge[],
visitedNodes: Set<string> = new Set()
): Edge[] => {
// If we've already visited this node, return empty array to avoid cycles
if (visitedNodes.has(nodeId)) {
return [];
}
visitedNodes.add(nodeId);
// Get direct downstream edges
const directDownstreamEdges = edges.filter((edge) => edge.source === nodeId);
// Get target nodes from direct downstream edges
const targetNodes = directDownstreamEdges.map((edge) => edge.target);
// Recursively get downstream edges for each target node
const nestedDownstreamEdges = targetNodes.flatMap((targetNodeId) =>
getAllDownstreamEdges(targetNodeId, edges, visitedNodes)
);
// Combine direct and nested downstream edges
return [...directDownstreamEdges, ...nestedDownstreamEdges];
};