Mirror of https://github.com/datahub-project/datahub.git (synced 2025-07-24 18:10:11 +00:00).
Commit 393c07ee52 (parent 46e84026a3):
refactor(ingest): bigquery-usage - Adding tests for bigquery usage filters (#5195)
@ -922,51 +922,7 @@ class BigQueryUsageSource(Source):
|
||||
if self.config.use_v2_audit_metadata:
|
||||
audit_templates = BQ_AUDIT_V2
|
||||
|
||||
# We adjust the filter values a bit, since we need to make sure that the join
|
||||
# between query events and read events is complete. For example, this helps us
|
||||
# handle the case where the read happens within our time range but the query
|
||||
# completion event is delayed and happens after the configured end time.
|
||||
|
||||
# Can safely access the first index of the allow list as it by default contains ".*"
|
||||
use_allow_filter = self.config.table_pattern and (
|
||||
len(self.config.table_pattern.allow) > 1
|
||||
or self.config.table_pattern.allow[0] != ".*"
|
||||
)
|
||||
use_deny_filter = self.config.table_pattern and self.config.table_pattern.deny
|
||||
allow_regex = (
|
||||
audit_templates["BQ_FILTER_REGEX_ALLOW_TEMPLATE"].format(
|
||||
allow_pattern=self.config.get_allow_pattern_string()
|
||||
)
|
||||
if use_allow_filter
|
||||
else ""
|
||||
)
|
||||
|
||||
base_deny_pattern: str = "__TABLES_SUMMARY__|INFORMATION_SCHEMA"
|
||||
deny_regex = audit_templates["BQ_FILTER_REGEX_DENY_TEMPLATE"].format(
|
||||
deny_pattern=base_deny_pattern + "|" + self.config.get_deny_pattern_string()
|
||||
if self.config.get_deny_pattern_string()
|
||||
else base_deny_pattern,
|
||||
logical_operator="AND" if use_allow_filter else "",
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"use_allow_filter={use_allow_filter}, use_deny_filter={use_deny_filter}, "
|
||||
f"allow_regex={allow_regex}, deny_regex={deny_regex}"
|
||||
)
|
||||
start_time = (self.config.start_time - self.config.max_query_duration).strftime(
|
||||
BQ_DATETIME_FORMAT
|
||||
)
|
||||
self.report.log_entry_start_time = start_time
|
||||
end_time = (self.config.end_time + self.config.max_query_duration).strftime(
|
||||
BQ_DATETIME_FORMAT
|
||||
)
|
||||
self.report.log_entry_end_time = end_time
|
||||
filter = audit_templates["BQ_FILTER_RULE_TEMPLATE"].format(
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
allow_regex=allow_regex,
|
||||
deny_regex=deny_regex,
|
||||
)
|
||||
filter = self._generate_filter(audit_templates)
|
||||
logger.debug(filter)
|
||||
|
||||
list_entry_generators_across_clients: List[
|
||||
@ -1008,6 +964,51 @@ class BigQueryUsageSource(Source):
|
||||
yield entry
|
||||
logger.info(f"Finished loading {i} log entries from GCP Logging")
|
||||
|
||||
def _generate_filter(self, audit_templates: Dict[str, str]) -> str:
    """Build the GCP Logging filter expression used to fetch BigQuery audit events.

    Combines the rule template from ``audit_templates`` with allow/deny regex
    clauses derived from ``self.config.table_pattern``, over a time window
    padded by ``max_query_duration`` on both ends.

    :param audit_templates: mapping providing ``BQ_FILTER_REGEX_ALLOW_TEMPLATE``,
        ``BQ_FILTER_REGEX_DENY_TEMPLATE`` and ``BQ_FILTER_RULE_TEMPLATE``
        (e.g. ``BQ_AUDIT_V1`` or ``BQ_AUDIT_V2``).
    :return: the fully formatted filter string.
    """
    # We adjust the filter values a bit, since we need to make sure that the join
    # between query events and read events is complete. For example, this helps us
    # handle the case where the read happens within our time range but the query
    # completion event is delayed and happens after the configured end time.
    # Can safely access the first index of the allow list as it by default contains ".*"
    use_allow_filter = self.config.table_pattern and (
        len(self.config.table_pattern.allow) > 1
        or self.config.table_pattern.allow[0] != ".*"
    )
    use_deny_filter = self.config.table_pattern and self.config.table_pattern.deny
    allow_regex = (
        audit_templates["BQ_FILTER_REGEX_ALLOW_TEMPLATE"].format(
            allow_pattern=self.config.get_allow_pattern_string()
        )
        if use_allow_filter
        else ""
    )
    # Always exclude BigQuery meta-tables; append any user-configured deny patterns.
    base_deny_pattern: str = "__TABLES_SUMMARY__|INFORMATION_SCHEMA"
    deny_regex = audit_templates["BQ_FILTER_REGEX_DENY_TEMPLATE"].format(
        deny_pattern=base_deny_pattern + "|" + self.config.get_deny_pattern_string()
        if self.config.get_deny_pattern_string()
        else base_deny_pattern,
        logical_operator="AND" if use_allow_filter else "",
    )
    logger.debug(
        f"use_allow_filter={use_allow_filter}, use_deny_filter={use_deny_filter}, "
        f"allow_regex={allow_regex}, deny_regex={deny_regex}"
    )
    # Pad the window by max_query_duration so delayed query-completion events
    # that pair with in-window read events are still captured.
    start_time = (self.config.start_time - self.config.max_query_duration).strftime(
        BQ_DATETIME_FORMAT
    )
    self.report.log_entry_start_time = start_time
    end_time = (self.config.end_time + self.config.max_query_duration).strftime(
        BQ_DATETIME_FORMAT
    )
    self.report.log_entry_end_time = end_time
    # Renamed from "filter" to avoid shadowing the builtin.
    filter_rule = audit_templates["BQ_FILTER_RULE_TEMPLATE"].format(
        start_time=start_time,
        end_time=end_time,
        allow_regex=allow_regex,
        deny_regex=deny_regex,
    )
    return filter_rule
|
||||
|
||||
def _create_operation_aspect_work_unit(
|
||||
self, event: AuditEvent
|
||||
) -> Optional[MetadataWorkUnit]:
|
||||
|
@ -1,7 +1,16 @@
|
||||
import json
|
||||
import os
|
||||
|
||||
from datahub.ingestion.source.usage.bigquery_usage import BigQueryUsageConfig
|
||||
from freezegun import freeze_time
|
||||
|
||||
from datahub.ingestion.api.common import PipelineContext
|
||||
from datahub.ingestion.source.usage.bigquery_usage import (
|
||||
BQ_AUDIT_V1,
|
||||
BigQueryUsageConfig,
|
||||
BigQueryUsageSource,
|
||||
)
|
||||
|
||||
FROZEN_TIME = "2021-07-20 00:00:00"
|
||||
|
||||
|
||||
def test_bigquery_uri_with_credential():
|
||||
@ -47,3 +56,111 @@ def test_bigquery_uri_with_credential():
|
||||
if config._credentials_path:
|
||||
os.unlink(str(config._credentials_path))
|
||||
raise e
|
||||
|
||||
|
||||
@freeze_time(FROZEN_TIME)
def test_bigquery_filters_with_allow_filter():
    """_generate_filter emits an allow-regex clause (joined with the default
    meta-table deny) when table_pattern.allow contains non-default patterns."""
    config = {
        "project_id": "test-project",
        "credential": {
            "project_id": "test-project",
            "private_key_id": "test-private-key",
            "private_key": "random_private_key",
            "client_email": "test@acryl.io",
            "client_id": "test_client-id",
        },
        "table_pattern": {"allow": ["test-regex", "test-regex-1"], "deny": []},
    }
    expected_filter: str = """protoPayload.serviceName="bigquery.googleapis.com"
AND
(
(
protoPayload.methodName="jobservice.jobcompleted"
AND
protoPayload.serviceData.jobCompletedEvent.eventName="query_job_completed"
AND
protoPayload.serviceData.jobCompletedEvent.job.jobStatus.state="DONE"
AND
NOT protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.code:*
)
OR
(
protoPayload.metadata.tableDataRead:*
)
)
AND (

protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "test-regex|test-regex-1"


AND
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "__TABLES_SUMMARY__|INFORMATION_SCHEMA"

OR
protoPayload.metadata.tableDataRead.reason = "JOB"
)
AND
timestamp >= "2021-07-18T23:45:00Z"
AND
timestamp < "2021-07-21T00:15:00Z\"""" # noqa: W293

    source = BigQueryUsageSource.create(config, PipelineContext(run_id="bq-usage-test"))
    # "generated_filter" rather than "filter" to avoid shadowing the builtin.
    generated_filter: str = source._generate_filter(BQ_AUDIT_V1)
    assert generated_filter == expected_filter
|
||||
|
||||
|
||||
@freeze_time(FROZEN_TIME)
def test_bigquery_filters_with_deny_filter():
    """_generate_filter appends user-configured deny patterns after the default
    meta-table deny when table_pattern.deny is non-empty."""
    config = {
        "project_id": "test-project",
        "credential": {
            "project_id": "test-project",
            "private_key_id": "test-private-key",
            "private_key": "random_private_key",
            "client_email": "test@acryl.io",
            "client_id": "test_client-id",
        },
        "table_pattern": {
            "allow": ["test-regex", "test-regex-1"],
            "deny": ["excluded_table_regex", "excluded-regex-2"],
        },
    }
    expected_filter: str = """protoPayload.serviceName="bigquery.googleapis.com"
AND
(
(
protoPayload.methodName="jobservice.jobcompleted"
AND
protoPayload.serviceData.jobCompletedEvent.eventName="query_job_completed"
AND
protoPayload.serviceData.jobCompletedEvent.job.jobStatus.state="DONE"
AND
NOT protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.code:*
)
OR
(
protoPayload.metadata.tableDataRead:*
)
)
AND (

protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "test-regex|test-regex-1"


AND
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "__TABLES_SUMMARY__|INFORMATION_SCHEMA|excluded_table_regex|excluded-regex-2"

OR
protoPayload.metadata.tableDataRead.reason = "JOB"
)
AND
timestamp >= "2021-07-18T23:45:00Z"
AND
timestamp < "2021-07-21T00:15:00Z\"""" # noqa: W293
    source = BigQueryUsageSource.create(config, PipelineContext(run_id="bq-usage-test"))
    # "generated_filter" rather than "filter" to avoid shadowing the builtin.
    generated_filter: str = source._generate_filter(BQ_AUDIT_V1)
    assert generated_filter == expected_filter
|
||||
|
Loading…
x
Reference in New Issue
Block a user