mirror of
https://github.com/datahub-project/datahub.git
synced 2026-01-08 15:56:13 +00:00
revert(bigquery-usage): dataset allow filter impl (#4901)
* Revert "fix(ingestion): bigquery-usage: Fix biquery usage table deny pattern template (#4898)"
This commit is contained in:
parent
b6c35a610c
commit
b9f78026c8
@ -74,13 +74,11 @@ BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
|
||||
BQ_DATE_SHARD_FORMAT = "%Y%m%d"
|
||||
BQ_AUDIT_V1 = {
|
||||
"BQ_FILTER_REGEX_ALLOW_TEMPLATE": """
|
||||
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "{table_allow_pattern}"
|
||||
AND
|
||||
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.datasetId =~ "{dataset_allow_pattern}"
|
||||
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "{allow_pattern}"
|
||||
""",
|
||||
"BQ_FILTER_REGEX_DENY_TEMPLATE": """
|
||||
{logical_operator}
|
||||
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "{table_deny_pattern}"
|
||||
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "{deny_pattern}"
|
||||
""",
|
||||
"BQ_FILTER_RULE_TEMPLATE": """
|
||||
protoPayload.serviceName="bigquery.googleapis.com"
|
||||
@ -115,11 +113,11 @@ timestamp < "{end_time}"
|
||||
|
||||
BQ_AUDIT_V2 = {
|
||||
"BQ_FILTER_REGEX_ALLOW_TEMPLATE": """
|
||||
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/{dataset_allow_pattern}/tables/{table_allow_pattern}"
|
||||
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/.*/tables/{allow_pattern}"
|
||||
""",
|
||||
"BQ_FILTER_REGEX_DENY_TEMPLATE": """
|
||||
{logical_operator}
|
||||
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/{table_deny_pattern}"
|
||||
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/{deny_pattern}"
|
||||
""",
|
||||
"BQ_FILTER_RULE_TEMPLATE": """
|
||||
resource.type=("bigquery_project" OR "bigquery_dataset")
|
||||
@ -171,8 +169,7 @@ OPERATION_STATEMENT_TYPES = {
|
||||
def bigquery_audit_metadata_query_template(
|
||||
dataset: str,
|
||||
use_date_sharded_tables: bool,
|
||||
table_allow_filter: str,
|
||||
dataset_allow_filter: str,
|
||||
table_allow_filter: str = None,
|
||||
) -> str:
|
||||
"""
|
||||
Receives a dataset (with project specified) and returns a query template that is used to query exported
|
||||
@ -180,15 +177,14 @@ def bigquery_audit_metadata_query_template(
|
||||
:param dataset: the dataset to query against in the form of $PROJECT.$DATASET
|
||||
:param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log
|
||||
tables
|
||||
:param table_allow_filter: regex used to filter on log events that contain the wanted tables
|
||||
:param dataset_allow_filter: regex used to filter on log events that contain wanted datasets
|
||||
:param table_allow_filter: regex used to filter on log events that contain the wanted datasets
|
||||
:return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery
|
||||
"""
|
||||
allow_filter = f"""
|
||||
AND EXISTS (SELECT *
|
||||
from UNNEST(JSON_EXTRACT_ARRAY(protopayload_auditlog.metadataJson,
|
||||
"$.jobChange.job.jobStats.queryStats.referencedTables")) AS x
|
||||
where REGEXP_CONTAINS(x, r'(projects/.*/datasets/{dataset_allow_filter if dataset_allow_filter else ".*"}/tables/{table_allow_filter if table_allow_filter else ".*"})'))
|
||||
where REGEXP_CONTAINS(x, r'(projects/.*/datasets/.*/tables/{table_allow_filter if table_allow_filter else ".*"})'))
|
||||
"""
|
||||
|
||||
query: str
|
||||
@ -655,8 +651,8 @@ class BigQueryUsageSource(Source):
|
||||
self.report.use_v2_audit_metadata = self.config.use_v2_audit_metadata
|
||||
self.report.query_log_delay = self.config.query_log_delay
|
||||
self.report.log_page_size = self.config.log_page_size
|
||||
self.report.allow_pattern = self.config.get_table_allow_pattern_string()
|
||||
self.report.deny_pattern = self.config.get_table_deny_pattern_string()
|
||||
self.report.allow_pattern = self.config.get_allow_pattern_string()
|
||||
self.report.deny_pattern = self.config.get_deny_pattern_string()
|
||||
|
||||
def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool:
|
||||
return (
|
||||
@ -753,9 +749,7 @@ class BigQueryUsageSource(Source):
|
||||
list_entries: Iterable[
|
||||
BigQueryAuditMetadata
|
||||
] = self._get_exported_bigquery_audit_metadata(
|
||||
client,
|
||||
self.config.get_table_allow_pattern_string(),
|
||||
self.config.get_dataset_allow_pattern_string(),
|
||||
client, self.config.get_allow_pattern_string()
|
||||
)
|
||||
list_entry_generators_across_clients.append(list_entries)
|
||||
except Exception as e:
|
||||
@ -781,10 +775,7 @@ class BigQueryUsageSource(Source):
|
||||
logger.info(f"Finished loading {i} log entries from BigQuery")
|
||||
|
||||
def _get_exported_bigquery_audit_metadata(
|
||||
self,
|
||||
bigquery_client: BigQueryClient,
|
||||
table_allow_filter: str,
|
||||
dataset_allow_filter: str,
|
||||
self, bigquery_client: BigQueryClient, allow_filter: str
|
||||
) -> Iterable[BigQueryAuditMetadata]:
|
||||
if self.config.bigquery_audit_metadata_datasets is None:
|
||||
return
|
||||
@ -810,10 +801,7 @@ class BigQueryUsageSource(Source):
|
||||
).strftime(BQ_DATE_SHARD_FORMAT)
|
||||
|
||||
query = bigquery_audit_metadata_query_template(
|
||||
dataset,
|
||||
self.config.use_date_sharded_audit_log_tables,
|
||||
table_allow_filter,
|
||||
dataset_allow_filter,
|
||||
dataset, self.config.use_date_sharded_audit_log_tables, allow_filter
|
||||
).format(
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
@ -822,11 +810,9 @@ class BigQueryUsageSource(Source):
|
||||
)
|
||||
else:
|
||||
query = bigquery_audit_metadata_query_template(
|
||||
dataset,
|
||||
self.config.use_date_sharded_audit_log_tables,
|
||||
table_allow_filter,
|
||||
dataset_allow_filter,
|
||||
dataset, self.config.use_date_sharded_audit_log_tables, allow_filter
|
||||
).format(start_time=start_time, end_time=end_time)
|
||||
|
||||
query_job = bigquery_client.query(query)
|
||||
logger.info(
|
||||
f"Finished loading log entries from BigQueryAuditMetadata in {dataset}"
|
||||
@ -852,30 +838,21 @@ class BigQueryUsageSource(Source):
|
||||
# completion event is delayed and happens after the configured end time.
|
||||
|
||||
# Can safely access the first index of the allow list as it by default contains ".*"
|
||||
use_allow_filter = (
|
||||
self.config.table_pattern
|
||||
and (
|
||||
len(self.config.table_pattern.allow) > 1
|
||||
or self.config.table_pattern.allow[0] != ".*"
|
||||
)
|
||||
or self.config.dataset_pattern
|
||||
and (
|
||||
len(self.config.dataset_pattern.allow) > 1
|
||||
or self.config.dataset_pattern.allow[0] != ".*"
|
||||
)
|
||||
use_allow_filter = self.config.table_pattern and (
|
||||
len(self.config.table_pattern.allow) > 1
|
||||
or self.config.table_pattern.allow[0] != ".*"
|
||||
)
|
||||
use_deny_filter = self.config.table_pattern and self.config.table_pattern.deny
|
||||
allow_regex = (
|
||||
audit_templates["BQ_FILTER_REGEX_ALLOW_TEMPLATE"].format(
|
||||
table_allow_pattern=self.config.get_table_allow_pattern_string(),
|
||||
dataset_allow_pattern=self.config.get_dataset_allow_pattern_string(),
|
||||
allow_pattern=self.config.get_allow_pattern_string()
|
||||
)
|
||||
if use_allow_filter
|
||||
else ""
|
||||
)
|
||||
deny_regex = (
|
||||
audit_templates["BQ_FILTER_REGEX_DENY_TEMPLATE"].format(
|
||||
table_deny_pattern=self.config.get_table_deny_pattern_string(),
|
||||
deny_pattern=self.config.get_deny_pattern_string(),
|
||||
logical_operator="AND" if use_allow_filter else "",
|
||||
)
|
||||
if use_deny_filter
|
||||
|
||||
@ -156,14 +156,8 @@ class BigQueryUsageConfig(DatasetSourceConfigBase, BaseUsageConfig):
|
||||
)
|
||||
return v
|
||||
|
||||
def get_table_allow_pattern_string(self) -> str:
|
||||
def get_allow_pattern_string(self) -> str:
|
||||
return "|".join(self.table_pattern.allow) if self.table_pattern else ""
|
||||
|
||||
def get_table_deny_pattern_string(self) -> str:
|
||||
def get_deny_pattern_string(self) -> str:
|
||||
return "|".join(self.table_pattern.deny) if self.table_pattern else ""
|
||||
|
||||
def get_dataset_allow_pattern_string(self) -> str:
|
||||
return "|".join(self.dataset_pattern.allow) if self.table_pattern else ""
|
||||
|
||||
def get_dataset_deny_pattern_string(self) -> str:
|
||||
return "|".join(self.dataset_pattern.deny) if self.table_pattern else ""
|
||||
|
||||
@ -32,8 +32,8 @@ def test_bq_usage_config():
|
||||
table_pattern={"allow": ["test-regex", "test-regex-1"], "deny": []},
|
||||
)
|
||||
)
|
||||
assert config.get_table_allow_pattern_string() == "test-regex|test-regex-1"
|
||||
assert config.get_table_deny_pattern_string() == ""
|
||||
assert config.get_allow_pattern_string() == "test-regex|test-regex-1"
|
||||
assert config.get_deny_pattern_string() == ""
|
||||
assert (config.end_time - config.start_time) == timedelta(hours=2)
|
||||
assert config.projects == ["sample-bigquery-project-name-1234"]
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user