diff --git a/ingestion/setup.py b/ingestion/setup.py
index 0f03004bdae..d4aa15b8f71 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -51,7 +51,7 @@ base_requirements = {
     "typing_extensions>=3.7.4"
     "mypy_extensions>=0.4.3",
     "typing-inspect",
-    "pydantic==1.7.4",
+    "pydantic==1.8.2",
     "pydantic[email]>=1.7.2",
     "google>=3.0.0",
     "google-auth>=1.33.0",
@@ -85,10 +85,10 @@ base_plugins = {
 }
 plugins: Dict[str, Set[str]] = {
     "athena": {"PyAthena[SQLAlchemy]"},
-    "bigquery": {"pybigquery >= 0.6.0"},
+    "bigquery": {"openmetadata-sqlalchemy-bigquery==0.2.0"},
     "bigquery-usage": {"google-cloud-logging", "cachetools"},
     "elasticsearch": {"elasticsearch~=7.13.1"},
-    "hive": {"pyhive~=0.6.3.dev0", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
+    "hive": {"openmetadata-sqlalchemy-hive==0.2.0", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
     "kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
     "ldap-users": {"ldap3==2.9.1"},
     "looker": {"looker-sdk==21.12.2"},
@@ -101,7 +101,7 @@ plugins: Dict[str, Set[str]] = {
     "trino": {"sqlalchemy-trino"},
     "postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
     "redash": {"redash-toolbelt==0.1.4"},
-    "redshift": {"openmetadata-sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
+    "redshift": {"openmetadata-sqlalchemy-redshift==0.2.0", "psycopg2-binary", "GeoAlchemy2"},
     "redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
     "scheduler": scheduler_requirements,
     "data-profiler": {"openmetadata-data-profiler"},
diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py
index ca663b78348..b94d0e214e3 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery.py
@@ -55,3 +55,6 @@ class BigquerySource(SQLSource):
         if segments[0] != schema:
             raise ValueError(f"schema {schema} does not match table {table}")
         return segments[0], segments[1]
+
+    def parse_raw_data_type(self, raw_data_type):
+        return raw_data_type.replace(', ', ',').replace(' ', ':').lower()
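Note: BigquerySource.parse_raw_data_type (added above) normalizes BigQuery's
space-separated raw types into the colon-separated, lowercase form that the shared
complex-type parser in sql_source.py expects. A minimal sketch of the
transformation; the sample type string is illustrative, not taken from this change:

    # Hypothetical BigQuery raw_data_type value.
    raw = "STRUCT<name STRING, grades ARRAY<INT64>>"
    parsed = raw.replace(', ', ',').replace(' ', ':').lower()
    print(parsed)  # struct<name:string,grades:array<int64>>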
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index b9d9496a5ca..acdb268eed4 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import traceback
 import logging
 import uuid
 import re
@@ -291,6 +292,9 @@ class SQLSource(Source):
             service=EntityReference(id=self.service.id, type=self.config.service_type)
         )
 
+    def parse_raw_data_type(self, raw_data_type):
+        return raw_data_type
+
     def _get_columns(self, schema: str, table: str, inspector: Inspector) -> List[Column]:
         pk_constraints = inspector.get_pk_constraint(table, schema)
         pk_columns = pk_constraints['column_constraints'] if len(
@@ -311,33 +315,42 @@ class SQLSource(Source):
         row_order = 1
         try:
             for column in columns:
+                if '.' in column['name']:
+                    continue
                 children = None
                 data_type_display = None
                 col_data_length = None
                 arr_data_type = None
-                if 'raw_data_type' in column and 'raw_data_type' is not None:
-                    if re.match(r'(struct<)(?:.*)', column['raw_data_type']):
+                if 'raw_data_type' in column and column['raw_data_type'] is not None:
+                    column['raw_data_type'] = self.parse_raw_data_type(column['raw_data_type'])
+                    if column['raw_data_type'].startswith('struct<'):
                         col_type = 'STRUCT'
-                        # plucked = re.match(r'(?:struct<)(.*)(?:>)',column['raw_data_type']).groups()[0]
-
-                        children = _handle_complex_data_types(
+                        col_obj = _handle_complex_data_types(
                             self.status, dataset_name, f"{column['name']}:{column['raw_data_type']}"
-                        )['children']
-                        data_type_display = column['raw_data_type']
-                    elif re.match(r'(map<|array<)(?:.*)', column['raw_data_type']):
-                        if re.match(r'(map<)(?:.*)', column['raw_data_type']):
-                            col_type = 'MAP'
-                        else:
-                            col_type = 'ARRAY'
-                            arr_data_type = re.match(
-                                r'(?:array<)(\w*)(?:.*)', column['raw_data_type']
-                            )
-                            arr_data_type = arr_data_type.groups()[0].upper()
-                        data_type_display = column['raw_data_type']
+                        )
+                        if 'children' in col_obj and col_obj['children'] is not None:
+                            children = col_obj['children']
+                    elif column['raw_data_type'].startswith('map<'):
+                        col_type = 'MAP'
+                    elif column['raw_data_type'].startswith('array<'):
+                        col_type = 'ARRAY'
+                        arr_data_type = re.match(
+                            r'(?:array<)(\w*)(?:.*)', column['raw_data_type']
+                        )
+                        arr_data_type = arr_data_type.groups()[0].upper()
+                    elif column['raw_data_type'].startswith('uniontype<'):
+                        col_type = 'UNION'
                     else:
                         col_type = get_column_type(self.status, dataset_name, column['type'])
+                        data_type_display = col_type
+                    if data_type_display is None:
+                        data_type_display = column['raw_data_type']
                 else:
                     col_type = get_column_type(self.status, dataset_name, column['type'])
+                    if col_type == 'ARRAY':
+                        if re.match(r'(?:\w*)(?:\()(\w*)(?:.*)', str(column['type'])):
+                            arr_data_type = re.match(r'(?:\w*)(?:[(]*)(\w*)(?:.*)', str(column['type'])).groups()[0].upper()
+                        data_type_display = column['type']
                 col_constraint = None
                 if column['nullable']:
                     col_constraint = Constraint.NULL
@@ -351,20 +364,23 @@ class SQLSource(Source):
                 col_data_length = column['type'].length
                 if col_data_length is None:
                     col_data_length = 1
-                om_column = Column(
-                    name=column['name'],
-                    description=column.get("comment", None),
-                    dataType=col_type,
-                    dataTypeDisplay="{}({})".format(col_type, col_data_length) if data_type_display
-                    is None else
-                    f"{data_type_display}",
-                    dataLength=col_data_length,
-                    constraint=col_constraint,
-                    ordinalPosition=row_order,
-                    children=children if children is not None else None,
-                    arrayDataType=arr_data_type
-                )
-
+                try:
+                    om_column = Column(
+                        name=column['name'],
+                        description=column.get("comment", None),
+                        dataType=col_type,
+                        dataTypeDisplay="{}({})".format(col_type, col_data_length)
+                        if data_type_display is None else f"{data_type_display}",
+                        dataLength=col_data_length,
+                        constraint=col_constraint,
+                        ordinalPosition=row_order,
+                        children=children if children is not None else None,
+                        arrayDataType=arr_data_type
+                    )
+                except Exception as err:
+                    logger.error(traceback.format_exc())
+                    logger.error(err)
+                    continue
                 table_columns.append(om_column)
                 row_order = row_order + 1
         return table_columns
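Note: the new ARRAY branch in _get_columns extracts the element type from the raw
string with a regex. A minimal sketch of that extraction, using a hypothetical
raw_data_type value:

    import re

    # Hypothetical value of column['raw_data_type'] after parse_raw_data_type.
    raw = 'array<bigint>'
    element = re.match(r'(?:array<)(\w*)(?:.*)', raw).groups()[0].upper()
    print(element)  # BIGINT, stored on the column as arrayDataType

For a nested type such as 'array<struct<a:int>>' the same match yields the outer
element type, STRUCT.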
diff --git a/ingestion/src/metadata/utils/column_helpers.py b/ingestion/src/metadata/utils/column_helpers.py
index 4c030243300..fb248844fd1 100644
--- a/ingestion/src/metadata/utils/column_helpers.py
+++ b/ingestion/src/metadata/utils/column_helpers.py
@@ -4,7 +4,6 @@
 from sqlalchemy.sql import sqltypes as types
 from metadata.ingestion.api.source import SourceStatus
 
-
 def register_custom_type(
     tp: Type[types.TypeEngine], output: str = None
 ) -> None:
@@ -36,36 +35,60 @@ _column_type_mapping: Dict[Type[types.TypeEngine], str] = {
     types.CHAR: "CHAR"
 }
 
+_column_string_mapping = {
+    "INT": "INT",
+    "BOOLEAN": "BOOLEAN",
+    "ENUM": "ENUM",
+    "BYTES": "BYTES",
+    "ARRAY": "ARRAY",
+    "VARCHAR": "VARCHAR",
+    "STRING": "STRING",
+    "DATE": "DATE",
+    "TIME": "TIME",
+    "DATETIME": "DATETIME",
+    "TIMESTAMP": "TIMESTAMP",
+    "NULL": "NULL",
+    "JSON": "JSON",
+    "CHAR": "CHAR",
+    "INT64": "BIGINT",
+    "CHARACTER VARYING": "VARCHAR",
+    "VARIANT": "JSON",
+    "OBJECT": "JSON",
+    "MAP": "MAP",
+    "UNION": "UNION"
+}
+
 _known_unknown_column_types: Set[Type[types.TypeEngine]] = {
     types.Interval,
     types.CLOB,
 }
 
-
 def get_column_type(status: SourceStatus, dataset_name: str, column_type: Any) -> str:
     type_class: Optional[str] = None
     for sql_type in _column_type_mapping.keys():
         if isinstance(column_type, sql_type):
             type_class = _column_type_mapping[sql_type]
             break
-    if type_class is None:
+    if type_class is None or type_class == "NULL":
         for sql_type in _known_unknown_column_types:
             if isinstance(column_type, sql_type):
                 type_class = "NULL"
                 break
-    if type_class is None and column_type in ['CHARACTER VARYING', 'CHAR']:
-        type_class = 'VARCHAR'
-    if type_class is None:
+        for col_type in _column_string_mapping.keys():
+            if str(column_type) == col_type:
+                type_class = _column_string_mapping.get(col_type)
+                break
+            else:
+                type_class = None
+    if type_class is None or type_class == "NULL":
         status.warning(
             dataset_name, f"unable to map type {column_type!r} to metadata schema"
         )
         type_class = "NULL"
-
     return type_class
 
-
 def get_last_index(nested_str):
     counter = 1
     for index, i in enumerate(nested_str):
@@ -82,15 +105,16 @@ def get_last_index(nested_str):
 def get_array_type(col_type):
     col = {}
     col['dataType'] = 'ARRAY'
-    col_type = col_type[:get_last_index(col_type)+2]
+    col_type = col_type[:get_last_index(col_type) + 2]
     col['dataTypeDisplay'] = col_type
     col['arrayDataType'] = re.match(
         r'(?:array<)(\w*)(?:.*)', col_type).groups()[0].upper()
     return col
 
-def _handle_complex_data_types(status,dataset_name,raw_type: str, level=0):
+
+def _handle_complex_data_types(status, dataset_name, raw_type: str, level=0):
     col = {}
-    if re.match(r'([\w\s]*)(:)(.*)',raw_type):
+    if re.match(r'([\w\s]*)(:)(.*)', raw_type):
         name, col_type = raw_type.lstrip('<').split(':', 1)
         col['name'] = name
     else:
@@ -99,39 +123,41 @@ def _handle_complex_data_types(status, dataset_name, raw_type: str, level=0):
             col_type = raw_type
         else:
             col_type = raw_type.lstrip('<').split(':', 1)[0]
-    if re.match(r'(struct)(.*)', col_type):
+    if col_type.startswith('struct<'):
         children = []
-        col_type = re.match(r'(struct<)(.*)', col_type).groups()[1]
+        struct_type, col_type = re.match(r'(struct<)(.*)', col_type).groups()
         pluck_index = get_last_index(col_type)
-        pluck_nested = col_type[:pluck_index+1]
+        pluck_nested = col_type[:pluck_index + 1]
+        col['dataTypeDisplay'] = struct_type + pluck_nested
         while pluck_nested != '':
            col['dataType'] = 'STRUCT'
            plucked = col_type[:get_last_index(col_type)]
            counter = 0
            continue_next = False
-            for index,type in enumerate(plucked.split(',')):
+            for index, type in enumerate(plucked.split(',')):
                 if continue_next:
                     continue_next = False
                     continue
-                if re.match(r'(\w*)(:)(struct)(.*)',type):
-                    col_name,datatype,rest = re.match(r'(\w*)(?::)(struct)(.*)',','.join(plucked.split(',')[index:])).groups()
-                    type = f"{col_name}:{datatype}{rest[:get_last_index(rest)+2]}"
+                if re.match(r'(\w*)(:)(struct)(.*)', type):
+                    col_name, datatype, rest = re.match(r'(\w*)(?::)(struct)(.*)',
+                                                        ','.join(plucked.split(',')[index:])).groups()
+                    type = f"{col_name}:{datatype}{rest[:get_last_index(rest) + 2]}"
                 elif type.startswith('struct'):
-                    datatype,rest = re.match(r'(struct)(.*)',','.join(plucked.split(',')[index:])).groups()
-                    type = f"{datatype}{rest[:get_last_index(rest)+2]}"
-                elif re.match(r'([\w\s]*)(:?)(map)(.*)',type):
+                    datatype, rest = re.match(r'(struct)(.*)', ','.join(plucked.split(',')[index:])).groups()
+                    type = f"{datatype}{rest[:get_last_index(rest) + 2]}"
+                elif re.match(r'([\w\s]*)(:?)(map)(.*)', type):
                     get_map_type = ','.join(plucked.split(',')[index:])
-                    type,col_type = re.match(r'([\w]*:?map<[\w,]*>)(.*)',get_map_type).groups()
+                    type, col_type = re.match(r'([\w]*:?map<[\w,]*>)(.*)', get_map_type).groups()
                     continue_next = True
-                elif re.match(r'([\w\s]*)(:?)(uniontype)(.*)',type):
+                elif re.match(r'([\w\s]*)(:?)(uniontype)(.*)', type):
                     get_union_type = ','.join(plucked.split(',')[index:])
-                    type,col_type = re.match(r'([\w\s]*:?uniontype<[\w\s,]*>)(.*)',get_union_type).groups()
+                    type, col_type = re.match(r'([\w\s]*:?uniontype<[\w\s,]*>)(.*)', get_union_type).groups()
                     continue_next = True
-                children.append(_handle_complex_data_types(status,dataset_name,type,counter))
+                children.append(_handle_complex_data_types(status, dataset_name, type, counter))
                 if plucked.endswith(type):
                     break
                 counter += 1
-            pluck_nested = col_type[get_last_index(col_type)+3:]
+            pluck_nested = col_type[get_last_index(col_type) + 3:]
             col['children'] = children
     elif col_type.startswith('array'):
         col.update(get_array_type(col_type))
@@ -146,7 +172,7 @@
         col['dataLength'] = re.match(r'(?:[\w\s]*)(?:\()([\d]*)(?:\))', col_type).groups()[0]
     else:
         col['dataLength'] = 1
-    col['dataType'] = get_column_type(status,dataset_name,re.match('([\w\s]*)(?:.*)',col_type).groups()[0].upper())
+    col['dataType'] = get_column_type(status, dataset_name,
+                                      re.match('([\w\s]*)(?:.*)', col_type).groups()[0].upper())
     col['dataTypeDisplay'] = col_type.rstrip('>')
     return col
-
\ No newline at end of file
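Note: with the _column_string_mapping fallback added above, get_column_type can now
resolve dialect type names that never matched the isinstance checks (for example
BigQuery's INT64, or Snowflake's VARIANT and OBJECT) by their stringified form. A
small usage sketch against that mapping:

    from metadata.utils.column_helpers import _column_string_mapping

    for raw in ('INT64', 'VARIANT', 'CHARACTER VARYING'):
        print(raw, '->', _column_string_mapping.get(raw, 'NULL'))
    # INT64 -> BIGINT
    # VARIANT -> JSON
    # CHARACTER VARYING -> VARCHAR

Types missing from both mappings still fall through to the "unable to map type"
warning and are recorded as NULL.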