Fix #3654: Fixed TypeError in Deltalake (#3710)

* Fix #3654: fixed minor bugs

* Fixed table ingestion issues

* Added sample script for deltalake

* Added Hive Metastore support

* Added support for local metastore

* removed code smell

* Resolved review comments

* fixed display data type logic

* Fixed Code Smell and Resolved conflicts

* Fixed Delta lake source

* removed set spark

* Updated prepare method

* id fix

* Metastore exception added
Mayur Singal 2022-04-11 10:45:59 +05:30 committed by GitHub
parent 5d2a5c237f
commit 726da97dd8
5 changed files with 234 additions and 70 deletions

@@ -0,0 +1,51 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/database/deltaLakeConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DeltaLakeConnection",
"description": "DeltaLake Database Connection Config",
"type": "object",
"javaType": "org.openmetadata.catalog.services.connections.database.DeltaLakeConnection",
"definitions": {
"deltaLakeType": {
"description": "Service type.",
"type": "string",
"enum": ["DeltaLake"],
"default": "DeltaLake"
}
},
"properties": {
"type": {
"description": "Service Type",
"$ref": "#/definitions/deltaLakeType",
"default": "DeltaLake"
},
"metastoreHostPort": {
"description": "Host and port of remote Hive Metastore.",
"type": "string"
},
"metastoreFilePath": {
"description": "File path of local Hive Metastore.",
"type": "string"
},
"appName": {
"description": "pySpark App Name",
"type": "string"
},
"connectionOptions": {
"$ref": "connectionBasicType.json#/definitions/connectionOptions"
},
"connectionArguments": {
"$ref": "connectionBasicType.json#/definitions/connectionArguments"
},
"supportedPipelineTypes": {
"description": "Supported Metadata Extraction Pipelines.",
"type": "string",
"items": {
"type": "string",
"enum": ["Metadata"]
},
"default": ["Metadata"]
}
},
"additionalProperties": false
}
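For context, the ingestion code changed later in this commit consumes this schema through a generated Pydantic model (DeltaLakeConnection). A minimal usage sketch, assuming the model generated from the schema above exposes these field names; the values are placeholders, not part of this commit:

# Sketch only: DeltaLakeConnection is the model generated from the schema above.
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
    DeltaLakeConnection,
)

# Either metastoreHostPort or metastoreFilePath should be set; values here are illustrative.
connection = DeltaLakeConnection(
    metastoreHostPort="localhost:9083",
    appName="MyApp",
)
print(connection.type.value)  # "DeltaLake" by default, per the schema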

@@ -133,6 +133,9 @@
       {
         "$ref": "./connections/database/db2Connection.json"
       },
+      {
+        "$ref": "./connections/database/deltaLakeConnection.json"
+      },
       {
         "$ref": "./connections/database/druidConnection.json"
       },

@@ -1,27 +1,24 @@
 {
   "source": {
     "type": "deltalake",
-    "config": {
-      "platform_name": "deltalake",
-      "database": "delta",
-      "service_name": "local_deltalake",
-      "table_filter_pattern": {
-        "excludes": ["[\\w]*event_vw.*"]
-      },
-      "schema_filter_pattern": {
-        "excludes": ["deltalake.*", "information_schema.*", "performance_schema.*", "sys.*"]
+    "serviceName": "local_deltalake",
+    "serviceConnection": {
+      "config": {
+        "metastoreHostPort": "localhost:9083",
+        "metastoreFilePath": "<path_to_metastore>/metastore_db",
+        "appName": "MyApp"
       }
-    }
+    },
+    "sourceConfig": {"config": {"enableDataProfiler": false}}
   },
   "sink": {
     "type": "metadata-rest",
     "config": {}
   },
-  "metadata_server": {
-    "type": "metadata-server",
-    "config": {
-      "api_endpoint": "http://localhost:8585/api",
-      "auth_provider_type": "no-auth"
+  "workflowConfig": {
+    "openMetadataServerConfig": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
     }
   }
 }
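To run the updated example end to end, the usual pattern is to hand this JSON to the ingestion workflow runner. A sketch, assuming the standard metadata.ingestion.api.workflow.Workflow API; the file path is hypothetical:

import json

from metadata.ingestion.api.workflow import Workflow

# Hypothetical path to a local copy of the example config shown above.
with open("deltalake.json") as config_file:
    workflow_config = json.load(config_file)

workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()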

@@ -128,6 +128,7 @@ plugins: Dict[str, Set[str]] = {
     "databricks": {"sqlalchemy-databricks==0.1.0"},
     "singlestore": {"pymysql>=1.0.2"},
     "azure-sso": {"msal~=1.17.0"},
+    "deltalake": {"delta-spark~=1.1.0"},
 }
 dev = {
     "boto3==1.20.14",

@@ -1,99 +1,149 @@
 import logging
+import re
 import uuid
-from collections import Iterable
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
+import pyspark
+from delta import configure_spark_with_delta_pip
 from pyspark.sql import SparkSession
-from pyspark.sql.catalog import Table
-from pyspark.sql.types import ArrayType, MapType, StructField, StructType
+from pyspark.sql.catalog import Table as pyTable
+from pyspark.sql.types import ArrayType, MapType, StructType
 from pyspark.sql.utils import AnalysisException, ParseException
 
-from metadata.config.common import FQDN_SEPARATOR, ConfigModel
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.table import Column, Table
-from metadata.generated.schema.entity.services.databaseService import (
-    DatabaseServiceType,
+from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
+from metadata.generated.schema.entity.data.table import Column, Table, TableType
+from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
+    DeltaLakeConnection,
 )
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.common import IncludeFilterPattern
-from metadata.ingestion.api.source import Source
+from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException, Source
 from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.source.sql_source import SQLSourceStatus
+from metadata.utils.column_type_parser import ColumnTypeParser
+from metadata.utils.filters import filter_by_schema, filter_by_table
 from metadata.utils.helpers import get_database_service_or_create
 
 logger: logging.Logger = logging.getLogger(__name__)
 
+DEFAULT_DATABASE = "default"
 
-class DeltaLakeSourceConfig(ConfigModel):
-    database: str = "delta"
-    platform_name: str = "deltalake"
-    schema_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
-    table_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
-    service_name: str
-    service_type: str = DatabaseServiceType.DeltaLake.value
 
-    def get_service_name(self) -> str:
-        return self.service_name
+class MetaStoreNotFoundException(Exception):
+    """
+    Metastore is not passed thorugh file or url
+    """
 
 
-class DeltaLakeSource(Source):
+class DeltalakeSource(Source[Entity]):
     spark: SparkSession = None
 
     def __init__(
         self,
-        config: DeltaLakeSourceConfig,
+        config: WorkflowSource,
         metadata_config: OpenMetadataServerConfig,
     ):
         super().__init__()
         self.config = config
+        self.connection_config = config.serviceConnection.__root__.config
         self.metadata_config = metadata_config
         self.service = get_database_service_or_create(
            config=config,
            metadata_config=metadata_config,
-           service_name=config.service_name,
+           service_name=config.serviceName,
        )
         self.status = SQLSourceStatus()
+        logger.info("Establishing Sparks Session")
+        builder = (
+            pyspark.sql.SparkSession.builder.appName(self.connection_config.appName)
+            .enableHiveSupport()
+            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .config(
+                "spark.sql.catalog.spark_catalog",
+                "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+            )
+        )
+        if self.connection_config.metastoreHostPort:
+            builder.config(
+                "hive.metastore.uris",
+                f"thrift://{self.connection_config.metastoreHostPort}",
+            )
+        elif self.connection_config.metastoreFilePath:
+            builder.config(
+                "spark.sql.warehouse.dir", f"{self.connection_config.metastoreFilePath}"
+            )
+        self.spark = configure_spark_with_delta_pip(builder).getOrCreate()
+        self.table_type_map = {
+            TableType.External.value.lower(): TableType.External.value,
+            TableType.View.value.lower(): TableType.View.value,
+            TableType.SecureView.value.lower(): TableType.SecureView.value,
+            TableType.Iceberg.value.lower(): TableType.Iceberg.value,
+        }
+        self.array_datatype_replace_map = {"(": "<", ")": ">", "=": ":", "<>": ""}
+        self.ARRAY_CHILD_START_INDEX = 6
+        self.ARRAY_CHILD_END_INDEX = -1
 
     @classmethod
-    def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig):
-        config = DeltaLakeSourceConfig.parse_obj(config_dict)
+    def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: DeltaLakeConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, DeltaLakeConnection):
+            raise InvalidSourceException(
+                f"Expected DeltaLakeConnection, but got {connection}"
+            )
+        if not connection.metastoreFilePath and not connection.metastoreHostPort:
+            raise MetaStoreNotFoundException(
+                "Either of metastoreFilePath or metastoreHostPort is required"
+            )
         return cls(config, metadata_config)
 
     def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
         schemas = self.spark.catalog.listDatabases()
         for schema in schemas:
-            if not self.config.schema_filter_pattern.included(schema):
-                self.status.filter(schema, "Schema pattern not allowed")
+            if filter_by_schema(
+                self.config.sourceConfig.config.schemaFilterPattern, schema.name
+            ):
+                self.status.filter(schema.name, "Schema pattern not allowed")
                 continue
-            yield from self.fetch_tables(schema)
+            yield from self.fetch_tables(schema.name)
+
+    def get_status(self):
+        return self.status
+
+    def prepare(self):
+        pass
+
+    def _get_table_type(self, table_type: str):
+        return self.table_type_map.get(table_type.lower(), TableType.Regular.value)
 
     def fetch_tables(self, schema: str) -> Iterable[OMetaDatabaseAndTable]:
         for table in self.spark.catalog.listTables(schema):
             try:
+                database = table.database
                 table_name = table.name
-                if not self.config.table_filter_pattern.included(table_name):
+                if filter_by_table(
+                    self.config.sourceConfig.config.tableFilterPattern, table_name
+                ):
                     self.status.filter(
-                        "{}.{}".format(self.config.get_service_name(), table_name),
+                        "{}.{}".format(self.config.serviceName, table_name),
                         "Table pattern not allowed",
                     )
                     continue
-                self.status.scanned(
-                    "{}.{}".format(self.config.get_service_name(), table_name)
-                )
+                self.status.scanned("{}.{}".format(self.config.serviceName, table_name))
 
                 table_columns = self._fetch_columns(schema, table_name)
-                fqn = f"{self.config.service_name}{FQDN_SEPARATOR}{self.config.database}{FQDN_SEPARATOR}{schema}{FQDN_SEPARATOR}{table_name}"
                 if table.tableType and table.tableType.lower() != "view":
-                    table_description = self._fetch_table_description(table_name)
                     table_entity = Table(
                         id=uuid.uuid4(),
                         name=table_name,
-                        tableType=table.tableType,
-                        description=table_description,
-                        fullyQualifiedName=fqn,
+                        tableType=self._get_table_type(table.tableType),
+                        description=table.description,
                         columns=table_columns,
                     )
                 else:
@@ -101,27 +151,41 @@ class DeltaLakeSource(Source):
                     table_entity = Table(
                         id=uuid.uuid4(),
                         name=table_name,
-                        tableType=table.tableType,
-                        description=" ",
-                        fullyQualifiedName=fqn,
+                        tableType=self._get_table_type(table.tableType),
+                        description=table.description,
                         columns=table_columns,
                         viewDefinition=view_definition,
                     )
 
+                database = self._get_database()
                 table_and_db = OMetaDatabaseAndTable(
-                    table=table_entity, database=self._get_database(schema)
+                    table=table_entity,
+                    database=database,
+                    database_schema=self._get_database_schema(database, schema),
                 )
                 yield table_and_db
             except Exception as err:
                 logger.error(err)
                 self.status.warnings.append(
-                    "{}.{}".format(self.config.service_name, table.name)
+                    "{}.{}".format(self.config.serviceName, table.name)
                 )
 
-    def _get_database(self, schema: str) -> Database:
+    def _get_database(self) -> Database:
         return Database(
+            id=uuid.uuid4(),
+            name=DEFAULT_DATABASE,
+            service=EntityReference(
+                id=self.service.id, type=self.connection_config.type.value
+            ),
+        )
+
+    def _get_database_schema(self, database: Database, schema: str) -> DatabaseSchema:
+        return DatabaseSchema(
             name=schema,
-            service=EntityReference(id=self.service.id, type=self.config.service_type),
+            service=EntityReference(
+                id=self.service.id, type=self.connection_config.type.value
+            ),
+            database=EntityReference(id=database.id, type="database"),
         )
 
     def _fetch_table_description(self, table_name: str) -> Optional[Dict]:
@@ -148,7 +212,62 @@ class DeltaLakeSource(Source):
                 view_detail[row_dict["col_name"]] = row_dict["data_type"]
             if "# Detailed Table" in row_dict["col_name"]:
                 col_details = True
-        return view_detail
+        return view_detail.get("View Text")
+
+    def _check_col_length(self, datatype, col_raw_type):
+        if datatype and datatype.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}:
+            try:
+                return col_raw_type.length if col_raw_type.length else 1
+            except AttributeError:
+                return 1
+
+    def _get_display_data_type(self, row):
+        display_data_type = repr(row["data_type"]).lower()
+        for original, new in self.array_datatype_replace_map.items():
+            display_data_type = display_data_type.replace(original, new)
+        return display_data_type
+
+    def _get_col_info(self, row):
+        parsed_string = ColumnTypeParser._parse_datatype_string(row["data_type"])
+        column = None
+        if parsed_string:
+            parsed_string["dataLength"] = self._check_col_length(
+                parsed_string["dataType"], row["data_type"]
+            )
+            if row["data_type"] == "array":
+                array_data_type_display = self._get_display_data_type(row)
+                parsed_string["dataTypeDisplay"] = array_data_type_display
+                # Parse Primitive Datatype string
+                # if Datatype is Arrya(int) -> Parse int
+                parsed_string[
+                    "arrayDataType"
+                ] = ColumnTypeParser._parse_primitive_datatype_string(
+                    array_data_type_display[
+                        self.ARRAY_CHILD_START_INDEX : self.ARRAY_CHILD_END_INDEX
+                    ]
+                )[
+                    "dataType"
+                ]
+            column = Column(name=row["col_name"], **parsed_string)
+        else:
+            col_type = re.search(r"^\w+", row["data_type"]).group(0)
+            charlen = re.search(r"\(([\d]+)\)", row["data_type"])
+            if charlen:
+                charlen = int(charlen.group(1))
+            if (
+                col_type.upper() in {"CHAR", "VARCHAR", "VARBINARY", "BINARY"}
+                and charlen is None
+            ):
+                charlen = 1
+            column = Column(
+                name=row["col_name"],
+                description=row.get("comment"),
+                dataType=col_type,
+                dataLength=charlen,
+                displayName=row["data_type"],
+            )
+        return column
 
     def _fetch_columns(self, schema: str, table: str) -> List[Column]:
         raw_columns = []
@@ -163,21 +282,14 @@ class DeltaLakeSource(Source):
             return []
         parsed_columns: [Column] = []
         partition_cols = False
-        row_order = 0
         for row in raw_columns:
             col_name = row["col_name"]
             if col_name == "" or "#" in col_name:
                 partition_cols = True
                 continue
             if not partition_cols:
-                column = Column(
-                    name=row["col_name"],
-                    description=row["comment"] if row["comment"] else None,
-                    data_type=row["data_type"],
-                    ordinal_position=row_order,
-                )
+                column = self._get_col_info(row)
                 parsed_columns.append(column)
-                row_order += 1
 
         return parsed_columns
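The Spark wiring added in DeltalakeSource.__init__ can also be exercised on its own, which is a convenient way to confirm the metastore is reachable before running the ingestion. A standalone sketch using the same calls as the new source; the app name and metastore host:port are placeholders:

# Standalone sketch of the session setup done in DeltalakeSource.__init__;
# "localhost:9083" stands in for your Hive Metastore host:port.
import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .enableHiveSupport()
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("hive.metastore.uris", "thrift://localhost:9083")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# The source iterates in the same order: databases first, then tables per schema.
for db in spark.catalog.listDatabases():
    print(db.name, [t.name for t in spark.catalog.listTables(db.name)])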