From bb76a2f11b6e599fea0387518b7247dab41733f6 Mon Sep 17 00:00:00 2001 From: codingwithabhi <63392662+codingwithabhi@users.noreply.github.com> Date: Fri, 8 Apr 2022 16:08:31 +0530 Subject: [PATCH] salesforce-connector-fixed (#3923) * salesforce-connector-fixed * salesforce-connection-updated * ran-make-core_bump_version_dev * code-formatted * removed-get_connection_url * code-formatted * code-formatted * code-formatted --- .../entity/services/databaseService.json | 6 +- ingestion-core/src/metadata/_version.py | 2 +- ingestion/examples/workflows/salesforce.json | 33 ++++---- .../metadata/ingestion/source/salesforce.py | 78 ++++++++++++------- .../src/metadata/utils/source_connections.py | 4 + 5 files changed, 80 insertions(+), 43 deletions(-) diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json index dad216c9195..30a5c252673 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json @@ -31,7 +31,8 @@ "AzureSQL", "SingleStore", "SQLite", - "DeltaLake" + "DeltaLake", + "Salesforce" ], "javaEnums": [ { @@ -102,6 +103,9 @@ }, { "name": "DeltaLake" + }, + { + "name": "Salesforce" } ] }, diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index 6a5ba5e6149..64c54a34574 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. from incremental import Version -__version__ = Version("metadata", 0, 9, 0, dev=13) +__version__ = Version("metadata", 0, 9, 0, dev=16) __all__ = ["__version__"] diff --git a/ingestion/examples/workflows/salesforce.json b/ingestion/examples/workflows/salesforce.json index dbee7cec3e0..81265855f71 100644 --- a/ingestion/examples/workflows/salesforce.json +++ b/ingestion/examples/workflows/salesforce.json @@ -1,24 +1,31 @@ { "source": { "type": "salesforce", - "config": { - "username": "username", - "password": "password", - "security_token": "secuirty_token", - "service_name": "local_salesforce", - "scheme": "salesforce", - "sobject_name": "Salesforce Object Name" + "serviceName": "local_salesforce", + "serviceConnection": { + "config": { + "type": "Salesforce", + "username": "username", + "password": "password", + "securityToken": "securityToken", + "scheme": "salesforce", + "sobjectName": "sobjectName" + } + }, + "sourceConfig": { + "config": { + "enableDataProfiler": false + } } }, "sink": { "type": "metadata-rest", "config": {} }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } -} +} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/salesforce.py b/ingestion/src/metadata/ingestion/source/salesforce.py index 742f6e20a91..b5ff6992cbb 100644 --- a/ingestion/src/metadata/ingestion/source/salesforce.py +++ b/ingestion/src/metadata/ingestion/source/salesforce.py @@ -18,19 +18,25 @@ from pydantic import SecretStr, ValidationError from simple_salesforce import Salesforce from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( Column, Constraint, Table, TableData, ) +from metadata.generated.schema.entity.services.connections.database.salesforceConnection import ( + SalesforceConnection, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.source.sql_source import SQLConnectionConfig from metadata.utils.helpers import get_database_service_or_create logger: logging.Logger = logging.getLogger(__name__) @@ -54,20 +60,8 @@ class SalesforceSourceStatus(SourceStatus): logger.warning("Dropped Table {} due to {}".format(table_name, err)) -from metadata.generated.schema.entity.services.connections.database.salesforceConnection import ( - SalesforceConnection, -) - - -class SalesforceConfig(SalesforceConnection, SQLConnectionConfig): - def get_connection_url(self): - return super().get_connection_url() - - class SalesforceSource(Source[OMetaDatabaseAndTable]): - def __init__( - self, config: SalesforceConfig, metadata_config: OpenMetadataServerConfig - ): + def __init__(self, config, metadata_config: OpenMetadataServerConfig): super().__init__() self.config = config self.service = get_database_service_or_create( @@ -75,14 +69,20 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): ) self.status = SalesforceSourceStatus() self.sf = Salesforce( - username=self.config.username, - password=self.config.password.get_secret_value(), - security_token=self.config.securityToken, + username=self.config.serviceConnection.__root__.config.username, + password=self.config.serviceConnection.__root__.config.password.get_secret_value(), + security_token=self.config.serviceConnection.__root__.config.securityToken, ) @classmethod - def create(cls, config: dict, metadata_config: OpenMetadataServerConfig): - config = SalesforceConfig.parse_obj(config) + def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: SalesforceConnection = config.serviceConnection.__root__.config + if not isinstance(connection, SalesforceConnection): + raise InvalidSourceException( + f"Expected SalesforceConnection, but got {connection}" + ) + return cls(config, metadata_config) def column_type(self, column_type: str): @@ -127,7 +127,10 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): row_order = 1 table_columns = [] md = self.sf.restful( - "sobjects/{}/describe/".format(self.config.sobjectName), params=None + "sobjects/{}/describe/".format( + self.config.serviceConnection.__root__.config.sobjectName + ), + params=None, ) for column in md["fields"]: @@ -150,28 +153,47 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): ) ) row_order += 1 - table_data = self.fetch_sample_data(self.config.sobjectName) + table_data = self.fetch_sample_data( + self.config.serviceConnection.__root__.config.sobjectName + ) logger.info("Successfully Ingested the sample data") table_entity = Table( id=uuid.uuid4(), - name=self.config.sobjectName, + name=self.config.serviceConnection.__root__.config.sobjectName, tableType="Regular", description=" ", columns=table_columns, sampleData=table_data, ) - self.status.scanned(f"{self.config.scheme}.{self.config.sobjectName}") + self.status.scanned( + f"{self.config.serviceConnection.__root__.config.scheme}.{self.config.serviceConnection.__root__.config.sobjectName}" + ) + database_entity = Database( - name=self.config.scheme, - service=EntityReference(id=self.service.id, type=self.config.type), + id=uuid.uuid4(), + name="default", + service=EntityReference(id=self.service.id, type="databaseService"), + ) + schema_entity = DatabaseSchema( + id=uuid.uuid4(), + name=self.config.serviceConnection.__root__.config.scheme.name, + database=EntityReference(id=database_entity.id, type="databaseSchema"), + service=EntityReference(id=self.service.id, type="databaseService"), ) table_and_db = OMetaDatabaseAndTable( - table=table_entity, database=database_entity + table=table_entity, + database=database_entity, + database_schema=schema_entity, ) + yield table_and_db + except ValidationError as err: logger.error(err) - self.status.failure("{}".format(self.config.sobjectName), err) + self.status.failure( + "{}".format(self.config.serviceConnection.__root__.config.sobjectName), + err, + ) def prepare(self): pass diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index f94e0ffa067..1cd51488fb5 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -31,6 +31,9 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnect from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( RedshiftConnection, ) +from metadata.generated.schema.entity.services.connections.database.salesforceConnection import ( + SalesforceConnection, +) from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( SingleStoreConnection, ) @@ -84,6 +87,7 @@ def get_connection_url(connection): @get_connection_url.register(RedshiftConnection) @get_connection_url.register(MysqlConnection) +@get_connection_url.register(SalesforceConnection) @get_connection_url.register(ClickhouseConnection) @get_connection_url.register(SingleStoreConnection) @get_connection_url.register(VerticaConnection)