diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index 211764da15..5c1693721a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -922,51 +922,7 @@ class BigQueryUsageSource(Source): if self.config.use_v2_audit_metadata: audit_templates = BQ_AUDIT_V2 - # We adjust the filter values a bit, since we need to make sure that the join - # between query events and read events is complete. For example, this helps us - # handle the case where the read happens within our time range but the query - # completion event is delayed and happens after the configured end time. - - # Can safely access the first index of the allow list as it by default contains ".*" - use_allow_filter = self.config.table_pattern and ( - len(self.config.table_pattern.allow) > 1 - or self.config.table_pattern.allow[0] != ".*" - ) - use_deny_filter = self.config.table_pattern and self.config.table_pattern.deny - allow_regex = ( - audit_templates["BQ_FILTER_REGEX_ALLOW_TEMPLATE"].format( - allow_pattern=self.config.get_allow_pattern_string() - ) - if use_allow_filter - else "" - ) - - base_deny_pattern: str = "__TABLES_SUMMARY__|INFORMATION_SCHEMA" - deny_regex = audit_templates["BQ_FILTER_REGEX_DENY_TEMPLATE"].format( - deny_pattern=base_deny_pattern + "|" + self.config.get_deny_pattern_string() - if self.config.get_deny_pattern_string() - else base_deny_pattern, - logical_operator="AND" if use_allow_filter else "", - ) - - logger.debug( - f"use_allow_filter={use_allow_filter}, use_deny_filter={use_deny_filter}, " - f"allow_regex={allow_regex}, deny_regex={deny_regex}" - ) - start_time = (self.config.start_time - self.config.max_query_duration).strftime( - BQ_DATETIME_FORMAT - ) - self.report.log_entry_start_time = start_time - end_time = (self.config.end_time + self.config.max_query_duration).strftime( - BQ_DATETIME_FORMAT - ) - self.report.log_entry_end_time = end_time - filter = audit_templates["BQ_FILTER_RULE_TEMPLATE"].format( - start_time=start_time, - end_time=end_time, - allow_regex=allow_regex, - deny_regex=deny_regex, - ) + filter = self._generate_filter(audit_templates) logger.debug(filter) list_entry_generators_across_clients: List[ @@ -1008,6 +964,51 @@ class BigQueryUsageSource(Source): yield entry logger.info(f"Finished loading {i} log entries from GCP Logging") + def _generate_filter(self, audit_templates: Dict[str, str]) -> str: + # We adjust the filter values a bit, since we need to make sure that the join + # between query events and read events is complete. For example, this helps us + # handle the case where the read happens within our time range but the query + # completion event is delayed and happens after the configured end time. + # Can safely access the first index of the allow list as it by default contains ".*" + use_allow_filter = self.config.table_pattern and ( + len(self.config.table_pattern.allow) > 1 + or self.config.table_pattern.allow[0] != ".*" + ) + use_deny_filter = self.config.table_pattern and self.config.table_pattern.deny + allow_regex = ( + audit_templates["BQ_FILTER_REGEX_ALLOW_TEMPLATE"].format( + allow_pattern=self.config.get_allow_pattern_string() + ) + if use_allow_filter + else "" + ) + base_deny_pattern: str = "__TABLES_SUMMARY__|INFORMATION_SCHEMA" + deny_regex = audit_templates["BQ_FILTER_REGEX_DENY_TEMPLATE"].format( + deny_pattern=base_deny_pattern + "|" + self.config.get_deny_pattern_string() + if self.config.get_deny_pattern_string() + else base_deny_pattern, + logical_operator="AND" if use_allow_filter else "", + ) + logger.debug( + f"use_allow_filter={use_allow_filter}, use_deny_filter={use_deny_filter}, " + f"allow_regex={allow_regex}, deny_regex={deny_regex}" + ) + start_time = (self.config.start_time - self.config.max_query_duration).strftime( + BQ_DATETIME_FORMAT + ) + self.report.log_entry_start_time = start_time + end_time = (self.config.end_time + self.config.max_query_duration).strftime( + BQ_DATETIME_FORMAT + ) + self.report.log_entry_end_time = end_time + filter = audit_templates["BQ_FILTER_RULE_TEMPLATE"].format( + start_time=start_time, + end_time=end_time, + allow_regex=allow_regex, + deny_regex=deny_regex, + ) + return filter + def _create_operation_aspect_work_unit( self, event: AuditEvent ) -> Optional[MetadataWorkUnit]: diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py index 95c18a3839..c9e9551362 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py @@ -1,7 +1,16 @@ import json import os -from datahub.ingestion.source.usage.bigquery_usage import BigQueryUsageConfig +from freezegun import freeze_time + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.usage.bigquery_usage import ( + BQ_AUDIT_V1, + BigQueryUsageConfig, + BigQueryUsageSource, +) + +FROZEN_TIME = "2021-07-20 00:00:00" def test_bigquery_uri_with_credential(): @@ -47,3 +56,111 @@ def test_bigquery_uri_with_credential(): if config._credentials_path: os.unlink(str(config._credentials_path)) raise e + + +@freeze_time(FROZEN_TIME) +def test_bigquery_filters_with_allow_filter(): + config = { + "project_id": "test-project", + "credential": { + "project_id": "test-project", + "private_key_id": "test-private-key", + "private_key": "random_private_key", + "client_email": "test@acryl.io", + "client_id": "test_client-id", + }, + "table_pattern": {"allow": ["test-regex", "test-regex-1"], "deny": []}, + } + expected_filter: str = """protoPayload.serviceName="bigquery.googleapis.com" +AND +( + ( + protoPayload.methodName="jobservice.jobcompleted" + AND + protoPayload.serviceData.jobCompletedEvent.eventName="query_job_completed" + AND + protoPayload.serviceData.jobCompletedEvent.job.jobStatus.state="DONE" + AND + NOT protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.code:* + ) + OR + ( + protoPayload.metadata.tableDataRead:* + ) +) +AND ( + +protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "test-regex|test-regex-1" + + +AND +protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "__TABLES_SUMMARY__|INFORMATION_SCHEMA" + + OR + protoPayload.metadata.tableDataRead.reason = "JOB" +) +AND +timestamp >= "2021-07-18T23:45:00Z" +AND +timestamp < "2021-07-21T00:15:00Z\"""" # noqa: W293 + + source = BigQueryUsageSource.create(config, PipelineContext(run_id="bq-usage-test")) + + # source: BigQueryUsageSource = BigQueryUsageSource( + # config=config, ctx=PipelineContext(run_id="test") + # ) + filter: str = source._generate_filter(BQ_AUDIT_V1) + assert filter == expected_filter + + +@freeze_time(FROZEN_TIME) +def test_bigquery_filters_with_deny_filter(): + config = { + "project_id": "test-project", + "credential": { + "project_id": "test-project", + "private_key_id": "test-private-key", + "private_key": "random_private_key", + "client_email": "test@acryl.io", + "client_id": "test_client-id", + }, + "table_pattern": { + "allow": ["test-regex", "test-regex-1"], + "deny": ["excluded_table_regex", "excluded-regex-2"], + }, + } + expected_filter: str = """protoPayload.serviceName="bigquery.googleapis.com" +AND +( + ( + protoPayload.methodName="jobservice.jobcompleted" + AND + protoPayload.serviceData.jobCompletedEvent.eventName="query_job_completed" + AND + protoPayload.serviceData.jobCompletedEvent.job.jobStatus.state="DONE" + AND + NOT protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.code:* + ) + OR + ( + protoPayload.metadata.tableDataRead:* + ) +) +AND ( + +protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "test-regex|test-regex-1" + + +AND +protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "__TABLES_SUMMARY__|INFORMATION_SCHEMA|excluded_table_regex|excluded-regex-2" + + OR + protoPayload.metadata.tableDataRead.reason = "JOB" +) +AND +timestamp >= "2021-07-18T23:45:00Z" +AND +timestamp < "2021-07-21T00:15:00Z\"""" # noqa: W293 + source = BigQueryUsageSource.create(config, PipelineContext(run_id="bq-usage-test")) + filter: str = source._generate_filter(BQ_AUDIT_V1) + assert filter == expected_filter