2022-07-06 22:31:16 +05:30
|
|
|
import json
|
|
|
|
import pathlib
|
|
|
|
from unittest import mock
|
2024-08-16 10:46:42 +02:00
|
|
|
from unittest.mock import Mock
|
2022-07-06 22:31:16 +05:30
|
|
|
|
|
|
|
from freezegun import freeze_time
|
|
|
|
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
2024-08-16 10:46:42 +02:00
|
|
|
from datahub.ingestion.source.salesforce import SalesforceConfig, SalesforceSource
|
2022-07-06 22:31:16 +05:30
|
|
|
from tests.test_helpers import mce_helpers
|
|
|
|
|
|
|
|
FROZEN_TIME = "2022-05-12 11:00:00"
|
|
|
|
|
2023-04-11 02:44:42 +05:30
|
|
|
test_resources_dir = pathlib.Path(__file__).parent
|
2022-07-06 22:31:16 +05:30
|
|
|
|
|
|
|
|
|
|
|
def _read_response(file_name: str) -> dict:
|
|
|
|
response_json_path = f"{test_resources_dir}/mock_files/{file_name}"
|
|
|
|
with open(response_json_path) as file:
|
|
|
|
data = json.loads(file.read())
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
2024-08-16 10:46:42 +02:00
|
|
|
class MockResponse:
|
|
|
|
def __init__(self, json_data, status_code):
|
|
|
|
self.json_data = json_data
|
|
|
|
self.status_code = status_code
|
|
|
|
|
|
|
|
def json(self):
|
|
|
|
return self.json_data
|
2022-07-06 22:31:16 +05:30
|
|
|
|
|
|
|
|
2024-08-16 10:46:42 +02:00
|
|
|
def side_effect_call_salesforce(type, url):
|
2022-07-06 22:31:16 +05:30
|
|
|
if url.endswith("/services/data/"):
|
|
|
|
return MockResponse(_read_response("versions_response.json"), 200)
|
|
|
|
if url.endswith("FROM EntityDefinition WHERE IsCustomizable = true"):
|
|
|
|
return MockResponse(_read_response("entity_definition_soql_response.json"), 200)
|
|
|
|
elif url.endswith("FROM EntityParticle WHERE EntityDefinitionId='Account'"):
|
|
|
|
return MockResponse(_read_response("account_fields_soql_response.json"), 200)
|
|
|
|
elif url.endswith("FROM CustomField WHERE EntityDefinitionId='Account'"):
|
|
|
|
return MockResponse(
|
|
|
|
_read_response("account_custom_fields_soql_response.json"), 200
|
|
|
|
)
|
|
|
|
elif url.endswith("FROM CustomObject where DeveloperName='Property'"):
|
|
|
|
return MockResponse(
|
|
|
|
_read_response("property_custom_object_soql_response.json"), 200
|
|
|
|
)
|
|
|
|
elif url.endswith(
|
|
|
|
"FROM EntityParticle WHERE EntityDefinitionId='01I5i000000Y6fp'"
|
|
|
|
): # DurableId of Property__c
|
|
|
|
return MockResponse(_read_response("property_fields_soql_response.json"), 200)
|
|
|
|
elif url.endswith("FROM CustomField WHERE EntityDefinitionId='01I5i000000Y6fp'"):
|
|
|
|
return MockResponse(
|
|
|
|
_read_response("property_custom_fields_soql_response.json"), 200
|
|
|
|
)
|
|
|
|
elif url.endswith("/recordCount?sObjects=Property__c"):
|
|
|
|
return MockResponse(_read_response("record_count_property_response.json"), 200)
|
|
|
|
return MockResponse({}, 404)
|
|
|
|
|
|
|
|
|
2024-08-16 10:46:42 +02:00
|
|
|
@mock.patch("datahub.ingestion.source.salesforce.Salesforce")
|
|
|
|
def test_latest_version(mock_sdk):
|
|
|
|
mock_sf = mock.Mock()
|
|
|
|
mocked_call = mock.Mock()
|
|
|
|
mocked_call.side_effect = side_effect_call_salesforce
|
|
|
|
mock_sf._call_salesforce = mocked_call
|
|
|
|
mock_sdk.return_value = mock_sf
|
|
|
|
|
|
|
|
config = SalesforceConfig.parse_obj(
|
|
|
|
{
|
|
|
|
"auth": "DIRECT_ACCESS_TOKEN",
|
|
|
|
"instance_url": "https://mydomain.my.salesforce.com/",
|
|
|
|
"access_token": "access_token`",
|
|
|
|
"ingest_tags": True,
|
|
|
|
"object_pattern": {
|
|
|
|
"allow": [
|
|
|
|
"^Account$",
|
|
|
|
"^Property__c$",
|
|
|
|
],
|
|
|
|
},
|
|
|
|
"domain": {"sales": {"allow": {"^Property__c$"}}},
|
|
|
|
"profiling": {"enabled": True},
|
|
|
|
"profile_pattern": {
|
|
|
|
"allow": [
|
|
|
|
"^Property__c$",
|
|
|
|
]
|
|
|
|
},
|
|
|
|
}
|
|
|
|
)
|
|
|
|
SalesforceSource(config=config, ctx=Mock())
|
|
|
|
calls = mock_sf._call_salesforce.mock_calls
|
2025-01-18 15:06:20 +05:30
|
|
|
assert len(calls) == 1, (
|
|
|
|
"We didn't specify version but source didn't call SF API to get the latest one"
|
|
|
|
)
|
|
|
|
assert calls[0].ends_with("/services/data"), (
|
|
|
|
"Source didn't call proper SF API endpoint to get all versions"
|
|
|
|
)
|
|
|
|
assert mock_sf.sf_version == "54.0", (
|
|
|
|
"API version was not correctly set (see versions_responses.json)"
|
|
|
|
)
|
2024-08-16 10:46:42 +02:00
|
|
|
|
|
|
|
|
|
|
|
@mock.patch("datahub.ingestion.source.salesforce.Salesforce")
|
|
|
|
def test_custom_version(mock_sdk):
|
|
|
|
mock_sf = mock.Mock()
|
|
|
|
mocked_call = mock.Mock()
|
|
|
|
mocked_call.side_effect = side_effect_call_salesforce
|
|
|
|
mock_sf._call_salesforce = mocked_call
|
|
|
|
mock_sdk.return_value = mock_sf
|
|
|
|
|
|
|
|
config = SalesforceConfig.parse_obj(
|
|
|
|
{
|
|
|
|
"auth": "DIRECT_ACCESS_TOKEN",
|
|
|
|
"api_version": "46.0",
|
|
|
|
"instance_url": "https://mydomain.my.salesforce.com/",
|
|
|
|
"access_token": "access_token`",
|
|
|
|
"ingest_tags": True,
|
|
|
|
"object_pattern": {
|
|
|
|
"allow": [
|
|
|
|
"^Account$",
|
|
|
|
"^Property__c$",
|
|
|
|
],
|
|
|
|
},
|
|
|
|
"domain": {"sales": {"allow": {"^Property__c$"}}},
|
|
|
|
"profiling": {"enabled": True},
|
|
|
|
"profile_pattern": {
|
|
|
|
"allow": [
|
|
|
|
"^Property__c$",
|
|
|
|
]
|
|
|
|
},
|
|
|
|
}
|
|
|
|
)
|
|
|
|
SalesforceSource(config=config, ctx=Mock())
|
|
|
|
|
|
|
|
calls = mock_sf._call_salesforce.mock_calls
|
2025-01-18 15:06:20 +05:30
|
|
|
assert len(calls) == 0, (
|
|
|
|
"Source called API to get all versions even though we specified proper version"
|
|
|
|
)
|
|
|
|
assert mock_sdk.call_args.kwargs["version"] == "46.0", (
|
|
|
|
"API client object was not correctly initialized with the custom version"
|
|
|
|
)
|
2024-08-16 10:46:42 +02:00
|
|
|
|
|
|
|
|
2022-07-06 22:31:16 +05:30
|
|
|
@freeze_time(FROZEN_TIME)
|
|
|
|
def test_salesforce_ingest(pytestconfig, tmp_path):
|
2024-08-16 10:46:42 +02:00
|
|
|
with mock.patch("datahub.ingestion.source.salesforce.Salesforce") as mock_sdk:
|
2022-07-06 22:31:16 +05:30
|
|
|
mock_sf = mock.Mock()
|
|
|
|
mocked_call = mock.Mock()
|
|
|
|
mocked_call.side_effect = side_effect_call_salesforce
|
|
|
|
mock_sf._call_salesforce = mocked_call
|
|
|
|
mock_sdk.return_value = mock_sf
|
|
|
|
|
|
|
|
pipeline = Pipeline.create(
|
|
|
|
{
|
|
|
|
"run_id": "salesforce-test",
|
|
|
|
"source": {
|
|
|
|
"type": "salesforce",
|
|
|
|
"config": {
|
|
|
|
"auth": "DIRECT_ACCESS_TOKEN",
|
|
|
|
"instance_url": "https://mydomain.my.salesforce.com/",
|
|
|
|
"access_token": "access_token`",
|
|
|
|
"ingest_tags": True,
|
|
|
|
"object_pattern": {
|
|
|
|
"allow": [
|
|
|
|
"^Account$",
|
|
|
|
"^Property__c$",
|
|
|
|
],
|
|
|
|
},
|
|
|
|
"domain": {"sales": {"allow": {"^Property__c$"}}},
|
|
|
|
"profiling": {"enabled": True},
|
|
|
|
"profile_pattern": {
|
|
|
|
"allow": [
|
|
|
|
"^Property__c$",
|
|
|
|
]
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
"sink": {
|
|
|
|
"type": "file",
|
|
|
|
"config": {
|
|
|
|
"filename": f"{tmp_path}/salesforce_mces.json",
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
)
|
|
|
|
pipeline.run()
|
|
|
|
pipeline.raise_from_status()
|
|
|
|
|
|
|
|
mce_helpers.check_golden_file(
|
|
|
|
pytestconfig,
|
|
|
|
output_path=f"{tmp_path}/salesforce_mces.json",
|
|
|
|
golden_path=test_resources_dir / "salesforce_mces_golden.json",
|
|
|
|
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
|
|
|
|
)
|