diff --git a/metadata-ingestion/examples/recipes/bigquery_to_datahub.yml b/metadata-ingestion/examples/recipes/bigquery_to_datahub.yml index 10941e74b6..a65f2a34d3 100644 --- a/metadata-ingestion/examples/recipes/bigquery_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/bigquery_to_datahub.yml @@ -40,6 +40,7 @@ source: # - "schema.table.column" # deny: # - "*.*.*" + #lineage_client_project_id: project-id-1234567 ## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation sink: diff --git a/metadata-ingestion/source_docs/bigquery.md b/metadata-ingestion/source_docs/bigquery.md index f709cba602..fab1a527e6 100644 --- a/metadata-ingestion/source_docs/bigquery.md +++ b/metadata-ingestion/source_docs/bigquery.md @@ -139,6 +139,7 @@ As a SQL-based service, the Athena integration is also supported by our SQL prof | `domain.domain_key.allow` | | | List of regex patterns for tables/schemas to set domain_key domain key (domain_key can be any string like `sales`. There can be multiple domain key specified. | | `domain.domain_key.deny` | | | List of regex patterns for tables/schemas to not assign domain_key. There can be multiple domain key specified. | | `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching.There can be multiple domain key specified. | +| `lineage_client_project_id` | | None | The project to use when creating the BigQuery Client. If left empty, the required `project_id` will be used. | The following parameters are only relevant if include_table_lineage is set to true: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 5698d4d2e5..ed624d3109 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -236,6 +236,7 @@ def create_credential_temp_file(credential: BigQueryCredential) -> str: class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig): scheme: str = "bigquery" project_id: Optional[str] = None + lineage_client_project_id: Optional[str] = None log_page_size: Optional[pydantic.PositiveInt] = 1000 credential: Optional[BigQueryCredential] @@ -304,20 +305,29 @@ class BigQuerySource(SQLAlchemySource): def _compute_big_query_lineage(self) -> None: if self.config.include_table_lineage: + lineage_client_project_id = self._get_lineage_client_project_id() if self.config.use_exported_bigquery_audit_metadata: - self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata() + self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata( + lineage_client_project_id + ) else: - self._compute_bigquery_lineage_via_gcp_logging() + self._compute_bigquery_lineage_via_gcp_logging( + lineage_client_project_id + ) if self.lineage_metadata is not None: logger.info( f"Built lineage map containing {len(self.lineage_metadata)} entries." ) - def _compute_bigquery_lineage_via_gcp_logging(self) -> None: + def _compute_bigquery_lineage_via_gcp_logging( + self, lineage_client_project_id: Optional[str] + ) -> None: logger.info("Populating lineage info via GCP audit logs") try: - _clients: List[GCPLoggingClient] = self._make_bigquery_client() + _clients: List[GCPLoggingClient] = self._make_bigquery_client( + lineage_client_project_id + ) log_entries: Iterable[AuditLogEntry] = self._get_bigquery_log_entries( _clients ) @@ -331,10 +341,12 @@ class BigQuerySource(SQLAlchemySource): e, ) - def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None: + def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata( + self, lineage_client_project_id: Optional[str] + ) -> None: logger.info("Populating lineage info via exported GCP audit logs") try: - _client: BigQueryClient = BigQueryClient(project=self.config.project_id) + _client: BigQueryClient = BigQueryClient(project=lineage_client_project_id) exported_bigquery_audit_metadata: Iterable[ BigQueryAuditMetadata ] = self._get_exported_bigquery_audit_metadata(_client) @@ -350,17 +362,28 @@ class BigQuerySource(SQLAlchemySource): e, ) - def _make_bigquery_client(self) -> List[GCPLoggingClient]: + def _make_bigquery_client( + self, lineage_client_project_id: Optional[str] + ) -> List[GCPLoggingClient]: # See https://github.com/googleapis/google-cloud-python/issues/2674 for # why we disable gRPC here. client_options = self.config.extra_client_options.copy() client_options["_use_grpc"] = False - project_id = self.config.project_id - if project_id is not None: - return [GCPLoggingClient(**client_options, project=project_id)] + if lineage_client_project_id is not None: + return [ + GCPLoggingClient(**client_options, project=lineage_client_project_id) + ] else: return [GCPLoggingClient(**client_options)] + def _get_lineage_client_project_id(self) -> Optional[str]: + project_id: Optional[str] = ( + self.config.lineage_client_project_id + if self.config.lineage_client_project_id + else self.config.project_id + ) + return project_id + def _get_bigquery_log_entries( self, clients: List[GCPLoggingClient] ) -> Iterable[AuditLogEntry]: