mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-25 07:54:37 +00:00 
			
		
		
		
	feat(ingest/dbt): support use_compiled_code and test_warnings_are_errors (#8956)
				
					
				
			This commit is contained in:
		
							parent
							
								
									c9309ff157
								
							
						
					
					
						commit
						3cede10ab3
					
				| @ -4,7 +4,7 @@ from pydantic import validator | ||||
| from pydantic.fields import Field | ||||
| 
 | ||||
| from datahub.configuration.common import ConfigModel, ConfigurationError | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.metadata.schema_classes import FabricTypeClass | ||||
| 
 | ||||
| DEFAULT_ENV = FabricTypeClass.PROD | ||||
|  | ||||
| @ -1,20 +1,28 @@ | ||||
| import warnings | ||||
| from typing import Optional, Type | ||||
| from typing import Any, Optional, Type | ||||
| 
 | ||||
| import pydantic | ||||
| 
 | ||||
| from datahub.configuration.common import ConfigurationWarning | ||||
| from datahub.utilities.global_warning_util import add_global_warning | ||||
| 
 | ||||
| _unset = object() | ||||
| 
 | ||||
| def pydantic_field_deprecated(field: str, message: Optional[str] = None) -> classmethod: | ||||
| 
 | ||||
| def pydantic_field_deprecated( | ||||
|     field: str, | ||||
|     warn_if_value_is_not: Any = _unset, | ||||
|     message: Optional[str] = None, | ||||
| ) -> classmethod: | ||||
|     if message: | ||||
|         output = message | ||||
|     else: | ||||
|         output = f"{field} is deprecated and will be removed in a future release. Please remove it from your config." | ||||
| 
 | ||||
|     def _validate_deprecated(cls: Type, values: dict) -> dict: | ||||
|         if field in values: | ||||
|         if field in values and ( | ||||
|             warn_if_value_is_not is _unset or values[field] != warn_if_value_is_not | ||||
|         ): | ||||
|             add_global_warning(output) | ||||
|             warnings.warn(output, ConfigurationWarning, stacklevel=2) | ||||
|         return values | ||||
| @ -18,8 +18,8 @@ from datahub.configuration.common import ( | ||||
|     ConfigurationError, | ||||
|     LineageConfig, | ||||
| ) | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.source_common import DatasetSourceConfigMixin | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.emitter import mce_builder | ||||
| from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||||
| from datahub.ingestion.api.common import PipelineContext | ||||
| @ -214,7 +214,9 @@ class DBTCommonConfig( | ||||
|         default=False, | ||||
|         description="Use model identifier instead of model name if defined (if not, default to model name).", | ||||
|     ) | ||||
|     _deprecate_use_identifiers = pydantic_field_deprecated("use_identifiers") | ||||
|     _deprecate_use_identifiers = pydantic_field_deprecated( | ||||
|         "use_identifiers", warn_if_value_is_not=False | ||||
|     ) | ||||
| 
 | ||||
|     entities_enabled: DBTEntitiesEnabled = Field( | ||||
|         DBTEntitiesEnabled(), | ||||
| @ -278,6 +280,14 @@ class DBTCommonConfig( | ||||
|         description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. " | ||||
|         "If `target_platform` is Snowflake, the default is True.", | ||||
|     ) | ||||
|     use_compiled_code: bool = Field( | ||||
|         default=False, | ||||
|         description="When enabled, uses the compiled dbt code instead of the raw dbt node definition.", | ||||
|     ) | ||||
|     test_warnings_are_errors: bool = Field( | ||||
|         default=False, | ||||
|         description="When enabled, dbt test warnings will be treated as failures.", | ||||
|     ) | ||||
| 
 | ||||
|     @validator("target_platform") | ||||
|     def validate_target_platform_value(cls, target_platform: str) -> str: | ||||
| @ -811,7 +821,7 @@ class DBTSourceBase(StatefulIngestionSourceBase): | ||||
|                         mce_builder.make_schema_field_urn(upstream_urn, column_name) | ||||
|                     ], | ||||
|                     nativeType=node.name, | ||||
|                     logic=node.compiled_code if node.compiled_code else node.raw_code, | ||||
|                     logic=node.compiled_code or node.raw_code, | ||||
|                     aggregation=AssertionStdAggregationClass._NATIVE_, | ||||
|                     nativeParameters=string_map(kw_args), | ||||
|                 ), | ||||
| @ -825,7 +835,7 @@ class DBTSourceBase(StatefulIngestionSourceBase): | ||||
|                     dataset=upstream_urn, | ||||
|                     scope=DatasetAssertionScopeClass.DATASET_ROWS, | ||||
|                     operator=AssertionStdOperatorClass._NATIVE_, | ||||
|                     logic=node.compiled_code if node.compiled_code else node.raw_code, | ||||
|                     logic=node.compiled_code or node.raw_code, | ||||
|                     nativeType=node.name, | ||||
|                     aggregation=AssertionStdAggregationClass._NATIVE_, | ||||
|                     nativeParameters=string_map(kw_args), | ||||
| @ -856,6 +866,10 @@ class DBTSourceBase(StatefulIngestionSourceBase): | ||||
|             result=AssertionResultClass( | ||||
|                 type=AssertionResultTypeClass.SUCCESS | ||||
|                 if test_result.status == "pass" | ||||
|                 or ( | ||||
|                     not self.config.test_warnings_are_errors | ||||
|                     and test_result.status == "warn" | ||||
|                 ) | ||||
|                 else AssertionResultTypeClass.FAILURE, | ||||
|                 nativeResults=test_result.native_results, | ||||
|             ), | ||||
| @ -1007,8 +1021,8 @@ class DBTSourceBase(StatefulIngestionSourceBase): | ||||
|                     aspects.append(upstream_lineage_class) | ||||
| 
 | ||||
