From 176111aa275dac1bc698df477299c684d06863c7 Mon Sep 17 00:00:00 2001 From: Milan Bariya <52292922+MilanBariya@users.noreply.github.com> Date: Wed, 25 Jan 2023 16:06:48 +0530 Subject: [PATCH] Added databricks in lineage model (#9887) --- ingestion/src/metadata/ingestion/lineage/models.py | 5 +++++ .../ingestion/source/database/databricks/connection.py | 2 +- .../ingestion/source/database/databricks/query_parser.py | 1 + 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/lineage/models.py b/ingestion/src/metadata/ingestion/lineage/models.py index f63115ffd7f..f8eed2bfb36 100644 --- a/ingestion/src/metadata/ingestion/lineage/models.py +++ b/ingestion/src/metadata/ingestion/lineage/models.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( ClickhouseType, ) +from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( + DatabricksType, +) from metadata.generated.schema.entity.services.connections.database.db2Connection import ( Db2Type, ) @@ -64,6 +67,7 @@ class Dialect(Enum): ATHENA = "athena" BIGQUERY = "bigquery" CLICKHOUSE = "clickhouse" + DATABRICKS = "databricks" DB2 = "db2" DUCKDB = "duckdb" EXASOL = "exasol" @@ -85,6 +89,7 @@ MAP_CONNECTION_TYPE_DIALECT: Dict[str, Dialect] = { str(AthenaType.Athena.value): Dialect.ATHENA, str(BigqueryType.BigQuery.value): Dialect.BIGQUERY, str(ClickhouseType.Clickhouse.value): Dialect.CLICKHOUSE, + str(DatabricksType.Databricks.value): Dialect.DATABRICKS, str(Db2Type.Db2.value): Dialect.DB2, str(HiveType.Hive.value): Dialect.HIVE, str(MySQLType.Mysql.value): Dialect.MYSQL, diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/connection.py b/ingestion/src/metadata/ingestion/source/database/databricks/connection.py index 56dbab1b0dc..56ca83a8764 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/connection.py @@ -37,7 +37,7 @@ def get_connection(connection: DatabricksConnection) -> Engine: if connection.httpPath: if not connection.connectionArguments: connection.connectionArguments = init_empty_connection_arguments() - connection.connectionArguments.__root__["httpPath"] = connection.httpPath + connection.connectionArguments.__root__["http_path"] = connection.httpPath return create_generic_db_connection( connection=connection, diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py b/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py index 6c69a5c1f01..9e9ceaa19c1 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py @@ -47,6 +47,7 @@ class DatabricksQueryParserSource(QueryParserSource, ABC): self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) self.connection = self.config.serviceConnection.__root__.config + self.service_connection = self.config.serviceConnection.__root__.config self.source_config = self.config.sourceConfig.config self.start, self.end = get_start_and_end(self.source_config.queryLogDuration) self.report = SQLSourceStatus()