Incremental Lineage Processing (#20185)

This commit is contained in:
Mayur Singal 2025-03-12 21:21:28 +05:30 committed by ulixius9
parent 31112d46ee
commit 16b27b8124
22 changed files with 146 additions and 5 deletions

View File

@ -12,13 +12,17 @@
Custom wrapper for Lineage Request
"""
from typing import Optional
from typing import Optional, Type, TypeVar
from pydantic import BaseModel
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
T = TypeVar("T", bound=BaseModel)
class OMetaLineageRequest(BaseModel):
    """
    Wrapper around an AddLineageRequest carrying extra processing options.

    Besides the lineage request itself, it can carry the class and fully
    qualified name of an entity related to the request, so that a consumer
    can act on that entity once the lineage has been handled.
    """

    # Whether existing lineage may be overridden; defaults to False.
    # NOTE(review): exact override semantics are decided by the consumer of
    # this request - confirm against the sink implementation.
    override_lineage: Optional[bool] = False
    # The lineage edge to be created.
    lineage_request: AddLineageRequest
    # Fully qualified name of the entity this lineage was computed for.
    entity_fqn: Optional[str] = None
    # Class of the entity identified by `entity_fqn`. Annotated with the
    # bound type directly: a TypeVar has no meaning inside a non-generic
    # model, and any BaseModel subclass is accepted either way.
    entity: Optional[Type[BaseModel]] = None

View File

@ -391,6 +391,7 @@ class ESMixin(Generic[T]):
def yield_es_view_def(
self,
service_name: str,
incremental: bool = False,
) -> Iterable[TableView]:
"""
Get the view definition from ES
@ -451,6 +452,10 @@ class ESMixin(Generic[T]):
}
}
}
if incremental:
query.get("query").get("bool").get("must").append(
{"bool": {"should": [{"term": {"processedLineage": False}}]}}
)
query = json.dumps(query)
for response in self._paginate_es_internal(
entity=Table,

View File

@ -406,3 +406,24 @@ class OMetaLineageMixin(Generic[T]):
logger.error(
f"Error while adding lineage: {lineage_request.left.error}"
)
def patch_lineage_processed_flag(
    self,
    entity: Type[T],
    fqn: str,
) -> None:
    """
    Set `processedLineage` to True on the entity identified by `fqn`.

    The entity is fetched by name, copied, flagged on the copy and the
    difference is sent through `self.patch`. If the entity cannot be found
    nothing is done. This is a best-effort operation: any exception is
    logged and swallowed so that the caller is never interrupted.

    Args:
        entity: Entity class used to look up and patch the entity
        fqn: Fully qualified name of the entity to flag
    """
    try:
        original_entity = self.get_by_name(entity=entity, fqn=fqn)
        if not original_entity:
            return

        # Patch against a deep copy so the fetched entity stays untouched
        # and can be used as the source of the diff.
        updated_entity = original_entity.model_copy(deep=True)
        updated_entity.processedLineage = True

        self.patch(
            entity=entity, source=original_entity, destination=updated_entity
        )
    except Exception as exc:
        # Keep the stack trace at debug level but surface the failure itself
        # as a warning, otherwise a failed flag update goes unnoticed.
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error while patching lineage processed flag for [{fqn}]: {exc}"
        )

View File