|                 # add view properties aspect | ||||
|                 if node.raw_code and node.language == "sql": | ||||
|                 view_prop_aspect = self._create_view_properties_aspect(node) | ||||
|                 if view_prop_aspect: | ||||
|                     aspects.append(view_prop_aspect) | ||||
| 
 | ||||
|                 # emit subtype mcp | ||||
| @ -1133,14 +1147,21 @@ class DBTSourceBase(StatefulIngestionSourceBase): | ||||
|     def get_external_url(self, node: DBTNode) -> Optional[str]: | ||||
|         pass | ||||
| 
 | ||||
|     def _create_view_properties_aspect(self, node: DBTNode) -> ViewPropertiesClass: | ||||
|     def _create_view_properties_aspect( | ||||
|         self, node: DBTNode | ||||
|     ) -> Optional[ViewPropertiesClass]: | ||||
|         view_logic = ( | ||||
|             node.compiled_code if self.config.use_compiled_code else node.raw_code | ||||
|         ) | ||||
| 
 | ||||
|         if node.language != "sql" or not view_logic: | ||||
|             return None | ||||
| 
 | ||||
|         materialized = node.materialization in {"table", "incremental", "snapshot"} | ||||
|         # this function is only called when raw sql is present. assert is added to satisfy lint checks | ||||
|         assert node.raw_code is not None | ||||
|         view_properties = ViewPropertiesClass( | ||||
|             materialized=materialized, | ||||
|             viewLanguage="SQL", | ||||
|             viewLogic=node.raw_code, | ||||
|             viewLogic=view_logic, | ||||
|         ) | ||||
|         return view_properties | ||||
| 
 | ||||
|  | ||||
| @ -16,7 +16,7 @@ from pydantic import validator | ||||
| from pydantic.fields import Field | ||||
| 
 | ||||
| from datahub.configuration.common import ConfigEnum, ConfigModel, ConfigurationError | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.validate_field_rename import pydantic_renamed_field | ||||
| from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||||
| from datahub.ingestion.api.common import PipelineContext | ||||
|  | ||||
| @ -9,8 +9,8 @@ from pydantic.class_validators import root_validator | ||||
| 
 | ||||
| import datahub.emitter.mce_builder as builder | ||||
| from datahub.configuration.common import AllowDenyPattern, ConfigModel | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.ingestion.source.common.subtypes import BIAssetSubTypes | ||||
| from datahub.ingestion.source.state.stale_entity_removal_handler import ( | ||||
|     StaleEntityRemovalSourceReport, | ||||
|  | ||||
| @ -7,8 +7,8 @@ from pydantic.fields import Field | ||||
| 
 | ||||
| from datahub.configuration import ConfigModel | ||||
| from datahub.configuration.common import AllowDenyPattern | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.source_common import DatasetLineageProviderConfigBase | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.ingestion.source.data_lake_common.path_spec import PathSpec | ||||
| from datahub.ingestion.source.sql.postgres import BasePostgresConfig | ||||
| from datahub.ingestion.source.state.stateful_ingestion_base import ( | ||||
|  | ||||
| @ -5,8 +5,8 @@ import pydantic | ||||
| from pydantic.fields import Field | ||||
| 
 | ||||
| from datahub.configuration.common import AllowDenyPattern | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.source_common import DatasetSourceConfigMixin | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.validate_field_rename import pydantic_renamed_field | ||||
| from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig | ||||
| from datahub.ingestion.source.data_lake_common.config import PathSpecsConfigMixin | ||||
|  | ||||
| @ -19,9 +19,9 @@ from sqlalchemy.sql import sqltypes | ||||
| from sqlalchemy.types import BOOLEAN, DATE, DATETIME, INTEGER | ||||
| 
 | ||||
| import datahub.emitter.mce_builder as builder | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.source_common import DatasetLineageProviderConfigBase | ||||
| from datahub.configuration.time_window_config import BaseTimeWindowConfig | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.emitter import mce_builder | ||||
| from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||||
| from datahub.ingestion.api.decorators import ( | ||||
|  | ||||
| @ -7,8 +7,8 @@ import pydantic | ||||
| from pydantic import Field | ||||
| 
 | ||||
| from datahub.configuration.common import AllowDenyPattern, ConfigModel | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.source_common import DatasetSourceConfigMixin | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig | ||||
| from datahub.ingestion.source.state.stale_entity_removal_handler import ( | ||||
|     StatefulStaleMetadataRemovalConfig, | ||||
|  | ||||
| @ -37,11 +37,11 @@ from datahub.configuration.common import ( | ||||
|     ConfigModel, | ||||
|     ConfigurationError, | ||||
| ) | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.source_common import ( | ||||
|     DatasetLineageProviderConfigBase, | ||||
|     DatasetSourceConfigMixin, | ||||
| ) | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||||
| from datahub.emitter.mcp_builder import ( | ||||
|     ContainerKey, | ||||
|  | ||||
| @ -4,7 +4,7 @@ import pytest | ||||
| from pydantic import ValidationError | ||||
| 
 | ||||
| from datahub.configuration.common import ConfigModel | ||||
| from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated | ||||
| from datahub.configuration.validate_field_removal import pydantic_removed_field | ||||
| from datahub.configuration.validate_field_rename import pydantic_renamed_field | ||||
| from datahub.utilities.global_warning_util import get_global_warnings | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Harshal Sheth
						Harshal Sheth