fix(bigquery): incorrect lineage when views are present (#4568)

* fix(bigquery): incorrect lineage when views are present

Co-authored-by: Ravindra Lanka <rlanka@acryl.io>
Aseem Bansal 2022-04-07 05:59:02 +05:30 committed by GitHub
parent 3a5cf8eded
commit 5ebb37ab4c
7 changed files with 197 additions and 138 deletions


@@ -145,15 +145,21 @@ plugins: Dict[str, Set[str]] = {
"airflow": {
"apache-airflow >= 1.10.2",
},
"great-expectations": sql_common | {"sqllineage==1.3.3"},
"great-expectations": sql_common | {"sqllineage==1.3.4"},
# Source plugins
# PyAthena is pinned to an exact version because we use a private method of PyAthena
"athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
"azure-ad": set(),
"bigquery": sql_common | bigquery_common | {"sqlalchemy-bigquery>=1.4.1", "sqlparse"},
"bigquery": sql_common
| bigquery_common
| {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.4", "sqlparse"},
"bigquery-usage": bigquery_common | usage_common | {"cachetools"},
"clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"clickhouse-usage": sql_common | usage_common | {"clickhouse-sqlalchemy==0.1.8", },
"clickhouse-usage": sql_common
| usage_common
| {
"clickhouse-sqlalchemy==0.1.8",
},
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"data-lake": {*data_lake_base, *data_lake_profiling},
@@ -181,9 +187,9 @@ plugins: Dict[str, Set[str]] = {
"looker": looker_common,
# lkml>=1.1.2 is required to support the sql_preamble expression in LookML
"lookml": looker_common
| {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.3"},
"metabase": {"requests", "sqllineage==1.3.3"},
"mode": {"requests", "sqllineage==1.3.3", "tenacity>=8.0.1"},
| {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.4"},
"metabase": {"requests", "sqllineage==1.3.4"},
"mode": {"requests", "sqllineage==1.3.4", "tenacity>=8.0.1"},
"mongodb": {"pymongo>=3.11", "packaging"},
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
"mssql-odbc": sql_common | {"pyodbc"},
@@ -193,20 +199,24 @@ plugins: Dict[str, Set[str]] = {
"okta": {"okta~=1.7.0"},
"oracle": sql_common | {"cx_Oracle"},
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.3"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.4"},
"redshift": sql_common
| {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.3"},
| {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.4"},
"redshift-usage": sql_common
| usage_common
| {
"sqlalchemy-redshift",
"psycopg2-binary",
"GeoAlchemy2",
"sqllineage==1.3.3",
"sqllineage==1.3.4",
},
"sagemaker": aws_common,
"snowflake": snowflake_common,
"snowflake-usage": snowflake_common | usage_common | {"more-itertools>=8.12.0", },
"snowflake-usage": snowflake_common
| usage_common
| {
"more-itertools>=8.12.0",
},
"sqlalchemy": sql_common,
"superset": {
"requests",


@@ -157,6 +157,7 @@ As a SQL-based service, the Athena integration is also supported by our SQL prof
| `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. There can be multiple domain keys specified. |
| `lineage_client_project_id` | | None | The project to use when creating the BigQuery Client. If left empty, the required `project_id` will be used. This is helpful in case the default project_id is not used for querying. |
| `use_v2_audit_metadata` | | `False` | Whether to use `BigQuery audit logs` to get the lineage or not |
| `upstream_lineage_in_report` | | `False` | Useful for debugging lineage information. Set to `True` to see the raw lineage created internally. |
The following parameters are only relevant if `include_table_lineage` is set to `true`:
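The row added above introduces `upstream_lineage_in_report`. As a quick illustration, here is a minimal, hedged sketch of enabling it when constructing the source config in Python; the import path and project id are assumptions, while the field names come from the `BigQueryConfig` change further down in this commit.

```python
# Sketch only: field names match the BigQueryConfig hunk later in this commit;
# the module path and project id are assumed for illustration.
from datahub.ingestion.source.sql.bigquery import BigQueryConfig  # path assumed

config = BigQueryConfig(
    project_id="my-gcp-project",       # hypothetical project
    include_table_lineage=True,        # lineage extraction must stay enabled
    upstream_lineage_in_report=True,   # surface the raw lineage map in the source report
)
```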


@@ -53,6 +53,7 @@ from datahub.metadata.schema_classes import (
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.sql_parser import DefaultSQLParser
logger = logging.getLogger(__name__)
@@ -69,7 +70,11 @@ AND
AND NOT
protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.code:*
AND
(
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables:*
OR
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedViews:*
)
)
)
AND
@@ -91,7 +96,11 @@ AND
AND
protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE"
AND NOT protoPayload.metadata.jobChange.job.jobStatus.errorResult:*
AND protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:*
AND (
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:*
OR
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedViews:*
)
)
AND
timestamp >= "{start_time}"
@@ -493,25 +502,57 @@ class BigQuerySource(SQLAlchemySource):
def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[str]]:
lineage_map: Dict[str, Set[str]] = collections.defaultdict(set)
num_entries: int = 0
num_skipped_entries: int = 0
self.report.num_total_lineage_entries = 0
self.report.num_skipped_lineage_entries_missing_data = 0
self.report.num_skipped_lineage_entries_not_allowed = 0
self.report.num_skipped_lineage_entries_other = 0
for e in entries:
num_entries += 1
if e.destinationTable is None or not e.referencedTables:
num_skipped_entries += 1
self.report.num_total_lineage_entries += 1
if e.destinationTable is None or not (
e.referencedTables or e.referencedViews
):
self.report.num_skipped_lineage_entries_missing_data += 1
continue
entry_consumed: bool = False
for ref_table in e.referencedTables:
# Skip if the schema/table patterns don't allow the destination table
destination_table_str = str(e.destinationTable.remove_extras())
destination_table_str_parts = destination_table_str.split("/")
if not self.config.schema_pattern.allowed(
destination_table_str_parts[3]
) or not self.config.table_pattern.allowed(destination_table_str_parts[-1]):
self.report.num_skipped_lineage_entries_not_allowed += 1
continue
has_table = False
for ref_table in e.referencedTables:
ref_table_str = str(ref_table.remove_extras())
if ref_table_str != destination_table_str:
lineage_map[destination_table_str].add(ref_table_str)
entry_consumed = True
if not entry_consumed:
num_skipped_entries += 1
logger.info(
f"Creating lineage map: total number of entries={num_entries}, number skipped={num_skipped_entries}."
has_table = True
has_view = False
for ref_view in e.referencedViews:
ref_view_str = str(ref_view.remove_extras())
if ref_view_str != destination_table_str:
lineage_map[destination_table_str].add(ref_view_str)
has_view = True
if has_table and has_view:
# If a view is referenced, BigQuery reports both the view and its underlying tables
# in the references, with no distinction between directly and indirectly accessed objects.
# Parse the SQL to keep only the objects the query accesses directly for lineage.
parser = DefaultSQLParser(e.query)
referenced_objs = set(
map(lambda x: x.split(".")[-1], parser.get_tables())
)
curr_lineage_str = lineage_map[destination_table_str]
new_lineage_str = set()
for lineage_str in curr_lineage_str:
name = lineage_str.split("/")[-1]
if name in referenced_objs:
new_lineage_str.add(lineage_str)
lineage_map[destination_table_str] = new_lineage_str
if not (has_table or has_view):
self.report.num_skipped_lineage_entries_other += 1
if self.config.upstream_lineage_in_report:
self.report.upstream_lineage = lineage_map
return lineage_map
def get_latest_partition(
@@ -688,7 +729,7 @@ WHERE
for ref_table in self.lineage_metadata[str(bq_table)]:
upstream_table = BigQueryTableRef.from_string_name(ref_table)
if upstream_table.is_temporary_table():
# making sure we don't process a table twice and not get into a recurisve loop
# making sure we don't process a table twice and not get into a recursive loop
if ref_table in tables_seen:
logger.debug(
f"Skipping table {ref_table} because it was seen already"

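To make the `has_table and has_view` branch in `_create_lineage_map` above more concrete, the following standalone sketch replays the same idea: when the audit log lists both a view and the tables it reads from, the query text is parsed and only objects that the SQL actually names are kept as upstreams. `DefaultSQLParser` and `get_tables()` are the helpers imported in this change; the query and reference strings are invented.

```python
from datahub.utilities.sql_parser import DefaultSQLParser

# Hypothetical audit-log references: the view named in the query plus the
# base table BigQuery also reports because the view reads from it.
query = "SELECT amount FROM `my-project.sales.revenue_view`"
candidate_upstreams = {
    "projects/my-project/datasets/sales/tables/revenue_view",
    "projects/my-project/datasets/sales/tables/revenue_base",
}

# Names the query references directly, reduced to the trailing object name,
# mirroring the split(".")[-1] used in the hunk above.
parser = DefaultSQLParser(query)
directly_referenced = {t.split(".")[-1] for t in parser.get_tables()}

filtered = {
    ref for ref in candidate_upstreams
    if ref.split("/")[-1] in directly_referenced
}
# Expected: only .../tables/revenue_view survives, so the view rather than its
# hidden base table ends up as the upstream in the lineage map.
print(filtered)
```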

@@ -4,7 +4,7 @@ import json
import logging
import os
import re
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Union, cast
@@ -300,14 +300,13 @@ class QueryEvent:
timestamp: datetime
actor_email: str
query: str
statementType: Optional[str]
destinationTable: Optional[BigQueryTableRef]
referencedTables: Optional[List[BigQueryTableRef]]
jobName: Optional[str]
payload: Any
statementType: Optional[str] = None
destinationTable: Optional[BigQueryTableRef] = None
referencedTables: List[BigQueryTableRef] = field(default_factory=list)
referencedViews: List[BigQueryTableRef] = field(default_factory=list)
jobName: Optional[str] = None
payload: Optional[Dict] = None
@staticmethod
def get_missing_key_entry(entry: AuditLogEntry) -> Optional[str]:
@@ -323,48 +322,49 @@ class QueryEvent:
@classmethod
def from_entry(cls, entry: AuditLogEntry) -> "QueryEvent":
user = entry.payload["authenticationInfo"]["principalEmail"]
job = entry.payload["serviceData"]["jobCompletedEvent"]["job"]
jobName = _job_name_ref(
job: Dict = entry.payload["serviceData"]["jobCompletedEvent"]["job"]
job_query_conf: Dict = job["jobConfiguration"]["query"]
# basic query_event
query_event = QueryEvent(
timestamp=entry.timestamp,
actor_email=entry.payload["authenticationInfo"]["principalEmail"],
query=job_query_conf["query"],
)
# jobName
query_event.jobName = _job_name_ref(
job.get("jobName", {}).get("projectId"), job.get("jobName", {}).get("jobId")
)
rawQuery = job["jobConfiguration"]["query"]["query"]
rawDestTable = job["jobConfiguration"]["query"]["destinationTable"]
destinationTable = None
if rawDestTable:
destinationTable = BigQueryTableRef.from_spec_obj(rawDestTable)
try:
statementType = job["jobConfiguration"]["query"]["statementType"]
except KeyError:
statementType = None
rawRefTables = job["jobStatistics"].get("referencedTables")
referencedTables = None
if rawRefTables:
referencedTables = [
BigQueryTableRef.from_spec_obj(spec) for spec in rawRefTables
]
queryEvent = QueryEvent(
timestamp=entry.timestamp,
actor_email=user,
query=rawQuery,
statementType=statementType,
destinationTable=destinationTable,
referencedTables=referencedTables,
jobName=jobName,
payload=entry.payload if DEBUG_INCLUDE_FULL_PAYLOADS else None,
# destinationTable
raw_dest_table = job_query_conf.get("destinationTable")
if raw_dest_table:
query_event.destinationTable = BigQueryTableRef.from_spec_obj(
raw_dest_table
)
if not jobName:
# statementType
query_event.statementType = job_query_conf.get("statementType")
# referencedTables
job_stats: Dict = job["jobStatistics"]
raw_ref_tables = job_stats.get("referencedTables")
if raw_ref_tables:
query_event.referencedTables = [
BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_tables
]
# referencedViews
raw_ref_views = job_stats.get("referencedViews")
if raw_ref_views:
query_event.referencedViews = [
BigQueryTableRef.from_spec_obj(spec) for spec in raw_ref_views
]
# payload
query_event.payload = entry.payload if DEBUG_INCLUDE_FULL_PAYLOADS else None
if not query_event.jobName:
logger.debug(
"jobName from query events is absent. "
"Auditlog entry - {logEntry}".format(logEntry=entry)
)
return queryEvent
return query_event
@staticmethod
def get_missing_key_exported_bigquery_audit_metadata(
@@ -376,45 +376,44 @@ class QueryEvent:
def from_exported_bigquery_audit_metadata(
cls, row: BigQueryAuditMetadata
) -> "QueryEvent":
timestamp = row["timestamp"]
payload = row["protoPayload"]
metadata = json.loads(row["metadata"])
user = payload["authenticationInfo"]["principalEmail"]
job = metadata["jobChange"]["job"]
job_name = job.get("jobName")
raw_query = job["jobConfig"]["queryConfig"]["query"]
raw_dest_table = job["jobConfig"]["queryConfig"].get("destinationTable")
destination_table = None
payload: Dict = row["protoPayload"]
metadata: Dict = json.loads(row["metadata"])
job: Dict = metadata["jobChange"]["job"]
query_config: Dict = job["jobConfig"]["queryConfig"]
# basic query_event
query_event = QueryEvent(
timestamp=row["timestamp"],
actor_email=payload["authenticationInfo"]["principalEmail"],
query=query_config["query"],
)
# jobName
query_event.jobName = job.get("jobName")
# destinationTable
raw_dest_table = query_config.get("destinationTable")
if raw_dest_table:
destination_table = BigQueryTableRef.from_string_name(raw_dest_table)
raw_ref_tables = job["jobStats"]["queryStats"].get("referencedTables")
referenced_tables = None
query_event.destinationTable = BigQueryTableRef.from_string_name(
raw_dest_table
)
# referencedTables
query_stats: Dict = job["jobStats"]["queryStats"]
raw_ref_tables = query_stats.get("referencedTables")
if raw_ref_tables:
referenced_tables = [
query_event.referencedTables = [
BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables
]
# referencedViews
raw_ref_views = query_stats.get("referencedViews")
if raw_ref_views:
query_event.referencedViews = [
BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views
]
# statementType
query_event.statementType = query_config.get("statementType")
# payload
query_event.payload = payload if DEBUG_INCLUDE_FULL_PAYLOADS else None
try:
statementType = job["jobConfiguration"]["query"]["statementType"]
except KeyError:
statementType = None
query_event = QueryEvent(
timestamp=timestamp,
actor_email=user,
query=raw_query,
statementType=statementType,
destinationTable=destination_table,
referencedTables=referenced_tables,
jobName=job_name,
payload=payload if DEBUG_INCLUDE_FULL_PAYLOADS else None,
)
if not job_name:
if not query_event.jobName:
logger.debug(
"jobName from query events is absent. "
"BigQueryAuditMetadata entry - {logEntry}".format(logEntry=row)
@@ -424,45 +423,42 @@ class QueryEvent:
@classmethod
def from_entry_v2(cls, row: BigQueryAuditMetadata) -> "QueryEvent":
timestamp = row.timestamp
payload = row.payload
metadata = payload["metadata"]
user = payload["authenticationInfo"]["principalEmail"]
job = metadata["jobChange"]["job"]
job_name = job.get("jobName")
raw_query = job["jobConfig"]["queryConfig"]["query"]
raw_dest_table = job["jobConfig"]["queryConfig"].get("destinationTable")
destination_table = None
payload: Dict = row.payload
metadata: Dict = payload["metadata"]
job: Dict = metadata["jobChange"]["job"]
query_config: Dict = job["jobConfig"]["queryConfig"]
# basic query_event
query_event = QueryEvent(
timestamp=row.timestamp,
actor_email=payload["authenticationInfo"]["principalEmail"],
query=query_config["query"],
)
query_event.jobName = job.get("jobName")
# destinationTable
raw_dest_table = query_config.get("destinationTable")
if raw_dest_table:
destination_table = BigQueryTableRef.from_string_name(raw_dest_table)
raw_ref_tables = job["jobStats"]["queryStats"].get("referencedTables")
referenced_tables = None
query_event.destinationTable = BigQueryTableRef.from_string_name(
raw_dest_table
)
# statementType
query_event.statementType = query_config.get("statementType")
# referencedTables
query_stats: Dict = job["jobStats"]["queryStats"]
raw_ref_tables = query_stats.get("referencedTables")
if raw_ref_tables:
referenced_tables = [
query_event.referencedTables = [
BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables
]
# referencedViews
raw_ref_views = query_stats.get("referencedViews")
if raw_ref_views:
query_event.referencedViews = [
BigQueryTableRef.from_string_name(spec) for spec in raw_ref_views
]
# payload
query_event.payload = payload if DEBUG_INCLUDE_FULL_PAYLOADS else None
try:
statementType = job["jobConfig"]["queryConfig"]["statementType"]
except KeyError:
statementType = None
query_event = QueryEvent(
timestamp=timestamp,
actor_email=user,
query=raw_query,
statementType=statementType,
destinationTable=destination_table,
referencedTables=referenced_tables,
jobName=job_name,
payload=payload if DEBUG_INCLUDE_FULL_PAYLOADS else None,
)
if not job_name:
if not query_event.jobName:
logger.debug(
"jobName from query events is absent. "
"BigQueryAuditMetadata entry - {logEntry}".format(logEntry=row)
@@ -511,17 +507,22 @@ class BigQueryUsageSource(Source):
Iterable[MetadataWorkUnit], last_updated_work_units_uncasted
)
if self.config.include_operational_stats:
self.report.num_operational_stats_workunits_emitted = 0
for wu in last_updated_work_units:
self.report.report_workunit(wu)
yield wu
self.report.num_operational_stats_workunits_emitted += 1
hydrated_read_events = self._join_events_by_job_id(parsed_events)
aggregated_info = self._aggregate_enriched_read_events(hydrated_read_events)
self.report.num_usage_workunits_emitted = 0
for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
wu = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)
yield wu
self.report.num_usage_workunits_emitted += 1
def _make_bigquery_clients(self) -> List[GCPLoggingClient]:
# See https://github.com/googleapis/google-cloud-python/issues/2674 for

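As a usage note on the reworked `QueryEvent` above: the three required fields are now set at construction time and the optional ones are filled in afterwards, with `referencedTables`/`referencedViews` defaulting to empty lists instead of `None`. A small hedged sketch follows; the import path and sample values are assumptions.

```python
from datetime import datetime, timezone

# Path assumed; QueryEvent and BigQueryTableRef are the classes edited above.
from datahub.ingestion.source.usage.bigquery_usage import BigQueryTableRef, QueryEvent

event = QueryEvent(
    timestamp=datetime.now(timezone.utc),
    actor_email="analyst@example.com",                      # hypothetical actor
    query="SELECT * FROM `my-project.sales.revenue_view`",  # hypothetical query
)

# Empty-list defaults mean callers can iterate without a None check.
assert event.referencedTables == [] and event.referencedViews == []

# Optional attributes are attached only when the audit log entry provides them.
event.referencedViews = [
    BigQueryTableRef.from_string_name(
        "projects/my-project/datasets/sales/tables/revenue_view"
    )
]
```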

@@ -17,20 +17,19 @@ class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
scheme: str = "bigquery"
project_id: Optional[str] = None
lineage_client_project_id: Optional[str] = None
log_page_size: Optional[pydantic.PositiveInt] = 1000
credential: Optional[BigQueryCredential]
# extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage.
extra_client_options: Dict[str, Any] = {}
include_table_lineage: Optional[bool] = True
max_query_duration: timedelta = timedelta(minutes=15)
credentials_path: Optional[str] = None
bigquery_audit_metadata_datasets: Optional[List[str]] = None
use_exported_bigquery_audit_metadata: bool = False
use_date_sharded_audit_log_tables: bool = False
_credentials_path: Optional[str] = pydantic.PrivateAttr(None)
use_v2_audit_metadata: Optional[bool] = False
upstream_lineage_in_report: bool = False
def __init__(self, **data: Any):
super().__init__(**data)


@@ -1,12 +1,16 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from typing import Dict, Optional
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
@dataclass
class BigQueryReport(SQLSourceReport):
num_total_lineage_entries: Optional[int] = None
num_skipped_lineage_entries_missing_data: Optional[int] = None
num_skipped_lineage_entries_not_allowed: Optional[int] = None
num_skipped_lineage_entries_other: Optional[int] = None
num_total_log_entries: Optional[int] = None
num_parsed_log_entires: Optional[int] = None
num_total_audit_entries: Optional[int] = None
@@ -20,3 +24,4 @@ class BigQueryReport(SQLSourceReport):
log_entry_end_time: Optional[str] = None
audit_start_time: Optional[str] = None
audit_end_time: Optional[str] = None
upstream_lineage: Dict = field(default_factory=dict)


@@ -21,6 +21,8 @@ class BigQueryUsageSourceReport(SourceReport):
deny_pattern: Optional[str] = None
log_entry_start_time: Optional[str] = None
log_entry_end_time: Optional[str] = None
num_usage_workunits_emitted: Optional[int] = None
num_operational_stats_workunits_emitted: Optional[int] = None
def report_dropped(self, key: str) -> None:
self.dropped_table[key] += 1