diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index a9ae33e960..fd3923222b 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -145,15 +145,21 @@ plugins: Dict[str, Set[str]] = { "airflow": { "apache-airflow >= 1.10.2", }, - "great-expectations": sql_common | {"sqllineage==1.3.3"}, + "great-expectations": sql_common | {"sqllineage==1.3.4"}, # Source plugins # PyAthena is pinned with exact version because we use private method in PyAthena "athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"}, "azure-ad": set(), - "bigquery": sql_common | bigquery_common | {"sqlalchemy-bigquery>=1.4.1", "sqlparse"}, + "bigquery": sql_common + | bigquery_common + | {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.4", "sqlparse"}, "bigquery-usage": bigquery_common | usage_common | {"cachetools"}, "clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"}, - "clickhouse-usage": sql_common | usage_common | {"clickhouse-sqlalchemy==0.1.8", }, + "clickhouse-usage": sql_common + | usage_common + | { + "clickhouse-sqlalchemy==0.1.8", + }, "datahub-lineage-file": set(), "datahub-business-glossary": set(), "data-lake": {*data_lake_base, *data_lake_profiling}, @@ -181,9 +187,9 @@ plugins: Dict[str, Set[str]] = { "looker": looker_common, # lkml>=1.1.2 is required to support the sql_preamble expression in LookML "lookml": looker_common - | {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.3"}, - "metabase": {"requests", "sqllineage==1.3.3"}, - "mode": {"requests", "sqllineage==1.3.3", "tenacity>=8.0.1"}, + | {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.4"}, + "metabase": {"requests", "sqllineage==1.3.4"}, + "mode": {"requests", "sqllineage==1.3.4", "tenacity>=8.0.1"}, "mongodb": {"pymongo>=3.11", "packaging"}, "mssql": sql_common | {"sqlalchemy-pytds>=0.3"}, "mssql-odbc": sql_common | {"pyodbc"}, @@ -193,20 +199,24 @@ plugins: Dict[str, Set[str]] = { "okta": {"okta~=1.7.0"}, "oracle": sql_common | {"cx_Oracle"}, "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"}, - "redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.3"}, + "redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.4"}, "redshift": sql_common - | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.3"}, + | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.4"}, "redshift-usage": sql_common | usage_common | { "sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", - "sqllineage==1.3.3", + "sqllineage==1.3.4", }, "sagemaker": aws_common, "snowflake": snowflake_common, - "snowflake-usage": snowflake_common | usage_common | {"more-itertools>=8.12.0", }, + "snowflake-usage": snowflake_common + | usage_common + | { + "more-itertools>=8.12.0", + }, "sqlalchemy": sql_common, "superset": { "requests", diff --git a/metadata-ingestion/source_docs/bigquery.md b/metadata-ingestion/source_docs/bigquery.md index 0061efe642..14c20f07dd 100644 --- a/metadata-ingestion/source_docs/bigquery.md +++ b/metadata-ingestion/source_docs/bigquery.md @@ -157,6 +157,7 @@ As a SQL-based service, the Athena integration is also supported by our SQL prof | `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching.There can be multiple domain key specified. | | `lineage_client_project_id` | | None | The project to use when creating the BigQuery Client. If left empty, the required `project_id` will be used. This is helpful in case the default project_id is not used for querying. 
| | `use_v2_audit_metadata` | | `False` | Whether to use `BigQuery audit logs` to get the lineage or not | +| `upstream_lineage_in_report` | | `False` | Useful for debugging lineage information. Set to `True` to see the raw lineage created internally. | The following parameters are only relevant if include_table_lineage is set to true: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 7c35cef41f..7db661720f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -53,6 +53,7 @@ from datahub.metadata.schema_classes import ( UpstreamClass, UpstreamLineageClass, ) +from datahub.utilities.sql_parser import DefaultSQLParser logger = logging.getLogger(__name__) @@ -69,7 +70,11 @@ AND AND NOT protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.code:* AND - protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables:* + ( + protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables:* + OR + protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedViews:* + ) ) ) AND @@ -91,7 +96,11 @@ AND AND protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE" AND NOT protoPayload.metadata.jobChange.job.jobStatus.errorResult:* - AND protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:* + AND ( + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:* + OR + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedViews:* + ) ) AND timestamp >= "{start_time}" @@ -493,25 +502,57 @@ class BigQuerySource(SQLAlchemySource): def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[str]]: lineage_map: Dict[str, Set[str]] = collections.defaultdict(set) - num_entries: int = 0 - num_skipped_entries: int = 0 + self.report.num_total_lineage_entries = 0 + self.report.num_skipped_lineage_entries_missing_data = 0 + self.report.num_skipped_lineage_entries_not_allowed = 0 + self.report.num_skipped_lineage_entries_other = 0 for e in entries: - num_entries += 1 - if e.destinationTable is None or not e.referencedTables: - num_skipped_entries += 1 + self.report.num_total_lineage_entries += 1 + if e.destinationTable is None or not ( + e.referencedTables or e.referencedViews + ): + self.report.num_skipped_lineage_entries_missing_data += 1 continue - entry_consumed: bool = False + # Skip if schema/table pattern don't allow the destination table + destination_table_str = str(e.destinationTable.remove_extras()) + destination_table_str_parts = destination_table_str.split("/") + if not self.config.schema_pattern.allowed( + destination_table_str_parts[3] + ) or not self.config.table_pattern.allowed(destination_table_str_parts[-1]): + self.report.num_skipped_lineage_entries_not_allowed += 1 + continue + has_table = False for ref_table in e.referencedTables: - destination_table_str = str(e.destinationTable.remove_extras()) ref_table_str = str(ref_table.remove_extras()) if ref_table_str != destination_table_str: lineage_map[destination_table_str].add(ref_table_str) - entry_consumed = True - if not entry_consumed: - num_skipped_entries += 1 - logger.info( - f"Creating lineage map: total number of entries={num_entries}, number skipped={num_skipped_entries}." 
- ) + has_table = True + has_view = False + for ref_view in e.referencedViews: + ref_view_str = str(ref_view.remove_extras()) + if ref_view_str != destination_table_str: + lineage_map[destination_table_str].add(ref_view_str) + has_view = True + if has_table and has_view: + # If there is a view being referenced then bigquery sends both the view as well as underlying table + # in the references. There is no distinction between direct/base objects accessed. So doing sql parsing + # to ensure we only use direct objects accessed for lineage + parser = DefaultSQLParser(e.query) + referenced_objs = set( + map(lambda x: x.split(".")[-1], parser.get_tables()) + ) + curr_lineage_str = lineage_map[destination_table_str] + new_lineage_str = set() + for lineage_str in curr_lineage_str: + name = lineage_str.split("/")[-1] + if name in referenced_objs: + new_lineage_str.add(lineage_str) + lineage_map[destination_table_str] = new_lineage_str + if not (has_table or has_view): + self.report.num_skipped_lineage_entries_other += 1 + + if self.config.upstream_lineage_in_report: + self.report.upstream_lineage = lineage_map return lineage_map def get_latest_partition( @@ -688,7 +729,7 @@ WHERE for ref_table in self.lineage_metadata[str(bq_table)]: upstream_table = BigQueryTableRef.from_string_name(ref_table) if upstream_table.is_temporary_table(): - # making sure we don't process a table twice and not get into a recurisve loop + # making sure we don't process a table twice and not get into a recursive loop if ref_table in tables_seen: logger.debug( f"Skipping table {ref_table} because it was seen already" diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index 0c47c96325..dcb94d7304 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -4,7 +4,7 @@ import json import logging import os import re -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Union, cast @@ -300,14 +300,13 @@ class QueryEvent: timestamp: datetime actor_email: str - query: str - statementType: Optional[str] - destinationTable: Optional[BigQueryTableRef] - referencedTables: Optional[List[BigQueryTableRef]] - jobName: Optional[str] - - payload: Any + statementType: Optional[str] = None + destinationTable: Optional[BigQueryTableRef] = None + referencedTables: List[BigQueryTableRef] = field(default_factory=list) + referencedViews: List[BigQueryTableRef] = field(default_factory=list) + jobName: Optional[str] = None + payload: Optional[Dict] = None @staticmethod def get_missing_key_entry(entry: AuditLogEntry) -> Optional[str]: @@ -323,48 +322,49 @@ class QueryEvent: @classmethod def from_entry(cls, entry: AuditLogEntry) -> "QueryEvent": - user = entry.payload["authenticationInfo"]["principalEmail"] - - job = entry.payload["serviceData"]["jobCompletedEvent"]["job"] - jobName = _job_name_ref( + job: Dict = entry.payload["serviceData"]["jobCompletedEvent"]["job"] + job_query_conf: Dict = job["jobConfiguration"]["query"] + # basic query_event + query_event = QueryEvent( + timestamp=entry.timestamp, + actor_email=entry.payload["authenticationInfo"]["principalEmail"], + query=job_query_conf["query"], + ) + # jobName + query_event.jobName = _job_name_ref( job.get("jobName", {}).get("projectId"), job.get("jobName", 
{}).get("jobId") ) - rawQuery = job["jobConfiguration"]["query"]["query"] - - rawDestTable = job["jobConfiguration"]["query"]["destinationTable"] - destinationTable = None - if rawDestTable: - destinationTable = BigQueryTableRef.from_spec_obj(rawDestTable) - - try: - statementType = job["jobConfiguration"]["query"]["statementType"] - except KeyError: - statementType = None - - rawRefTables = job["jobStatistics"].get("referencedTables") - referencedTables = None - if rawRefTables: - referencedTables = [ - BigQueryTableRef.from_spec_obj(spec) for spec in rawRefTables + # destinationTable + raw_dest_table = job_query_conf.get("destinationTable") + if raw_dest_table: + query_event.destinationTable = BigQueryTableRef.from_spec_obj( + raw_dest_table + ) + # statementType + query_event.statementType = job_query_conf.get("statementType") + # referencedTables + job_stats: Dict = job["jobStatistics"] + raw_ref_tables = job_stats.get("referencedTables") + if raw_ref_tables: + query_event.referencedTables = [ + BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_tables ] + # referencedViews + raw_ref_views = job_stats.get("referencedViews") + if raw_ref_views: + query_event.referencedViews = [ + BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_views + ] + # payload + query_event.payload = entry.payload if DEBUG_INCLUDE_FULL_PAYLOADS else None - queryEvent = QueryEvent( - timestamp=entry.timestamp, - actor_email=user, - query=rawQuery, - statementType=statementType, - destinationTable=destinationTable, - referencedTables=referencedTables, - jobName=jobName, - payload=entry.payload if DEBUG_INCLUDE_FULL_PAYLOADS else None, - ) - if not jobName: + if not query_event.jobName: logger.debug( "jobName from query events is absent. " "Auditlog entry - {logEntry}".format(logEntry=entry) ) - return queryEvent + return query_event @staticmethod def get_missing_key_exported_bigquery_audit_metadata( @@ -376,45 +376,44 @@ class QueryEvent: def from_exported_bigquery_audit_metadata( cls, row: BigQueryAuditMetadata ) -> "QueryEvent": - timestamp = row["timestamp"] - payload = row["protoPayload"] - metadata = json.loads(row["metadata"]) - user = payload["authenticationInfo"]["principalEmail"] - - job = metadata["jobChange"]["job"] - - job_name = job.get("jobName") - raw_query = job["jobConfig"]["queryConfig"]["query"] - - raw_dest_table = job["jobConfig"]["queryConfig"].get("destinationTable") - destination_table = None + payload: Dict = row["protoPayload"] + metadata: Dict = json.loads(row["metadata"]) + job: Dict = metadata["jobChange"]["job"] + query_config: Dict = job["jobConfig"]["queryConfig"] + # basic query_event + query_event = QueryEvent( + timestamp=row["timestamp"], + actor_email=payload["authenticationInfo"]["principalEmail"], + query=query_config["query"], + ) + # jobName + query_event.jobName = job.get("jobName") + # destinationTable + raw_dest_table = query_config.get("destinationTable") if raw_dest_table: - destination_table = BigQueryTableRef.from_string_name(raw_dest_table) - - raw_ref_tables = job["jobStats"]["queryStats"].get("referencedTables") - referenced_tables = None + query_event.destinationTable = BigQueryTableRef.from_string_name( + raw_dest_table + ) + # referencedTables + query_stats: Dict = job["jobStats"]["queryStats"] + raw_ref_tables = query_stats.get("referencedTables") if raw_ref_tables: - referenced_tables = [ + query_event.referencedTables = [ BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables ] + # referencedViews + raw_ref_views = 
query_stats.get("referencedViews") + if raw_ref_views: + query_event.referencedViews = [ + BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views + ] + # statementType + query_event.statementType = query_config.get("statementType") + # payload + query_event.payload = payload if DEBUG_INCLUDE_FULL_PAYLOADS else None - try: - statementType = job["jobConfiguration"]["query"]["statementType"] - except KeyError: - statementType = None - - query_event = QueryEvent( - timestamp=timestamp, - actor_email=user, - query=raw_query, - statementType=statementType, - destinationTable=destination_table, - referencedTables=referenced_tables, - jobName=job_name, - payload=payload if DEBUG_INCLUDE_FULL_PAYLOADS else None, - ) - if not job_name: + if not query_event.jobName: logger.debug( "jobName from query events is absent. " "BigQueryAuditMetadata entry - {logEntry}".format(logEntry=row) @@ -424,45 +423,42 @@ class QueryEvent: @classmethod def from_entry_v2(cls, row: BigQueryAuditMetadata) -> "QueryEvent": - timestamp = row.timestamp - payload = row.payload - metadata = payload["metadata"] - - user = payload["authenticationInfo"]["principalEmail"] - - job = metadata["jobChange"]["job"] - - job_name = job.get("jobName") - raw_query = job["jobConfig"]["queryConfig"]["query"] - - raw_dest_table = job["jobConfig"]["queryConfig"].get("destinationTable") - destination_table = None + payload: Dict = row.payload + metadata: Dict = payload["metadata"] + job: Dict = metadata["jobChange"]["job"] + query_config: Dict = job["jobConfig"]["queryConfig"] + # basic query_event + query_event = QueryEvent( + timestamp=row.timestamp, + actor_email=payload["authenticationInfo"]["principalEmail"], + query=query_config["query"], + ) + query_event.jobName = job.get("jobName") + # destinationTable + raw_dest_table = query_config.get("destinationTable") if raw_dest_table: - destination_table = BigQueryTableRef.from_string_name(raw_dest_table) - - raw_ref_tables = job["jobStats"]["queryStats"].get("referencedTables") - referenced_tables = None + query_event.destinationTable = BigQueryTableRef.from_string_name( + raw_dest_table + ) + # statementType + query_event.statementType = query_config.get("statementType") + # referencedTables + query_stats: Dict = job["jobStats"]["queryStats"] + raw_ref_tables = query_stats.get("referencedTables") if raw_ref_tables: - referenced_tables = [ + query_event.referencedTables = [ BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables ] + # referencedViews + raw_ref_views = query_stats.get("referencedViews") + if raw_ref_views: + query_event.referencedViews = [ + BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views + ] + # payload + query_event.payload = payload if DEBUG_INCLUDE_FULL_PAYLOADS else None - try: - statementType = job["jobConfig"]["queryConfig"]["statementType"] - except KeyError: - statementType = None - - query_event = QueryEvent( - timestamp=timestamp, - actor_email=user, - query=raw_query, - statementType=statementType, - destinationTable=destination_table, - referencedTables=referenced_tables, - jobName=job_name, - payload=payload if DEBUG_INCLUDE_FULL_PAYLOADS else None, - ) - if not job_name: + if not query_event.jobName: logger.debug( "jobName from query events is absent. 
" "BigQueryAuditMetadata entry - {logEntry}".format(logEntry=row) @@ -511,17 +507,22 @@ class BigQueryUsageSource(Source): Iterable[MetadataWorkUnit], last_updated_work_units_uncasted ) if self.config.include_operational_stats: + self.report.num_operational_stats_workunits_emitted = 0 for wu in last_updated_work_units: self.report.report_workunit(wu) yield wu + self.report.num_operational_stats_workunits_emitted += 1 + hydrated_read_events = self._join_events_by_job_id(parsed_events) aggregated_info = self._aggregate_enriched_read_events(hydrated_read_events) + self.report.num_usage_workunits_emitted = 0 for time_bucket in aggregated_info.values(): for aggregate in time_bucket.values(): wu = self._make_usage_stat(aggregate) self.report.report_workunit(wu) yield wu + self.report.num_usage_workunits_emitted += 1 def _make_bigquery_clients(self) -> List[GCPLoggingClient]: # See https://github.com/googleapis/google-cloud-python/issues/2674 for diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py index 24dee92540..ebef6ef5be 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py @@ -17,20 +17,19 @@ class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig): scheme: str = "bigquery" project_id: Optional[str] = None lineage_client_project_id: Optional[str] = None - log_page_size: Optional[pydantic.PositiveInt] = 1000 credential: Optional[BigQueryCredential] # extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage. extra_client_options: Dict[str, Any] = {} include_table_lineage: Optional[bool] = True max_query_duration: timedelta = timedelta(minutes=15) - credentials_path: Optional[str] = None bigquery_audit_metadata_datasets: Optional[List[str]] = None use_exported_bigquery_audit_metadata: bool = False use_date_sharded_audit_log_tables: bool = False _credentials_path: Optional[str] = pydantic.PrivateAttr(None) use_v2_audit_metadata: Optional[bool] = False + upstream_lineage_in_report: bool = False def __init__(self, **data: Any): super().__init__(**data) diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_report/sql/bigquery.py index c0d662f2b6..df943f8bc4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/sql/bigquery.py @@ -1,12 +1,16 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime -from typing import Optional +from typing import Dict, Optional from datahub.ingestion.source.sql.sql_common import SQLSourceReport @dataclass class BigQueryReport(SQLSourceReport): + num_total_lineage_entries: Optional[int] = None + num_skipped_lineage_entries_missing_data: Optional[int] = None + num_skipped_lineage_entries_not_allowed: Optional[int] = None + num_skipped_lineage_entries_other: Optional[int] = None num_total_log_entries: Optional[int] = None num_parsed_log_entires: Optional[int] = None num_total_audit_entries: Optional[int] = None @@ -20,3 +24,4 @@ class BigQueryReport(SQLSourceReport): log_entry_end_time: Optional[str] = None audit_start_time: Optional[str] = None audit_end_time: Optional[str] = None + upstream_lineage: Dict = field(default_factory=dict) diff --git 
a/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py index c8b785e20f..2a2dfcc771 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py @@ -21,6 +21,8 @@ class BigQueryUsageSourceReport(SourceReport): deny_pattern: Optional[str] = None log_entry_start_time: Optional[str] = None log_entry_end_time: Optional[str] = None + num_usage_workunits_emitted: Optional[int] = None + num_operational_stats_workunits_emitted: Optional[int] = None def report_dropped(self, key: str) -> None: self.dropped_table[key] += 1
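Notes on the changes above. The new `upstream_lineage_in_report` flag added to `BigQueryConfig` (and documented in `bigquery.md`) is purely a debugging aid: when it is true, the computed lineage map is attached to the ingestion report. A minimal sketch of enabling it programmatically, assuming the DataHub package with the `bigquery` plugin is installed; the project id is a placeholder:

```python
# Hedged sketch: assumes acryl-datahub with the bigquery plugin is installed.
from datahub.ingestion.source_config.sql.bigquery import BigQueryConfig

config = BigQueryConfig(
    project_id="my-project",              # placeholder
    include_table_lineage=True,           # already the default; lineage must be computed
    upstream_lineage_in_report=True,      # surface the raw lineage map in the report
)
print(config.upstream_lineage_in_report)  # True
```

In a YAML recipe this corresponds to setting `upstream_lineage_in_report: true` under the source's `config` block.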
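The core behavioural change in `_create_lineage_map` is how queries that touch views are handled: BigQuery's audit logs report both the view and its underlying tables, with no distinction between directly and indirectly accessed objects, so the source now parses the query text (via `DefaultSQLParser(e.query).get_tables()`) and keeps only objects referenced directly in the SQL. A self-contained sketch of that filtering step, using hypothetical table-ref strings of the form `projects/<project>/datasets/<dataset>/tables/<table>` and a pre-parsed set of table names:

```python
from typing import Set


def filter_direct_upstreams(upstreams: Set[str], parsed_tables: Set[str]) -> Set[str]:
    """Keep only upstream refs whose table name appears directly in the parsed query.

    Mirrors the filtering applied in _create_lineage_map when a query references
    both views and their underlying tables.
    """
    # Parsed names may be qualified (dataset.table); compare on the bare object name.
    referenced_names = {name.split(".")[-1] for name in parsed_tables}
    return {ref for ref in upstreams if ref.split("/")[-1] in referenced_names}


# Hypothetical example: the query selects from a view, but the audit log also
# reports the view's underlying table as a referenced object.
upstreams = {
    "projects/my-project/datasets/sales/tables/orders_view",
    "projects/my-project/datasets/sales/tables/orders_raw",
}
parsed = {"sales.orders_view"}  # e.g. from DefaultSQLParser(query).get_tables()
assert filter_direct_upstreams(upstreams, parsed) == {
    "projects/my-project/datasets/sales/tables/orders_view"
}
```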
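The `QueryEvent` refactor swaps the single all-arguments constructor for three required fields plus defaulted optional ones, so each `from_*` classmethod can build a basic event first and then fill in whatever the log entry actually carries. A stripped-down sketch of the same pattern, with `BigQueryTableRef` replaced by plain strings for brevity:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional


@dataclass
class QueryEventSketch:
    # Required core fields, always present in the audit-log entry.
    timestamp: datetime
    actor_email: str
    query: str
    # Optional fields, populated only when the entry carries them.
    statementType: Optional[str] = None
    destinationTable: Optional[str] = None
    referencedTables: List[str] = field(default_factory=list)
    referencedViews: List[str] = field(default_factory=list)
    jobName: Optional[str] = None
    payload: Optional[Dict] = None


event = QueryEventSketch(
    timestamp=datetime.now(timezone.utc),
    actor_email="analyst@example.com",   # placeholder
    query="SELECT * FROM sales.orders_view",
)
# referencedTables / referencedViews default to empty lists, so downstream code
# can iterate over them without None checks.
event.referencedViews.append("projects/my-project/datasets/sales/tables/orders_view")
```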
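On the v1 audit-log path, `from_entry` now also reads `referencedViews` out of `jobStatistics`, leaving the list empty when the key is absent. A hypothetical, heavily trimmed `jobCompletedEvent` job payload illustrating the fields the extraction relies on (the real code converts each spec with `BigQueryTableRef.from_spec_obj`):

```python
# Hypothetical, trimmed payload; only the keys read by from_entry are shown.
job = {
    "jobConfiguration": {"query": {"query": "SELECT * FROM sales.orders_view"}},
    "jobStatistics": {
        "referencedTables": [
            {"projectId": "my-project", "datasetId": "sales", "tableId": "orders_raw"}
        ],
        "referencedViews": [
            {"projectId": "my-project", "datasetId": "sales", "tableId": "orders_view"}
        ],
    },
}

job_stats = job["jobStatistics"]
# Missing keys simply leave the corresponding QueryEvent lists empty.
ref_tables = job_stats.get("referencedTables", [])
ref_views = job_stats.get("referencedViews", [])
print(len(ref_tables), len(ref_views))  # 1 1
```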