From 3965b030a9ba8c8b5ea42bca55659ddf2c218901 Mon Sep 17 00:00:00 2001 From: parthp2107 <83201188+parthp2107@users.noreply.github.com> Date: Wed, 8 Sep 2021 00:37:13 +0530 Subject: [PATCH] Added Salesforce Connector (#423) * Added salesforce-connector * minor changes * Added Salesforce-connector * Added Salesforce-connector * Salesforce sample data implemented Co-authored-by: parthp2107 Co-authored-by: Ayush Shah --- ingestion/examples/workflows/salesforce.json | 36 ++++ .../metadata/ingestion/sink/elasticsearch.py | 2 +- .../metadata/ingestion/source/salesforce.py | 163 ++++++++++++++++++ 3 files changed, 200 insertions(+), 1 deletion(-) create mode 100644 ingestion/examples/workflows/salesforce.json create mode 100644 ingestion/src/metadata/ingestion/source/salesforce.py diff --git a/ingestion/examples/workflows/salesforce.json b/ingestion/examples/workflows/salesforce.json new file mode 100644 index 00000000000..261db63a81e --- /dev/null +++ b/ingestion/examples/workflows/salesforce.json @@ -0,0 +1,36 @@ +{ + "source": { + "type": "salesforce", + "config": { + "username": "username", + "password": "password", + "security_token": "secuirty_token", + "service_name": "local_salesforce", + "scheme": "salesforce", + "sobject_name": "Salesforce Object Name" + } + }, + "processor": { + "type": "pii", + "config": { + } + }, + "sink": { + "type": "metadata-rest", + "config": {} + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + }, + "cron": { + "minute": "*/5", + "hour": null, + "day": null, + "month": null, + "day_of_week": null + } +} diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index de68a465c32..fa5c103a7e6 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -108,7 +108,7 @@ class ElasticsearchSink(Sink): dashboard_doc = self._create_dashboard_es_doc(record) self.elasticsearch_client.index(index=self.config.dashboard_index_name, id=str(dashboard_doc.dashboard_id), body=dashboard_doc.json()) - self.status.records_written(record.name) + self.status.records_written(record.name.__root__) def _create_table_es_doc(self, table: Table): fqdn = table.fullyQualifiedName diff --git a/ingestion/src/metadata/ingestion/source/salesforce.py b/ingestion/src/metadata/ingestion/source/salesforce.py new file mode 100644 index 00000000000..74267e0a993 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/salesforce.py @@ -0,0 +1,163 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import uuid +from dataclasses import dataclass, field +from typing import Iterable, Optional, List + +from metadata.ingestion.api.common import WorkflowContext +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable +from simple_salesforce import Salesforce + +from .sql_source import SQLConnectionConfig +from ..ometa.openmetadata_rest import MetadataServerConfig +from ...generated.schema.entity.data.database import Database +from ...generated.schema.entity.data.table import Column, ColumnConstraint, Table, TableData +from ...generated.schema.type.entityReference import EntityReference +from metadata.utils.helpers import get_database_service_or_create +from pydantic import ValidationError + +logger: logging.Logger = logging.getLogger(__name__) + +@dataclass +class SalesforceSourceStatus(SourceStatus): + success: List[str] = field(default_factory=list) + failures: List[str] = field(default_factory=list) + warnings: List[str] = field(default_factory=list) + filtered: List[str] = field(default_factory=list) + + def scanned(self, table_name: str) -> None: + self.success.append(table_name) + logger.info('Table Scanned: {}'.format(table_name)) + + def filter( + self, table_name: str, err: str, dataset_name: str = None, col_type: str = None + ) -> None: + self.filtered.append(table_name) + logger.warning("Dropped Table {} due to {}".format(table_name, err)) + + +class SalesforceConfig(SQLConnectionConfig): + username: str + password: str + security_token: str + host_port: Optional[str] + scheme: str + service_type = "MySQL" + sobject_name: str + + def get_connection_url(self): + return super().get_connection_url() + + +class SalesforceSource(Source): + def __init__(self, config: SalesforceConfig, metadata_config: MetadataServerConfig, ctx): + super().__init__(ctx) + self.config = config + self.service = get_database_service_or_create(config, metadata_config) + self.status = SalesforceSourceStatus() + self.sf = Salesforce( + username=self.config.username, password=self.config.password, + security_token=self.config.security_token + ) + + @classmethod + def create(cls, config: dict, metadata_config: dict, ctx: WorkflowContext): + config = SalesforceConfig.parse_obj(config) + metadata_config = MetadataServerConfig.parse_obj(metadata_config) + return cls(config, metadata_config, ctx) + + def column_type(self, column_type: str): + if column_type in ["ID", "PHONE", "CURRENCY"]: + type = "INT" + elif column_type in ["REFERENCE", "PICKLIST", "TEXTAREA", "ADDRESS", "URL"]: + type = "VARCHAR" + else: + type = column_type + return type + + def next_record(self) -> Iterable[OMetaDatabaseAndTable]: + yield from self.salesforce_client() + + def fetch_sample_data(self,sobject_name): + md = self.sf.restful("sobjects/{}/describe/".format(sobject_name), params=None) + columns = [] + rows = [] + for column in md['fields']: + columns.append(column['name']) + query = "select {} from {}".format(str(columns)[1:-1].replace('\'',''),sobject_name) + logger.info("Ingesting data using {}".format(query)) + resp = self.sf.query(query) + for record in resp['records']: + row = [] + for column in columns: + row.append(record[f'{column}']) + rows.append(row) + return TableData(columns=columns, rows=rows) + + def salesforce_client(self) -> Iterable[OMetaDatabaseAndTable]: + try: + + row_order = 1 + table_columns = [] + md = self.sf.restful("sobjects/{}/describe/".format(self.config.sobject_name), params=None) + + for column in md['fields']: + col_constraint = None + if column['nillable']: + col_constraint = ColumnConstraint.NULL + elif not column['nillable']: + col_constraint = ColumnConstraint.NOT_NULL + if column['unique']: + col_constraint = ColumnConstraint.UNIQUE + + table_columns.append( + Column( + name=column['name'], + description=column['label'], + columnDataType=self.column_type(column['type'].upper()), + columnConstraint=col_constraint, + ordinalPosition=row_order + ) + ) + row_order += 1 + table_data = self.fetch_sample_data(self.config.sobject_name) + logger.info("Successfully Ingested the sample data") + table_entity = Table( + id=uuid.uuid4(), + name=self.config.sobject_name, + tableType='Regular', + description=" ", + columns=table_columns, + sampleData=table_data + ) + self.status.scanned(f"{self.config.scheme}.{self.config.sobject_name}") + database_entity = Database( + name=self.config.scheme, + service=EntityReference(id=self.service.id, type=self.config.service_type) + ) + table_and_db = OMetaDatabaseAndTable(table=table_entity, database=database_entity) + yield table_and_db + except ValidationError as err: + logger.error(err) + self.status.failure('{}'.format(self.config.sobject_name), err) + + def prepare(self): + pass + + def get_status(self) -> SourceStatus: + return self.status