fix(ingest): bigquery: multiproject profiling fix (#5474)

This commit is contained in:
Tamas Nemeth 2022-07-23 18:44:46 +02:00 committed by GitHub
parent f8697ba54f
commit cb05159474
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -493,7 +493,10 @@ class BigQuerySource(SQLAlchemySource):
)
_client: BigQueryClient = BigQueryClient(project=exec_project_id)
full_schema_name = f"{storage_project_id}.{schema}"
# if schema contains a bare dataset name, then add a project_id o/w dont modify it
full_schema_name = (
f"{storage_project_id}.{schema}" if len(schema.split(".")) == 1 else schema
)
# Reading all tables' metadata to report
all_tables = _client.query(self.get_all_schema_tables_query(full_schema_name))
report_tables: List[str] = [
@ -503,7 +506,7 @@ class BigQuerySource(SQLAlchemySource):
report_tables.append(
f"{table_row.table_id}, {table_row.size_bytes}, {table_row.last_modified_time}, {table_row.row_count}"
)
report_key = f"{self._get_project_id(inspector)}.{full_schema_name}"
report_key = f"{full_schema_name}"
self.report.table_metadata[report_key] = report_tables
query = self.generate_profile_candidate_query(threshold_time, full_schema_name)
@ -516,13 +519,7 @@ class BigQuerySource(SQLAlchemySource):
query_job = _client.query(query)
_profile_candidates = []
for row in query_job:
_profile_candidates.append(
self.get_identifier(
schema=full_schema_name,
entity=row.table_id,
inspector=inspector,
)
)
_profile_candidates.append(f"{full_schema_name}.{row.table_id}")
logger.debug(
f"Generated profiling candidates for {schema}: {_profile_candidates}"
)