deltalake-test-connection-fixed (#4387)

codingwithabhi 2022-04-23 13:10:44 +05:30 committed by GitHub
parent 0feecde0f4
commit 32d392e0cb
4 changed files with 61 additions and 24 deletions

View File

@@ -1,7 +1,7 @@
 FROM python:3.9-slim as base
 ENV AIRFLOW_HOME=/airflow
 RUN apt-get update && \
-    apt-get install -y gcc libsasl2-modules libsasl2-dev curl build-essential libssl-dev libffi-dev librdkafka-dev unixodbc-dev python3.9-dev libevent-dev wget --no-install-recommends && \
+    apt-get install -y gcc libsasl2-modules libsasl2-dev curl build-essential libssl-dev libffi-dev librdkafka-dev unixodbc-dev python3.9-dev openjdk-11-jre libevent-dev wget --no-install-recommends && \
     rm -rf /var/lib/apt/lists/*
 # RUN wget https://github.com/open-metadata/openmetadata-airflow-apis/releases/download/0.1/openmetadata-airflow-apis-plugin.tar.gz
 # RUN tar zxvf openmetadata-airflow-apis-plugin.tar.gz
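The only Dockerfile change is the new openjdk-11-jre package: PySpark drives a JVM-based Spark runtime, so the Delta Lake connector introduced below cannot start a session without a Java runtime in the image. A minimal smoke test of that requirement, assuming pyspark is installed (the app name is arbitrary):

# SparkSession startup needs a JVM on PATH or JAVA_HOME; without one, PySpark
# fails with "Java gateway process exited before sending its port number".
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jvm-smoke-test").master("local[1]").getOrCreate()
print(spark.version)  # prints the Spark version once the JVM came up
spark.stop()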

View File

@@ -30,6 +30,7 @@ from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.sql_source import SQLSourceStatus
 from metadata.utils.column_type_parser import ColumnTypeParser
+from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.filters import filter_by_schema, filter_by_table
 
 logger: logging.Logger = logging.getLogger(__name__)
@@ -54,33 +55,17 @@ class DeltalakeSource(Source[Entity]):
     ):
         super().__init__()
         self.config = config
-        self.connection_config = config.serviceConnection.__root__.config
         self.metadata_config = metadata_config
         self.metadata = OpenMetadata(metadata_config)
+        self.service_connection = self.config.serviceConnection.__root__.config
         self.service = self.metadata.get_service_or_create(
             entity=DatabaseService, config=config
         )
+        self.connection = get_connection(self.service_connection)
         self.status = SQLSourceStatus()
-        logger.info("Establishing Sparks Session")
-        builder = (
-            pyspark.sql.SparkSession.builder.appName(self.connection_config.appName)
-            .enableHiveSupport()
-            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
-            .config(
-                "spark.sql.catalog.spark_catalog",
-                "org.apache.spark.sql.delta.catalog.DeltaCatalog",
-            )
-        )
-        if self.connection_config.metastoreHostPort:
-            builder.config(
-                "hive.metastore.uris",
-                f"thrift://{self.connection_config.metastoreHostPort}",
-            )
-        elif self.connection_config.metastoreFilePath:
-            builder.config(
-                "spark.sql.warehouse.dir", f"{self.connection_config.metastoreFilePath}"
-            )
-        self.spark = configure_spark_with_delta_pip(builder).getOrCreate()
+        self.spark = self.connection.client
         self.table_type_map = {
             TableType.External.value.lower(): TableType.External.value,
             TableType.View.value.lower(): TableType.View.value,
@@ -175,7 +160,7 @@ class DeltalakeSource(Source[Entity]):
             id=uuid.uuid4(),
             name=DEFAULT_DATABASE,
             service=EntityReference(
-                id=self.service.id, type=self.connection_config.type.value
+                id=self.service.id, type=self.service_connection.type.value
             ),
         )
@@ -183,7 +168,7 @@
         return DatabaseSchema(
             name=schema,
             service=EntityReference(
-                id=self.service.id, type=self.connection_config.type.value
+                id=self.service.id, type=self.service_connection.type.value
             ),
             database=EntityReference(id=database.id, type="database"),
         )
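The net effect on DeltalakeSource: it no longer builds the Spark session in __init__ but asks get_connection for a client wrapper and unwraps it into self.spark. A rough sketch of that flow with hypothetical stand-ins for the OpenMetadata types (FakeClient and the dict-based config are not part of the diff):

# FakeClient mirrors the .client attribute of DeltaLakeClient.
class FakeClient:
    def __init__(self, client):
        self.client = client  # holds the live session object

def get_connection(service_connection):
    # the real handler builds and returns DeltaLakeClient(spark_session)
    return FakeClient(client=f"session-for-{service_connection['appName']}")

service_connection = {"appName": "delta-demo"}  # hypothetical config values
connection = get_connection(service_connection)
spark = connection.client  # what self.spark now points to
print(spark)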

View File

@@ -28,3 +28,9 @@ class DynamoClient:
 class SalesforceClient:
     def __init__(self, client) -> None:
         self.client = client
+
+
+@dataclass
+class DeltaLakeClient:
+    def __init__(self, client) -> None:
+        self.client = client
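DeltaLakeClient follows the same thin-wrapper pattern as DynamoClient and SalesforceClient: it stores whatever session object it is given under .client. Because the hand-written __init__ takes precedence, the @dataclass decorator does not affect construction here. A standalone sketch (FakeSession is a hypothetical stand-in for a SparkSession):

from dataclasses import dataclass

@dataclass
class DeltaLakeClient:
    def __init__(self, client) -> None:
        self.client = client  # the wrapped Spark session

class FakeSession:
    version = "3.2.1"

client = DeltaLakeClient(FakeSession())
print(client.client.version)  # -> 3.2.1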

View File

@@ -31,6 +31,9 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
 from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
     DatabricksConnection,
 )
+from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
+    DeltaLakeConnection,
+)
 from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
     DynamoDBConnection,
 )
@@ -43,7 +46,12 @@ from metadata.generated.schema.entity.services.connections.database.salesforceConnection import (
 from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
     SnowflakeConnection,
 )
-from metadata.utils.connection_clients import DynamoClient, GlueClient, SalesforceClient
+from metadata.utils.connection_clients import (
+    DeltaLakeClient,
+    DynamoClient,
+    GlueClient,
+    SalesforceClient,
+)
 from metadata.utils.credentials import set_google_credentials
 from metadata.utils.source_connections import get_connection_args, get_connection_url
 from metadata.utils.timeout import timeout
@@ -257,3 +265,41 @@ def _(connection: SalesforceClient) -> None:
         raise SourceConnectionException(
             f"Unknown error connecting with {connection} - {err}."
         )
+
+
+@get_connection.register
+def _(connection: DeltaLakeConnection, verbose: bool = False):
+    import pyspark
+    from delta import configure_spark_with_delta_pip
+
+    builder = (
+        pyspark.sql.SparkSession.builder.appName(connection.appName)
+        .enableHiveSupport()
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog",
+            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+        )
+    )
+    if connection.metastoreHostPort:
+        builder.config(
+            "hive.metastore.uris",
+            f"thrift://{connection.metastoreHostPort}",
+        )
+    elif connection.metastoreFilePath:
+        builder.config("spark.sql.warehouse.dir", f"{connection.metastoreFilePath}")
+
+    deltalake_connection = DeltaLakeClient(
+        configure_spark_with_delta_pip(builder).getOrCreate()
+    )
+    return deltalake_connection
+
+
+@test_connection.register
+def _(connection: DeltaLakeClient) -> None:
+    try:
+        connection.client.catalog.listDatabases()
+    except Exception as err:
+        raise SourceConnectionException(
+            f"Unknown error connecting with {connection} - {err}."
+        )
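Both new handlers plug into existing functools.singledispatch registries: get_connection dispatches on the config type (DeltaLakeConnection) and test_connection on the client wrapper (DeltaLakeClient), using catalog.listDatabases() as a cheap liveness probe. A self-contained sketch of that dispatch mechanism, with hypothetical Fake* stand-ins for the real types:

from functools import singledispatch

class FakeDeltaLakeConnection:  # stands in for DeltaLakeConnection
    appName = "demo"

class FakeDeltaLakeClient:  # stands in for DeltaLakeClient
    def __init__(self, client):
        self.client = client

@singledispatch
def get_connection(connection):
    raise NotImplementedError(f"No connection handler for {type(connection)}")

@get_connection.register
def _(connection: FakeDeltaLakeConnection):
    # the real handler returns DeltaLakeClient(configure_spark_with_delta_pip(builder).getOrCreate())
    return FakeDeltaLakeClient(client=f"session:{connection.appName}")

@singledispatch
def test_connection(connection):
    raise NotImplementedError(f"No test handler for {type(connection)}")

@test_connection.register
def _(connection: FakeDeltaLakeClient) -> None:
    # the real probe calls connection.client.catalog.listDatabases()
    assert connection.client is not None

client = get_connection(FakeDeltaLakeConnection())
test_connection(client)
print(client.client)  # -> session:demo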