Salesforce Test Connection Fix (#4376)

This commit is contained in:
Mayur Singal 2022-04-22 22:04:39 +05:30 committed by GitHub
parent 09f928b01b
commit 1c4bea4613
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 18 deletions

View File

@ -94,5 +94,5 @@
} }
}, },
"additionalProperties": false, "additionalProperties": false,
"required": ["hostPort", "username", "account"] "required": ["username", "account"]
} }

View File

@ -14,7 +14,7 @@ import uuid
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Iterable, List from typing import Iterable, List
from pydantic import SecretStr, ValidationError from pydantic import ValidationError
from simple_salesforce import Salesforce from simple_salesforce import Salesforce
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
@ -39,6 +39,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.connections import get_connection
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
@ -69,12 +70,10 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
self.service = self.metadata.get_service_or_create( self.service = self.metadata.get_service_or_create(
entity=DatabaseService, config=config entity=DatabaseService, config=config
) )
self.service_connection = self.config.serviceConnection.__root__.config
self.status = SalesforceSourceStatus() self.status = SalesforceSourceStatus()
self.sf = Salesforce( self.salesforce_connection = get_connection(self.service_connection)
username=self.config.serviceConnection.__root__.config.username, self.sf = self.salesforce_connection.client
password=self.config.serviceConnection.__root__.config.password.get_secret_value(),
security_token=self.config.serviceConnection.__root__.config.securityToken,
)
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -129,9 +128,7 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
row_order = 1 row_order = 1
table_columns = [] table_columns = []
md = self.sf.restful( md = self.sf.restful(
"sobjects/{}/describe/".format( "sobjects/{}/describe/".format(self.service_connection.sobjectName),
self.config.serviceConnection.__root__.config.sobjectName
),
params=None, params=None,
) )
@ -155,20 +152,18 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
) )
) )
row_order += 1 row_order += 1
table_data = self.fetch_sample_data( table_data = self.fetch_sample_data(self.service_connection.sobjectName)
self.config.serviceConnection.__root__.config.sobjectName
)
logger.info("Successfully Ingested the sample data") logger.info("Successfully Ingested the sample data")
table_entity = Table( table_entity = Table(
id=uuid.uuid4(), id=uuid.uuid4(),
name=self.config.serviceConnection.__root__.config.sobjectName, name=self.service_connection.sobjectName,
tableType="Regular", tableType="Regular",
description=" ", description=" ",
columns=table_columns, columns=table_columns,
sampleData=table_data, sampleData=table_data,
) )
self.status.scanned( self.status.scanned(
f"{self.config.serviceConnection.__root__.config.scheme}.{self.config.serviceConnection.__root__.config.sobjectName}" f"{self.service_connection.scheme}.{self.service_connection.sobjectName}"
) )
database_entity = Database( database_entity = Database(
@ -178,7 +173,7 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
) )
schema_entity = DatabaseSchema( schema_entity = DatabaseSchema(
id=uuid.uuid4(), id=uuid.uuid4(),
name=self.config.serviceConnection.__root__.config.scheme.name, name=self.service_connection.scheme.name,
database=EntityReference(id=database_entity.id, type="databaseSchema"), database=EntityReference(id=database_entity.id, type="databaseSchema"),
service=EntityReference(id=self.service.id, type="databaseService"), service=EntityReference(id=self.service.id, type="databaseService"),
) )
@ -193,7 +188,7 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
except ValidationError as err: except ValidationError as err:
logger.error(err) logger.error(err)
self.status.failure( self.status.failure(
"{}".format(self.config.serviceConnection.__root__.config.sobjectName), "{}".format(self.service_connection.sobjectName),
err, err,
) )

View File

@ -22,3 +22,9 @@ class GlueClient:
class DynamoClient: class DynamoClient:
def __init__(self, client) -> None: def __init__(self, client) -> None:
self.client = client self.client = client
@dataclass
class SalesforceClient:
def __init__(self, client) -> None:
self.client = client

View File

@ -37,10 +37,13 @@ from metadata.generated.schema.entity.services.connections.database.dynamoDBConn
from metadata.generated.schema.entity.services.connections.database.glueConnection import ( from metadata.generated.schema.entity.services.connections.database.glueConnection import (
GlueConnection, GlueConnection,
) )
from metadata.generated.schema.entity.services.connections.database.salesforceConnection import (
SalesforceConnection,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection, SnowflakeConnection,
) )
from metadata.utils.connection_clients import DynamoClient, GlueClient from metadata.utils.connection_clients import DynamoClient, GlueClient, SalesforceClient
from metadata.utils.credentials import set_google_credentials from metadata.utils.credentials import set_google_credentials
from metadata.utils.source_connections import get_connection_args, get_connection_url from metadata.utils.source_connections import get_connection_args, get_connection_url
from metadata.utils.timeout import timeout from metadata.utils.timeout import timeout
@ -153,6 +156,20 @@ def _(connection: GlueConnection, verbose: bool = False):
return glue_connection return glue_connection
@get_connection.register
def _(connection: SalesforceConnection, verbose: bool = False):
from simple_salesforce import Salesforce
salesforce_connection = SalesforceClient(
Salesforce(
connection.username,
password=connection.password.get_secret_value(),
security_token=connection.securityToken,
)
)
return salesforce_connection
def create_and_bind_session(engine: Engine) -> Session: def create_and_bind_session(engine: Engine) -> Session:
""" """
Given an engine, create a session bound Given an engine, create a session bound
@ -224,3 +241,19 @@ def _(connection: GlueClient) -> None:
raise SourceConnectionException( raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}." f"Unknown error connecting with {connection} - {err}."
) )
@test_connection.register
def _(connection: SalesforceClient) -> None:
from simple_salesforce.exceptions import SalesforceAuthenticationFailed
try:
connection.client.describe()
except SalesforceAuthenticationFailed as err:
raise SourceConnectionException(
f"Connection error for {connection} - {err}. Check the connection details."
)
except Exception as err:
raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}."
)