feat(ingest/bq): integrate bigquery-queries into main source (#11247)

Mayuri Nehate 2024-08-31 06:46:45 +05:30 committed by GitHub
parent c513e17dbb
commit a7fc7f519a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 496 additions and 194 deletions

View File

@ -39,6 +39,10 @@ from datahub.ingestion.source.bigquery_v2.common import (
)
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
)
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
@ -51,6 +55,7 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.ingestion.source_report.ingestion_stage import QUERIES_EXTRACTION
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.utilities.registries.domain_registry import DomainRegistry
@ -139,6 +144,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
self.lineage_extractor = BigqueryLineageExtractor(
config,
self.report,
schema_resolver=self.sql_parser_schema_resolver,
identifiers=self.identifiers,
redundant_run_skip_handler=redundant_lineage_run_skip_handler,
)
@ -196,7 +202,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
def _init_schema_resolver(self) -> SchemaResolver:
schema_resolution_required = (
self.config.lineage_parse_view_ddl or self.config.lineage_use_sql_parser
self.config.use_queries_v2
or self.config.lineage_parse_view_ddl
or self.config.lineage_use_sql_parser
)
schema_ingestion_enabled = (
self.config.include_schema_metadata
@ -244,22 +252,54 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)
if self.config.use_queries_v2:
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")
if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.sql_parser_schema_resolver,
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)
self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)
queries_extractor = BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
top_n_queries=self.config.usage.top_n_queries,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
)
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)
if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)
def get_report(self) -> BigQueryV2Report:
return self.report

View File

@ -404,6 +404,11 @@ class BigQueryV2Config(
"enabled.",
)
use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from bigquery.",
)
@property
def have_table_data_read_permission(self) -> bool:
return self.use_tables_list_query_v2 or self.is_profiling_enabled()
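
For reference, a minimal recipe sketch (not part of this commit) showing how the new flag could be switched on. It assumes the dict-style recipe shape used by the tests later in this diff; the project id and sink filename are placeholders.

pipeline_config = {
    "source": {
        "type": "bigquery",
        "config": {
            "project_ids": ["my-project"],  # placeholder project id
            "include_table_lineage": True,
            "include_usage_statistics": True,
            # New flag added in this commit; defaults to False.
            "use_queries_v2": True,
        },
    },
    "sink": {"type": "file", "config": {"filename": "bigquery_mcps.json"}},
}
# The dict can then be fed to DataHub's pipeline runner, e.g.
# Pipeline.create(pipeline_config).run() from datahub.ingestion.run.pipeline.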

View File

@ -15,6 +15,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryIdentifierConfig,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
BigQueryQueriesExtractorReport,
BigQuerySchemaApiPerfReport,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigQuerySchemaApi
@ -25,7 +26,6 @@ from datahub.ingestion.source.bigquery_v2.common import (
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
BigQueryQueriesExtractorReport,
)
logger = logging.getLogger(__name__)

View File

@ -58,6 +58,18 @@ class BigQueryProcessingPerfReport(Report):
usage_state_size: Optional[str] = None
@dataclass
class BigQueryQueriesExtractorReport(Report):
query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer)
sql_aggregator: Optional[SqlAggregatorReport] = None
num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict)
num_total_queries: int = 0
num_unique_queries: int = 0
@dataclass
class BigQueryV2Report(
ProfilingSqlReport,
@ -143,10 +155,8 @@ class BigQueryV2Report(
snapshots_scanned: int = 0
num_view_definitions_parsed: int = 0
num_view_definitions_failed_parsing: int = 0
num_view_definitions_failed_column_parsing: int = 0
view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList)
# view lineage
sql_aggregator: Optional[SqlAggregatorReport] = None
read_reasons_stat: Counter[str] = field(default_factory=collections.Counter)
operation_types_stat: Counter[str] = field(default_factory=collections.Counter)
@ -171,8 +181,7 @@ class BigQueryV2Report(
usage_end_time: Optional[datetime] = None
stateful_usage_ingestion_enabled: bool = False
# lineage/usage v2
sql_aggregator: Optional[SqlAggregatorReport] = None
queries_extractor: Optional[BigQueryQueriesExtractorReport] = None
def set_ingestion_stage(self, project_id: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{project_id}: {stage}")

View File

@ -137,7 +137,10 @@ class BigQueryTestConnection:
report: BigQueryV2Report,
) -> CapabilityReport:
lineage_extractor = BigqueryLineageExtractor(
connection_conf, report, BigQueryIdentifierBuilder(connection_conf, report)
connection_conf,
report,
schema_resolver=SchemaResolver(platform="bigquery"),
identifiers=BigQueryIdentifierBuilder(connection_conf, report),
)
for project_id in project_ids:
try:

View File

@ -24,6 +24,7 @@ from google.cloud.logging_v2.client import Client as GCPLoggingClient
from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditLogEntry,
@ -53,6 +54,7 @@ from datahub.ingestion.source.bigquery_v2.queries import (
from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantLineageRunSkipHandler,
)
from datahub.ingestion.source_report.ingestion_stage import LINEAGE_EXTRACTION
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetLineageTypeClass,
@ -63,6 +65,7 @@ from datahub.metadata.schema_classes import (
UpstreamLineageClass,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult, sqlglot_lineage
from datahub.utilities import memory_footprint
from datahub.utilities.file_backed_collections import FileBackedDict
@ -201,38 +204,20 @@ def make_lineage_edges_from_parsing_result(
return list(table_edges.values())
def make_lineage_edge_for_snapshot(
snapshot: BigqueryTableSnapshot,
) -> Optional[LineageEdge]:
if snapshot.base_table_identifier:
base_table_name = str(
BigQueryTableRef.from_bigquery_table(snapshot.base_table_identifier)
)
return LineageEdge(
table=base_table_name,
column_mapping=frozenset(
LineageEdgeColumnMapping(
out_column=column.field_path,
in_columns=frozenset([column.field_path]),
)
for column in snapshot.columns
),
auditStamp=datetime.now(timezone.utc),
type=DatasetLineageTypeClass.TRANSFORMED,
)
return None
class BigqueryLineageExtractor:
def __init__(
self,
config: BigQueryV2Config,
report: BigQueryV2Report,
*,
schema_resolver: SchemaResolver,
identifiers: BigQueryIdentifierBuilder,
redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None,
):
self.config = config
self.report = report
self.schema_resolver = schema_resolver
self.identifiers = identifiers
self.audit_log_api = BigQueryAuditLogApi(
report.audit_log_api_perf,
@ -246,6 +231,23 @@ class BigqueryLineageExtractor:
self.report.lineage_end_time,
) = self.get_time_window()
self.datasets_skip_audit_log_lineage: Set[str] = set()
self.aggregator = SqlParsingAggregator(
platform=self.identifiers.platform,
platform_instance=self.identifiers.identifier_config.platform_instance,
env=self.identifiers.identifier_config.env,
schema_resolver=self.schema_resolver,
eager_graph_load=False,
generate_lineage=True,
generate_queries=True,
generate_usage_statistics=False,
generate_query_usage_statistics=False,
generate_operations=False,
format_queries=True,
)
self.report.sql_aggregator = self.aggregator.report
def get_time_window(self) -> Tuple[datetime, datetime]:
if self.redundant_run_skip_handler:
return self.redundant_run_skip_handler.suggest_run_time_window(
@ -271,10 +273,46 @@ class BigqueryLineageExtractor:
return True
def get_lineage_workunits_for_views_and_snapshots(
self,
projects: List[str],
view_refs_by_project: Dict[str, Set[str]],
view_definitions: FileBackedDict[str],
snapshot_refs_by_project: Dict[str, Set[str]],
snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot],
) -> Iterable[MetadataWorkUnit]:
for project in projects:
if self.config.lineage_parse_view_ddl:
for view in view_refs_by_project[project]:
self.datasets_skip_audit_log_lineage.add(view)
self.aggregator.add_view_definition(
view_urn=self.identifiers.gen_dataset_urn_from_raw_ref(
BigQueryTableRef.from_string_name(view)
),
view_definition=view_definitions[view],
default_db=project,
)
for snapshot_ref in snapshot_refs_by_project[project]:
snapshot = snapshots_by_ref[snapshot_ref]
if not snapshot.base_table_identifier:
continue
self.datasets_skip_audit_log_lineage.add(snapshot_ref)
snapshot_urn = self.identifiers.gen_dataset_urn_from_raw_ref(
BigQueryTableRef.from_string_name(snapshot_ref)
)
base_table_urn = self.identifiers.gen_dataset_urn_from_raw_ref(
BigQueryTableRef(snapshot.base_table_identifier)
)
self.aggregator.add_known_lineage_mapping(
upstream_urn=base_table_urn, downstream_urn=snapshot_urn
)
yield from auto_workunit(self.aggregator.gen_metadata())
def get_lineage_workunits(
self,
projects: List[str],
sql_parser_schema_resolver: SchemaResolver,
view_refs_by_project: Dict[str, Set[str]],
view_definitions: FileBackedDict[str],
snapshot_refs_by_project: Dict[str, Set[str]],
@ -283,39 +321,22 @@ class BigqueryLineageExtractor:
) -> Iterable[MetadataWorkUnit]:
if not self._should_ingest_lineage():
return
datasets_skip_audit_log_lineage: Set[str] = set()
dataset_lineage: Dict[str, Set[LineageEdge]] = {}
for project in projects:
self.populate_snapshot_lineage(
dataset_lineage,
snapshot_refs_by_project[project],
snapshots_by_ref,
)
if self.config.lineage_parse_view_ddl:
self.populate_view_lineage_with_sql_parsing(
dataset_lineage,
view_refs_by_project[project],
view_definitions,
sql_parser_schema_resolver,
project,
)
datasets_skip_audit_log_lineage.update(dataset_lineage.keys())
for lineage_key in dataset_lineage.keys():
yield from self.gen_lineage_workunits_for_table(
dataset_lineage, BigQueryTableRef.from_string_name(lineage_key)
)
yield from self.get_lineage_workunits_for_views_and_snapshots(
projects,
view_refs_by_project,
view_definitions,
snapshot_refs_by_project,
snapshots_by_ref,
)
if self.config.use_exported_bigquery_audit_metadata:
projects = ["*"] # project_id not used when using exported metadata
for project in projects:
self.report.set_ingestion_stage(project, "Lineage Extraction")
self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION)
yield from self.generate_lineage(
project,
sql_parser_schema_resolver,
datasets_skip_audit_log_lineage,
table_refs,
)
@ -328,8 +349,6 @@ class BigqueryLineageExtractor:
def generate_lineage(
self,
project_id: str,
sql_parser_schema_resolver: SchemaResolver,
datasets_skip_audit_log_lineage: Set[str],
table_refs: Set[str],
) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate lineage for {project_id}")
@ -339,9 +358,7 @@ class BigqueryLineageExtractor:
lineage = self.lineage_via_catalog_lineage_api(project_id)
else:
events = self._get_parsed_audit_log_events(project_id)
lineage = self._create_lineage_map(
events, sql_parser_schema_resolver
)
lineage = self._create_lineage_map(events)
except Exception as e:
self.report.lineage_failed_extraction.append(project_id)
self.report.warning(
@ -367,7 +384,7 @@ class BigqueryLineageExtractor:
# as they may contain indirectly referenced tables.
if (
lineage_key not in table_refs
or lineage_key in datasets_skip_audit_log_lineage
or lineage_key in self.datasets_skip_audit_log_lineage
):
continue
@ -375,58 +392,6 @@ class BigqueryLineageExtractor:
lineage, BigQueryTableRef.from_string_name(lineage_key)
)
def populate_view_lineage_with_sql_parsing(
self,
view_lineage: Dict[str, Set[LineageEdge]],
view_refs: Set[str],
view_definitions: FileBackedDict[str],
sql_parser_schema_resolver: SchemaResolver,
default_project: str,
) -> None:
for view in view_refs:
view_definition = view_definitions[view]
raw_view_lineage = sqlglot_lineage(
view_definition,
schema_resolver=sql_parser_schema_resolver,
default_db=default_project,
)
if raw_view_lineage.debug_info.table_error:
logger.debug(
f"Failed to parse lineage for view {view}: {raw_view_lineage.debug_info.table_error}"
)
self.report.num_view_definitions_failed_parsing += 1
self.report.view_definitions_parsing_failures.append(
f"Table-level sql parsing error for view {view}: {raw_view_lineage.debug_info.table_error}"
)
continue
elif raw_view_lineage.debug_info.column_error:
self.report.num_view_definitions_failed_column_parsing += 1
self.report.view_definitions_parsing_failures.append(
f"Column-level sql parsing error for view {view}: {raw_view_lineage.debug_info.column_error}"
)
else:
self.report.num_view_definitions_parsed += 1
ts = datetime.now(timezone.utc)
view_lineage[view] = set(
make_lineage_edges_from_parsing_result(
raw_view_lineage,
audit_stamp=ts,
lineage_type=DatasetLineageTypeClass.VIEW,
)
)
def populate_snapshot_lineage(
self,
snapshot_lineage: Dict[str, Set[LineageEdge]],
snapshot_refs: Set[str],
snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot],
) -> None:
for snapshot in snapshot_refs:
lineage_edge = make_lineage_edge_for_snapshot(snapshots_by_ref[snapshot])
if lineage_edge:
snapshot_lineage[snapshot] = {lineage_edge}
def gen_lineage_workunits_for_table(
self, lineage: Dict[str, Set[LineageEdge]], table_ref: BigQueryTableRef
) -> Iterable[MetadataWorkUnit]:
@ -687,7 +652,6 @@ class BigqueryLineageExtractor:
def _create_lineage_map(
self,
entries: Iterable[QueryEvent],
sql_parser_schema_resolver: SchemaResolver,
) -> Dict[str, Set[LineageEdge]]:
logger.info("Entering create lineage map function")
lineage_map: Dict[str, Set[LineageEdge]] = collections.defaultdict(set)
@ -751,7 +715,7 @@ class BigqueryLineageExtractor:
query = e.query
raw_lineage = sqlglot_lineage(
query,
schema_resolver=sql_parser_schema_resolver,
schema_resolver=self.schema_resolver,
default_db=e.project_id,
)
logger.debug(
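
As a side note, a standalone sketch (not part of this commit) of the aggregator-based view/snapshot lineage flow that get_lineage_workunits_for_views_and_snapshots implements above. The dataset names, urns, and SQL text are illustrative; the aggregator calls mirror the ones in the diff.

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator

aggregator = SqlParsingAggregator(
    platform="bigquery",
    schema_resolver=SchemaResolver(platform="bigquery"),
    eager_graph_load=False,
    generate_lineage=True,
    generate_queries=True,
    generate_usage_statistics=False,
    generate_query_usage_statistics=False,
    generate_operations=False,
    format_queries=True,
)

# View lineage: parsed out of the view's DDL by the aggregator.
aggregator.add_view_definition(
    view_urn=make_dataset_urn("bigquery", "my-project.my_dataset.my_view"),
    view_definition="SELECT id, name FROM my_dataset.base_table",
    default_db="my-project",
)

# Snapshot lineage: a direct table-to-table mapping, no SQL parsing needed.
aggregator.add_known_lineage_mapping(
    upstream_urn=make_dataset_urn("bigquery", "my-project.my_dataset.base_table"),
    downstream_urn=make_dataset_urn("bigquery", "my-project.my_dataset.base_table_snapshot"),
)

# Emit the resulting lineage (and query entities) as workunits.
for wu in auto_workunit(aggregator.gen_metadata()):
    print(wu.id)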

View File

@ -2,25 +2,29 @@ import functools
import logging
import pathlib
import tempfile
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional, TypedDict
from typing import Collection, Dict, Iterable, List, Optional, TypedDict
from google.cloud.bigquery import Client
from pydantic import Field
from pydantic import Field, PositiveInt
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import (
BaseTimeWindowConfig,
get_time_bucket,
)
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BigqueryTableIdentifier,
BigQueryTableRef,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryBaseConfig
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
BigQueryQueriesExtractorReport,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryProject,
BigQuerySchemaApi,
@ -35,7 +39,6 @@ from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
ObservedQuery,
SqlAggregatorReport,
SqlParsingAggregator,
)
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
@ -44,8 +47,6 @@ from datahub.utilities.file_backed_collections import (
FileBackedDict,
FileBackedList,
)
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict, int_top_k_dict
from datahub.utilities.time import datetime_to_ts_millis
logger = logging.getLogger(__name__)
@ -95,6 +96,10 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):
description="regex patterns for user emails to filter in usage.",
)
top_n_queries: PositiveInt = Field(
default=10, description="Number of top queries to save to each table."
)
include_lineage: bool = True
include_queries: bool = True
include_usage_statistics: bool = True
@ -108,18 +113,6 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):
)
@dataclass
class BigQueryQueriesExtractorReport(Report):
query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer)
sql_aggregator: Optional[SqlAggregatorReport] = None
num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict)
num_total_queries: int = 0
num_unique_queries: int = 0
class BigQueryQueriesExtractor:
"""
Extracts query audit log and generates usage/lineage/operation workunits.
@ -128,6 +121,7 @@ class BigQueryQueriesExtractor:
1. For every lineage/operation workunit, corresponding query id is also present
2. Operation aspect for a particular query is emitted at most once (last occurrence) per day
3. "DROP" operation accounts for usage here
4. userEmail is not populated in datasetUsageStatistics aspect, only user urn
"""
@ -141,7 +135,7 @@ class BigQueryQueriesExtractor:
identifiers: BigQueryIdentifierBuilder,
graph: Optional[DataHubGraph] = None,
schema_resolver: Optional[SchemaResolver] = None,
discovered_tables: Optional[List[str]] = None,
discovered_tables: Optional[Collection[str]] = None,
):
self.connection = connection
@ -150,8 +144,7 @@ class BigQueryQueriesExtractor:
self.identifiers = identifiers
self.schema_api = schema_api
self.report = BigQueryQueriesExtractorReport()
# self.filters = filters
self.discovered_tables = discovered_tables
self.discovered_tables = set(discovered_tables) if discovered_tables else None
self.structured_report = structured_report
@ -171,6 +164,7 @@ class BigQueryQueriesExtractor:
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
user_email_pattern=self.config.user_email_pattern,
top_n_queries=self.config.top_n_queries,
),
generate_operations=self.config.include_operations,
is_temp_table=self.is_temp_table,
@ -192,19 +186,35 @@ class BigQueryQueriesExtractor:
def is_temp_table(self, name: str) -> bool:
try:
return BigqueryTableIdentifier.from_string_name(name).dataset.startswith(
self.config.temp_table_dataset_prefix
)
table = BigqueryTableIdentifier.from_string_name(name)
if table.dataset.startswith(self.config.temp_table_dataset_prefix):
return True
# This is also a temp table if
# 1. this name would be allowed by the dataset patterns, and
# 2. we have a list of discovered tables, and
# 3. it's not in the discovered tables list
if (
self.filters.is_allowed(table)
and self.discovered_tables
and str(BigQueryTableRef(table)) not in self.discovered_tables
):
return True
except Exception:
logger.warning(f"Error parsing table name {name} ")
return False
return False
def is_allowed_table(self, name: str) -> bool:
try:
table_id = BigqueryTableIdentifier.from_string_name(name)
if self.discovered_tables and str(table_id) not in self.discovered_tables:
table = BigqueryTableIdentifier.from_string_name(name)
if (
self.discovered_tables
and str(BigQueryTableRef(table)) not in self.discovered_tables
):
return False
return self.filters.is_allowed(table_id)
return self.filters.is_allowed(table)
except Exception:
logger.warning(f"Error parsing table name {name} ")
return False
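
A reduced, self-contained sketch of the temp-table heuristic introduced in is_temp_table above. The function name, filter callable, and table names are assumptions for illustration, not part of the source.

from typing import Callable, Optional, Set

def looks_like_temp_table(
    dataset: str,
    table_ref: str,
    temp_dataset_prefix: str,
    is_allowed: Callable[[str], bool],
    discovered_tables: Optional[Set[str]],
) -> bool:
    # Explicit temp-dataset naming convention, e.g. datasets prefixed with "_".
    if dataset.startswith(temp_dataset_prefix):
        return True
    # Heuristic added in this commit: the name matches the ingestion patterns but was
    # never seen during schema extraction, so it was most likely a transient table.
    if is_allowed(table_ref) and discovered_tables and table_ref not in discovered_tables:
        return True
    return False

discovered = {"project-id-1.dataset_a.table_1"}
assert looks_like_temp_table("_tmp", "project-id-1._tmp.x", "_", lambda _: True, discovered)
assert looks_like_temp_table("dataset_a", "project-id-1.dataset_a.missing", "_", lambda _: True, discovered)
assert not looks_like_temp_table("dataset_a", "project-id-1.dataset_a.table_1", "_", lambda _: True, discovered)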

View File

@ -166,7 +166,7 @@ class SnowflakeV2Source(
# If we're ingesting schema metadata for tables/views, then we will populate
# schemas into the resolver as we go. We only need to do a bulk fetch
# if we're not ingesting schema metadata as part of ingestion.
(
not (
self.config.include_technical_schema
and self.config.include_tables
and self.config.include_views
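
A reduced sketch of the corrected Snowflake condition, under the assumption (per the comment above) that the expression gates a bulk schema fetch into the resolver. The helper name and flags are illustrative only.

def needs_bulk_schema_fetch(
    include_technical_schema: bool, include_tables: bool, include_views: bool
) -> bool:
    # Schemas land in the resolver during ingestion only when all three are enabled;
    # otherwise they must be pre-fetched in bulk (e.g. from the DataHub graph).
    return not (include_technical_schema and include_tables and include_views)

assert not needs_bulk_schema_fetch(True, True, True)  # schemas populated as we go
assert needs_bulk_schema_fetch(False, True, True)     # bulk fetch required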

View File

@ -37,6 +37,7 @@ from datahub.sql_parsing.sql_parsing_common import QueryType, QueryTypeProps
from datahub.sql_parsing.sqlglot_lineage import (
ColumnLineageInfo,
ColumnRef,
DownstreamColumnRef,
SqlParsingResult,
infer_output_schema,
sqlglot_lineage,
@ -574,9 +575,6 @@ class SqlParsingAggregator(Closeable):
Because this method takes in urns, it does not require that the urns
are part of the platform that the aggregator is configured for.
TODO: In the future, this method will also generate CLL if we have
schemas for either the upstream or downstream.
The known lineage mapping does not contribute to usage statistics or operations.
Args:
@ -589,6 +587,21 @@ class SqlParsingAggregator(Closeable):
# We generate a fake "query" object to hold the lineage.
query_id = self._known_lineage_query_id()
# Generate CLL if schema of downstream is known
column_lineage: List[ColumnLineageInfo] = []
if self._schema_resolver.has_urn(downstream_urn):
schema = self._schema_resolver._resolve_schema_info(downstream_urn)
if schema:
column_lineage = [
ColumnLineageInfo(
downstream=DownstreamColumnRef(
table=downstream_urn, column=field_path
),
upstreams=[ColumnRef(table=upstream_urn, column=field_path)],
)
for field_path in schema
]
# Register the query.
self._add_to_query_map(
QueryMetadata(
@ -600,7 +613,7 @@ class SqlParsingAggregator(Closeable):
latest_timestamp=None,
actor=None,
upstreams=[upstream_urn],
column_lineage=[],
column_lineage=column_lineage,
column_usage={},
confidence_score=1.0,
)
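
A reduced sketch of the new column-lineage construction for known lineage mappings: when the downstream schema is resolvable, each field gets a 1:1 upstream mapping. The urns and field list below stand in for what the schema resolver would return.

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.sql_parsing.sqlglot_lineage import (
    ColumnLineageInfo,
    ColumnRef,
    DownstreamColumnRef,
)

upstream_urn = make_dataset_urn("bigquery", "project-id-1.bigquery-dataset-1.table-1")
downstream_urn = make_dataset_urn(
    "bigquery", "project-id-1.bigquery-dataset-1.snapshot-table-1"
)
known_fields = ["age", "email"]  # stands in for the resolved downstream schema

# One identity mapping per downstream field, as in add_known_lineage_mapping above.
column_lineage = [
    ColumnLineageInfo(
        downstream=DownstreamColumnRef(table=downstream_urn, column=field_path),
        upstreams=[ColumnRef(table=upstream_urn, column=field_path)],
    )
    for field_path in known_fields
]
assert len(column_lineage) == 2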

View File

@ -401,6 +401,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Age",
@ -417,6 +433,97 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "project-id-1.bigquery-dataset-1.snapshot-table-1",
"platform": "urn:li:dataPlatform:bigquery",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "age",
"nullable": false,
"description": "comment",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}
}
},
"nativeDataType": "INT",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Test Policy Tag"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "email",
"nullable": false,
"description": "comment",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "STRING",
"recursive": false,
"globalTags": {
"tags": []
},
"isPartOfKey": false
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m5!1m4!4m3!1sproject-id-1!2sbigquery-dataset-1!3ssnapshot-table-1",
"name": "snapshot-table-1",
"qualifiedName": "project-id-1.bigquery-dataset-1.snapshot-table-1",
"description": "",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Email_Address",
@ -433,6 +540,57 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Bigquery Table Snapshot"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Test Policy Tag",
@ -448,5 +606,83 @@
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:068bd9323110994a40019fcf6cfc60d3",
"urn": "urn:li:container:068bd9323110994a40019fcf6cfc60d3"
},
{
"id": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0",
"urn": "urn:li:container:8df46c5e3ded05a3122b0015822c0ef0"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD),age)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD),age)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD),email)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.snapshot-table-1,PROD),email)"
],
"confidenceScore": 1.0
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -11,6 +11,7 @@ from datahub.ingestion.glossary.classifier import (
DynamicTypedClassifierConfig,
)
from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_data_reader import BigQueryDataReader
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryColumn,
@ -18,6 +19,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryProject,
BigQuerySchemaApi,
BigqueryTable,
BigqueryTableSnapshot,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import (
BigQuerySchemaGenerator,
@ -47,7 +49,7 @@ def recipe(mcp_output_path: str, override: dict = {}) -> dict:
"config": {
"project_ids": ["project-id-1"],
"include_usage_statistics": False,
"include_table_lineage": False,
"include_table_lineage": True,
"include_data_platform_instance": True,
"classification": ClassificationConfig(
enabled=True,
@ -68,6 +70,7 @@ def recipe(mcp_output_path: str, override: dict = {}) -> dict:
@freeze_time(FROZEN_TIME)
@patch.object(BigQuerySchemaApi, "get_snapshots_for_dataset")
@patch.object(BigQuerySchemaApi, "get_tables_for_dataset")
@patch.object(BigQuerySchemaGenerator, "get_core_table_details")
@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id")
@ -85,6 +88,7 @@ def test_bigquery_v2_ingest(
get_datasets_for_project_id,
get_core_table_details,
get_tables_for_dataset,
get_snapshots_for_dataset,
pytestconfig,
tmp_path,
):
@ -100,31 +104,35 @@ def test_bigquery_v2_ingest(
{"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}}
)
table_name = "table-1"
snapshot_table_name = "snapshot-table-1"
get_core_table_details.return_value = {table_name: table_list_item}
columns = [
BigqueryColumn(
name="age",
ordinal_position=1,
is_nullable=False,
field_path="col_1",
data_type="INT",
comment="comment",
is_partition_column=False,
cluster_column_position=None,
policy_tags=["Test Policy Tag"],
),
BigqueryColumn(
name="email",
ordinal_position=1,
is_nullable=False,
field_path="col_2",
data_type="STRING",
comment="comment",
is_partition_column=False,
cluster_column_position=None,
),
]
get_columns_for_dataset.return_value = {
table_name: [
BigqueryColumn(
name="age",
ordinal_position=1,
is_nullable=False,
field_path="col_1",
data_type="INT",
comment="comment",
is_partition_column=False,
cluster_column_position=None,
policy_tags=["Test Policy Tag"],
),
BigqueryColumn(
name="email",
ordinal_position=1,
is_nullable=False,
field_path="col_2",
data_type="STRING",
comment="comment",
is_partition_column=False,
cluster_column_position=None,
),
]
table_name: columns,
snapshot_table_name: columns,
}
get_sample_data_for_table.return_value = {
"age": [random.randint(1, 80) for i in range(20)],
@ -140,6 +148,20 @@ def test_bigquery_v2_ingest(
rows_count=None,
)
get_tables_for_dataset.return_value = iter([bigquery_table])
snapshot_table = BigqueryTableSnapshot(
name=snapshot_table_name,
comment=None,
created=None,
last_altered=None,
size_in_bytes=None,
rows_count=None,
base_table_identifier=BigqueryTableIdentifier(
project_id="project-id-1",
dataset="bigquery-dataset-1",
table="table-1",
),
)
get_snapshots_for_dataset.return_value = iter([snapshot_table])
pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path)

View File

@ -6,7 +6,6 @@ from unittest.mock import patch
import pytest
from freezegun import freeze_time
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery
from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList
from tests.test_helpers import mce_helpers
@ -58,15 +57,12 @@ def test_queries_ingestion(project_client, client, pytestconfig, monkeypatch, tm
"config": {
"project_ids": ["gcp-staging", "gcp-staging-2"],
"local_temp_path": tmp_path,
"top_n_queries": 20,
},
},
"sink": {"type": "file", "config": {"filename": mcp_output_path}},
}
# This is hacky to pick all queries instead of any 10.
# Should be easy to remove once top_n_queries is supported in queries config
monkeypatch.setattr(BaseUsageConfig.__fields__["top_n_queries"], "default", 20)
pipeline = run_and_get_pipeline(pipeline_config_dict)
pipeline.pretty_print_summary()

View File

@ -83,7 +83,10 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None:
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
config, report, BigQueryIdentifierBuilder(config, report)
config,
report,
schema_resolver=SchemaResolver(platform="bigquery"),
identifiers=BigQueryIdentifierBuilder(config, report),
)
bq_table = BigQueryTableRef.from_string_name(
@ -91,8 +94,7 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None:
)
lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map(
iter(lineage_entries),
sql_parser_schema_resolver=SchemaResolver(platform="bigquery"),
iter(lineage_entries)
)
upstream_lineage = extractor.get_lineage_for_table(
@ -108,7 +110,10 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None:
config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False)
report = BigQueryV2Report()
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
config, report, BigQueryIdentifierBuilder(config, report)
config,
report,
schema_resolver=SchemaResolver(platform="bigquery"),
identifiers=BigQueryIdentifierBuilder(config, report),
)
bq_table = BigQueryTableRef.from_string_name(
@ -117,7 +122,6 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None:
lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map(
lineage_entries[:1],
sql_parser_schema_resolver=SchemaResolver(platform="bigquery"),
)
upstream_lineage = extractor.get_lineage_for_table(