dbt performance improvement (#10366)

* dbt performance improvement

* reduced es calls

* Added unit tests
This commit is contained in:
Onkar Ravgan 2023-03-08 11:07:34 +05:30 committed by GitHub
parent b42f457d88
commit ca623d0693
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 416 additions and 796 deletions

View File

@ -39,7 +39,7 @@ class ESMixin(Generic[T]):
fqdn_search = "/search/query?q=fullyQualifiedName:{fqn}&from={from_}&size={size}&index={index}"
@functools.lru_cache()
@functools.lru_cache(maxsize=512)
def _search_es_entity(
self, entity_type: Type[T], query_string: str
) -> Optional[List[T]]:

View File

@ -159,7 +159,7 @@ class MetadataRestSink(Sink[Entity]):
:param datamodel_link: Table ID + Data Model
"""
table: Table = self.metadata.get_by_name(entity=Table, fqn=datamodel_link.fqn)
table: Table = datamodel_link.table_entity
if table:
self.metadata.ingest_table_data_model(
@ -169,10 +169,7 @@ class MetadataRestSink(Sink[Entity]):
f"Successfully ingested DataModel for {table.fullyQualifiedName.__root__}"
)
else:
logger.warning(
f"The table [{datamodel_link.fqn.__root__}] from the manifest file is not found in OM. "
f"Please, check if the table has been ingested previously."
)
logger.warning("Unable to ingest datamodel")
def write_table_location_link(self, table_location_link: TableLocationLink) -> None:
"""

View File

@ -79,7 +79,7 @@ class DataModelLink(BaseModel):
Tmp model to handle data model ingestion
"""
fqn: FullyQualifiedEntityName
table_entity: Table
datamodel: DataModel

View File

@ -75,7 +75,7 @@ from metadata.ingestion.source.database.dbt.dbt_service import (
DbtObjects,
DbtServiceSource,
)
from metadata.utils import fqn
from metadata.utils import entity_link, fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.logger import ingestion_logger
@ -199,7 +199,9 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
f"Service with name {self.config.serviceName} not found"
)
def get_dbt_owner(self, manifest_node: dict, catalog_node: dict) -> Optional[str]:
def get_dbt_owner(
self, manifest_node: dict, catalog_node: Optional[dict]
) -> Optional[str]:
"""
Returns dbt owner
"""
@ -240,7 +242,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
self.metadata,
entity_type=Tag,
classification_name=self.tag_classification_name,
tag_name=tag.replace(".", ""),
tag_name=tag.replace(fqn.FQN_SEPARATOR, ""),
),
labelType=LabelType.Automated,
state=State.Confirmed,
@ -434,58 +436,67 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
catalog_node = catalog_entities.get(key)
dbt_table_tags_list = None
dbt_model_tag_labels = manifest_node.tags
if dbt_model_tag_labels:
if manifest_node.tags:
dbt_table_tags_list = self.get_dbt_tag_labels(
dbt_model_tag_labels
manifest_node.tags
)
dbt_compiled_query = self.get_dbt_compiled_query(manifest_node)
dbt_raw_query = self.get_dbt_raw_query(manifest_node)
datamodel_path = None
if manifest_node.original_file_path:
if (
hasattr(manifest_node, "root_path")
and manifest_node.root_path
):
datamodel_path = f"{manifest_node.root_path}/{manifest_node.original_file_path}"
else:
datamodel_path = manifest_node.original_file_path
data_model_link = DataModelLink(
fqn=fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
database_name=self.get_corrected_name(
manifest_node.database
),
schema_name=self.get_corrected_name(manifest_node.schema_),
table_name=model_name,
),
datamodel=DataModel(
modelType=ModelType.DBT,
description=manifest_node.description
if manifest_node.description
else None,
path=datamodel_path,
rawSql=dbt_raw_query if dbt_raw_query else "",
sql=dbt_compiled_query if dbt_compiled_query else "",
columns=self.parse_data_model_columns(
manifest_node, catalog_node
),
upstream=self.parse_upstream_nodes(
manifest_entities, manifest_node
),
owner=self.get_dbt_owner(
manifest_node=manifest_node, catalog_node=catalog_node
),
tags=dbt_table_tags_list,
),
# Get the table entity from ES
# TODO: Change to get_by_name once the postgres case sensitive calls is fixed
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
database_name=self.get_corrected_name(manifest_node.database),
schema_name=self.get_corrected_name(manifest_node.schema_),
table_name=model_name,
)
yield data_model_link
self.context.data_model_links.append(data_model_link)
table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table, fqn_search_string=table_fqn
),
fetch_multiple_entities=False,
)
if table_entity:
data_model_link = DataModelLink(
table_entity=table_entity,
datamodel=DataModel(
modelType=ModelType.DBT,
description=manifest_node.description
if manifest_node.description
else None,
path=self.get_data_model_path(
manifest_node=manifest_node
),
rawSql=dbt_raw_query if dbt_raw_query else "",
sql=dbt_compiled_query if dbt_compiled_query else "",
columns=self.parse_data_model_columns(
manifest_node, catalog_node
),
upstream=self.parse_upstream_nodes(
manifest_entities, manifest_node
),
owner=self.get_dbt_owner(
manifest_node=manifest_node,
catalog_node=catalog_node,
),
tags=dbt_table_tags_list,
),
)
yield data_model_link
self.context.data_model_links.append(data_model_link)
else:
logger.warning(
f"Unable to find the table '{table_fqn}' in OpenMetadata"
f"Please check if the table exists is ingested in OpenMetadata"
f"And name, database, schema of the manifest node matches with the table present in OpenMetadata" # pylint: disable=line-too-long
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
@ -498,6 +509,17 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
correct_name = None if name.lower() in NONE_KEYWORDS_LIST else name
return correct_name
def get_data_model_path(self, manifest_node):
datamodel_path = None
if manifest_node.original_file_path:
if hasattr(manifest_node, "root_path") and manifest_node.root_path:
datamodel_path = (
f"{manifest_node.root_path}/{manifest_node.original_file_path}"
)
else:
datamodel_path = manifest_node.original_file_path
return datamodel_path
def parse_upstream_nodes(self, manifest_entities, dbt_node):
"""
Method to fetch the upstream nodes
@ -587,16 +609,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
"""
Method to process DBT lineage from upstream nodes
"""
logger.info(f"Processing DBT lineage for: {data_model_link.fqn.__root__}")
to_entity: Table = data_model_link.table_entity
logger.info(
f"Processing DBT lineage for: {to_entity.fullyQualifiedName.__root__}"
)
# Get the table entity from ES
to_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=data_model_link.fqn.__root__,
)
to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=to_es_result, fetch_multiple_entities=False
)
for upstream_node in data_model_link.datamodel.upstream:
try:
from_es_result = self.metadata.es_search_from_fqn(
@ -634,11 +651,13 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
"""
Method to process DBT lineage from queries
"""
table_fqn = data_model_link.fqn.__root__
logger.info(f"Processing DBT Query lineage for: {table_fqn}")
to_entity: Table = data_model_link.table_entity
logger.info(
f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.__root__}"
)
try:
source_elements = fqn.split(table_fqn)
source_elements = fqn.split(to_entity.fullyQualifiedName.__root__)
# remove service name from fqn to make it parseable in format db.schema.table
query_fqn = fqn._build( # pylint: disable=protected-access
*source_elements[-3:]
@ -671,24 +690,18 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
"""
Method to process DBT descriptions using patch APIs
"""
logger.info(f"Processing DBT Descriptions for: {data_model_link.fqn.__root__}")
# Get the table entity from ES
to_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=data_model_link.fqn.__root__,
table_entity: Table = data_model_link.table_entity
logger.info(
f"Processing DBT Descriptions for: {table_entity.fullyQualifiedName.__root__}"
)
to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=to_es_result, fetch_multiple_entities=False
)
if to_entity:
if table_entity:
try:
data_model = data_model_link.datamodel
# Patch table descriptions from DBT
if data_model.description:
self.metadata.patch_description(
entity=Table,
entity_id=to_entity.id,
entity_id=table_entity.id,
description=data_model.description.__root__,
force=self.source_config.dbtUpdateDescriptions,
)
@ -697,7 +710,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
for column in data_model.columns:
if column.description:
self.metadata.patch_column_description(
entity_id=to_entity.id,
entity_id=table_entity.id,
column_name=column.name.__root__,
description=column.description.__root__,
force=self.source_config.dbtUpdateDescriptions,
@ -705,7 +718,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {data_model_link.fqn.__root__} to update dbt desctiption: {exc}"
f"Failed to parse the node {table_entity.fullyQualifiedName.__root__}to update dbt desctiption: {exc}" # pylint: disable=line-too-long
)
def create_dbt_tests_suite(
@ -784,7 +797,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
f"Processing DBT Test Case Definition for node: {manifest_node.name}"
)
entity_link_list = self.generate_entity_link(dbt_test)
for entity_link in entity_link_list:
for entity_link_str in entity_link_list:
test_suite_name = manifest_node.meta.get(
DbtCommonEnum.TEST_SUITE_NAME.value,
DbtCommonEnum.DBT_TEST_SUITE.value,
@ -795,7 +808,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
testDefinition=FullyQualifiedEntityName(
__root__=manifest_node.name
),
entityLink=entity_link,
entityLink=entity_link_str,
testSuite=FullyQualifiedEntityName(__root__=test_suite_name),
parameterValues=self.create_test_case_parameter_values(
dbt_test
@ -903,16 +916,12 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
Method returns entity link
"""
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
entity_link_list = []
for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value]:
column_name = manifest_node.column_name
if column_name:
entity_link = (
f"<#E::table::" f"{table_fqn}" f"::columns::" f"{column_name}>"
)
else:
entity_link = f"<#E::table::" f"{table_fqn}>"
entity_link_list.append(entity_link)
entity_link_list = [
entity_link.get_entity_link(
table_fqn=table_fqn, column_name=manifest_node.column_name
)
for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value]
]
return entity_link_list
def get_dbt_compiled_query(self, mnode) -> Optional[str]:

View File

@ -13,7 +13,7 @@ Handle Entity Link building and splitting logic.
Filter information has been taken from the
ES indexes definitions
"""
from typing import List
from typing import List, Optional
from antlr4.CommonTokenStream import CommonTokenStream
from antlr4.error.ErrorStrategy import BailErrorStrategy
@ -66,3 +66,18 @@ def get_table_fqn(entity_link: str) -> str:
"""
return entity_link.split("::")[2]
def get_entity_link(table_fqn: str, column_name: Optional[str]) -> str:
"""From table fqn and column name get the entity_link
Args:
table_fqn: table fqn
column_name: Optional param to generate entity link with column name
"""
if column_name:
entity_link = f"<#E::table::" f"{table_fqn}" f"::columns::" f"{column_name}>"
else:
entity_link = f"<#E::table::" f"{table_fqn}>"
return entity_link

View File

@ -0,0 +1,121 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v7.json",
"dbt_version": "1.3.0",
"generated_at": "2023-01-17T19:05:57.859191Z",
"invocation_id": "0c4757bf-0a8f-4f24-a18a-4c2638bf7d8e",
"env": {},
"project_id": "06e5b98c2db46f8a72cc4f66410e9b3b",
"user_id": null,
"send_anonymous_usage_stats": true,
"adapter_type": "redshift"
},
"nodes": {
"test.jaffle_shop.unique_orders_order_id.fed79b3a6e": {
"test_metadata": {
"name": "unique",
"kwargs": {
"column_name": "order_id",
"model": "{{ get_where_subquery(ref('orders')) }}"
},
"namespace": null
},
"compiled": true,
"resource_type": "test",
"depends_on": {
"macros": [
"macro.dbt.test_unique",
"macro.dbt.get_where_subquery"
],
"nodes": [
"model.jaffle_shop.orders"
]
},
"config": {
"enabled": true,
"alias": null,
"schema": "dbt_test__audit",
"database": null,
"tags": [],
"meta": {
"test_suite_name": "DBT TEST SUITE",
"test_suite_desciption": "Dbt test suite description"
},
"materialized": "test",
"severity": "ERROR",
"store_failures": null,
"where": null,
"limit": null,
"fail_calc": "count(*)",
"warn_if": "!= 0",
"error_if": "!= 0"
},
"database": "dev",
"schema": "dbt_jaffle_dbt_test__audit",
"fqn": [
"jaffle_shop",
"unique_orders_order_id"
],
"unique_id": "test.jaffle_shop.unique_orders_order_id.fed79b3a6e",
"raw_code": "{{ test_unique(**_dbt_generic_test_kwargs) }}",
"language": "sql",
"package_name": "jaffle_shop",
"root_path": "/Users/onkarravgan/Desktop/project/jaffle_shop",
"path": "unique_orders_order_id.sql",
"original_file_path": "models/schema.yml",
"name": "unique_orders_order_id",
"alias": "unique_orders_order_id",
"checksum": {
"name": "none",
"checksum": ""
},
"tags": [],
"refs": [
[
"orders"
]
],
"sources": [],
"metrics": [],
"description": "",
"columns": {},
"meta": {
"test_suite_name": "DBT TEST SUITE",
"test_suite_desciption": "Dbt test suite description"
},
"docs": {
"show": true,
"node_color": null
},
"patch_path": null,
"compiled_path": "target/compiled/jaffle_shop/models/schema.yml/unique_orders_order_id.sql",
"build_path": null,
"deferred": false,
"unrendered_config": {
"meta": {
"test_suite_name": "DBT TEST SUITE",
"test_suite_desciption": "Dbt test suite description"
}
},
"created_at": 1673982251.748683,
"compiled_code": "\n \n \n\nselect\n order_id as unique_field,\n count(*) as n_records\n\nfrom \"dev\".\"dbt_jaffle\".\"orders\"\nwhere order_id is not null\ngroup by order_id\nhaving count(*) > 1\n\n\n",
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": null,
"column_name": "order_id",
"file_key_name": "models.orders"
}
},
"sources": {},
"macros": {
},
"docs": {
},
"exposures": {},
"metrics": {},
"selectors": {},
"disabled": {},
"parent_map": {},
"child_map": {}
}

View File

@ -136,201 +136,6 @@
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"customers\""
},
"model.jaffle_shop.orders": {
"compiled": true,
"resource_type": "model",
"depends_on": {
"macros": [],
"nodes": [
"model.jaffle_shop.stg_orders",
"model.jaffle_shop.stg_payments"
]
},
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [
"single_tag"
],
"meta": {
"owner": "aaron_johnson0"
},
"materialized": "table",
"incremental_strategy": null,
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"grants": {},
"packages": [],
"docs": {
"show": true
},
"post-hook": [],
"pre-hook": []
},
"database": "dev",
"schema": "dbt_jaffle",
"fqn": [
"jaffle_shop",
"orders"
],
"unique_id": "model.jaffle_shop.orders",
"raw_sql": "sample raw orders code",
"package_name": "jaffle_shop",
"root_path": "sample/orders/root/path",
"path": "orders.sql",
"original_file_path": "models/orders.sql",
"name": "orders",
"alias": "orders",
"checksum": {
"name": "sha256",
"checksum": "53950235d8e29690d259e95ee49bda6a5b7911b44c739b738a646dc6014bcfcd"
},
"tags": [
"single_tag"
],
"refs": [
[
"stg_orders"
],
[
"stg_payments"
]
],
"sources": [],
"description": "This table has basic information about orders, as well as some derived facts based on payments",
"columns": {
"order_id": {
"name": "order_id",
"description": "This is a unique identifier for an order",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
},
"customer_id": {
"name": "customer_id",
"description": "Foreign key to the customers table",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
}
},
"meta": {
"owner": "aaron_johnson0"
},
"docs": {
"show": true
},
"patch_path": "jaffle_shop://models/schema.yml",
"compiled_path": "target/compiled/jaffle_shop/models/orders.sql",
"build_path": null,
"deferred": false,
"unrendered_config": {
"materialized": "table",
"tags": "single_tag",
"meta": {
"owner": "aaron_johnson0"
}
},
"created_at": 1673982251.742371,
"compiled_sql": "sample compiled code",
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"orders\""
},
"model.jaffle_shop.stg_customers": {
"compiled": true,
"resource_type": "model",
"depends_on": {
"macros": [],
"nodes": [
"seed.jaffle_shop.raw_customers"
]
},
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [],
"meta": {},
"materialized": "view",
"incremental_strategy": null,
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"grants": {},
"packages": [],
"docs": {
"show": true
},
"post-hook": [],
"pre-hook": []
},
"database": "dev",
"schema": "dbt_jaffle",
"fqn": [
"jaffle_shop",
"staging",
"stg_customers"
],
"unique_id": "model.jaffle_shop.stg_customers",
"raw_sql": "sample stg_customers raw_code",
"package_name": "jaffle_shop",
"root_path": "sample/stg_customers/root/path",
"path": "staging/stg_customers.sql",
"original_file_path": "models/staging/stg_customers.sql",
"name": "stg_customers",
"alias": "stg_customers",
"checksum": {
"name": "sha256",
"checksum": "6f18a29204dad1de6dbb0c288144c4990742e0a1e065c3b2a67b5f98334c22ba"
},
"tags": [],
"refs": [
[
"raw_customers"
]
],
"sources": [],
"description": "",
"columns": {
"customer_id": {
"name": "customer_id",
"description": "This is a unique identifier for an customer",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
}
},
"meta": {},
"docs": {
"show": true
},
"patch_path": "jaffle_shop://models/staging/schema.yml",
"compiled_path": "target/compiled/jaffle_shop/models/staging/stg_customers.sql",
"build_path": null,
"deferred": false,
"unrendered_config": {
"materialized": "view"
},
"created_at": 1673978228.757611,
"compiled_sql": "sample stg_customers compiled code",
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"stg_customers\""
}
},
"sources": {},

View File

@ -140,209 +140,6 @@
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"customers\""
},
"model.jaffle_shop.orders": {
"compiled": true,
"resource_type": "model",
"depends_on": {
"macros": [],
"nodes": [
"model.jaffle_shop.stg_orders",
"model.jaffle_shop.stg_payments"
]
},
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [
"single_tag"
],
"meta": {
"owner": "aaron_johnson0"
},
"materialized": "table",
"incremental_strategy": null,
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"grants": {},
"packages": [],
"docs": {
"show": true,
"node_color": null
},
"post-hook": [],
"pre-hook": []
},
"database": "dev",
"schema": "dbt_jaffle",
"fqn": [
"jaffle_shop",
"orders"
],
"unique_id": "model.jaffle_shop.orders",
"raw_code": "sample raw orders code",
"language": "sql",
"package_name": "jaffle_shop",
"root_path": "sample/orders/root/path",
"path": "orders.sql",
"original_file_path": "models/orders.sql",
"name": "orders",
"alias": "orders",
"checksum": {
"name": "sha256",
"checksum": "53950235d8e29690d259e95ee49bda6a5b7911b44c739b738a646dc6014bcfcd"
},
"tags": [
"single_tag"
],
"refs": [
[
"stg_orders"
],
[
"stg_payments"
]
],
"sources": [],
"metrics": [],
"description": "This table has basic information about orders, as well as some derived facts based on payments",
"columns": {
"order_id": {
"name": "order_id",
"description": "This is a unique identifier for an order",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
},
"customer_id": {
"name": "customer_id",
"description": "Foreign key to the customers table",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
}
},
"meta": {
"owner": "aaron_johnson0"
},
"docs": {
"show": true,
"node_color": null
},
"patch_path": "jaffle_shop://models/schema.yml",
"compiled_path": "target/compiled/jaffle_shop/models/orders.sql",
"build_path": null,
"deferred": false,
"unrendered_config": {
"materialized": "table",
"tags": "single_tag",
"meta": {
"owner": "aaron_johnson0"
}
},
"created_at": 1673982251.742371,
"compiled_code": "sample compiled code",
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"orders\""
},
"model.jaffle_shop.stg_customers": {
"compiled": true,
"resource_type": "model",
"depends_on": {
"macros": [],
"nodes": [
"seed.jaffle_shop.raw_customers"
]
},
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [],
"meta": {},
"materialized": "view",
"incremental_strategy": null,
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"grants": {},
"packages": [],
"docs": {
"show": true,
"node_color": null
},
"post-hook": [],
"pre-hook": []
},
"database": "dev",
"schema": "dbt_jaffle",
"fqn": [
"jaffle_shop",
"staging",
"stg_customers"
],
"unique_id": "model.jaffle_shop.stg_customers",
"raw_code": "sample stg_customers raw_code",
"language": "sql",
"package_name": "jaffle_shop",
"root_path": "sample/stg_customers/root/path",
"path": "staging/stg_customers.sql",
"original_file_path": "models/staging/stg_customers.sql",
"name": "stg_customers",
"alias": "stg_customers",
"checksum": {
"name": "sha256",
"checksum": "6f18a29204dad1de6dbb0c288144c4990742e0a1e065c3b2a67b5f98334c22ba"
},
"tags": [],
"refs": [
[
"raw_customers"
]
],
"sources": [],
"metrics": [],
"description": "",
"columns": {
"customer_id": {
"name": "customer_id",
"description": "This is a unique identifier for an customer",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
}
},
"meta": {},
"docs": {
"show": true,
"node_color": null
},
"patch_path": "jaffle_shop://models/staging/schema.yml",
"compiled_path": "target/compiled/jaffle_shop/models/staging/stg_customers.sql",
"build_path": null,
"deferred": false,
"unrendered_config": {
"materialized": "view"
},
"created_at": 1673978228.757611,
"compiled_code": "sample stg_customers compiled code",
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"stg_customers\""
}
},
"sources": {},

View File

@ -139,207 +139,6 @@
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"customers\""
},
"model.jaffle_shop.orders": {
"compiled": true,
"resource_type": "model",
"depends_on": {
"macros": [],
"nodes": [
"model.jaffle_shop.stg_orders",
"model.jaffle_shop.stg_payments"
]
},
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [
"single_tag"
],
"meta": {
"owner": "aaron_johnson0"
},
"materialized": "table",
"incremental_strategy": null,
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"grants": {},
"packages": [],
"docs": {
"show": true,
"node_color": null
},
"post-hook": [],
"pre-hook": []
},
"database": "dev",
"schema": "dbt_jaffle",
"fqn": [
"jaffle_shop",
"orders"
],
"unique_id": "model.jaffle_shop.orders",
"raw_code": "sample raw orders code",
"language": "sql",
"package_name": "jaffle_shop",
"path": "orders.sql",
"original_file_path": "sample/orders/root/path/models/orders.sql",
"name": "orders",
"alias": "orders",
"checksum": {
"name": "sha256",
"checksum": "53950235d8e29690d259e95ee49bda6a5b7911b44c739b738a646dc6014bcfcd"
},
"tags": [
"single_tag"
],
"refs": [
[
"stg_orders"
],
[
"stg_payments"
]
],
"sources": [],
"metrics": [],
"description": "This table has basic information about orders, as well as some derived facts based on payments",
"columns": {
"order_id": {
"name": "order_id",
"description": "This is a unique identifier for an order",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
},
"customer_id": {
"name": "customer_id",
"description": "Foreign key to the customers table",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
}
},
"meta": {
"owner": "aaron_johnson0"
},
"docs": {
"show": true,
"node_color": null
},
"patch_path": "jaffle_shop://models/schema.yml",
"compiled_path": "target/compiled/jaffle_shop/models/orders.sql",
"build_path": null,
"deferred": false,
"unrendered_config": {
"materialized": "table",
"tags": "single_tag",
"meta": {
"owner": "aaron_johnson0"
}
},
"created_at": 1673982251.742371,
"compiled_code": "sample compiled code",
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"orders\""
},
"model.jaffle_shop.stg_customers": {
"compiled": true,
"resource_type": "model",
"depends_on": {
"macros": [],
"nodes": [
"seed.jaffle_shop.raw_customers"
]
},
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [],
"meta": {},
"materialized": "view",
"incremental_strategy": null,
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"grants": {},
"packages": [],
"docs": {
"show": true,
"node_color": null
},
"post-hook": [],
"pre-hook": []
},
"database": "dev",
"schema": "dbt_jaffle",
"fqn": [
"jaffle_shop",
"staging",
"stg_customers"
],
"unique_id": "model.jaffle_shop.stg_customers",
"raw_code": "sample stg_customers raw_code",
"language": "sql",
"package_name": "jaffle_shop",
"path": "staging/stg_customers.sql",
"original_file_path": "sample/stg_customers/root/path/models/staging/stg_customers.sql",
"name": "stg_customers",
"alias": "stg_customers",
"checksum": {
"name": "sha256",
"checksum": "6f18a29204dad1de6dbb0c288144c4990742e0a1e065c3b2a67b5f98334c22ba"
},
"tags": [],
"refs": [
[
"raw_customers"
]
],
"sources": [],
"metrics": [],
"description": "",
"columns": {
"customer_id": {
"name": "customer_id",
"description": "This is a unique identifier for an customer",
"meta": {},
"data_type": null,
"quote": null,
"tags": []
}
},
"meta": {},
"docs": {
"show": true,
"node_color": null
},
"patch_path": "jaffle_shop://models/staging/schema.yml",
"compiled_path": "target/compiled/jaffle_shop/models/staging/stg_customers.sql",
"build_path": null,
"deferred": false,
"unrendered_config": {
"materialized": "view"
},
"created_at": 1673978228.757611,
"compiled_code": "sample stg_customers compiled code",
"extra_ctes_injected": true,
"extra_ctes": [],
"relation_name": "\"dev\".\"dbt_jaffle\".\"stg_customers\""
}
},
"sources": {},

View File

@ -12,11 +12,17 @@ from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run
from pydantic import AnyUrl
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.metadata import DbtSource
from metadata.utils.dbt_config import DbtFiles, DbtObjects
@ -61,6 +67,8 @@ MOCK_SAMPLE_MANIFEST_V8 = "resources/datasets/manifest_v8.json"
MOCK_SAMPLE_MANIFEST_NULL_DB = "resources/datasets/manifest_null_db.json"
MOCK_SAMPLE_MANIFEST_TEST_NODE = "resources/datasets/manifest_test_node.json"
EXPECTED_DATA_MODEL_FQNS = [
"dbt_test.dev.dbt_jaffle.customers",
@ -76,7 +84,7 @@ EXPECTED_DATA_MODELS = [
path="sample/customers/root/path/models/customers.sql",
rawSql="sample customers raw code",
sql="sample customers compile code",
upstream=["dbt_test.dev.dbt_jaffle.stg_customers"],
upstream=[],
owner=EntityReference(
id="cb2a92f5-e935-4ad7-911c-654280046538",
type="user",
@ -133,95 +141,7 @@ EXPECTED_DATA_MODELS = [
),
],
generatedAt=None,
),
DataModel(
modelType="DBT",
description="This table has basic information about orders, as well as some derived facts based on payments",
path="sample/orders/root/path/models/orders.sql",
rawSql="sample raw orders code",
sql="sample compiled code",
upstream=[],
owner=EntityReference(
id="cb2a92f5-e935-4ad7-911c-654280046538",
type="user",
name=None,
fullyQualifiedName="aaron_johnson0",
description=None,
displayName=None,
deleted=None,
href=AnyUrl(
"http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
),
),
tags=[
TagLabel(
tagFQN="dbtTags.single_tag",
description=None,
source="Tag",
labelType="Automated",
state="Confirmed",
href=None,
)
],
columns=[
Column(
name="order_id",
displayName=None,
dataType="VARCHAR",
dataLength=1,
description="This is a unique identifier for an order",
),
Column(
name="customer_id",
displayName=None,
dataType="VARCHAR",
dataLength=1,
description="Foreign key to the customers table",
),
],
generatedAt=None,
),
DataModel(
modelType="DBT",
description=None,
path="sample/stg_customers/root/path/models/staging/stg_customers.sql",
rawSql="sample stg_customers raw_code",
sql="sample stg_customers compiled code",
upstream=[],
owner=EntityReference(
id="cb2a92f5-e935-4ad7-911c-654280046538",
type="user",
name=None,
fullyQualifiedName="aaron_johnson0",
description=None,
displayName=None,
deleted=None,
href=AnyUrl(
"http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538",
),
),
tags=None,
columns=[
Column(
name="customer_id",
displayName=None,
dataType="VARCHAR",
dataLength=1,
description="This is a unique identifier for an customer",
)
],
generatedAt=None,
),
)
]
EXPECTED_DATA_MODEL_NULL_DB = [
@ -281,14 +201,56 @@ MOCK_OWNER = EntityReference(
),
)
MOCK_USER = [
User(
id=uuid.uuid4(),
name="aaron_johnson0",
email="aaron_johnson0@gmail.com",
href="http://localhost:8585/api/v1/users/d96eccb9-9a9b-40ad-9585-0a8a71665c51",
fullyQualifiedName="aaron_johnson0",
)
]
MOCK_TABLE_ENTITIES = [
Table(
id=uuid.uuid4(),
name="customers",
databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers",
columns=[],
)
]
MOCK_NULL_DB_TABLE = [
Table(
id=uuid.uuid4(),
name="test",
name="customers_null_db",
databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers_null_db",
columns=[],
)
),
]
MOCK_TAG_LABELS = [
TagLabel(
tagFQN="dbtTags.tag1",
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
),
TagLabel(
tagFQN="dbtTags.tag2name",
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
),
TagLabel(
tagFQN="dbtTags.tag3",
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
),
]
@ -309,29 +271,35 @@ class DbtUnitTest(TestCase):
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
def test_dbt_manifest_v4_v5_v6(self, get_dbt_owner):
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_v4_v5_v6(self, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES
self.execute_test(
MOCK_SAMPLE_MANIFEST_V4_V5_V6,
expected_records=4,
expected_records=2,
expected_data_models=EXPECTED_DATA_MODELS,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
def test_dbt_manifest_v7(self, get_dbt_owner):
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_v7(self, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES
self.execute_test(
MOCK_SAMPLE_MANIFEST_V7,
expected_records=4,
expected_records=2,
expected_data_models=EXPECTED_DATA_MODELS,
)
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
def test_dbt_manifest_v8(self, get_dbt_owner):
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_v8(self, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES
self.execute_test(
MOCK_SAMPLE_MANIFEST_V8,
expected_records=4,
expected_records=2,
expected_data_models=EXPECTED_DATA_MODELS,
)
@ -346,7 +314,116 @@ class DbtUnitTest(TestCase):
expected_data_models=EXPECTED_DATA_MODEL_NULL_DB,
)
def test_dbt_get_corrected_name(self):
self.assertEqual(
"dbt_jaffle", self.dbt_source_obj.get_corrected_name(name="dbt_jaffle")
)
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="None"))
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null"))
self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev"))
def test_dbt_get_dbt_tag_labels(self):
result = self.dbt_source_obj.get_dbt_tag_labels(["tag1", "tag2.name", "tag3"])
self.assertListEqual(result, MOCK_TAG_LABELS)
def test_dbt_get_data_model_path(self):
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_data_model_path(manifest_node=manifest_node)
self.assertEqual("sample/customers/root/path/models/customers.sql", result)
def test_dbt_generate_entity_link(self):
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_TEST_NODE
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"test.jaffle_shop.unique_orders_order_id.fed79b3a6e"
)
dbt_test = {
"manifest_node": manifest_node,
"upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"],
"results": "",
}
result = self.dbt_source_obj.generate_entity_link(dbt_test=dbt_test)
self.assertListEqual(
[
"<#E::table::local_redshift_dbt2.dev.dbt_jaffle.stg_customers::columns::order_id>"
],
result,
)
def test_dbt_compiled_query(self):
expected_query = "sample customers compile code"
# Test the compiled queries with v8 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
# Test the compiled queries with v4 v5 v6 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
def test_dbt_raw_query(self):
expected_query = "sample customers raw code"
# Test the raw queries with v8 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
# Test the raw queries with v4 v5 v6 manifest
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node)
self.assertEqual(expected_query, result)
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_owner(self, es_search_from_fqn):
es_search_from_fqn.return_value = MOCK_USER
_, dbt_objects = self.get_dbt_object_files(
mock_manifest=MOCK_SAMPLE_MANIFEST_V8
)
manifest_node = dbt_objects.dbt_manifest.nodes.get(
"model.jaffle_shop.customers"
)
result = self.dbt_source_obj.get_dbt_owner(
manifest_node=manifest_node, catalog_node=None
)
self.assertEqual("aaron_johnson0", result.fullyQualifiedName)
def execute_test(self, mock_manifest, expected_records, expected_data_models):
dbt_files, dbt_objects = self.get_dbt_object_files(mock_manifest)
self.check_dbt_validate(dbt_files=dbt_files, expected_records=expected_records)
self.check_yield_datamodel(
dbt_objects=dbt_objects, expected_data_models=expected_data_models
)
def get_dbt_object_files(self, mock_manifest):
mock_file_path = Path(__file__).parent / mock_manifest
with open(mock_file_path) as file:
mock_data: dict = json.load(file)
@ -360,10 +437,7 @@ class DbtUnitTest(TestCase):
if dbt_files.dbt_run_results
else None,
)
self.check_dbt_validate(dbt_files=dbt_files, expected_records=expected_records)
self.check_yield_datamodel(
dbt_objects=dbt_objects, expected_data_models=expected_data_models
)
return dbt_files, dbt_objects
def check_dbt_validate(self, dbt_files, expected_records):
with self.assertLogs() as captured:
@ -380,7 +454,10 @@ class DbtUnitTest(TestCase):
)
for data_model_link in yield_data_models:
if isinstance(data_model_link, DataModelLink):
self.assertIn(data_model_link.fqn.__root__, EXPECTED_DATA_MODEL_FQNS)
self.assertIn(
data_model_link.table_entity.fullyQualifiedName.__root__,
EXPECTED_DATA_MODEL_FQNS,
)
data_model_list.append(data_model_link.datamodel)
for _, (exptected, original) in enumerate(