From 16b27b8124e091dbfa72dcb2028ff2992908efcc Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Wed, 12 Mar 2025 21:21:28 +0530 Subject: [PATCH] Incremental Lineage Processing (#20185) --- .../ingestion/models/ometa_lineage.py | 6 +++++- .../ingestion/ometa/mixins/es_mixin.py | 5 +++++ .../ingestion/ometa/mixins/lineage_mixin.py | 21 +++++++++++++++++++ .../metadata/ingestion/sink/metadata_rest.py | 11 +++++++++- .../source/database/lineage_source.py | 17 ++++++++++++++- .../database/stored_procedures_mixin.py | 20 ++++++++++++++++-- .../jdbi3/StoredProcedureRepository.java | 11 ++++++++++ .../service/jdbi3/TableRepository.java | 11 ++++++++++ .../search/indexes/StoredProcedureIndex.java | 1 + .../service/search/indexes/TableIndex.java | 1 + .../en/stored_procedure_index_mapping.json | 3 +++ .../elasticsearch/en/table_index_mapping.json | 3 +++ .../jp/stored_procedure_index_mapping.json | 3 +++ .../elasticsearch/jp/table_index_mapping.json | 3 +++ .../zh/stored_procedure_index_mapping.json | 3 +++ .../elasticsearch/zh/table_index_mapping.json | 3 +++ .../schema/entity/data/storedProcedure.json | 5 +++++ .../json/schema/entity/data/table.json | 5 +++++ .../databaseServiceQueryLineagePipeline.json | 6 ++++++ .../generated/entity/data/storedProcedure.ts | 4 ++++ .../ui/src/generated/entity/data/table.ts | 4 ++++ .../databaseServiceQueryLineagePipeline.ts | 5 +++++ 22 files changed, 146 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/models/ometa_lineage.py b/ingestion/src/metadata/ingestion/models/ometa_lineage.py index e75cdc8d0a3..b4bb4028bf1 100644 --- a/ingestion/src/metadata/ingestion/models/ometa_lineage.py +++ b/ingestion/src/metadata/ingestion/models/ometa_lineage.py @@ -12,13 +12,17 @@ Custom wrapper for Lineage Request """ -from typing import Optional +from typing import Optional, Type, TypeVar from pydantic import BaseModel from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +T = TypeVar("T", bound=BaseModel) + class OMetaLineageRequest(BaseModel): override_lineage: Optional[bool] = False lineage_request: AddLineageRequest + entity_fqn: Optional[str] = None + entity: Optional[Type[T]] = None diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index fbd84b4e92e..2f34c1e65bd 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -391,6 +391,7 @@ class ESMixin(Generic[T]): def yield_es_view_def( self, service_name: str, + incremental: bool = False, ) -> Iterable[TableView]: """ Get the view definition from ES @@ -451,6 +452,10 @@ class ESMixin(Generic[T]): } } } + if incremental: + query.get("query").get("bool").get("must").append( + {"bool": {"should": [{"term": {"processedLineage": False}}]}} + ) query = json.dumps(query) for response in self._paginate_es_internal( entity=Table, diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index 5adc2c18296..4533bf0f693 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -406,3 +406,24 @@ class OMetaLineageMixin(Generic[T]): logger.error( f"Error while adding lineage: {lineage_request.left.error}" ) + + def patch_lineage_processed_flag( + self, + entity: Type[T], + fqn: str, + ) -> None: + + try: + original_entity = self.get_by_name(entity=entity, fqn=fqn) + if not original_entity: + return + + updated_entity = original_entity.model_copy(deep=True) + updated_entity.processedLineage = True + + self.patch( + entity=entity, source=original_entity, destination=updated_entity + ) + except Exception as exc: + logger.debug(f"Error while patching lineage processed flag: {exc}") + logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index ad8d2eefe4e..84666a4c290 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -288,7 +288,16 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods entity_id=str(add_lineage.lineage_request.edge.toEntity.id.root), source=add_lineage.lineage_request.edge.lineageDetails.source.value, ) - return self._run_dispatch(add_lineage.lineage_request) + lineage_response = self._run_dispatch(add_lineage.lineage_request) + if ( + lineage_response + and lineage_response.right is not None + and add_lineage.entity_fqn + and add_lineage.entity + ): + self.metadata.patch_lineage_processed_flag( + entity=add_lineage.entity, fqn=add_lineage.entity_fqn + ) def _create_role(self, create_role: CreateRoleRequest) -> Optional[Role]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index b3105b8f5ed..e6bac271dd4 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -265,11 +265,22 @@ class LineageSource(QueryParserSource, ABC): timeout_seconds=self.source_config.parsingTimeoutLimit, ): if lineage.right is not None: + view_fqn = fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=self.service_name, + database_name=view.db_name, + schema_name=view.schema_name, + table_name=view.table_name, + skip_es_search=True, + ) queue.put( Either( right=OMetaLineageRequest( lineage_request=lineage.right, override_lineage=self.source_config.overrideViewLineage, + entity_fqn=view_fqn, + entity=Table, ) ) ) @@ -281,7 +292,11 @@ class LineageSource(QueryParserSource, ABC): def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: logger.info("Processing View Lineage") - producer_fn = partial(self.metadata.yield_es_view_def, self.config.serviceName) + producer_fn = partial( + self.metadata.yield_es_view_def, + self.config.serviceName, + self.source_config.incrementalLineageProcessing, + ) processor_fn = self.view_lineage_generator yield from self.generate_lineage_in_thread(producer_fn, processor_fn) diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py index 7dba89daefd..ac67b7b4a0d 100644 --- a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py +++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py @@ -38,6 +38,7 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.status import Status from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query +from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.models.topology import Queue from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.logger import ingestion_logger @@ -221,7 +222,19 @@ class StoredProcedureLineageMixin(ABC): query_by_procedure=procedure_and_query.query_by_procedure, procedure=procedure_and_query.procedure, ): - queue.put(lineage) + if lineage and lineage.right is not None: + queue.put( + Either( + right=OMetaLineageRequest( + override_lineage=False, + lineage_request=lineage.right, + entity=StoredProcedure, + entity_fqn=procedure_and_query.procedure.fullyQualifiedName.root, + ) + ) + ) + else: + queue.put(lineage) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( @@ -260,8 +273,11 @@ class StoredProcedureLineageMixin(ABC): } } } + if self.source_config.incrementalLineageProcessing: + query.get("query").get("bool").get("must").append( + {"bool": {"should": [{"term": {"processedLineage": False}}]}} + ) query_filter = json.dumps(query) - logger.info("Processing Lineage for Stored Procedures") # First, get all the query history queries_dict = self.get_stored_procedure_queries_dict() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java index 313e62a92f9..dd6d40672e5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java @@ -156,8 +156,19 @@ public class StoredProcedureRepository extends EntityRepository original.getStoredProcedureType(), updated.getStoredProcedureType()); } + updateProcessedLineage(original, updated); + recordChange( + "processedLineage", original.getProcessedLineage(), updated.getProcessedLineage()); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } + + private void updateProcessedLineage(StoredProcedure origSP, StoredProcedure updatedSP) { + // if schema definition changes make processed lineage false + if (origSP.getProcessedLineage().booleanValue() + && !origSP.getCode().equals(updatedSP.getCode())) { + updatedSP.setProcessedLineage(false); + } + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 308b7adbd8e..e069eaf00b7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -1162,12 +1162,23 @@ public class TableRepository extends EntityRepository { DatabaseUtil.validateColumns(updatedTable.getColumns()); recordChange("tableType", origTable.getTableType(), updatedTable.getTableType()); updateTableConstraints(origTable, updatedTable, operation); + updateProcessedLineage(origTable, updatedTable); updateColumns( COLUMN_FIELD, origTable.getColumns(), updated.getColumns(), EntityUtil.columnMatch); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod()); recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); recordChange("locationPath", original.getLocationPath(), updated.getLocationPath()); + recordChange( + "processedLineage", original.getProcessedLineage(), updated.getProcessedLineage()); + } + + private void updateProcessedLineage(Table origTable, Table updatedTable) { + // if schema definition changes make processed lineage false + if (origTable.getProcessedLineage().booleanValue() + && !origTable.getSchemaDefinition().equals(updatedTable.getSchemaDefinition())) { + updatedTable.setProcessedLineage(false); + } } private void updateTableConstraints(Table origTable, Table updatedTable, Operation operation) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java index 06394422581..e2675025d23 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/StoredProcedureIndex.java @@ -33,6 +33,7 @@ public record StoredProcedureIndex(StoredProcedure storedProcedure) implements S doc.put("lineage", SearchIndex.getLineageData(storedProcedure.getEntityReference())); doc.put("tier", parseTags.getTierTag()); doc.put("service", getEntityWithDisplayName(storedProcedure.getService())); + doc.put("processedLineage", storedProcedure.getProcessedLineage()); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java index 16134f36443..7cf3b4d7bdb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java @@ -102,6 +102,7 @@ public record TableIndex(Table table) implements ColumnIndex { doc.put("service", getEntityWithDisplayName(table.getService())); doc.put("database", getEntityWithDisplayName(table.getDatabase())); doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference())); + doc.put("processedLineage", table.getProcessedLineage()); doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table)); doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema())); return doc; diff --git a/openmetadata-service/src/main/resources/elasticsearch/en/stored_procedure_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/en/stored_procedure_index_mapping.json index 7f630cd96a3..3422fd2e709 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/en/stored_procedure_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/en/stored_procedure_index_mapping.json @@ -314,6 +314,9 @@ "deleted": { "type": "boolean" }, + "processedLineage": { + "type": "boolean" + }, "owners": { "properties": { "id": { diff --git a/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json index 852168dd543..519547c2d2e 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json @@ -545,6 +545,9 @@ "deleted": { "type": "boolean" }, + "processedLineage": { + "type": "boolean" + }, "followers": { "type": "keyword" }, diff --git a/openmetadata-service/src/main/resources/elasticsearch/jp/stored_procedure_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/jp/stored_procedure_index_mapping.json index 74dc0afdec8..75321009a3c 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/jp/stored_procedure_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/jp/stored_procedure_index_mapping.json @@ -306,6 +306,9 @@ "deleted": { "type": "boolean" }, + "processedLineage": { + "type": "boolean" + }, "owners": { "properties": { "id": { diff --git a/openmetadata-service/src/main/resources/elasticsearch/jp/table_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/jp/table_index_mapping.json index 91053db2443..5aa45c3d232 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/jp/table_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/jp/table_index_mapping.json @@ -542,6 +542,9 @@ "deleted": { "type": "boolean" }, + "processedLineage": { + "type": "boolean" + }, "followers": { "type": "keyword" }, diff --git a/openmetadata-service/src/main/resources/elasticsearch/zh/stored_procedure_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/zh/stored_procedure_index_mapping.json index b658476dbfb..e306b07912c 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/zh/stored_procedure_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/zh/stored_procedure_index_mapping.json @@ -298,6 +298,9 @@ "deleted": { "type": "boolean" }, + "processedLineage": { + "type": "boolean" + }, "entityType": { "type": "keyword" }, diff --git a/openmetadata-service/src/main/resources/elasticsearch/zh/table_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/zh/table_index_mapping.json index 2aa220c986b..d665c0d1802 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/zh/table_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/zh/table_index_mapping.json @@ -535,6 +535,9 @@ "deleted": { "type": "boolean" }, + "processedLineage": { + "type": "boolean" + }, "followers": { "type": "keyword" }, diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json index 7e56ee587bd..4f2db0e254e 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json @@ -193,6 +193,11 @@ "type": "string", "minLength": 1, "maxLength": 32 + }, + "processedLineage": { + "description": "Processed lineage for the stored procedure", + "type": "boolean", + "default": false } }, "required": [ diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index 9c99fffc38a..c79b44d4e99 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -1146,6 +1146,11 @@ "type": "string", "minLength": 1, "maxLength": 32 + }, + "processedLineage": { + "description": "Processed lineage for the table", + "type": "boolean", + "default": false } }, "required": [ diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json index d53b197af74..ed7e46c9afd 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json @@ -97,6 +97,12 @@ "description": "Handle Lineage for Snowflake Temporary and Transient Tables. ", "type": "boolean", "default": false + }, + "incrementalLineageProcessing": { + "title": "Incremental Lineage Processing", + "description": "Set the 'Incremental Lineage Processing' toggle to control whether to process lineage incrementally.", + "type": "boolean", + "default": true } }, "additionalProperties": false diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/storedProcedure.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/storedProcedure.ts index 4bb31206f91..380447d6729 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/storedProcedure.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/storedProcedure.ts @@ -89,6 +89,10 @@ export interface StoredProcedure { * Owners of this Stored Procedure. */ owners?: EntityReference[]; + /** + * Processed lineage for the stored procedure + */ + processedLineage?: boolean; /** * Link to Database service this table is hosted in. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts index fef3ef2cfc0..76d77c32b88 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts @@ -114,6 +114,10 @@ export interface Table { * Owners of this table. */ owners?: EntityReference[]; + /** + * Processed lineage for the table + */ + processedLineage?: boolean; /** * Latest Data profile for a table. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts index f3c13ba4058..3d0a79b359a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceQueryLineagePipeline.ts @@ -26,6 +26,11 @@ export interface DatabaseServiceQueryLineagePipeline { * Configuration the condition to filter the query history. */ filterCondition?: string; + /** + * Set the 'Incremental Lineage Processing' toggle to control whether to process lineage + * incrementally. + */ + incrementalLineageProcessing?: boolean; /** * Set the 'Override View Lineage' toggle to control whether to override the existing view * lineage.