From 579d3cdd884195cb820358b8eec8055aba7caf99 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 19 Apr 2022 17:48:55 +0200 Subject: [PATCH] Refactor engine (#4236) --- .../src/metadata/ingestion/source/bigquery.py | 5 --- .../src/metadata/ingestion/source/metadata.py | 6 +--- ingestion/src/metadata/utils/engines.py | 33 +++++++++++++++++-- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py index 7a335ad2f6a..7715404e859 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/bigquery.py @@ -38,7 +38,6 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.sql_source import SQLSource from metadata.utils.column_type_parser import create_sqlalchemy_type -from metadata.utils.credentials import set_google_credentials from metadata.utils.helpers import get_start_and_end logger = logging.getLogger(__name__) @@ -111,10 +110,6 @@ class BigquerySource(SQLSource): f"Expected BigQueryConnection, but got {connection}" ) - set_google_credentials( - gcs_credentials=config.serviceConnection.__root__.config.credentials - ) - return cls(config, metadata_config) def standardize_schema_table_names( diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index 32e9e6a7c86..6951ba668dd 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -19,9 +19,6 @@ from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic -from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( - MetadataESConnection, -) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -31,8 +28,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus -from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.api.source import Source, SourceStatus logger = logging.getLogger(__name__) diff --git a/ingestion/src/metadata/utils/engines.py b/ingestion/src/metadata/utils/engines.py index 69688e83816..18a08a40d39 100644 --- a/ingestion/src/metadata/utils/engines.py +++ b/ingestion/src/metadata/utils/engines.py @@ -13,6 +13,7 @@ Build and document all supported Engines """ import logging +from functools import singledispatch from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine @@ -23,6 +24,10 @@ from sqlalchemy.orm.session import Session from metadata.generated.schema.entity.services.connections.connectionBasicType import ( ConnectionOptions, ) +from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( + BigQueryConnection, +) +from metadata.utils.credentials import set_google_credentials from metadata.utils.source_connections import get_connection_args, get_connection_url from metadata.utils.timeout import timeout @@ -35,11 +40,13 @@ class SourceConnectionException(Exception): """ -def get_engine(connection, verbose: bool = False) -> Engine: +def create_generic_engine(connection, verbose: bool = False): """ - Given an SQL configuration, build the SQLAlchemy Engine + Generic Engine creation from connection object + :param connection: JSON Schema connection model + :param verbose: debugger or not + :return: SQAlchemy Engine """ - options = connection.connectionOptions if not options: options = ConnectionOptions() @@ -54,6 +61,26 @@ def get_engine(connection, verbose: bool = False) -> Engine: return engine +@singledispatch +def get_engine(connection, verbose: bool = False) -> Engine: + """ + Given an SQL configuration, build the SQLAlchemy Engine + """ + return create_generic_engine(connection, verbose) + + +@get_engine.register +def _(connection: BigQueryConnection, verbose: bool = False): + """ + Prepare the engine and the GCS credentials + :param connection: BigQuery connection + :param verbose: debugger or not + :return: Engine + """ + set_google_credentials(gcs_credentials=connection.credentials) + return create_generic_engine(connection, verbose) + + def create_and_bind_session(engine: Engine) -> Session: """ Given an engine, create a session bound