Parse raw type for Hive's complex data types (#560)

* Parse raw type for Hive's complex data types

* Complex Data type logic modification

* Complex Data Type parsing implemented

* Raw Data type helper modification

* Handle unnamed/anonymous structs

* Complex Nested structure implementation

* Remove print statements and revert to raw_data_type

* Complex Structure Array & MAP logic implemented

* Raw Data Type Logic revamped

* Redshift Integration

* MAP and UnionType support added

* Redshift PyPI package updated

* dataLength ValidationError fix

Co-authored-by: Ayush Shah <ayush@getcollate.io>
Sriharsha Chintalapani 2021-10-04 11:06:35 -07:00 committed by GitHub
parent 5bc7e8fbc2
commit e3cfb4dc65
8 changed files with 257 additions and 129 deletions
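Taken together, these changes make the SQL source look at the Hive dialect's raw_data_type string instead of only the SQLAlchemy type object when a column holds a complex type. A rough, illustrative sketch of the top-level mapping the new parsing is meant to produce (the sample strings below are made up, not taken from a real Hive table):

# Illustrative Hive raw_data_type strings and the top-level OpenMetadata
# dataType the new parsing logic is expected to assign.
expected = {
    "struct<a:int,b:string>": "STRUCT",
    "array<struct<x:int,y:double>>": "ARRAY",
    "map<string,int>": "MAP",
    "uniontype<int,double,array<string>>": "UNION",
}
for raw, data_type in expected.items():
    print(f"{raw} -> {data_type}")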


@@ -88,7 +88,7 @@ plugins: Dict[str, Set[str]] = {
"bigquery": {"pybigquery >= 0.6.0"},
"bigquery-usage": {"google-cloud-logging", "cachetools"},
"elasticsearch": {"elasticsearch~=7.13.1"},
"hive": {"pyhive~=0.6.3", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
"hive": {"pyhive~=0.6.3.dev0", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
"kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
"ldap-users": {"ldap3==2.9.1"},
"looker": {"looker-sdk==21.12.2"},
@@ -100,7 +100,7 @@ plugins: Dict[str, Set[str]] = {
"presto": {"pyhive~=0.6.3"},
"postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
"redash": {"redash-toolbelt==0.1.4"},
"redshift": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"redshift": {"openmetadata-sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"scheduler": scheduler_requirements,
"data-profiler": {"openmetadata-data-profiler"},


@@ -14,14 +14,15 @@
# limitations under the License.
from typing import Optional
from metadata.utils.helpers import register_custom_type
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from .sql_source import (
SQLConnectionConfig,
SQLSource,
register_custom_type,
)
from ..ometa.openmetadata_rest import MetadataServerConfig
register_custom_type(HiveDate, "DATE")


@@ -15,14 +15,12 @@
import logging
from typing import Optional
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLSource, SQLConnectionConfig
from metadata.ingestion.api.source import SourceStatus
logger = logging.getLogger(__name__)
class RedshiftConfig(SQLConnectionConfig):
scheme = "redshift+psycopg2"
where_clause: Optional[str] = None


@@ -20,10 +20,9 @@ from snowflake.sqlalchemy import custom_types
from .sql_source import (
SQLConnectionConfig,
SQLSource,
register_custom_type,
)
from ..ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import register_custom_type
register_custom_type(custom_types.TIMESTAMP_TZ, "TIME")
register_custom_type(custom_types.TIMESTAMP_LTZ, "TIME")
register_custom_type(custom_types.TIMESTAMP_NTZ, "TIME")


@@ -12,16 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import traceback
import uuid
import re
from abc import abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from urllib.parse import quote_plus
from pydantic import ValidationError
from metadata.generated.schema.entity.services.databaseService import DatabaseServiceType
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
@@ -29,11 +27,10 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, Column, TableType, Constraint, \
TableData, TableProfile, ConstraintType, TableConstraint
from metadata.generated.schema.entity.data.table import Table, Column, Constraint, \
TableData, TableProfile
from sqlalchemy import create_engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes as types
from sqlalchemy.inspection import inspect
from metadata.ingestion.api.common import IncludeFilterPattern, ConfigModel, Record
@@ -41,8 +38,8 @@ from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.table_metadata import DatasetProfile
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import get_database_service_or_create
from metadata.utils.column_helpers import _handle_complex_data_types, get_column_type
logger: logging.Logger = logging.getLogger(__name__)
@@ -103,64 +100,6 @@ class SQLConnectionConfig(ConfigModel):
return self.service_name
_column_type_mapping: Dict[Type[types.TypeEngine], str] = {
types.Integer: "INT",
types.Numeric: "INT",
types.Boolean: "BOOLEAN",
types.Enum: "ENUM",
types._Binary: "BYTES",
types.LargeBinary: "BYTES",
types.PickleType: "BYTES",
types.ARRAY: "ARRAY",
types.VARCHAR: "VARCHAR",
types.CHAR: "VARCHAR",
types.String: "STRING",
types.Date: "DATE",
types.DATE: "DATE",
types.Time: "TIME",
types.DateTime: "DATETIME",
types.DATETIME: "DATETIME",
types.TIMESTAMP: "TIMESTAMP",
types.NullType: "NULL",
types.JSON: "JSON"
}
_known_unknown_column_types: Set[Type[types.TypeEngine]] = {
types.Interval,
types.CLOB,
}
def register_custom_type(
tp: Type[types.TypeEngine], output: str = None
) -> None:
if output:
_column_type_mapping[tp] = output
else:
_known_unknown_column_types.add(tp)
def get_column_type(status: SQLSourceStatus, dataset_name: str, column_type: Any) -> str:
type_class: Optional[str] = None
for sql_type in _column_type_mapping.keys():
if isinstance(column_type, sql_type):
type_class = _column_type_mapping[sql_type]
break
if type_class is None:
for sql_type in _known_unknown_column_types:
if isinstance(column_type, sql_type):
type_class = "NULL"
break
if type_class is None:
status.warning(
dataset_name, f"unable to map type {column_type!r} to metadata schema"
)
type_class = "NULL"
return type_class
def _get_table_description(schema: str, table: str, inspector: Inspector) -> str:
description = None
try:
@@ -193,8 +132,10 @@ class SQLSource(Source):
if self.config.data_profiler_enabled:
if self.data_profiler is None:
from metadata.profiler.dataprofiler import DataProfiler
self.data_profiler = DataProfiler(status=self.status,
connection_str=self.connection_string)
self.data_profiler = DataProfiler(
status=self.status,
connection_str=self.connection_string
)
return True
return False
except Exception:
@@ -268,9 +209,14 @@
fullyQualifiedName=fqn,
columns=table_columns
)
if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, table_name)
table_entity.sampleData = table_data
try:
if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, table_name)
table_entity.sampleData = table_data
except Exception as err:
logger.error(repr(err))
if self._instantiate_profiler():
profile = self.run_data_profiler(table_name, schema)
@@ -363,44 +309,67 @@
columns = inspector.get_columns(table, schema)
table_columns = []
row_order = 1
for column in columns:
col_type = None
try:
col_type = get_column_type(self.status, dataset_name, column['type'])
except Exception as err:
logger.error(err)
try:
for column in columns:
children = None
data_type_display = None
col_data_length = None
arr_data_type = None
if 'raw_data_type' in column and column['raw_data_type'] is not None:
if re.match(r'(struct<)(?:.*)', column['raw_data_type']):
col_type = 'STRUCT'
# plucked = re.match(r'(?:struct<)(.*)(?:>)',column['raw_data_type']).groups()[0]
col_constraint = None
if column['nullable']:
col_constraint = Constraint.NULL
elif not column['nullable']:
col_constraint = Constraint.NOT_NULL
if column['name'] in pk_columns:
col_constraint = Constraint.PRIMARY_KEY
elif column['name'] in unique_columns:
col_constraint = Constraint.UNIQUE
col_data_length = None
if col_type in ['CHAR', 'VARCHAR', 'BINARY', 'VARBINARY']:
col_data_length = column['type'].length
om_column = Column(
children = _handle_complex_data_types(
self.status, dataset_name, f"{column['name']}:{column['raw_data_type']}"
)['children']
data_type_display = column['raw_data_type']
elif re.match(r'(map<|array<)(?:.*)', column['raw_data_type']):
if re.match(r'(map<)(?:.*)', column['raw_data_type']):
col_type = 'MAP'
else:
col_type = 'ARRAY'
arr_data_type = re.match(
r'(?:array<)(\w*)(?:.*)', column['raw_data_type']
)
arr_data_type = arr_data_type.groups()[0].upper()
data_type_display = column['raw_data_type']
else:
col_type = get_column_type(self.status, dataset_name, column['type'])
else:
col_type = get_column_type(self.status, dataset_name, column['type'])
col_constraint = None
if column['nullable']:
col_constraint = Constraint.NULL
elif not column['nullable']:
col_constraint = Constraint.NOT_NULL
if column['name'] in pk_columns:
col_constraint = Constraint.PRIMARY_KEY
elif column['name'] in unique_columns:
col_constraint = Constraint.UNIQUE
if col_type in ['CHAR', 'VARCHAR', 'BINARY', 'VARBINARY']:
col_data_length = column['type'].length
if col_data_length is None:
col_data_length = 1
om_column = Column(
name=column['name'],
description=column.get("comment", None),
dataType=col_type,
dataTypeDisplay=col_type,
dataTypeDisplay="{}({})".format(col_type, col_data_length) if data_type_display
is None else
f"{data_type_display}",
dataLength=col_data_length,
constraint=col_constraint,
ordinalPosition=row_order
ordinalPosition=row_order,
children=children if children is not None else None,
arrayDataType=arr_data_type
)
if col_data_length is not None:
om_column.dataLength = col_data_length
table_columns.append(om_column)
row_order = row_order + 1
return table_columns
table_columns.append(om_column)
row_order = row_order + 1
return table_columns
except Exception as err:
logger.error("{}: {} {}".format(repr(err), table, err))
def run_data_profiler(
self,
@@ -412,7 +381,7 @@
logger.info(
f"Running Profiling for {dataset_name}. "
f"If you haven't configured offset and limit this process can take longer"
)
if self.config.scheme == "bigquery":
table = dataset_name
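A condensed, self-contained sketch of the raw_data_type classification added to the column-reading loop above; the column dicts are invented stand-ins for what the SQLAlchemy Hive inspector returns, and the fallback branch is simplified to a placeholder:

import re

# Hypothetical column dicts shaped like inspector.get_columns() output,
# with the Hive dialect's extra raw_data_type key.
columns = [
    {"name": "id", "raw_data_type": "int"},
    {"name": "address", "raw_data_type": "struct<line1:string,zip:int>"},
    {"name": "tags", "raw_data_type": "array<string>"},
    {"name": "attrs", "raw_data_type": "map<string,string>"},
]

for column in columns:
    raw = column["raw_data_type"]
    if re.match(r"(struct<)(?:.*)", raw):
        col_type = "STRUCT"  # children come from _handle_complex_data_types
    elif re.match(r"(map<|array<)(?:.*)", raw):
        if re.match(r"(map<)(?:.*)", raw):
            col_type = "MAP"
        else:
            col_type = "ARRAY"
            # inner element type, e.g. STRING for array<string>
            arr_type = re.match(r"(?:array<)(\w*)(?:.*)", raw).groups()[0].upper()
    else:
        col_type = "SCALAR"  # the real code falls back to get_column_type(column['type'])
    print(column["name"], "->", col_type)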


@@ -0,0 +1,152 @@
import re
from typing import Any, Dict, Optional, Set, Type
from sqlalchemy.sql import sqltypes as types
from metadata.ingestion.api.source import SourceStatus
def register_custom_type(
tp: Type[types.TypeEngine], output: str = None
) -> None:
if output:
_column_type_mapping[tp] = output
else:
_known_unknown_column_types.add(tp)
_column_type_mapping: Dict[Type[types.TypeEngine], str] = {
types.Integer: "INT",
types.Numeric: "INT",
types.Boolean: "BOOLEAN",
types.Enum: "ENUM",
types._Binary: "BYTES",
types.LargeBinary: "BYTES",
types.PickleType: "BYTES",
types.ARRAY: "ARRAY",
types.VARCHAR: "VARCHAR",
types.String: "STRING",
types.Date: "DATE",
types.DATE: "DATE",
types.Time: "TIME",
types.DateTime: "DATETIME",
types.DATETIME: "DATETIME",
types.TIMESTAMP: "TIMESTAMP",
types.NullType: "NULL",
types.JSON: "JSON",
types.CHAR: "CHAR"
}
_known_unknown_column_types: Set[Type[types.TypeEngine]] = {
types.Interval,
types.CLOB,
}
def get_column_type(status: SourceStatus, dataset_name: str, column_type: Any) -> str:
type_class: Optional[str] = None
for sql_type in _column_type_mapping.keys():
if isinstance(column_type, sql_type):
type_class = _column_type_mapping[sql_type]
break
if type_class is None:
for sql_type in _known_unknown_column_types:
if isinstance(column_type, sql_type):
type_class = "NULL"
break
if type_class is None and column_type in ['CHARACTER VARYING', 'CHAR']:
type_class = 'VARCHAR'
if type_class is None:
status.warning(
dataset_name, f"unable to map type {column_type!r} to metadata schema"
)
type_class = "NULL"
return type_class
def get_last_index(nested_str):
counter = 1
for index, i in enumerate(nested_str):
if i == '>':
counter -= 1
elif i == '<':
counter += 1
if counter == 0:
break
index = index - counter
return index
def get_array_type(col_type):
col = {}
col['dataType'] = 'ARRAY'
col_type = col_type[:get_last_index(col_type)+2]
col['dataTypeDisplay'] = col_type
col['arrayDataType'] = re.match(
r'(?:array<)(\w*)(?:.*)', col_type).groups()[0].upper()
return col
def _handle_complex_data_types(status, dataset_name, raw_type: str, level=0):
col = {}
if re.match(r'([\w\s]*)(:)(.*)',raw_type):
name, col_type = raw_type.lstrip('<').split(':', 1)
col['name'] = name
else:
col['name'] = f'field_{level}'
if raw_type.startswith('struct<'):
col_type = raw_type
else:
col_type = raw_type.lstrip('<').split(':', 1)[0]
if re.match(r'(struct)(.*)', col_type):
children = []
col_type = re.match(r'(struct<)(.*)', col_type).groups()[1]
pluck_index = get_last_index(col_type)
pluck_nested = col_type[:pluck_index+1]
while pluck_nested != '':
col['dataType'] = 'STRUCT'
plucked = col_type[:get_last_index(col_type)]
counter = 0
continue_next = False
for index,type in enumerate(plucked.split(',')):
if continue_next:
continue_next = False
continue
if re.match(r'(\w*)(:)(struct)(.*)',type):
col_name,datatype,rest = re.match(r'(\w*)(?::)(struct)(.*)',','.join(plucked.split(',')[index:])).groups()
type = f"{col_name}:{datatype}{rest[:get_last_index(rest)+2]}"
elif type.startswith('struct'):
datatype,rest = re.match(r'(struct)(.*)',','.join(plucked.split(',')[index:])).groups()
type = f"{datatype}{rest[:get_last_index(rest)+2]}"
elif re.match(r'([\w\s]*)(:?)(map)(.*)',type):
get_map_type = ','.join(plucked.split(',')[index:])
type,col_type = re.match(r'([\w]*:?map<[\w,]*>)(.*)',get_map_type).groups()
continue_next = True
elif re.match(r'([\w\s]*)(:?)(uniontype)(.*)',type):
get_union_type = ','.join(plucked.split(',')[index:])
type,col_type = re.match(r'([\w\s]*:?uniontype<[\w\s,]*>)(.*)',get_union_type).groups()
continue_next = True
children.append(_handle_complex_data_types(status,dataset_name,type,counter))
if plucked.endswith(type):
break
counter += 1
pluck_nested = col_type[get_last_index(col_type)+3:]
col['children'] = children
elif col_type.startswith('array'):
col.update(get_array_type(col_type))
elif col_type.startswith('map'):
col['dataType'] = 'MAP'
col['dataTypeDisplay'] = col_type
elif col_type.startswith('uniontype'):
col['dataType'] = 'UNION'
col['dataTypeDisplay'] = col_type
else:
if re.match(r'(?:[\w\s]*)(?:\()([\d]*)(?:\))', col_type):
col['dataLength'] = re.match(r'(?:[\w\s]*)(?:\()([\d]*)(?:\))', col_type).groups()[0]
else:
col['dataLength'] = 1
col['dataType'] = get_column_type(status, dataset_name, re.match(r'([\w\s]*)(?:.*)', col_type).groups()[0].upper())
col['dataTypeDisplay'] = col_type.rstrip('>')
return col
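A minimal usage sketch of the new helper, assuming it is importable as metadata.utils.column_helpers; the status object is a stand-in for the real SourceStatus and the raw type string is made up:

import json

from metadata.utils.column_helpers import _handle_complex_data_types

class _StubStatus:
    # Stand-in for SourceStatus; only the warning hook is used here.
    def warning(self, key, reason):
        print(f"WARN {key}: {reason}")

raw_type = "address:struct<line1:string,city:string,zip:int>"
parsed = _handle_complex_data_types(_StubStatus(), "sample_db.sample_table", raw_type)

# Expect a column dict named "address" with dataType STRUCT and one
# child entry per nested field (line1, city, zip).
print(json.dumps(parsed, indent=2))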


@@ -12,17 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
from typing import List
from metadata.generated.schema.api.services.createDashboardService import CreateDashboardServiceEntityRequest
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
from metadata.generated.schema.api.services.createDashboardService import \
CreateDashboardServiceEntityRequest
from metadata.generated.schema.api.services.createDatabaseService import \
CreateDatabaseServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import \
CreateMessagingServiceEntityRequest
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
@@ -48,17 +49,22 @@ def get_database_service_or_create(config, metadata_config) -> DatabaseService:
return service
else:
service = {'jdbc': {'connectionUrl': f'jdbc://{config.host_port}', 'driverClass': 'jdbc'},
'name': config.service_name, 'description': '', 'serviceType': config.get_service_type()}
'name': config.service_name, 'description': '',
'serviceType': config.get_service_type()}
print(service)
created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service))
created_service = client.create_database_service(
CreateDatabaseServiceEntityRequest(**service)
)
return created_service
def get_messaging_service_or_create(service_name: str,
message_service_type: str,
schema_registry_url: str,
brokers: List[str],
metadata_config) -> MessagingService:
def get_messaging_service_or_create(
service_name: str,
message_service_type: str,
schema_registry_url: str,
brokers: List[str],
metadata_config
) -> MessagingService:
client = OpenMetadataAPIClient(metadata_config)
service = client.get_messaging_service(service_name)
if service is not None:
@@ -74,12 +80,14 @@ def get_messaging_service_or_create(service_name: str,
return created_service
def get_dashboard_service_or_create(service_name: str,
dashboard_service_type: str,
username: str,
password: str,
dashboard_url: str,
metadata_config) -> DashboardService:
def get_dashboard_service_or_create(
service_name: str,
dashboard_service_type: str,
username: str,
password: str,
dashboard_url: str,
metadata_config
) -> DashboardService:
client = OpenMetadataAPIClient(metadata_config)
service = client.get_dashboard_service(service_name)
if service is not None:
@@ -95,8 +103,8 @@ def get_dashboard_service_or_create(service_name: str,
created_service = client.create_dashboard_service(create_dashboard_service_request)
return created_service
def convert_epoch_to_iso(seconds_since_epoch):
dt = datetime.utcfromtimestamp(seconds_since_epoch)
iso_format = dt.isoformat() + 'Z'
return iso_format
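A quick check of the new convert_epoch_to_iso helper, assuming it is importable from metadata.utils.helpers:

from metadata.utils.helpers import convert_epoch_to_iso

# Seconds since the epoch are rendered as a UTC ISO-8601 string with a 'Z' suffix.
print(convert_epoch_to_iso(0))           # 1970-01-01T00:00:00Z
print(convert_epoch_to_iso(1633370795))  # 2021-10-04T18:06:35Z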


@@ -26,4 +26,5 @@ test_data as (
) AS service
)
INSERT INTO TABLE metadata_array_struct_test
select * from test_data;
select * from test_data;
CREATE TABLE union_test(foo UNIONTYPE<int, double, array<string>, struct<a:int,b:string>>);
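As a rough illustration of the UNIONTYPE support, the raw type of the foo column above should be classified as UNION by the new helper; the raw string below is written by hand rather than captured from a Hive inspector, and the status stub again stands in for the real SourceStatus:

from metadata.utils.column_helpers import _handle_complex_data_types

class _StubStatus:
    def warning(self, key, reason):
        print(f"WARN {key}: {reason}")

raw = "foo:uniontype<int,double,array<string>,struct<a:int,b:string>>"
col = _handle_complex_data_types(_StubStatus(), "default.union_test", raw)
print(col["dataType"], "-", col["dataTypeDisplay"])
# Expected: UNION - uniontype<int,double,array<string>,struct<a:int,b:string>>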