diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 4a17d25026..cbaea9942e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -109,26 +109,18 @@ timestamp < "{end_time}" :param limit: set a limit for the maximum event to return. It is used for connection testing currently :return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery """ - query: str + limit_text = f"limit {limit}" if limit else "" + + shard_condition = "" if use_date_sharded_tables: - query = ( - f""" - SELECT - timestamp, - logName, - insertId, - protopayload_auditlog AS protoPayload, - protopayload_auditlog.metadataJson AS metadata - FROM - `{dataset}.cloudaudit_googleapis_com_data_access_*` - """ - + """ - WHERE - _TABLE_SUFFIX BETWEEN "{start_time}" AND "{end_time}" AND - """ + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`" + shard_condition = ( + """ AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """ ) else: - query = f""" + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`" + + query = f""" SELECT timestamp, logName, @@ -136,22 +128,19 @@ timestamp < "{end_time}" protopayload_auditlog AS protoPayload, protopayload_auditlog.metadataJson AS metadata FROM - `{dataset}.cloudaudit_googleapis_com_data_access` - WHERE - """ - - audit_log_filter = """ timestamp >= "{start_time}" - AND timestamp < "{end_time}" - AND protopayload_auditlog.serviceName="bigquery.googleapis.com" - AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" - AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL - AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL + {from_table} + WHERE ( + timestamp >= "{{start_time}}" + AND timestamp < "{{end_time}}" + ) + {shard_condition} + AND protopayload_auditlog.serviceName="bigquery.googleapis.com" + AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" + AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL + AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL + {limit_text}; """ - if limit is not None: - audit_log_filter = audit_log_filter + f" LIMIT {limit}" - query = textwrap.dedent(query) + audit_log_filter + ";" - return textwrap.dedent(query) def lineage_via_catalog_lineage_api( @@ -341,6 +330,7 @@ timestamp < "{end_time}" def _get_exported_bigquery_audit_metadata( self, bigquery_client: BigQueryClient, limit: Optional[int] = None ) -> Iterable[BigQueryAuditMetadata]: + if self.config.bigquery_audit_metadata_datasets is None: self.error( logger, "audit-metadata", "bigquery_audit_metadata_datasets not set" @@ -348,22 +338,14 @@ timestamp < "{end_time}" self.report.bigquery_audit_metadata_datasets_missing = True return - start_time: str = ( - self.config.start_time - self.config.max_query_duration - ).strftime( - BQ_DATE_SHARD_FORMAT - if self.config.use_date_sharded_audit_log_tables - else BQ_DATETIME_FORMAT - ) + corrected_start_time = self.config.start_time - self.config.max_query_duration + start_time = corrected_start_time.strftime(BQ_DATETIME_FORMAT) + start_date = corrected_start_time.strftime(BQ_DATE_SHARD_FORMAT) self.report.audit_start_time = start_time - end_time: str = ( - self.config.end_time + self.config.max_query_duration - ).strftime( - BQ_DATE_SHARD_FORMAT - if self.config.use_date_sharded_audit_log_tables - else BQ_DATETIME_FORMAT - ) + corrected_end_time = self.config.end_time + self.config.max_query_duration + end_time = corrected_end_time.strftime(BQ_DATETIME_FORMAT) + end_date = corrected_end_time.strftime(BQ_DATE_SHARD_FORMAT) self.report.audit_end_time = end_time for dataset in self.config.bigquery_audit_metadata_datasets: @@ -374,13 +356,14 @@ timestamp < "{end_time}" query: str = self.bigquery_audit_metadata_query_template( dataset=dataset, use_date_sharded_tables=self.config.use_date_sharded_audit_log_tables, + limit=limit, ).format( start_time=start_time, end_time=end_time, + start_date=start_date, + end_date=end_date, ) - if limit is not None: - query = query + f" LIMIT {limit}" query_job = bigquery_client.query(query) logger.info( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index c4ac516d01..747872da24 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -100,26 +100,18 @@ def bigquery_audit_metadata_query_template( where REGEXP_CONTAINS(x, r'(projects/.*/datasets/.*/tables/{table_allow_filter if table_allow_filter else ".*"})')) """ - query: str + limit_text = f"limit {limit}" if limit else "" + + shard_condition = "" if use_date_sharded_tables: - query = ( - f""" - SELECT - timestamp, - logName, - insertId, - protopayload_auditlog AS protoPayload, - protopayload_auditlog.metadataJson AS metadata - FROM - `{dataset}.cloudaudit_googleapis_com_data_access_*` - """ - + """ - WHERE - _TABLE_SUFFIX BETWEEN "{start_time}" AND "{end_time}" - """ + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`" + shard_condition = ( + """ AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """ ) else: - query = f""" + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`" + + query = f""" SELECT timestamp, logName, @@ -127,34 +119,25 @@ def bigquery_audit_metadata_query_template( protopayload_auditlog AS protoPayload, protopayload_auditlog.metadataJson AS metadata FROM - `{dataset}.cloudaudit_googleapis_com_data_access` - WHERE 1=1 - """ - audit_log_filter_timestamps = """AND (timestamp >= "{start_time}" - AND timestamp < "{end_time}" - ) - """ - audit_log_filter_query_complete = f""" - AND ( - ( - protopayload_auditlog.serviceName="bigquery.googleapis.com" - AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" - AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL - {allow_filter} - ) + {from_table} + WHERE ( + timestamp >= "{{start_time}}" + AND timestamp < "{{end_time}}" + ) + {shard_condition} + AND ( + ( + protopayload_auditlog.serviceName="bigquery.googleapis.com" + AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" + AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL + {allow_filter} + ) OR JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.tableDataRead.reason") = "JOB" - ) + ) + {limit_text}; """ - limit_text = f"limit {limit}" if limit else "" - query = ( - textwrap.dedent(query) - + audit_log_filter_query_complete - + audit_log_filter_timestamps - + limit_text - ) - return textwrap.dedent(query) @@ -276,22 +259,14 @@ class BigQueryUsageExtractor: if self.config.bigquery_audit_metadata_datasets is None: return - start_time: str = ( - self.config.start_time - self.config.max_query_duration - ).strftime( - BQ_DATE_SHARD_FORMAT - if self.config.use_date_sharded_audit_log_tables - else BQ_DATETIME_FORMAT - ) + corrected_start_time = self.config.start_time - self.config.max_query_duration + start_time = corrected_start_time.strftime(BQ_DATETIME_FORMAT) + start_date = corrected_start_time.strftime(BQ_DATE_SHARD_FORMAT) self.report.audit_start_time = start_time - end_time: str = ( - self.config.end_time + self.config.max_query_duration - ).strftime( - BQ_DATE_SHARD_FORMAT - if self.config.use_date_sharded_audit_log_tables - else BQ_DATETIME_FORMAT - ) + corrected_end_time = self.config.end_time + self.config.max_query_duration + end_time = corrected_end_time.strftime(BQ_DATETIME_FORMAT) + end_date = corrected_end_time.strftime(BQ_DATE_SHARD_FORMAT) self.report.audit_end_time = end_time for dataset in self.config.bigquery_audit_metadata_datasets: @@ -303,10 +278,12 @@ class BigQueryUsageExtractor: dataset, self.config.use_date_sharded_audit_log_tables, allow_filter, - limit, + limit=limit, ).format( start_time=start_time, end_time=end_time, + start_date=start_date, + end_date=end_date, ) query_job = bigquery_client.query(query)