From 726da97dd826e6a116b45244cf450a121c6ffb6c Mon Sep 17 00:00:00 2001
From: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
Date: Mon, 11 Apr 2022 10:45:59 +0530
Subject: [PATCH] Fix #3654: Fixed TypeError in Deltalake (#3710)

* Fix #3654: fixed minor bugs
* Fixed table ingestion issues
* Added sample script for deltalake
* Added Hive Metastore support
* Added support for local metastore
* removed code smell
* Resolved review comments
* fixed display data type logic
* Fixed Code Smell and Resolved conflicts
* Fixed Delta lake source
* removed set spark
* Updated prepare method
* id fix
* Metastore exception added
---
 .../database/deltaLakeConnection.json         |  51 ++++
 .../entity/services/databaseService.json      |   3 +
 ingestion/examples/workflows/deltalake.json   |  27 +--
 ingestion/setup.py                            |   1 +
 .../metadata/ingestion/source/deltalake.py    | 222 +++++++++++++-----
 5 files changed, 234 insertions(+), 70 deletions(-)
 create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/deltaLakeConnection.json

diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/deltaLakeConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/deltaLakeConnection.json
new file mode 100644
index 00000000000..a6114605a56
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/deltaLakeConnection.json
@@ -0,0 +1,51 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/services/connections/database/deltaLakeConnection.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "DeltaLakeConnection",
+  "description": "DeltaLake Database Connection Config",
+  "type": "object",
+  "javaType": "org.openmetadata.catalog.services.connections.database.DeltaLakeConnection",
+  "definitions": {
+    "deltaLakeType": {
+      "description": "Service type.",
+      "type": "string",
+      "enum": ["DeltaLake"],
+      "default": "DeltaLake"
+    }
+  },
+  "properties": {
+    "type": {
+      "description": "Service Type",
+      "$ref": "#/definitions/deltaLakeType",
+      "default": "DeltaLake"
+    },
+    "metastoreHostPort": {
+      "description": "Host and port of remote Hive Metastore.",
+      "type": "string"
+    },
+    "metastoreFilePath": {
+      "description": "File path of local Hive Metastore.",
+      "type": "string"
+    },
+    "appName": {
+      "description": "pySpark App Name",
+      "type": "string"
+    },
+    "connectionOptions": {
+      "$ref": "connectionBasicType.json#/definitions/connectionOptions"
+    },
+    "connectionArguments": {
+      "$ref": "connectionBasicType.json#/definitions/connectionArguments"
+    },
+    "supportedPipelineTypes": {
+      "description": "Supported Metadata Extraction Pipelines.",
+      "type": "array",
+      "items": {
+        "type": "string",
+        "enum": ["Metadata"]
+      },
+      "default": ["Metadata"]
+    }
+  },
+  "additionalProperties": false
+}
diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json
index 30a5c252673..4191e3ba5cb 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json
@@ -133,6 +133,9 @@
       {
         "$ref": "./connections/database/db2Connection.json"
       },
+      {
+        "$ref": "./connections/database/deltaLakeConnection.json"
+      },
       {
         "$ref": "./connections/database/druidConnection.json"
       },
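The models under metadata.generated are code-generated from these JSON schemas, so a connection blob can be sanity-checked against the new schema from Python. A minimal sketch, assuming the generated pydantic model (the import path mirrors the one used in deltalake.py below; the field values are hypothetical):

from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
    DeltaLakeConnection,
)

# Hypothetical connection values; the source's create() method (below)
# requires one of metastoreHostPort / metastoreFilePath to be set.
connection = DeltaLakeConnection.parse_obj(
    {
        "type": "DeltaLake",
        "metastoreHostPort": "localhost:9083",
        "appName": "MyApp",
    }
)
print(connection.metastoreHostPort)  # localhost:9083
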
diff --git a/ingestion/examples/workflows/deltalake.json b/ingestion/examples/workflows/deltalake.json
index 6d237d01876..7eab4507fd2 100644
--- a/ingestion/examples/workflows/deltalake.json
+++ b/ingestion/examples/workflows/deltalake.json
@@ -1,27 +1,24 @@
 {
   "source": {
     "type": "deltalake",
-    "config": {
-      "platform_name": "deltalake",
-      "database": "delta",
-      "service_name": "local_deltalake",
-      "table_filter_pattern": {
-        "excludes": ["[\\w]*event_vw.*"]
-      },
-      "schema_filter_pattern": {
-        "excludes": ["deltalake.*", "information_schema.*", "performance_schema.*", "sys.*"]
+    "serviceName": "local_deltalake",
+    "serviceConnection": {
+      "config": {
+        "metastoreHostPort": "localhost:9083",
+        "metastoreFilePath": "/metastore_db",
+        "appName": "MyApp"
       }
-    }
+    },
+    "sourceConfig": {"config": {"enableDataProfiler": false}}
   },
   "sink": {
     "type": "metadata-rest",
     "config": {}
   },
-  "metadata_server": {
-    "type": "metadata-server",
-    "config": {
-      "api_endpoint": "http://localhost:8585/api",
-      "auth_provider_type": "no-auth"
+  "workflowConfig": {
+    "openMetadataServerConfig": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
     }
   }
 }
\ No newline at end of file
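This example config can be run through the ingestion framework's Python runner; the CLI equivalent is "metadata ingest -c deltalake.json". A minimal sketch, assuming the Workflow API of the same release:

import json

from metadata.ingestion.api.workflow import Workflow

# Path to the example above (adjust to your checkout).
with open("ingestion/examples/workflows/deltalake.json") as config_file:
    config = json.load(config_file)

workflow = Workflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
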
diff --git a/ingestion/setup.py b/ingestion/setup.py
index 8f1312245c9..9956fa2a1dc 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -128,6 +128,7 @@ plugins: Dict[str, Set[str]] = {
     "databricks": {"sqlalchemy-databricks==0.1.0"},
     "singlestore": {"pymysql>=1.0.2"},
     "azure-sso": {"msal~=1.17.0"},
+    "deltalake": {"delta-spark~=1.1.0"},
 }
 dev = {
     "boto3==1.20.14",
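Each key of the plugins dict is exposed as a pip extra, so — assuming the published package name — the new Spark dependency is pulled in with:

    pip install "openmetadata-ingestion[deltalake]"
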
diff --git a/ingestion/src/metadata/ingestion/source/deltalake.py b/ingestion/src/metadata/ingestion/source/deltalake.py
index c652c3bc2ce..2f98d3c2079 100644
--- a/ingestion/src/metadata/ingestion/source/deltalake.py
+++ b/ingestion/src/metadata/ingestion/source/deltalake.py
@@ -1,99 +1,149 @@
 import logging
+import re
 import uuid
-from collections import Iterable
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
+import pyspark
+from delta import configure_spark_with_delta_pip
 from pyspark.sql import SparkSession
-from pyspark.sql.catalog import Table
-from pyspark.sql.types import ArrayType, MapType, StructField, StructType
+from pyspark.sql.catalog import Table as pyTable
+from pyspark.sql.types import ArrayType, MapType, StructType
 from pyspark.sql.utils import AnalysisException, ParseException
 
-from metadata.config.common import FQDN_SEPARATOR, ConfigModel
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.table import Column, Table
-from metadata.generated.schema.entity.services.databaseService import (
-    DatabaseServiceType,
+from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
+from metadata.generated.schema.entity.data.table import Column, Table, TableType
+from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
+    DeltaLakeConnection,
 )
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.common import IncludeFilterPattern
-from metadata.ingestion.api.source import Source
+from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException, Source
 from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.source.sql_source import SQLSourceStatus
+from metadata.utils.column_type_parser import ColumnTypeParser
+from metadata.utils.filters import filter_by_schema, filter_by_table
 from metadata.utils.helpers import get_database_service_or_create
 
 logger: logging.Logger = logging.getLogger(__name__)
 
-class DeltaLakeSourceConfig(ConfigModel):
-    database: str = "delta"
-    platform_name: str = "deltalake"
-    schema_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
-    table_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
-    service_name: str
-    service_type: str = DatabaseServiceType.DeltaLake.value
-
-    def get_service_name(self) -> str:
-        return self.service_name
+DEFAULT_DATABASE = "default"
 
 
-class DeltaLakeSource(Source):
+class MetaStoreNotFoundException(Exception):
+    """
+    Metastore is not passed through file or url
+    """
+
+
+class DeltalakeSource(Source[Entity]):
     spark: SparkSession = None
 
     def __init__(
         self,
-        config: DeltaLakeSourceConfig,
+        config: WorkflowSource,
         metadata_config: OpenMetadataServerConfig,
     ):
         super().__init__()
         self.config = config
+        self.connection_config = config.serviceConnection.__root__.config
         self.metadata_config = metadata_config
         self.service = get_database_service_or_create(
             config=config,
             metadata_config=metadata_config,
-            service_name=config.service_name,
+            service_name=config.serviceName,
         )
         self.status = SQLSourceStatus()
+        logger.info("Establishing Spark Session")
+        builder = (
+            pyspark.sql.SparkSession.builder.appName(self.connection_config.appName)
+            .enableHiveSupport()
+            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .config(
+                "spark.sql.catalog.spark_catalog",
+                "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+            )
+        )
+        if self.connection_config.metastoreHostPort:
+            builder.config(
+                "hive.metastore.uris",
+                f"thrift://{self.connection_config.metastoreHostPort}",
+            )
+        elif self.connection_config.metastoreFilePath:
+            builder.config(
+                "spark.sql.warehouse.dir", f"{self.connection_config.metastoreFilePath}"
+            )
+        self.spark = configure_spark_with_delta_pip(builder).getOrCreate()
+        self.table_type_map = {
+            TableType.External.value.lower(): TableType.External.value,
+            TableType.View.value.lower(): TableType.View.value,
+            TableType.SecureView.value.lower(): TableType.SecureView.value,
+            TableType.Iceberg.value.lower(): TableType.Iceberg.value,
+        }
+        self.array_datatype_replace_map = {"(": "<", ")": ">", "=": ":", "<>": ""}
+        self.ARRAY_CHILD_START_INDEX = 6
+        self.ARRAY_CHILD_END_INDEX = -1
 
     @classmethod
-    def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig):
-        config = DeltaLakeSourceConfig.parse_obj(config_dict)
+    def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: DeltaLakeConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, DeltaLakeConnection):
+            raise InvalidSourceException(
+                f"Expected DeltaLakeConnection, but got {connection}"
+            )
+        if not connection.metastoreFilePath and not connection.metastoreHostPort:
+            raise MetaStoreNotFoundException(
+                "Either metastoreFilePath or metastoreHostPort is required"
+            )
         return cls(config, metadata_config)
 
     def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
         schemas = self.spark.catalog.listDatabases()
         for schema in schemas:
-            if not self.config.schema_filter_pattern.included(schema):
-                self.status.filter(schema, "Schema pattern not allowed")
+            if filter_by_schema(
+                self.config.sourceConfig.config.schemaFilterPattern, schema.name
+            ):
+                self.status.filter(schema.name, "Schema pattern not allowed")
                 continue
-            yield from self.fetch_tables(schema)
+            yield from self.fetch_tables(schema.name)
+
+    def get_status(self):
+        return self.status
+
+    def prepare(self):
+        pass
+
+    def _get_table_type(self, table_type: str):
+        return self.table_type_map.get(table_type.lower(), TableType.Regular.value)
 
     def fetch_tables(self, schema: str) -> Iterable[OMetaDatabaseAndTable]:
         for table in self.spark.catalog.listTables(schema):
             try:
-                database = table.database
                 table_name = table.name
-                if not self.config.table_filter_pattern.included(table_name):
+                if filter_by_table(
+                    self.config.sourceConfig.config.tableFilterPattern, table_name
+                ):
                     self.status.filter(
-                        "{}.{}".format(self.config.get_service_name(), table_name),
+                        "{}.{}".format(self.config.serviceName, table_name),
                         "Table pattern not allowed",
                     )
                     continue
-                self.status.scanned(
-                    "{}.{}".format(self.config.get_service_name(), table_name)
-                )
+                self.status.scanned("{}.{}".format(self.config.serviceName, table_name))
 
                 table_columns = self._fetch_columns(schema, table_name)
-                fqn = f"{self.config.service_name}{FQDN_SEPARATOR}{self.config.database}{FQDN_SEPARATOR}{schema}{FQDN_SEPARATOR}{table_name}"
                 if table.tableType and table.tableType.lower() != "view":
-                    table_description = self._fetch_table_description(table_name)
                     table_entity = Table(
                         id=uuid.uuid4(),
                         name=table_name,
-                        tableType=table.tableType,
-                        description=table_description,
-                        fullyQualifiedName=fqn,
+                        tableType=self._get_table_type(table.tableType),
+                        description=table.description,
                         columns=table_columns,
                     )
                 else:
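The session bootstrap in __init__ above can be reproduced standalone to verify metastore connectivity before running a full ingestion. A minimal sketch, assuming delta-spark is installed and a Hive Metastore is reachable on the (hypothetical) thrift port:

import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("delta-smoke-test")
    .enableHiveSupport()
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # Remote metastore; for a local one, set spark.sql.warehouse.dir instead.
    .config("hive.metastore.uris", "thrift://localhost:9083")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
print([db.name for db in spark.catalog.listDatabases()])
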
"BINARY", "VARBINARY"}: + try: + return col_raw_type.length if col_raw_type.length else 1 + except AttributeError: + return 1 + + def _get_display_data_type(self, row): + display_data_type = repr(row["data_type"]).lower() + for original, new in self.array_datatype_replace_map.items(): + display_data_type = display_data_type.replace(original, new) + return display_data_type + + def _get_col_info(self, row): + parsed_string = ColumnTypeParser._parse_datatype_string(row["data_type"]) + column = None + if parsed_string: + parsed_string["dataLength"] = self._check_col_length( + parsed_string["dataType"], row["data_type"] + ) + if row["data_type"] == "array": + array_data_type_display = self._get_display_data_type(row) + parsed_string["dataTypeDisplay"] = array_data_type_display + # Parse Primitive Datatype string + # if Datatype is Arrya(int) -> Parse int + parsed_string[ + "arrayDataType" + ] = ColumnTypeParser._parse_primitive_datatype_string( + array_data_type_display[ + self.ARRAY_CHILD_START_INDEX : self.ARRAY_CHILD_END_INDEX + ] + )[ + "dataType" + ] + column = Column(name=row["col_name"], **parsed_string) + else: + col_type = re.search(r"^\w+", row["data_type"]).group(0) + charlen = re.search(r"\(([\d]+)\)", row["data_type"]) + if charlen: + charlen = int(charlen.group(1)) + if ( + col_type.upper() in {"CHAR", "VARCHAR", "VARBINARY", "BINARY"} + and charlen is None + ): + charlen = 1 + + column = Column( + name=row["col_name"], + description=row.get("comment"), + dataType=col_type, + dataLength=charlen, + displayName=row["data_type"], + ) + return column def _fetch_columns(self, schema: str, table: str) -> List[Column]: raw_columns = [] @@ -163,21 +282,14 @@ class DeltaLakeSource(Source): return [] parsed_columns: [Column] = [] partition_cols = False - row_order = 0 for row in raw_columns: col_name = row["col_name"] if col_name == "" or "#" in col_name: partition_cols = True continue if not partition_cols: - column = Column( - name=row["col_name"], - description=row["comment"] if row["comment"] else None, - data_type=row["data_type"], - ordinal_position=row_order, - ) + column = self._get_col_info(row) parsed_columns.append(column) - row_order += 1 return parsed_columns