From 3e92693d3486c1a21d42b5d594038d4b5342dcee Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 15 Mar 2022 19:13:05 +0530 Subject: [PATCH] Fix #3419: Databricks struct, array & map supported (#3426) * Fix #3419: Databricks struct, array & map supported * optimized code * added comments --- .../metadata/ingestion/source/databricks.py | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/ingestion/src/metadata/ingestion/source/databricks.py b/ingestion/src/metadata/ingestion/source/databricks.py index 0f610fa1ed0..56d2b8157aa 100644 --- a/ingestion/src/metadata/ingestion/source/databricks.py +++ b/ingestion/src/metadata/ingestion/source/databricks.py @@ -9,8 +9,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re from typing import Optional +from pyhive.sqlalchemy_hive import _type_map +from sqlalchemy import types, util +from sqlalchemy.engine import reflection +from sqlalchemy.sql.sqltypes import String +from sqlalchemy_databricks._dialect import DatabricksDialect + from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) @@ -19,6 +26,80 @@ from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source_common import SQLConnectionConfig +class STRUCT(String): + # This class is added to support STRUCT datatype + """The SQL STRUCT type.""" + + __visit_name__ = "STRUCT" + + +class ARRAY(String): + # This class is added to support ARRAY datatype + """The SQL ARRAY type.""" + + __visit_name__ = "ARRAY" + + +class MAP(String): + # This class is added to support MAP datatype + """The SQL MAP type.""" + + __visit_name__ = "MAP" + + +# overriding pyhive.sqlalchemy_hive._type_map +# mapping struct, array & map to custom classed instead of sqltypes.String +_type_map.update({"struct": STRUCT, "array": ARRAY, "map": MAP}) + + +def _get_column_rows(self, connection, table_name, schema): + # get columns and strip whitespace + table_columns = self._get_table_columns(connection, table_name, schema) + column_rows = [ + [col.strip() if col else None for col in row] for row in table_columns + ] + # Filter out empty rows and comment + return [row for row in column_rows if row[0] and row[0] != "# col_name"] + + +@reflection.cache +def get_columns(self, connection, table_name, schema=None, **kw): + # This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns + # to add support for struct, array & map datatype + rows = _get_column_rows(self, connection, table_name, schema) + result = [] + for (col_name, col_type, _comment) in rows: + # Handle both oss hive and Databricks' hive partition header, respectively + if col_name in ("# Partition Information", "# Partitioning"): + break + # Take out the more detailed type information + # e.g. 'map' -> 'map' + # 'decimal(10,1)' -> decimal + raw_data_type = col_type + col_type = re.search(r"^\w+", col_type).group(0) + try: + coltype = _type_map[col_type] + except KeyError: + util.warn( + "Did not recognize type '%s' of column '%s'" % (col_type, col_name) + ) + coltype = types.NullType + + col_info = { + "name": col_name, + "type": coltype, + "nullable": True, + "default": None, + } + if col_type in {"array", "struct", "map"}: + col_info["raw_data_type"] = raw_data_type + result.append(col_info) + return result + + +DatabricksDialect.get_columns = get_columns + + class DatabricksConfig(SQLConnectionConfig): host_port: str scheme = "databricks+connector"