FIX #2802: Skip partitions and add datatypes (#2803)

* ISSUE-2802: Skip partitions and add datatypes

* ISSUE-2802: formatting changes
This commit is contained in:
Igor Kramer 2022-02-17 21:09:09 +03:00 committed by GitHub
parent c2b3f4a1d9
commit 316e02cdfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 0 deletions

View File

@ -52,6 +52,21 @@ class PostgresSource(SQLSource):
def get_status(self) -> SourceStatus:
return self.status
def _is_partition(self, table_name: str, schema_name: str) -> bool:
cur = self.pgconn.cursor()
cur.execute(
"""
SELECT relispartition as is_partition
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = %s
AND n.nspname = %s
""",
(table_name, schema_name),
)
is_partition = cur.fetchone()[0]
return is_partition
def type_of_column_name(self, sa_type, table_name: str, column_name: str):
cur = self.pgconn.cursor()
schema_table = table_name.split(".")

View File

@ -203,6 +203,12 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
"Table pattern not allowed",
)
continue
if self._is_partition(table_name, schema):
self.status.filter(
f"{self.config.get_service_name()}.{table_name}",
"Table is partition",
)
continue
description = _get_table_description(schema, table_name, inspector)
fqn = f"{self.config.service_name}.{schema}.{table_name}"
self.database_source_state.add(fqn)
@ -317,6 +323,9 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
if table.fullyQualifiedName not in self.database_source_state:
yield DeleteTable(table=table)
def _is_partition(self, table_name: str, schema: str) -> bool:
return False
def _parse_data_model(self):
"""
Get all the DBT information and feed it to the Table Entity

View File

@ -132,6 +132,7 @@ class ColumnTypeParser:
"TEXT": "TEXT",
"TIME": "TIME",
"TIMESTAMP WITHOUT TIME ZONE": "TIMESTAMP",
"TIMESTAMP WITH TIME ZONE": "TIMESTAMP",
"TIMESTAMP": "TIMESTAMP",
"TIMESTAMPTZ": "TIMESTAMP",
"TIMESTAMP_NTZ": "TIMESTAMP",
@ -144,6 +145,8 @@ class ColumnTypeParser:
"VARBINARY": "VARBINARY",
"VARCHAR": "VARCHAR",
"VARIANT": "JSON",
"JSON": "JSON",
"JSONB": "JSON",
"XML": "BINARY",
"XMLTYPE": "BINARY",
"UUID": "UUID",