fix(ingest): bigquery - Fix for bigquery error when there was no bigquery catalog specified (#5303)

Tamas Nemeth 2022-07-01 17:47:07 +02:00 committed by GitHub
parent 0fc8a65cda
commit f08c3f784f
5 changed files with 79 additions and 31 deletions

View File

@@ -89,5 +89,37 @@ Due to performance reasons, we only profile the latest partition for Partitioned
 You can set partition explicitly with `partition.partition_datetime` property if you want. (partition will be applied to all partitioned tables)
 :::
+### Working with multi-project GCP setups
+Sometimes you may have multiple GCP projects, with one giving you only view access rights and another where you have view/modify rights. To handle such setups you can use the `storage_project_id` setting. An example recipe looks like this:
+```yaml
+source:
+  type: "bigquery"
+  config:
+    project_id: compute-project-id # with view as well as modify rights
+    storage_project_id: acryl-staging # with view-only rights
+    ...rest of fields
+```
+The GCP roles with which this setup has been tested are as follows:
+- Storage Project
+  - BigQuery Data Viewer
+  - BigQuery Metadata Viewer
+  - Logs Viewer
+  - Private Logs Viewer
+- Compute Project
+  - BigQuery Admin
+  - BigQuery Data Editor
+  - BigQuery Job User
+If you are using `use_exported_bigquery_audit_metadata = True` and `use_v2_audit_metadata = False`, make sure you prefix the datasets in `bigquery_audit_metadata_datasets` with the storage project id (see the example after this diff).
+:::note
+BigQuery usage has not been modified or tested with the multi-project setting. Currently, only the `bigquery` plugin works with a multi-project setup.
+:::
 ### Caveats
 - For Materialized views lineage is dependent on logs being retained. If your GCP logging is retained for 30 days (default) and 30 days have passed since the creation of the materialized view we won't be able to get lineage for them.
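The prefixing rule mentioned above is easy to get wrong, so here is a hypothetical recipe fragment illustrating it; the project and dataset names are made up, not taken from the commit:

```yaml
source:
  type: "bigquery"
  config:
    project_id: compute-project-id
    storage_project_id: acryl-staging
    use_exported_bigquery_audit_metadata: true
    use_v2_audit_metadata: false
    bigquery_audit_metadata_datasets:
      # The audit dataset lives in (and is prefixed with) the storage project.
      - acryl-staging.bigquery_audit_log
```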

View File

@@ -339,20 +339,24 @@ class BigQuerySource(SQLAlchemySource):
         self.partition_info: Dict[str, str] = dict()
         atexit.register(cleanup, config)

-    def get_db_name(
-        self, inspector: Inspector = None, for_sql_queries: bool = True
-    ) -> str:
-        """
-        for_sql_queries - Used mainly for multi-project setups with different permissions
-        - should be set to True if this is to be used to run sql queries
-        - should be set to False if this is to inspect contents and not run sql queries
-        """
-        if for_sql_queries and self.config.storage_project_id:
+    def get_multiproject_project_id(
+        self, inspector: Optional[Inspector] = None, run_on_compute: bool = False
+    ) -> Optional[str]:
+        if self.config.storage_project_id and (not run_on_compute):
             return self.config.storage_project_id
         elif self.config.project_id:
             return self.config.project_id
         else:
-            return self._get_project_id(inspector)
+            if inspector:
+                return self._get_project_id(inspector)
+            else:
+                return None
+
+    def get_db_name(self, inspector: Inspector) -> str:
+        db_name = self.get_multiproject_project_id(inspector)
+        # db name can't be empty here as we pass an inspector to get_multiproject_project_id
+        assert db_name
+        return db_name

     def _compute_big_query_lineage(self) -> None:
         if not self.config.include_table_lineage:
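To make the new resolution order easier to follow, here is a standalone sketch of the same fallback chain with the config and inspector lookups stubbed out. `resolve_project_id` and its parameters are illustrative names, not part of the source:

```python
from typing import Optional

def resolve_project_id(
    storage_project_id: Optional[str],
    project_id: Optional[str],
    inspector_project_id: Optional[str] = None,
    run_on_compute: bool = False,
) -> Optional[str]:
    # Prefer the view-only storage project unless the caller needs to
    # run SQL on the compute project.
    if storage_project_id and not run_on_compute:
        return storage_project_id
    if project_id:
        return project_id
    # Fall back to whatever project the live connection resolved to; return
    # None instead of crashing when no inspector is available -- this is the
    # case the original "no catalog specified" error hit.
    return inspector_project_id

assert resolve_project_id(None, "my-project") == "my-project"
assert resolve_project_id("storage-proj", "my-project") == "storage-proj"
assert resolve_project_id(None, None) is None
```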
@@ -373,7 +377,7 @@ class BigQuerySource(SQLAlchemySource):
         logger.debug(f"lineage metadata is {self.lineage_metadata}")

     def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
-        project_id = self.get_db_name()
+        project_id = self.get_multiproject_project_id()
         logger.info("Populating lineage info via GCP audit logs")
         try:
             _clients: List[GCPLoggingClient] = self._make_gcp_logging_client(project_id)
@@ -397,7 +401,7 @@ class BigQuerySource(SQLAlchemySource):
         )

     def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None:
-        project_id = self.get_db_name(for_sql_queries=True)
+        project_id = self.get_multiproject_project_id(run_on_compute=True)
         logger.info("Populating lineage info via exported GCP audit logs")
         try:
             _client: BigQueryClient = BigQueryClient(project=project_id)
@@ -660,30 +664,34 @@ class BigQuerySource(SQLAlchemySource):
     def is_table_partitioned(
         self, database: Optional[str], schema: str, table: str
     ) -> bool:
-        project_id: str
+        project_id: Optional[str]
         if database:
             project_id = database
         else:
-            engine = self._get_engine(for_run_sql=False)
+            engine = self._get_engine(run_on_compute=True)
             with engine.connect() as con:
                 inspector = inspect(con)
-                project_id = self.get_db_name(inspector)
+                project_id = self.get_multiproject_project_id(inspector=inspector)
+        assert project_id
         return f"{project_id}.{schema}.{table}" in self.partition_info

     def get_latest_partition(
         self, schema: str, table: str
     ) -> Optional[BigQueryPartitionColumn]:
         logger.debug(f"get_latest_partition for {schema} and {table}")
-        engine = self._get_engine(for_run_sql=True)
+        engine = self._get_engine(run_on_compute=True)
         with engine.connect() as con:
             inspector = inspect(con)
-            project_id = self.get_db_name(inspector)
+            project_id = self.get_multiproject_project_id(inspector=inspector)
+            assert project_id
             if not self.is_table_partitioned(
                 database=project_id, schema=schema, table=table
             ):
                 return None
+            project_id = self.get_multiproject_project_id(inspector=inspector)
+            assert project_id
             sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
-                project_id=self.get_db_name(inspector, for_sql_queries=True),
+                project_id=self.get_multiproject_project_id(inspector=inspector),
                 schema=schema,
                 table=table,
             )
@@ -709,7 +717,7 @@
         table_name, shard = self.get_shard_from_table(table)
         if shard:
             logger.debug(f"{table_name} is sharded and shard id is: {shard}")
-        engine = self._get_engine(for_run_sql=True)
+        engine = self._get_engine(run_on_compute=True)
         if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids:
             with engine.connect() as con:
                 if table_name is not None:
@@ -738,14 +746,15 @@
         else:
             return True

-    def _get_engine(self, for_run_sql: bool) -> Engine:
-        url = self.config.get_sql_alchemy_url(for_run_sql=for_run_sql)
+    def _get_engine(self, run_on_compute: bool = True) -> Engine:
+        url = self.config.get_sql_alchemy_url(run_on_compute=run_on_compute)
         logger.debug(f"sql_alchemy_url={url}")
         return create_engine(url, **self.config.options)

     def add_information_for_schema(self, inspector: Inspector, schema: str) -> None:
-        engine = self._get_engine(for_run_sql=True)
-        project_id = self.get_db_name(inspector)
+        engine = self._get_engine(run_on_compute=True)
+        project_id = self.get_multiproject_project_id(inspector=inspector)
+        assert project_id
         with engine.connect() as con:
             inspector = inspect(con)
             sql = f"""
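Since `_get_engine` now defaults to the compute project, a caller has to opt in explicitly to reach the storage project. A minimal sketch of the resulting pattern, using a hypothetical stand-in for the config object (connecting would additionally require the `sqlalchemy-bigquery` dialect):

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

class FakeConfig:
    # Hypothetical stand-in for BigQueryConfig; only the URL logic is modeled.
    scheme = "bigquery"
    project_id = "compute-project-id"
    storage_project_id = "acryl-staging"
    options: dict = {}

    def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str:
        # Metadata-only work may target the storage project; SQL execution
        # stays on the compute project.
        if self.storage_project_id and not run_on_compute:
            return f"{self.scheme}://{self.storage_project_id}"
        return f"{self.scheme}://{self.project_id}"

def get_engine(config: FakeConfig, run_on_compute: bool = True) -> Engine:
    # Defaulting to True keeps SQL-running call sites on the compute project
    # unless they deliberately ask for the storage project.
    url = config.get_sql_alchemy_url(run_on_compute=run_on_compute)
    return create_engine(url, **config.options)
```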
@@ -764,7 +773,8 @@
         self, inspector: Inspector, schema: str, table: str
     ) -> Dict[str, List[str]]:
         extra_tags: Dict[str, List[str]] = {}
-        project_id = self.get_db_name(inspector)
+        project_id = self.get_multiproject_project_id(inspector=inspector)
+        assert project_id
         partition_lookup_key = f"{project_id}.{schema}.{table}"
         if partition_lookup_key in self.partition_info:
@@ -831,7 +841,7 @@ WHERE
     def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
         logger.debug("Getting profiler instance from bigquery")
-        engine = self._get_engine(for_run_sql=True)
+        engine = self._get_engine(run_on_compute=True)
         with engine.connect() as conn:
             inspector = inspect(conn)
@@ -996,13 +1006,16 @@ WHERE
     def prepare_profiler_args(
         self,
+        inspector: Inspector,
         schema: str,
         table: str,
         partition: Optional[str],
         custom_sql: Optional[str] = None,
     ) -> dict:
+        project_id = self._get_project_id(inspector=inspector)
+        assert project_id
         return dict(
-            schema=self.get_db_name(for_sql_queries=True),
+            schema=project_id,
             table=f"{schema}.{table}",
             partition=partition,
             custom_sql=custom_sql,
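With BigQuery's three-part naming, the profiler batch kwargs built above pack the project id into `schema` and the dataset into the `table` string. An illustration with made-up values, showing the shape of the resulting dict:

```python
# Hypothetical values; mirrors the dict constructed in prepare_profiler_args.
batch_kwargs = dict(
    schema="compute-project-id",   # BigQuery project id
    table="my_dataset.my_table",   # dataset.table
    partition=None,
    custom_sql=None,
)
print(batch_kwargs["schema"], batch_kwargs["table"])
```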

View File

@@ -1436,6 +1436,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                 yield GEProfilerRequest(
                     pretty_name=dataset_name,
                     batch_kwargs=self.prepare_profiler_args(
+                        inspector=inspector,
                         schema=schema,
                         table=table,
                         partition=partition,
@@ -1478,6 +1479,7 @@
     def prepare_profiler_args(
         self,
+        inspector: Inspector,
         schema: str,
         table: str,
         partition: Optional[str],

View File

@@ -503,7 +503,7 @@ class QueryEvent:
             if "queryOutputRowCount" in job["jobStatistics"]
             and job["jobStatistics"]["queryOutputRowCount"]
             else None,
-            statementType=job_query_conf["statementType"],
+            statementType=job_query_conf.get("statementType", "UNKNOWN"),
         )
         # destinationTable
         raw_dest_table = job_query_conf.get("destinationTable")
@@ -580,7 +580,7 @@
             numAffectedRows=int(query_stats["outputRowCount"])
             if query_stats.get("outputRowCount")
             else None,
-            statementType=query_config["statementType"],
+            statementType=query_config.get("statementType", "UNKNOWN"),
         )
         # jobName
         query_event.job_name = job.get("jobName")
@@ -642,7 +642,7 @@
             numAffectedRows=int(query_stats["outputRowCount"])
             if "outputRowCount" in query_stats and query_stats["outputRowCount"]
             else None,
-            statementType=query_config["statementType"],
+            statementType=query_config.get("statementType", "UNKNOWN"),
         )
         query_event.job_name = job.get("jobName")
         # destinationTable
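The three `.get(...)` changes above share one defensive pattern: direct indexing into the audit-log payload raises `KeyError` when `statementType` is absent, while `.get` with a sentinel keeps the event parse going. A tiny illustration with a made-up payload:

```python
# Hypothetical audit-log job configuration missing "statementType".
job_query_conf = {"query": "SELECT 1"}

# Before the fix: direct indexing aborted parsing of the whole event.
try:
    statement_type = job_query_conf["statementType"]
except KeyError:
    statement_type = None
print(statement_type)  # -> None

# After the fix: a sentinel value stands in for the missing field.
print(job_query_conf.get("statementType", "UNKNOWN"))  # -> UNKNOWN
```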

View File

@@ -75,8 +75,8 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig)
         )
         os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path

-    def get_sql_alchemy_url(self, for_run_sql: bool = False) -> str:
-        if (not for_run_sql) and self.storage_project_id:
+    def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str:
+        if self.storage_project_id and not run_on_compute:
             return f"{self.scheme}://{self.storage_project_id}"
         if self.project_id:
             return f"{self.scheme}://{self.project_id}"
@@ -100,6 +100,7 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig)
         if (
             values.get("storage_project_id")
             and profiling is not None
+            and profiling.enabled
             and not profiling.bigquery_temp_table_schema
         ):
             raise ConfigurationError(
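For reference, a hypothetical recipe fragment that passes the tightened validation above: profiling is enabled alongside `storage_project_id`, so `bigquery_temp_table_schema` must be set. The dataset name (and its `project.dataset` shape) is an assumption for illustration, not taken from the commit:

```yaml
source:
  type: "bigquery"
  config:
    project_id: compute-project-id
    storage_project_id: acryl-staging
    profiling:
      enabled: true
      # Required with storage_project_id so profiling temp tables are
      # created somewhere writable (i.e. in the compute project).
      bigquery_temp_table_schema: compute-project-id.profiling_tmp
```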