From ca623d06935401a58400b257a09d6cde9af98aaf Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Wed, 8 Mar 2023 11:07:34 +0530 Subject: [PATCH] dbt performance improvement (#10366) * dbt performance improvement * reduced es calls * Added unit tests --- .../ingestion/ometa/mixins/es_mixin.py | 2 +- .../metadata/ingestion/sink/metadata_rest.py | 7 +- .../source/database/database_service.py | 2 +- .../ingestion/source/database/dbt/metadata.py | 179 +++++------ ingestion/src/metadata/utils/entity_link.py | 17 +- .../datasets/manifest_test_node.json | 121 ++++++++ .../resources/datasets/manifest_v4_v5_v6.json | 195 ------------ .../unit/resources/datasets/manifest_v7.json | 203 ------------- .../unit/resources/datasets/manifest_v8.json | 201 ------------ ingestion/tests/unit/test_dbt.py | 285 +++++++++++------- 10 files changed, 416 insertions(+), 796 deletions(-) create mode 100644 ingestion/tests/unit/resources/datasets/manifest_test_node.json diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 55e0fab6915..023140945f2 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -39,7 +39,7 @@ class ESMixin(Generic[T]): fqdn_search = "/search/query?q=fullyQualifiedName:{fqn}&from={from_}&size={size}&index={index}" - @functools.lru_cache() + @functools.lru_cache(maxsize=512) def _search_es_entity( self, entity_type: Type[T], query_string: str ) -> Optional[List[T]]: diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index e2e72099ffc..c3c9fa4e2a4 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -159,7 +159,7 @@ class MetadataRestSink(Sink[Entity]): :param datamodel_link: Table ID + Data Model """ - table: Table = self.metadata.get_by_name(entity=Table, fqn=datamodel_link.fqn) + table: Table = datamodel_link.table_entity if table: self.metadata.ingest_table_data_model( @@ -169,10 +169,7 @@ class MetadataRestSink(Sink[Entity]): f"Successfully ingested DataModel for {table.fullyQualifiedName.__root__}" ) else: - logger.warning( - f"The table [{datamodel_link.fqn.__root__}] from the manifest file is not found in OM. " - f"Please, check if the table has been ingested previously." - ) + logger.warning("Unable to ingest datamodel") def write_table_location_link(self, table_location_link: TableLocationLink) -> None: """ diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 9b1a036b537..981625e073e 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -79,7 +79,7 @@ class DataModelLink(BaseModel): Tmp model to handle data model ingestion """ - fqn: FullyQualifiedEntityName + table_entity: Table datamodel: DataModel diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index b59746fc14a..b2a76eb4568 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -75,7 +75,7 @@ from metadata.ingestion.source.database.dbt.dbt_service import ( DbtObjects, DbtServiceSource, ) -from metadata.utils import fqn +from metadata.utils import entity_link, fqn from metadata.utils.elasticsearch import get_entity_from_es_result from metadata.utils.logger import ingestion_logger @@ -199,7 +199,9 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods f"Service with name {self.config.serviceName} not found" ) - def get_dbt_owner(self, manifest_node: dict, catalog_node: dict) -> Optional[str]: + def get_dbt_owner( + self, manifest_node: dict, catalog_node: Optional[dict] + ) -> Optional[str]: """ Returns dbt owner """ @@ -240,7 +242,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods self.metadata, entity_type=Tag, classification_name=self.tag_classification_name, - tag_name=tag.replace(".", ""), + tag_name=tag.replace(fqn.FQN_SEPARATOR, ""), ), labelType=LabelType.Automated, state=State.Confirmed, @@ -434,58 +436,67 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods catalog_node = catalog_entities.get(key) dbt_table_tags_list = None - dbt_model_tag_labels = manifest_node.tags - if dbt_model_tag_labels: + if manifest_node.tags: dbt_table_tags_list = self.get_dbt_tag_labels( - dbt_model_tag_labels + manifest_node.tags ) dbt_compiled_query = self.get_dbt_compiled_query(manifest_node) dbt_raw_query = self.get_dbt_raw_query(manifest_node) - datamodel_path = None - if manifest_node.original_file_path: - if ( - hasattr(manifest_node, "root_path") - and manifest_node.root_path - ): - datamodel_path = f"{manifest_node.root_path}/{manifest_node.original_file_path}" - else: - datamodel_path = manifest_node.original_file_path - - data_model_link = DataModelLink( - fqn=fqn.build( - self.metadata, - entity_type=Table, - service_name=self.config.serviceName, - database_name=self.get_corrected_name( - manifest_node.database - ), - schema_name=self.get_corrected_name(manifest_node.schema_), - table_name=model_name, - ), - datamodel=DataModel( - modelType=ModelType.DBT, - description=manifest_node.description - if manifest_node.description - else None, - path=datamodel_path, - rawSql=dbt_raw_query if dbt_raw_query else "", - sql=dbt_compiled_query if dbt_compiled_query else "", - columns=self.parse_data_model_columns( - manifest_node, catalog_node - ), - upstream=self.parse_upstream_nodes( - manifest_entities, manifest_node - ), - owner=self.get_dbt_owner( - manifest_node=manifest_node, catalog_node=catalog_node - ), - tags=dbt_table_tags_list, - ), + # Get the table entity from ES + # TODO: Change to get_by_name once the postgres case sensitive calls is fixed + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.config.serviceName, + database_name=self.get_corrected_name(manifest_node.database), + schema_name=self.get_corrected_name(manifest_node.schema_), + table_name=model_name, ) - yield data_model_link - self.context.data_model_links.append(data_model_link) + table_entity: Optional[ + Union[Table, List[Table]] + ] = get_entity_from_es_result( + entity_list=self.metadata.es_search_from_fqn( + entity_type=Table, fqn_search_string=table_fqn + ), + fetch_multiple_entities=False, + ) + + if table_entity: + data_model_link = DataModelLink( + table_entity=table_entity, + datamodel=DataModel( + modelType=ModelType.DBT, + description=manifest_node.description + if manifest_node.description + else None, + path=self.get_data_model_path( + manifest_node=manifest_node + ), + rawSql=dbt_raw_query if dbt_raw_query else "", + sql=dbt_compiled_query if dbt_compiled_query else "", + columns=self.parse_data_model_columns( + manifest_node, catalog_node + ), + upstream=self.parse_upstream_nodes( + manifest_entities, manifest_node + ), + owner=self.get_dbt_owner( + manifest_node=manifest_node, + catalog_node=catalog_node, + ), + tags=dbt_table_tags_list, + ), + ) + yield data_model_link + self.context.data_model_links.append(data_model_link) + else: + logger.warning( + f"Unable to find the table '{table_fqn}' in OpenMetadata" + f"Please check if the table exists is ingested in OpenMetadata" + f"And name, database, schema of the manifest node matches with the table present in OpenMetadata" # pylint: disable=line-too-long + ) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( @@ -498,6 +509,17 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods correct_name = None if name.lower() in NONE_KEYWORDS_LIST else name return correct_name + def get_data_model_path(self, manifest_node): + datamodel_path = None + if manifest_node.original_file_path: + if hasattr(manifest_node, "root_path") and manifest_node.root_path: + datamodel_path = ( + f"{manifest_node.root_path}/{manifest_node.original_file_path}" + ) + else: + datamodel_path = manifest_node.original_file_path + return datamodel_path + def parse_upstream_nodes(self, manifest_entities, dbt_node): """ Method to fetch the upstream nodes @@ -587,16 +609,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods """ Method to process DBT lineage from upstream nodes """ - logger.info(f"Processing DBT lineage for: {data_model_link.fqn.__root__}") + to_entity: Table = data_model_link.table_entity + logger.info( + f"Processing DBT lineage for: {to_entity.fullyQualifiedName.__root__}" + ) - # Get the table entity from ES - to_es_result = self.metadata.es_search_from_fqn( - entity_type=Table, - fqn_search_string=data_model_link.fqn.__root__, - ) - to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result( - entity_list=to_es_result, fetch_multiple_entities=False - ) for upstream_node in data_model_link.datamodel.upstream: try: from_es_result = self.metadata.es_search_from_fqn( @@ -634,11 +651,13 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods """ Method to process DBT lineage from queries """ - table_fqn = data_model_link.fqn.__root__ - logger.info(f"Processing DBT Query lineage for: {table_fqn}") + to_entity: Table = data_model_link.table_entity + logger.info( + f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.__root__}" + ) try: - source_elements = fqn.split(table_fqn) + source_elements = fqn.split(to_entity.fullyQualifiedName.__root__) # remove service name from fqn to make it parseable in format db.schema.table query_fqn = fqn._build( # pylint: disable=protected-access *source_elements[-3:] @@ -671,24 +690,18 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods """ Method to process DBT descriptions using patch APIs """ - logger.info(f"Processing DBT Descriptions for: {data_model_link.fqn.__root__}") - - # Get the table entity from ES - to_es_result = self.metadata.es_search_from_fqn( - entity_type=Table, - fqn_search_string=data_model_link.fqn.__root__, + table_entity: Table = data_model_link.table_entity + logger.info( + f"Processing DBT Descriptions for: {table_entity.fullyQualifiedName.__root__}" ) - to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result( - entity_list=to_es_result, fetch_multiple_entities=False - ) - if to_entity: + if table_entity: try: data_model = data_model_link.datamodel # Patch table descriptions from DBT if data_model.description: self.metadata.patch_description( entity=Table, - entity_id=to_entity.id, + entity_id=table_entity.id, description=data_model.description.__root__, force=self.source_config.dbtUpdateDescriptions, ) @@ -697,7 +710,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods for column in data_model.columns: if column.description: self.metadata.patch_column_description( - entity_id=to_entity.id, + entity_id=table_entity.id, column_name=column.name.__root__, description=column.description.__root__, force=self.source_config.dbtUpdateDescriptions, @@ -705,7 +718,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.warning( - f"Failed to parse the node {data_model_link.fqn.__root__} to update dbt desctiption: {exc}" + f"Failed to parse the node {table_entity.fullyQualifiedName.__root__}to update dbt desctiption: {exc}" # pylint: disable=line-too-long ) def create_dbt_tests_suite( @@ -784,7 +797,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods f"Processing DBT Test Case Definition for node: {manifest_node.name}" ) entity_link_list = self.generate_entity_link(dbt_test) - for entity_link in entity_link_list: + for entity_link_str in entity_link_list: test_suite_name = manifest_node.meta.get( DbtCommonEnum.TEST_SUITE_NAME.value, DbtCommonEnum.DBT_TEST_SUITE.value, @@ -795,7 +808,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods testDefinition=FullyQualifiedEntityName( __root__=manifest_node.name ), - entityLink=entity_link, + entityLink=entity_link_str, testSuite=FullyQualifiedEntityName(__root__=test_suite_name), parameterValues=self.create_test_case_parameter_values( dbt_test @@ -903,16 +916,12 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods Method returns entity link """ manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) - entity_link_list = [] - for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value]: - column_name = manifest_node.column_name - if column_name: - entity_link = ( - f"<#E::table::" f"{table_fqn}" f"::columns::" f"{column_name}>" - ) - else: - entity_link = f"<#E::table::" f"{table_fqn}>" - entity_link_list.append(entity_link) + entity_link_list = [ + entity_link.get_entity_link( + table_fqn=table_fqn, column_name=manifest_node.column_name + ) + for table_fqn in dbt_test[DbtCommonEnum.UPSTREAM.value] + ] return entity_link_list def get_dbt_compiled_query(self, mnode) -> Optional[str]: diff --git a/ingestion/src/metadata/utils/entity_link.py b/ingestion/src/metadata/utils/entity_link.py index 181547cbc88..e2067944f20 100644 --- a/ingestion/src/metadata/utils/entity_link.py +++ b/ingestion/src/metadata/utils/entity_link.py @@ -13,7 +13,7 @@ Handle Entity Link building and splitting logic. Filter information has been taken from the ES indexes definitions """ -from typing import List +from typing import List, Optional from antlr4.CommonTokenStream import CommonTokenStream from antlr4.error.ErrorStrategy import BailErrorStrategy @@ -66,3 +66,18 @@ def get_table_fqn(entity_link: str) -> str: """ return entity_link.split("::")[2] + + +def get_entity_link(table_fqn: str, column_name: Optional[str]) -> str: + """From table fqn and column name get the entity_link + + Args: + table_fqn: table fqn + column_name: Optional param to generate entity link with column name + """ + + if column_name: + entity_link = f"<#E::table::" f"{table_fqn}" f"::columns::" f"{column_name}>" + else: + entity_link = f"<#E::table::" f"{table_fqn}>" + return entity_link diff --git a/ingestion/tests/unit/resources/datasets/manifest_test_node.json b/ingestion/tests/unit/resources/datasets/manifest_test_node.json new file mode 100644 index 00000000000..9b19bf24245 --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/manifest_test_node.json @@ -0,0 +1,121 @@ +{ + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v7.json", + "dbt_version": "1.3.0", + "generated_at": "2023-01-17T19:05:57.859191Z", + "invocation_id": "0c4757bf-0a8f-4f24-a18a-4c2638bf7d8e", + "env": {}, + "project_id": "06e5b98c2db46f8a72cc4f66410e9b3b", + "user_id": null, + "send_anonymous_usage_stats": true, + "adapter_type": "redshift" + }, + "nodes": { + "test.jaffle_shop.unique_orders_order_id.fed79b3a6e": { + "test_metadata": { + "name": "unique", + "kwargs": { + "column_name": "order_id", + "model": "{{ get_where_subquery(ref('orders')) }}" + }, + "namespace": null + }, + "compiled": true, + "resource_type": "test", + "depends_on": { + "macros": [ + "macro.dbt.test_unique", + "macro.dbt.get_where_subquery" + ], + "nodes": [ + "model.jaffle_shop.orders" + ] + }, + "config": { + "enabled": true, + "alias": null, + "schema": "dbt_test__audit", + "database": null, + "tags": [], + "meta": { + "test_suite_name": "DBT TEST SUITE", + "test_suite_desciption": "Dbt test suite description" + }, + "materialized": "test", + "severity": "ERROR", + "store_failures": null, + "where": null, + "limit": null, + "fail_calc": "count(*)", + "warn_if": "!= 0", + "error_if": "!= 0" + }, + "database": "dev", + "schema": "dbt_jaffle_dbt_test__audit", + "fqn": [ + "jaffle_shop", + "unique_orders_order_id" + ], + "unique_id": "test.jaffle_shop.unique_orders_order_id.fed79b3a6e", + "raw_code": "{{ test_unique(**_dbt_generic_test_kwargs) }}", + "language": "sql", + "package_name": "jaffle_shop", + "root_path": "/Users/onkarravgan/Desktop/project/jaffle_shop", + "path": "unique_orders_order_id.sql", + "original_file_path": "models/schema.yml", + "name": "unique_orders_order_id", + "alias": "unique_orders_order_id", + "checksum": { + "name": "none", + "checksum": "" + }, + "tags": [], + "refs": [ + [ + "orders" + ] + ], + "sources": [], + "metrics": [], + "description": "", + "columns": {}, + "meta": { + "test_suite_name": "DBT TEST SUITE", + "test_suite_desciption": "Dbt test suite description" + }, + "docs": { + "show": true, + "node_color": null + }, + "patch_path": null, + "compiled_path": "target/compiled/jaffle_shop/models/schema.yml/unique_orders_order_id.sql", + "build_path": null, + "deferred": false, + "unrendered_config": { + "meta": { + "test_suite_name": "DBT TEST SUITE", + "test_suite_desciption": "Dbt test suite description" + } + }, + "created_at": 1673982251.748683, + "compiled_code": "\n \n \n\nselect\n order_id as unique_field,\n count(*) as n_records\n\nfrom \"dev\".\"dbt_jaffle\".\"orders\"\nwhere order_id is not null\ngroup by order_id\nhaving count(*) > 1\n\n\n", + "extra_ctes_injected": true, + "extra_ctes": [], + "relation_name": null, + "column_name": "order_id", + "file_key_name": "models.orders" + } + }, + "sources": {}, + "macros": { + + }, + "docs": { + }, + "exposures": {}, + "metrics": {}, + "selectors": {}, + "disabled": {}, + "parent_map": {}, + "child_map": {} +} \ No newline at end of file diff --git a/ingestion/tests/unit/resources/datasets/manifest_v4_v5_v6.json b/ingestion/tests/unit/resources/datasets/manifest_v4_v5_v6.json index 7ee0718271f..0cdc11523fb 100644 --- a/ingestion/tests/unit/resources/datasets/manifest_v4_v5_v6.json +++ b/ingestion/tests/unit/resources/datasets/manifest_v4_v5_v6.json @@ -136,201 +136,6 @@ "extra_ctes_injected": true, "extra_ctes": [], "relation_name": "\"dev\".\"dbt_jaffle\".\"customers\"" - }, - "model.jaffle_shop.orders": { - "compiled": true, - "resource_type": "model", - "depends_on": { - "macros": [], - "nodes": [ - "model.jaffle_shop.stg_orders", - "model.jaffle_shop.stg_payments" - ] - }, - "config": { - "enabled": true, - "alias": null, - "schema": null, - "database": null, - "tags": [ - "single_tag" - ], - "meta": { - "owner": "aaron_johnson0" - }, - "materialized": "table", - "incremental_strategy": null, - "persist_docs": {}, - "quoting": {}, - "column_types": {}, - "full_refresh": null, - "unique_key": null, - "on_schema_change": "ignore", - "grants": {}, - "packages": [], - "docs": { - "show": true - }, - "post-hook": [], - "pre-hook": [] - }, - "database": "dev", - "schema": "dbt_jaffle", - "fqn": [ - "jaffle_shop", - "orders" - ], - "unique_id": "model.jaffle_shop.orders", - "raw_sql": "sample raw orders code", - "package_name": "jaffle_shop", - "root_path": "sample/orders/root/path", - "path": "orders.sql", - "original_file_path": "models/orders.sql", - "name": "orders", - "alias": "orders", - "checksum": { - "name": "sha256", - "checksum": "53950235d8e29690d259e95ee49bda6a5b7911b44c739b738a646dc6014bcfcd" - }, - "tags": [ - "single_tag" - ], - "refs": [ - [ - "stg_orders" - ], - [ - "stg_payments" - ] - ], - "sources": [], - "description": "This table has basic information about orders, as well as some derived facts based on payments", - "columns": { - "order_id": { - "name": "order_id", - "description": "This is a unique identifier for an order", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - }, - "customer_id": { - "name": "customer_id", - "description": "Foreign key to the customers table", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - } - }, - "meta": { - "owner": "aaron_johnson0" - }, - "docs": { - "show": true - }, - "patch_path": "jaffle_shop://models/schema.yml", - "compiled_path": "target/compiled/jaffle_shop/models/orders.sql", - "build_path": null, - "deferred": false, - "unrendered_config": { - "materialized": "table", - "tags": "single_tag", - "meta": { - "owner": "aaron_johnson0" - } - }, - "created_at": 1673982251.742371, - "compiled_sql": "sample compiled code", - "extra_ctes_injected": true, - "extra_ctes": [], - "relation_name": "\"dev\".\"dbt_jaffle\".\"orders\"" - }, - "model.jaffle_shop.stg_customers": { - "compiled": true, - "resource_type": "model", - "depends_on": { - "macros": [], - "nodes": [ - "seed.jaffle_shop.raw_customers" - ] - }, - "config": { - "enabled": true, - "alias": null, - "schema": null, - "database": null, - "tags": [], - "meta": {}, - "materialized": "view", - "incremental_strategy": null, - "persist_docs": {}, - "quoting": {}, - "column_types": {}, - "full_refresh": null, - "unique_key": null, - "on_schema_change": "ignore", - "grants": {}, - "packages": [], - "docs": { - "show": true - }, - "post-hook": [], - "pre-hook": [] - }, - "database": "dev", - "schema": "dbt_jaffle", - "fqn": [ - "jaffle_shop", - "staging", - "stg_customers" - ], - "unique_id": "model.jaffle_shop.stg_customers", - "raw_sql": "sample stg_customers raw_code", - "package_name": "jaffle_shop", - "root_path": "sample/stg_customers/root/path", - "path": "staging/stg_customers.sql", - "original_file_path": "models/staging/stg_customers.sql", - "name": "stg_customers", - "alias": "stg_customers", - "checksum": { - "name": "sha256", - "checksum": "6f18a29204dad1de6dbb0c288144c4990742e0a1e065c3b2a67b5f98334c22ba" - }, - "tags": [], - "refs": [ - [ - "raw_customers" - ] - ], - "sources": [], - "description": "", - "columns": { - "customer_id": { - "name": "customer_id", - "description": "This is a unique identifier for an customer", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - } - }, - "meta": {}, - "docs": { - "show": true - }, - "patch_path": "jaffle_shop://models/staging/schema.yml", - "compiled_path": "target/compiled/jaffle_shop/models/staging/stg_customers.sql", - "build_path": null, - "deferred": false, - "unrendered_config": { - "materialized": "view" - }, - "created_at": 1673978228.757611, - "compiled_sql": "sample stg_customers compiled code", - "extra_ctes_injected": true, - "extra_ctes": [], - "relation_name": "\"dev\".\"dbt_jaffle\".\"stg_customers\"" } }, "sources": {}, diff --git a/ingestion/tests/unit/resources/datasets/manifest_v7.json b/ingestion/tests/unit/resources/datasets/manifest_v7.json index e5f48f01c41..57ac77d6294 100644 --- a/ingestion/tests/unit/resources/datasets/manifest_v7.json +++ b/ingestion/tests/unit/resources/datasets/manifest_v7.json @@ -140,209 +140,6 @@ "extra_ctes_injected": true, "extra_ctes": [], "relation_name": "\"dev\".\"dbt_jaffle\".\"customers\"" - }, - "model.jaffle_shop.orders": { - "compiled": true, - "resource_type": "model", - "depends_on": { - "macros": [], - "nodes": [ - "model.jaffle_shop.stg_orders", - "model.jaffle_shop.stg_payments" - ] - }, - "config": { - "enabled": true, - "alias": null, - "schema": null, - "database": null, - "tags": [ - "single_tag" - ], - "meta": { - "owner": "aaron_johnson0" - }, - "materialized": "table", - "incremental_strategy": null, - "persist_docs": {}, - "quoting": {}, - "column_types": {}, - "full_refresh": null, - "unique_key": null, - "on_schema_change": "ignore", - "grants": {}, - "packages": [], - "docs": { - "show": true, - "node_color": null - }, - "post-hook": [], - "pre-hook": [] - }, - "database": "dev", - "schema": "dbt_jaffle", - "fqn": [ - "jaffle_shop", - "orders" - ], - "unique_id": "model.jaffle_shop.orders", - "raw_code": "sample raw orders code", - "language": "sql", - "package_name": "jaffle_shop", - "root_path": "sample/orders/root/path", - "path": "orders.sql", - "original_file_path": "models/orders.sql", - "name": "orders", - "alias": "orders", - "checksum": { - "name": "sha256", - "checksum": "53950235d8e29690d259e95ee49bda6a5b7911b44c739b738a646dc6014bcfcd" - }, - "tags": [ - "single_tag" - ], - "refs": [ - [ - "stg_orders" - ], - [ - "stg_payments" - ] - ], - "sources": [], - "metrics": [], - "description": "This table has basic information about orders, as well as some derived facts based on payments", - "columns": { - "order_id": { - "name": "order_id", - "description": "This is a unique identifier for an order", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - }, - "customer_id": { - "name": "customer_id", - "description": "Foreign key to the customers table", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - } - }, - "meta": { - "owner": "aaron_johnson0" - }, - "docs": { - "show": true, - "node_color": null - }, - "patch_path": "jaffle_shop://models/schema.yml", - "compiled_path": "target/compiled/jaffle_shop/models/orders.sql", - "build_path": null, - "deferred": false, - "unrendered_config": { - "materialized": "table", - "tags": "single_tag", - "meta": { - "owner": "aaron_johnson0" - } - }, - "created_at": 1673982251.742371, - "compiled_code": "sample compiled code", - "extra_ctes_injected": true, - "extra_ctes": [], - "relation_name": "\"dev\".\"dbt_jaffle\".\"orders\"" - }, - "model.jaffle_shop.stg_customers": { - "compiled": true, - "resource_type": "model", - "depends_on": { - "macros": [], - "nodes": [ - "seed.jaffle_shop.raw_customers" - ] - }, - "config": { - "enabled": true, - "alias": null, - "schema": null, - "database": null, - "tags": [], - "meta": {}, - "materialized": "view", - "incremental_strategy": null, - "persist_docs": {}, - "quoting": {}, - "column_types": {}, - "full_refresh": null, - "unique_key": null, - "on_schema_change": "ignore", - "grants": {}, - "packages": [], - "docs": { - "show": true, - "node_color": null - }, - "post-hook": [], - "pre-hook": [] - }, - "database": "dev", - "schema": "dbt_jaffle", - "fqn": [ - "jaffle_shop", - "staging", - "stg_customers" - ], - "unique_id": "model.jaffle_shop.stg_customers", - "raw_code": "sample stg_customers raw_code", - "language": "sql", - "package_name": "jaffle_shop", - "root_path": "sample/stg_customers/root/path", - "path": "staging/stg_customers.sql", - "original_file_path": "models/staging/stg_customers.sql", - "name": "stg_customers", - "alias": "stg_customers", - "checksum": { - "name": "sha256", - "checksum": "6f18a29204dad1de6dbb0c288144c4990742e0a1e065c3b2a67b5f98334c22ba" - }, - "tags": [], - "refs": [ - [ - "raw_customers" - ] - ], - "sources": [], - "metrics": [], - "description": "", - "columns": { - "customer_id": { - "name": "customer_id", - "description": "This is a unique identifier for an customer", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - } - }, - "meta": {}, - "docs": { - "show": true, - "node_color": null - }, - "patch_path": "jaffle_shop://models/staging/schema.yml", - "compiled_path": "target/compiled/jaffle_shop/models/staging/stg_customers.sql", - "build_path": null, - "deferred": false, - "unrendered_config": { - "materialized": "view" - }, - "created_at": 1673978228.757611, - "compiled_code": "sample stg_customers compiled code", - "extra_ctes_injected": true, - "extra_ctes": [], - "relation_name": "\"dev\".\"dbt_jaffle\".\"stg_customers\"" } }, "sources": {}, diff --git a/ingestion/tests/unit/resources/datasets/manifest_v8.json b/ingestion/tests/unit/resources/datasets/manifest_v8.json index d1ecb8915e9..407c47b6dcb 100644 --- a/ingestion/tests/unit/resources/datasets/manifest_v8.json +++ b/ingestion/tests/unit/resources/datasets/manifest_v8.json @@ -139,207 +139,6 @@ "extra_ctes_injected": true, "extra_ctes": [], "relation_name": "\"dev\".\"dbt_jaffle\".\"customers\"" - }, - "model.jaffle_shop.orders": { - "compiled": true, - "resource_type": "model", - "depends_on": { - "macros": [], - "nodes": [ - "model.jaffle_shop.stg_orders", - "model.jaffle_shop.stg_payments" - ] - }, - "config": { - "enabled": true, - "alias": null, - "schema": null, - "database": null, - "tags": [ - "single_tag" - ], - "meta": { - "owner": "aaron_johnson0" - }, - "materialized": "table", - "incremental_strategy": null, - "persist_docs": {}, - "quoting": {}, - "column_types": {}, - "full_refresh": null, - "unique_key": null, - "on_schema_change": "ignore", - "grants": {}, - "packages": [], - "docs": { - "show": true, - "node_color": null - }, - "post-hook": [], - "pre-hook": [] - }, - "database": "dev", - "schema": "dbt_jaffle", - "fqn": [ - "jaffle_shop", - "orders" - ], - "unique_id": "model.jaffle_shop.orders", - "raw_code": "sample raw orders code", - "language": "sql", - "package_name": "jaffle_shop", - "path": "orders.sql", - "original_file_path": "sample/orders/root/path/models/orders.sql", - "name": "orders", - "alias": "orders", - "checksum": { - "name": "sha256", - "checksum": "53950235d8e29690d259e95ee49bda6a5b7911b44c739b738a646dc6014bcfcd" - }, - "tags": [ - "single_tag" - ], - "refs": [ - [ - "stg_orders" - ], - [ - "stg_payments" - ] - ], - "sources": [], - "metrics": [], - "description": "This table has basic information about orders, as well as some derived facts based on payments", - "columns": { - "order_id": { - "name": "order_id", - "description": "This is a unique identifier for an order", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - }, - "customer_id": { - "name": "customer_id", - "description": "Foreign key to the customers table", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - } - }, - "meta": { - "owner": "aaron_johnson0" - }, - "docs": { - "show": true, - "node_color": null - }, - "patch_path": "jaffle_shop://models/schema.yml", - "compiled_path": "target/compiled/jaffle_shop/models/orders.sql", - "build_path": null, - "deferred": false, - "unrendered_config": { - "materialized": "table", - "tags": "single_tag", - "meta": { - "owner": "aaron_johnson0" - } - }, - "created_at": 1673982251.742371, - "compiled_code": "sample compiled code", - "extra_ctes_injected": true, - "extra_ctes": [], - "relation_name": "\"dev\".\"dbt_jaffle\".\"orders\"" - }, - "model.jaffle_shop.stg_customers": { - "compiled": true, - "resource_type": "model", - "depends_on": { - "macros": [], - "nodes": [ - "seed.jaffle_shop.raw_customers" - ] - }, - "config": { - "enabled": true, - "alias": null, - "schema": null, - "database": null, - "tags": [], - "meta": {}, - "materialized": "view", - "incremental_strategy": null, - "persist_docs": {}, - "quoting": {}, - "column_types": {}, - "full_refresh": null, - "unique_key": null, - "on_schema_change": "ignore", - "grants": {}, - "packages": [], - "docs": { - "show": true, - "node_color": null - }, - "post-hook": [], - "pre-hook": [] - }, - "database": "dev", - "schema": "dbt_jaffle", - "fqn": [ - "jaffle_shop", - "staging", - "stg_customers" - ], - "unique_id": "model.jaffle_shop.stg_customers", - "raw_code": "sample stg_customers raw_code", - "language": "sql", - "package_name": "jaffle_shop", - "path": "staging/stg_customers.sql", - "original_file_path": "sample/stg_customers/root/path/models/staging/stg_customers.sql", - "name": "stg_customers", - "alias": "stg_customers", - "checksum": { - "name": "sha256", - "checksum": "6f18a29204dad1de6dbb0c288144c4990742e0a1e065c3b2a67b5f98334c22ba" - }, - "tags": [], - "refs": [ - [ - "raw_customers" - ] - ], - "sources": [], - "metrics": [], - "description": "", - "columns": { - "customer_id": { - "name": "customer_id", - "description": "This is a unique identifier for an customer", - "meta": {}, - "data_type": null, - "quote": null, - "tags": [] - } - }, - "meta": {}, - "docs": { - "show": true, - "node_color": null - }, - "patch_path": "jaffle_shop://models/staging/schema.yml", - "compiled_path": "target/compiled/jaffle_shop/models/staging/stg_customers.sql", - "build_path": null, - "deferred": false, - "unrendered_config": { - "materialized": "view" - }, - "created_at": 1673978228.757611, - "compiled_code": "sample stg_customers compiled code", - "extra_ctes_injected": true, - "extra_ctes": [], - "relation_name": "\"dev\".\"dbt_jaffle\".\"stg_customers\"" } }, "sources": {}, diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py index d4299ced964..f150624e8f1 100644 --- a/ingestion/tests/unit/test_dbt.py +++ b/ingestion/tests/unit/test_dbt.py @@ -12,11 +12,17 @@ from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run from pydantic import AnyUrl from metadata.generated.schema.entity.data.table import Column, DataModel, Table +from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.tagLabel import TagLabel +from metadata.generated.schema.type.tagLabel import ( + LabelType, + State, + TagLabel, + TagSource, +) from metadata.ingestion.source.database.database_service import DataModelLink from metadata.ingestion.source.database.dbt.metadata import DbtSource from metadata.utils.dbt_config import DbtFiles, DbtObjects @@ -61,6 +67,8 @@ MOCK_SAMPLE_MANIFEST_V8 = "resources/datasets/manifest_v8.json" MOCK_SAMPLE_MANIFEST_NULL_DB = "resources/datasets/manifest_null_db.json" +MOCK_SAMPLE_MANIFEST_TEST_NODE = "resources/datasets/manifest_test_node.json" + EXPECTED_DATA_MODEL_FQNS = [ "dbt_test.dev.dbt_jaffle.customers", @@ -76,7 +84,7 @@ EXPECTED_DATA_MODELS = [ path="sample/customers/root/path/models/customers.sql", rawSql="sample customers raw code", sql="sample customers compile code", - upstream=["dbt_test.dev.dbt_jaffle.stg_customers"], + upstream=[], owner=EntityReference( id="cb2a92f5-e935-4ad7-911c-654280046538", type="user", @@ -133,95 +141,7 @@ EXPECTED_DATA_MODELS = [ ), ], generatedAt=None, - ), - DataModel( - modelType="DBT", - description="This table has basic information about orders, as well as some derived facts based on payments", - path="sample/orders/root/path/models/orders.sql", - rawSql="sample raw orders code", - sql="sample compiled code", - upstream=[], - owner=EntityReference( - id="cb2a92f5-e935-4ad7-911c-654280046538", - type="user", - name=None, - fullyQualifiedName="aaron_johnson0", - description=None, - displayName=None, - deleted=None, - href=AnyUrl( - "http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", - scheme="http", - host="localhost", - host_type="int_domain", - port="8585", - path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", - ), - ), - tags=[ - TagLabel( - tagFQN="dbtTags.single_tag", - description=None, - source="Tag", - labelType="Automated", - state="Confirmed", - href=None, - ) - ], - columns=[ - Column( - name="order_id", - displayName=None, - dataType="VARCHAR", - dataLength=1, - description="This is a unique identifier for an order", - ), - Column( - name="customer_id", - displayName=None, - dataType="VARCHAR", - dataLength=1, - description="Foreign key to the customers table", - ), - ], - generatedAt=None, - ), - DataModel( - modelType="DBT", - description=None, - path="sample/stg_customers/root/path/models/staging/stg_customers.sql", - rawSql="sample stg_customers raw_code", - sql="sample stg_customers compiled code", - upstream=[], - owner=EntityReference( - id="cb2a92f5-e935-4ad7-911c-654280046538", - type="user", - name=None, - fullyQualifiedName="aaron_johnson0", - description=None, - displayName=None, - deleted=None, - href=AnyUrl( - "http://localhost:8585/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", - scheme="http", - host="localhost", - host_type="int_domain", - port="8585", - path="/api/v1/users/cb2a92f5-e935-4ad7-911c-654280046538", - ), - ), - tags=None, - columns=[ - Column( - name="customer_id", - displayName=None, - dataType="VARCHAR", - dataLength=1, - description="This is a unique identifier for an customer", - ) - ], - generatedAt=None, - ), + ) ] EXPECTED_DATA_MODEL_NULL_DB = [ @@ -281,14 +201,56 @@ MOCK_OWNER = EntityReference( ), ) +MOCK_USER = [ + User( + id=uuid.uuid4(), + name="aaron_johnson0", + email="aaron_johnson0@gmail.com", + href="http://localhost:8585/api/v1/users/d96eccb9-9a9b-40ad-9585-0a8a71665c51", + fullyQualifiedName="aaron_johnson0", + ) +] + + +MOCK_TABLE_ENTITIES = [ + Table( + id=uuid.uuid4(), + name="customers", + databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"), + fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers", + columns=[], + ) +] + MOCK_NULL_DB_TABLE = [ Table( id=uuid.uuid4(), - name="test", + name="customers_null_db", databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"), fullyQualifiedName="dbt_test.dev.dbt_jaffle.customers_null_db", columns=[], - ) + ), +] + +MOCK_TAG_LABELS = [ + TagLabel( + tagFQN="dbtTags.tag1", + labelType=LabelType.Automated, + state=State.Confirmed, + source=TagSource.Tag, + ), + TagLabel( + tagFQN="dbtTags.tag2name", + labelType=LabelType.Automated, + state=State.Confirmed, + source=TagSource.Tag, + ), + TagLabel( + tagFQN="dbtTags.tag3", + labelType=LabelType.Automated, + state=State.Confirmed, + source=TagSource.Tag, + ), ] @@ -309,29 +271,35 @@ class DbtUnitTest(TestCase): ) @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") - def test_dbt_manifest_v4_v5_v6(self, get_dbt_owner): + @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") + def test_dbt_manifest_v4_v5_v6(self, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER + es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES self.execute_test( MOCK_SAMPLE_MANIFEST_V4_V5_V6, - expected_records=4, + expected_records=2, expected_data_models=EXPECTED_DATA_MODELS, ) @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") - def test_dbt_manifest_v7(self, get_dbt_owner): + @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") + def test_dbt_manifest_v7(self, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER + es_search_from_fqn.side_effect = MOCK_TABLE_ENTITIES self.execute_test( MOCK_SAMPLE_MANIFEST_V7, - expected_records=4, + expected_records=2, expected_data_models=EXPECTED_DATA_MODELS, ) @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") - def test_dbt_manifest_v8(self, get_dbt_owner): + @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") + def test_dbt_manifest_v8(self, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER + es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES self.execute_test( MOCK_SAMPLE_MANIFEST_V8, - expected_records=4, + expected_records=2, expected_data_models=EXPECTED_DATA_MODELS, ) @@ -346,7 +314,116 @@ class DbtUnitTest(TestCase): expected_data_models=EXPECTED_DATA_MODEL_NULL_DB, ) + def test_dbt_get_corrected_name(self): + self.assertEqual( + "dbt_jaffle", self.dbt_source_obj.get_corrected_name(name="dbt_jaffle") + ) + self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="None")) + self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null")) + self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev")) + + def test_dbt_get_dbt_tag_labels(self): + result = self.dbt_source_obj.get_dbt_tag_labels(["tag1", "tag2.name", "tag3"]) + self.assertListEqual(result, MOCK_TAG_LABELS) + + def test_dbt_get_data_model_path(self): + _, dbt_objects = self.get_dbt_object_files( + mock_manifest=MOCK_SAMPLE_MANIFEST_V8 + ) + manifest_node = dbt_objects.dbt_manifest.nodes.get( + "model.jaffle_shop.customers" + ) + result = self.dbt_source_obj.get_data_model_path(manifest_node=manifest_node) + self.assertEqual("sample/customers/root/path/models/customers.sql", result) + + def test_dbt_generate_entity_link(self): + _, dbt_objects = self.get_dbt_object_files( + mock_manifest=MOCK_SAMPLE_MANIFEST_TEST_NODE + ) + manifest_node = dbt_objects.dbt_manifest.nodes.get( + "test.jaffle_shop.unique_orders_order_id.fed79b3a6e" + ) + dbt_test = { + "manifest_node": manifest_node, + "upstream": ["local_redshift_dbt2.dev.dbt_jaffle.stg_customers"], + "results": "", + } + result = self.dbt_source_obj.generate_entity_link(dbt_test=dbt_test) + self.assertListEqual( + [ + "<#E::table::local_redshift_dbt2.dev.dbt_jaffle.stg_customers::columns::order_id>" + ], + result, + ) + + def test_dbt_compiled_query(self): + expected_query = "sample customers compile code" + + # Test the compiled queries with v8 manifest + _, dbt_objects = self.get_dbt_object_files( + mock_manifest=MOCK_SAMPLE_MANIFEST_V8 + ) + manifest_node = dbt_objects.dbt_manifest.nodes.get( + "model.jaffle_shop.customers" + ) + result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node) + self.assertEqual(expected_query, result) + + # Test the compiled queries with v4 v5 v6 manifest + _, dbt_objects = self.get_dbt_object_files( + mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6 + ) + manifest_node = dbt_objects.dbt_manifest.nodes.get( + "model.jaffle_shop.customers" + ) + result = self.dbt_source_obj.get_dbt_compiled_query(mnode=manifest_node) + self.assertEqual(expected_query, result) + + def test_dbt_raw_query(self): + expected_query = "sample customers raw code" + + # Test the raw queries with v8 manifest + _, dbt_objects = self.get_dbt_object_files( + mock_manifest=MOCK_SAMPLE_MANIFEST_V8 + ) + manifest_node = dbt_objects.dbt_manifest.nodes.get( + "model.jaffle_shop.customers" + ) + result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node) + self.assertEqual(expected_query, result) + + # Test the raw queries with v4 v5 v6 manifest + _, dbt_objects = self.get_dbt_object_files( + mock_manifest=MOCK_SAMPLE_MANIFEST_V4_V5_V6 + ) + manifest_node = dbt_objects.dbt_manifest.nodes.get( + "model.jaffle_shop.customers" + ) + result = self.dbt_source_obj.get_dbt_raw_query(mnode=manifest_node) + self.assertEqual(expected_query, result) + + @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") + def test_dbt_owner(self, es_search_from_fqn): + es_search_from_fqn.return_value = MOCK_USER + _, dbt_objects = self.get_dbt_object_files( + mock_manifest=MOCK_SAMPLE_MANIFEST_V8 + ) + manifest_node = dbt_objects.dbt_manifest.nodes.get( + "model.jaffle_shop.customers" + ) + result = self.dbt_source_obj.get_dbt_owner( + manifest_node=manifest_node, catalog_node=None + ) + self.assertEqual("aaron_johnson0", result.fullyQualifiedName) + def execute_test(self, mock_manifest, expected_records, expected_data_models): + dbt_files, dbt_objects = self.get_dbt_object_files(mock_manifest) + self.check_dbt_validate(dbt_files=dbt_files, expected_records=expected_records) + self.check_yield_datamodel( + dbt_objects=dbt_objects, expected_data_models=expected_data_models + ) + + def get_dbt_object_files(self, mock_manifest): mock_file_path = Path(__file__).parent / mock_manifest with open(mock_file_path) as file: mock_data: dict = json.load(file) @@ -360,10 +437,7 @@ class DbtUnitTest(TestCase): if dbt_files.dbt_run_results else None, ) - self.check_dbt_validate(dbt_files=dbt_files, expected_records=expected_records) - self.check_yield_datamodel( - dbt_objects=dbt_objects, expected_data_models=expected_data_models - ) + return dbt_files, dbt_objects def check_dbt_validate(self, dbt_files, expected_records): with self.assertLogs() as captured: @@ -380,7 +454,10 @@ class DbtUnitTest(TestCase): ) for data_model_link in yield_data_models: if isinstance(data_model_link, DataModelLink): - self.assertIn(data_model_link.fqn.__root__, EXPECTED_DATA_MODEL_FQNS) + self.assertIn( + data_model_link.table_entity.fullyQualifiedName.__root__, + EXPECTED_DATA_MODEL_FQNS, + ) data_model_list.append(data_model_link.datamodel) for _, (exptected, original) in enumerate(