mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-31 04:25:29 +00:00
feat(ingestion): support multiple project IDs in bigquery usage stats (#2920)
This commit is contained in:
parent
5f0b4464f5
commit
ad30f2b8ec
@ -798,7 +798,7 @@ Note: when `load_schemas` is False, models that use [identifiers](https://docs.g
|
||||
- Fetch a list of tables and columns accessed
|
||||
- Aggregate these statistics into buckets, by day or hour granularity
|
||||
|
||||
Note: the client must have one of the following OAuth scopes, and should be authorized on all projects you'd like to ingest usage stats from.
|
||||
- https://www.googleapis.com/auth/logging.read
|
||||
- https://www.googleapis.com/auth/logging.admin
|
||||
@ -809,7 +809,9 @@ Note: the client must have one of the following OAuth scopes:
|
||||
```yaml
source:
  type: bigquery-usage
  config:
    projects:  # optional - can autodetect a single project from the environment
      - project_id_1
      - project_id_2
    options:
      # See https://googleapis.dev/python/logging/latest/client.html for details.
      credentials: ~  # optional - see docs
```
|
||||
|
@ -60,6 +60,8 @@ exclude_lines =
|
||||
pragma: no cover
|
||||
@abstract
|
||||
if TYPE_CHECKING:
|
||||
include =
|
||||
src/*
|
||||
omit =
|
||||
# omit codegen
|
||||
src/datahub/metadata/*
|
||||
|
@ -1,5 +1,6 @@
|
||||
import collections
|
||||
import dataclasses
|
||||
import heapq
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
@ -214,12 +215,21 @@ class QueryEvent:
|
||||
|
||||
|
||||
class BigQueryUsageConfig(BaseUsageConfig):
    """Config for the bigquery-usage source.

    Usage logs can be pulled from multiple GCP projects at once via
    ``projects``; the older single-project ``project_id`` option is kept
    for backwards compatibility and is remapped onto ``projects``.
    """

    # One or more GCP project IDs to read audit logs from. When left unset,
    # the logging client autodetects a single project from the environment.
    projects: Optional[List[str]] = None
    project_id: Optional[str] = None  # deprecated in favor of `projects`
    # Extra kwargs forwarded to the GCP logging client constructor.
    extra_client_options: dict = {}
    env: str = builder.DEFAULT_ENV

    query_log_delay: Optional[pydantic.PositiveInt] = None

    @pydantic.validator("project_id")
    def note_project_id_deprecation(cls, v, values, **kwargs):
        """Remap the deprecated `project_id` option onto `projects`.

        Always returns None so the deprecated field is cleared after
        remapping; downstream code should only read `projects`.
        """
        if v is None:
            # Explicit `project_id: null` — nothing to migrate, no warning.
            return None
        logger.warning(
            "bigquery-usage project_id option is deprecated; use projects instead"
        )
        # Fields are validated in declaration order, so `projects` is already
        # present in `values`. Don't clobber an explicitly-configured list.
        if not values.get("projects"):
            values["projects"] = [v]
        return None
|
||||
|
||||
|
||||
@dataclass
|
||||
class BigQueryUsageSourceReport(SourceReport):
|
||||
@ -233,28 +243,19 @@ class BigQueryUsageSource(Source):
|
||||
config: BigQueryUsageConfig
report: BigQueryUsageSourceReport

def __init__(self, config: BigQueryUsageConfig, ctx: PipelineContext):
    """Store the parsed config and initialize an empty report.

    Logging clients are NOT created here: `get_workunits` builds one
    client per configured project via `_make_bigquery_clients`, so no
    stale single-project client is kept on the instance.
    """
    super().__init__(ctx)
    self.config = config
    self.report = BigQueryUsageSourceReport()
||||
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigQueryUsageSource":
    """Standard factory: parse a raw recipe config dict and build the source."""
    parsed_config = BigQueryUsageConfig.parse_obj(config_dict)
    return cls(parsed_config, ctx)
|
||||
|
||||
def get_workunits(self) -> Iterable[UsageStatsWorkUnit]:
|
||||
bigquery_log_entries = self._get_bigquery_log_entries()
|
||||
clients = self._make_bigquery_clients()
|
||||
bigquery_log_entries = self._get_bigquery_log_entries(clients)
|
||||
parsed_events = self._parse_bigquery_log_entries(bigquery_log_entries)
|
||||
hydrated_read_events = self._join_events_by_job_id(parsed_events)
|
||||
aggregated_info = self._aggregate_enriched_read_events(hydrated_read_events)
|
||||
@ -265,15 +266,41 @@ class BigQueryUsageSource(Source):
|
||||
self.report.report_workunit(wu)
|
||||
yield wu
|
||||
|
||||
def _make_bigquery_clients(self) -> List[GCPLoggingClient]:
    """Build one GCP logging client per configured project.

    When `projects` is unset, a single client is created and the project
    is autodetected from the environment by the client library.
    """
    # See https://github.com/googleapis/google-cloud-python/issues/2674 for
    # why we disable gRPC here.
    client_options = dict(self.config.extra_client_options, _use_grpc=False)
    if self.config.projects is None:
        return [GCPLoggingClient(**client_options)]
    return [
        GCPLoggingClient(**client_options, project=project_id)
        for project_id in self.config.projects
    ]
||||
def _get_bigquery_log_entries(
|
||||
self, clients: List[GCPLoggingClient]
|
||||
) -> Iterable[AuditLogEntry]:
|
||||
filter = BQ_FILTER_RULE_TEMPLATE.format(
|
||||
start_time=self.config.start_time.strftime(BQ_DATETIME_FORMAT),
|
||||
end_time=self.config.end_time.strftime(BQ_DATETIME_FORMAT),
|
||||
)
|
||||
|
||||
def get_entry_timestamp(entry: AuditLogEntry) -> datetime:
|
||||
return entry.timestamp
|
||||
|
||||
entry: AuditLogEntry
|
||||
for i, entry in enumerate(
|
||||
self.client.list_entries(filter_=filter, page_size=GCP_LOGGING_PAGE_SIZE)
|
||||
heapq.merge(
|
||||
*(
|
||||
client.list_entries(filter_=filter, page_size=GCP_LOGGING_PAGE_SIZE)
|
||||
for client in clients
|
||||
),
|
||||
key=get_entry_timestamp,
|
||||
)
|
||||
):
|
||||
if i == 0:
|
||||
logger.debug("starting log load from BigQuery")
|
||||
|
@ -15,7 +15,7 @@ from tests.test_helpers import mce_helpers
|
||||
WRITE_REFERENCE_FILE = False
|
||||
|
||||
|
||||
def test_config_time_defaults():
|
||||
def test_bq_usage_config():
|
||||
config = BigQueryUsageConfig.parse_obj(
|
||||
dict(
|
||||
project_id="sample-bigquery-project-name-1234",
|
||||
@ -23,6 +23,7 @@ def test_config_time_defaults():
|
||||
)
|
||||
)
|
||||
assert (config.end_time - config.start_time) == timedelta(hours=1)
|
||||
assert config.projects == ["sample-bigquery-project-name-1234"]
|
||||
|
||||
|
||||
def test_bq_usage_source(pytestconfig, tmp_path):
|
||||
@ -36,12 +37,16 @@ def test_bq_usage_source(pytestconfig, tmp_path):
|
||||
if WRITE_REFERENCE_FILE:
|
||||
source = BigQueryUsageSource.create(
|
||||
dict(
|
||||
project_id="harshal-playground-306419",
|
||||
projects=[
|
||||
"harshal-playground-306419",
|
||||
],
|
||||
start_time=datetime.now(tz=timezone.utc) - timedelta(days=25),
|
||||
),
|
||||
PipelineContext(run_id="bq-usage-test"),
|
||||
)
|
||||
entries = list(source._get_bigquery_log_entries())
|
||||
entries = list(
|
||||
source._get_bigquery_log_entries(source._make_bigquery_clients())
|
||||
)
|
||||
|
||||
entries = [entry._replace(logger=None) for entry in entries]
|
||||
log_entries = jsonpickle.encode(entries, indent=4)
|
||||
@ -62,7 +67,7 @@ def test_bq_usage_source(pytestconfig, tmp_path):
|
||||
"run_id": "test-bigquery-usage",
|
||||
"source": {
|
||||
"type": "bigquery-usage",
|
||||
"config": {"project_id": "sample-bigquery-project-1234"},
|
||||
"config": {"projects": ["sample-bigquery-project-1234"]},
|
||||
},
|
||||
"sink": {
|
||||
"type": "file",
|
||||
|
Loading…
x
Reference in New Issue
Block a user