Fix #8884 & #9460 - View lineage happens in the lineage workflow (#9796)

* View lineage happens in the lineage workflow

* Format

* Remove table views from context

* lint
This commit is contained in:
Pere Miquel Brull 2023-01-18 18:32:40 +01:00 committed by GitHub
parent 6b43eefa82
commit e88fe2f559
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 89 additions and 108 deletions

View File

@ -27,7 +27,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
@ -39,11 +38,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (
get_lineage_by_query,
get_lineage_via_table_entity,
)
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
@ -71,7 +65,6 @@ class TableNameAndType(BaseModel):
type_: TableType = TableType.Regular
# pylint: disable=too-many-public-methods
class CommonDbSourceService(
DatabaseServiceSource, SqlColumnHandlerMixin, SqlAlchemySource, ABC
):
@ -102,7 +95,6 @@ class CommonDbSourceService(
self._connection = None # Lazy init as well
self.table_constraints = None
self.database_source_state = set()
self.context.table_views = []
super().__init__()
def set_inspector(self, database_name: str) -> None:
@ -375,15 +367,6 @@ class CommonDbSourceService(
table_request.tableType = TableType.Partitioned.value
table_request.tablePartition = partition_details
if table_type == TableType.View or view_definition:
table_view = {
"table_name": table_name,
"table_type": table_type,
"schema_name": schema_name,
"db_name": db_name,
}
self.context.table_views.append(table_view)
yield table_request
self.register_record(table_request=table_request)
@ -392,58 +375,6 @@ class CommonDbSourceService(
logger.warning(f"Unexpected exception to yield table [{table_name}]: {exc}")
self.status.failures.append(f"{self.config.serviceName}.{table_name}")
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
logger.info("Processing Lineage for Views")
for view in self.context.table_views:
table_name = view.get("table_name")
table_type = view.get("table_type")
schema_name = view.get("schema_name")
db_name = view.get("db_name")
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
table_name=table_name,
)
table_entity = self.metadata.get_by_name(
entity=Table,
fqn=table_fqn,
)
view_definition = self.get_view_definition(
table_type=table_type,
table_name=table_name,
schema_name=schema_name,
inspector=self.inspector,
)
try:
lineage_parser = LineageParser(view_definition)
if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query(
self.metadata,
query=view_definition,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
) or []
else:
yield from get_lineage_via_table_entity(
self.metadata,
table_entity=table_entity,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
query=view_definition,
) or []
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not parse query [{view_definition}] ingesting lineage failed: {exc}"
)
def test_connection(self) -> None:
"""
Used a timed-bound function to test that the engine

View File

@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
)
from metadata.generated.schema.api.data.createLocation import CreateLocationRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceRequest,
)
@ -119,9 +118,6 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["database"],
post_process=[
"yield_view_lineage",
],
)
database = TopologyNode(
producer="get_database_names",
@ -302,13 +298,6 @@ class DatabaseServiceSource(
if self.source_config.includeTags:
yield from self.yield_tag(schema_name) or []
@abstractmethod
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
"""
From topology.
Parses view definition to get lineage information
"""
@abstractmethod
def yield_table(
self, table_name_and_type: Tuple[str, TableType]

View File

@ -20,7 +20,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
@ -567,9 +566,6 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
return cols
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
pass

View File

@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
@ -395,9 +394,6 @@ class DeltalakeSource(DatabaseServiceSource):
return parsed_columns
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
pass

View File

@ -22,7 +22,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.services.connections.database.domoDatabaseConnection import (
DomoDatabaseConnection,
@ -190,9 +189,6 @@ class DomodatabaseSource(DatabaseServiceSource):
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
pass
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def standardize_table_name( # pylint: disable=unused-argument
self, schema: str, table: str
) -> str:

View File

@ -20,7 +20,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
DynamoDBConnection,
@ -214,9 +213,6 @@ class DynamodbSource(DatabaseServiceSource):
logger.warning(f"Unexpected exception to yield table [{table_name}]: {exc}")
self.status.failures.append(f"{self.config.serviceName}.{table_name}")
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
pass

View File

@ -20,7 +20,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
)
from metadata.generated.schema.api.data.createLocation import CreateLocationRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.location import Location, LocationType
@ -373,9 +372,6 @@ class GlueSource(DatabaseServiceSource):
def standardize_table_name(self, _: str, table: str) -> str:
return table[:128]
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
pass

View File

@ -17,8 +17,14 @@ from abc import ABC
from typing import Iterable, Iterator, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, TableType
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (
get_lineage_by_query,
get_lineage_via_table_entity,
)
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.logger import ingestion_logger
@ -88,11 +94,39 @@ class LineageSource(QueryParserSource, ABC):
logger.debug(traceback.format_exc())
logger.error(f"Source usage processing error: {exc}")
def next_record(self) -> Iterable[AddLineageRequest]:
def get_database_fqns(self):
"""
Based on the query logs, prepare the lineage
and send it to the sink
List all service databases
"""
return [
database.fullyQualifiedName.__root__
for database in self.metadata.list_all_entities(
entity=Database,
params={"service": self.config.serviceName},
)
]
def get_all_tables_from_api(self) -> Iterable[Table]:
"""
Get all the views from the API with their viewDefinition
"""
for database_fqn in self.get_database_fqns():
logger.info(f"Getting view lineage from {database_fqn}")
yield from self.metadata.list_all_entities(
entity=Table,
fields=[
"viewDefinition",
],
params={"service": self.config.serviceName, "database": database_fqn},
)
def yield_table_query(self):
"""
Basic generator getting lineage from
table queries that we pick up from the
service database
"""
logger.info("Processing query history lineage...")
for table_query in self.get_table_query():
lineages = get_lineage_by_query(
@ -105,3 +139,54 @@ class LineageSource(QueryParserSource, ABC):
for lineage_request in lineages or []:
yield lineage_request
def yield_view_ddl(self):
"""
Obtain the tables' viewDefinition to pick
up the lineage from its DDL
"""
logger.info("Processing view lineage...")
for table_entity in self.get_all_tables_from_api():
# We only pick up views whose viewDefinition is informed
if (
table_entity.tableType in {TableType.View, TableType.MaterializedView}
and table_entity.viewDefinition
and table_entity.viewDefinition.__root__
):
try:
lineage_parser = LineageParser(table_entity.viewDefinition.__root__)
if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query(
self.metadata,
query=table_entity.viewDefinition.__root__,
service_name=self.config.serviceName,
database_name=table_entity.database.name,
schema_name=table_entity.databaseSchema.name,
) or []
else:
yield from get_lineage_via_table_entity(
self.metadata,
table_entity=table_entity,
service_name=self.config.serviceName,
database_name=table_entity.database.name,
schema_name=table_entity.databaseSchema.name,
query=table_entity.viewDefinition.__root__,
) or []
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not parse query [{table_entity.viewDefinition.__root__}]."
f" Ingesting lineage failed due to: {exc}"
)
def next_record(self) -> Iterable[AddLineageRequest]:
"""
Based on the query logs, prepare the lineage
and send it to the sink
"""
yield from self.yield_table_query()
yield from self.yield_view_ddl()

View File

@ -19,7 +19,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
@ -247,9 +246,6 @@ class SalesforceSource(DatabaseServiceSource):
return "INT"
return "VARCHAR"
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
pass