Added support for struct in bigquery, pyhive and bigquery pypi packag… (#717)

* Added support for struct in bigquery, pyhive and bigquery pypi packages modified

* Versions added, Naming changed, Newlines removed
This commit is contained in:
Ayush Shah 2021-10-09 19:45:41 +05:30 committed by GitHub
parent 7ef490ed2d
commit 1650a4ef4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 63 deletions

View File

@ -51,7 +51,7 @@ base_requirements = {
"typing_extensions>=3.7.4"
"mypy_extensions>=0.4.3",
"typing-inspect",
"pydantic==1.7.4",
"pydantic==1.8.2",
"pydantic[email]>=1.7.2",
"google>=3.0.0",
"google-auth>=1.33.0",
@ -85,10 +85,10 @@ base_plugins = {
}
plugins: Dict[str, Set[str]] = {
"athena": {"PyAthena[SQLAlchemy]"},
"bigquery": {"pybigquery >= 0.6.0"},
"bigquery": {"openmetadata-sqlalchemy-bigquery==0.2.0"},
"bigquery-usage": {"google-cloud-logging", "cachetools"},
"elasticsearch": {"elasticsearch~=7.13.1"},
"hive": {"pyhive~=0.6.3.dev0", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
"hive": {"openmetadata-sqlalchemy-hive==0.2.0", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
"kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
"ldap-users": {"ldap3==2.9.1"},
"looker": {"looker-sdk==21.12.2"},
@ -101,7 +101,7 @@ plugins: Dict[str, Set[str]] = {
"trino": {"sqlalchemy-trino"},
"postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
"redash": {"redash-toolbelt==0.1.4"},
"redshift": {"openmetadata-sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"redshift": {"openmetadata-sqlalchemy-redshift==0.2.0", "psycopg2-binary", "GeoAlchemy2"},
"redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"scheduler": scheduler_requirements,
"data-profiler": {"openmetadata-data-profiler"},

View File

@ -55,3 +55,6 @@ class BigquerySource(SQLSource):
if segments[0] != schema:
raise ValueError(f"schema {schema} does not match table {table}")
return segments[0], segments[1]
def parse_raw_data_type(self, raw_data_type):
return raw_data_type.replace(', ',',').replace(' ',':').lower()

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
import logging
import uuid
import re
@ -291,6 +292,9 @@ class SQLSource(Source):
service=EntityReference(id=self.service.id, type=self.config.service_type)
)
def parse_raw_data_type(self,raw_data_type):
return raw_data_type
def _get_columns(self, schema: str, table: str, inspector: Inspector) -> List[Column]:
pk_constraints = inspector.get_pk_constraint(table, schema)
pk_columns = pk_constraints['column_constraints'] if len(
@ -311,33 +315,42 @@ class SQLSource(Source):
row_order = 1
try:
for column in columns:
if '.' in column['name']:
continue
children = None
data_type_display = None
col_data_length = None
arr_data_type = None
if 'raw_data_type' in column and 'raw_data_type' is not None:
if re.match(r'(struct<)(?:.*)', column['raw_data_type']):
if 'raw_data_type' in column and column['raw_data_type'] is not None:
column['raw_data_type'] = self.parse_raw_data_type(column['raw_data_type'])
if column['raw_data_type'].startswith('struct<'):
col_type = 'STRUCT'
# plucked = re.match(r'(?:struct<)(.*)(?:>)',column['raw_data_type']).groups()[0]
children = _handle_complex_data_types(
col_obj = _handle_complex_data_types(
self.status, dataset_name, f"{column['name']}:{column['raw_data_type']}"
)['children']
data_type_display = column['raw_data_type']
elif re.match(r'(map<|array<)(?:.*)', column['raw_data_type']):
if re.match(r'(map<)(?:.*)', column['raw_data_type']):
col_type = 'MAP'
else:
col_type = 'ARRAY'
arr_data_type = re.match(
r'(?:array<)(\w*)(?:.*)', column['raw_data_type']
)
arr_data_type = arr_data_type.groups()[0].upper()
data_type_display = column['raw_data_type']
)
if 'children' in col_obj and col_obj['children'] is not None:
children = col_obj['children']
elif column['raw_data_type'].startswith('map<'):
col_type = 'MAP'
elif column['raw_data_type'].startswith('array<'):
col_type = 'ARRAY'
arr_data_type = re.match(
r'(?:array<)(\w*)(?:.*)', column['raw_data_type']
)
arr_data_type = arr_data_type.groups()[0].upper()
elif column['raw_data_type'].startswith('uniontype<'):
col_type = 'UNION'
else:
col_type = get_column_type(self.status, dataset_name, column['type'])
data_type_display = col_type
if data_type_display is None:
data_type_display = column['raw_data_type']
else:
col_type = get_column_type(self.status, dataset_name, column['type'])
if col_type == 'ARRAY':
if re.match(r'(?:\w*)(?:\()(\w*)(?:.*))',str(column['type'])):
arr_data_type = re.match(r'(?:\w*)(?:[(]*)(\w*)(?:.*))',str(column['type'])).groups()
data_type_display = column['type']
col_constraint = None
if column['nullable']:
col_constraint = Constraint.NULL
@ -351,20 +364,24 @@ class SQLSource(Source):
col_data_length = column['type'].length
if col_data_length is None:
col_data_length = 1
om_column = Column(
name=column['name'],
description=column.get("comment", None),
dataType=col_type,
dataTypeDisplay="{}({})".format(col_type, col_data_length) if data_type_display
is None else
f"{data_type_display}",
dataLength=col_data_length,
constraint=col_constraint,
ordinalPosition=row_order,
children=children if children is not None else None,
arrayDataType=arr_data_type
)
try:
om_column = Column(
name=column['name'],
description=column.get("comment", None),
dataType=col_type,
dataTypeDisplay="{}({})".format(col_type, col_data_length) if data_type_display
is None else f"{data_type_display}",
dataLength=col_data_length,
constraint=col_constraint,
ordinalPosition=row_order,
children=children if children is not None else None,
arrayDataType=arr_data_type
)
except Exception as err:
logger.error(traceback.format_exc())
logger.error(traceback.print_exc())
logger.error(err)
continue
table_columns.append(om_column)
row_order = row_order + 1
return table_columns

View File

@ -4,7 +4,6 @@ from sqlalchemy.sql import sqltypes as types
from metadata.ingestion.api.source import SourceStatus
def register_custom_type(
tp: Type[types.TypeEngine], output: str = None
) -> None:
@ -36,36 +35,60 @@ _column_type_mapping: Dict[Type[types.TypeEngine], str] = {
types.CHAR: "CHAR"
}
_column_string_mapping = {
"INT": "INT",
"BOOLEAN": "BOOLEAN",
"ENUM": "ENUM",
"BYTES": "BYTES",
"ARRAY": "ARRAY",
"VARCHAR": "VARCHAR",
"STRING": "STRING",
"DATE": "DATE",
"TIME": "TIME",
"DATETIME": "DATETIME",
"TIMESTAMP": "TIMESTAMP",
"NULL": "NULL",
"JSON": "JSON",
"CHAR": "CHAR",
"INT64": "BIGINT",
"CHARACTER VARYING": "VARCHAR",
"VARIANT": "JSON",
"OBJECT": "JSON",
"MAP": "MAP",
"UNION": "UNION"
}
_known_unknown_column_types: Set[Type[types.TypeEngine]] = {
types.Interval,
types.CLOB,
}
def get_column_type(status: SourceStatus, dataset_name: str, column_type: Any) -> str:
type_class: Optional[str] = None
for sql_type in _column_type_mapping.keys():
if isinstance(column_type, sql_type):
type_class = _column_type_mapping[sql_type]
break
if type_class is None:
if type_class is None or type_class == "NULL":
for sql_type in _known_unknown_column_types:
if isinstance(column_type, sql_type):
type_class = "NULL"
break
if type_class is None and column_type in ['CHARACTER VARYING', 'CHAR']:
type_class = 'VARCHAR'
if type_class is None:
for col_type in _column_string_mapping.keys():
if str(column_type) == col_type:
type_class = _column_string_mapping.get(col_type)
break
else:
type_class = None
if type_class is None or type_class == "NULL":
status.warning(
dataset_name, f"unable to map type {column_type!r} to metadata schema"
)
type_class = "NULL"
return type_class
def get_last_index(nested_str):
counter = 1
for index, i in enumerate(nested_str):
@ -82,15 +105,16 @@ def get_last_index(nested_str):
def get_array_type(col_type):
col = {}
col['dataType'] = 'ARRAY'
col_type = col_type[:get_last_index(col_type)+2]
col_type = col_type[:get_last_index(col_type) + 2]
col['dataTypeDisplay'] = col_type
col['arrayDataType'] = re.match(
r'(?:array<)(\w*)(?:.*)', col_type).groups()[0].upper()
return col
def _handle_complex_data_types(status,dataset_name,raw_type: str, level=0):
def _handle_complex_data_types(status, dataset_name, raw_type: str, level=0):
col = {}
if re.match(r'([\w\s]*)(:)(.*)',raw_type):
if re.match(r'([\w\s]*)(:)(.*)', raw_type):
name, col_type = raw_type.lstrip('<').split(':', 1)
col['name'] = name
else:
@ -99,39 +123,41 @@ def _handle_complex_data_types(status,dataset_name,raw_type: str, level=0):
col_type = raw_type
else:
col_type = raw_type.lstrip('<').split(':', 1)[0]
if re.match(r'(struct)(.*)', col_type):
if col_type.startswith('struct<'):
children = []
col_type = re.match(r'(struct<)(.*)', col_type).groups()[1]
struct_type, col_type = re.match(r'(struct<)(.*)', col_type).groups()
pluck_index = get_last_index(col_type)
pluck_nested = col_type[:pluck_index+1]
pluck_nested = col_type[:pluck_index + 1]
col['dataTypeDisplay'] = struct_type + pluck_nested
while pluck_nested != '':
col['dataType'] = 'STRUCT'
plucked = col_type[:get_last_index(col_type)]
counter = 0
continue_next = False
for index,type in enumerate(plucked.split(',')):
for index, type in enumerate(plucked.split(',')):
if continue_next:
continue_next = False
continue
if re.match(r'(\w*)(:)(struct)(.*)',type):
col_name,datatype,rest = re.match(r'(\w*)(?::)(struct)(.*)',','.join(plucked.split(',')[index:])).groups()
type = f"{col_name}:{datatype}{rest[:get_last_index(rest)+2]}"
if re.match(r'(\w*)(:)(struct)(.*)', type):
col_name, datatype, rest = re.match(r'(\w*)(?::)(struct)(.*)',
','.join(plucked.split(',')[index:])).groups()
type = f"{col_name}:{datatype}{rest[:get_last_index(rest) + 2]}"
elif type.startswith('struct'):
datatype,rest = re.match(r'(struct)(.*)',','.join(plucked.split(',')[index:])).groups()
type = f"{datatype}{rest[:get_last_index(rest)+2]}"
elif re.match(r'([\w\s]*)(:?)(map)(.*)',type):
datatype, rest = re.match(r'(struct)(.*)', ','.join(plucked.split(',')[index:])).groups()
type = f"{datatype}{rest[:get_last_index(rest) + 2]}"
elif re.match(r'([\w\s]*)(:?)(map)(.*)', type):
get_map_type = ','.join(plucked.split(',')[index:])
type,col_type = re.match(r'([\w]*:?map<[\w,]*>)(.*)',get_map_type).groups()
type, col_type = re.match(r'([\w]*:?map<[\w,]*>)(.*)', get_map_type).groups()
continue_next = True
elif re.match(r'([\w\s]*)(:?)(uniontype)(.*)',type):
elif re.match(r'([\w\s]*)(:?)(uniontype)(.*)', type):
get_union_type = ','.join(plucked.split(',')[index:])
type,col_type = re.match(r'([\w\s]*:?uniontype<[\w\s,]*>)(.*)',get_union_type).groups()
type, col_type = re.match(r'([\w\s]*:?uniontype<[\w\s,]*>)(.*)', get_union_type).groups()
continue_next = True
children.append(_handle_complex_data_types(status,dataset_name,type,counter))
children.append(_handle_complex_data_types(status, dataset_name, type, counter))
if plucked.endswith(type):
break
counter += 1
pluck_nested = col_type[get_last_index(col_type)+3:]
pluck_nested = col_type[get_last_index(col_type) + 3:]
col['children'] = children
elif col_type.startswith('array'):
col.update(get_array_type(col_type))
@ -146,7 +172,7 @@ def _handle_complex_data_types(status,dataset_name,raw_type: str, level=0):
col['dataLength'] = re.match(r'(?:[\w\s]*)(?:\()([\d]*)(?:\))', col_type).groups()[0]
else:
col['dataLength'] = 1
col['dataType'] = get_column_type(status,dataset_name,re.match('([\w\s]*)(?:.*)',col_type).groups()[0].upper())
col['dataType'] = get_column_type(status, dataset_name,
re.match('([\w\s]*)(?:.*)', col_type).groups()[0].upper())
col['dataTypeDisplay'] = col_type.rstrip('>')
return col