fix(ingest/bigquery): fix lineage when multiple sql expressions are passed in and a destination table is set (#10212)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
Tamas Nemeth 2024-04-09 11:54:02 +02:00 committed by GitHub
parent 1625de8e8d
commit 00a890f84f
5 changed files with 153 additions and 7 deletions


@@ -81,7 +81,9 @@ class BigqueryTableIdentifier:
@classmethod
def from_string_name(cls, table: str) -> "BigqueryTableIdentifier":
parts = table.split(".")
return cls(parts[0], parts[1], parts[2])
# If the table name contains a dollar sign, it is a reference to a partitioned table and we have to strip it
table = parts[2].split("$", 1)[0]
return cls(parts[0], parts[1], table)
def raw_table_name(self):
return f"{self.project_id}.{self.dataset}.{self.table}"


@@ -17,6 +17,7 @@ from typing import (
)
import humanfriendly
import sqlglot
from google.cloud.datacatalog import lineage_v1
from google.cloud.logging_v2.client import Client as GCPLoggingClient
@@ -737,13 +738,27 @@ class BigqueryLineageExtractor:
# Try the sql parser first.
if self.config.lineage_use_sql_parser:
logger.debug(
f"Using sql parser for lineage extraction for destination table: {destination_table.table_identifier.get_table_name()}, queryType: {e.statementType}, query: {e.query}"
)
if e.statementType == "SELECT":
# We wrap select statements in a `create table ... as (...)` statement to make them parseable as an insert statement.
# This is a workaround for the sql parser to support the case where the user runs a query and inserts the result into a table.
query = f"""create table `{destination_table.table_identifier.get_table_name()}` AS
(
{e.query}
)"""
try:
parsed_queries = sqlglot.parse(e.query, "bigquery")
if parsed_queries[-1]:
query = f"""create table `{destination_table.get_sanitized_table_ref().table_identifier.get_table_name()}` AS
(
{parsed_queries[-1].sql(dialect='bigquery')}
)"""
else:
query = e.query
except Exception:
logger.debug(
f"Failed to parse select-based lineage query {e.query} for table {destination_table}."
"Sql parsing will likely fail for this query, which will result in a fallback to audit log."
)
query = e.query
else:
query = e.query
raw_lineage = sqlglot_lineage(
@@ -751,6 +766,9 @@ class BigqueryLineageExtractor:
schema_resolver=sql_parser_schema_resolver,
default_db=e.project_id,
)
logger.debug(
f"Input tables: {raw_lineage.in_tables}, Output tables: {raw_lineage.out_tables}"
)
if raw_lineage.debug_info.table_error:
logger.debug(
f"Sql Parser failed on query: {e.query}. It won't cause any major issues, but "


@@ -204,7 +204,7 @@ class BigQueryUsageState(Closeable):
r.resource,
q.query,
COUNT(r.key) as query_count,
ROW_NUMBER() over (PARTITION BY r.timestamp, r.resource, q.query ORDER BY COUNT(r.key) DESC, q.query) as rank
ROW_NUMBER() over (PARTITION BY r.timestamp, r.resource ORDER BY COUNT(r.key) DESC) as rank
FROM
read_events r
INNER JOIN query_events q ON r.name = q.key
@@ -256,6 +256,7 @@ class BigQueryUsageState(Closeable):
def usage_statistics(self, top_n: int) -> Iterator[UsageStatistic]:
query = self.usage_statistics_query(top_n)
rows = self.read_events.sql_query_iterator(
query, refs=[self.query_events, self.column_accesses]
)
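
The old window ranked within (timestamp, resource, query), so every distinct query sat alone in its own partition, always got rank 1, and a later rank-based top-N filter never dropped anything. Partitioning by (timestamp, resource) makes rank enumerate distinct queries per table and time bucket. A toy sqlite3 demo of the difference (timestamp omitted for brevity); the schema is an illustrative stand-in, not DataHub's actual event store:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE reads (resource TEXT, query TEXT);
    INSERT INTO reads VALUES
      ('table_1', 'SELECT * FROM table_1'),
      ('table_1', 'SELECT * FROM table_1'),
      ('table_1', 'SELECT my_column FROM table_1');
    """
)
rows = conn.execute(
    """
    SELECT resource, query, COUNT(*) AS query_count,
           ROW_NUMBER() OVER (
               PARTITION BY resource ORDER BY COUNT(*) DESC
           ) AS rank
    FROM reads
    GROUP BY resource, query
    """
).fetchall()
# With PARTITION BY (resource, query) each group got rank 1; with
# PARTITION BY (resource) the ranks run 1, 2, ... so a "rank <= top_n"
# filter actually caps the number of distinct queries kept per table.
for row in rows:
    print(row)  # e.g. ('table_1', 'SELECT * FROM table_1', 2, 1)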


@@ -74,6 +74,12 @@ def make_usage_workunit(
top_sql_queries: Optional[List[str]] = None
if query_freq is not None:
if top_n_queries < len(query_freq):
logger.warning(
f"Top N query limit exceeded on {str(resource)}. Max number of queries {top_n_queries} < {len(query_freq)}. Truncating top queries to {top_n_queries}."
)
query_freq = query_freq[0:top_n_queries]
budget_per_query: int = int(queries_character_limit / top_n_queries)
top_sql_queries = [
trim_query(
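
The added guard logs and truncates when more distinct queries were collected than top_n_queries allows, before dividing the character budget among the reported queries. A minimal sketch with made-up numbers; trim_query here is a simplified stand-in for DataHub's helper:

import logging

logger = logging.getLogger(__name__)

def trim_query(query: str, budget_per_query: int) -> str:
    # Simplified stand-in: cut the query down to its character budget.
    if len(query) <= budget_per_query:
        return query
    return query[: budget_per_query - 3] + "..."

def top_sql_queries(query_freq, top_n_queries=1, queries_character_limit=100):
    # query_freq: (query, count) pairs, already sorted by count descending.
    if top_n_queries < len(query_freq):
        logger.warning(
            "Top N query limit exceeded. Truncating top queries to %d.",
            top_n_queries,
        )
        query_freq = query_freq[0:top_n_queries]
    budget_per_query = int(queries_character_limit / top_n_queries)
    return [trim_query(query, budget_per_query) for query, _count in query_freq]

print(top_sql_queries([("SELECT * FROM table_1", 2), ("SELECT my_column FROM table_1", 1)]))
# -> ['SELECT * FROM table_1']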


@@ -190,7 +190,7 @@ def config() -> BigQueryV2Config:
end_time=TS_2 + timedelta(minutes=1),
usage=BigQueryUsageConfig(
include_top_n_queries=True,
top_n_queries=3,
top_n_queries=30,
bucket_duration=BucketDuration.DAY,
include_operational_stats=False,
),
@@ -878,6 +878,125 @@ def test_usage_counts_no_columns(
assert not caplog.records
def test_usage_counts_no_columns_and_top_n_limit_hit(
caplog: pytest.LogCaptureFixture,
usage_extractor: BigQueryUsageExtractor,
config: BigQueryV2Config,
) -> None:
config.usage.top_n_queries = 1
job_name = "job_name"
ref = BigQueryTableRef(
BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
)
events = [
AuditEvent.create(
ReadEvent(
jobName=job_name,
timestamp=TS_1,
actor_email=ACTOR_1,
resource=ref,
fieldsRead=[],
readReason="JOB",
payload=None,
),
),
AuditEvent.create(
ReadEvent(
jobName="job_name_2",
timestamp=TS_1,
actor_email=ACTOR_1,
resource=ref,
fieldsRead=[],
readReason="JOB",
payload=None,
),
),
AuditEvent.create(
ReadEvent(
jobName="job_name_3",
timestamp=TS_1,
actor_email=ACTOR_1,
resource=ref,
fieldsRead=[],
readReason="JOB",
payload=None,
),
),
AuditEvent.create(
QueryEvent(
job_name=job_name,
timestamp=TS_1,
actor_email=ACTOR_1,
query="SELECT * FROM table_1",
statementType="SELECT",
project_id=PROJECT_1,
destinationTable=None,
referencedTables=[ref],
referencedViews=[],
payload=None,
)
),
AuditEvent.create(
QueryEvent(
job_name="job_name_2",
timestamp=TS_1,
actor_email=ACTOR_1,
query="SELECT * FROM table_1",
statementType="SELECT",
project_id=PROJECT_1,
destinationTable=None,
referencedTables=[ref],
referencedViews=[],
payload=None,
)
),
AuditEvent.create(
QueryEvent(
job_name="job_name_3",
timestamp=TS_1,
actor_email=ACTOR_1,
query="SELECT my_column FROM table_1",
statementType="SELECT",
project_id=PROJECT_1,
destinationTable=None,
referencedTables=[ref],
referencedViews=[],
payload=None,
)
),
]
caplog.clear()
with caplog.at_level(logging.WARNING):
workunits = usage_extractor._get_workunits_internal(
events, [TABLE_REFS[TABLE_1.name]]
)
expected = [
make_usage_workunit(
table=TABLE_1,
dataset_usage_statistics=DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
eventGranularity=TimeWindowSizeClass(
unit=BucketDuration.DAY, multiple=1
),
totalSqlQueries=3,
topSqlQueries=["SELECT * FROM table_1"],
uniqueUserCount=1,
userCounts=[
DatasetUserUsageCountsClass(
user=ACTOR_1_URN,
count=3,
userEmail=ACTOR_1,
),
],
fieldCounts=[],
),
)
]
compare_workunits(workunits, expected)
assert not caplog.records
@freeze_time(FROZEN_TIME)
@patch.object(BigQueryUsageExtractor, "_generate_usage_workunits")
def test_operational_stats(