fix(ingest): correctly support multiple snowflake databases (#3482)

Harshal Sheth, 2021-10-28 14:44:56 -04:00, committed by GitHub
parent 6f72536bbe
commit 1fec105b24
3 changed files with 15 additions and 23 deletions

File 1 of 3

@@ -40,7 +40,7 @@ class DatahubGECustomProfiler(BasicDatasetProfilerBase):
         number_of_columns = len(columns_to_profile)
         for i, column in enumerate(columns_to_profile):
-            logger.info(
+            logger.debug(
                 " Preparing column {} of {}: {}".format(
                     i + 1, number_of_columns, column
                 )

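This hunk demotes the per-column message from INFO to DEBUG, so profiling a wide table no longer floods the INFO log. A minimal sketch of how a user could opt back into the per-column output; the logger name is an assumption, since the diff does not show the module path:

import logging

logging.basicConfig(level=logging.INFO)

# Assumed logger name for illustration (the module path is not shown in
# this diff); this re-enables the demoted "Preparing column ..." messages.
logging.getLogger("datahub.ingestion.source.ge_data_profiler").setLevel(logging.DEBUG)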
File 2 of 3

@@ -339,7 +339,7 @@ class DatahubGEProfiler:
         max_workers = min(max_workers, len(requests))
         logger.info(
-            f"Will profile {len(requests)} table(s) with {max_workers} worker(s)"
+            f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
         )
         with concurrent.futures.ThreadPoolExecutor(
             max_workers=max_workers
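The min() clamp above keeps the thread pool no larger than the work list. A self-contained sketch of the same pattern, with a stub standing in for the real profiling call:

import concurrent.futures

def profile_table(name: str) -> str:
    return f"profiled {name}"  # stand-in for the real per-table profiling

requests = ["db1.table_a", "db1.table_b", "db2.table_c"]
# Never spin up more worker threads than there are tables to profile.
max_workers = min(10, len(requests))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    for result in executor.map(profile_table, requests):
        print(result)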
@@ -357,7 +357,7 @@ class DatahubGEProfiler:
             end_time = time.perf_counter()
             logger.info(
-                f"Profiling {len(requests)} table(s) finished in {end_time - start_time} seconds"
+                f"Profiling {len(requests)} table(s) finished in {(end_time - start_time):.3f} seconds"
             )

     def generate_profile_from_request(
@@ -376,7 +376,7 @@ class DatahubGEProfiler:
         **kwargs: Any,
     ) -> Optional[DatasetProfileClass]:
         with self._ge_context() as ge_context:
-            logger.info(f"Profiling {pretty_name} (this may take a while)")
+            logger.info(f"Profiling {pretty_name}")

             evrs = self._profile_data_asset(
                 ge_context,
@@ -422,7 +422,7 @@ class DatahubGEProfiler:
             )
             end_time = time.perf_counter()
             logger.info(
-                f"Profiling for {pretty_name} took {end_time - start_time} seconds."
+                f"Profiling for {pretty_name} took {(end_time - start_time):.3f} seconds."
             )

             assert profile_results["success"]

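Both timing hunks above replace the raw float interpolation with a :.3f format spec, rounding the reported elapsed time to milliseconds. A standalone demonstration of the difference:

import time

start_time = time.perf_counter()
sum(range(1_000_000))  # stand-in for the profiling work
end_time = time.perf_counter()

# Before: full float precision, e.g. "finished in 0.014928541000000001 seconds"
print(f"finished in {end_time - start_time} seconds")
# After: rounded to three decimal places, e.g. "finished in 0.015 seconds"
print(f"finished in {(end_time - start_time):.3f} seconds")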
File 3 of 3

@@ -7,12 +7,10 @@ import pydantic

 # This import verifies that the dependencies are available.
 import snowflake.sqlalchemy  # noqa: F401
-import sqlalchemy
 from snowflake.sqlalchemy import custom_types
 from sqlalchemy import create_engine, inspect
 from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.sql import text
-from sqlalchemy.sql.elements import quoted_name

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import AllowDenyPattern
@@ -91,8 +89,8 @@ class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
             values["database_pattern"].allow = f"^{v}$"
         return None

-    def get_sql_alchemy_url(self):
-        return super().get_sql_alchemy_url(database=None)
+    def get_sql_alchemy_url(self, database: str = None) -> str:
+        return super().get_sql_alchemy_url(database=database)


 class SnowflakeSource(SQLAlchemySource):
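The reworked override threads an optional database argument through to the base config instead of always forcing database=None, which is what lets each database get its own URL (and engine) below. A self-contained sketch of the pattern, using hypothetical Fake* classes as simplified stand-ins for BaseSnowflakeConfig's URL builder:

from typing import Optional

class FakeBaseSnowflakeConfig:
    # Simplified stand-in for BaseSnowflakeConfig.get_sql_alchemy_url;
    # the real builder assembles a full Snowflake connection URL.
    def get_sql_alchemy_url(self, database: Optional[str] = None) -> str:
        suffix = f"/{database}" if database else ""
        return f"snowflake://my_account{suffix}"

class FakeSnowflakeConfig(FakeBaseSnowflakeConfig):
    # Mirrors the change above: pass the caller's database through.
    def get_sql_alchemy_url(self, database: Optional[str] = None) -> str:
        return super().get_sql_alchemy_url(database=database)

print(FakeSnowflakeConfig().get_sql_alchemy_url())                  # snowflake://my_account
print(FakeSnowflakeConfig().get_sql_alchemy_url(database="SALES"))  # snowflake://my_account/SALES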
@@ -109,27 +107,21 @@ class SnowflakeSource(SQLAlchemySource):
         return cls(config, ctx)

     def get_inspectors(self) -> Iterable[Inspector]:
-        url = self.config.get_sql_alchemy_url()
+        url = self.config.get_sql_alchemy_url(database=None)
         logger.debug(f"sql_alchemy_url={url}")
-        engine = create_engine(url, **self.config.options)
+        db_listing_engine = create_engine(url, **self.config.options)

-        db = None
-
-        @sqlalchemy.event.listens_for(engine, "connect")
-        def connect(dbapi_connection, connection_record):
-            if db is not None:
-                cursor_obj = dbapi_connection.cursor()
-                cursor_obj.execute((f'USE DATABASE "{quoted_name(db, True)}"'))
-                cursor_obj.close()
-
-        for db_row in engine.execute(text("SHOW DATABASES")):
+        for db_row in db_listing_engine.execute(text("SHOW DATABASES")):
             db = db_row.name
             if self.config.database_pattern.allowed(db):
+                # We create a separate engine for each database in order to ensure that
+                # they are isolated from each other.
                 self.current_database = db
+                engine = create_engine(
+                    self.config.get_sql_alchemy_url(database=db), **self.config.options
+                )
+
+                with engine.connect() as conn:
+                    # TRICKY: the "USE DATABASE" command is actually called by the engine's connect event hook.
+                    # https://docs.sqlalchemy.org/en/14/core/engines.html#modifying-the-dbapi-connection-after-connect-or-running-commands-after-connect
+                    inspector = inspect(conn)
+                    yield inspector
             else:
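The rewritten get_inspectors lists databases once through db_listing_engine, then builds an isolated engine per allowed database and inspects it through a live connection. A minimal sketch of that pattern, with SQLite standing in for Snowflake and a static list in place of SHOW DATABASES:

from sqlalchemy import create_engine, inspect

databases = ["db_a", "db_b"]  # stand-in for the SHOW DATABASES listing

for db in databases:
    # One engine per database keeps the connections isolated from each
    # other, mirroring the new code path above.
    engine = create_engine(f"sqlite:///{db}.sqlite")
    with engine.connect() as conn:
        inspector = inspect(conn)
        print(db, inspector.get_table_names())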