From 63533eb38850de75f28441cf336e2da0740c71df Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Wed, 6 Apr 2022 03:33:25 +0200
Subject: [PATCH] Fix for connectors based on refactoring of schemas V2 (#3870)

Co-authored-by: Ayush Shah
---
 ingestion/examples/workflows/mysql.json       |  26 ++--
 ingestion/pipelines/sample_data.json          |   6 +-
 ingestion/src/metadata/config/workflow.py     |   6 +-
 .../src/metadata/ingestion/api/workflow.py    |   6 +-
 .../metadata/ingestion/sink/metadata_rest.py  |   2 +-
 .../src/metadata/ingestion/source/mysql.py    |  43 ++++---
 .../metadata/ingestion/source/sample_data.py  |   1 +
 .../metadata/ingestion/source/snowflake.py    |   2 -
 .../metadata/ingestion/source/sql_source.py   | 117 +++++++++++-------
 .../src/metadata/ingestion/source/sqlite.py   |   6 +-
 ingestion/src/metadata/utils/engines.py       |  22 ++--
 ingestion/src/metadata/utils/helpers.py       |  46 +++----
 .../src/metadata/utils/source_connections.py  |  55 ++++++++
 .../tests/unit/profiler/test_workflow.py      |   6 +-
 14 files changed, 223 insertions(+), 121 deletions(-)
 create mode 100644 ingestion/src/metadata/utils/source_connections.py

diff --git a/ingestion/examples/workflows/mysql.json b/ingestion/examples/workflows/mysql.json
index f35a1b9dac3..370a84bf0a6 100644
--- a/ingestion/examples/workflows/mysql.json
+++ b/ingestion/examples/workflows/mysql.json
@@ -1,25 +1,25 @@
 {
   "source": {
     "type": "mysql",
-    "config": {
-      "username": "openmetadata_user",
-      "password": "openmetadata_password",
-      "database": "openmetadata_db",
-      "service_name": "local_mysql",
-      "schema_filter_pattern": {
-        "includes": ["test_delete.*"]
+    "serviceName": "local_mysql",
+    "serviceConnection": {
+      "config": {
+        "type": "MySQL",
+        "username": "openmetadata_user",
+        "password": "openmetadata_password",
+        "hostPort": "localhost:3306"
       }
-    }
+    },
+    "sourceConfig": {"config": {"enableDataProfiler": false}}
   },
   "sink": {
     "type": "metadata-rest",
     "config": {}
   },
-  "metadata_server": {
-    "type": "metadata-server",
-    "config": {
-      "api_endpoint": "http://localhost:8585/api",
-      "auth_provider_type": "no-auth"
+  "workflowConfig": {
+    "openMetadataServerConfig": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
     }
   }
 }
diff --git a/ingestion/pipelines/sample_data.json b/ingestion/pipelines/sample_data.json
index 92204910a1d..6db16dd3b1f 100644
--- a/ingestion/pipelines/sample_data.json
+++ b/ingestion/pipelines/sample_data.json
@@ -11,10 +11,8 @@
   },
   "workflowConfig": {
     "openMetadataServerConfig": {
-      "api_endpoint": "http://localhost:8585/api",
-      "auth_provider_type": "no-auth"
-    },
-    "config": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
     }
   }
 }
diff --git a/ingestion/src/metadata/config/workflow.py b/ingestion/src/metadata/config/workflow.py
index 1d17131d319..1a386e18679 100644
--- a/ingestion/src/metadata/config/workflow.py
+++ b/ingestion/src/metadata/config/workflow.py
@@ -19,7 +19,9 @@ from typing import Type, TypeVar
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.ingestion.api.common import DynamicTypedConfig
 from metadata.ingestion.api.processor import Processor
 from metadata.ingestion.api.sink import Sink
@@ -55,7 +57,7 @@ def get_class(key: str) -> Type[T]:
 
 def get_ingestion_source(
     source_type: str,
-    source_config: SourceConfig,
+    source_config: WorkflowSource,
     metadata_config: OpenMetadataServerConfig,
 ) -> Source:
     """
diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py
index 2f439808d9f..1e6cae4fa5c 100644
--- a/ingestion/src/metadata/ingestion/api/workflow.py
+++ b/ingestion/src/metadata/ingestion/api/workflow.py
@@ -17,6 +17,7 @@ import click
 
 from metadata.config.common import WorkflowExecutionError
 from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataServerConfig,
     OpenMetadataWorkflowConfig,
 )
 from metadata.ingestion.api.bulk_sink import BulkSink
@@ -48,9 +49,10 @@ class Workflow:
                 self.typeClassFetch(source_type, False),
             )
         )
-        metadata_config = self.config.workflowConfig.dict().get(
-            "openMetadataServerConfig", {}
+        metadata_config: OpenMetadataServerConfig = (
+            self.config.workflowConfig.openMetadataServerConfig
         )
+
         self.source: Source = source_class.create(
             self.config.source.dict(), metadata_config
         )
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
index e796b3e6f3b..be8dc861ac8 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -219,7 +219,7 @@ class MetadataRestSink(Sink[Entity]):
             if db_schema_and_table.table.tableQueries is not None:
                 self.metadata.ingest_table_queries_data(
                     table=created_table,
-                    table_queries=db_schema_and_table.table.tableQueries,
+                    table_queries=db_schema_and_table.table.dict().get("tableQueries"),
                 )
             logger.info(
diff --git a/ingestion/src/metadata/ingestion/source/mysql.py b/ingestion/src/metadata/ingestion/source/mysql.py
index fa37d27e023..0423bada9f5 100644
--- a/ingestion/src/metadata/ingestion/source/mysql.py
+++ b/ingestion/src/metadata/ingestion/source/mysql.py
@@ -19,14 +19,12 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnect
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.sql_source import SQLSource
-from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
-
-
-class MySQLConfig(MysqlConnection, SQLConnectionConfig):
-    def get_connection_url(self):
-        return super().get_connection_url()
 
 
 class MysqlSource(SQLSource):
@@ -35,28 +33,35 @@ class MysqlSource(SQLSource):
 
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
-        config = MySQLConfig.parse_obj(config_dict)
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: MysqlConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, MysqlConnection):
+            raise InvalidSourceException(
+                f"Expected MysqlConnection, but got {connection}"
+            )
+
         return cls(config, metadata_config)
 
     def prepare(self):
         self.inspector = inspect(self.engine)
-        self.schema_names = (
-            self.inspector.get_schema_names()
-            if not self.config.database
-            else [self.config.database]
-        )
+        self.service_connection.database = "default"
         return super().prepare()
 
     def next_record(self) -> Iterable[Entity]:
-        for schema in self.schema_names:
+        for schema in self.inspector.get_schema_names():
             self.database_source_state.clear()
-            if not self.sql_config.schema_filter_pattern.included(schema):
+            if (
+                self.source_config.schemaFilterPattern
+                and schema not in self.source_config.schemaFilterPattern.includes
+            ):
                 self.status.filter(schema, "Schema pattern not allowed")
                 continue
+
+            # Fetch tables by default
+            yield from self.fetch_tables(self.inspector, schema)
+
+            if self.source_config.includeViews:
                 yield from self.fetch_views(self.inspector, schema)
-            if self.config.mark_deleted_tables_as_deleted:
-                schema_fqdn = f"{self.config.service_name}.{schema}"
+            if self.source_config.markDeletedTables:
+                schema_fqdn = f"{self.config.serviceName}.{schema}"
                 yield from self.delete_tables(schema_fqdn)
diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py
index 3e4d94b0449..02deffc54e4 100644
--- a/ingestion/src/metadata/ingestion/source/sample_data.py
+++ b/ingestion/src/metadata/ingestion/source/sample_data.py
@@ -307,6 +307,7 @@ class SampleDataSource(Source[Entity]):
     @classmethod
     def create(cls, config_dict, metadata_config):
         config = SampleDataSourceConfig.parse_obj(config_dict)
+        metadata_config = OpenMetadataServerConfig.parse_obj(metadata_config)
         return cls(config, metadata_config)
 
     def prepare(self):
diff --git a/ingestion/src/metadata/ingestion/source/snowflake.py b/ingestion/src/metadata/ingestion/source/snowflake.py
index 861adab89fa..8049145e96e 100644
--- a/ingestion/src/metadata/ingestion/source/snowflake.py
+++ b/ingestion/src/metadata/ingestion/source/snowflake.py
@@ -23,12 +23,10 @@ from sqlalchemy.inspection import inspect
 from sqlalchemy.sql import text
 
 from metadata.config.common import FQDN_SEPARATOR
-from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import TableData
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.source.sql_source import SQLSource
 from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
 from metadata.utils.column_type_parser import create_sqlalchemy_type
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 94ada730ad6..e6d64721a3a 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -11,6 +11,7 @@
 """
 Generic source to build SQL connectors.
 """
+import copy
 import json
 import logging
 import re
@@ -39,10 +40,15 @@ from metadata.generated.schema.entity.data.table import (
     TableData,
     TableProfile,
 )
+from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+    DatabaseServiceMetadataPipeline,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.generated.schema.type.tagLabel import TagLabel
 from metadata.ingestion.api.common import Entity
@@ -82,28 +88,40 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
 
     def __init__(
         self,
-        config: SourceConfig,
+        config: WorkflowSource,
         metadata_config: OpenMetadataServerConfig,
     ):
         super().__init__()
         self.config = config
+
+        # It will be one of the Unions. We don't know the specific type here.
+        self.service_connection = self.config.serviceConnection.__root__.config
+
+        self.source_config: DatabaseServiceMetadataPipeline = (
+            self.config.sourceConfig.config
+        )
+
         self.metadata_config = metadata_config
         self.service = get_database_service_or_create(config, metadata_config)
         self.metadata = OpenMetadata(metadata_config)
         self.status = SQLSourceStatus()
-        self.sql_config = self.config
-        self.engine = get_engine(config=self.sql_config)
+        self.engine = get_engine(config=self.config)
         self._session = None  # We will instantiate this just if needed
         self.connection = self.engine.connect()
         self.data_profiler = None
         self.data_models = {}
         self.table_constraints = None
         self.database_source_state = set()
-        if self.config.dbt_catalog_file is not None:
-            with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog:
+        if self.source_config.dbtCatalogFilePath:
+            with open(
+                self.source_config.dbtCatalogFilePath, "r", encoding="utf-8"
+            ) as catalog:
                 self.dbt_catalog = json.load(catalog)
-        if self.config.dbt_manifest_file is not None:
-            with open(self.config.dbt_manifest_file, "r", encoding="utf-8") as manifest:
+        if self.source_config.dbtManifestFilePath:
+            with open(
+                self.source_config.dbtManifestFilePath, "r", encoding="utf-8"
+            ) as manifest:
                 self.dbt_manifest = json.load(manifest)
         self.profile_date = datetime.now()
@@ -129,7 +147,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 logger.error(
                     f"Profiling not available for this databaseService: {str(err)}"
                 )
-                self.config.data_profiler_enabled = False
+                self.source_config.enableDataProfiler = False
             except Exception as exc:  # pylint: disable=broad-except
                 logger.debug(traceback.print_exc())
@@ -164,7 +182,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         to the Table Entities
         """
         try:
-            query = self.config.query.format(schema, table)
+            query = self.source_config.sampleDataQuery.format(schema, table)
             logger.info(query)
             results = self.connection.execute(query)
             cols = []
@@ -191,18 +209,23 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         inspectors = self.get_databases()
         for inspector in inspectors:
             schema_names = inspector.get_schema_names()
             for schema in schema_names:
                 # clear any previous source database state
                 self.database_source_state.clear()
-                if not self.sql_config.schema_filter_pattern.included(schema):
+                if (
+                    self.source_config.schemaFilterPattern
+                    and schema not in self.source_config.schemaFilterPattern.includes
+                ):
                     self.status.filter(schema, "Schema pattern not allowed")
                     continue
-                if self.config.include_tables:
-                    yield from self.fetch_tables(inspector, schema)
-                if self.config.include_views:
+
+                # Fetch tables by default
+                yield from self.fetch_tables(inspector, schema)
+
+                if self.source_config.includeViews:
                     yield from self.fetch_views(inspector, schema)
-                if self.config.mark_deleted_tables_as_deleted:
-                    schema_fqdn = f"{self.config.service_name}.{schema}"
+                if self.source_config.markDeletedTables:
+                    schema_fqdn = f"{self.config.serviceName}.{schema}"
                     yield from self.delete_tables(schema_fqdn)
 
     def fetch_tables(
         self, inspector: Inspector, schema: str
     ) -> Iterable[OMetaDatabaseAndTable]:
@@ -218,20 +238,23 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 schema, table_name = self.standardize_schema_table_names(
                     schema, table_name
                 )
-                if not self.sql_config.table_filter_pattern.included(table_name):
+                if (
+                    self.source_config.tableFilterPattern
+                    and table_name not in self.source_config.tableFilterPattern.includes
+                ):
                     self.status.filter(
-                        f"{self.config.get_service_name()}.{table_name}",
+                        f"{self.config.serviceName}.{table_name}",
                         "Table pattern not allowed",
                     )
                     continue
                 if self._is_partition(table_name, schema, inspector):
                     self.status.filter(
-                        f"{self.config.get_service_name()}.{table_name}",
+                        f"{self.config.serviceName}.{table_name}",
                         "Table is partition",
                     )
                     continue
                 description = _get_table_description(schema, table_name, inspector)
-                fqn = self.get_table_fqn(self.config.service_name, schema, table_name)
+                fqn = self.get_table_fqn(self.config.serviceName, schema, table_name)
                 self.database_source_state.add(fqn)
                 self.table_constraints = None
                 table_columns = self._get_columns(schema, table_name, inspector)
                 table_entity = Table(
                     id=uuid.uuid4(),
@@ -240,13 +263,12 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     name=table_name,
                     tableType="Regular",
                     description=description if description is not None else " ",
-                    fullyQualifiedName=fqn,
                     columns=table_columns,
                 )
                 if self.table_constraints:
                     table_entity.tableConstraints = self.table_constraints
                 try:
-                    if self.sql_config.generate_sample_data:
+                    if self.source_config.generateSampleData:
                         table_data = self.fetch_sample_data(schema, table_name)
                         if table_data:
                             table_entity.sampleData = table_data
@@ -256,7 +278,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     logger.error(err)
 
                 try:
-                    if self.config.data_profiler_enabled:
+                    if self.source_config.enableDataProfiler:
                         profile = self.run_profiler(table=table_entity, schema=schema)
                         table_entity.tableProfile = [profile] if profile else None
                 # Catch any errors during the profile runner and continue
@@ -265,21 +287,19 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 # check if we have any model to associate with
                 table_entity.dataModel = self._get_data_model(schema, table_name)
-                database = self._get_database(self.config.database)
+                database = self._get_database(self.service_connection.database)
                 table_schema_and_db = OMetaDatabaseAndTable(
                     table=table_entity,
                     database=database,
                     database_schema=self._get_schema(schema, database),
                 )
                 yield table_schema_and_db
-                self.status.scanned(
-                    "{}.{}".format(self.config.get_service_name(), table_name)
-                )
+                self.status.scanned("{}.{}".format(self.config.serviceName, table_name))
             except Exception as err:
                 logger.debug(traceback.print_exc())
                 logger.error(err)
                 self.status.failures.append(
-                    "{}.{}".format(self.config.service_name, table_name)
+                    "{}.{}".format(self.config.serviceName, table_name)
                 )
                 continue
@@ -292,20 +312,23 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         for view_name in inspector.get_view_names(schema):
             try:
-                if self.config.scheme == "bigquery":
+                if self.service_connection.scheme == "bigquery":
                     schema, view_name = self.standardize_schema_table_names(
                         schema, view_name
                     )
-                if not self.sql_config.table_filter_pattern.included(view_name):
+                if (
+                    self.source_config.tableFilterPattern
+                    and view_name not in self.source_config.tableFilterPattern.includes
+                ):
                     self.status.filter(
-                        f"{self.config.get_service_name()}.{view_name}",
+                        f"{self.config.serviceName}.{view_name}",
                         "View pattern not allowed",
                     )
                     continue
                 try:
-                    if self.config.scheme == "bigquery":
+                    if self.service_connection.scheme == "bigquery":
                         view_definition = inspector.get_view_definition(
-                            f"{self.config.project_id}.{schema}.{view_name}"
+                            f"{self.service_connection.projectId}.{schema}.{view_name}"
                         )
                     else:
                         view_definition = inspector.get_view_definition(
                             view_name, schema
                         )
                 except NotImplementedError:
                     view_definition = ""
-                fqn = self.get_table_fqn(self.config.service_name, schema, view_name)
+                fqn = self.get_table_fqn(self.config.serviceName, schema, view_name)
                 self.database_source_state.add(fqn)
                 table = Table(
                     id=uuid.uuid4(),
                     name=view_name,
                     tableType="View",
@@ -325,7 +348,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     description=_get_table_description(schema, view_name, inspector)
                     or "",
                     # This will be generated in the backend!! #1673
-                    fullyQualifiedName=view_name,
                     columns=self._get_columns(schema, view_name, inspector),
                     viewDefinition=view_definition,
                 )
@@ -334,16 +356,16 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                         "sql": table.viewDefinition.__root__,
                         "from_type": "table",
                         "to_type": "table",
-                        "service_name": self.config.service_name,
+                        "service_name": self.config.serviceName,
                     }
                     ingest_lineage(
                         query_info=query_info, metadata_config=self.metadata_config
                     )
-                if self.sql_config.generate_sample_data:
+                if self.source_config.generateSampleData:
                     table_data = self.fetch_sample_data(schema, view_name)
                     table.sampleData = table_data
-                table.dataModel = self._get_data_model(schema, view_name)
-                database = self._get_database(self.config.database)
+                # table.dataModel = self._get_data_model(schema, view_name)
+                database = self._get_database(self.service_connection.database)
                 table_schema_and_db = OMetaDatabaseAndTable(
                     table=table,
                     database=database,
                     database_schema=self._get_schema(schema, database),
                 )
                 yield table_schema_and_db
@@ -353,7 +375,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
             # Catch any errors and continue the ingestion
             except Exception as err:  # pylint: disable=broad-except
                 logger.error(err)
-                self.status.warnings.append(f"{self.config.service_name}.{view_name}")
+                self.status.warnings.append(f"{self.config.serviceName}.{view_name}")
                 continue
 
     def delete_tables(self, schema_fqdn: str) -> DeleteTable:
@@ -370,7 +392,10 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         """
        Get all the DBT information and feed it to the Table Entity
         """
-        if self.config.dbt_manifest_file and self.config.dbt_catalog_file:
+        if (
+            self.source_config.dbtManifestFilePath
+            and self.source_config.dbtCatalogFilePath
+        ):
             logger.info("Parsing Data Models")
             manifest_entities = {
                 **self.dbt_manifest["nodes"],
@@ -421,7 +446,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
             try:
                 _, database, table = node.split(".", 2)
                 table_fqn = self.get_table_fqn(
-                    self.config.service_name, database, table
+                    self.config.serviceName, database, table
                 ).lower()
                 upstream_nodes.append(table_fqn)
             except Exception as err:  # pylint: disable=broad-except
@@ -471,14 +496,18 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
     def _get_database(self, database: str) -> Database:
         return Database(
             name=database,
-            service=EntityReference(id=self.service.id, type=self.config.service_type),
+            service=EntityReference(
+                id=self.service.id, type=self.service_connection.type.value
+            ),
         )
 
     def _get_schema(self, schema: str, database: Database) -> DatabaseSchema:
         return DatabaseSchema(
             name=schema,
             database=database.service,
-            service=EntityReference(id=self.service.id, type=self.config.service_type),
+            service=EntityReference(
+                id=self.service.id, type=self.service_connection.type.value
+            ),
         )
 
     @staticmethod
diff --git a/ingestion/src/metadata/ingestion/source/sqlite.py b/ingestion/src/metadata/ingestion/source/sqlite.py
index 974dc3576b4..46d87c5102d 100644
--- a/ingestion/src/metadata/ingestion/source/sqlite.py
+++ b/ingestion/src/metadata/ingestion/source/sqlite.py
@@ -20,7 +20,9 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.sql_source import SQLSource
@@ -31,7 +33,7 @@ class SqliteSource(SQLSource):
 
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
-        config: SourceConfig = SourceConfig.parse_obj(config_dict)
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
         connection = config.serviceConnection.__root__.config
         if not isinstance(connection, SQLiteConnection):
             raise InvalidSourceException(
diff --git a/ingestion/src/metadata/utils/engines.py b/ingestion/src/metadata/utils/engines.py
index b0f4d6e8b96..8706acb25c6 100644
--- a/ingestion/src/metadata/utils/engines.py
+++ b/ingestion/src/metadata/utils/engines.py
@@ -19,22 +19,30 @@ from sqlalchemy.engine.base import Engine
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.orm.session import Session
 
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.utils.source_connections import get_connection_url
 
 logger = logging.getLogger("Utils")
 
-# TODO: fix this and use the singledispatch to build the URL instead of get_connection_url
-def get_engine(config: SourceConfig, verbose: bool = False) -> Engine:
+def get_engine(config: WorkflowSource, verbose: bool = False) -> Engine:
     """
     Given an SQL configuration, build the SQLAlchemy Engine
     """
     logger.info(f"Building Engine for {config.serviceName}...")
-
+    service_connection_config = config.serviceConnection.__root__.config
+    options = service_connection_config.connectionOptions
+    if not options:
+        options = {}
+    connect_args = service_connection_config.connectionArguments
+    if not connect_args:
+        connect_args = {}
     engine = create_engine(
-        config.get_connection_url(),
-        **config.options,
-        connect_args=config.connect_args,
+        get_connection_url(service_connection_config),
+        **options,
+        connect_args=connect_args,
         echo=verbose,
     )
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index ffb5c97de8d..2e85426c410 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -13,6 +13,7 @@ import logging
 import traceback
 from datetime import datetime, timedelta
 from typing import Any, Dict, Iterable
+from urllib.parse import quote_plus
 
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.api.services.createDashboardService import (
@@ -36,7 +37,9 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe
 from metadata.generated.schema.entity.services.messagingService import MessagingService
 from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.generated.schema.entity.services.storageService import StorageService
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -62,7 +65,7 @@ def snake_to_camel(s):
 
 
 def get_database_service_or_create(
-    config: SourceConfig, metadata_config, service_name=None
+    config: WorkflowSource, metadata_config, service_name=None
 ) -> DatabaseService:
     metadata = OpenMetadata(metadata_config)
     if not service_name:
@@ -70,47 +73,44 @@ def get_database_service_or_create(
     service: DatabaseService = metadata.get_by_name(
         entity=DatabaseService, fqdn=service_name
     )
-    if service:
-        return service
-    else:
+    if not service:
+        config_dict = config.dict()
+        service_connection_config = config_dict.get("serviceConnection").get("config")
         password = (
-            config.password.get_secret_value()
-            if hasattr(config, "password") and config.password
+            service_connection_config.get("password").get_secret_value()
+            if service_connection_config and service_connection_config.get("password")
             else None
         )
 
         # Use a JSON to dynamically parse the pydantic model
         # based on the serviceType
+        # TODO revisit me
         service_json = {
             "connection": {
                 "config": {
-                    "hostPort": config.host_port
-                    if hasattr(config, "host_port")
-                    else None,
-                    "username": config.username
-                    if hasattr(config, "username")
-                    else None,
+                    "hostPort": service_connection_config.get("hostPort"),
+                    "username": service_connection_config.get("username"),
                     "password": password,
-                    "database": config.database
-                    if hasattr(config, "database")
-                    else None,
-                    "connectionOptions": config.options
-                    if hasattr(config, "options")
-                    else None,
-                    "connectionArguments": config.connect_args
-                    if hasattr(config, "connect_args")
-                    else None,
+                    "database": service_connection_config.get("database"),
+                    "connectionOptions": service_connection_config.get(
+                        "connectionOptions"
+                    ),
+                    "connectionArguments": service_connection_config.get(
+                        "connectionArguments"
+                    ),
                 }
             },
             "name": service_name,
             "description": "",
-            "serviceType": config.service_type,
+            "serviceType": service_connection_config.get("type").value,
        }
+
         created_service: DatabaseService = metadata.create_or_update(
             CreateDatabaseServiceRequest(**service_json)
         )
         logger.info(f"Creating DatabaseService instance for {service_name}")
         return created_service
+    return service
 
 
 def get_messaging_service_or_create(
diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py
new file mode 100644
index 00000000000..97f3aef4f95
--- /dev/null
+++ b/ingestion/src/metadata/utils/source_connections.py
@@ -0,0 +1,55 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+Hosts the singledispatch to build source URLs
+"""
+from functools import singledispatch
+from urllib.parse import quote_plus
+
+from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
+    MysqlConnection,
+)
+
+
+@singledispatch
+def get_connection_url(connection):
+    raise NotImplementedError(
+        f"Connection URL build not implemented for type {type(connection)}: {connection}"
+    )
+
+
+@get_connection_url.register
+def _(connection: MysqlConnection):
+
+    url = f"{connection.scheme.value}://"
+
+    if connection.username:
+        url += f"{connection.username}"
+        url += (
+            f":{quote_plus(connection.password.get_secret_value())}"
+            if connection.password
+            else ""
+        )
+        url += "@"
+
+    url += connection.hostPort
+    url += f"/{connection.database}" if connection.database else ""
+
+    options = connection.connectionOptions
+    if options:
+        if not connection.database:
+            url += "/"
+        params = "&".join(
+            f"{key}={quote_plus(value)}" for (key, value) in options.items() if value
+        )
+        url = f"{url}?{params}"
+
+    return url
diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py
index d9e38e5cf49..72fbbde7eda 100644
--- a/ingestion/tests/unit/profiler/test_workflow.py
+++ b/ingestion/tests/unit/profiler/test_workflow.py
@@ -24,7 +24,9 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
     ColumnValuesToBeBetween,
 )
@@ -66,7 +68,7 @@ def test_init_workflow():
     """
     We can initialise the workflow from a config
     """
-    assert isinstance(workflow.source_config, SourceConfig)
+    assert isinstance(workflow.source_config, WorkflowSource)
     assert isinstance(workflow.metadata_config, OpenMetadataServerConfig)
 
     assert isinstance(workflow.processor, OrmProfilerProcessor)
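
Reviewer note: a minimal usage sketch of the new singledispatch URL builder added in
source_connections.py. It assumes the generated MysqlConnection pydantic model with its
schema defaults (scheme mysql+pymysql, type MySQL); the credentials below are illustrative
only, and the expected output assumes the connection.password guard applied above.

    from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
        MysqlConnection,
    )
    from metadata.utils.source_connections import get_connection_url

    # Illustrative values; password is coerced to SecretStr and percent-encoded
    # via quote_plus when the URL is assembled.
    connection = MysqlConnection(
        username="openmetadata_user",
        password="openmetadata_password",
        hostPort="localhost:3306",
        database="openmetadata_db",
    )

    # Expected (assuming the default mysql+pymysql scheme):
    # mysql+pymysql://openmetadata_user:openmetadata_password@localhost:3306/openmetadata_db
    print(get_connection_url(connection))

singledispatch picks the registered overload from the runtime type of the first argument,
so each new connector only needs to register its own builder instead of subclassing a
shared connection-config class as MySQLConfig did before this patch.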
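Similarly, a sketch of how the reshaped workflow JSON parses into the generated
WorkflowSource model that the create() classmethods now receive; the dict mirrors the
updated mysql.json example in this patch, so the field names are grounded in the diff
rather than assumed.

    from metadata.generated.schema.metadataIngestion.workflow import (
        Source as WorkflowSource,
    )

    config_dict = {
        "type": "mysql",
        "serviceName": "local_mysql",
        "serviceConnection": {
            "config": {
                "type": "MySQL",
                "username": "openmetadata_user",
                "password": "openmetadata_password",
                "hostPort": "localhost:3306",
            }
        },
        "sourceConfig": {"config": {"enableDataProfiler": False}},
    }

    # Same entry point exercised by MysqlSource.create and SqliteSource.create:
    # the serviceConnection union resolves to MysqlConnection from the "type" field.
    source = WorkflowSource.parse_obj(config_dict)
    assert source.serviceConnection.__root__.config.hostPort == "localhost:3306"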