diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index cac5e7dfe9..ba5e60a335 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -677,7 +677,10 @@ class DatahubGEProfiler:
         yield GEContext(data_context, datasource_name)
 
     def generate_profiles(
-        self, requests: List[GEProfilerRequest], max_workers: int
+        self,
+        requests: List[GEProfilerRequest],
+        max_workers: int,
+        platform: Optional[str] = None,
     ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
         with PerfTimer() as timer, concurrent.futures.ThreadPoolExecutor(
             max_workers=max_workers
@@ -704,6 +707,7 @@ class DatahubGEProfiler:
                     self._generate_profile_from_request,
                     query_combiner,
                     request,
+                    platform=platform,
                 )
                 for request in requests
             ]
@@ -751,10 +755,12 @@ class DatahubGEProfiler:
         self,
         query_combiner: SQLAlchemyQueryCombiner,
         request: GEProfilerRequest,
+        platform: Optional[str] = None,
     ) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]:
         return request, self._generate_single_profile(
             query_combiner=query_combiner,
             pretty_name=request.pretty_name,
+            platform=platform,
             **request.batch_kwargs,
         )
 
@@ -781,6 +787,7 @@ class DatahubGEProfiler:
         table: str = None,
         partition: Optional[str] = None,
         custom_sql: Optional[str] = None,
+        platform: Optional[str] = None,
         **kwargs: Any,
     ) -> Optional[DatasetProfileClass]:
         bigquery_temp_table: Optional[str] = None
@@ -820,6 +827,7 @@ class DatahubGEProfiler:
                     ge_context,
                     ge_config,
                     pretty_name=pretty_name,
+                    platform=platform,
                 )
 
                 profile = _SingleDatasetProfiler(
@@ -852,6 +860,7 @@ class DatahubGEProfiler:
         ge_context: GEContext,
         batch_kwargs: dict,
         pretty_name: str,
+        platform: Optional[str] = None,
     ) -> Dataset:
         # This is effectively emulating the beginning of the process that
         # is followed by GE itself. In particular, we simply want to construct
@@ -878,4 +887,12 @@ class DatahubGEProfiler:
                 **batch_kwargs,
             },
         )
+        if platform is not None and platform == "bigquery":
+            name_parts = pretty_name.split(".")
+            if len(name_parts) != 3:
+                logger.error(
+                    f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
+                )
+            else:
+                batch.engine.dialect.dataset_id = name_parts[1]
         return batch
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 59d226b5f0..8f540884ef 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -730,7 +730,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
                     )
 
             if profiler and profile_requests:
-                yield from self.loop_profiler(profile_requests, profiler)
+                yield from self.loop_profiler(
+                    profile_requests, profiler, platform=self.platform
+                )
 
         if self.is_stateful_ingestion_configured():
             # Clean up stale entities.
@@ -1329,10 +1331,13 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         )
 
     def loop_profiler(
-        self, profile_requests: List["GEProfilerRequest"], profiler: "DatahubGEProfiler"
+        self,
+        profile_requests: List["GEProfilerRequest"],
+        profiler: "DatahubGEProfiler",
+        platform: Optional[str] = None,
     ) -> Iterable[MetadataWorkUnit]:
         for request, profile in profiler.generate_profiles(
-            profile_requests, self.config.profiling.max_workers
+            profile_requests, self.config.profiling.max_workers, platform=platform
         ):
             if profile is None:
                 continue
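
For reviewers, a minimal self-contained sketch (not part of the diff) of the BigQuery-specific behaviour this change adds in `_get_ge_dataset`: when `platform == "bigquery"`, the profiler's `pretty_name` is expected to look like `project.dataset.table`, and the middle component is what gets assigned to `batch.engine.dialect.dataset_id`. The helper name `extract_bigquery_dataset_id` below is hypothetical and exists only to illustrate the name-splitting logic in isolation.

```python
# Illustrative sketch only -- not part of the diff. It mirrors the new
# BigQuery branch in _get_ge_dataset: profiling names are expected to be
# "project.dataset.table", and the middle part is what ends up as the
# GE batch engine's dataset_id. The helper name is hypothetical.
from typing import Optional


def extract_bigquery_dataset_id(pretty_name: str, platform: Optional[str]) -> Optional[str]:
    """Return the dataset portion of a fully qualified BigQuery name, or None."""
    if platform != "bigquery":
        return None
    name_parts = pretty_name.split(".")
    if len(name_parts) != 3:
        # Corresponds to the logger.error branch in the diff: anything that
        # does not have exactly three dot-separated parts is treated as unexpected.
        return None
    return name_parts[1]


assert extract_bigquery_dataset_id("my-project.my_dataset.my_table", "bigquery") == "my_dataset"
assert extract_bigquery_dataset_id("my_dataset.my_table", "bigquery") is None
assert extract_bigquery_dataset_id("my-project.my_dataset.my_table", "snowflake") is None
```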