diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile
index dfd14c79af5..4fbd33f40b1 100644
--- a/ingestion/Dockerfile
+++ b/ingestion/Dockerfile
@@ -1,7 +1,7 @@
 FROM python:3.9-slim as base
 ENV AIRFLOW_HOME=/airflow
 RUN apt-get update && \
-    apt-get install -y gcc libsasl2-modules libsasl2-dev curl build-essential libssl-dev libffi-dev librdkafka-dev unixodbc-dev python3.9-dev libevent-dev wget --no-install-recommends && \
+    apt-get install -y gcc libsasl2-modules libsasl2-dev curl build-essential libssl-dev libffi-dev librdkafka-dev unixodbc-dev python3.9-dev openjdk-11-jre libevent-dev wget --no-install-recommends && \
     rm -rf /var/lib/apt/lists/*
 # RUN wget https://github.com/open-metadata/openmetadata-airflow-apis/releases/download/0.1/openmetadata-airflow-apis-plugin.tar.gz
 # RUN tar zxvf openmetadata-airflow-apis-plugin.tar.gz
diff --git a/ingestion/src/metadata/ingestion/source/deltalake.py b/ingestion/src/metadata/ingestion/source/deltalake.py
index e7295666f42..f4747382f38 100644
--- a/ingestion/src/metadata/ingestion/source/deltalake.py
+++ b/ingestion/src/metadata/ingestion/source/deltalake.py
@@ -30,6 +30,7 @@ from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.sql_source import SQLSourceStatus
 from metadata.utils.column_type_parser import ColumnTypeParser
+from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.filters import filter_by_schema, filter_by_table
 
 logger: logging.Logger = logging.getLogger(__name__)
@@ -54,33 +55,17 @@ class DeltalakeSource(Source[Entity]):
     ):
         super().__init__()
         self.config = config
-        self.connection_config = config.serviceConnection.__root__.config
         self.metadata_config = metadata_config
         self.metadata = OpenMetadata(metadata_config)
+        self.service_connection = self.config.serviceConnection.__root__.config
         self.service = self.metadata.get_service_or_create(
             entity=DatabaseService, config=config
         )
+        self.connection = get_connection(self.service_connection)
+        self.status = SQLSourceStatus()
 
         logger.info("Establishing Sparks Session")
-        builder = (
-            pyspark.sql.SparkSession.builder.appName(self.connection_config.appName)
-            .enableHiveSupport()
-            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
-            .config(
-                "spark.sql.catalog.spark_catalog",
-                "org.apache.spark.sql.delta.catalog.DeltaCatalog",
-            )
-        )
-        if self.connection_config.metastoreHostPort:
-            builder.config(
-                "hive.metastore.uris",
-                f"thrift://{self.connection_config.metastoreHostPort}",
-            )
-        elif self.connection_config.metastoreFilePath:
-            builder.config(
-                "spark.sql.warehouse.dir", f"{self.connection_config.metastoreFilePath}"
-            )
-        self.spark = configure_spark_with_delta_pip(builder).getOrCreate()
+        self.spark = self.connection.client
         self.table_type_map = {
             TableType.External.value.lower(): TableType.External.value,
             TableType.View.value.lower(): TableType.View.value,
@@ -175,7 +160,7 @@ class DeltalakeSource(Source[Entity]):
             id=uuid.uuid4(),
             name=DEFAULT_DATABASE,
             service=EntityReference(
-                id=self.service.id, type=self.connection_config.type.value
+                id=self.service.id, type=self.service_connection.type.value
             ),
         )
 
@@ -183,7 +168,7 @@ class DeltalakeSource(Source[Entity]):
         return DatabaseSchema(
             name=schema,
             service=EntityReference(
-                id=self.service.id, type=self.connection_config.type.value
+                id=self.service.id, type=self.service_connection.type.value
             ),
             database=EntityReference(id=database.id, type="database"),
         )
diff --git a/ingestion/src/metadata/utils/connection_clients.py b/ingestion/src/metadata/utils/connection_clients.py
index 4204be2d38e..aa83bc4924c 100644
--- a/ingestion/src/metadata/utils/connection_clients.py
+++ b/ingestion/src/metadata/utils/connection_clients.py
@@ -28,3 +28,9 @@ class DynamoClient:
 class SalesforceClient:
     def __init__(self, client) -> None:
         self.client = client
+
+
+@dataclass
+class DeltaLakeClient:
+    def __init__(self, client) -> None:
+        self.client = client
diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py
index b81cf497421..8d100d2af23 100644
--- a/ingestion/src/metadata/utils/connections.py
+++ b/ingestion/src/metadata/utils/connections.py
@@ -31,6 +31,9 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
 from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
     DatabricksConnection,
 )
+from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
+    DeltaLakeConnection,
+)
 from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
     DynamoDBConnection,
 )
@@ -43,7 +46,12 @@ from metadata.generated.schema.entity.services.connections.database.salesforceCo
 from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
     SnowflakeConnection,
 )
-from metadata.utils.connection_clients import DynamoClient, GlueClient, SalesforceClient
+from metadata.utils.connection_clients import (
+    DeltaLakeClient,
+    DynamoClient,
+    GlueClient,
+    SalesforceClient,
+)
 from metadata.utils.credentials import set_google_credentials
 from metadata.utils.source_connections import get_connection_args, get_connection_url
 from metadata.utils.timeout import timeout
@@ -257,3 +265,41 @@ def _(connection: SalesforceClient) -> None:
         raise SourceConnectionException(
             f"Unknown error connecting with {connection} - {err}."
         )
+
+
+@get_connection.register
+def _(connection: DeltaLakeConnection, verbose: bool = False):
+    import pyspark
+    from delta import configure_spark_with_delta_pip
+
+    builder = (
+        pyspark.sql.SparkSession.builder.appName(connection.appName)
+        .enableHiveSupport()
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog",
+            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+        )
+    )
+    if connection.metastoreHostPort:
+        builder.config(
+            "hive.metastore.uris",
+            f"thrift://{connection.metastoreHostPort}",
+        )
+    elif connection.metastoreFilePath:
+        builder.config("spark.sql.warehouse.dir", f"{connection.metastoreFilePath}")
+
+    deltalake_connection = DeltaLakeClient(
+        configure_spark_with_delta_pip(builder).getOrCreate()
+    )
+    return deltalake_connection
+
+
+@test_connection.register
+def _(connection: DeltaLakeClient) -> None:
+    try:
+        connection.client.catalog.listDatabases()
+    except Exception as err:
+        raise SourceConnectionException(
+            f"Unknown error connecting with {connection} - {err}."
+        )
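
The sketch below is not part of the diff; it is a minimal illustration of how the newly registered get_connection / test_connection handlers are expected to be exercised, assuming pyspark, delta-spark, and the OpenMetadata ingestion package are installed. The field values passed to DeltaLakeConnection are hypothetical placeholders.

# Minimal usage sketch for the new DeltaLake handlers (not part of the diff).
# Assumes pyspark + delta-spark are installed; the metastoreFilePath value is hypothetical.
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
    DeltaLakeConnection,
)
from metadata.utils.connections import get_connection, test_connection

# Pydantic model generated from the DeltaLake connection JSON schema;
# only fields referenced by the diff are set here (placeholder values).
connection_config = DeltaLakeConnection(
    appName="OpenMetadataDemo",
    metastoreFilePath="/tmp/metastore",
)

# get_connection dispatches on DeltaLakeConnection and returns a DeltaLakeClient
# wrapping a SparkSession configured with the Delta Lake extensions.
spark_client = get_connection(connection_config)

# test_connection dispatches on DeltaLakeClient; it lists the metastore databases
# and raises SourceConnectionException if the session cannot reach the metastore.
test_connection(spark_client)

# The SparkSession itself is exposed as .client, which is what DeltalakeSource
# now assigns to self.spark.
print(spark_client.client.catalog.listDatabases())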