mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-31 02:37:05 +00:00 
			
		
		
		
	fix(ingest): databricks - ingest structs correctly through hive (#5223)
This commit is contained in:
		
							parent
							
								
									8c8f1b987a
								
							
						
					
					
						commit
						141f33508c
					
				| @ -1,4 +1,5 @@ | ||||
| import json | ||||
| import logging | ||||
| import re | ||||
| from typing import Any, Dict, List, Optional | ||||
| 
 | ||||
| @ -33,10 +34,61 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import ( | ||||
| from datahub.utilities import config_clean | ||||
| from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column | ||||
| 
 | ||||
# Module-level logger for this ingestion source.
logger = logging.getLogger(__name__)

# Map Hive-specific SQLAlchemy types onto DataHub's schema type classes so
# that reflected columns are emitted with the correct logical type.
register_custom_type(HiveDate, DateTypeClass)
register_custom_type(HiveTimestamp, TimeTypeClass)
register_custom_type(HiveDecimal, NumberTypeClass)
| 
 | ||||
try:
    from databricks_dbapi.sqlalchemy_dialects.hive import DatabricksPyhiveDialect
    from pyhive.sqlalchemy_hive import _type_map
    from sqlalchemy import types, util
    from sqlalchemy.engine import reflection

    @reflection.cache  # type: ignore
    def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
        """Patches the get_columns method from dbapi (databricks_dbapi.sqlalchemy_dialects.base) to pass the native type through"""
        raw_rows = self._get_table_columns(connection, table_name, schema)
        # Normalize every cell: trim surrounding whitespace, keep None for
        # empty/missing cells.
        cleaned = [[cell.strip() if cell else None for cell in row] for row in raw_rows]
        columns = []
        for row in cleaned:
            col_name = row[0]
            # Skip blank separator rows and the repeated header line.
            if not col_name or col_name == "# col_name":
                continue
            # Everything after the partition header (oss Hive / Databricks
            # spelling, respectively) repeats the partition columns -- stop.
            if col_name in ("# Partition Information", "# Partitioning"):
                break
            full_type = row[1]
            # Reduce the detailed type to its base name for the _type_map
            # lookup, e.g. 'map<int,int>' -> 'map', 'decimal(10,1)' -> 'decimal'.
            col_type = re.search(r"^\w+", full_type).group(0)  # type: ignore
            if col_type in _type_map:
                sa_type = _type_map[col_type]
            else:
                util.warn(
                    "Did not recognize type '%s' of column '%s'" % (col_type, col_name)
                )
                sa_type = types.NullType  # type: ignore
            columns.append(
                {
                    "name": col_name,
                    "type": sa_type,
                    "nullable": True,
                    "default": None,
                    # Retain the full native type string so callers can
                    # recover nested struct/map details later.
                    "full_type": full_type,
                }
            )
        return columns

    # Install the patched reflection hook on the Databricks hive dialect.
    DatabricksPyhiveDialect.get_columns = dbapi_get_columns_patched
except ModuleNotFoundError:
    # Optional dependency not installed; the patch is simply skipped.
    pass
except Exception as e:
    logger.warning(f"Failed to patch method due to {e}")
| 
 | ||||
| 
 | ||||
| class HiveConfig(BasicSQLAlchemyConfig): | ||||
|     # defaults | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Shirshanka Das
						Shirshanka Das