48 lines
1.6 KiB
Python
Raw Permalink Normal View History

from unittest.mock import Mock, patch
from datahub.ingestion.graph.client import (
DatahubClientConfig,
DataHubGraph,
2025-03-03 10:05:26 -08:00
entity_type_to_graphql,
)
from datahub.metadata.schema_classes import CorpUserEditableInfoClass
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_get_aspect(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://fake-domain.local"))
user_urn = "urn:li:corpuser:foo"
with patch("requests.Session.get") as mock_get:
mock_response = Mock()
mock_response.json = Mock(
return_value={
"version": 0,
"aspect": {"com.linkedin.identity.CorpUserEditableInfo": {}},
}
)
mock_get.return_value = mock_response
editable = graph.get_aspect(user_urn, CorpUserEditableInfoClass)
assert editable is not None
2025-03-03 10:05:26 -08:00
def test_graphql_entity_types() -> None:
# FIXME: This is a subset of all the types, but it's enough to get us ok coverage.
2025-03-03 10:05:26 -08:00
known_mappings = {
"domain": "DOMAIN",
"dataset": "DATASET",
"dashboard": "DASHBOARD",
"chart": "CHART",
"corpuser": "CORP_USER",
"corpGroup": "CORP_GROUP",
"dataFlow": "DATA_FLOW",
"dataJob": "DATA_JOB",
"glossaryNode": "GLOSSARY_NODE",
"glossaryTerm": "GLOSSARY_TERM",
"dataHubExecutionRequest": "EXECUTION_REQUEST",
}
for entity_type, graphql_type in known_mappings.items():
assert entity_type_to_graphql(entity_type) == graphql_type