diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub_custom_ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/datahub_custom_ge_profiler.py index 14f99a0205..b0c7bd5f76 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub_custom_ge_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub_custom_ge_profiler.py @@ -40,7 +40,7 @@ class DatahubGECustomProfiler(BasicDatasetProfilerBase): number_of_columns = len(columns_to_profile) for i, column in enumerate(columns_to_profile): - logger.info( + logger.debug( " Preparing column {} of {}: {}".format( i + 1, number_of_columns, column ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 46275d1164..2441873fd6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -339,7 +339,7 @@ class DatahubGEProfiler: max_workers = min(max_workers, len(requests)) logger.info( - f"Will profile {len(requests)} table(s) with {max_workers} worker(s)" + f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while" ) with concurrent.futures.ThreadPoolExecutor( max_workers=max_workers @@ -357,7 +357,7 @@ class DatahubGEProfiler: end_time = time.perf_counter() logger.info( - f"Profiling {len(requests)} table(s) finished in {end_time - start_time} seconds" + f"Profiling {len(requests)} table(s) finished in {(end_time - start_time):.3f} seconds" ) def generate_profile_from_request( @@ -376,7 +376,7 @@ class DatahubGEProfiler: **kwargs: Any, ) -> Optional[DatasetProfileClass]: with self._ge_context() as ge_context: - logger.info(f"Profiling {pretty_name} (this may take a while)") + logger.info(f"Profiling {pretty_name}") evrs = self._profile_data_asset( ge_context, @@ -422,7 +422,7 @@ class DatahubGEProfiler: ) end_time = time.perf_counter() logger.info( - f"Profiling for {pretty_name} took {end_time - start_time} seconds." + f"Profiling for {pretty_name} took {(end_time - start_time):.3f} seconds." ) assert profile_results["success"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py index d2b0650b22..91e590071c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py @@ -7,12 +7,10 @@ import pydantic # This import verifies that the dependencies are available. import snowflake.sqlalchemy # noqa: F401 -import sqlalchemy from snowflake.sqlalchemy import custom_types from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector from sqlalchemy.sql import text -from sqlalchemy.sql.elements import quoted_name import datahub.emitter.mce_builder as builder from datahub.configuration.common import AllowDenyPattern @@ -91,8 +89,8 @@ class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig): values["database_pattern"].allow = f"^{v}$" return None - def get_sql_alchemy_url(self): - return super().get_sql_alchemy_url(database=None) + def get_sql_alchemy_url(self, database: str = None) -> str: + return super().get_sql_alchemy_url(database=database) class SnowflakeSource(SQLAlchemySource): @@ -109,27 +107,21 @@ class SnowflakeSource(SQLAlchemySource): return cls(config, ctx) def get_inspectors(self) -> Iterable[Inspector]: - url = self.config.get_sql_alchemy_url() + url = self.config.get_sql_alchemy_url(database=None) logger.debug(f"sql_alchemy_url={url}") - engine = create_engine(url, **self.config.options) + db_listing_engine = create_engine(url, **self.config.options) - db = None - - @sqlalchemy.event.listens_for(engine, "connect") - def connect(dbapi_connection, connection_record): - if db is not None: - cursor_obj = dbapi_connection.cursor() - cursor_obj.execute((f'USE DATABASE "{quoted_name(db, True)}"')) - cursor_obj.close() - - for db_row in engine.execute(text("SHOW DATABASES")): + for db_row in db_listing_engine.execute(text("SHOW DATABASES")): db = db_row.name if self.config.database_pattern.allowed(db): + # We create a separate engine for each database in order to ensure that + # they are isolated from each other. self.current_database = db + engine = create_engine( + self.config.get_sql_alchemy_url(database=db), **self.config.options + ) with engine.connect() as conn: - # TRICKY: the "USE DATABASE" command is actually called by the engine's connect event hook. - # https://docs.sqlalchemy.org/en/14/core/engines.html#modifying-the-dbapi-connection-after-connect-or-running-commands-after-connect inspector = inspect(conn) yield inspector else: