diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json index 099c2f8d79f..2664c93ad43 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json @@ -94,5 +94,5 @@ } }, "additionalProperties": false, - "required": ["hostPort", "username", "account"] + "required": ["username", "account"] } diff --git a/ingestion/src/metadata/ingestion/source/salesforce.py b/ingestion/src/metadata/ingestion/source/salesforce.py index 25254cb2cbc..d6d349c8542 100644 --- a/ingestion/src/metadata/ingestion/source/salesforce.py +++ b/ingestion/src/metadata/ingestion/source/salesforce.py @@ -14,7 +14,7 @@ import uuid from dataclasses import dataclass, field from typing import Iterable, List -from pydantic import SecretStr, ValidationError +from pydantic import ValidationError from simple_salesforce import Salesforce from metadata.generated.schema.entity.data.database import Database @@ -39,6 +39,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.connections import get_connection logger: logging.Logger = logging.getLogger(__name__) @@ -69,12 +70,10 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): self.service = self.metadata.get_service_or_create( entity=DatabaseService, config=config ) + self.service_connection = self.config.serviceConnection.__root__.config self.status = SalesforceSourceStatus() - self.sf = Salesforce( - username=self.config.serviceConnection.__root__.config.username, - password=self.config.serviceConnection.__root__.config.password.get_secret_value(), - security_token=self.config.serviceConnection.__root__.config.securityToken, - ) + self.salesforce_connection = get_connection(self.service_connection) + self.sf = self.salesforce_connection.client @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -129,9 +128,7 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): row_order = 1 table_columns = [] md = self.sf.restful( - "sobjects/{}/describe/".format( - self.config.serviceConnection.__root__.config.sobjectName - ), + "sobjects/{}/describe/".format(self.service_connection.sobjectName), params=None, ) @@ -155,20 +152,18 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): ) ) row_order += 1 - table_data = self.fetch_sample_data( - self.config.serviceConnection.__root__.config.sobjectName - ) + table_data = self.fetch_sample_data(self.service_connection.sobjectName) logger.info("Successfully Ingested the sample data") table_entity = Table( id=uuid.uuid4(), - name=self.config.serviceConnection.__root__.config.sobjectName, + name=self.service_connection.sobjectName, tableType="Regular", description=" ", columns=table_columns, sampleData=table_data, ) self.status.scanned( - f"{self.config.serviceConnection.__root__.config.scheme}.{self.config.serviceConnection.__root__.config.sobjectName}" + f"{self.service_connection.scheme}.{self.service_connection.sobjectName}" ) database_entity = Database( @@ -178,7 +173,7 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): ) schema_entity = DatabaseSchema( id=uuid.uuid4(), - name=self.config.serviceConnection.__root__.config.scheme.name, + name=self.service_connection.scheme.name, database=EntityReference(id=database_entity.id, type="databaseSchema"), service=EntityReference(id=self.service.id, type="databaseService"), ) @@ -193,7 +188,7 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): except ValidationError as err: logger.error(err) self.status.failure( - "{}".format(self.config.serviceConnection.__root__.config.sobjectName), + "{}".format(self.service_connection.sobjectName), err, ) diff --git a/ingestion/src/metadata/utils/connection_clients.py b/ingestion/src/metadata/utils/connection_clients.py index f54d54ca52b..4204be2d38e 100644 --- a/ingestion/src/metadata/utils/connection_clients.py +++ b/ingestion/src/metadata/utils/connection_clients.py @@ -22,3 +22,9 @@ class GlueClient: class DynamoClient: def __init__(self, client) -> None: self.client = client + + +@dataclass +class SalesforceClient: + def __init__(self, client) -> None: + self.client = client diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index 6f920c1152d..b81cf497421 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -37,10 +37,13 @@ from metadata.generated.schema.entity.services.connections.database.dynamoDBConn from metadata.generated.schema.entity.services.connections.database.glueConnection import ( GlueConnection, ) +from metadata.generated.schema.entity.services.connections.database.salesforceConnection import ( + SalesforceConnection, +) from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( SnowflakeConnection, ) -from metadata.utils.connection_clients import DynamoClient, GlueClient +from metadata.utils.connection_clients import DynamoClient, GlueClient, SalesforceClient from metadata.utils.credentials import set_google_credentials from metadata.utils.source_connections import get_connection_args, get_connection_url from metadata.utils.timeout import timeout @@ -153,6 +156,20 @@ def _(connection: GlueConnection, verbose: bool = False): return glue_connection +@get_connection.register +def _(connection: SalesforceConnection, verbose: bool = False): + from simple_salesforce import Salesforce + + salesforce_connection = SalesforceClient( + Salesforce( + connection.username, + password=connection.password.get_secret_value(), + security_token=connection.securityToken, + ) + ) + return salesforce_connection + + def create_and_bind_session(engine: Engine) -> Session: """ Given an engine, create a session bound @@ -224,3 +241,19 @@ def _(connection: GlueClient) -> None: raise SourceConnectionException( f"Unknown error connecting with {connection} - {err}." ) + + +@test_connection.register +def _(connection: SalesforceClient) -> None: + from simple_salesforce.exceptions import SalesforceAuthenticationFailed + + try: + connection.client.describe() + except SalesforceAuthenticationFailed as err: + raise SourceConnectionException( + f"Connection error for {connection} - {err}. Check the connection details." + ) + except Exception as err: + raise SourceConnectionException( + f"Unknown error connecting with {connection} - {err}." + )