Add: Unit Test source for multiple conn (#4845)

Add: Unit Test source for multiple conn (#4845)
This commit is contained in:
Milan Bariya 2022-05-12 18:01:09 +05:30 committed by GitHub
parent c419b07ab5
commit d1f21911f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -12,10 +12,18 @@
from unittest import TestCase
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseConnection,
ClickhouseScheme,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
DatabricksScheme,
)
from metadata.generated.schema.entity.services.connections.database.db2Connection import (
Db2Connection,
Db2Scheme,
)
from metadata.generated.schema.entity.services.connections.database.druidConnection import (
DruidConnection,
DruidScheme,
@ -24,10 +32,34 @@ from metadata.generated.schema.entity.services.connections.database.hiveConnecti
HiveConnection,
HiveScheme,
)
from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import (
MariaDBConnection,
MariaDBScheme,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
MySQLScheme,
)
from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import (
PinotDBConnection,
PinotDBScheme,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
PostgresScheme,
)
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
RedshiftScheme,
)
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
SingleStoreScheme,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
SnowflakeScheme,
)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
TrinoScheme,
@ -209,3 +241,365 @@ class SouceConnectionTest(TestCase):
pinotControllerHost="http://localhost:9000/",
)
assert expected_url == get_connection_url(pinot_conn_obj)
def test_mysql_url(self):
# connection arguments without db
expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306"
mysql_conn_obj = MysqlConnection(
username="openmetadata_user",
hostPort="localhost:3306",
scheme=MySQLScheme.mysql_pymysql,
database=None,
)
assert expected_url == get_connection_url(mysql_conn_obj)
# connection arguments with db
expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306/default"
mysql_conn_obj = MysqlConnection(
username="openmetadata_user",
hostPort="localhost:3306",
scheme=MySQLScheme.mysql_pymysql,
database="default",
)
assert expected_url == get_connection_url(mysql_conn_obj)
def test_clickhouse_url(self):
# connection arguments without db
expected_url = "clickhouse+http://username:@localhost:8123"
clickhouse_conn_obj = ClickhouseConnection(
username="username",
hostPort="localhost:8123",
scheme=ClickhouseScheme.clickhouse_http,
database=None,
)
assert expected_url == get_connection_url(clickhouse_conn_obj)
# connection arguments with db
expected_url = "clickhouse+http://username:@localhost:8123/default"
clickhouse_conn_obj = ClickhouseConnection(
username="username",
hostPort="localhost:8123",
scheme=ClickhouseScheme.clickhouse_http,
database="default",
)
assert expected_url == get_connection_url(clickhouse_conn_obj)
def test_mariadb_url(self):
# connection arguments without db
expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306"
mariadb_conn_obj = MariaDBConnection(
username="openmetadata_user",
hostPort="localhost:3306",
scheme=MariaDBScheme.mysql_pymysql,
database=None,
)
assert expected_url == get_connection_url(mariadb_conn_obj)
# connection arguments with db
expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306/default"
mariadb_conn_obj = MariaDBConnection(
username="openmetadata_user",
hostPort="localhost:3306",
scheme=MariaDBScheme.mysql_pymysql,
database="default",
)
assert expected_url == get_connection_url(mariadb_conn_obj)
def test_postgres_url(self):
# connection arguments without db
expected_url = "postgresql+psycopg2://openmetadata_user:@localhost:5432"
postgres_conn_obj = PostgresConnection(
username="openmetadata_user",
hostPort="localhost:5432",
scheme=PostgresScheme.postgresql_psycopg2,
database=None,
)
assert expected_url == get_connection_url(postgres_conn_obj)
# connection arguments witho db
expected_url = "postgresql+psycopg2://openmetadata_user:@localhost:5432/default"
postgres_conn_obj = PostgresConnection(
username="openmetadata_user",
hostPort="localhost:5432",
scheme=PostgresScheme.postgresql_psycopg2,
database="default",
)
assert expected_url == get_connection_url(postgres_conn_obj)
def test_redshift_url(self):
# connection arguments witho db
expected_url = "redshift+psycopg2://username:strong_password@cluster.name.region.redshift.amazonaws.com:5439/dev"
redshift_conn_obj = RedshiftConnection(
username="username",
password="strong_password",
hostPort="cluster.name.region.redshift.amazonaws.com:5439",
scheme=RedshiftScheme.redshift_psycopg2,
database="dev",
)
assert expected_url == get_connection_url(redshift_conn_obj)
def test_singleStore_url(self):
# connection arguments without db
expected_url = "mysql+pymysql://openmetadata_user:@localhost:5432"
singleStore_conn_obj = SingleStoreConnection(
username="openmetadata_user",
hostPort="localhost:5432",
scheme=SingleStoreScheme.mysql_pymysql,
database=None,
)
assert expected_url == get_connection_url(singleStore_conn_obj)
# connection arguments with db
expected_url = "mysql+pymysql://openmetadata_user:@localhost:5432/default"
singleStore_conn_obj = SingleStoreConnection(
username="openmetadata_user",
hostPort="localhost:5432",
scheme=SingleStoreScheme.mysql_pymysql,
database="default",
)
assert expected_url == get_connection_url(singleStore_conn_obj)
def test_db2_url(self):
# connection arguments without db
expected_url = "db2+ibm_db://openmetadata_user:@localhost:50000"
db2_conn_obj = Db2Connection(
scheme=Db2Scheme.db2_ibm_db,
username="openmetadata_user",
hostPort="localhost:50000",
database=None,
)
assert expected_url == get_connection_url(db2_conn_obj)
# connection arguments with db
expected_url = "db2+ibm_db://openmetadata_user:@localhost:50000/default"
db2_conn_obj = Db2Connection(
username="openmetadata_user",
hostPort="localhost:50000",
scheme=Db2Scheme.db2_ibm_db,
database="default",
)
assert expected_url == get_connection_url(db2_conn_obj)
def test_snowflake_url(self):
# connection arguments without db
expected_url = "snowflake://coding:Abhi@ue18849.us-east-2.aws?account=ue18849.us-east-2.aws&warehouse=COMPUTE_WH"
snowflake_conn_obj = SnowflakeConnection(
scheme=SnowflakeScheme.snowflake,
username="coding",
password="Abhi",
warehouse="COMPUTE_WH",
account="ue18849.us-east-2.aws",
)
assert expected_url == get_connection_url(snowflake_conn_obj)
# connection arguments with db
expected_url = "snowflake://coding:Abhi@ue18849.us-east-2.aws/testdb?account=ue18849.us-east-2.aws&warehouse=COMPUTE_WH"
snowflake_conn_obj = SnowflakeConnection(
scheme=SnowflakeScheme.snowflake,
username="coding",
password="Abhi",
database="testdb",
warehouse="COMPUTE_WH",
account="ue18849.us-east-2.aws",
)
assert expected_url == get_connection_url(snowflake_conn_obj)
def test_mysql_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
mysql_conn_obj = MysqlConnection(
username="user",
password=None,
hostPort="localhost:443",
database=None,
connectionArguments=None,
scheme=MySQLScheme.mysql_pymysql,
)
assert expected_args == get_connection_args(mysql_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
mysql_conn_obj = MysqlConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=MySQLScheme.mysql_pymysql,
)
assert expected_args == get_connection_args(mysql_conn_obj)
def test_clickhouse_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
clickhouse_conn_obj = ClickhouseConnection(
username="user",
password=None,
hostPort="localhost:443",
database=None,
connectionArguments=None,
scheme=ClickhouseScheme.clickhouse_http,
)
assert expected_args == get_connection_args(clickhouse_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
clickhouse_conn_obj = ClickhouseConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=ClickhouseScheme.clickhouse_http,
)
assert expected_args == get_connection_args(clickhouse_conn_obj)
def test_mariadb_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
mariadb_conn_obj = MariaDBConnection(
username="user",
password=None,
hostPort="localhost:443",
database=None,
connectionArguments=None,
scheme=MariaDBScheme.mysql_pymysql,
)
assert expected_args == get_connection_args(mariadb_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
mariadb_conn_obj = MariaDBConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=MariaDBScheme.mysql_pymysql,
)
assert expected_args == get_connection_args(mariadb_conn_obj)
def test_postgres_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
postgres_conn_obj = PostgresConnection(
username="user",
password=None,
hostPort="localhost:443",
database=None,
connectionArguments=None,
scheme=PostgresScheme.postgresql_psycopg2,
)
assert expected_args == get_connection_args(postgres_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
postgres_conn_obj = PostgresConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=PostgresScheme.postgresql_psycopg2,
)
assert expected_args == get_connection_args(postgres_conn_obj)
def test_redshift_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
redshift_conn_obj = RedshiftConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments=None,
scheme=RedshiftScheme.redshift_psycopg2,
)
assert expected_args == get_connection_args(redshift_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
redshift_conn_obj = RedshiftConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=RedshiftScheme.redshift_psycopg2,
)
assert expected_args == get_connection_args(redshift_conn_obj)
def test_singleStore_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
singleStore_conn_obj = SingleStoreConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments=None,
scheme=SingleStoreScheme.mysql_pymysql,
)
assert expected_args == get_connection_args(singleStore_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
singleStore_conn_obj = SingleStoreConnection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=SingleStoreScheme.mysql_pymysql,
)
assert expected_args == get_connection_args(singleStore_conn_obj)
def test_db2_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
db2_conn_obj = Db2Connection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments=None,
scheme=Db2Scheme.db2_ibm_db,
)
assert expected_args == get_connection_args(db2_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
db2_conn_obj = Db2Connection(
username="user",
password=None,
hostPort="localhost:443",
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=Db2Scheme.db2_ibm_db,
)
assert expected_args == get_connection_args(db2_conn_obj)
def test_snowflake_conn_arguments(self):
# connection arguments without connectionArguments
expected_args = {}
snowflake_conn_obj = SnowflakeConnection(
username="user",
password=None,
database="tiny",
connectionArguments=None,
scheme=SnowflakeScheme.snowflake,
account="account.region_name.cloud_service",
)
assert expected_args == get_connection_args(snowflake_conn_obj)
# connection arguments with connectionArguments
expected_args = {"user": "user-to-be-impersonated"}
snowflake_conn_obj = SnowflakeConnection(
username="user",
password=None,
database="tiny",
connectionArguments={"user": "user-to-be-impersonated"},
scheme=SnowflakeScheme.snowflake,
account="account.region_name.cloud_service",
)
assert expected_args == get_connection_args(snowflake_conn_obj)