Refactor - Redshift Connection (#3920)

This commit is contained in:
Ayush Shah 2022-04-07 08:52:38 -07:00 committed by GitHub
parent 8a7fcf0e54
commit 7c3a6082a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 71 deletions

View File

@ -1,27 +1,35 @@
{ {
"source": { "source": {
"type": "redshift", "type": "redshift",
"serviceName": "aws_redshift",
"serviceConnection": {
"config": { "config": {
"host_port": "cluster.name.region.redshift.amazonaws.com:5439", "hostPort": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username", "username": "username",
"password": "strong_password", "password": "strong_password",
"database": "dev", "database": "dev",
"service_name": "aws_redshift", "type": "Redshift"
"table_filter_pattern": { }
"excludes": ["information_schema.*", "[\\w]*event_vw.*"]
}, },
"query":"select * from \"{}\".\"{}\" Limit 50" "sourceConfig": {
"config": {
"schemaFilterPattern": {
"excludes": [
"information_schema.*",
"[\\w]*event_vw.*"
]
}
}
} }
}, },
"sink": { "sink": {
"type": "metadata-rest", "type": "metadata-rest",
"config": {} "config": {}
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -12,11 +12,9 @@
Redshift source ingestion Redshift source ingestion
""" """
import logging import logging
import re import re
from collections import defaultdict from collections import defaultdict
from typing import Optional
import sqlalchemy as sa import sqlalchemy as sa
from packaging.version import Version from packaging.version import Version
@ -31,9 +29,11 @@ from sqlalchemy_redshift.dialect import RedshiftDialectMixin, RelationKey
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.ingestion.api.source import SourceStatus from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
from metadata.utils.sql_queries import ( from metadata.utils.sql_queries import (
REDSHIFT_GET_ALL_RELATION_INFO, REDSHIFT_GET_ALL_RELATION_INFO,
REDSHIFT_GET_SCHEMA_COLUMN_INFO, REDSHIFT_GET_SCHEMA_COLUMN_INFO,
@ -427,45 +427,6 @@ from metadata.generated.schema.entity.services.connections.database.redshiftConn
) )
class RedshiftConfig(RedshiftConnection, SQLConnectionConfig):
"""
Redshift config class
Attributes:
scheme:
where_clause:
duration:
service_type:
"""
where_clause: Optional[str] = None
duration: int = 1
query = 'select * from "{}"."{}"'
def get_identifier(self, schema: str, table: str) -> str:
"""
Get identifier
Args:
schema:
table:
Returns:
str
"""
regular = f"{schema}.{table}"
if self.database:
return f"{self.database}.{regular}"
return regular
def get_connection_url(self):
"""
Get connection url
Returns:
"""
return super().get_connection_url()
# pylint: disable=useless-super-delegation # pylint: disable=useless-super-delegation
class RedshiftSource(SQLSource): class RedshiftSource(SQLSource):
""" """
@ -489,7 +450,17 @@ class RedshiftSource(SQLSource):
metadata_config: metadata_config:
Returns: Returns:
""" """
config = RedshiftConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: RedshiftConnection = config.serviceConnection.__root__.config
if not isinstance(connection, RedshiftConnection):
raise InvalidSourceException(
f"Expected RedshiftConnection, but got {connection}"
)
if (
config.sourceConfig.config.sampleDataQuery
== WorkflowSource.sourceConfig.config.sampleDataQuery
):
config.sourceConfig.config.sampleDataQuery = 'select * from "{}"."{}"'
return cls(config, metadata_config) return cls(config, metadata_config)
def get_status(self) -> SourceStatus: def get_status(self) -> SourceStatus:

View File

@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.connections.database.mssqlConnect
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection, MysqlConnection,
) )
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
)
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
SQLiteConnection, SQLiteConnection,
) )
@ -65,6 +68,7 @@ def get_connection_url(connection):
) )
@get_connection_url.register(RedshiftConnection)
@get_connection_url.register(MysqlConnection) @get_connection_url.register(MysqlConnection)
@get_connection_url.register(ClickhouseConnection) @get_connection_url.register(ClickhouseConnection)
def _(connection): def _(connection):

View File

@ -12,14 +12,19 @@
""" """
OpenMetadata source URL building tests OpenMetadata source URL building tests
""" """
from unittest import TestCase
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection, MysqlConnection,
) )
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
)
from metadata.utils.source_connections import get_connection_url from metadata.utils.source_connections import get_connection_url
def test_mysql_url(): class TestConfig(TestCase):
def test_mysql_url(self):
""" """
Validate MySQL URL building Validate MySQL URL building
""" """
@ -30,3 +35,26 @@ def test_mysql_url():
) )
url = get_connection_url(connection) url = get_connection_url(connection)
assert url == "mysql+pymysql://username:password@localhost:1234" assert url == "mysql+pymysql://username:password@localhost:1234"
def test_redshift_url(self):
"""
Validate Redshift URL building
"""
connection = RedshiftConnection(
username="username",
password="password",
hostPort="localhost:1234",
database="dev",
)
url = get_connection_url(connection)
assert url == "redshift+psycopg2://username:password@localhost:1234/dev"
def test_redshift_url_without_db(self):
"""
Validate Redshift without DB URL building
"""
connection_without_db = RedshiftConnection(
username="username", password="password", hostPort="localhost:1234"
)
url_without_db = get_connection_url(connection_without_db)
assert url_without_db == "redshift+psycopg2://username:password@localhost:1234"