diff --git a/ingestion/tests/unit/test_source_connection.py b/ingestion/tests/unit/test_source_connection.py index 3ce23f4c9b5..d6e5374bf4b 100644 --- a/ingestion/tests/unit/test_source_connection.py +++ b/ingestion/tests/unit/test_source_connection.py @@ -12,10 +12,18 @@ from unittest import TestCase +from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( + ClickhouseConnection, + ClickhouseScheme, +) from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( DatabricksConnection, DatabricksScheme, ) +from metadata.generated.schema.entity.services.connections.database.db2Connection import ( + Db2Connection, + Db2Scheme, +) from metadata.generated.schema.entity.services.connections.database.druidConnection import ( DruidConnection, DruidScheme, @@ -24,10 +32,34 @@ from metadata.generated.schema.entity.services.connections.database.hiveConnecti HiveConnection, HiveScheme, ) +from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import ( + MariaDBConnection, + MariaDBScheme, +) +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MysqlConnection, + MySQLScheme, +) from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import ( PinotDBConnection, PinotDBScheme, ) +from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( + PostgresConnection, + PostgresScheme, +) +from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( + RedshiftConnection, + RedshiftScheme, +) +from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( + SingleStoreConnection, + SingleStoreScheme, +) +from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( + SnowflakeConnection, + SnowflakeScheme, +) from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( TrinoConnection, TrinoScheme, @@ -209,3 +241,365 @@ class SouceConnectionTest(TestCase): pinotControllerHost="http://localhost:9000/", ) assert expected_url == get_connection_url(pinot_conn_obj) + + def test_mysql_url(self): + # connection arguments without db + expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306" + mysql_conn_obj = MysqlConnection( + username="openmetadata_user", + hostPort="localhost:3306", + scheme=MySQLScheme.mysql_pymysql, + database=None, + ) + assert expected_url == get_connection_url(mysql_conn_obj) + + # connection arguments with db + expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306/default" + mysql_conn_obj = MysqlConnection( + username="openmetadata_user", + hostPort="localhost:3306", + scheme=MySQLScheme.mysql_pymysql, + database="default", + ) + assert expected_url == get_connection_url(mysql_conn_obj) + + def test_clickhouse_url(self): + # connection arguments without db + expected_url = "clickhouse+http://username:@localhost:8123" + clickhouse_conn_obj = ClickhouseConnection( + username="username", + hostPort="localhost:8123", + scheme=ClickhouseScheme.clickhouse_http, + database=None, + ) + assert expected_url == get_connection_url(clickhouse_conn_obj) + + # connection arguments with db + expected_url = "clickhouse+http://username:@localhost:8123/default" + clickhouse_conn_obj = ClickhouseConnection( + username="username", + hostPort="localhost:8123", + scheme=ClickhouseScheme.clickhouse_http, + database="default", + ) + assert expected_url == get_connection_url(clickhouse_conn_obj) + + def test_mariadb_url(self): + # connection arguments without db + expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306" + mariadb_conn_obj = MariaDBConnection( + username="openmetadata_user", + hostPort="localhost:3306", + scheme=MariaDBScheme.mysql_pymysql, + database=None, + ) + assert expected_url == get_connection_url(mariadb_conn_obj) + + # connection arguments with db + expected_url = "mysql+pymysql://openmetadata_user:@localhost:3306/default" + mariadb_conn_obj = MariaDBConnection( + username="openmetadata_user", + hostPort="localhost:3306", + scheme=MariaDBScheme.mysql_pymysql, + database="default", + ) + assert expected_url == get_connection_url(mariadb_conn_obj) + + def test_postgres_url(self): + # connection arguments without db + expected_url = "postgresql+psycopg2://openmetadata_user:@localhost:5432" + postgres_conn_obj = PostgresConnection( + username="openmetadata_user", + hostPort="localhost:5432", + scheme=PostgresScheme.postgresql_psycopg2, + database=None, + ) + assert expected_url == get_connection_url(postgres_conn_obj) + + # connection arguments witho db + expected_url = "postgresql+psycopg2://openmetadata_user:@localhost:5432/default" + postgres_conn_obj = PostgresConnection( + username="openmetadata_user", + hostPort="localhost:5432", + scheme=PostgresScheme.postgresql_psycopg2, + database="default", + ) + assert expected_url == get_connection_url(postgres_conn_obj) + + def test_redshift_url(self): + # connection arguments witho db + expected_url = "redshift+psycopg2://username:strong_password@cluster.name.region.redshift.amazonaws.com:5439/dev" + redshift_conn_obj = RedshiftConnection( + username="username", + password="strong_password", + hostPort="cluster.name.region.redshift.amazonaws.com:5439", + scheme=RedshiftScheme.redshift_psycopg2, + database="dev", + ) + assert expected_url == get_connection_url(redshift_conn_obj) + + def test_singleStore_url(self): + # connection arguments without db + expected_url = "mysql+pymysql://openmetadata_user:@localhost:5432" + singleStore_conn_obj = SingleStoreConnection( + username="openmetadata_user", + hostPort="localhost:5432", + scheme=SingleStoreScheme.mysql_pymysql, + database=None, + ) + assert expected_url == get_connection_url(singleStore_conn_obj) + + # connection arguments with db + expected_url = "mysql+pymysql://openmetadata_user:@localhost:5432/default" + singleStore_conn_obj = SingleStoreConnection( + username="openmetadata_user", + hostPort="localhost:5432", + scheme=SingleStoreScheme.mysql_pymysql, + database="default", + ) + assert expected_url == get_connection_url(singleStore_conn_obj) + + def test_db2_url(self): + # connection arguments without db + expected_url = "db2+ibm_db://openmetadata_user:@localhost:50000" + db2_conn_obj = Db2Connection( + scheme=Db2Scheme.db2_ibm_db, + username="openmetadata_user", + hostPort="localhost:50000", + database=None, + ) + assert expected_url == get_connection_url(db2_conn_obj) + + # connection arguments with db + expected_url = "db2+ibm_db://openmetadata_user:@localhost:50000/default" + db2_conn_obj = Db2Connection( + username="openmetadata_user", + hostPort="localhost:50000", + scheme=Db2Scheme.db2_ibm_db, + database="default", + ) + assert expected_url == get_connection_url(db2_conn_obj) + + def test_snowflake_url(self): + # connection arguments without db + expected_url = "snowflake://coding:Abhi@ue18849.us-east-2.aws?account=ue18849.us-east-2.aws&warehouse=COMPUTE_WH" + snowflake_conn_obj = SnowflakeConnection( + scheme=SnowflakeScheme.snowflake, + username="coding", + password="Abhi", + warehouse="COMPUTE_WH", + account="ue18849.us-east-2.aws", + ) + assert expected_url == get_connection_url(snowflake_conn_obj) + + # connection arguments with db + expected_url = "snowflake://coding:Abhi@ue18849.us-east-2.aws/testdb?account=ue18849.us-east-2.aws&warehouse=COMPUTE_WH" + snowflake_conn_obj = SnowflakeConnection( + scheme=SnowflakeScheme.snowflake, + username="coding", + password="Abhi", + database="testdb", + warehouse="COMPUTE_WH", + account="ue18849.us-east-2.aws", + ) + assert expected_url == get_connection_url(snowflake_conn_obj) + + def test_mysql_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + mysql_conn_obj = MysqlConnection( + username="user", + password=None, + hostPort="localhost:443", + database=None, + connectionArguments=None, + scheme=MySQLScheme.mysql_pymysql, + ) + assert expected_args == get_connection_args(mysql_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + mysql_conn_obj = MysqlConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=MySQLScheme.mysql_pymysql, + ) + assert expected_args == get_connection_args(mysql_conn_obj) + + def test_clickhouse_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + clickhouse_conn_obj = ClickhouseConnection( + username="user", + password=None, + hostPort="localhost:443", + database=None, + connectionArguments=None, + scheme=ClickhouseScheme.clickhouse_http, + ) + assert expected_args == get_connection_args(clickhouse_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + clickhouse_conn_obj = ClickhouseConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=ClickhouseScheme.clickhouse_http, + ) + assert expected_args == get_connection_args(clickhouse_conn_obj) + + def test_mariadb_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + mariadb_conn_obj = MariaDBConnection( + username="user", + password=None, + hostPort="localhost:443", + database=None, + connectionArguments=None, + scheme=MariaDBScheme.mysql_pymysql, + ) + assert expected_args == get_connection_args(mariadb_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + mariadb_conn_obj = MariaDBConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=MariaDBScheme.mysql_pymysql, + ) + assert expected_args == get_connection_args(mariadb_conn_obj) + + def test_postgres_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + postgres_conn_obj = PostgresConnection( + username="user", + password=None, + hostPort="localhost:443", + database=None, + connectionArguments=None, + scheme=PostgresScheme.postgresql_psycopg2, + ) + assert expected_args == get_connection_args(postgres_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + postgres_conn_obj = PostgresConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=PostgresScheme.postgresql_psycopg2, + ) + assert expected_args == get_connection_args(postgres_conn_obj) + + def test_redshift_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + redshift_conn_obj = RedshiftConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments=None, + scheme=RedshiftScheme.redshift_psycopg2, + ) + assert expected_args == get_connection_args(redshift_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + redshift_conn_obj = RedshiftConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=RedshiftScheme.redshift_psycopg2, + ) + assert expected_args == get_connection_args(redshift_conn_obj) + + def test_singleStore_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + singleStore_conn_obj = SingleStoreConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments=None, + scheme=SingleStoreScheme.mysql_pymysql, + ) + assert expected_args == get_connection_args(singleStore_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + singleStore_conn_obj = SingleStoreConnection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=SingleStoreScheme.mysql_pymysql, + ) + assert expected_args == get_connection_args(singleStore_conn_obj) + + def test_db2_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + db2_conn_obj = Db2Connection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments=None, + scheme=Db2Scheme.db2_ibm_db, + ) + assert expected_args == get_connection_args(db2_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + db2_conn_obj = Db2Connection( + username="user", + password=None, + hostPort="localhost:443", + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=Db2Scheme.db2_ibm_db, + ) + assert expected_args == get_connection_args(db2_conn_obj) + + def test_snowflake_conn_arguments(self): + # connection arguments without connectionArguments + expected_args = {} + snowflake_conn_obj = SnowflakeConnection( + username="user", + password=None, + database="tiny", + connectionArguments=None, + scheme=SnowflakeScheme.snowflake, + account="account.region_name.cloud_service", + ) + assert expected_args == get_connection_args(snowflake_conn_obj) + + # connection arguments with connectionArguments + expected_args = {"user": "user-to-be-impersonated"} + snowflake_conn_obj = SnowflakeConnection( + username="user", + password=None, + database="tiny", + connectionArguments={"user": "user-to-be-impersonated"}, + scheme=SnowflakeScheme.snowflake, + account="account.region_name.cloud_service", + ) + assert expected_args == get_connection_args(snowflake_conn_obj)