mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-25 17:15:09 +00:00
fix(ingest/bigquery): fix and refactor exported audit logs query (#7699)
This commit is contained in:
parent
062e8ae323
commit
20504aae70
@ -109,26 +109,18 @@ timestamp < "{end_time}"
|
||||
:param limit: set a limit for the maximum event to return. It is used for connection testing currently
|
||||
:return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery
|
||||
"""
|
||||
query: str
|
||||
limit_text = f"limit {limit}" if limit else ""
|
||||
|
||||
shard_condition = ""
|
||||
if use_date_sharded_tables:
|
||||
query = (
|
||||
f"""
|
||||
SELECT
|
||||
timestamp,
|
||||
logName,
|
||||
insertId,
|
||||
protopayload_auditlog AS protoPayload,
|
||||
protopayload_auditlog.metadataJson AS metadata
|
||||
FROM
|
||||
`{dataset}.cloudaudit_googleapis_com_data_access_*`
|
||||
"""
|
||||
+ """
|
||||
WHERE
|
||||
_TABLE_SUFFIX BETWEEN "{start_time}" AND "{end_time}" AND
|
||||
"""
|
||||
from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`"
|
||||
shard_condition = (
|
||||
""" AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """
|
||||
)
|
||||
else:
|
||||
query = f"""
|
||||
from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`"
|
||||
|
||||
query = f"""
|
||||
SELECT
|
||||
timestamp,
|
||||
logName,
|
||||
@ -136,22 +128,19 @@ timestamp < "{end_time}"
|
||||
protopayload_auditlog AS protoPayload,
|
||||
protopayload_auditlog.metadataJson AS metadata
|
||||
FROM
|
||||
`{dataset}.cloudaudit_googleapis_com_data_access`
|
||||
WHERE
|
||||
"""
|
||||
|
||||
audit_log_filter = """ timestamp >= "{start_time}"
|
||||
AND timestamp < "{end_time}"
|
||||
AND protopayload_auditlog.serviceName="bigquery.googleapis.com"
|
||||
AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE"
|
||||
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL
|
||||
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL
|
||||
{from_table}
|
||||
WHERE (
|
||||
timestamp >= "{{start_time}}"
|
||||
AND timestamp < "{{end_time}}"
|
||||
)
|
||||
{shard_condition}
|
||||
AND protopayload_auditlog.serviceName="bigquery.googleapis.com"
|
||||
AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE"
|
||||
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL
|
||||
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL
|
||||
{limit_text};
|
||||
"""
|
||||
|
||||
if limit is not None:
|
||||
audit_log_filter = audit_log_filter + f" LIMIT {limit}"
|
||||
query = textwrap.dedent(query) + audit_log_filter + ";"
|
||||
|
||||
return textwrap.dedent(query)
|
||||
|
||||
def lineage_via_catalog_lineage_api(
|
||||
@ -341,6 +330,7 @@ timestamp < "{end_time}"
|
||||
def _get_exported_bigquery_audit_metadata(
|
||||
self, bigquery_client: BigQueryClient, limit: Optional[int] = None
|
||||
) -> Iterable[BigQueryAuditMetadata]:
|
||||
|
||||
if self.config.bigquery_audit_metadata_datasets is None:
|
||||
self.error(
|
||||
logger, "audit-metadata", "bigquery_audit_metadata_datasets not set"
|
||||
@ -348,22 +338,14 @@ timestamp < "{end_time}"
|
||||
self.report.bigquery_audit_metadata_datasets_missing = True
|
||||
return
|
||||
|
||||
start_time: str = (
|
||||
self.config.start_time - self.config.max_query_duration
|
||||
).strftime(
|
||||
BQ_DATE_SHARD_FORMAT
|
||||
if self.config.use_date_sharded_audit_log_tables
|
||||
else BQ_DATETIME_FORMAT
|
||||
)
|
||||
corrected_start_time = self.config.start_time - self.config.max_query_duration
|
||||
start_time = corrected_start_time.strftime(BQ_DATETIME_FORMAT)
|
||||
start_date = corrected_start_time.strftime(BQ_DATE_SHARD_FORMAT)
|
||||
self.report.audit_start_time = start_time
|
||||
|
||||
end_time: str = (
|
||||
self.config.end_time + self.config.max_query_duration
|
||||
).strftime(
|
||||
BQ_DATE_SHARD_FORMAT
|
||||
if self.config.use_date_sharded_audit_log_tables
|
||||
else BQ_DATETIME_FORMAT
|
||||
)
|
||||
corrected_end_time = self.config.end_time + self.config.max_query_duration
|
||||
end_time = corrected_end_time.strftime(BQ_DATETIME_FORMAT)
|
||||
end_date = corrected_end_time.strftime(BQ_DATE_SHARD_FORMAT)
|
||||
self.report.audit_end_time = end_time
|
||||
|
||||
for dataset in self.config.bigquery_audit_metadata_datasets:
|
||||
@ -374,13 +356,14 @@ timestamp < "{end_time}"
|
||||
query: str = self.bigquery_audit_metadata_query_template(
|
||||
dataset=dataset,
|
||||
use_date_sharded_tables=self.config.use_date_sharded_audit_log_tables,
|
||||
limit=limit,
|
||||
).format(
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
)
|
||||
|
||||
if limit is not None:
|
||||
query = query + f" LIMIT {limit}"
|
||||
query_job = bigquery_client.query(query)
|
||||
|
||||
logger.info(
|
||||
|
@ -100,26 +100,18 @@ def bigquery_audit_metadata_query_template(
|
||||
where REGEXP_CONTAINS(x, r'(projects/.*/datasets/.*/tables/{table_allow_filter if table_allow_filter else ".*"})'))
|
||||
"""
|
||||
|
||||
query: str
|
||||
limit_text = f"limit {limit}" if limit else ""
|
||||
|
||||
shard_condition = ""
|
||||
if use_date_sharded_tables:
|
||||
query = (
|
||||
f"""
|
||||
SELECT
|
||||
timestamp,
|
||||
logName,
|
||||
insertId,
|
||||
protopayload_auditlog AS protoPayload,
|
||||
protopayload_auditlog.metadataJson AS metadata
|
||||
FROM
|
||||
`{dataset}.cloudaudit_googleapis_com_data_access_*`
|
||||
"""
|
||||
+ """
|
||||
WHERE
|
||||
_TABLE_SUFFIX BETWEEN "{start_time}" AND "{end_time}"
|
||||
"""
|
||||
from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`"
|
||||
shard_condition = (
|
||||
""" AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """
|
||||
)
|
||||
else:
|
||||
query = f"""
|
||||
from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`"
|
||||
|
||||
query = f"""
|
||||
SELECT
|
||||
timestamp,
|
||||
logName,
|
||||
@ -127,34 +119,25 @@ def bigquery_audit_metadata_query_template(
|
||||
protopayload_auditlog AS protoPayload,
|
||||
protopayload_auditlog.metadataJson AS metadata
|
||||
FROM
|
||||
`{dataset}.cloudaudit_googleapis_com_data_access`
|
||||
WHERE 1=1
|
||||
"""
|
||||
audit_log_filter_timestamps = """AND (timestamp >= "{start_time}"
|
||||
AND timestamp < "{end_time}"
|
||||
)
|
||||
"""
|
||||
audit_log_filter_query_complete = f"""
|
||||
AND (
|
||||
(
|
||||
protopayload_auditlog.serviceName="bigquery.googleapis.com"
|
||||
AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE"
|
||||
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL
|
||||
{allow_filter}
|
||||
)
|
||||
{from_table}
|
||||
WHERE (
|
||||
timestamp >= "{{start_time}}"
|
||||
AND timestamp < "{{end_time}}"
|
||||
)
|
||||
{shard_condition}
|
||||
AND (
|
||||
(
|
||||
protopayload_auditlog.serviceName="bigquery.googleapis.com"
|
||||
AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE"
|
||||
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL
|
||||
{allow_filter}
|
||||
)
|
||||
OR
|
||||
JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.tableDataRead.reason") = "JOB"
|
||||
)
|
||||
)
|
||||
{limit_text};
|
||||
"""
|
||||
|
||||
limit_text = f"limit {limit}" if limit else ""
|
||||
query = (
|
||||
textwrap.dedent(query)
|
||||
+ audit_log_filter_query_complete
|
||||
+ audit_log_filter_timestamps
|
||||
+ limit_text
|
||||
)
|
||||
|
||||
return textwrap.dedent(query)
|
||||
|
||||
|
||||
@ -276,22 +259,14 @@ class BigQueryUsageExtractor:
|
||||
if self.config.bigquery_audit_metadata_datasets is None:
|
||||
return
|
||||
|
||||
start_time: str = (
|
||||
self.config.start_time - self.config.max_query_duration
|
||||
).strftime(
|
||||
BQ_DATE_SHARD_FORMAT
|
||||
if self.config.use_date_sharded_audit_log_tables
|
||||
else BQ_DATETIME_FORMAT
|
||||
)
|
||||
corrected_start_time = self.config.start_time - self.config.max_query_duration
|
||||
start_time = corrected_start_time.strftime(BQ_DATETIME_FORMAT)
|
||||
start_date = corrected_start_time.strftime(BQ_DATE_SHARD_FORMAT)
|
||||
self.report.audit_start_time = start_time
|
||||
|
||||
end_time: str = (
|
||||
self.config.end_time + self.config.max_query_duration
|
||||
).strftime(
|
||||
BQ_DATE_SHARD_FORMAT
|
||||
if self.config.use_date_sharded_audit_log_tables
|
||||
else BQ_DATETIME_FORMAT
|
||||
)
|
||||
corrected_end_time = self.config.end_time + self.config.max_query_duration
|
||||
end_time = corrected_end_time.strftime(BQ_DATETIME_FORMAT)
|
||||
end_date = corrected_end_time.strftime(BQ_DATE_SHARD_FORMAT)
|
||||
self.report.audit_end_time = end_time
|
||||
|
||||
for dataset in self.config.bigquery_audit_metadata_datasets:
|
||||
@ -303,10 +278,12 @@ class BigQueryUsageExtractor:
|
||||
dataset,
|
||||
self.config.use_date_sharded_audit_log_tables,
|
||||
allow_filter,
|
||||
limit,
|
||||
limit=limit,
|
||||
).format(
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
)
|
||||
|
||||
query_job = bigquery_client.query(query)
|
||||
|
Loading…
x
Reference in New Issue
Block a user