mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-09 14:10:18 +00:00
parent
e831c7dd2a
commit
0249368d3a
@ -953,6 +953,9 @@ class DbtSource(DbtServiceSource):
|
||||
def create_dbt_exposures_lineage(
|
||||
self, exposure_spec: dict
|
||||
) -> Iterable[Either[AddLineageRequest]]:
|
||||
"""
|
||||
Method to process dbt exposure lineage
|
||||
"""
|
||||
to_entity = exposure_spec[DbtCommonEnum.EXPOSURE]
|
||||
upstream = exposure_spec[DbtCommonEnum.UPSTREAM]
|
||||
manifest_node = exposure_spec[DbtCommonEnum.MANIFEST_NODE]
|
||||
|
||||
@ -588,6 +588,43 @@ class DbtUnitTest(TestCase):
|
||||
result,
|
||||
)
|
||||
|
||||
def test_dbt_generate_entity_link_with_column(self):
|
||||
_, dbt_objects = self.get_dbt_object_files(
|
||||
mock_manifest=MOCK_SAMPLE_MANIFEST_TEST_NODE
|
||||
)
|
||||
manifest_node = dbt_objects.dbt_manifest.nodes.get(
|
||||
"test.jaffle_shop.unique_orders_order_id.fed79b3a6e"
|
||||
)
|
||||
dbt_test = {
|
||||
"manifest_node": manifest_node,
|
||||
"upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"],
|
||||
"results": "",
|
||||
}
|
||||
result = generate_entity_link(dbt_test=dbt_test)
|
||||
self.assertTrue(len(result) > 0)
|
||||
self.assertIn("::columns::order_id>", result[0])
|
||||
|
||||
def test_dbt_generate_entity_link_without_column(self):
|
||||
_, dbt_objects = self.get_dbt_object_files(
|
||||
mock_manifest=MOCK_SAMPLE_MANIFEST_TEST_NODE
|
||||
)
|
||||
manifest_node = dbt_objects.dbt_manifest.nodes.get(
|
||||
"test.jaffle_shop.unique_orders_order_id.fed79b3a6e"
|
||||
)
|
||||
if hasattr(manifest_node, "column_name"):
|
||||
delattr(manifest_node, "column_name")
|
||||
dbt_test = {
|
||||
"manifest_node": manifest_node,
|
||||
"upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"],
|
||||
"results": "",
|
||||
}
|
||||
result = generate_entity_link(dbt_test=dbt_test)
|
||||
self.assertTrue(len(result) > 0)
|
||||
self.assertNotIn("::columns::", result[0])
|
||||
self.assertIn(
|
||||
"<#E::table::local_redshift_dbt2.dev.dbt_jaffle.stg_customers>", result[0]
|
||||
)
|
||||
|
||||
def test_dbt_compiled_query(self):
|
||||
expected_query = "sample customers compile code"
|
||||
|
||||
|
||||
@ -5807,10 +5807,23 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
|
||||
// Validate if a given column exists in the table
|
||||
public static void validateColumn(Table table, String columnName) {
|
||||
boolean validColumn =
|
||||
table.getColumns().stream().anyMatch(col -> col.getName().equals(columnName));
|
||||
if (!validColumn && !columnName.equalsIgnoreCase("all")) {
|
||||
throw new IllegalArgumentException("Invalid column name " + columnName);
|
||||
validateColumn(table, columnName, Boolean.TRUE);
|
||||
}
|
||||
|
||||
// Validate if a given column exists in the table with optional case sensitivity
|
||||
public static void validateColumn(Table table, String columnName, Boolean caseSensitive) {
|
||||
if (Boolean.FALSE.equals(caseSensitive)) {
|
||||
boolean validColumn =
|
||||
table.getColumns().stream().anyMatch(col -> col.getName().equalsIgnoreCase(columnName));
|
||||
if (!validColumn && !columnName.equalsIgnoreCase("all")) {
|
||||
throw new IllegalArgumentException("Invalid column name " + columnName);
|
||||
}
|
||||
} else {
|
||||
boolean validColumn =
|
||||
table.getColumns().stream().anyMatch(col -> col.getName().equals(columnName));
|
||||
if (!validColumn && !columnName.equalsIgnoreCase("all")) {
|
||||
throw new IllegalArgumentException("Invalid column name " + columnName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -618,7 +618,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
|
||||
// Validate that the referenced column actually exists in the table
|
||||
if (entityLink.getArrayFieldName() != null) {
|
||||
Table table = Entity.getEntity(entityLink, "columns", ALL);
|
||||
validateColumn(table, entityLink.getArrayFieldName());
|
||||
validateColumn(table, entityLink.getArrayFieldName(), Boolean.FALSE);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -630,7 +630,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
|
||||
|
||||
// Validate each dimension column exists in the table
|
||||
for (String dimensionColumn : test.getDimensionColumns()) {
|
||||
validateColumn(table, dimensionColumn);
|
||||
validateColumn(table, dimensionColumn, Boolean.FALSE);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -918,7 +918,7 @@ public class TestCaseRepository extends EntityRepository<TestCase> {
|
||||
// Validate all the columns
|
||||
if (validateColumns) {
|
||||
for (String columnName : tableData.getColumns()) {
|
||||
validateColumn(table, columnName);
|
||||
validateColumn(table, columnName, Boolean.FALSE);
|
||||
}
|
||||
}
|
||||
// Make sure each row has number values for all the columns
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user