diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 3f3c08919a..75ea6bddf4 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -798,7 +798,7 @@ Note: when `load_schemas` is False, models that use [identifiers](https://docs.g
 - Fetch a list of tables and columns accessed
 - Aggregate these statistics into buckets, by day or hour granularity
 
-Note: the client must have one of the following OAuth scopes:
+Note: the client must have one of the following OAuth scopes, and should be authorized on all projects you'd like to ingest usage stats from.
 
 - https://www.googleapis.com/auth/logging.read
 - https://www.googleapis.com/auth/logging.admin
@@ -809,7 +809,9 @@ Note: the client must have one of the following OAuth scopes:
 source:
   type: bigquery-usage
   config:
-    project_id: project # optional - can autodetect from environment
+    projects: # optional - can autodetect a single project from the environment
+      - project_id_1
+      - project_id_2
     options: # See https://googleapis.dev/python/logging/latest/client.html for details.
       credentials: ~ # optional - see docs
diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg
index 3ffffe059d..4777b18c2b 100644
--- a/metadata-ingestion/setup.cfg
+++ b/metadata-ingestion/setup.cfg
@@ -60,6 +60,8 @@ exclude_lines =
     pragma: no cover
     @abstract
     if TYPE_CHECKING:
+include =
+    src/*
 omit =
     # omit codegen
     src/datahub/metadata/*
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
index fc417b7b03..d8e2d8c2e3 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
@@ -1,5 +1,6 @@
 import collections
 import dataclasses
+import heapq
 import logging
 import re
 from dataclasses import dataclass
@@ -214,12 +215,21 @@ class QueryEvent:
 
 
 class BigQueryUsageConfig(BaseUsageConfig):
-    project_id: Optional[str] = None
+    projects: Optional[List[str]] = None
+    project_id: Optional[str] = None  # deprecated in favor of `projects`
     extra_client_options: dict = {}
     env: str = builder.DEFAULT_ENV
     query_log_delay: Optional[pydantic.PositiveInt] = None
 
+    @pydantic.validator("project_id")
+    def note_project_id_deprecation(cls, v, values, **kwargs):
+        logger.warning(
+            "bigquery-usage project_id option is deprecated; use projects instead"
+        )
+        values["projects"] = [v]
+        return None
+
 
 @dataclass
 class BigQueryUsageSourceReport(SourceReport):
@@ -233,28 +243,19 @@ class BigQueryUsageSource(Source):
     config: BigQueryUsageConfig
     report: BigQueryUsageSourceReport
 
-    client: GCPLoggingClient
-
     def __init__(self, config: BigQueryUsageConfig, ctx: PipelineContext):
         super().__init__(ctx)
         self.config = config
         self.report = BigQueryUsageSourceReport()
 
-        client_options = self.config.extra_client_options.copy()
-        if self.config.project_id is not None:
-            client_options["project"] = self.config.project_id
-
-        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
-        # why we disable gRPC here.
-        self.client = GCPLoggingClient(**client_options, _use_grpc=False)
-
     @classmethod
     def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigQueryUsageSource":
         config = BigQueryUsageConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
     def get_workunits(self) -> Iterable[UsageStatsWorkUnit]:
-        bigquery_log_entries = self._get_bigquery_log_entries()
+        clients = self._make_bigquery_clients()
+        bigquery_log_entries = self._get_bigquery_log_entries(clients)
         parsed_events = self._parse_bigquery_log_entries(bigquery_log_entries)
         hydrated_read_events = self._join_events_by_job_id(parsed_events)
         aggregated_info = self._aggregate_enriched_read_events(hydrated_read_events)
@@ -265,15 +266,41 @@ class BigQueryUsageSource(Source):
         self.report.report_workunit(wu)
         yield wu
 
-    def _get_bigquery_log_entries(self) -> Iterable[AuditLogEntry]:
+    def _make_bigquery_clients(self) -> List[GCPLoggingClient]:
+        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
+        # why we disable gRPC here.
+        client_options = self.config.extra_client_options.copy()
+        client_options["_use_grpc"] = False
+        if self.config.projects is None:
+            return [
+                GCPLoggingClient(**client_options),
+            ]
+        else:
+            return [
+                GCPLoggingClient(**client_options, project=project_id)
+                for project_id in self.config.projects
+            ]
+
+    def _get_bigquery_log_entries(
+        self, clients: List[GCPLoggingClient]
+    ) -> Iterable[AuditLogEntry]:
         filter = BQ_FILTER_RULE_TEMPLATE.format(
             start_time=self.config.start_time.strftime(BQ_DATETIME_FORMAT),
             end_time=self.config.end_time.strftime(BQ_DATETIME_FORMAT),
        )
 
+        def get_entry_timestamp(entry: AuditLogEntry) -> datetime:
+            return entry.timestamp
+
         entry: AuditLogEntry
         for i, entry in enumerate(
-            self.client.list_entries(filter_=filter, page_size=GCP_LOGGING_PAGE_SIZE)
+            heapq.merge(
+                *(
+                    client.list_entries(filter_=filter, page_size=GCP_LOGGING_PAGE_SIZE)
+                    for client in clients
+                ),
+                key=get_entry_timestamp,
+            )
         ):
             if i == 0:
                 logger.debug("starting log load from BigQuery")
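The merged iteration above relies on each per-project `list_entries` stream already being sorted by timestamp; `heapq.merge` then lazily combines the sorted streams without buffering them in memory. A minimal sketch of the same technique, using plain datetimes in place of `AuditLogEntry` objects (the project names and timestamps here are made up for illustration):

```python
import heapq
from datetime import datetime, timedelta

base = datetime(2021, 6, 1)
# Two "projects" worth of log timestamps, each already sorted ascending.
project_a = [base + timedelta(minutes=m) for m in (0, 5, 9)]
project_b = [base + timedelta(minutes=m) for m in (2, 3, 11)]

# heapq.merge consumes the iterators lazily, holding at most one pending
# entry per stream; the `key` argument is supported on Python 3.5+.
merged = heapq.merge(project_a, project_b, key=lambda entry: entry)
assert [e.minute for e in merged] == [0, 2, 3, 5, 9, 11]
```

Note that `heapq.merge` assumes sorted inputs: if any per-project stream were out of timestamp order, the merged output would be too.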
diff --git a/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py b/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py
index f361b46bf9..b34e36ee5d 100644
--- a/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py
+++ b/metadata-ingestion/tests/integration/bigquery-usage/test_bigquery_usage.py
@@ -15,7 +15,7 @@ from tests.test_helpers import mce_helpers
 WRITE_REFERENCE_FILE = False
 
 
-def test_config_time_defaults():
+def test_bq_usage_config():
     config = BigQueryUsageConfig.parse_obj(
         dict(
             project_id="sample-bigquery-project-name-1234",
@@ -23,6 +23,7 @@
         )
     )
     assert (config.end_time - config.start_time) == timedelta(hours=1)
+    assert config.projects == ["sample-bigquery-project-name-1234"]
 
 
 def test_bq_usage_source(pytestconfig, tmp_path):
@@ -36,12 +37,16 @@
     if WRITE_REFERENCE_FILE:
         source = BigQueryUsageSource.create(
             dict(
-                project_id="harshal-playground-306419",
+                projects=[
+                    "harshal-playground-306419",
+                ],
                 start_time=datetime.now(tz=timezone.utc) - timedelta(days=25),
             ),
             PipelineContext(run_id="bq-usage-test"),
         )
-        entries = list(source._get_bigquery_log_entries())
+        entries = list(
+            source._get_bigquery_log_entries(source._make_bigquery_clients())
+        )
         entries = [entry._replace(logger=None) for entry in entries]
 
         log_entries = jsonpickle.encode(entries, indent=4)
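The renamed `test_bq_usage_config` exercises the deprecation path added to `BigQueryUsageConfig`: the validator on the old field rewrites `values` for the new field and returns `None`, so the parsed config only ever exposes `projects`. A standalone sketch of that pydantic v1 pattern, with a hypothetical `ExampleConfig` and an extra `None` guard not present in the diff:

```python
import logging
from typing import List, Optional

import pydantic

logger = logging.getLogger(__name__)


class ExampleConfig(pydantic.BaseModel):
    # The new field must be declared first so it is already present in
    # `values` when the validator for the deprecated field runs.
    projects: Optional[List[str]] = None
    project_id: Optional[str] = None  # deprecated in favor of `projects`

    @pydantic.validator("project_id")
    def _rewrite_deprecated_field(cls, v, values, **kwargs):
        # Guard against an explicit null (an assumption beyond the original
        # code): only warn and rewrite when a real value was supplied.
        if v is not None:
            logger.warning("project_id is deprecated; use projects instead")
            values["projects"] = [v]
        return None


config = ExampleConfig.parse_obj({"project_id": "sample-project"})
assert config.projects == ["sample-project"]
assert config.project_id is None
```

By default, pydantic v1 only runs a validator when the field is actually supplied, so configs that never mention `project_id` skip the warning entirely.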
"source": { "type": "bigquery-usage", - "config": {"project_id": "sample-bigquery-project-1234"}, + "config": {"projects": ["sample-bigquery-project-1234"]}, }, "sink": { "type": "file",