""" Test Cockroach using the topology """ import types from unittest import TestCase from unittest.mock import patch from sqlalchemy.types import VARCHAR from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( Column, Constraint, DataType, TableType, ) from metadata.generated.schema.entity.services.databaseService import ( DatabaseConnection, DatabaseService, DatabaseServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.source.database.cockroach.metadata import CockroachSource from metadata.ingestion.source.database.common_pg_mappings import ( GEOMETRY, POINT, POLYGON, ) mock_cockroach_config = { "source": { "type": "cockroach", "serviceName": "local_cockroach1", "serviceConnection": { "config": { "type": "Cockroach", "username": "username", "authType": { "password": "password", }, "hostPort": "localhost:26257", "database": "cockroach", } }, "sourceConfig": { "config": { "type": "DatabaseMetadata", } }, }, "sink": { "type": "metadata-rest", "config": {}, }, "workflowConfig": { "openMetadataServerConfig": { "hostPort": "http://localhost:8585/api", "authProvider": "openmetadata", "securityConfig": {"jwtToken": "cockroach"}, } }, } MOCK_DATABASE_SERVICE = DatabaseService( id="85811038-099a-11ed-861d-0242ac120002", name="cockroach_source", connection=DatabaseConnection(), serviceType=DatabaseServiceType.Cockroach, ) MOCK_DATABASE = Database( id="2aaa012e-099a-11ed-861d-0242ac120002", name="default", fullyQualifiedName="cockroach_source.default", displayName="default", description="", service=EntityReference( id="85811038-099a-11ed-861d-0242ac120002", type="databaseService", ), ) MOCK_DATABASE_SCHEMA = DatabaseSchema( id="2aaa012e-099a-11ed-861d-0242ac120056", name="default", fullyQualifiedName="cockroach_source.default.default", displayName="default", description="", database=EntityReference( id="2aaa012e-099a-11ed-861d-0242ac120002", type="database", ), service=EntityReference( id="2aaa012e-099a-11ed-861d-0242ac120002", type="database", ), ) MOCK_COLUMN_VALUE = [ { "name": "username", "type": VARCHAR(), "nullable": True, "default": None, "autoincrement": False, "system_data_type": "varchar(50)", "comment": None, }, { "name": "geom_c", "type": GEOMETRY(), "nullable": True, "default": None, "autoincrement": False, "system_data_type": "geometry", "comment": None, }, { "name": "point_c", "type": POINT(), "nullable": True, "default": None, "autoincrement": False, "system_data_type": "point", "comment": None, }, { "name": "polygon_c", "type": POLYGON(), "nullable": True, "default": None, "autoincrement": False, "comment": None, "system_data_type": "polygon", }, ] EXPECTED_COLUMN_VALUE = [ Column( name="username", displayName=None, dataType=DataType.VARCHAR, arrayDataType=None, dataLength=1, precision=None, scale=None, dataTypeDisplay="varchar(50)", description=None, fullyQualifiedName=None, tags=None, constraint=Constraint.NULL, ordinalPosition=None, jsonSchema=None, children=None, customMetrics=None, profile=None, ), Column( name="geom_c", displayName=None, dataType=DataType.GEOMETRY, arrayDataType=None, dataLength=1, precision=None, scale=None, dataTypeDisplay="geometry", description=None, fullyQualifiedName=None, tags=None, constraint=Constraint.NULL, ordinalPosition=None, jsonSchema=None, children=None, customMetrics=None, profile=None, ), Column( name="point_c", displayName=None, dataType=DataType.GEOMETRY, arrayDataType=None, dataLength=1, precision=None, scale=None, dataTypeDisplay="point", description=None, fullyQualifiedName=None, tags=None, constraint=Constraint.NULL, ordinalPosition=None, jsonSchema=None, children=None, customMetrics=None, profile=None, ), Column( name="polygon_c", displayName=None, dataType=DataType.GEOMETRY, arrayDataType=None, dataLength=1, precision=None, scale=None, dataTypeDisplay="polygon", description=None, fullyQualifiedName=None, tags=None, constraint=Constraint.NULL, ordinalPosition=None, jsonSchema=None, children=None, customMetrics=None, profile=None, ), ] class cockroachUnitTest(TestCase): @patch( "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection" ) def __init__(self, methodName, test_connection) -> None: super().__init__(methodName) test_connection.return_value = False self.config = OpenMetadataWorkflowConfig.model_validate(mock_cockroach_config) self.cockroach_source = CockroachSource.create( mock_cockroach_config["source"], self.config.workflowConfig.openMetadataServerConfig, ) self.cockroach_source.context.get().__dict__[ "database_service" ] = MOCK_DATABASE_SERVICE.name.root self.cockroach_source.context.get().__dict__[ "database" ] = MOCK_DATABASE.name.root self.cockroach_source.context.get().__dict__[ "database_schema" ] = MOCK_DATABASE_SCHEMA.name.root def test_datatype(self): inspector = types.SimpleNamespace() inspector.get_columns = ( lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE ) inspector.get_pk_constraint = lambda table_name, schema_name: [] inspector.get_unique_constraints = lambda table_name, schema_name: [] inspector.get_foreign_keys = lambda table_name, schema_name: [] result, _, _ = self.cockroach_source.get_columns_and_constraints( "public", "user", "cockroach", inspector, TableType.Regular ) for i, _ in enumerate(EXPECTED_COLUMN_VALUE): self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i]) @patch("sqlalchemy.engine.base.Engine") @patch( "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.connection" ) def test_close_connection(self, engine, connection): connection.return_value = True self.cockroach_source.close()