Refactor engine (#4236)

This commit is contained in:
Pere Miquel Brull 2022-04-19 17:48:55 +02:00 committed by GitHub
parent 7283b858e4
commit 579d3cdd88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 13 deletions

View File

@ -38,7 +38,6 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.column_type_parser import create_sqlalchemy_type
from metadata.utils.credentials import set_google_credentials
from metadata.utils.helpers import get_start_and_end
logger = logging.getLogger(__name__)
@ -111,10 +110,6 @@ class BigquerySource(SQLSource):
f"Expected BigQueryConnection, but got {connection}"
)
set_google_credentials(
gcs_credentials=config.serviceConnection.__root__.config.credentials
)
return cls(config, metadata_config)
def standardize_schema_table_names(

View File

@ -19,9 +19,6 @@ from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
MetadataESConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -31,8 +28,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.api.source import Source, SourceStatus
logger = logging.getLogger(__name__)

View File

@ -13,6 +13,7 @@
Build and document all supported Engines
"""
import logging
from functools import singledispatch
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
@ -23,6 +24,10 @@ from sqlalchemy.orm.session import Session
from metadata.generated.schema.entity.services.connections.connectionBasicType import (
ConnectionOptions,
)
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.utils.credentials import set_google_credentials
from metadata.utils.source_connections import get_connection_args, get_connection_url
from metadata.utils.timeout import timeout
@ -35,11 +40,13 @@ class SourceConnectionException(Exception):
"""
def get_engine(connection, verbose: bool = False) -> Engine:
def create_generic_engine(connection, verbose: bool = False):
"""
Given an SQL configuration, build the SQLAlchemy Engine
Generic Engine creation from connection object
:param connection: JSON Schema connection model
:param verbose: debugger or not
:return: SQAlchemy Engine
"""
options = connection.connectionOptions
if not options:
options = ConnectionOptions()
@ -54,6 +61,26 @@ def get_engine(connection, verbose: bool = False) -> Engine:
return engine
@singledispatch
def get_engine(connection, verbose: bool = False) -> Engine:
"""
Given an SQL configuration, build the SQLAlchemy Engine
"""
return create_generic_engine(connection, verbose)
@get_engine.register
def _(connection: BigQueryConnection, verbose: bool = False):
"""
Prepare the engine and the GCS credentials
:param connection: BigQuery connection
:param verbose: debugger or not
:return: Engine
"""
set_google_credentials(gcs_credentials=connection.credentials)
return create_generic_engine(connection, verbose)
def create_and_bind_session(engine: Engine) -> Session:
"""
Given an engine, create a session bound