fix(bigquery): add dataset_id for bigquery (#4932)

Author: Aseem Bansal (committed by GitHub)
Date: 2022-05-19 10:43:06 +05:30
Parent: 1de2e2c85f
Commit: 2bb2c5243c
2 changed files with 26 additions and 4 deletions


@@ -677,7 +677,10 @@ class DatahubGEProfiler:
             yield GEContext(data_context, datasource_name)

     def generate_profiles(
-        self, requests: List[GEProfilerRequest], max_workers: int
+        self,
+        requests: List[GEProfilerRequest],
+        max_workers: int,
+        platform: Optional[str] = None,
     ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
         with PerfTimer() as timer, concurrent.futures.ThreadPoolExecutor(
             max_workers=max_workers
@@ -704,6 +707,7 @@ class DatahubGEProfiler:
                     self._generate_profile_from_request,
                     query_combiner,
                     request,
+                    platform=platform,
                 )
                 for request in requests
             ]
@@ -751,10 +755,12 @@ class DatahubGEProfiler:
         self,
         query_combiner: SQLAlchemyQueryCombiner,
         request: GEProfilerRequest,
+        platform: Optional[str] = None,
     ) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]:
         return request, self._generate_single_profile(
             query_combiner=query_combiner,
             pretty_name=request.pretty_name,
+            platform=platform,
             **request.batch_kwargs,
         )

@@ -781,6 +787,7 @@ class DatahubGEProfiler:
         table: str = None,
         partition: Optional[str] = None,
         custom_sql: Optional[str] = None,
+        platform: Optional[str] = None,
         **kwargs: Any,
     ) -> Optional[DatasetProfileClass]:
         bigquery_temp_table: Optional[str] = None
@@ -820,6 +827,7 @@ class DatahubGEProfiler:
                 ge_context,
                 ge_config,
                 pretty_name=pretty_name,
+                platform=platform,
             )

             profile = _SingleDatasetProfiler(
@@ -852,6 +860,7 @@ class DatahubGEProfiler:
         ge_context: GEContext,
         batch_kwargs: dict,
         pretty_name: str,
+        platform: Optional[str] = None,
     ) -> Dataset:
         # This is effectively emulating the beginning of the process that
         # is followed by GE itself. In particular, we simply want to construct
@@ -878,4 +887,12 @@ class DatahubGEProfiler:
                 **batch_kwargs,
             },
         )
+        if platform is not None and platform == "bigquery":
+            name_parts = pretty_name.split(".")
+            if len(name_parts) != 3:
+                logger.error(
+                    f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
+                )
+            else:
+                batch.engine.dialect.dataset_id = name_parts[1]
         return batch
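Side note on the new BigQuery branch above: a BigQuery pretty name is fully qualified as project.dataset.table, so the middle segment is the dataset that gets assigned to the SQLAlchemy BigQuery dialect's dataset_id. A minimal sketch (placeholder table name, not part of the commit):

```python
# Minimal sketch, not part of the commit; the table name below is a placeholder.
# BigQuery pretty names are "project.dataset.table", so index 1 is the dataset
# value that ends up on batch.engine.dialect.dataset_id.
pretty_name = "my-gcp-project.analytics.daily_events"

name_parts = pretty_name.split(".")
assert len(name_parts) == 3            # anything else is logged as an error above
project_id, dataset_id, table = name_parts
assert dataset_id == "analytics"       # assigned to the dialect's dataset_id
```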


@@ -730,7 +730,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                 )

             if profiler and profile_requests:
-                yield from self.loop_profiler(profile_requests, profiler)
+                yield from self.loop_profiler(
+                    profile_requests, profiler, platform=self.platform
+                )

         if self.is_stateful_ingestion_configured():
             # Clean up stale entities.
@@ -1329,10 +1331,13 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         )

     def loop_profiler(
-        self, profile_requests: List["GEProfilerRequest"], profiler: "DatahubGEProfiler"
+        self,
+        profile_requests: List["GEProfilerRequest"],
+        profiler: "DatahubGEProfiler",
+        platform: Optional[str] = None,
     ) -> Iterable[MetadataWorkUnit]:
         for request, profile in profiler.generate_profiles(
-            profile_requests, self.config.profiling.max_workers
+            profile_requests, self.config.profiling.max_workers, platform=platform
         ):
             if profile is None:
                 continue
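For readers skimming the plumbing in this second file: the keyword defaults to None at every level, so existing call sites and non-BigQuery sources behave exactly as before. A self-contained toy (stand-in functions, not DataHub code) of that threading pattern:

```python
# Toy stand-ins, not DataHub code: the optional keyword is threaded
# source -> loop_profiler -> generate_profiles, defaulting to None so that
# only an explicit platform="bigquery" reaches the dataset_id fix-up.
from typing import Iterable, List, Optional


def generate_profiles(names: List[str], platform: Optional[str] = None) -> Iterable[str]:
    for name in names:
        # Only the BigQuery case gets extra handling; everything else is untouched.
        yield f"{name} [dataset_id patched]" if platform == "bigquery" else name


def loop_profiler(names: List[str], platform: Optional[str] = None) -> Iterable[str]:
    # Forward the keyword, as SQLAlchemySource.loop_profiler now does.
    yield from generate_profiles(names, platform=platform)


print(list(loop_profiler(["proj.dataset.table"], platform="bigquery")))
print(list(loop_profiler(["db.schema.table"])))  # non-BigQuery: unchanged behaviour
```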