diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index f37f5358f9..cbd22b689e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -39,6 +39,10 @@ from datahub.ingestion.source.bigquery_v2.common import ( ) from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler +from datahub.ingestion.source.bigquery_v2.queries_extractor import ( + BigQueryQueriesExtractor, + BigQueryQueriesExtractorConfig, +) from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.ingestion.source.state.redundant_run_skip_handler import ( @@ -51,6 +55,7 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import ( from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) +from datahub.ingestion.source_report.ingestion_stage import QUERIES_EXTRACTION from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.utilities.registries.domain_registry import DomainRegistry @@ -139,6 +144,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): self.lineage_extractor = BigqueryLineageExtractor( config, self.report, + schema_resolver=self.sql_parser_schema_resolver, identifiers=self.identifiers, redundant_run_skip_handler=redundant_lineage_run_skip_handler, ) @@ -196,7 +202,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): def _init_schema_resolver(self) -> SchemaResolver: schema_resolution_required = ( - self.config.lineage_parse_view_ddl or self.config.lineage_use_sql_parser + self.config.use_queries_v2 + or self.config.lineage_parse_view_ddl + or self.config.lineage_use_sql_parser ) schema_ingestion_enabled = ( self.config.include_schema_metadata @@ -244,22 +252,54 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): for project in projects: yield from self.bq_schema_extractor.get_project_workunits(project) - if self.config.include_usage_statistics: - yield from self.usage_extractor.get_usage_workunits( - [p.id for p in projects], self.bq_schema_extractor.table_refs - ) + if self.config.use_queries_v2: + self.report.set_ingestion_stage("*", "View and Snapshot Lineage") - if self.config.include_table_lineage: - yield from self.lineage_extractor.get_lineage_workunits( + yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( [p.id for p in projects], - self.sql_parser_schema_resolver, self.bq_schema_extractor.view_refs_by_project, self.bq_schema_extractor.view_definitions, self.bq_schema_extractor.snapshot_refs_by_project, self.bq_schema_extractor.snapshots_by_ref, - self.bq_schema_extractor.table_refs, ) + self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) + + queries_extractor = BigQueryQueriesExtractor( + connection=self.config.get_bigquery_client(), + schema_api=self.bq_schema_extractor.schema_api, + config=BigQueryQueriesExtractorConfig( + window=self.config, + user_email_pattern=self.config.usage.user_email_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_statistics, + include_operations=self.config.usage.include_operational_stats, + 
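+                    # Reuse the legacy usage settings (email filter,
+                    # operational stats, top-N limit) so the v2 extractor's
+                    # output stays comparable with the old usage path.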
top_n_queries=self.config.usage.top_n_queries, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=self.sql_parser_schema_resolver, + discovered_tables=self.bq_schema_extractor.table_refs, + ) + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() + else: + if self.config.include_usage_statistics: + yield from self.usage_extractor.get_usage_workunits( + [p.id for p in projects], self.bq_schema_extractor.table_refs + ) + + if self.config.include_table_lineage: + yield from self.lineage_extractor.get_lineage_workunits( + [p.id for p in projects], + self.bq_schema_extractor.view_refs_by_project, + self.bq_schema_extractor.view_definitions, + self.bq_schema_extractor.snapshot_refs_by_project, + self.bq_schema_extractor.snapshots_by_ref, + self.bq_schema_extractor.table_refs, + ) + def get_report(self) -> BigQueryV2Report: return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index c5a8b2ab7f..cfbefa5bff 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -404,6 +404,11 @@ class BigQueryV2Config( "enabled.", ) + use_queries_v2: bool = Field( + default=False, + description="If enabled, uses the new queries extractor to extract queries from bigquery.", + ) + @property def have_table_data_read_permission(self) -> bool: return self.use_tables_list_query_v2 or self.is_profiling_enabled() diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py index fffb5cfc8a..ed27aae19c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py @@ -15,6 +15,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_config import ( BigQueryIdentifierConfig, ) from datahub.ingestion.source.bigquery_v2.bigquery_report import ( + BigQueryQueriesExtractorReport, BigQuerySchemaApiPerfReport, ) from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigQuerySchemaApi @@ -25,7 +26,6 @@ from datahub.ingestion.source.bigquery_v2.common import ( from datahub.ingestion.source.bigquery_v2.queries_extractor import ( BigQueryQueriesExtractor, BigQueryQueriesExtractorConfig, - BigQueryQueriesExtractorReport, ) logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index d68468fd56..b333bcf695 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -58,6 +58,18 @@ class BigQueryProcessingPerfReport(Report): usage_state_size: Optional[str] = None +@dataclass +class BigQueryQueriesExtractorReport(Report): + query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer) + audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer) + audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer) + sql_aggregator: Optional[SqlAggregatorReport] = None + num_queries_by_project: TopKDict[str, int] = 
field(default_factory=int_top_k_dict) + + num_total_queries: int = 0 + num_unique_queries: int = 0 + + @dataclass class BigQueryV2Report( ProfilingSqlReport, @@ -143,10 +155,8 @@ class BigQueryV2Report( snapshots_scanned: int = 0 - num_view_definitions_parsed: int = 0 - num_view_definitions_failed_parsing: int = 0 - num_view_definitions_failed_column_parsing: int = 0 - view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList) + # view lineage + sql_aggregator: Optional[SqlAggregatorReport] = None read_reasons_stat: Counter[str] = field(default_factory=collections.Counter) operation_types_stat: Counter[str] = field(default_factory=collections.Counter) @@ -171,8 +181,7 @@ class BigQueryV2Report( usage_end_time: Optional[datetime] = None stateful_usage_ingestion_enabled: bool = False - # lineage/usage v2 - sql_aggregator: Optional[SqlAggregatorReport] = None + queries_extractor: Optional[BigQueryQueriesExtractorReport] = None def set_ingestion_stage(self, project_id: str, stage: str) -> None: self.report_ingestion_stage_start(f"{project_id}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py index d0f111f451..27beb7b025 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py @@ -137,7 +137,10 @@ class BigQueryTestConnection: report: BigQueryV2Report, ) -> CapabilityReport: lineage_extractor = BigqueryLineageExtractor( - connection_conf, report, BigQueryIdentifierBuilder(connection_conf, report) + connection_conf, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(connection_conf, report), ) for project_id in project_ids: try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 16d472d4de..c9d0738bea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -24,6 +24,7 @@ from google.cloud.logging_v2.client import Client as GCPLoggingClient from datahub.configuration.pattern_utils import is_schema_allowed from datahub.emitter import mce_builder from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source_helpers import auto_workunit from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( AuditLogEntry, @@ -53,6 +54,7 @@ from datahub.ingestion.source.bigquery_v2.queries import ( from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantLineageRunSkipHandler, ) +from datahub.ingestion.source_report.ingestion_stage import LINEAGE_EXTRACTION from datahub.metadata.schema_classes import ( AuditStampClass, DatasetLineageTypeClass, @@ -63,6 +65,7 @@ from datahub.metadata.schema_classes import ( UpstreamLineageClass, ) from datahub.sql_parsing.schema_resolver import SchemaResolver +from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult, sqlglot_lineage from datahub.utilities import memory_footprint from datahub.utilities.file_backed_collections import FileBackedDict @@ -201,38 +204,20 @@ def 
make_lineage_edges_from_parsing_result( return list(table_edges.values()) -def make_lineage_edge_for_snapshot( - snapshot: BigqueryTableSnapshot, -) -> Optional[LineageEdge]: - if snapshot.base_table_identifier: - base_table_name = str( - BigQueryTableRef.from_bigquery_table(snapshot.base_table_identifier) - ) - return LineageEdge( - table=base_table_name, - column_mapping=frozenset( - LineageEdgeColumnMapping( - out_column=column.field_path, - in_columns=frozenset([column.field_path]), - ) - for column in snapshot.columns - ), - auditStamp=datetime.now(timezone.utc), - type=DatasetLineageTypeClass.TRANSFORMED, - ) - return None - - class BigqueryLineageExtractor: def __init__( self, config: BigQueryV2Config, report: BigQueryV2Report, + *, + schema_resolver: SchemaResolver, identifiers: BigQueryIdentifierBuilder, redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None, ): self.config = config self.report = report + self.schema_resolver = schema_resolver + self.identifiers = identifiers self.audit_log_api = BigQueryAuditLogApi( report.audit_log_api_perf, @@ -246,6 +231,23 @@ class BigqueryLineageExtractor: self.report.lineage_end_time, ) = self.get_time_window() + self.datasets_skip_audit_log_lineage: Set[str] = set() + + self.aggregator = SqlParsingAggregator( + platform=self.identifiers.platform, + platform_instance=self.identifiers.identifier_config.platform_instance, + env=self.identifiers.identifier_config.env, + schema_resolver=self.schema_resolver, + eager_graph_load=False, + generate_lineage=True, + generate_queries=True, + generate_usage_statistics=False, + generate_query_usage_statistics=False, + generate_operations=False, + format_queries=True, + ) + self.report.sql_aggregator = self.aggregator.report + def get_time_window(self) -> Tuple[datetime, datetime]: if self.redundant_run_skip_handler: return self.redundant_run_skip_handler.suggest_run_time_window( @@ -271,10 +273,46 @@ class BigqueryLineageExtractor: return True + def get_lineage_workunits_for_views_and_snapshots( + self, + projects: List[str], + view_refs_by_project: Dict[str, Set[str]], + view_definitions: FileBackedDict[str], + snapshot_refs_by_project: Dict[str, Set[str]], + snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot], + ) -> Iterable[MetadataWorkUnit]: + for project in projects: + if self.config.lineage_parse_view_ddl: + for view in view_refs_by_project[project]: + self.datasets_skip_audit_log_lineage.add(view) + self.aggregator.add_view_definition( + view_urn=self.identifiers.gen_dataset_urn_from_raw_ref( + BigQueryTableRef.from_string_name(view) + ), + view_definition=view_definitions[view], + default_db=project, + ) + + for snapshot_ref in snapshot_refs_by_project[project]: + snapshot = snapshots_by_ref[snapshot_ref] + if not snapshot.base_table_identifier: + continue + self.datasets_skip_audit_log_lineage.add(snapshot_ref) + snapshot_urn = self.identifiers.gen_dataset_urn_from_raw_ref( + BigQueryTableRef.from_string_name(snapshot_ref) + ) + base_table_urn = self.identifiers.gen_dataset_urn_from_raw_ref( + BigQueryTableRef(snapshot.base_table_identifier) + ) + self.aggregator.add_known_lineage_mapping( + upstream_urn=base_table_urn, downstream_urn=snapshot_urn + ) + + yield from auto_workunit(self.aggregator.gen_metadata()) + def get_lineage_workunits( self, projects: List[str], - sql_parser_schema_resolver: SchemaResolver, view_refs_by_project: Dict[str, Set[str]], view_definitions: FileBackedDict[str], snapshot_refs_by_project: Dict[str, Set[str]], @@ -283,39 +321,22 @@ class 
BigqueryLineageExtractor: ) -> Iterable[MetadataWorkUnit]: if not self._should_ingest_lineage(): return - datasets_skip_audit_log_lineage: Set[str] = set() - dataset_lineage: Dict[str, Set[LineageEdge]] = {} - for project in projects: - self.populate_snapshot_lineage( - dataset_lineage, - snapshot_refs_by_project[project], - snapshots_by_ref, - ) - if self.config.lineage_parse_view_ddl: - self.populate_view_lineage_with_sql_parsing( - dataset_lineage, - view_refs_by_project[project], - view_definitions, - sql_parser_schema_resolver, - project, - ) - - datasets_skip_audit_log_lineage.update(dataset_lineage.keys()) - for lineage_key in dataset_lineage.keys(): - yield from self.gen_lineage_workunits_for_table( - dataset_lineage, BigQueryTableRef.from_string_name(lineage_key) - ) + yield from self.get_lineage_workunits_for_views_and_snapshots( + projects, + view_refs_by_project, + view_definitions, + snapshot_refs_by_project, + snapshots_by_ref, + ) if self.config.use_exported_bigquery_audit_metadata: projects = ["*"] # project_id not used when using exported metadata for project in projects: - self.report.set_ingestion_stage(project, "Lineage Extraction") + self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION) yield from self.generate_lineage( project, - sql_parser_schema_resolver, - datasets_skip_audit_log_lineage, table_refs, ) @@ -328,8 +349,6 @@ class BigqueryLineageExtractor: def generate_lineage( self, project_id: str, - sql_parser_schema_resolver: SchemaResolver, - datasets_skip_audit_log_lineage: Set[str], table_refs: Set[str], ) -> Iterable[MetadataWorkUnit]: logger.info(f"Generate lineage for {project_id}") @@ -339,9 +358,7 @@ class BigqueryLineageExtractor: lineage = self.lineage_via_catalog_lineage_api(project_id) else: events = self._get_parsed_audit_log_events(project_id) - lineage = self._create_lineage_map( - events, sql_parser_schema_resolver - ) + lineage = self._create_lineage_map(events) except Exception as e: self.report.lineage_failed_extraction.append(project_id) self.report.warning( @@ -367,7 +384,7 @@ class BigqueryLineageExtractor: # as they may contain indirectly referenced tables. 
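+            # Entries in datasets_skip_audit_log_lineage (views and snapshots)
+            # already had lineage emitted via the SqlParsingAggregator in
+            # get_lineage_workunits_for_views_and_snapshots, so skip them here
+            # to avoid duplicate lineage aspects.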
if ( lineage_key not in table_refs - or lineage_key in datasets_skip_audit_log_lineage + or lineage_key in self.datasets_skip_audit_log_lineage ): continue @@ -375,58 +392,6 @@ class BigqueryLineageExtractor: lineage, BigQueryTableRef.from_string_name(lineage_key) ) - def populate_view_lineage_with_sql_parsing( - self, - view_lineage: Dict[str, Set[LineageEdge]], - view_refs: Set[str], - view_definitions: FileBackedDict[str], - sql_parser_schema_resolver: SchemaResolver, - default_project: str, - ) -> None: - for view in view_refs: - view_definition = view_definitions[view] - raw_view_lineage = sqlglot_lineage( - view_definition, - schema_resolver=sql_parser_schema_resolver, - default_db=default_project, - ) - if raw_view_lineage.debug_info.table_error: - logger.debug( - f"Failed to parse lineage for view {view}: {raw_view_lineage.debug_info.table_error}" - ) - self.report.num_view_definitions_failed_parsing += 1 - self.report.view_definitions_parsing_failures.append( - f"Table-level sql parsing error for view {view}: {raw_view_lineage.debug_info.table_error}" - ) - continue - elif raw_view_lineage.debug_info.column_error: - self.report.num_view_definitions_failed_column_parsing += 1 - self.report.view_definitions_parsing_failures.append( - f"Column-level sql parsing error for view {view}: {raw_view_lineage.debug_info.column_error}" - ) - else: - self.report.num_view_definitions_parsed += 1 - - ts = datetime.now(timezone.utc) - view_lineage[view] = set( - make_lineage_edges_from_parsing_result( - raw_view_lineage, - audit_stamp=ts, - lineage_type=DatasetLineageTypeClass.VIEW, - ) - ) - - def populate_snapshot_lineage( - self, - snapshot_lineage: Dict[str, Set[LineageEdge]], - snapshot_refs: Set[str], - snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot], - ) -> None: - for snapshot in snapshot_refs: - lineage_edge = make_lineage_edge_for_snapshot(snapshots_by_ref[snapshot]) - if lineage_edge: - snapshot_lineage[snapshot] = {lineage_edge} - def gen_lineage_workunits_for_table( self, lineage: Dict[str, Set[LineageEdge]], table_ref: BigQueryTableRef ) -> Iterable[MetadataWorkUnit]: @@ -687,7 +652,6 @@ class BigqueryLineageExtractor: def _create_lineage_map( self, entries: Iterable[QueryEvent], - sql_parser_schema_resolver: SchemaResolver, ) -> Dict[str, Set[LineageEdge]]: logger.info("Entering create lineage map function") lineage_map: Dict[str, Set[LineageEdge]] = collections.defaultdict(set) @@ -751,7 +715,7 @@ class BigqueryLineageExtractor: query = e.query raw_lineage = sqlglot_lineage( query, - schema_resolver=sql_parser_schema_resolver, + schema_resolver=self.schema_resolver, default_db=e.project_id, ) logger.debug( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index 8457f4e37b..8e1d27847f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -2,25 +2,29 @@ import functools import logging import pathlib import tempfile -from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Dict, Iterable, List, Optional, TypedDict +from typing import Collection, Dict, Iterable, List, Optional, TypedDict from google.cloud.bigquery import Client -from pydantic import Field +from pydantic import Field, PositiveInt from datahub.configuration.common import AllowDenyPattern from 
datahub.configuration.time_window_config import ( BaseTimeWindowConfig, get_time_bucket, ) -from datahub.ingestion.api.report import Report from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.source_helpers import auto_workunit from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DataHubGraph -from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier +from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( + BigqueryTableIdentifier, + BigQueryTableRef, +) from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryBaseConfig +from datahub.ingestion.source.bigquery_v2.bigquery_report import ( + BigQueryQueriesExtractorReport, +) from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryProject, BigQuerySchemaApi, @@ -35,7 +39,6 @@ from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.sql_parsing_aggregator import ( ObservedQuery, - SqlAggregatorReport, SqlParsingAggregator, ) from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint @@ -44,8 +47,6 @@ from datahub.utilities.file_backed_collections import ( FileBackedDict, FileBackedList, ) -from datahub.utilities.perf_timer import PerfTimer -from datahub.utilities.stats_collections import TopKDict, int_top_k_dict from datahub.utilities.time import datetime_to_ts_millis logger = logging.getLogger(__name__) @@ -95,6 +96,10 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig): description="regex patterns for user emails to filter in usage.", ) + top_n_queries: PositiveInt = Field( + default=10, description="Number of top queries to save to each table." + ) + include_lineage: bool = True include_queries: bool = True include_usage_statistics: bool = True @@ -108,18 +113,6 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig): ) -@dataclass -class BigQueryQueriesExtractorReport(Report): - query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer) - audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer) - audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer) - sql_aggregator: Optional[SqlAggregatorReport] = None - num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict) - - num_total_queries: int = 0 - num_unique_queries: int = 0 - - class BigQueryQueriesExtractor: """ Extracts query audit log and generates usage/lineage/operation workunits. @@ -128,6 +121,7 @@ class BigQueryQueriesExtractor: 1. For every lineage/operation workunit, corresponding query id is also present 2. Operation aspect for a particular query is emitted at max once(last occurence) for a day 3. "DROP" operation accounts for usage here + 4. 
userEmail is not populated in datasetUsageStatistics aspect, only user urn """ @@ -141,7 +135,7 @@ class BigQueryQueriesExtractor: identifiers: BigQueryIdentifierBuilder, graph: Optional[DataHubGraph] = None, schema_resolver: Optional[SchemaResolver] = None, - discovered_tables: Optional[List[str]] = None, + discovered_tables: Optional[Collection[str]] = None, ): self.connection = connection @@ -150,8 +144,7 @@ class BigQueryQueriesExtractor: self.identifiers = identifiers self.schema_api = schema_api self.report = BigQueryQueriesExtractorReport() - # self.filters = filters - self.discovered_tables = discovered_tables + self.discovered_tables = set(discovered_tables) if discovered_tables else None self.structured_report = structured_report @@ -171,6 +164,7 @@ class BigQueryQueriesExtractor: start_time=self.config.window.start_time, end_time=self.config.window.end_time, user_email_pattern=self.config.user_email_pattern, + top_n_queries=self.config.top_n_queries, ), generate_operations=self.config.include_operations, is_temp_table=self.is_temp_table, @@ -192,19 +186,35 @@ class BigQueryQueriesExtractor: def is_temp_table(self, name: str) -> bool: try: - return BigqueryTableIdentifier.from_string_name(name).dataset.startswith( - self.config.temp_table_dataset_prefix - ) + table = BigqueryTableIdentifier.from_string_name(name) + + if table.dataset.startswith(self.config.temp_table_dataset_prefix): + return True + + # This is also a temp table if + # 1. this name would be allowed by the dataset patterns, and + # 2. we have a list of discovered tables, and + # 3. it's not in the discovered tables list + if ( + self.filters.is_allowed(table) + and self.discovered_tables + and str(BigQueryTableRef(table)) not in self.discovered_tables + ): + return True + except Exception: logger.warning(f"Error parsing table name {name} ") - return False + return False def is_allowed_table(self, name: str) -> bool: try: - table_id = BigqueryTableIdentifier.from_string_name(name) - if self.discovered_tables and str(table_id) not in self.discovered_tables: + table = BigqueryTableIdentifier.from_string_name(name) + if ( + self.discovered_tables + and str(BigQueryTableRef(table)) not in self.discovered_tables + ): return False - return self.filters.is_allowed(table_id) + return self.filters.is_allowed(table) except Exception: logger.warning(f"Error parsing table name {name} ") return False diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 0c861b1334..4da232518c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -166,7 +166,7 @@ class SnowflakeV2Source( # If we're ingestion schema metadata for tables/views, then we will populate # schemas into the resolver as we go. We only need to do a bulk fetch # if we're not ingesting schema metadata as part of ingestion. 
- ( + not ( self.config.include_technical_schema and self.config.include_tables and self.config.include_views diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index ed4ea2cabe..29204c58fa 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -37,6 +37,7 @@ from datahub.sql_parsing.sql_parsing_common import QueryType, QueryTypeProps from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, + DownstreamColumnRef, SqlParsingResult, infer_output_schema, sqlglot_lineage, @@ -574,9 +575,6 @@ class SqlParsingAggregator(Closeable): Because this method takes in urns, it does not require that the urns are part of the platform that the aggregator is configured for. - TODO: In the future, this method will also generate CLL if we have - schemas for either the upstream or downstream. - The known lineage mapping does not contribute to usage statistics or operations. Args: @@ -589,6 +587,21 @@ class SqlParsingAggregator(Closeable): # We generate a fake "query" object to hold the lineage. query_id = self._known_lineage_query_id() + # Generate CLL if schema of downstream is known + column_lineage: List[ColumnLineageInfo] = [] + if self._schema_resolver.has_urn(downstream_urn): + schema = self._schema_resolver._resolve_schema_info(downstream_urn) + if schema: + column_lineage = [ + ColumnLineageInfo( + downstream=DownstreamColumnRef( + table=downstream_urn, column=field_path + ), + upstreams=[ColumnRef(table=upstream_urn, column=field_path)], + ) + for field_path in schema + ] + # Register the query. self._add_to_query_map( QueryMetadata( @@ -600,7 +613,7 @@ class SqlParsingAggregator(Closeable): latest_timestamp=None, actor=None, upstreams=[upstream_urn], - column_lineage=[], + column_lineage=column_lineage, column_usage={}, confidence_score=1.0, ) diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json index e7b2a7c4a9..bcbdd02506 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json @@ -401,6 +401,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "glossaryTerm", "entityUrn": "urn:li:glossaryTerm:Age", @@ -417,6 +433,97 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "project-id-1.bigquery-dataset-1.snapshot-table-1", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + 
"fields": [ + { + "fieldPath": "age", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "INT", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Test Policy Tag" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "email", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "STRING", + "recursive": false, + "globalTags": { + "tags": [] + }, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m5!1m4!4m3!1sproject-id-1!2sbigquery-dataset-1!3ssnapshot-table-1", + "name": "snapshot-table-1", + "qualifiedName": "project-id-1.bigquery-dataset-1.snapshot-table-1", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "glossaryTerm", "entityUrn": "urn:li:glossaryTerm:Email_Address", @@ -433,6 +540,57 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Bigquery Table Snapshot" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "tag", "entityUrn": "urn:li:tag:Test Policy Tag", @@ -448,5 +606,83 @@ "runId": "bigquery-2022_02_03-07_00_00", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:068bd9323110994a40019fcf6cfc60d3", + "urn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3" + }, + { + "id": 
"urn:li:container:8df46c5e3ded05a3122b0015822c0ef0", + "urn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", + "type": "COPY" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD),age)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD),age)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD),email)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD),email)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index dff7f18db6..36199ee0e2 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -11,6 +11,7 @@ from datahub.ingestion.glossary.classifier import ( DynamicTypedClassifierConfig, ) from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig +from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_data_reader import BigQueryDataReader from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, @@ -18,6 +19,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryProject, BigQuerySchemaApi, BigqueryTable, + BigqueryTableSnapshot, ) from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import ( BigQuerySchemaGenerator, @@ -47,7 +49,7 @@ def recipe(mcp_output_path: str, override: dict = {}) -> dict: "config": { "project_ids": ["project-id-1"], "include_usage_statistics": False, - "include_table_lineage": False, + "include_table_lineage": True, "include_data_platform_instance": True, "classification": ClassificationConfig( enabled=True, @@ -68,6 +70,7 @@ def recipe(mcp_output_path: str, override: dict = {}) -> dict: @freeze_time(FROZEN_TIME) +@patch.object(BigQuerySchemaApi, "get_snapshots_for_dataset") @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQuerySchemaGenerator, "get_core_table_details") @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") @@ -85,6 +88,7 @@ def 
test_bigquery_v2_ingest( get_datasets_for_project_id, get_core_table_details, get_tables_for_dataset, + get_snapshots_for_dataset, pytestconfig, tmp_path, ): @@ -100,31 +104,35 @@ def test_bigquery_v2_ingest( {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} ) table_name = "table-1" + snapshot_table_name = "snapshot-table-1" get_core_table_details.return_value = {table_name: table_list_item} + columns = [ + BigqueryColumn( + name="age", + ordinal_position=1, + is_nullable=False, + field_path="col_1", + data_type="INT", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + policy_tags=["Test Policy Tag"], + ), + BigqueryColumn( + name="email", + ordinal_position=1, + is_nullable=False, + field_path="col_2", + data_type="STRING", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + ), + ] + get_columns_for_dataset.return_value = { - table_name: [ - BigqueryColumn( - name="age", - ordinal_position=1, - is_nullable=False, - field_path="col_1", - data_type="INT", - comment="comment", - is_partition_column=False, - cluster_column_position=None, - policy_tags=["Test Policy Tag"], - ), - BigqueryColumn( - name="email", - ordinal_position=1, - is_nullable=False, - field_path="col_2", - data_type="STRING", - comment="comment", - is_partition_column=False, - cluster_column_position=None, - ), - ] + table_name: columns, + snapshot_table_name: columns, } get_sample_data_for_table.return_value = { "age": [random.randint(1, 80) for i in range(20)], @@ -140,6 +148,20 @@ def test_bigquery_v2_ingest( rows_count=None, ) get_tables_for_dataset.return_value = iter([bigquery_table]) + snapshot_table = BigqueryTableSnapshot( + name=snapshot_table_name, + comment=None, + created=None, + last_altered=None, + size_in_bytes=None, + rows_count=None, + base_table_identifier=BigqueryTableIdentifier( + project_id="project-id-1", + dataset="bigquery-dataset-1", + table="table-1", + ), + ) + get_snapshots_for_dataset.return_value = iter([snapshot_table]) pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py index fb51aac9fa..d9effd33f5 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py @@ -6,7 +6,6 @@ from unittest.mock import patch import pytest from freezegun import freeze_time -from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList from tests.test_helpers import mce_helpers @@ -58,15 +57,12 @@ def test_queries_ingestion(project_client, client, pytestconfig, monkeypatch, tm "config": { "project_ids": ["gcp-staging", "gcp-staging-2"], "local_temp_path": tmp_path, + "top_n_queries": 20, }, }, "sink": {"type": "file", "config": {"filename": mcp_output_path}}, } - # This is hacky to pick all queries instead of any 10. 
- # Should be easy to remove once top_n_queries is supported in queries config - monkeypatch.setattr(BaseUsageConfig.__fields__["top_n_queries"], "default", 20) - pipeline = run_and_get_pipeline(pipeline_config_dict) pipeline.pretty_print_summary() diff --git a/metadata-ingestion/tests/unit/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_lineage.py index 6bd5cc4d32..7456f2fd1d 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_lineage.py @@ -83,7 +83,10 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config() report = BigQueryV2Report() extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( - config, report, BigQueryIdentifierBuilder(config, report) + config, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(config, report), ) bq_table = BigQueryTableRef.from_string_name( @@ -91,8 +94,7 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: ) lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map( - iter(lineage_entries), - sql_parser_schema_resolver=SchemaResolver(platform="bigquery"), + iter(lineage_entries) ) upstream_lineage = extractor.get_lineage_for_table( @@ -108,7 +110,10 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False) report = BigQueryV2Report() extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( - config, report, BigQueryIdentifierBuilder(config, report) + config, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(config, report), ) bq_table = BigQueryTableRef.from_string_name( @@ -117,7 +122,6 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map( lineage_entries[:1], - sql_parser_schema_resolver=SchemaResolver(platform="bigquery"), ) upstream_lineage = extractor.get_lineage_for_table(
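
A minimal recipe sketch enabling the new code path, suitable for passing to
run_and_get_pipeline as in the tests above (the project id and sink filename
are placeholders; only use_queries_v2 is the flag added in bigquery_config.py):

    pipeline_config = {
        "source": {
            "type": "bigquery",
            "config": {
                "project_ids": ["project-id-1"],  # placeholder project
                "include_table_lineage": True,
                "include_usage_statistics": True,
                # New flag: route lineage/usage/operations through
                # BigQueryQueriesExtractor instead of the legacy extractors.
                "use_queries_v2": True,
            },
        },
        "sink": {"type": "file", "config": {"filename": "bigquery_mcps.json"}},
    }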