diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
index b7c13cf179..0e7e98b0e5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
@@ -81,7 +81,9 @@ class BigqueryTableIdentifier:
     @classmethod
     def from_string_name(cls, table: str) -> "BigqueryTableIdentifier":
         parts = table.split(".")
-        return cls(parts[0], parts[1], parts[2])
+        # If the table name contains a dollar sign, it is a reference to a partitioned table and we have to strip it
+        table = parts[2].split("$", 1)[0]
+        return cls(parts[0], parts[1], table)
 
     def raw_table_name(self):
         return f"{self.project_id}.{self.dataset}.{self.table}"
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index 5586f94201..c8c1e7c893 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -17,6 +17,7 @@ from typing import (
 )
 
 import humanfriendly
+import sqlglot
 from google.cloud.datacatalog import lineage_v1
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
 
@@ -737,13 +738,27 @@ class BigqueryLineageExtractor:
 
         # Try the sql parser first.
         if self.config.lineage_use_sql_parser:
+            logger.debug(
+                f"Using sql parser for lineage extraction for destination table: {destination_table.table_identifier.get_table_name()}, queryType: {e.statementType}, query: {e.query}"
+            )
             if e.statementType == "SELECT":
                 # We wrap select statements in a CTE to make them parseable as insert statement.
                 # This is a workaround for the sql parser to support the case where the user runs a query and inserts the result into a table..
-                query = f"""create table `{destination_table.table_identifier.get_table_name()}` AS
-                    (
-                        {e.query}
-                    )"""
+                try:
+                    parsed_queries = sqlglot.parse(e.query, "bigquery")
+                    if parsed_queries[-1]:
+                        query = f"""create table `{destination_table.get_sanitized_table_ref().table_identifier.get_table_name()}` AS
+                        (
+                            {parsed_queries[-1].sql(dialect='bigquery')}
+                        )"""
+                    else:
+                        query = e.query
+                except Exception:
+                    logger.debug(
+                        f"Failed to parse select-based lineage query {e.query} for table {destination_table}."
+                        " Sql parsing will likely fail for this query, which will result in a fallback to audit log."
+                    )
+                    query = e.query
             else:
                 query = e.query
             raw_lineage = sqlglot_lineage(
@@ -751,6 +766,9 @@ class BigqueryLineageExtractor:
                 schema_resolver=sql_parser_schema_resolver,
                 default_db=e.project_id,
             )
+            logger.debug(
+                f"Input tables: {raw_lineage.in_tables}, Output tables: {raw_lineage.out_tables}"
+            )
             if raw_lineage.debug_info.table_error:
                 logger.debug(
                     f"Sql Parser failed on query: {e.query}. It won't cause any major issues, but "
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
index b5c077b7cd..1b95cbf505 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
@@ -204,7 +204,7 @@ class BigQueryUsageState(Closeable):
             r.resource,
             q.query,
             COUNT(r.key) as query_count,
-            ROW_NUMBER() over (PARTITION BY r.timestamp, r.resource, q.query ORDER BY COUNT(r.key) DESC, q.query) as rank
+            ROW_NUMBER() over (PARTITION BY r.timestamp, r.resource ORDER BY COUNT(r.key) DESC) as rank
         FROM
             read_events r
             INNER JOIN query_events q ON r.name = q.key
@@ -256,6 +256,7 @@ class BigQueryUsageState(Closeable):
 
     def usage_statistics(self, top_n: int) -> Iterator[UsageStatistic]:
         query = self.usage_statistics_query(top_n)
+
         rows = self.read_events.sql_query_iterator(
             query, refs=[self.query_events, self.column_accesses]
         )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
index e6117d70d0..a2dbef538f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
@@ -74,6 +74,12 @@ def make_usage_workunit(
 
     top_sql_queries: Optional[List[str]] = None
     if query_freq is not None:
+        if top_n_queries < len(query_freq):
+            logger.warn(
+                f"Top N query limit exceeded on {str(resource)}. Max number of queries {top_n_queries} < {len(query_freq)}. Truncating top queries to {top_n_queries}."
+            )
+            query_freq = query_freq[0:top_n_queries]
+
         budget_per_query: int = int(queries_character_limit / top_n_queries)
         top_sql_queries = [
             trim_query(
diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage.py b/metadata-ingestion/tests/unit/test_bigquery_usage.py
index 3073441446..f476e62dd0 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_usage.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_usage.py
@@ -190,7 +190,7 @@ def config() -> BigQueryV2Config:
         end_time=TS_2 + timedelta(minutes=1),
         usage=BigQueryUsageConfig(
             include_top_n_queries=True,
-            top_n_queries=3,
+            top_n_queries=30,
             bucket_duration=BucketDuration.DAY,
             include_operational_stats=False,
         ),
@@ -878,6 +878,125 @@ def test_usage_counts_no_columns(
     assert not caplog.records
 
 
+def test_usage_counts_no_columns_and_top_n_limit_hit(
+    caplog: pytest.LogCaptureFixture,
+    usage_extractor: BigQueryUsageExtractor,
+    config: BigQueryV2Config,
+) -> None:
+    config.usage.top_n_queries = 1
+
+    job_name = "job_name"
+    ref = BigQueryTableRef(
+        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
+    )
+    events = [
+        AuditEvent.create(
+            ReadEvent(
+                jobName=job_name,
+                timestamp=TS_1,
+                actor_email=ACTOR_1,
+                resource=ref,
+                fieldsRead=[],
+                readReason="JOB",
+                payload=None,
+            ),
+        ),
+        AuditEvent.create(
+            ReadEvent(
+                jobName="job_name_2",
+                timestamp=TS_1,
+                actor_email=ACTOR_1,
+                resource=ref,
+                fieldsRead=[],
+                readReason="JOB",
+                payload=None,
+            ),
+        ),
+        AuditEvent.create(
+            ReadEvent(
+                jobName="job_name_3",
+                timestamp=TS_1,
+                actor_email=ACTOR_1,
+                resource=ref,
+                fieldsRead=[],
+                readReason="JOB",
+                payload=None,
+            ),
+        ),
+        AuditEvent.create(
+            QueryEvent(
+                job_name=job_name,
+                timestamp=TS_1,
+                actor_email=ACTOR_1,
+                query="SELECT * FROM table_1",
+                statementType="SELECT",
+                project_id=PROJECT_1,
+                destinationTable=None,
+                referencedTables=[ref],
+                referencedViews=[],
+                payload=None,
+            )
+        ),
+        AuditEvent.create(
+            QueryEvent(
+                job_name="job_name_2",
+                timestamp=TS_1,
+                actor_email=ACTOR_1,
+                query="SELECT * FROM table_1",
+                statementType="SELECT",
+                project_id=PROJECT_1,
+                destinationTable=None,
+                referencedTables=[ref],
+                referencedViews=[],
+                payload=None,
+            )
+        ),
+        AuditEvent.create(
+            QueryEvent(
+                job_name="job_name_3",
+                timestamp=TS_1,
+                actor_email=ACTOR_1,
+                query="SELECT my_column FROM table_1",
+                statementType="SELECT",
+                project_id=PROJECT_1,
+                destinationTable=None,
+                referencedTables=[ref],
+                referencedViews=[],
+                payload=None,
+            )
+        ),
+    ]
+    caplog.clear()
+    with caplog.at_level(logging.WARNING):
+        workunits = usage_extractor._get_workunits_internal(
+            events, [TABLE_REFS[TABLE_1.name]]
+        )
+        expected = [
+            make_usage_workunit(
+                table=TABLE_1,
+                dataset_usage_statistics=DatasetUsageStatisticsClass(
+                    timestampMillis=int(TS_1.timestamp() * 1000),
+                    eventGranularity=TimeWindowSizeClass(
+                        unit=BucketDuration.DAY, multiple=1
+                    ),
+                    totalSqlQueries=3,
+                    topSqlQueries=["SELECT * FROM table_1"],
+                    uniqueUserCount=1,
+                    userCounts=[
+                        DatasetUserUsageCountsClass(
+                            user=ACTOR_1_URN,
+                            count=3,
+                            userEmail=ACTOR_1,
+                        ),
+                    ],
+                    fieldCounts=[],
+                ),
+            )
+        ]
+        compare_workunits(workunits, expected)
+        assert not caplog.records
+
+
 @freeze_time(FROZEN_TIME)
 @patch.object(BigQueryUsageExtractor, "_generate_usage_workunits")
 def test_operational_stats(
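
For reference, the change to BigqueryTableIdentifier.from_string_name strips the BigQuery partition decorator (the "$YYYYMMDD" suffix) so that reads against a specific partition resolve to the base table. Below is a minimal standalone sketch of that behaviour; the helper name and sample table strings are illustrative, not part of the patch.

    # Illustrative sketch of the partition-decorator stripping in from_string_name.
    def strip_partition_suffix(table: str) -> tuple:
        parts = table.split(".")
        # "events$20230101" -> "events"; names without a "$" pass through unchanged.
        base_table = parts[2].split("$", 1)[0]
        return parts[0], parts[1], base_table

    assert strip_partition_suffix("my-project.my_dataset.events$20230101") == (
        "my-project",
        "my_dataset",
        "events",
    )
    assert strip_partition_suffix("my-project.my_dataset.events") == (
        "my-project",
        "my_dataset",
        "events",
    )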
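
The lineage change likewise round-trips the SELECT through sqlglot before wrapping it in a synthetic create-table statement, falling back to the raw query (and ultimately the audit log) if parsing fails. A rough sketch of that wrapping step, using a hypothetical query and destination table name in place of e.query and the sanitized table ref:

    import sqlglot

    # Hypothetical stand-ins for e.query and the destination table reference.
    select_query = "SELECT user_id, COUNT(*) AS cnt FROM `proj.ds.events` GROUP BY 1;"
    destination = "proj.ds.daily_counts"

    try:
        parsed = sqlglot.parse(select_query, "bigquery")
        if parsed and parsed[-1]:
            # Re-emit the last parsed statement and wrap it so the lineage
            # parser sees an explicit write to the destination table.
            wrapped = f"""create table `{destination}` AS
            (
                {parsed[-1].sql(dialect='bigquery')}
            )"""
        else:
            wrapped = select_query
    except Exception:
        # Keep the raw query; downstream sql parsing may then fail, which
        # triggers the fallback to audit-log-based lineage.
        wrapped = select_query

    print(wrapped)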