From 1007204cda802f02a5639e074d95b634b2be9ddf Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 13 Oct 2023 21:07:19 +0200 Subject: [PATCH] feat(ingest/teradata): view parsing (#9005) --- .../docs/sources/teradata/teradata_pre.md | 2 +- .../docs/sources/teradata/teradata_recipe.yml | 3 +- .../datahub/ingestion/source/sql/teradata.py | 156 ++++++++++++------ 3 files changed, 106 insertions(+), 55 deletions(-) diff --git a/metadata-ingestion/docs/sources/teradata/teradata_pre.md b/metadata-ingestion/docs/sources/teradata/teradata_pre.md index eb59caa29e..7263a59f5e 100644 --- a/metadata-ingestion/docs/sources/teradata/teradata_pre.md +++ b/metadata-ingestion/docs/sources/teradata/teradata_pre.md @@ -18,7 +18,7 @@ If you want to run profiling, you need to grant select permission on all the tables you want to profile. -3. If linege or usage extraction is enabled, please, check if query logging is enabled and it is set to size which +3. If lineage or usage extraction is enabled, please check if query logging is enabled and that it is set to a size which will fit for your queries (the default query text size Teradata captures is max 200 chars) An example how you can set it for all users: ```sql diff --git a/metadata-ingestion/docs/sources/teradata/teradata_recipe.yml b/metadata-ingestion/docs/sources/teradata/teradata_recipe.yml index 8cf07ba4c3..cc94de2011 100644 --- a/metadata-ingestion/docs/sources/teradata/teradata_recipe.yml +++ b/metadata-ingestion/docs/sources/teradata/teradata_recipe.yml @@ -3,12 +3,11 @@ source: type: teradata config: host_port: "myteradatainstance.teradata.com:1025" - #platform_instance: "myteradatainstance" username: myuser password: mypassword #database_pattern: # allow: - # - "demo_user" + # - "my_database" # ignoreCase: true include_table_lineage: true include_usage_statistics: true diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index 
dd11cd840b..6080cf7b37 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -1,5 +1,6 @@ import logging from dataclasses import dataclass +from datetime import datetime from typing import Iterable, Optional, Set, Union # This import verifies that the dependencies are available. @@ -11,6 +12,7 @@ from sqlalchemy.engine import Engine from datahub.configuration.common import AllowDenyPattern from datahub.configuration.time_window_config import BaseTimeWindowConfig +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.sql_parsing_builder import SqlParsingBuilder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( @@ -32,11 +34,18 @@ from datahub.ingestion.source.sql.two_tier_sql_source import ( from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.ingestion.source_report.time_window import BaseTimeWindowReport +from datahub.metadata._schema_classes import ( + MetadataChangeEventClass, + SchemaMetadataClass, + ViewPropertiesClass, +) from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BytesTypeClass, TimeTypeClass, ) +from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage +from datahub.utilities.urns.dataset_urn import DatasetUrn logger: logging.Logger = logging.getLogger(__name__) @@ -64,6 +73,7 @@ register_custom_type(custom_types.XML, BytesTypeClass) @dataclass class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport): num_queries_parsed: int = 0 + num_view_ddl_parsed: int = 0 num_table_parse_failures: int = 0 @@ -82,17 +92,16 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig): "This requires to have the table lineage feature enabled.", ) 
+ include_view_lineage = Field( + default=True, + description="Whether to include view lineage in the ingestion. " + "This requires to have the view lineage feature enabled.", + ) usage: BaseUsageConfig = Field( description="The usage config to use when generating usage statistics", default=BaseUsageConfig(), ) - use_schema_resolver: bool = Field( - default=True, - description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.", - hidden_from_docs=True, - ) - default_db: Optional[str] = Field( default=None, description="The default database to use for unqualified table names", @@ -141,46 +150,47 @@ class TeradataSource(TwoTierSQLAlchemySource): self.report: TeradataReport = TeradataReport() self.graph: Optional[DataHubGraph] = ctx.graph - if self.graph: - if self.config.use_schema_resolver: - self.schema_resolver = ( - self.graph.initialize_schema_resolver_from_datahub( - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) - ) - self.urns = self.schema_resolver.get_urns() - else: - self.schema_resolver = self.graph._make_schema_resolver( - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) - self.urns = None - else: - self.schema_resolver = SchemaResolver( - platform=self.platform, - platform_instance=self.config.platform_instance, - graph=None, - env=self.config.env, - ) - self.urns = None - self.builder: SqlParsingBuilder = SqlParsingBuilder( usage_config=self.config.usage if self.config.include_usage_statistics else None, - generate_lineage=self.config.include_table_lineage, + generate_lineage=True, generate_usage_statistics=self.config.include_usage_statistics, generate_operations=self.config.usage.include_operational_stats, ) + self.schema_resolver = SchemaResolver( + platform=self.platform, + platform_instance=self.config.platform_instance, + graph=None, + env=self.config.env, + ) + + self._view_definition_cache: 
FileBackedDict[str] = FileBackedDict() + @classmethod def create(cls, config_dict, ctx): config = TeradataConfig.parse_obj(config_dict) return cls(config, ctx) + def get_view_lineage(self) -> Iterable[MetadataWorkUnit]: + for key in self._view_definition_cache.keys(): + view_definition = self._view_definition_cache[key] + dataset_urn = DatasetUrn.create_from_string(key) + + db_name: Optional[str] = None + # We need to get the default db from the dataset urn otherwise the builder generates the wrong urns + if "." in dataset_urn.get_dataset_name(): + db_name = dataset_urn.get_dataset_name().split(".", 1)[0] + + self.report.num_view_ddl_parsed += 1 + if self.report.num_view_ddl_parsed % 1000 == 0: + logger.info(f"Parsed {self.report.num_queries_parsed} view ddl") + + yield from self.gen_lineage_from_query( + query=view_definition, default_database=db_name, is_view_ddl=True + ) + def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]: engine = self.get_metadata_engine() for entry in engine.execute( @@ -192,27 +202,43 @@ class TeradataSource(TwoTierSQLAlchemySource): if self.report.num_queries_parsed % 1000 == 0: logger.info(f"Parsed {self.report.num_queries_parsed} queries") - result = sqlglot_lineage( - sql=entry.query, - schema_resolver=self.schema_resolver, - default_db=None, - default_schema=entry.default_database - if entry.default_database - else self.config.default_db, + yield from self.gen_lineage_from_query( + query=entry.query, + default_database=entry.default_database, + timestamp=entry.timestamp, + user=entry.user, + is_view_ddl=False, ) - if result.debug_info.table_error: - logger.debug( - f"Error parsing table lineage, {result.debug_info.table_error}" - ) - self.report.num_table_parse_failures += 1 - continue + def gen_lineage_from_query( + self, + query: str, + default_database: Optional[str] = None, + timestamp: Optional[datetime] = None, + user: Optional[str] = None, + is_view_ddl: bool = False, + ) -> Iterable[MetadataWorkUnit]: + result = 
sqlglot_lineage( + sql=query, + schema_resolver=self.schema_resolver, + default_db=None, + default_schema=default_database + if default_database + else self.config.default_db, + ) + if result.debug_info.table_error: + logger.debug( + f"Error parsing table lineage, {result.debug_info.table_error}" + ) + self.report.num_table_parse_failures += 1 + else: yield from self.builder.process_sql_parsing_result( result, - query=entry.query, - query_timestamp=entry.timestamp, - user=f"urn:li:corpuser:{entry.user}", - include_urns=self.urns, + query=query, + is_view_ddl=is_view_ddl, + query_timestamp=timestamp, + user=f"urn:li:corpuser:{user}", + include_urns=self.schema_resolver.get_urns(), ) def get_metadata_engine(self) -> Engine: @@ -221,8 +247,34 @@ class TeradataSource(TwoTierSQLAlchemySource): return create_engine(url, **self.config.options) def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: - yield from super().get_workunits_internal() + # Add all schemas to the schema resolver + for wu in super().get_workunits_internal(): + if isinstance(wu.metadata, MetadataChangeEventClass): + if wu.metadata.proposedSnapshot: + for aspect in wu.metadata.proposedSnapshot.aspects: + if isinstance(aspect, SchemaMetadataClass): + self.schema_resolver.add_schema_metadata( + wu.metadata.proposedSnapshot.urn, + aspect, + ) + break + if isinstance(wu.metadata, MetadataChangeProposalWrapper): + if ( + wu.metadata.entityUrn + and isinstance(wu.metadata.aspect, ViewPropertiesClass) + and wu.metadata.aspect.viewLogic + ): + self._view_definition_cache[ + wu.metadata.entityUrn + ] = wu.metadata.aspect.viewLogic + yield wu + + if self.config.include_view_lineage: + self.report.report_ingestion_stage_start("view lineage extraction") + yield from self.get_view_lineage() + if self.config.include_table_lineage or self.config.include_usage_statistics: self.report.report_ingestion_stage_start("audit log extraction") yield from self.get_audit_log_mcps() - yield from 
self.builder.gen_workunits() + + yield from self.builder.gen_workunits()