From 7c3a6082a60d820d2b771d1dbfc3834ea8abc505 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Thu, 7 Apr 2022 08:52:38 -0700 Subject: [PATCH] Refactor - Redshift Connection (#3920) --- ingestion/examples/workflows/redshift.json | 40 ++++++++----- .../src/metadata/ingestion/source/redshift.py | 59 +++++-------------- .../src/metadata/utils/source_connections.py | 4 ++ ingestion/tests/unit/test_source_url.py | 50 ++++++++++++---- 4 files changed, 82 insertions(+), 71 deletions(-) diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json index 48c3e41e515..ff4d941f1ae 100644 --- a/ingestion/examples/workflows/redshift.json +++ b/ingestion/examples/workflows/redshift.json @@ -1,27 +1,35 @@ { "source": { "type": "redshift", - "config": { - "host_port": "cluster.name.region.redshift.amazonaws.com:5439", - "username": "username", - "password": "strong_password", - "database": "dev", - "service_name": "aws_redshift", - "table_filter_pattern": { - "excludes": ["information_schema.*", "[\\w]*event_vw.*"] - }, - "query":"select * from \"{}\".\"{}\" Limit 50" + "serviceName": "aws_redshift", + "serviceConnection": { + "config": { + "hostPort": "cluster.name.region.redshift.amazonaws.com:5439", + "username": "username", + "password": "strong_password", + "database": "dev", + "type": "Redshift" + } + }, + "sourceConfig": { + "config": { + "schemaFilterPattern": { + "excludes": [ + "information_schema.*", + "[\\w]*event_vw.*" + ] + } + } } }, "sink": { "type": "metadata-rest", "config": {} }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } -} \ No newline at end of file +} diff --git a/ingestion/src/metadata/ingestion/source/redshift.py b/ingestion/src/metadata/ingestion/source/redshift.py index 4df90f1dd4f..054262429b4 100644 --- a/ingestion/src/metadata/ingestion/source/redshift.py +++ b/ingestion/src/metadata/ingestion/source/redshift.py @@ -12,11 +12,9 @@ Redshift source ingestion """ - import logging import re from collections import defaultdict -from typing import Optional import sqlalchemy as sa from packaging.version import Version @@ -31,9 +29,11 @@ from sqlalchemy_redshift.dialect import RedshiftDialectMixin, RelationKey from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) -from metadata.ingestion.api.source import SourceStatus +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException, SourceStatus from metadata.ingestion.source.sql_source import SQLSource -from metadata.ingestion.source.sql_source_common import SQLConnectionConfig from metadata.utils.sql_queries import ( REDSHIFT_GET_ALL_RELATION_INFO, REDSHIFT_GET_SCHEMA_COLUMN_INFO, @@ -427,45 +427,6 @@ from metadata.generated.schema.entity.services.connections.database.redshiftConn ) -class RedshiftConfig(RedshiftConnection, SQLConnectionConfig): - """ - Redshift config class - - Attributes: - scheme: - where_clause: - duration: - service_type: - """ - - where_clause: Optional[str] = None - duration: int = 1 - query = 'select * from "{}"."{}"' - - def get_identifier(self, schema: str, table: str) -> str: - """ - Get identifier - - Args: - schema: - table: - Returns: - str - """ - regular = f"{schema}.{table}" - if self.database: - return f"{self.database}.{regular}" - return regular - - def get_connection_url(self): - """ - Get connection url - - Returns: - """ - return super().get_connection_url() - - # pylint: disable=useless-super-delegation class RedshiftSource(SQLSource): """ @@ -489,7 +450,17 @@ class RedshiftSource(SQLSource): metadata_config: Returns: """ - config = RedshiftConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: RedshiftConnection = config.serviceConnection.__root__.config + if not isinstance(connection, RedshiftConnection): + raise InvalidSourceException( + f"Expected RedshiftConnection, but got {connection}" + ) + if ( + config.sourceConfig.config.sampleDataQuery + == WorkflowSource.sourceConfig.config.sampleDataQuery + ): + config.sourceConfig.config.sampleDataQuery = 'select * from "{}"."{}"' return cls(config, metadata_config) def get_status(self) -> SourceStatus: diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index 128e2218129..16bac3b9e49 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.connections.database.mssqlConnect from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( MysqlConnection, ) +from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( + RedshiftConnection, +) from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( SQLiteConnection, ) @@ -65,6 +68,7 @@ def get_connection_url(connection): ) +@get_connection_url.register(RedshiftConnection) @get_connection_url.register(MysqlConnection) @get_connection_url.register(ClickhouseConnection) def _(connection): diff --git a/ingestion/tests/unit/test_source_url.py b/ingestion/tests/unit/test_source_url.py index 144b3cd95c7..15b4ef56538 100644 --- a/ingestion/tests/unit/test_source_url.py +++ b/ingestion/tests/unit/test_source_url.py @@ -12,21 +12,49 @@ """ OpenMetadata source URL building tests """ +from unittest import TestCase from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( MysqlConnection, ) +from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( + RedshiftConnection, +) from metadata.utils.source_connections import get_connection_url -def test_mysql_url(): - """ - Validate MySQL URL building - """ - connection = MysqlConnection( - username="username", - password="password", - hostPort="localhost:1234", - ) - url = get_connection_url(connection) - assert url == "mysql+pymysql://username:password@localhost:1234" +class TestConfig(TestCase): + def test_mysql_url(self): + """ + Validate MySQL URL building + """ + connection = MysqlConnection( + username="username", + password="password", + hostPort="localhost:1234", + ) + url = get_connection_url(connection) + assert url == "mysql+pymysql://username:password@localhost:1234" + + def test_redshift_url(self): + """ + Validate Redshift URL building + """ + connection = RedshiftConnection( + username="username", + password="password", + hostPort="localhost:1234", + database="dev", + ) + url = get_connection_url(connection) + assert url == "redshift+psycopg2://username:password@localhost:1234/dev" + + def test_redshift_url_without_db(self): + """ + Validate Redshift without DB URL building + """ + connection_without_db = RedshiftConnection( + username="username", password="password", hostPort="localhost:1234" + ) + url_without_db = get_connection_url(connection_without_db) + assert url_without_db == "redshift+psycopg2://username:password@localhost:1234"