diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index c382eb21a1..ae05deea34 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -74,13 +74,11 @@ BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" BQ_DATE_SHARD_FORMAT = "%Y%m%d" BQ_AUDIT_V1 = { "BQ_FILTER_REGEX_ALLOW_TEMPLATE": """ -protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "{table_allow_pattern}" -AND -protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.datasetId =~ "{dataset_allow_pattern}" +protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "{allow_pattern}" """, "BQ_FILTER_REGEX_DENY_TEMPLATE": """ {logical_operator} -protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "{table_deny_pattern}" +protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "{deny_pattern}" """, "BQ_FILTER_RULE_TEMPLATE": """ protoPayload.serviceName="bigquery.googleapis.com" @@ -115,11 +113,11 @@ timestamp < "{end_time}" BQ_AUDIT_V2 = { "BQ_FILTER_REGEX_ALLOW_TEMPLATE": """ -protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/{dataset_allow_pattern}/tables/{table_allow_pattern}" +protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/.*/tables/{allow_pattern}" """, "BQ_FILTER_REGEX_DENY_TEMPLATE": """ {logical_operator} -protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/{table_deny_pattern}" +protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/{deny_pattern}" """, "BQ_FILTER_RULE_TEMPLATE": """ resource.type=("bigquery_project" OR "bigquery_dataset") @@ -171,8 +169,7 @@ OPERATION_STATEMENT_TYPES = { def bigquery_audit_metadata_query_template( dataset: str, use_date_sharded_tables: bool, - table_allow_filter: str, - dataset_allow_filter: str, + table_allow_filter: str = None, ) -> str: """ Receives a dataset (with project specified) and returns a query template that is used to query exported @@ -180,15 +177,14 @@ def bigquery_audit_metadata_query_template( :param dataset: the dataset to query against in the form of $PROJECT.$DATASET :param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log tables - :param table_allow_filter: regex used to filter on log events that contain the wanted tables - :param dataset_allow_filter: regex used to filter on log events that contain wanted datasets + :param table_allow_filter: regex used to filter on log events that contain the wanted datasets :return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery """ allow_filter = f""" AND EXISTS (SELECT * from UNNEST(JSON_EXTRACT_ARRAY(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStats.queryStats.referencedTables")) AS x - where REGEXP_CONTAINS(x, r'(projects/.*/datasets/{dataset_allow_filter if dataset_allow_filter else ".*"}/tables/{table_allow_filter if table_allow_filter else ".*"})')) + where REGEXP_CONTAINS(x, r'(projects/.*/datasets/.*/tables/{table_allow_filter if table_allow_filter else ".*"})')) """ query: str @@ -655,8 +651,8 @@ class BigQueryUsageSource(Source): self.report.use_v2_audit_metadata = self.config.use_v2_audit_metadata self.report.query_log_delay = self.config.query_log_delay self.report.log_page_size = self.config.log_page_size - self.report.allow_pattern = self.config.get_table_allow_pattern_string() - self.report.deny_pattern = self.config.get_table_deny_pattern_string() + self.report.allow_pattern = self.config.get_allow_pattern_string() + self.report.deny_pattern = self.config.get_deny_pattern_string() def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool: return ( @@ -753,9 +749,7 @@ class BigQueryUsageSource(Source): list_entries: Iterable[ BigQueryAuditMetadata ] = self._get_exported_bigquery_audit_metadata( - client, - self.config.get_table_allow_pattern_string(), - self.config.get_dataset_allow_pattern_string(), + client, self.config.get_allow_pattern_string() ) list_entry_generators_across_clients.append(list_entries) except Exception as e: @@ -781,10 +775,7 @@ class BigQueryUsageSource(Source): logger.info(f"Finished loading {i} log entries from BigQuery") def _get_exported_bigquery_audit_metadata( - self, - bigquery_client: BigQueryClient, - table_allow_filter: str, - dataset_allow_filter: str, + self, bigquery_client: BigQueryClient, allow_filter: str ) -> Iterable[BigQueryAuditMetadata]: if self.config.bigquery_audit_metadata_datasets is None: return @@ -810,10 +801,7 @@ class BigQueryUsageSource(Source): ).strftime(BQ_DATE_SHARD_FORMAT) query = bigquery_audit_metadata_query_template( - dataset, - self.config.use_date_sharded_audit_log_tables, - table_allow_filter, - dataset_allow_filter, + dataset, self.config.use_date_sharded_audit_log_tables, allow_filter ).format( start_time=start_time, end_time=end_time, @@ -822,11 +810,9 @@ class BigQueryUsageSource(Source): ) else: query = bigquery_audit_metadata_query_template( - dataset, - self.config.use_date_sharded_audit_log_tables, - table_allow_filter, - dataset_allow_filter, + dataset, self.config.use_date_sharded_audit_log_tables, allow_filter ).format(start_time=start_time, end_time=end_time) + query_job = bigquery_client.query(query) logger.info( f"Finished loading log entries from BigQueryAuditMetadata in {dataset}" @@ -852,30 +838,21 @@ class BigQueryUsageSource(Source): # completion event is delayed and happens after the configured end time. # Can safely access the first index of the allow list as it by default contains ".*" - use_allow_filter = ( - self.config.table_pattern - and ( - len(self.config.table_pattern.allow) > 1 - or self.config.table_pattern.allow[0] != ".*" - ) - or self.config.dataset_pattern - and ( - len(self.config.dataset_pattern.allow) > 1 - or self.config.dataset_pattern.allow[0] != ".*" - ) + use_allow_filter = self.config.table_pattern and ( + len(self.config.table_pattern.allow) > 1 + or self.config.table_pattern.allow[0] != ".*" ) use_deny_filter = self.config.table_pattern and self.config.table_pattern.deny allow_regex = ( audit_templates["BQ_FILTER_REGEX_ALLOW_TEMPLATE"].format( - table_allow_pattern=self.config.get_table_allow_pattern_string(), - dataset_allow_pattern=self.config.get_dataset_allow_pattern_string(), + allow_pattern=self.config.get_allow_pattern_string() ) if use_allow_filter else "" ) deny_regex = ( audit_templates["BQ_FILTER_REGEX_DENY_TEMPLATE"].format( - table_deny_pattern=self.config.get_table_deny_pattern_string(), + deny_pattern=self.config.get_deny_pattern_string(), logical_operator="AND" if use_allow_filter else "", ) if use_deny_filter diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py index c8f792d979..960eed94e1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py @@ -156,14 +156,8 @@ class BigQueryUsageConfig(DatasetSourceConfigBase, BaseUsageConfig): ) return v - def get_table_allow_pattern_string(self) -> str: + def get_allow_pattern_string(self) -> str: return "|".join(self.table_pattern.allow) if self.table_pattern else "" - def get_table_deny_pattern_string(self) -> str: + def get_deny_pattern_string(self) -> str: return "|".join(self.table_pattern.deny) if self.table_pattern else "" - - def get_dataset_allow_pattern_string(self) -> str: - return "|".join(self.dataset_pattern.allow) if self.table_pattern else "" - - def get_dataset_deny_pattern_string(self) -> str: - return "|".join(self.dataset_pattern.deny) if self.table_pattern else "" diff --git a/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py b/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py index c750275915..db80fc641c 100644 --- a/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py +++ b/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py @@ -32,8 +32,8 @@ def test_bq_usage_config(): table_pattern={"allow": ["test-regex", "test-regex-1"], "deny": []}, ) ) - assert config.get_table_allow_pattern_string() == "test-regex|test-regex-1" - assert config.get_table_deny_pattern_string() == "" + assert config.get_allow_pattern_string() == "test-regex|test-regex-1" + assert config.get_deny_pattern_string() == "" assert (config.end_time - config.start_time) == timedelta(hours=2) assert config.projects == ["sample-bigquery-project-name-1234"]