Fixed: PostGIS geometry type (#7382)

Fixed: PostGIS geometry type (#7382)
This commit is contained in:
NiharDoshi99 2022-09-15 14:05:15 +05:30 committed by GitHub
parent 3f561448cb
commit 5eb45948bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 268 additions and 8 deletions

View File

@ -45,7 +45,6 @@ class OMetaServiceMixin:
""" """
create_entity_class = self.get_create_entity_type(entity=entity) create_entity_class = self.get_create_entity_type(entity=entity)
print(create_entity_class)
return create_entity_class( return create_entity_class(
name=config.serviceName, name=config.serviceName,
serviceType=config.serviceConnection.__root__.config.type.value, serviceType=config.serviceConnection.__root__.config.type.value,

View File

@ -92,6 +92,7 @@ class ColumnTypeParser:
"FLOAT64": "DOUBLE", "FLOAT64": "DOUBLE",
"FLOAT8": "DOUBLE", "FLOAT8": "DOUBLE",
"GEOGRAPHY": "GEOGRAPHY", "GEOGRAPHY": "GEOGRAPHY",
"GEOMETRY": "GEOMETRY",
"HYPERLOGLOG": "BINARY", "HYPERLOGLOG": "BINARY",
"IMAGE": "BINARY", "IMAGE": "BINARY",
"INT": "INT", "INT": "INT",
@ -169,6 +170,8 @@ class ColumnTypeParser:
"XML": "BINARY", "XML": "BINARY",
"XMLTYPE": "BINARY", "XMLTYPE": "BINARY",
"UUID": "UUID", "UUID": "UUID",
"POINT": "POINT",
"POLYGON": "POLYGON",
} }
_COMPLEX_TYPE = re.compile("^(struct|map|array|uniontype)") _COMPLEX_TYPE = re.compile("^(struct|map|array|uniontype)")

View File

@ -13,10 +13,11 @@ from collections import namedtuple
from typing import Iterable, Tuple from typing import Iterable, Tuple
from sqlalchemy import sql from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes from sqlalchemy.sql import sqltypes
from sqlalchemy.sql.sqltypes import String
from metadata.generated.schema.entity.data.table import IntervalType, TablePartition from metadata.generated.schema.entity.data.table import IntervalType, TablePartition
from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
@ -49,6 +50,24 @@ INTERVAL_TYPE_MAP = {
} }
class GEOMETRY(String):
"""The SQL GEOMETRY type."""
__visit_name__ = "GEOMETRY"
class POINT(String):
"""The SQL POINT type."""
__visit_name__ = "POINT"
class POLYGON(String):
"""The SQL GEOMETRY type."""
__visit_name__ = "POLYGON"
@reflection.cache @reflection.cache
def get_table_names(self, connection, schema=None, **kw): def get_table_names(self, connection, schema=None, **kw):
""" """
@ -61,14 +80,13 @@ def get_table_names(self, connection, schema=None, **kw):
return [name for name, in result] return [name for name, in result]
ischema_names.update({"geometry": GEOMETRY, "point": POINT, "polygon": POLYGON})
PGDialect.get_table_names = get_table_names PGDialect.get_table_names = get_table_names
PGDialect.ischema_names = ischema_names
class PostgresSource(CommonDbSourceService): class PostgresSource(CommonDbSourceService):
def __init__(self, config, metadata_config):
super().__init__(config, metadata_config)
self.pgconn = self.engine.raw_connection()
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -126,7 +144,7 @@ class PostgresSource(CommonDbSourceService):
return False, None return False, None
def type_of_column_name(self, sa_type, table_name: str, column_name: str): def type_of_column_name(self, sa_type, table_name: str, column_name: str):
cur = self.pgconn.cursor() cur = self.engine.cursor()
schema_table = table_name.split(".") schema_table = table_name.split(".")
cur.execute( cur.execute(
"""select data_type, udt_name """select data_type, udt_name

View File

@ -0,0 +1,237 @@
import types
from unittest import TestCase
from unittest.mock import patch
from sqlalchemy.types import VARCHAR
from ingestion.src.metadata.ingestion.source.database.postgres import (
GEOMETRY,
POINT,
POLYGON,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Constraint, DataType
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.database.postgres import PostgresSource
mock_postgres_config = {
"source": {
"type": "postgres",
"serviceName": "local_postgres1",
"serviceConnection": {
"config": {
"type": "Postgres",
"username": "username",
"password": "password",
"hostPort": "localhost:5432",
"database": "postgres",
}
},
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
}
},
},
"sink": {
"type": "metadata-rest",
"config": {},
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth",
}
},
}
MOCK_DATABASE_SERVICE = DatabaseService(
id="85811038-099a-11ed-861d-0242ac120002",
name="postgres_source",
connection=DatabaseConnection(),
serviceType=DatabaseServiceType.Postgres,
)
MOCK_DATABASE = Database(
id="2aaa012e-099a-11ed-861d-0242ac120002",
name="118146679784",
fullyQualifiedName="postgres_source.default",
displayName="118146679784",
description="",
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002",
type="databaseService",
),
)
MOCK_DATABASE_SCHEMA = DatabaseSchema(
id="2aaa012e-099a-11ed-861d-0242ac120056",
name="default",
fullyQualifiedName="postgres_source.118146679784.default",
displayName="default",
description="",
database=EntityReference(
id="2aaa012e-099a-11ed-861d-0242ac120002",
type="database",
),
service=EntityReference(
id="2aaa012e-099a-11ed-861d-0242ac120002",
type="database",
),
)
MOCK_COLUMN_VALUE = [
{
"name": "username",
"type": VARCHAR(),
"nullable": True,
"default": None,
"autoincrement": False,
"comment": None,
},
{
"name": "geom_c",
"type": GEOMETRY(),
"nullable": True,
"default": None,
"autoincrement": False,
"comment": None,
},
{
"name": "point_c",
"type": POINT(),
"nullable": True,
"default": None,
"autoincrement": False,
"comment": None,
},
{
"name": "polygon_c",
"type": POLYGON(),
"nullable": True,
"default": None,
"autoincrement": False,
"comment": None,
},
]
EXPECTED_COLUMN_VALUE = [
Column(
name="username",
displayName=None,
dataType=DataType.VARCHAR,
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="VARCHAR(1)",
description=None,
fullyQualifiedName=None,
tags=None,
constraint=Constraint.NULL,
ordinalPosition=None,
jsonSchema=None,
children=None,
customMetrics=None,
profile=None,
),
Column(
name="geom_c",
displayName=None,
dataType=DataType.GEOMETRY,
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="GEOMETRY",
description=None,
fullyQualifiedName=None,
tags=None,
constraint=Constraint.NULL,
ordinalPosition=None,
jsonSchema=None,
children=None,
customMetrics=None,
profile=None,
),
Column(
name="point_c",
displayName=None,
dataType=DataType.POINT,
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="POINT",
description=None,
fullyQualifiedName=None,
tags=None,
constraint=Constraint.NULL,
ordinalPosition=None,
jsonSchema=None,
children=None,
customMetrics=None,
profile=None,
),
Column(
name="polygon_c",
displayName=None,
dataType=DataType.POLYGON,
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="POLYGON",
description=None,
fullyQualifiedName=None,
tags=None,
constraint=Constraint.NULL,
ordinalPosition=None,
jsonSchema=None,
children=None,
customMetrics=None,
profile=None,
),
]
class PostgresUnitTest(TestCase):
@patch("metadata.ingestion.source.database.common_db_source.test_connection")
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_postgres_config)
self.postgres_source = PostgresSource.create(
mock_postgres_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
self.postgres_source.context.__dict__[
"database_service"
] = MOCK_DATABASE_SERVICE
self.postgres_source.context.__dict__["database"] = MOCK_DATABASE
self.postgres_source.context.__dict__["database_schema"] = MOCK_DATABASE_SCHEMA
def test_datatype(self):
inspector = types.SimpleNamespace()
inspector.get_columns = (
lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE
)
inspector.get_pk_constraint = lambda table_name, schema_name: []
inspector.get_unique_constraints = lambda table_name, schema_name: []
inspector.get_foreign_keys = lambda table_name, schema_name: []
result, _ = self.postgres_source.get_columns_and_constraints(
"public", "user", "postgres", inspector
)
for i in range(len(EXPECTED_COLUMN_VALUE)):
self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i])

View File

@ -91,7 +91,10 @@
"ENUM", "ENUM",
"JSON", "JSON",
"UUID", "UUID",
"VARIANT" "VARIANT",
"GEOMETRY",
"POINT",
"POLYGON"
] ]
}, },
"constraint": { "constraint": {