salesforce-connector-fixed (#3923)

* salesforce-connector-fixed

* salesforce-connection-updated

* ran-make-core_bump_version_dev

* code-formatted

* removed-get_connection_url

* code-formatted

* code-formatted

* code-formatted
This commit is contained in:
codingwithabhi 2022-04-08 16:08:31 +05:30 committed by GitHub
parent 5219bad5e1
commit bb76a2f11b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 43 deletions

View File

@ -31,7 +31,8 @@
"AzureSQL",
"SingleStore",
"SQLite",
"DeltaLake"
"DeltaLake",
"Salesforce"
],
"javaEnums": [
{
@ -102,6 +103,9 @@
},
{
"name": "DeltaLake"
},
{
"name": "Salesforce"
}
]
},

View File

@ -7,5 +7,5 @@ Provides metadata version information.
from incremental import Version
__version__ = Version("metadata", 0, 9, 0, dev=13)
__version__ = Version("metadata", 0, 9, 0, dev=16)
__all__ = ["__version__"]

View File

@ -1,24 +1,31 @@
{
"source": {
"type": "salesforce",
"config": {
"username": "username",
"password": "password",
"security_token": "secuirty_token",
"service_name": "local_salesforce",
"scheme": "salesforce",
"sobject_name": "Salesforce Object Name"
"serviceName": "local_salesforce",
"serviceConnection": {
"config": {
"type": "Salesforce",
"username": "username",
"password": "password",
"securityToken": "securityToken",
"scheme": "salesforce",
"sobjectName": "sobjectName"
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": false
}
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}
}

View File

@ -18,19 +18,25 @@ from pydantic import SecretStr, ValidationError
from simple_salesforce import Salesforce
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
Table,
TableData,
)
from metadata.generated.schema.entity.services.connections.database.salesforceConnection import (
SalesforceConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.source.sql_source import SQLConnectionConfig
from metadata.utils.helpers import get_database_service_or_create
logger: logging.Logger = logging.getLogger(__name__)
@ -54,20 +60,8 @@ class SalesforceSourceStatus(SourceStatus):
logger.warning("Dropped Table {} due to {}".format(table_name, err))
from metadata.generated.schema.entity.services.connections.database.salesforceConnection import (
SalesforceConnection,
)
class SalesforceConfig(SalesforceConnection, SQLConnectionConfig):
def get_connection_url(self):
return super().get_connection_url()
class SalesforceSource(Source[OMetaDatabaseAndTable]):
def __init__(
self, config: SalesforceConfig, metadata_config: OpenMetadataServerConfig
):
def __init__(self, config, metadata_config: OpenMetadataServerConfig):
super().__init__()
self.config = config
self.service = get_database_service_or_create(
@ -75,14 +69,20 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
)
self.status = SalesforceSourceStatus()
self.sf = Salesforce(
username=self.config.username,
password=self.config.password.get_secret_value(),
security_token=self.config.securityToken,
username=self.config.serviceConnection.__root__.config.username,
password=self.config.serviceConnection.__root__.config.password.get_secret_value(),
security_token=self.config.serviceConnection.__root__.config.securityToken,
)
@classmethod
def create(cls, config: dict, metadata_config: OpenMetadataServerConfig):
config = SalesforceConfig.parse_obj(config)
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: SalesforceConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SalesforceConnection):
raise InvalidSourceException(
f"Expected SalesforceConnection, but got {connection}"
)
return cls(config, metadata_config)
def column_type(self, column_type: str):
@ -127,7 +127,10 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
row_order = 1
table_columns = []
md = self.sf.restful(
"sobjects/{}/describe/".format(self.config.sobjectName), params=None
"sobjects/{}/describe/".format(
self.config.serviceConnection.__root__.config.sobjectName
),
params=None,
)
for column in md["fields"]:
@ -150,28 +153,47 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
)
)
row_order += 1
table_data = self.fetch_sample_data(self.config.sobjectName)
table_data = self.fetch_sample_data(
self.config.serviceConnection.__root__.config.sobjectName
)
logger.info("Successfully Ingested the sample data")
table_entity = Table(
id=uuid.uuid4(),
name=self.config.sobjectName,
name=self.config.serviceConnection.__root__.config.sobjectName,
tableType="Regular",
description=" ",
columns=table_columns,
sampleData=table_data,
)
self.status.scanned(f"{self.config.scheme}.{self.config.sobjectName}")
self.status.scanned(
f"{self.config.serviceConnection.__root__.config.scheme}.{self.config.serviceConnection.__root__.config.sobjectName}"
)
database_entity = Database(
name=self.config.scheme,
service=EntityReference(id=self.service.id, type=self.config.type),
id=uuid.uuid4(),
name="default",
service=EntityReference(id=self.service.id, type="databaseService"),
)
schema_entity = DatabaseSchema(
id=uuid.uuid4(),
name=self.config.serviceConnection.__root__.config.scheme.name,
database=EntityReference(id=database_entity.id, type="databaseSchema"),
service=EntityReference(id=self.service.id, type="databaseService"),
)
table_and_db = OMetaDatabaseAndTable(
table=table_entity, database=database_entity
table=table_entity,
database=database_entity,
database_schema=schema_entity,
)
yield table_and_db
except ValidationError as err:
logger.error(err)
self.status.failure("{}".format(self.config.sobjectName), err)
self.status.failure(
"{}".format(self.config.serviceConnection.__root__.config.sobjectName),
err,
)
def prepare(self):
pass

View File

@ -31,6 +31,9 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnect
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
)
from metadata.generated.schema.entity.services.connections.database.salesforceConnection import (
SalesforceConnection,
)
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
)
@ -84,6 +87,7 @@ def get_connection_url(connection):
@get_connection_url.register(RedshiftConnection)
@get_connection_url.register(MysqlConnection)
@get_connection_url.register(SalesforceConnection)
@get_connection_url.register(ClickhouseConnection)
@get_connection_url.register(SingleStoreConnection)
@get_connection_url.register(VerticaConnection)