From 256b16d877eb7d5b038f9afb3d060c854b01c48d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 19 Apr 2022 12:31:34 +0200 Subject: [PATCH] Fix #4032 - Bigquery properties & GCS Credentials (#4202) Fix #4032 - Bigquery properties & GCS Credentials (#4202) --- .../database/bigQueryConnection.json | 10 ++- .../security/credentials/gcsCredentials.json | 82 +++++++++++++++++++ .../services/DatabaseServiceResourceTest.java | 2 +- .../sample_data/datasets/service.json | 15 +++- ingestion/examples/workflows/bigquery.json | 25 +++--- .../src/metadata/ingestion/source/bigquery.py | 45 +++------- .../ingestion/source/bigquery_usage.py | 33 +++----- .../ingestion/source/clickhouse_usage.py | 5 +- .../metadata/ingestion/source/mssql_usage.py | 5 +- .../src/metadata/ingestion/source/postgres.py | 4 +- .../metadata/ingestion/source/snowflake.py | 2 - .../metadata/ingestion/source/sql_source.py | 2 +- ingestion/src/metadata/utils/credentials.py | 81 ++++++++++++++++++ ingestion/src/metadata/utils/engines.py | 12 +-- .../src/metadata/utils/source_connections.py | 10 ++- .../ometa/test_ometa_service_api.py | 44 ++++++++++ ingestion/tests/unit/source/test_bigquery.py | 27 +++--- .../tests/unit/source/test_source_parsing.py | 23 +++--- .../src/openmetadata/api/rest_api.py | 4 +- .../operations/test_connection.py | 9 +- 20 files changed, 312 insertions(+), 128 deletions(-) create mode 100644 catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json create mode 100644 ingestion/src/metadata/utils/credentials.py diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json index 483eab184c1..3be6b8f77ce 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json @@ -39,10 +39,14 @@ "description": "username to connect to the Athena. This user should have privileges to read all the metadata in Athena.", "type": "string" }, - "projectID": { - "description": "Google BigQuery project id.", + "projectId": { + "description": "BigQuery project ID. Inform it here if passing the credentials path.", "type": "string" }, + "credentials": { + "description": "GCS Credentials", + "$ref": "../../../../security/credentials/gcsCredentials.json" + }, "enablePolicyTagImport": { "description": "Enable importing policy tags of BigQuery into OpenMetadata", "type": "boolean", @@ -87,6 +91,6 @@ }, "additionalProperties": false, "required": [ - "projectID" + "credentials" ] } diff --git a/catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json b/catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json new file mode 100644 index 00000000000..90c2678c7a3 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json @@ -0,0 +1,82 @@ +{ + "$id": "https://open-metadata.org/security/credentials/gcsCredentials.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "GCSCredentials", + "description": "GCS credentials configs.", + "type": "object", + "javaType": "org.openmetadata.catalog.security.credentials.GCSCredentials", + "definitions": { + "GCSValues": { + "description": "GCS Credentials.", + "type": "object", + "properties": { + "type": { + "description": "Google Cloud service account type.", + "type": "string" + }, + "projectId": { + "description": "Google Cloud project id.", + "type": "string" + }, + "privateKeyId": { + "description": "Google Cloud private key id.", + "type": "string" + }, + "privateKey": { + "description": "Google Cloud private key.", + "type": "string" + }, + "clientEmail": { + "description": "Google Cloud email.", + "type": "string" + }, + "clientId": { + "description": "Google Cloud Client ID.", + "type": "string" + }, + "authUri": { + "description": "Google Cloud auth uri.", + "type": "string", + "format": "uri" + }, + "tokenUri": { + "description": "Google Cloud token uri.", + "type": "string", + "format": "uri" + }, + "authProviderX509CertUrl": { + "description": "Google Cloud auth provider certificate.", + "type": "string", + "format": "uri" + }, + "clientX509CertUrl": { + "description": "Google Cloud client certificate uri.", + "type": "string", + "format": "uri" + } + }, + "additionalProperties": false + }, + "GCSCredentialsPath": { + "description": "GCS Credentials Path.", + "type": "string" + } + }, + "properties": { + "gcsConfig": { + "description": "GCS configs.", + "oneOf": [ + { + "$ref": "#/definitions/GCSValues" + }, + { + "$ref": "#/definitions/GCSCredentialsPath" + } + ] + } + }, + "additionalProperties": false, + "required": [ + "gcsConfig" + ] +} \ No newline at end of file diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java index d0dd02cbc02..e7204a152ee 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java @@ -364,7 +364,7 @@ public class DatabaseServiceResourceTest extends EntityResourceTest str: - with tempfile.NamedTemporaryFile(delete=False) as fp: - cred_json = json.dumps(credentials, indent=4, separators=(",", ": ")) - fp.write(cred_json.encode()) - return fp.name + set_google_credentials( + gcs_credentials=config.serviceConnection.__root__.config.credentials + ) + + return cls(config, metadata_config) def standardize_schema_table_names( self, schema: str, table: str @@ -184,7 +162,10 @@ class BigquerySource(SQLSource): def _get_database(self, database: Optional[str]) -> Database: if not database: - database = self.service_connection.projectID + database = ( + self.connection_config.projectId + or self.connection_config.credentials.gcsConfig.projectId + ) return Database( name=database, service=EntityReference( diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py index c479ec56e0a..0746c5b79d6 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py +++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py @@ -33,8 +33,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.bigquery import BigquerySource from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus +from metadata.utils.credentials import set_google_credentials from metadata.utils.helpers import get_start_and_end logger = log.getLogger(__name__) @@ -50,7 +50,13 @@ class BigqueryUsageSource(Source[TableQuery]): self.metadata_config = metadata_config self.config = config self.service_connection = config.serviceConnection.__root__.config - self.project_id = self.service_connection.projectID + + # Used as db + self.project_id = ( + self.service_connection.projectId + or self.service_connection.credentials.gcsConfig.projectId + ) + self.logger_name = "cloudaudit.googleapis.com%2Fdata_access" self.status = SQLSourceStatus() @@ -58,28 +64,15 @@ class BigqueryUsageSource(Source[TableQuery]): def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: BigQueryConnection = config.serviceConnection.__root__.config - options = connection.connectionOptions.dict() if not isinstance(connection, BigQueryConnection): raise InvalidSourceException( f"Expected BigQueryConnection, but got {connection}" ) - if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): - if options.get("credentials_path"): - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options[ - "credentials_path" - ] - del connection.connectionOptions.credentials_path - elif options.get("credentials"): - cls.temp_credentials = BigquerySource.create_credential_temp_file( - credentials=options["credentials"] - ) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.temp_credentials - del connection.connectionOptions.credentials - else: - logger.warning( - "Please refer to the BigQuery connector documentation, especially the credentials part " - "https://docs.open-metadata.org/connectors/bigquery" - ) + + set_google_credentials( + gcs_credentials=config.serviceConnection.__root__.config.credentials + ) + return cls(config, metadata_config) def prepare(self): diff --git a/ingestion/src/metadata/ingestion/source/clickhouse_usage.py b/ingestion/src/metadata/ingestion/source/clickhouse_usage.py index f5650079ae6..2b70f9bd382 100644 --- a/ingestion/src/metadata/ingestion/source/clickhouse_usage.py +++ b/ingestion/src/metadata/ingestion/source/clickhouse_usage.py @@ -18,9 +18,6 @@ from typing import Any, Dict, Iterable from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( ClickhouseConnection, ) -from metadata.generated.schema.entity.services.databaseService import ( - DatabaseServiceType, -) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -31,7 +28,7 @@ from metadata.ingestion.api.source import InvalidSourceException, Source, Source from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus from metadata.utils.engines import get_engine, test_connection -from metadata.utils.helpers import get_raw_extract_iter, get_start_and_end +from metadata.utils.helpers import get_start_and_end from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT diff --git a/ingestion/src/metadata/ingestion/source/mssql_usage.py b/ingestion/src/metadata/ingestion/source/mssql_usage.py index b68346daf14..095dc035ea0 100644 --- a/ingestion/src/metadata/ingestion/source/mssql_usage.py +++ b/ingestion/src/metadata/ingestion/source/mssql_usage.py @@ -17,9 +17,6 @@ from typing import Any, Dict, Iterable from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( MssqlConnection, ) -from metadata.generated.schema.entity.services.databaseService import ( - DatabaseServiceType, -) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -30,7 +27,7 @@ from metadata.ingestion.api.source import InvalidSourceException, Source, Source from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus from metadata.utils.engines import get_engine, test_connection -from metadata.utils.helpers import get_raw_extract_iter, get_start_and_end +from metadata.utils.helpers import get_start_and_end from metadata.utils.sql_queries import MSSQL_SQL_USAGE_STATEMENT diff --git a/ingestion/src/metadata/ingestion/source/postgres.py b/ingestion/src/metadata/ingestion/source/postgres.py index f932d48d6cc..dcd7c6c6607 100644 --- a/ingestion/src/metadata/ingestion/source/postgres.py +++ b/ingestion/src/metadata/ingestion/source/postgres.py @@ -66,7 +66,9 @@ class PostgresSource(SQLSource): try: logger.info(f"Ingesting from database: {row[0]}") self.service_connection.database = row[0] - self.engine = get_engine(self.config.serviceConnection) + self.engine = get_engine( + self.config.serviceConnection.__root__.config + ) self.engine.connect() yield inspect(self.engine) except Exception as err: diff --git a/ingestion/src/metadata/ingestion/source/snowflake.py b/ingestion/src/metadata/ingestion/source/snowflake.py index f5f06fc3dc0..e47e69d7b6b 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/snowflake.py @@ -35,7 +35,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.sql_source import SQLSource from metadata.utils.column_type_parser import create_sqlalchemy_type -from metadata.utils.engines import get_engine GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY") ischema_names["VARIANT"] = VARIANT @@ -82,7 +81,6 @@ class SnowflakeSource(SQLSource): self.connection.execute(use_db_query) logger.info(f"Ingesting from database: {row[1]}") self.config.serviceConnection.__root__.config.database = row[1] - self.engine = get_engine(self.config.serviceConnection) yield inspect(self.engine) def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]: diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 65931beb8fb..3880b33a23f 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -134,7 +134,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): entity=DatabaseService, config=config ) - self.engine = get_engine(service_connection=self.config.serviceConnection) + self.engine = get_engine(self.service_connection) self.test_connection() self._session = None # We will instantiate this just if needed diff --git a/ingestion/src/metadata/utils/credentials.py b/ingestion/src/metadata/utils/credentials.py new file mode 100644 index 00000000000..fab68ac1698 --- /dev/null +++ b/ingestion/src/metadata/utils/credentials.py @@ -0,0 +1,81 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Credentials helper module +""" +import json +import os +import tempfile + +from metadata.generated.schema.security.credentials.gcsCredentials import ( + GCSCredentials, + GCSCredentialsPath, + GCSValues, +) + +GOOGLE_CREDENTIALS = "GOOGLE_APPLICATION_CREDENTIALS" + + +class InvalidGcsConfigException(Exception): + """ + Raised when we have errors trying to set GCS credentials + """ + + +def create_credential_tmp_file(credentials: dict) -> str: + """ + Given a credentials' dict, store it in a tmp file + :param credentials: dictionary to store + :return: path to find the file + """ + with tempfile.NamedTemporaryFile(delete=False) as fp: + cred_json = json.dumps(credentials, indent=4, separators=(",", ": ")) + fp.write(cred_json.encode()) + + return fp.name + + +def set_google_credentials(gcs_credentials: GCSCredentials) -> None: + """ + Set GCS credentials environment variable + :param gcs_credentials: GCSCredentials + """ + if os.environ.get(GOOGLE_CREDENTIALS): + return + + if isinstance(gcs_credentials.gcsConfig, GCSCredentialsPath): + os.environ[GOOGLE_CREDENTIALS] = str(gcs_credentials.gcsConfig.__root__) + return + + if isinstance(gcs_credentials.gcsConfig, GCSValues): + credentials_dict = { + "type": gcs_credentials.gcsConfig.type, + "project_id": gcs_credentials.gcsConfig.projectId, + "private_key_id": gcs_credentials.gcsConfig.privateKeyId, + "private_key": gcs_credentials.gcsConfig.privateKey, + "client_email": gcs_credentials.gcsConfig.clientEmail, + "client_id": gcs_credentials.gcsConfig.clientId, + "auth_uri": str(gcs_credentials.gcsConfig.authUri), + "token_uri": str(gcs_credentials.gcsConfig.tokenUri), + "auth_provider_x509_cert_url": str( + gcs_credentials.gcsConfig.authProviderX509CertUrl + ), + "client_x509_cert_url": str(gcs_credentials.gcsConfig.clientX509CertUrl), + } + + tmp_credentials_file = create_credential_tmp_file(credentials=credentials_dict) + os.environ[GOOGLE_CREDENTIALS] = tmp_credentials_file + return + + raise InvalidGcsConfigException( + f"Error trying to set GCS credentials with {gcs_credentials}." + " Check https://docs.open-metadata.org/connectors/bigquery" + ) diff --git a/ingestion/src/metadata/utils/engines.py b/ingestion/src/metadata/utils/engines.py index b83ab2abe03..69688e83816 100644 --- a/ingestion/src/metadata/utils/engines.py +++ b/ingestion/src/metadata/utils/engines.py @@ -23,9 +23,6 @@ from sqlalchemy.orm.session import Session from metadata.generated.schema.entity.services.connections.connectionBasicType import ( ConnectionOptions, ) -from metadata.generated.schema.entity.services.connections.serviceConnection import ( - ServiceConnection, -) from metadata.utils.source_connections import get_connection_args, get_connection_url from metadata.utils.timeout import timeout @@ -38,20 +35,19 @@ class SourceConnectionException(Exception): """ -def get_engine(service_connection: ServiceConnection, verbose: bool = False) -> Engine: +def get_engine(connection, verbose: bool = False) -> Engine: """ Given an SQL configuration, build the SQLAlchemy Engine """ - service_connection_config = service_connection.__root__.config - options = service_connection_config.connectionOptions + options = connection.connectionOptions if not options: options = ConnectionOptions() engine = create_engine( - get_connection_url(service_connection_config), + get_connection_url(connection), **options.dict(), - connect_args=get_connection_args(service_connection_config), + connect_args=get_connection_args(connection), echo=verbose, ) diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index 0e391678b63..e9a26bb0481 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -73,6 +73,7 @@ from metadata.generated.schema.entity.services.connections.database.trinoConnect from metadata.generated.schema.entity.services.connections.database.verticaConnection import ( VerticaConnection, ) +from metadata.generated.schema.security.credentials.gcsCredentials import GCSValues def get_connection_url_common(connection): @@ -254,8 +255,13 @@ def _(connection: HiveConnection): @get_connection_url.register def _(connection: BigQueryConnection): - if connection.projectID: - return f"{connection.scheme.value}://{connection.projectID}" + project_id = connection.projectId + if not project_id and isinstance(connection.credentials.gcsConfig, GCSValues): + project_id = connection.credentials.gcsConfig.projectId + if project_id: + return ( + f"{connection.scheme.value}://{connection.credentials.gcsConfig.projectId}" + ) return f"{connection.scheme.value}://" diff --git a/ingestion/tests/integration/ometa/test_ometa_service_api.py b/ingestion/tests/integration/ometa/test_ometa_service_api.py index 0b1401c1fa1..34843076ef4 100644 --- a/ingestion/tests/integration/ometa/test_ometa_service_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_service_api.py @@ -126,6 +126,50 @@ class OMetaServiceTest(TestCase): # Clean self.metadata.delete(entity=DatabaseService, entity_id=service.id) + def test_create_database_service_bigquery(self): + """ + Create a db service from WorkflowSource + """ + data = { + "type": "bigquery", + "serviceName": "local_bigquery", + "serviceConnection": { + "config": { + "type": "BigQuery", + "enablePolicyTagImport": True, + "credentials": { + "gcsConfig": { + "type": "service_account", + "projectId": "projectID", + "privateKeyId": "privateKeyId", + "privateKey": "privateKey", + "clientEmail": "clientEmail", + "clientId": "clientId", + "authUri": "https://accounts.google.com/o/oauth2/auth", + "tokenUri": "https://oauth2.googleapis.com/token", + "authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs", + "clientX509CertUrl": "https://cert.url", + } + }, + } + }, + "sourceConfig": {"config": {"enableDataProfiler": False}}, + } + + workflow_source = WorkflowSource(**data) + + # Create service + service: DatabaseService = self.metadata.get_service_or_create( + entity=DatabaseService, config=workflow_source + ) + assert service + assert service.serviceType == DatabaseServiceType.BigQuery + + # Check get + assert service == self.metadata.get_service_or_create( + entity=DatabaseService, config=workflow_source + ) + def test_create_dashboard_service_looker(self): """ Create a db service from WorkflowSource diff --git a/ingestion/tests/unit/source/test_bigquery.py b/ingestion/tests/unit/source/test_bigquery.py index 549ddcd739f..85e60ee934b 100644 --- a/ingestion/tests/unit/source/test_bigquery.py +++ b/ingestion/tests/unit/source/test_bigquery.py @@ -31,22 +31,21 @@ CONFIG = """ "serviceConnection": { "config": { "type": "BigQuery", - "projectID": "project_id", "enablePolicyTagImport": true, - "connectionOptions": { - "credentials": { - "type": "service_account", - "project_id": "project_id", - "private_key_id": "private_key_id", - "private_key": "private_key", - "client_email": "gcpuser@project_id.iam.gserviceaccount.com", - "client_id": "client_id", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "https://oauth2.googleapis.com/token", - "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "" + "credentials": { + "gcsConfig": { + "type": "service_account", + "projectId": "projectID", + "privateKeyId": "privateKeyId", + "privateKey": "privateKey", + "clientEmail": "clientEmail", + "clientId": "clientId", + "authUri": "https://accounts.google.com/o/oauth2/auth", + "tokenUri": "https://oauth2.googleapis.com/token", + "authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs", + "clientX509CertUrl": "https://cert.url" + } } - } } }, "sourceConfig": {"config": {"enableDataProfiler": false}} diff --git a/ingestion/tests/unit/source/test_source_parsing.py b/ingestion/tests/unit/source/test_source_parsing.py index 2b9d6727606..3d2f1cd4f77 100644 --- a/ingestion/tests/unit/source/test_source_parsing.py +++ b/ingestion/tests/unit/source/test_source_parsing.py @@ -172,20 +172,19 @@ def test_bigquery(): "serviceConnection": { "config": { "type": "BigQuery", - "projectID": "project_id", "enablePolicyTagImport": True, - "connectionOptions": { - "credentials": { + "credentials": { + "gcsConfig": { "type": "service_account", - "project_id": "project_id", - "private_key_id": "private_key_id", - "private_key": "private_key", - "client_email": "gcpuser@project_id.iam.gserviceaccount.com", - "client_id": "client_id", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "https://oauth2.googleapis.com/token", - "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "", + "projectId": "projectID", + "privateKeyId": "privateKeyId", + "privateKey": "privateKey", + "clientEmail": "clientEmail", + "clientId": "clientId", + "authUri": "https://accounts.google.com/o/oauth2/auth", + "tokenUri": "https://oauth2.googleapis.com/token", + "authProviderX509CertUrl": "https://www.googleapis.com/oauth2/v1/certs", + "clientX509CertUrl": "https://cert.url", } }, } diff --git a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index 01232a7d47a..2bdecbf5236 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -173,7 +173,9 @@ class REST_API(AppBuilderBaseView): try: service_connection_model = ServiceConnectionModel(**json_request) - response = test_source_connection(service_connection_model) + response = test_source_connection( + service_connection_model.serviceConnection.__root__.config + ) return response diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py b/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py index 0c9c0ee8cd8..fcd5c2d295c 100644 --- a/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py +++ b/openmetadata-airflow-apis/src/openmetadata/operations/test_connection.py @@ -15,9 +15,6 @@ from a WorkflowSource from flask import Response from openmetadata.api.response import ApiResponse -from metadata.generated.schema.entity.services.connections.serviceConnection import ( - ServiceConnectionModel, -) from metadata.utils.engines import ( SourceConnectionException, get_engine, @@ -25,15 +22,13 @@ from metadata.utils.engines import ( ) -def test_source_connection( - service_connection_model: ServiceConnectionModel, -) -> Response: +def test_source_connection(connection) -> Response: """ Create the engine and test the connection :param workflow_source: Source to test :return: None or exception """ - engine = get_engine(service_connection_model.serviceConnection) + engine = get_engine(connection) try: test_connection(engine)