@ -288,7 +288,16 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods
entity_id=str(add_lineage.lineage_request.edge.toEntity.id.root),
source=add_lineage.lineage_request.edge.lineageDetails.source.value,
)
return self._run_dispatch(add_lineage.lineage_request)
lineage_response = self._run_dispatch(add_lineage.lineage_request)
if (
lineage_response
and lineage_response.right is not None
and add_lineage.entity_fqn
and add_lineage.entity
):
self.metadata.patch_lineage_processed_flag(
entity=add_lineage.entity, fqn=add_lineage.entity_fqn
)
def _create_role(self, create_role: CreateRoleRequest) -> Optional[Role]:
"""

View File

@ -265,11 +265,22 @@ class LineageSource(QueryParserSource, ABC):
timeout_seconds=self.source_config.parsingTimeoutLimit,
):
if lineage.right is not None:
view_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=self.service_name,
database_name=view.db_name,
schema_name=view.schema_name,
table_name=view.table_name,
skip_es_search=True,
)
queue.put(
Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
entity_fqn=view_fqn,
entity=Table,
)
)
)
@ -281,7 +292,11 @@ class LineageSource(QueryParserSource, ABC):
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
    """
    Yield the lineage computed from the view definitions of the service.

    View definitions are produced by `self.metadata.yield_es_view_def` for
    the configured service and handed to `self.view_lineage_generator`;
    both are driven by `self.generate_lineage_in_thread`. The
    `incrementalLineageProcessing` source setting is forwarded to the
    producer as its `incremental` argument.
    """
    logger.info("Processing View Lineage")
    # Build the producer once; the flag is passed by keyword so it cannot be
    # confused with another positional argument of the producer.
    producer_fn = partial(
        self.metadata.yield_es_view_def,
        self.config.serviceName,
        incremental=self.source_config.incrementalLineageProcessing,
    )
    processor_fn = self.view_lineage_generator
    yield from self.generate_lineage_in_thread(producer_fn, processor_fn)

View File

@ -38,6 +38,7 @@ from metadata.ingestion.api.models import Either
from metadata.ingestion.api.status import Status
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.topology import Queue
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
@ -221,7 +222,19 @@ class StoredProcedureLineageMixin(ABC):
query_by_procedure=procedure_and_query.query_by_procedure,
procedure=procedure_and_query.procedure,
):
queue.put(lineage)
if lineage and lineage.right is not None:
queue.put(
Either(
right=OMetaLineageRequest(
override_lineage=False,
lineage_request=lineage.right,
entity=StoredProcedure,
entity_fqn=procedure_and_query.procedure.fullyQualifiedName.root,
)
)
)
else:
queue.put(lineage)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
@ -260,8 +273,11 @@ class StoredProcedureLineageMixin(ABC):
}
}
}
if self.source_config.incrementalLineageProcessing:
query.get("query").get("bool").get("must").append(
{"bool": {"should": [{"term": {"processedLineage": False}}]}}
)
query_filter = json.dumps(query)
logger.info("Processing Lineage for Stored Procedures")
# First, get all the query history
queries_dict = self.get_stored_procedure_queries_dict()

View File

@ -156,8 +156,19 @@ public class StoredProcedureRepository extends EntityRepository<StoredProcedure>
original.getStoredProcedureType(),
updated.getStoredProcedureType());
}
updateProcessedLineage(original, updated);
recordChange(
"processedLineage", original.getProcessedLineage(), updated.getProcessedLineage());
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
/**
 * Resets the processed-lineage flag on the updated stored procedure when its code differs from
 * the original one.
 *
 * @param origSP stored procedure as it was before the update
 * @param updatedSP stored procedure being written; its flag is set to false when the code changed
 */
private void updateProcessedLineage(StoredProcedure origSP, StoredProcedure updatedSP) {
  // A missing flag is treated as "not processed" instead of failing with a NullPointerException.
  boolean alreadyProcessed = Boolean.TRUE.equals(origSP.getProcessedLineage());
  // Null-safe comparison: either side may have no code set.
  boolean codeChanged = !java.util.Objects.equals(origSP.getCode(), updatedSP.getCode());
  // If the code changes, make processed lineage false.
  if (alreadyProcessed && codeChanged) {
    updatedSP.setProcessedLineage(false);
  }
}
}
}

View File

@ -1162,12 +1162,23 @@ public class TableRepository extends EntityRepository<Table> {
DatabaseUtil.validateColumns(updatedTable.getColumns());
recordChange("tableType", origTable.getTableType(), updatedTable.getTableType());
updateTableConstraints(origTable, updatedTable, operation);
updateProcessedLineage(origTable, updatedTable);
updateColumns(
COLUMN_FIELD, origTable.getColumns(), updated.getColumns(), EntityUtil.columnMatch);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
recordChange("locationPath", original.getLocationPath(), updated.getLocationPath());
recordChange(
"processedLineage", original.getProcessedLineage(), updated.getProcessedLineage());
}
/**
 * Resets the processed-lineage flag on the updated table when its schema definition differs from
 * the original one.
 *
 * @param origTable table as it was before the update
 * @param updatedTable table being written; its flag is set to false when the definition changed
 */
private void updateProcessedLineage(Table origTable, Table updatedTable) {
  // A missing flag is treated as "not processed" instead of failing with a NullPointerException.
  boolean alreadyProcessed = Boolean.TRUE.equals(origTable.getProcessedLineage());
  // Null-safe comparison: either side may have no schema definition set.
  boolean definitionChanged =
      !java.util.Objects.equals(
          origTable.getSchemaDefinition(), updatedTable.getSchemaDefinition());
  // If the schema definition changes, make processed lineage false.
  if (alreadyProcessed && definitionChanged) {
    updatedTable.setProcessedLineage(false);
  }
}
private void updateTableConstraints(Table origTable, Table updatedTable, Operation operation) {

View File

@ -33,6 +33,7 @@ public record StoredProcedureIndex(StoredProcedure storedProcedure) implements S
doc.put("lineage", SearchIndex.getLineageData(storedProcedure.getEntityReference()));
doc.put("tier", parseTags.getTierTag());
doc.put("service", getEntityWithDisplayName(storedProcedure.getService()));
doc.put("processedLineage", storedProcedure.getProcessedLineage());
return doc;
}

View File

@ -102,6 +102,7 @@ public record TableIndex(Table table) implements ColumnIndex {
doc.put("service", getEntityWithDisplayName(table.getService()));
doc.put("database", getEntityWithDisplayName(table.getDatabase()));
doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference()));
doc.put("processedLineage", table.getProcessedLineage());
doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table));
doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema()));
return doc;

View File

@ -314,6 +314,9 @@
"deleted": {
"type": "boolean"
},
"processedLineage": {
"type": "boolean"
},
"owners": {
"properties": {
"id": {

View File

@ -545,6 +545,9 @@
"deleted": {
"type": "boolean"
},
"processedLineage": {
"type": "boolean"
},
"followers": {
"type": "keyword"
},

View File

@ -306,6 +306,9 @@
"deleted": {
"type": "boolean"
},
"processedLineage": {
"type": "boolean"
},
"owners": {
"properties": {
"id": {

View File

@ -542,6 +542,9 @@
"deleted": {
"type": "boolean"
},
"processedLineage": {
"type": "boolean"
},
"followers": {
"type": "keyword"
},

View File

@ -298,6 +298,9 @@
"deleted": {
"type": "boolean"
},
"processedLineage": {
"type": "boolean"
},
"entityType": {
"type": "keyword"
},

View File

@ -535,6 +535,9 @@
"deleted": {
"type": "boolean"
},
"processedLineage": {
"type": "boolean"
},
"followers": {
"type": "keyword"
},

View File

@ -193,6 +193,11 @@
"type": "string",
"minLength": 1,
"maxLength": 32
},
"processedLineage": {
"description": "Processed lineage for the stored procedure",
"type": "boolean",
"default": false
}
},
"required": [

View File

@ -1146,6 +1146,11 @@
"type": "string",
"minLength": 1,
"maxLength": 32
},
"processedLineage": {
"description": "Processed lineage for the table",
"type": "boolean",
"default": false
}
},
"required": [

View File

@ -97,6 +97,12 @@
"description": "Handle Lineage for Snowflake Temporary and Transient Tables. ",
"type": "boolean",
"default": false
},
"incrementalLineageProcessing": {
"title": "Incremental Lineage Processing",
"description": "Set the 'Incremental Lineage Processing' toggle to control whether to process lineage incrementally.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false

View File

@ -89,6 +89,10 @@ export interface StoredProcedure {
* Owners of this Stored Procedure.
*/
owners?: EntityReference[];
/**
* Processed lineage for the stored procedure
*/
processedLineage?: boolean;
/**
* Link to Database service this table is hosted in.
*/

View File

@ -114,6 +114,10 @@ export interface Table {
* Owners of this table.
*/
owners?: EntityReference[];
/**
* Processed lineage for the table
*/
processedLineage?: boolean;
/**
* Latest Data profile for a table.
*/

View File

@ -26,6 +26,11 @@ export interface DatabaseServiceQueryLineagePipeline {
* Configuration the condition to filter the query history.
*/
filterCondition?: string;
/**
* Set the 'Incremental Lineage Processing' toggle to control whether to process lineage
* incrementally.
*/
incrementalLineageProcessing?: boolean;
/**
* Set the 'Override View Lineage' toggle to control whether to override the existing view
* lineage.