Added DBT tests with versionless and fixed v7 parsing (#18028)

This commit is contained in:
Suman Maharana 2024-09-27 19:53:27 +05:30 committed by GitHub
parent c9d37f6afc
commit bc6f4824ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 19868 additions and 0 deletions

View File

@ -203,6 +203,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}
for node, value in manifest_dict.get( # pylint: disable=unused-variable

File diff suppressed because one or more lines are too long

View File

@ -79,6 +79,8 @@ MOCK_SAMPLE_MANIFEST_V7 = "resources/datasets/manifest_v7.json"
MOCK_SAMPLE_MANIFEST_V8 = "resources/datasets/manifest_v8.json"
MOCK_SAMPLE_MANIFEST_VERSIONLESS = "resources/datasets/manifest_versionless.json"
MOCK_SAMPLE_MANIFEST_NULL_DB = "resources/datasets/manifest_null_db.json"
MOCK_SAMPLE_MANIFEST_TEST_NODE = "resources/datasets/manifest_test_node.json"
@ -197,6 +199,92 @@ EXPECTED_DATA_MODEL_NULL_DB = [
),
]
EXPECTED_DATA_MODEL_VERSIONLESS = [
DataModel(
modelType="DBT",
resourceType="model",
description="This table has basic information about a customer, as well as some derived facts based on a customer's orders",
path="models/customers.sql",
rawSql="sample customers raw code",
sql="sample customers compile code",
upstream=[
"dbt_test.dev.dbt_jaffle.customers",
"dbt_test.dev.dbt_jaffle.customers",
"dbt_test.dev.dbt_jaffle.customers",
],
owners=EntityReferenceList(
root=[
EntityReference(
id="cb2a92f5-e935-4ad7-911c-654280046538",
type="user",
fullyQualifiedName="aaron_johnson0",
href=AnyUrl(
"http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538"
),
)
]
),
tags=[
TagLabel(
tagFQN="dbtTags.model_tag_one",
source="Classification",
labelType="Automated",
state="Suggested",
),
TagLabel(
tagFQN="dbtTags.model_tag_two",
source="Classification",
labelType="Automated",
state="Suggested",
),
],
columns=[
Column(
name="customer_id",
dataType="UNKNOWN",
dataLength=1,
description="This is a unique identifier for a customer",
),
Column(
name="first_name",
dataType="UNKNOWN",
dataLength=1,
description="Customer's first name. PII.",
),
Column(
name="last_name",
dataType="UNKNOWN",
dataLength=1,
description="Customer's last name. PII.",
),
Column(
name="first_order",
dataType="UNKNOWN",
dataLength=1,
description="Date (UTC) of a customer's first order",
),
Column(
name="most_recent_order",
dataType="UNKNOWN",
dataLength=1,
description="Date (UTC) of a customer's most recent order",
),
Column(
name="number_of_orders",
dataType="UNKNOWN",
dataLength=1,
description="Count of the number of orders a customer has placed",
),
Column(
name="total_order_amount",
dataType="UNKNOWN",
dataLength=1,
description="Total value (AUD) of a customer's orders",
),
],
)
]
MOCK_OWNER = EntityReferenceList(
root=[
EntityReference(
@ -345,6 +433,34 @@ class DbtUnitTest(TestCase):
expected_data_models=EXPECTED_DATA_MODELS,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
@patch("metadata.utils.tag_utils.get_tag_label")
def test_dbt_manifest_versionless(
self, get_tag_label, es_search_from_fqn, get_dbt_owner
):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES
get_tag_label.side_effect = [
TagLabel(
tagFQN="dbtTags.model_tag_one",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
TagLabel(
tagFQN="dbtTags.model_tag_two",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
]
self.execute_test(
MOCK_SAMPLE_MANIFEST_VERSIONLESS,
expected_records=9,
expected_data_models=EXPECTED_DATA_MODEL_VERSIONLESS,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_null_db(self, es_search_from_fqn, get_dbt_owner):
@ -470,6 +586,16 @@ class DbtUnitTest(TestCase):
result = get_dbt_raw_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
# Test the raw queries with versionless manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_VERSIONLESS
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = get_dbt_raw_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
@patch(
"metadata.ingestion.ometa.mixins.user_mixin.OMetaUserMixin.get_reference_by_name"
)
@ -502,6 +628,7 @@ class DbtUnitTest(TestCase):
mock_file_path = Path(__file__).parent / mock_manifest
with open(mock_file_path) as file:
mock_data: dict = json.load(file)
self.dbt_source_obj.remove_manifest_non_required_keys(manifest_dict=mock_data)
dbt_files = DbtFiles(dbt_manifest=mock_data)
dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(dbt_files.dbt_catalog)
@ -540,6 +667,43 @@ class DbtUnitTest(TestCase):
):
self.assertEqual(expected, original)
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_updtream_nodes_for_lineage(self, es_search_from_fqn):
expected_upstream_nodes = [
"model.jaffle_shop.stg_customers",
"model.jaffle_shop.stg_orders",
"model.jaffle_shop.stg_payments",
]
es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES
# Test the raw queries with V4 V5 V6 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6
)
upstream_nodes = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
).depends_on.nodes
self.assertEqual(expected_upstream_nodes, upstream_nodes)
# Test the raw queries with V7 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V7
)
upstream_nodes = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
).depends_on.nodes
self.assertEqual(expected_upstream_nodes, upstream_nodes)
# Test the raw queries with VERSIONLESS manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_VERSIONLESS
)
upstream_nodes = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
).depends_on.nodes
self.assertEqual(expected_upstream_nodes, upstream_nodes)
@patch("metadata.utils.tag_utils.get_tag_label")
def test_dbt_glossary_tiers(self, get_tag_label):
get_tag_label.side_effect = [