diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index 7dd1f6c4a66..1479d24e4f1 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -71,6 +71,7 @@ from metadata.ingestion.source.database.postgres.utils import ( get_columns, get_etable_owner, get_foreign_keys, + get_schema_names, get_table_comment, get_table_owner, get_view_definition, @@ -115,6 +116,7 @@ Inspector.get_table_ddl = get_table_ddl Inspector.get_table_owner = get_etable_owner PGDialect.get_foreign_keys = get_foreign_keys +PGDialect.get_schema_names = get_schema_names class PostgresSource(CommonDbSourceService, MultiDBSource): @@ -281,7 +283,7 @@ class PostgresSource(CommonDbSourceService, MultiDBSource): for row in results: try: stored_procedure = PostgresStoredProcedure.model_validate( - dict(row._mapping) + dict(row._mapping) # pylint: disable=protected-access ) yield stored_procedure except Exception as exc: diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py index 0b76443307c..eb68d1a6767 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py @@ -196,6 +196,13 @@ POSTGRES_GET_SERVER_VERSION = """ show server_version_num """ +# pylint: disable=anomalous-backslash-in-string +POSTGRES_GET_SCHEMA_NAMES = """ +SELECT nspname FROM pg_namespace + WHERE nspname NOT LIKE 'pg\_%' + ORDER BY nspname +""" + POSTGRES_FETCH_FK = """ SELECT r.conname, pg_catalog.pg_get_constraintdef(r.oid, true) as condef, diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py index 411177e379c..02534f3da9e 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py @@ -26,6 +26,7 @@ from sqlalchemy.sql import sqltypes from metadata.ingestion.source.database.postgres.queries import ( POSTGRES_COL_IDENTITY, POSTGRES_FETCH_FK, + POSTGRES_GET_SCHEMA_NAMES, POSTGRES_GET_SERVER_VERSION, POSTGRES_SQL_COLUMNS, POSTGRES_TABLE_COMMENTS, @@ -43,10 +44,8 @@ logger = utils_logger() OLD_POSTGRES_VERSION = "130000" - -def get_etable_owner( - self, connection, table_name=None, schema=None -): # pylint: disable=unused-argument +# pylint: disable=unused-argument,too-many-arguments,invalid-name,too-many-locals +def get_etable_owner(self, connection, table_name=None, schema=None): """Return all owners. :param schema: Optional, retrieve names from a non-default schema. @@ -67,6 +66,16 @@ def get_etable_owner( def get_foreign_keys( self, connection, table_name, schema=None, postgresql_ignore_search_path=False, **kw ): + """ + Args: + connection (_type_): _description_ + table_name (_type_): _description_ + schema (_type_, optional): _description_. Defaults to None. + postgresql_ignore_search_path (bool, optional): _description_. Defaults to False. + + Returns: + _type_: _description_ + """ preparer = self.identifier_preparer table_oid = self.get_table_oid( connection, table_name, schema, info_cache=kw.get("info_cache") @@ -87,7 +96,7 @@ def get_foreign_keys( t = sql.text(POSTGRES_FETCH_FK).columns( conname=sqltypes.Unicode, condef=sqltypes.Unicode, con_db_name=sqltypes.Unicode ) - c = connection.execute(t, dict(table=table_oid)) + c = connection.execute(t, {"table": table_oid}) fkeys = [] for conname, condef, conschema, con_db_name in c.fetchall(): m = re.search(FK_REGEX, condef).groups() @@ -109,7 +118,7 @@ def get_foreign_keys( ) = m if deferrable is not None: - deferrable = True if deferrable == "DEFERRABLE" else False + deferrable = deferrable == "DEFERRABLE" constrained_columns = tuple(re.split(r"\s*,\s*", constrained_columns)) constrained_columns = [ preparer._unquote_identifier(x) for x in constrained_columns @@ -161,9 +170,7 @@ def get_foreign_keys( @reflection.cache -def get_table_owner( - self, connection, table_name, schema=None, **kw -): # pylint: disable=unused-argument +def get_table_owner(self, connection, table_name, schema=None, **kw): return get_table_owner_wrapper( self, connection=connection, @@ -174,9 +181,7 @@ def get_table_owner( @reflection.cache -def get_table_comment( - self, connection, table_name, schema=None, **kw -): # pylint: disable=unused-argument +def get_table_comment(self, connection, table_name, schema=None, **kw): return get_table_comment_wrapper( self, connection, @@ -187,9 +192,7 @@ def get_table_comment( @reflection.cache -def get_columns( # pylint: disable=too-many-locals - self, connection, table_name, schema=None, **kw -): +def get_columns(self, connection, table_name, schema=None, **kw): """ Overriding the dialect method to add raw_data_type in response """ @@ -368,7 +371,7 @@ def _handle_array_type(attype): ) -# pylint: disable=too-many-statements,too-many-branches,too-many-locals,too-many-arguments +# pylint: disable=too-many-statements,too-many-branches def get_column_info( self, name, @@ -477,9 +480,7 @@ def get_column_info( @reflection.cache -def get_view_definition( - self, connection, table_name, schema=None, **kw -): # pylint: disable=unused-argument +def get_view_definition(self, connection, table_name, schema=None, **kw): return get_view_definition_wrapper( self, connection, @@ -515,3 +516,11 @@ def get_postgres_time_column_name(engine) -> str: ): time_column_name = "total_time" return time_column_name + + +@reflection.cache +def get_schema_names(self, connection, **kw): + result = connection.execute( + sql.text(POSTGRES_GET_SCHEMA_NAMES).columns(nspname=sqltypes.Unicode) + ) + return [name for name, in result] diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Entity/EntityLineage/CustomEdge.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Entity/EntityLineage/CustomEdge.component.tsx index 2ec1a5efbee..81e86958a79 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Entity/EntityLineage/CustomEdge.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Entity/EntityLineage/CustomEdge.component.tsx @@ -95,6 +95,7 @@ export const CustomEdge = ({ onAddPipelineClick, onColumnEdgeRemove, dataQualityLineage, + dqHighlightedEdges, } = useLineageProvider(); const { theme } = useApplicationStore(); @@ -124,14 +125,15 @@ export const CustomEdge = ({ // Compute if should show DQ tracing const showDqTracing = useMemo(() => { - return ( - (activeLayer.includes(LineageLayer.DataObservability) && - dataQualityLineage?.edges?.some( - (dqEdge) => dqEdge?.docId === edge?.docId - )) ?? - false - ); - }, [activeLayer, dataQualityLineage?.edges, edge?.docId]); + if ( + !activeLayer.includes(LineageLayer.DataObservability) || + !dataQualityLineage?.nodes + ) { + return false; + } + + return dqHighlightedEdges?.has(id); + }, [activeLayer, dataQualityLineage?.nodes, id, dqHighlightedEdges]); // Determine if column is highlighted based on traced columns const isColumnHighlighted = useMemo(() => { diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Lineage/Lineage.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Lineage/Lineage.component.tsx index 91ae5cd264f..966e8efbffc 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Lineage/Lineage.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Lineage/Lineage.component.tsx @@ -164,7 +164,9 @@ const Lineage = ({ ref={reactFlowWrapper}> {entityLineage && ( <> - + {isPlatformLineage ? null : ( + + )} void; onUpdateLayerView: (layers: LineageLayer[]) => void; redraw: () => Promise; + dqHighlightedEdges?: Set; } diff --git a/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx b/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx index 3bbddfb9343..fbec4fc728d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/context/LineageProvider/LineageProvider.tsx @@ -26,6 +26,7 @@ import React, { useState, } from 'react'; import { useTranslation } from 'react-i18next'; +import { useHistory } from 'react-router-dom'; import { Connection, Edge, @@ -96,6 +97,7 @@ import { createNewEdge, createNodes, decodeLineageHandles, + getAllDownstreamEdges, getAllTracedColumnEdge, getClassifiedEdge, getConnectedNodesEdges, @@ -135,6 +137,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => { const { t } = useTranslation(); const { fqn: decodedFqn } = useFqn(); const location = useCustomLocation(); + const history = useHistory(); const { isTourOpen, isTourPage } = useTourProvider(); const { appPreferences } = useApplicationStore(); const defaultLineageConfig = appPreferences?.lineageConfig as LineageSettings; @@ -197,6 +200,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => { const backspacePressed = useKeyPress('Backspace'); const { showModal } = useEntityExportModalProvider(); const [isPlatformLineage, setIsPlatformLineage] = useState(false); + const [dqHighlightedEdges, setDqHighlightedEdges] = useState>(); const lineageLayer = useMemo(() => { const param = location.search; @@ -426,12 +430,29 @@ const LineageProvider = ({ children }: LineageProviderProps) => { [queryFilter, decodedFqn] ); - const onPlatformViewChange = useCallback((view: LineagePlatformView) => { - setPlatformView(view); - if (view !== LineagePlatformView.None) { - setActiveLayer([]); - } - }, []); + const onPlatformViewChange = useCallback( + (view: LineagePlatformView) => { + setPlatformView(view); + if (view !== LineagePlatformView.None) { + setActiveLayer([]); + } + + if (isPlatformLineage) { + const searchData = QueryString.parse( + location.search.startsWith('?') + ? location.search.substring(1) + : location.search + ); + history.push({ + search: QueryString.stringify({ + ...searchData, + platformView: view !== LineagePlatformView.None ? view : undefined, + }), + }); + } + }, + [isPlatformLineage, location.search] + ); const exportLineageData = useCallback( async (_: string) => { @@ -596,7 +617,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => { setEntityType(entityType); setIsPlatformLineage(isPlatformLineage ?? false); if (isPlatformLineage && !entity) { - setPlatformView(LineagePlatformView.Service); + onPlatformViewChange(LineagePlatformView.Service); } }, [] @@ -1527,6 +1548,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => { dataQualityLineage, redraw, onPlatformViewChange, + dqHighlightedEdges, }; }, [ dataQualityLineage, @@ -1575,6 +1597,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => { onExportClick, redraw, onPlatformViewChange, + dqHighlightedEdges, ]); useEffect(() => { @@ -1599,6 +1622,20 @@ const LineageProvider = ({ children }: LineageProviderProps) => { } }, [activeLayer, decodedFqn, lineageConfig]); + useEffect(() => { + if ( + dataQualityLineage?.nodes && + !isUndefined(edges) && + isUndefined(dqHighlightedEdges) + ) { + const edgesToHighlight = dataQualityLineage.nodes + .flatMap((dqNode) => getAllDownstreamEdges(dqNode.id, edges ?? [])) + .map((edge) => edge.id); + const edgesToHighlightSet = new Set(edgesToHighlight); + setDqHighlightedEdges(edgesToHighlightSet); + } + }, [dataQualityLineage, edges, dqHighlightedEdges]); + return (
{ const { t } = useTranslation(); + const location = useCustomLocation(); const history = useHistory(); + const queryParams = new URLSearchParams(location.search); + const platformView = + queryParams.get('platformView') ?? LineagePlatformView.Service; const { entityType } = useParams<{ entityType: EntityType }>(); const { fqn: decodedFqn } = useFqn(); const [selectedEntity, setSelectedEntity] = useState(); @@ -57,6 +63,16 @@ const PlatformLineage = () => { ); const [permissions, setPermissions] = useState(); + const handleEntitySelect = useCallback( + (value: EntityReference) => { + history.push( + `/lineage/${(value as SourceType).entityType}/${ + value.fullyQualifiedName + }` + ); + }, + [history] + ); const debouncedSearch = useCallback( debounce(async (value: string) => { try { @@ -94,17 +110,6 @@ const PlatformLineage = () => { [] ); - const handleEntitySelect = useCallback( - (value: EntityReference) => { - history.push( - `/lineage/${(value as SourceType).entityType}/${ - value.fullyQualifiedName - }` - ); - }, - [history] - ); - const init = useCallback(async () => { if (!decodedFqn || !entityType) { setDefaultValue(undefined); @@ -169,7 +174,14 @@ const PlatformLineage = () => { - +
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/EntityLineageUtils.tsx b/openmetadata-ui/src/main/resources/ui/src/utils/EntityLineageUtils.tsx index 546b4321ab4..73119a8eba3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/EntityLineageUtils.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/utils/EntityLineageUtils.tsx @@ -1791,3 +1791,40 @@ export const getEntityTypeFromPlatformView = ( return 'service'; } }; + +/** + * Recursively finds all downstream edges from a given node in a graph. + * This function traverses the graph depth-first, collecting all edges that flow downstream + * from the specified node while avoiding cycles by tracking visited nodes. + * + * @param {string} nodeId - The ID of the starting node + * @param {Edge[]} edges - Array of all edges in the graph + * @param {Set} [visitedNodes=new Set()] - Set of already visited node IDs to prevent cycles + * @returns {Edge[]} Array of all downstream edges from the starting node + */ +export const getAllDownstreamEdges = ( + nodeId: string, + edges: Edge[], + visitedNodes: Set = new Set() +): Edge[] => { + // If we've already visited this node, return empty array to avoid cycles + if (visitedNodes.has(nodeId)) { + return []; + } + + visitedNodes.add(nodeId); + + // Get direct downstream edges + const directDownstreamEdges = edges.filter((edge) => edge.source === nodeId); + + // Get target nodes from direct downstream edges + const targetNodes = directDownstreamEdges.map((edge) => edge.target); + + // Recursively get downstream edges for each target node + const nestedDownstreamEdges = targetNodes.flatMap((targetNodeId) => + getAllDownstreamEdges(targetNodeId, edges, visitedNodes) + ); + + // Combine direct and nested downstream edges + return [...directDownstreamEdges, ...nestedDownstreamEdges]; +};