Fix #3419: Databricks struct, array & map supported (#3426)

* Fix #3419: Databricks struct, array & map supported

* optimized code

* added comments
This commit is contained in:
Mayur Singal 2022-03-15 19:13:05 +05:30 committed by GitHub
parent 2a5bea7f5e
commit 3e92693d34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -9,8 +9,15 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import re
from typing import Optional from typing import Optional
from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util
from sqlalchemy.engine import reflection
from sqlalchemy.sql.sqltypes import String
from sqlalchemy_databricks._dialect import DatabricksDialect
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType, DatabaseServiceType,
) )
@ -19,6 +26,80 @@ from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class STRUCT(String):
# This class is added to support STRUCT datatype
"""The SQL STRUCT type."""
__visit_name__ = "STRUCT"
class ARRAY(String):
# This class is added to support ARRAY datatype
"""The SQL ARRAY type."""
__visit_name__ = "ARRAY"
class MAP(String):
# This class is added to support MAP datatype
"""The SQL MAP type."""
__visit_name__ = "MAP"
# overriding pyhive.sqlalchemy_hive._type_map
# mapping struct, array & map to custom classed instead of sqltypes.String
_type_map.update({"struct": STRUCT, "array": ARRAY, "map": MAP})
def _get_column_rows(self, connection, table_name, schema):
# get columns and strip whitespace
table_columns = self._get_table_columns(connection, table_name, schema)
column_rows = [
[col.strip() if col else None for col in row] for row in table_columns
]
# Filter out empty rows and comment
return [row for row in column_rows if row[0] and row[0] != "# col_name"]
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
# This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns
# to add support for struct, array & map datatype
rows = _get_column_rows(self, connection, table_name, schema)
result = []
for (col_name, col_type, _comment) in rows:
# Handle both oss hive and Databricks' hive partition header, respectively
if col_name in ("# Partition Information", "# Partitioning"):
break
# Take out the more detailed type information
# e.g. 'map<ixnt,int>' -> 'map'
# 'decimal(10,1)' -> decimal
raw_data_type = col_type
col_type = re.search(r"^\w+", col_type).group(0)
try:
coltype = _type_map[col_type]
except KeyError:
util.warn(
"Did not recognize type '%s' of column '%s'" % (col_type, col_name)
)
coltype = types.NullType
col_info = {
"name": col_name,
"type": coltype,
"nullable": True,
"default": None,
}
if col_type in {"array", "struct", "map"}:
col_info["raw_data_type"] = raw_data_type
result.append(col_info)
return result
DatabricksDialect.get_columns = get_columns
class DatabricksConfig(SQLConnectionConfig): class DatabricksConfig(SQLConnectionConfig):
host_port: str host_port: str
scheme = "databricks+connector" scheme = "databricks+connector"