diff --git a/ingestion/src/metadata/ingestion/source/presto.py b/ingestion/src/metadata/ingestion/source/presto.py index 4660ec3a4cf..cf829c424f9 100644 --- a/ingestion/src/metadata/ingestion/source/presto.py +++ b/ingestion/src/metadata/ingestion/source/presto.py @@ -9,8 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re from urllib.parse import quote_plus +from pyhive.sqlalchemy_presto import PrestoDialect, _type_map +from sqlalchemy import types, util +from sqlalchemy.engine import reflection + from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) @@ -18,6 +23,54 @@ from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source_common import SQLConnectionConfig +_type_map.update( + { + "char": types.CHAR, + "decimal": types.Float, + "time": types.TIME, + "varchar": types.VARCHAR, + } +) + + +@reflection.cache +def get_columns(self, connection, table_name, schema=None, **kw): + rows = self._get_table_columns(connection, table_name, schema) + result = [] + for row in rows: + try: + # Take out the more detailed type information + # e.g. 'map' -> 'map' + # 'decimal(10,1)' -> decimal + col_type = re.search(r"^\w+", row.Type).group(0) + coltype = _type_map[col_type] + + charlen = re.search(r"\(([\d]+)\)", row.Type) + if charlen: + charlen = charlen.group(1) + args = (int(charlen),) + coltype = coltype( + *args, + ) + except KeyError: + util.warn( + "Did not recognize type '%s' of column '%s'" % (col_type, row.Column) + ) + coltype = types.NullType + result.append( + { + "name": row.Column, + "type": coltype, + # newer Presto no longer includes this column + "nullable": getattr(row, "Null", True), + "default": None, + } + ) + return result + + +PrestoDialect.get_columns = get_columns + class PrestoConfig(SQLConnectionConfig): host_port = "localhost:8080"