diff --git a/metadata-ingestion/docs/sources/bigquery/bigquery.md b/metadata-ingestion/docs/sources/bigquery/bigquery.md index a140f8fc81..7231a110ca 100644 --- a/metadata-ingestion/docs/sources/bigquery/bigquery.md +++ b/metadata-ingestion/docs/sources/bigquery/bigquery.md @@ -89,5 +89,37 @@ Due to performance reasons, we only profile the latest partition for Partitioned You can set partition explicitly with `partition.partition_datetime` property if you want. (partition will be applied to all partitioned tables) ::: +### Working with multi-project GCP setups + +Sometimes you may have multiple GCP project with one only giving you view access rights and other project where you have view/modify rights. To deal with such setups you can use the `storage_project_id` setting. An example recipe looks like this + +```yaml +source: + type: "bigquery" + config: + project_id: compute-project-id # With view as well as modify rights + storage_project_id: acryl-staging # with view only rights + ...rest of fields +``` + +The GCP roles with which this setup has been tested are as follows +- Storage Project + - BigQuery Data Viewer + - BigQuery Metadata Viewer + - Logs Viewer + - Private Logs Viewer +- Compute Project + - BigQuery Admin + - BigQuery Data Editor + - BigQuery Job User + +If you are using `use_exported_bigquery_audit_metadata = True` and `use_v2_audit_metadata = False` then make sure you prefix the datasets in `bigquery_audit_metadata_datasets` with storage project id. + +:::note + +Bigquery usage has not been modified and tested with multi-project setting. Only `bigquery` plugin works with multi-project setup currently. + +:::note + ### Caveats - For Materialized views lineage is dependent on logs being retained. If your GCP logging is retained for 30 days (default) and 30 days have passed since the creation of the materialized view we won't be able to get lineage for them. \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 8aea67cc72..37163437e4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -339,20 +339,24 @@ class BigQuerySource(SQLAlchemySource): self.partition_info: Dict[str, str] = dict() atexit.register(cleanup, config) - def get_db_name( - self, inspector: Inspector = None, for_sql_queries: bool = True - ) -> str: - """ - for_sql_queries - Used mainly for multi-project setups with different permissions - - should be set to True if this is to be used to run sql queries - - should be set to False if this is to inspect contents and not run sql queries - """ - if for_sql_queries and self.config.storage_project_id: + def get_multiproject_project_id( + self, inspector: Optional[Inspector] = None, run_on_compute: bool = False + ) -> Optional[str]: + if self.config.storage_project_id and (not run_on_compute): return self.config.storage_project_id elif self.config.project_id: return self.config.project_id else: - return self._get_project_id(inspector) + if inspector: + return self._get_project_id(inspector) + else: + return None + + def get_db_name(self, inspector: Inspector) -> str: + db_name = self.get_multiproject_project_id(inspector) + # db name can't be empty here as we pass in inpector to get_multiproject_project_id + assert db_name + return db_name def _compute_big_query_lineage(self) -> None: if not self.config.include_table_lineage: @@ -373,7 +377,7 @@ class BigQuerySource(SQLAlchemySource): logger.debug(f"lineage metadata is {self.lineage_metadata}") def _compute_bigquery_lineage_via_gcp_logging(self) -> None: - project_id = self.get_db_name() + project_id = self.get_multiproject_project_id() logger.info("Populating lineage info via GCP audit logs") try: _clients: List[GCPLoggingClient] = self._make_gcp_logging_client(project_id) @@ -397,7 +401,7 @@ class BigQuerySource(SQLAlchemySource): ) def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None: - project_id = self.get_db_name(for_sql_queries=True) + project_id = self.get_multiproject_project_id(run_on_compute=True) logger.info("Populating lineage info via exported GCP audit logs") try: _client: BigQueryClient = BigQueryClient(project=project_id) @@ -660,30 +664,34 @@ class BigQuerySource(SQLAlchemySource): def is_table_partitioned( self, database: Optional[str], schema: str, table: str ) -> bool: - project_id: str + project_id: Optional[str] if database: project_id = database else: - engine = self._get_engine(for_run_sql=False) + engine = self._get_engine(run_on_compute=True) with engine.connect() as con: inspector = inspect(con) - project_id = self.get_db_name(inspector) + project_id = self.get_multiproject_project_id(inspector=inspector) + assert project_id return f"{project_id}.{schema}.{table}" in self.partition_info def get_latest_partition( self, schema: str, table: str ) -> Optional[BigQueryPartitionColumn]: logger.debug(f"get_latest_partition for {schema} and {table}") - engine = self._get_engine(for_run_sql=True) + engine = self._get_engine(run_on_compute=True) with engine.connect() as con: inspector = inspect(con) - project_id = self.get_db_name(inspector) + project_id = self.get_multiproject_project_id(inspector=inspector) + assert project_id if not self.is_table_partitioned( database=project_id, schema=schema, table=table ): return None + project_id = self.get_multiproject_project_id(inspector=inspector) + assert project_id sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format( - project_id=self.get_db_name(inspector, for_sql_queries=True), + project_id=self.get_multiproject_project_id(inspector=inspector), schema=schema, table=table, ) @@ -709,7 +717,7 @@ class BigQuerySource(SQLAlchemySource): table_name, shard = self.get_shard_from_table(table) if shard: logger.debug(f"{table_name} is sharded and shard id is: {shard}") - engine = self._get_engine(for_run_sql=True) + engine = self._get_engine(run_on_compute=True) if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids: with engine.connect() as con: if table_name is not None: @@ -738,14 +746,15 @@ class BigQuerySource(SQLAlchemySource): else: return True - def _get_engine(self, for_run_sql: bool) -> Engine: - url = self.config.get_sql_alchemy_url(for_run_sql=for_run_sql) + def _get_engine(self, run_on_compute: bool = True) -> Engine: + url = self.config.get_sql_alchemy_url(run_on_compute=run_on_compute) logger.debug(f"sql_alchemy_url={url}") return create_engine(url, **self.config.options) def add_information_for_schema(self, inspector: Inspector, schema: str) -> None: - engine = self._get_engine(for_run_sql=True) - project_id = self.get_db_name(inspector) + engine = self._get_engine(run_on_compute=True) + project_id = self.get_multiproject_project_id(inspector=inspector) + assert project_id with engine.connect() as con: inspector = inspect(con) sql = f""" @@ -764,7 +773,8 @@ class BigQuerySource(SQLAlchemySource): self, inspector: Inspector, schema: str, table: str ) -> Dict[str, List[str]]: extra_tags: Dict[str, List[str]] = {} - project_id = self.get_db_name(inspector) + project_id = self.get_multiproject_project_id(inspector=inspector) + assert project_id partition_lookup_key = f"{project_id}.{schema}.{table}" if partition_lookup_key in self.partition_info: @@ -831,7 +841,7 @@ WHERE def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": logger.debug("Getting profiler instance from bigquery") - engine = self._get_engine(for_run_sql=True) + engine = self._get_engine(run_on_compute=True) with engine.connect() as conn: inspector = inspect(conn) @@ -996,13 +1006,16 @@ WHERE def prepare_profiler_args( self, + inspector: Inspector, schema: str, table: str, partition: Optional[str], custom_sql: Optional[str] = None, ) -> dict: + project_id = self._get_project_id(inspector=inspector) + assert project_id return dict( - schema=self.get_db_name(for_sql_queries=True), + schema=project_id, table=f"{schema}.{table}", partition=partition, custom_sql=custom_sql, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 902e89bc05..f950abd315 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -1436,6 +1436,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase): yield GEProfilerRequest( pretty_name=dataset_name, batch_kwargs=self.prepare_profiler_args( + inspector=inspector, schema=schema, table=table, partition=partition, @@ -1478,6 +1479,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase): def prepare_profiler_args( self, + inspector: Inspector, schema: str, table: str, partition: Optional[str], diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index 5f49d06bb0..243228cc6e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -503,7 +503,7 @@ class QueryEvent: if "queryOutputRowCount" in job["jobStatistics"] and job["jobStatistics"]["queryOutputRowCount"] else None, - statementType=job_query_conf["statementType"], + statementType=job_query_conf.get("statementType", "UNKNOWN"), ) # destinationTable raw_dest_table = job_query_conf.get("destinationTable") @@ -580,7 +580,7 @@ class QueryEvent: numAffectedRows=int(query_stats["outputRowCount"]) if query_stats.get("outputRowCount") else None, - statementType=query_config["statementType"], + statementType=query_config.get("statementType", "UNKNOWN"), ) # jobName query_event.job_name = job.get("jobName") @@ -642,7 +642,7 @@ class QueryEvent: numAffectedRows=int(query_stats["outputRowCount"]) if "outputRowCount" in query_stats and query_stats["outputRowCount"] else None, - statementType=query_config["statementType"], + statementType=query_config.get("statementType", "UNKNOWN"), ) query_event.job_name = job.get("jobName") # destinationTable diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py index 67f6e608fa..9891698444 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py @@ -75,8 +75,8 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig) ) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path - def get_sql_alchemy_url(self, for_run_sql: bool = False) -> str: - if (not for_run_sql) and self.storage_project_id: + def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str: + if self.storage_project_id and not run_on_compute: return f"{self.scheme}://{self.storage_project_id}" if self.project_id: return f"{self.scheme}://{self.project_id}" @@ -100,6 +100,7 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig) if ( values.get("storage_project_id") and profiling is not None + and profiling.enabled and not profiling.bigquery_temp_table_schema ): raise ConfigurationError(