* metadata dbt
* fix:
  - default path to current directory
  - additional warning and exception handling for missing metadata config vars
* test: add unit tests for DBT Ingestion CLI
* refactor
* PR review:
  - using Pydantic to parse and validate the OpenMetadata config in dbt's .yml
  - extended test cases
  - giving the user more configuration options for ingestion
* py refactoring
* add: dbt-auto ingest docs
* Improvements:
  - using environment variables for loading sensitive variables
  - added docs for auto dbt ingestion for dbt-core
  - more test cases
* fix: test case for reading the JWT token inside the method
* refactor: py code formatting
* refactor: py formatting
* ingest-dbt docs updated
* refined test cases
* Chore:
  - Sonar vulnerability issue review
  - using existing URL class for host validation

---------

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
commit 79c3d55128 (parent cd96b5c7e0)
.gitignore (vendored, 9 changed lines)

@@ -134,3 +134,12 @@ ingestion/tests/cli_e2e/**/*test.yaml

# Nox
ingestion/.nox/

# Environment variables
.env
.env.local
.env.*.local

# Temporary files
*.tmp
*.temp
@@ -146,6 +146,7 @@ base_requirements = {
    VERSIONS["pydantic-settings"],
    VERSIONS["pymysql"],
    "python-dateutil>=2.8.1",
    "python-dotenv>=0.19.0",  # For environment variable support in dbt ingestion
    "PyYAML~=6.0",
    "requests>=2.23",
    "requests-aws4auth~=1.1",  # Only depends on requests as external package. Leaving as base.
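The `python-dotenv` addition backs the `.env` support used by the new dbt ingestion module below. A minimal, illustrative sketch of the library in use (the file name and variable are placeholders, not part of this commit):

```python
import os

from dotenv import load_dotenv

# Read KEY=VALUE pairs from .env into os.environ; values already set in the
# shell win, because override defaults to False.
load_dotenv(".env")
print(os.environ.get("OPENMETADATA_JWT_TOKEN"))
```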
ingestion/src/metadata/cli/ingest_dbt.py (new file, 367 lines)

@@ -0,0 +1,367 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
DBT Artifacts Ingestion CLI module
"""

import json
import os
import re
import sys
import traceback
from pathlib import Path
from typing import Dict, List, Optional

import yaml
from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator

from metadata.ingestion.ometa.credentials import URL
from metadata.utils.logger import cli_logger
from metadata.workflow.metadata import MetadataWorkflow

logger = cli_logger()


class FilterPattern(BaseModel):
    """Filter pattern model for database/schema/table filtering"""

    includes: List[str] = Field(default=[".*"], description="Patterns to include")
    excludes: Optional[List[str]] = Field(
        default=None, description="Patterns to exclude"
    )


class OpenMetadataDBTConfig(BaseModel):
    """Pydantic model for OpenMetadata DBT configuration"""

    # Required fields
    openmetadata_host_port: str = Field(
        ..., description="OpenMetadata server host and port"
    )
    openmetadata_jwt_token: str = Field(..., description="JWT token for authentication")
    openmetadata_service_name: str = Field(
        ..., description="Service name for the DBT service"
    )

    # Optional DBT source configuration with defaults
    openmetadata_dbt_update_descriptions: bool = Field(
        default=True, description="Update model descriptions from DBT"
    )
    openmetadata_dbt_update_owners: bool = Field(
        default=True, description="Update model owners from DBT"
    )
    openmetadata_include_tags: bool = Field(
        default=True, description="Include DBT tags as metadata"
    )
    openmetadata_search_across_databases: bool = Field(
        default=False, description="Search across multiple databases"
    )
    openmetadata_dbt_classification_name: Optional[str] = Field(
        default=None, description="Custom classification name for DBT tags"
    )

    # Filter patterns - standardized to dict format only
    openmetadata_database_filter_pattern: Optional[Dict[str, List[str]]] = Field(
        default=None, description="Database filter pattern with includes/excludes"
    )
    openmetadata_schema_filter_pattern: Optional[Dict[str, List[str]]] = Field(
        default=None, description="Schema filter pattern with includes/excludes"
    )
    openmetadata_table_filter_pattern: Optional[Dict[str, List[str]]] = Field(
        default=None, description="Table filter pattern with includes/excludes"
    )

    @field_validator("openmetadata_host_port")
    @classmethod
    def validate_host_port(cls, v):
        """Validate that host_port is a valid URL using the existing URL class"""
        try:
            # This will raise ValueError if not a valid http/https/ws/wss URL
            URL(v)
            return v
        except (ValueError, TypeError) as e:
            raise ValueError(
                f"Host port must be a valid URL starting with http:// or https://"
            )

    def _get_filter_pattern(
        self, pattern_dict: Optional[Dict[str, List[str]]]
    ) -> FilterPattern:
        """Convert filter pattern dict to FilterPattern model or return default"""
        if pattern_dict:
            return FilterPattern(**pattern_dict)
        return FilterPattern()

    @property
    def database_filter(self) -> FilterPattern:
        """Get database filter pattern as FilterPattern model"""
        return self._get_filter_pattern(self.openmetadata_database_filter_pattern)

    @property
    def schema_filter(self) -> FilterPattern:
        """Get schema filter pattern as FilterPattern model"""
        return self._get_filter_pattern(self.openmetadata_schema_filter_pattern)

    @property
    def table_filter(self) -> FilterPattern:
        """Get table filter pattern as FilterPattern model"""
        return self._get_filter_pattern(self.openmetadata_table_filter_pattern)

    def log_configuration(self):
        config = {
            "update_descriptions": self.openmetadata_dbt_update_descriptions,
            "update_owners": self.openmetadata_dbt_update_owners,
            "include_tags": self.openmetadata_include_tags,
            "search_across_databases": self.openmetadata_search_across_databases,
            "classification_name": self.openmetadata_dbt_classification_name,
            "database_filter": self.database_filter.model_dump(exclude_none=True),
            "schema_filter": self.schema_filter.model_dump(exclude_none=True),
            "table_filter": self.table_filter.model_dump(exclude_none=True),
        }
        logger.info("OpenMetadata DBT Config:\n%s", json.dumps(config, indent=2))


def substitute_env_vars(content: str) -> str:
    """
    Substitute environment variables in YAML content.

    Supports:
    - ${VAR} - shell style substitution
    - {{ env_var("VAR") }} - dbt style without default
    - {{ env_var("VAR", "default") }} - dbt style with default

    :param content: Raw YAML content string
    :return: Content with environment variables substituted
    """

    def replace_shell_vars(match):
        """Replace ${VAR} pattern"""
        var_name = match.group(1)
        env_value = os.environ.get(var_name)
        if env_value is None:
            raise ValueError(f"Environment variable '{var_name}' is not set")
        return env_value

    def replace_dbt_env_vars(match):
        """Replace {{ env_var("VAR") }} and {{ env_var("VAR", "default") }} patterns"""
        var_name = match.group(1)
        default_value = match.group(2)  # Will be None if no default provided

        env_value = os.environ.get(var_name)
        if env_value is None:
            if default_value is not None:
                # Remove quotes from default value
                return default_value.strip("\"'")
            raise ValueError(
                f"Environment variable '{var_name}' is not set and no default provided"
            )
        return env_value

    # Pattern for ${VAR}
    shell_pattern = re.compile(r"\$\{([^}]+)\}")

    # Pattern for {{ env_var("VAR") }} and {{ env_var("VAR", "default") }}
    # This handles both single and double quotes around variable names and defaults
    function_pattern = re.compile(
        r'\{\{\s*env_var\(\s*["\']([\w-]+)["\']\s*(?:,\s*["\']([\w\s-]*)["\']\s*)?\)\s*\}\}'
    )

    # Apply substitutions
    content = shell_pattern.sub(replace_shell_vars, content)
    content = function_pattern.sub(replace_dbt_env_vars, content)

    return content


def find_dbt_project_config(dbt_project_path: Path) -> Dict:
    """
    Find and load dbt_project.yml configuration with environment variable substitution

    :param dbt_project_path: Path to the dbt project directory
    :return: Parsed dbt project configuration
    """
    # Load environment variables from .env file if present
    load_dotenv(dbt_project_path / ".env", override=False)
    load_dotenv(override=False)  # fallback to current dir

    dbt_project_file = dbt_project_path / "dbt_project.yml"

    if not dbt_project_file.exists():
        raise FileNotFoundError(f"dbt_project.yml not found in {dbt_project_path}")

    try:
        with open(dbt_project_file, "r", encoding="utf-8") as file:
            content = file.read()

        # Substitute environment variables before parsing YAML
        processed_content = substitute_env_vars(content)
        return yaml.safe_load(processed_content)

    except Exception as exc:
        raise ValueError(f"Failed to parse dbt_project.yml: {exc}")


def extract_openmetadata_config(dbt_config: Dict) -> OpenMetadataDBTConfig:
    """
    Extract and validate OpenMetadata configuration from dbt project config using Pydantic

    :param dbt_config: Parsed dbt project configuration
    :return: Validated OpenMetadata configuration model
    """
    vars_config = dbt_config.get("vars", {})
    try:
        # Create and validate the configuration using Pydantic
        om_config = OpenMetadataDBTConfig(**vars_config)
        om_config.log_configuration()
        return om_config

    except Exception as exc:
        # Provide helpful error message for missing required fields
        error_msg = str(exc)
        if "Field required" in error_msg:
            raise ValueError(
                f"Required OpenMetadata configuration not found in dbt_project.yml vars.\n"
                f"Error: {error_msg}\n"
                f"Please add the following to your dbt_project.yml:\n"
                f"vars:\n"
                f"  openmetadata_jwt_token: 'your-jwt-token'\n"
                f"  openmetadata_host_port: 'your-host-port (e.g. http://openmetadata-server:8585/api)'\n"
                f"  openmetadata_service_name: 'your-service-name'"
            )
        raise ValueError(f"Invalid OpenMetadata configuration: {error_msg}")


def create_dbt_workflow_config(
    dbt_project_path: Path, om_config: OpenMetadataDBTConfig
) -> Dict:
    """
    Create OpenMetadata workflow configuration for dbt artifacts ingestion

    :param dbt_project_path: Path to the dbt project directory
    :param om_config: Validated OpenMetadata configuration model
    :return: Workflow configuration
    """
    target_dir = dbt_project_path / "target"

    # Check for required artifacts
    manifest_path = target_dir / "manifest.json"
    if not manifest_path.exists():
        raise FileNotFoundError(
            f"manifest.json not found in {target_dir}. Please run 'dbt compile' or 'dbt run' first."
        )

    # Build dbt config source
    dbt_config_source = {
        "dbtManifestFilePath": str(manifest_path),
        "dbtConfigType": "local",
    }

    # Add optional files if they exist
    catalog_path = target_dir / "catalog.json"
    if catalog_path.exists():
        dbt_config_source["dbtCatalogFilePath"] = str(catalog_path)

    run_results_path = target_dir / "run_results.json"
    if run_results_path.exists():
        dbt_config_source["dbtRunResultsFilePath"] = str(run_results_path)

    # Build source config with user-configurable options
    source_config = {
        "type": "DBT",
        "dbtConfigSource": dbt_config_source,
        "dbtUpdateDescriptions": om_config.openmetadata_dbt_update_descriptions,
        "dbtUpdateOwners": om_config.openmetadata_dbt_update_owners,
        "includeTags": om_config.openmetadata_include_tags,
        "searchAcrossDatabases": om_config.openmetadata_search_across_databases,
        "databaseFilterPattern": om_config.database_filter.model_dump(
            exclude_none=True
        ),
        "schemaFilterPattern": om_config.schema_filter.model_dump(exclude_none=True),
        "tableFilterPattern": om_config.table_filter.model_dump(exclude_none=True),
    }

    # Add optional classification name if provided
    if om_config.openmetadata_dbt_classification_name:
        source_config[
            "dbtClassificationName"
        ] = om_config.openmetadata_dbt_classification_name

    # Create workflow configuration
    config = {
        "source": {
            "type": "dbt",
            "serviceName": om_config.openmetadata_service_name,
            "sourceConfig": {"config": source_config},
        },
        "sink": {"type": "metadata-rest", "config": {}},
        "workflowConfig": {
            "loggerLevel": "INFO",
            "openMetadataServerConfig": {
                "hostPort": om_config.openmetadata_host_port,
                "authProvider": "openmetadata",
                "securityConfig": {"jwtToken": om_config.openmetadata_jwt_token},
            },
        },
    }

    return config


def run_ingest_dbt(dbt_project_path: Path) -> None:
    """
    Run the dbt artifacts ingestion workflow from a dbt project path

    :param dbt_project_path: Path to the dbt project directory
    """
    try:
        # Resolve to absolute path to handle relative paths like "."
        dbt_project_path = dbt_project_path.resolve()

        logger.info(f"Starting DBT artifacts ingestion from: {dbt_project_path}")

        if not dbt_project_path.exists():
            raise FileNotFoundError(
                f"DBT project path does not exist: {dbt_project_path}"
            )

        if not dbt_project_path.is_dir():
            raise NotADirectoryError(
                f"DBT project path is not a directory: {dbt_project_path}"
            )

        logger.info("Loading dbt project configuration...")
        dbt_config = find_dbt_project_config(dbt_project_path)

        logger.info("Extracting OpenMetadata configuration...")
        om_config = extract_openmetadata_config(dbt_config)

        logger.info(f"Publishing to OpenMetadata: {om_config.openmetadata_host_port}")
        logger.info(f"Service name: {om_config.openmetadata_service_name}")

        logger.info("Creating workflow configuration...")
        workflow_config = create_dbt_workflow_config(dbt_project_path, om_config)

        # Create and execute the MetadataWorkflow (reusing existing infrastructure)
        logger.info("Starting OpenMetadata ingestion workflow...")
        workflow = MetadataWorkflow.create(workflow_config)
        workflow.execute()
        workflow.raise_from_status()
        workflow.print_status()
        workflow.stop()

        logger.info("DBT artifacts ingestion completed successfully")

    except Exception as exc:
        logger.error(f"Error during DBT artifacts ingestion: {exc}")
        logger.debug(traceback.format_exc())
        sys.exit(1)
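The module can also be driven programmatically instead of through the CLI; a minimal sketch (the project path is illustrative):

```python
from pathlib import Path

from metadata.cli.ingest_dbt import run_ingest_dbt

# Reads dbt_project.yml, builds the workflow configuration and runs the
# ingestion; exits the process with status 1 if anything fails.
run_ingest_dbt(Path("./my-dbt-project"))
```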
@@ -25,6 +25,7 @@ from metadata.cli.app import run_app
from metadata.cli.classify import run_classification
from metadata.cli.dataquality import run_test
from metadata.cli.ingest import run_ingest
from metadata.cli.ingest_dbt import run_ingest_dbt
from metadata.cli.lineage import run_lineage
from metadata.cli.profile import run_profiler
from metadata.cli.usage import run_usage
@@ -35,6 +36,7 @@ logger = cli_logger()

class MetadataCommands(Enum):
    INGEST = "ingest"
    INGEST_DBT = "ingest-dbt"
    USAGE = "usage"
    PROFILE = "profile"
    TEST = "test"
@@ -46,6 +48,7 @@ class MetadataCommands(Enum):

RUN_PATH_METHODS = {
    MetadataCommands.INGEST.value: run_ingest,
    MetadataCommands.INGEST_DBT.value: run_ingest_dbt,
    MetadataCommands.USAGE.value: run_usage,
    MetadataCommands.LINEAGE.value: run_lineage,
    MetadataCommands.PROFILE.value: run_profiler,
@@ -65,6 +68,20 @@ def create_common_config_parser_args(parser: argparse.ArgumentParser):
    )


def create_dbt_parser_args(parser: argparse.ArgumentParser):
    """
    Additional Parser Arguments for DBT Ingestion
    """
    parser.add_argument(
        "-c",
        "--dbt-project-path",
        help="path to the dbt project directory (default: current directory)",
        type=Path,
        default=Path("."),
        required=False,
    )


def webhook_args(parser: argparse.ArgumentParser):
    """
    Additional Parser Arguments for Webhook
@@ -101,6 +118,11 @@ def get_parser(args: Optional[List[str]] = None):
    create_common_config_parser_args(
        sub_parser.add_parser(MetadataCommands.INGEST.value, help="Ingestion Workflow")
    )
    create_dbt_parser_args(
        sub_parser.add_parser(
            MetadataCommands.INGEST_DBT.value, help="DBT Artifacts Ingestion"
        )
    )
    create_common_config_parser_args(
        sub_parser.add_parser(MetadataCommands.LINEAGE.value, help="Lineage Workflow")
    )
@@ -152,9 +174,14 @@ def metadata(args: Optional[List[str]] = None):
    contains_args = vars(get_parser(args))
    metadata_workflow = contains_args.get("command")
    config_file: Optional[Path] = contains_args.get("config")
    dbt_project_path: Optional[Path] = contains_args.get("dbt_project_path")

    path = None
    if config_file:
        path = config_file.expanduser()
    elif dbt_project_path:
        path = dbt_project_path.expanduser()

    if contains_args.get("debug"):
        set_loggers_level(logging.DEBUG)
    else:
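With the parser wiring above in place, the new subcommand is invoked like the other workflows; for example (the path is illustrative):

```bash
metadata ingest-dbt -c /path/to/your/dbt-project
```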
ingestion/tests/unit/resources/dbt_ingest/dbt_project.yml (new file, 62 lines)

@@ -0,0 +1,62 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'jaffle_shop'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'jaffle_shop'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:  # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  jaffle_shop:
    example:
      +materialized: table


vars:
  # Required OpenMetadata configuration for metadata ingest-dbt command
  openmetadata_host_port: "http://test-server:port/endpoint"
  openmetadata_jwt_token: ${OPENMETADATA_JWT_TOKEN}
  openmetadata_service_name: "test_service"

  # Optional OpenMetadata DBT source configuration
  # DBT behavior settings
  openmetadata_dbt_update_descriptions: true
  openmetadata_dbt_update_owners: false
  openmetadata_dbt_classification_name: "dbtTags"

  # Filter patterns - standardized dict format with includes/excludes
  openmetadata_database_filter_pattern:
    includes: ["dbt_test_*"]
    excludes: ["temp_*", "test_*"]

  openmetadata_table_filter_pattern:
    includes: [".*"]
    excludes: ["temp_.*", "tmp_.*"]

quoting:
  database: false
  schema: false
  identifier: false
ingestion/tests/unit/test_dbt_ingest.py (new file, 646 lines)

@@ -0,0 +1,646 @@
"""
Test DBT Ingestion CLI module
"""

import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

from metadata.cli.ingest_dbt import (
    FilterPattern,
    OpenMetadataDBTConfig,
    create_dbt_workflow_config,
    extract_openmetadata_config,
    find_dbt_project_config,
    run_ingest_dbt,
    substitute_env_vars,
)

MOCK_ENVIRONMENT_VARIABLES = {
    "OPENMETADATA_HOST_PORT": "http://test-server:port/endpoint",
    "OPENMETADATA_JWT_TOKEN": "test-jwt-token",
    "OPENMETADATA_SERVICE_NAME": "test_service",
}


class DbtIngestCLIUnitTest(unittest.TestCase):
    """Test cases for DBT Ingestion CLI functionality"""

    def setUp(self):
        """Set up test fixtures"""
        self.test_resources_path = Path(__file__).parent / "resources" / "dbt_ingest"
        for var, value in MOCK_ENVIRONMENT_VARIABLES.items():
            os.environ[var] = value

    def tearDown(self):
        """Clean up after tests"""
        for var in MOCK_ENVIRONMENT_VARIABLES:
            os.environ.pop(var, None)

    def test_filter_pattern_model(self):
        """Test FilterPattern Pydantic model"""
        # Test with defaults
        pattern = FilterPattern()
        self.assertEqual(pattern.includes, [".*"])
        self.assertIsNone(pattern.excludes)

        # Test with custom values
        pattern = FilterPattern(includes=["table1"], excludes=["temp_*"])
        self.assertEqual(pattern.includes, ["table1"])
        self.assertEqual(pattern.excludes, ["temp_*"])

    def test_environment_variable_substitution(self):
        """Test all environment variable substitution patterns and integration"""

        # Test all three substitution patterns together
        content = """
name: 'test_project'
version: '1.0.0'
vars:
  openmetadata_host_port: "${OPENMETADATA_HOST_PORT}"
  openmetadata_jwt_token: "{{ env_var('OPENMETADATA_JWT_TOKEN') }}"
  openmetadata_service_name: '{{ env_var("OPENMETADATA_SERVICE_NAME") }}'
  fallback_setting: "{{ env_var('UNSET_VAR', 'default-value') }}"
"""

        # Test substitution function directly
        result = substitute_env_vars(content)
        self.assertIn("http://test-server:port/endpoint", result)
        self.assertIn("test-jwt-token", result)
        self.assertIn("test_service", result)
        self.assertIn("default-value", result)
        self.assertNotIn("${OPENMETADATA_HOST_PORT", result)
        self.assertNotIn("env_var('OPENMETADATA_JWT_TOKEN')", result)
        self.assertNotIn("env_var('OPENMETADATA_SERVICE_NAME')", result)
        self.assertNotIn("env_var('UNSET_VAR', 'default-value')", result)

        # Test error cases
        error_content = 'vars:\n host: "${MISSING_VAR}"'
        with self.assertRaises(ValueError) as context:
            substitute_env_vars(error_content)
        self.assertIn("MISSING_VAR", str(context.exception))

        error_content2 = "vars:\n host: \"{{ env_var('MISSING_DBT_VAR') }}\""
        with self.assertRaises(ValueError) as context:
            substitute_env_vars(error_content2)
        self.assertIn("MISSING_DBT_VAR", str(context.exception))

    def test_dotenv_file_support(self):
        """Test that .env files are properly loaded"""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create .env file
            env_file = temp_path / ".env"
            env_file.write_text(
                """
DOTENV_HOST=http://dotenv-host:8585/endpoint
DOTENV_TOKEN=dotenv-jwt-token
DOTENV_SERVICE=dotenv-service
"""
            )

            # Create dbt_project.yml that uses .env variables
            dbt_project_file = temp_path / "dbt_project.yml"
            dbt_project_content = """
name: 'test_dotenv_project'
version: '1.0.0'
vars:
  openmetadata_host_port: "${DOTENV_HOST}"
  openmetadata_jwt_token: "{{ env_var('DOTENV_TOKEN') }}"
  openmetadata_service_name: "{{ env_var('DOTENV_SERVICE') }}"
"""
            dbt_project_file.write_text(dbt_project_content)

            # Load and validate the configuration
            config = find_dbt_project_config(temp_path)
            vars_section = config["vars"]

            self.assertEqual(
                vars_section["openmetadata_host_port"],
                "http://dotenv-host:8585/endpoint",
            )
            self.assertEqual(vars_section["openmetadata_jwt_token"], "dotenv-jwt-token")
            self.assertEqual(
                vars_section["openmetadata_service_name"], "dotenv-service"
            )

            # Test OpenMetadata config extraction
            om_config = extract_openmetadata_config(config)
            self.assertEqual(
                om_config.openmetadata_host_port, "http://dotenv-host:8585/endpoint"
            )
            self.assertEqual(om_config.openmetadata_jwt_token, "dotenv-jwt-token")
            self.assertEqual(om_config.openmetadata_service_name, "dotenv-service")

    def test_dbt_project_config_vars_validation(self):
        """Test dbt_project.yml vars section validation and structure"""
        # Test successful loading and vars validation
        config = find_dbt_project_config(self.test_resources_path)

        # Validate basic structure
        self.assertIsInstance(config, dict)
        self.assertEqual(config["name"], "jaffle_shop")
        self.assertEqual(config["version"], "1.0.0")
        self.assertIn("vars", config)

        # Validate vars section structure and required OpenMetadata variables
        vars_section = config["vars"]
        self.assertIsInstance(vars_section, dict)

        # Validate all required OpenMetadata variables exist
        required_om_vars = [
            "openmetadata_host_port",
            "openmetadata_jwt_token",
            "openmetadata_service_name",
        ]

        for var_name in required_om_vars:
            self.assertIn(
                var_name, vars_section, f"Missing required variable: {var_name}"
            )
            self.assertIsNotNone(
                vars_section[var_name], f"Variable {var_name} should not be None"
            )
            self.assertNotEqual(
                vars_section[var_name].strip(),
                "",
                f"Variable {var_name} should not be empty",
            )

        # Validate specific values match expected test configuration
        self.assertEqual(
            vars_section["openmetadata_host_port"], "http://test-server:port/endpoint"
        )
        # Get the expected JWT token from environment variable (same as what gets substituted)
        expected_jwt_token = os.environ.get("OPENMETADATA_JWT_TOKEN")
        self.assertEqual(vars_section["openmetadata_jwt_token"], expected_jwt_token)
        self.assertEqual(vars_section["openmetadata_service_name"], "test_service")

        # Test file not found error
        with tempfile.TemporaryDirectory() as temp_dir:
            with self.assertRaises(FileNotFoundError) as context:
                find_dbt_project_config(Path(temp_dir))
            self.assertIn("dbt_project.yml not found", str(context.exception))

    def test_openmetadata_config_extraction_with_defaults(self):
        """Test OpenMetadata configuration extraction with default values"""
        # Test with only required variables (should use defaults for optional ones)
        minimal_config = {
            "vars": {
                "openmetadata_host_port": "http://test-server:port/endpoint",
                "openmetadata_jwt_token": "test-jwt-token",
                "openmetadata_service_name": "test_service",
            }
        }
        om_config = extract_openmetadata_config(minimal_config)

        # Validate required config
        self.assertIsInstance(om_config, OpenMetadataDBTConfig)
        self.assertEqual(
            om_config.openmetadata_host_port, "http://test-server:port/endpoint"
        )
        self.assertEqual(om_config.openmetadata_jwt_token, "test-jwt-token")
        self.assertEqual(om_config.openmetadata_service_name, "test_service")

        # Validate defaults for optional config
        self.assertTrue(om_config.openmetadata_dbt_update_descriptions)
        self.assertTrue(om_config.openmetadata_dbt_update_owners)
        self.assertTrue(om_config.openmetadata_include_tags)
        self.assertFalse(om_config.openmetadata_search_across_databases)
        self.assertIsNone(om_config.openmetadata_dbt_classification_name)

        # Validate default filter patterns (should be defaults when not specified)
        self.assertEqual(om_config.database_filter.includes, [".*"])
        self.assertEqual(om_config.schema_filter.includes, [".*"])
        self.assertEqual(om_config.table_filter.includes, [".*"])

    def test_openmetadata_config_extraction_with_custom_values(self):
        """Test OpenMetadata configuration extraction with custom values"""
        # Test with custom optional variables using dict format only
        custom_config = {
            "vars": {
                "openmetadata_host_port": "http://test-server:port/endpoint",
                "openmetadata_jwt_token": "test-jwt-token",
                "openmetadata_service_name": "test_service",
                "openmetadata_dbt_update_descriptions": False,
                "openmetadata_dbt_update_owners": False,
                "openmetadata_include_tags": False,
                "openmetadata_search_across_databases": True,
                "openmetadata_dbt_classification_name": "custom_tags",
                "openmetadata_database_filter_pattern": {
                    "includes": ["prod_*", "staging_*"]
                },
                "openmetadata_schema_filter_pattern": {
                    "includes": ["public"],
                    "excludes": ["temp_*"],
                },
                "openmetadata_table_filter_pattern": {"includes": ["fact_*"]},
            }
        }
        om_config = extract_openmetadata_config(custom_config)

        # Validate custom config values
        self.assertFalse(om_config.openmetadata_dbt_update_descriptions)
        self.assertFalse(om_config.openmetadata_dbt_update_owners)
        self.assertFalse(om_config.openmetadata_include_tags)
        self.assertTrue(om_config.openmetadata_search_across_databases)
        self.assertEqual(om_config.openmetadata_dbt_classification_name, "custom_tags")

        # Validate custom filter patterns
        self.assertEqual(om_config.database_filter.includes, ["prod_*", "staging_*"])
        self.assertEqual(om_config.schema_filter.includes, ["public"])
        self.assertEqual(om_config.schema_filter.excludes, ["temp_*"])
        self.assertEqual(om_config.table_filter.includes, ["fact_*"])

    def test_openmetadata_config_validation_errors(self):
        """Test Pydantic validation errors for invalid configurations"""
        # Test missing required field
        with self.assertRaises(ValueError) as context:
            extract_openmetadata_config(
                {"vars": {"openmetadata_host_port": "http://test"}}
            )
        self.assertIn("Field required", str(context.exception))

    def test_url_validation_comprehensive(self):
        """Test comprehensive URL validation scenarios including valid and invalid URLs"""

        # Test valid URLs - should all pass (based on URL class behavior)
        valid_urls = [
            "http://localhost:8585",
            "https://openmetadata.example.com:8585",
            "http://192.168.1.100:8585/api",
            "https://my-openmetadata-server.com/api",
            "ws://localhost:8585",
            "wss://secure-websocket.example.com:8585",
            "http://127.0.0.1:8585",
            "https://openmetadata-prod.company.com:443/api/v1",
            # URL class accepts these edge cases
            "http://",
            "https://",
            "http:///",
            "ws://",
            "wss://",
            "http://localhost:8585 with spaces",  # URL class is permissive
            "http://local<host>:8585",  # URL class allows special chars
            "http://localhost:8585\nwith\nnewlines",  # URL class even accepts newlines
        ]

        print(f"\nTesting {len(valid_urls)} valid URLs:")
        for url in valid_urls:
            with self.subTest(url=url):
                try:
                    config = OpenMetadataDBTConfig(
                        openmetadata_host_port=url,
                        openmetadata_jwt_token="test-jwt-token",
                        openmetadata_service_name="test_service",
                    )
                    print(f"✅ {url!r} - VALID")
                except Exception as e:
                    self.fail(f"Valid URL {url!r} was rejected: {e}")

        # Test invalid URLs - should all fail based on URL class behavior
        invalid_urls = [
            # Missing protocol entirely
            "localhost:8585",
            "openmetadata.example.com:8585",
            "192.168.1.100:8585",
            # Invalid protocols (not http*, https*, ws*, wss*)
            "ftp://localhost:8585",
            "file:///path/to/file",
            "sftp://server.com:22",
            "ssh://server.com:22",
            "tcp://localhost:8585",
            "smtp://mail.server.com:25",
            "mysql://localhost:3306/db",
            # Malformed URLs
            "invalid-url",
            "not_a_url_at_all",
            "://localhost:8585",  # missing protocol
            "htp://localhost:8585",  # typo in protocol
            "http:/localhost:8585",  # missing slash
            # Empty and whitespace
            "",
            " ",
            "\n",
            "\t",
            # Completely invalid formats
            "just some random text",
            "12345",
        ]

        print(f"\nTesting {len(invalid_urls)} invalid URLs:")
        for url in invalid_urls:
            with self.subTest(url=url):
                with self.assertRaises(
                    ValueError, msg=f"Invalid URL {repr(url)} should have been rejected"
                ):
                    OpenMetadataDBTConfig(
                        openmetadata_host_port=url,
                        openmetadata_jwt_token="test-jwt-token",
                        openmetadata_service_name="test_service",
                    )
                print(f"✅ {repr(url)} - CORRECTLY REJECTED")

        # Test edge cases with None and non-string types
        edge_cases = [
            None,
            123,
            [],
            {},
            True,
            False,
        ]

        print(f"\nTesting {len(edge_cases)} edge cases:")
        for case in edge_cases:
            with self.subTest(case=case):
                with self.assertRaises(
                    (ValueError, TypeError),
                    msg=f"Edge case {repr(case)} should have been rejected",
                ):
                    OpenMetadataDBTConfig(
                        openmetadata_host_port=case,
                        openmetadata_jwt_token="test-jwt-token",
                        openmetadata_service_name="test_service",
                    )
                print(f"✅ {repr(case)} - CORRECTLY REJECTED")

    def test_dbt_project_yml_vars_format_validation(self):
        """Test that dbt_project.yml vars follow correct format and naming convention"""
        config = find_dbt_project_config(self.test_resources_path)
        vars_section = config["vars"]

        # Test that we only use standard OpenMetadata naming
        standard_vars = [
            var for var in vars_section.keys() if var.startswith("openmetadata_")
        ]
        self.assertGreaterEqual(
            len(standard_vars),
            3,
            "Should have at least 3 required OpenMetadata variables",
        )

        # Test that the configuration can be successfully parsed
        om_config = extract_openmetadata_config(config)
        self.assertIsInstance(om_config, OpenMetadataDBTConfig)

        # Validate URL format
        self.assertTrue(
            om_config.openmetadata_host_port.startswith("http://")
            or om_config.openmetadata_host_port.startswith("https://"),
            "Host port should be a valid URL",
        )

        # Validate JWT token format (should be non-empty string)
        self.assertIsInstance(
            om_config.openmetadata_jwt_token, str, "JWT token should be a string"
        )
        self.assertGreater(
            len(om_config.openmetadata_jwt_token), 0, "JWT token should not be empty"
        )

        # Validate service name format
        self.assertIsInstance(
            om_config.openmetadata_service_name, str, "Service name should be a string"
        )
        self.assertGreater(
            len(om_config.openmetadata_service_name),
            0,
            "Service name should not be empty",
        )

    def test_workflow_config_creation_with_custom_options(self):
        """Test workflow configuration creation with custom DBT options"""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            target_dir = temp_path / "target"
            target_dir.mkdir()

            # Create required manifest.json file
            manifest_file = target_dir / "manifest.json"
            manifest_file.write_text('{"metadata": {"dbt_schema_version": "v1"}}')

            # Test with custom configuration using dict format only
            custom_om_config = OpenMetadataDBTConfig(
                openmetadata_host_port="http://test-server:port/endpoint",
                openmetadata_jwt_token="test-jwt-token",
                openmetadata_service_name="test_service",
                openmetadata_dbt_update_descriptions=False,
                openmetadata_dbt_update_owners=False,
                openmetadata_include_tags=False,
                openmetadata_search_across_databases=True,
                openmetadata_dbt_classification_name="custom_tags",
                openmetadata_database_filter_pattern={"includes": ["prod_*"]},
                openmetadata_schema_filter_pattern={
                    "includes": ["public"],
                    "excludes": ["temp_*"],
                },
                openmetadata_table_filter_pattern={"includes": ["fact_*"]},
            )

            config = create_dbt_workflow_config(temp_path, custom_om_config)

            # Validate structure
            self.assertIn("source", config)
            self.assertIn("sink", config)
            self.assertIn("workflowConfig", config)

            # Validate custom source config values
            source_config = config["source"]["sourceConfig"]["config"]
            self.assertEqual(source_config["type"], "DBT")
            self.assertFalse(source_config["dbtUpdateDescriptions"])
            self.assertFalse(source_config["dbtUpdateOwners"])
            self.assertFalse(source_config["includeTags"])
            self.assertTrue(source_config["searchAcrossDatabases"])
            self.assertEqual(source_config["dbtClassificationName"], "custom_tags")

            # Validate custom filter patterns
            self.assertEqual(
                source_config["databaseFilterPattern"], {"includes": ["prod_*"]}
            )
            self.assertEqual(
                source_config["schemaFilterPattern"],
                {"includes": ["public"], "excludes": ["temp_*"]},
            )
            self.assertEqual(
                source_config["tableFilterPattern"], {"includes": ["fact_*"]}
            )

    def test_workflow_config_creation(self):
        """Test workflow configuration creation"""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            target_dir = temp_path / "target"
            target_dir.mkdir()

            # Test with all artifact files
            manifest_file = target_dir / "manifest.json"
            manifest_file.write_text('{"metadata": {"dbt_schema_version": "v1"}}')
            catalog_file = target_dir / "catalog.json"
            catalog_file.write_text('{"metadata": {"generated_at": "2023-01-01"}}')
            run_results_file = target_dir / "run_results.json"
            run_results_file.write_text('{"metadata": {"generated_at": "2023-01-01"}}')

            # Use default config
            default_om_config = OpenMetadataDBTConfig(
                openmetadata_host_port="http://test-server:port/endpoint",
                openmetadata_jwt_token="test-jwt-token",
                openmetadata_service_name="test_service",
            )

            config = create_dbt_workflow_config(temp_path, default_om_config)

            # Validate structure
            self.assertIn("source", config)
            self.assertIn("sink", config)
            self.assertIn("workflowConfig", config)
            self.assertEqual(config["source"]["serviceName"], "test_service")
            self.assertEqual(config["source"]["sourceConfig"]["config"]["type"], "DBT")

            # Test missing manifest error
            manifest_file.unlink()
            with self.assertRaises(FileNotFoundError) as context:
                create_dbt_workflow_config(temp_path, default_om_config)
            self.assertIn("manifest.json not found", str(context.exception))

    @patch("metadata.cli.ingest_dbt.MetadataWorkflow")
    def test_cli_execution(self, mock_workflow_class):
        """Test CLI execution - success and error cases"""
        mock_workflow = MagicMock()
        mock_workflow_class.create.return_value = mock_workflow

        # Test successful execution
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            target_dir = temp_path / "target"
            target_dir.mkdir()

            # Create required files
            (target_dir / "manifest.json").write_text('{"metadata": {}}')
            (temp_path / "dbt_project.yml").write_text(
                """
name: 'test_project'
vars:
  openmetadata_host_port: 'http://test-server:port/endpoint'
  openmetadata_jwt_token: 'test-jwt-token'
  openmetadata_service_name: 'test_service'
"""
            )

            run_ingest_dbt(temp_path)
            mock_workflow_class.create.assert_called_once()
            mock_workflow.execute.assert_called_once()

        # Test path errors
        with self.assertRaises(SystemExit):
            run_ingest_dbt(Path("/non/existent/path"))

    def test_integration_with_test_config(self):
        """Integration test using actual test resources with comprehensive validation"""
        config = find_dbt_project_config(self.test_resources_path)

        # Validate the loaded config has proper structure
        self.assertIn("vars", config)
        self.assertIsInstance(config["vars"], dict)

        # Extract and validate OpenMetadata config
        om_config = extract_openmetadata_config(config)

        # Verify extracted configuration matches expected values exactly
        self.assertIsInstance(om_config, OpenMetadataDBTConfig)
        self.assertEqual(
            om_config.openmetadata_host_port, "http://test-server:port/endpoint"
        )
        # Get the expected JWT token from environment variable (same as what gets substituted)
        expected_jwt_token = os.environ.get("OPENMETADATA_JWT_TOKEN")
        self.assertEqual(om_config.openmetadata_jwt_token, expected_jwt_token)
        self.assertEqual(om_config.openmetadata_service_name, "test_service")

        # Verify optional configuration from test file
        self.assertTrue(
            om_config.openmetadata_dbt_update_descriptions
        )  # explicitly set to true
        self.assertFalse(
            om_config.openmetadata_dbt_update_owners
        )  # explicitly set to false
        self.assertTrue(
            om_config.openmetadata_include_tags
        )  # default value (not in config)
        self.assertFalse(
            om_config.openmetadata_search_across_databases
        )  # default value (not in config)
        self.assertEqual(
            om_config.openmetadata_dbt_classification_name, "dbtTags"
        )  # custom value

        # Verify filter patterns from test file (dict format only)
        self.assertEqual(om_config.database_filter.includes, ["dbt_test_*"])
        self.assertEqual(om_config.database_filter.excludes, ["temp_*", "test_*"])

        self.assertEqual(
            om_config.schema_filter.includes, [".*"]
        )  # default (not specified in config)
        self.assertIsNone(
            om_config.schema_filter.excludes
        )  # default (not specified in config)

        self.assertEqual(om_config.table_filter.includes, [".*"])
        self.assertEqual(om_config.table_filter.excludes, ["temp_.*", "tmp_.*"])

        # Validate that the extracted config can be used to create workflow config
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            (temp_path / "target").mkdir()
            (temp_path / "target" / "manifest.json").write_text('{"metadata": {}}')

            workflow_config = create_dbt_workflow_config(temp_path, om_config)

            # Validate workflow config uses the extracted values correctly
            self.assertIsInstance(workflow_config, dict)
            self.assertEqual(workflow_config["source"]["serviceName"], "test_service")
            self.assertEqual(
                workflow_config["workflowConfig"]["openMetadataServerConfig"][
                    "hostPort"
                ],
                "http://test-server:port/endpoint",
            )
            # Get the expected JWT token from environment variable (same as what gets substituted)
            expected_jwt_token = os.environ.get("OPENMETADATA_JWT_TOKEN")
            self.assertEqual(
                workflow_config["workflowConfig"]["openMetadataServerConfig"][
                    "securityConfig"
                ]["jwtToken"],
                expected_jwt_token,
            )

            # Validate the optional config in the workflow
            source_config = workflow_config["source"]["sourceConfig"]["config"]
            self.assertTrue(source_config["dbtUpdateDescriptions"])
            self.assertFalse(source_config["dbtUpdateOwners"])
            self.assertTrue(source_config["includeTags"])  # default value
            self.assertFalse(source_config["searchAcrossDatabases"])  # default value
            self.assertEqual(source_config["dbtClassificationName"], "dbtTags")

            # Validate filter patterns in workflow config (standardized dict format)
            expected_db_pattern = {
                "includes": ["dbt_test_*"],
                "excludes": ["temp_*", "test_*"],
            }
            expected_schema_pattern = {"includes": [".*"]}  # default pattern
            expected_table_pattern = {
                "includes": [".*"],
                "excludes": ["temp_.*", "tmp_.*"],
            }

            self.assertEqual(
                source_config["databaseFilterPattern"], expected_db_pattern
            )
            self.assertEqual(
                source_config["schemaFilterPattern"], expected_schema_pattern
            )
            self.assertEqual(
                source_config["tableFilterPattern"], expected_table_pattern
            )
@@ -673,6 +673,8 @@ site_menu:
        url: /connectors/ingestion/workflows/dbt
      - category: Connectors / Ingestion / Workflows / dbt / Configure dbt workflow from OpenMetadata UI
        url: /connectors/ingestion/workflows/dbt/configure-dbt-workflow-from-ui
      - category: Connectors / Ingestion / Workflows / dbt / Auto Ingest DBT Core Artifacts
        url: /connectors/ingestion/workflows/dbt/auto-ingest-dbt-core
      - category: Connectors / Ingestion / Workflows / dbt / Run Externally
        url: /connectors/ingestion/workflows/dbt/run-dbt-workflow-externally
      - category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Owner
@@ -0,0 +1,351 @@
---
title: Auto Ingest dbt-core
slug: /connectors/ingestion/workflows/dbt/auto-ingest-dbt-core
---

# Auto Ingest dbt-core

Learn how to automatically ingest dbt-core artifacts into OpenMetadata using the simplified `metadata ingest-dbt` CLI command that reads configuration directly from your `dbt_project.yml` file.

{% note %}
This feature eliminates the need for separate YAML configuration files. All configuration is done directly in your existing `dbt_project.yml` file.
{% /note %}

## Overview

The `metadata ingest-dbt` command provides a streamlined way to ingest dbt artifacts into OpenMetadata by:
- Reading configuration directly from your `dbt_project.yml` file
- Automatically discovering dbt artifacts (`manifest.json`, `catalog.json`, `run_results.json`)
- Supporting comprehensive filtering and configuration options

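Under the hood the command assembles the same kind of workflow definition used by the existing dbt ingestion (see `create_dbt_workflow_config` in `ingestion/src/metadata/cli/ingest_dbt.py` in this commit). A rough, illustrative sketch of the generated configuration, with placeholder values:

```yaml
source:
  type: dbt
  serviceName: your-database-service-name          # openmetadata_service_name
  sourceConfig:
    config:
      type: DBT
      dbtConfigSource:
        dbtConfigType: local
        dbtManifestFilePath: <project>/target/manifest.json
        # catalog.json and run_results.json are added automatically when present
      dbtUpdateDescriptions: true
      dbtUpdateOwners: true
      includeTags: true
      searchAcrossDatabases: false
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: INFO
  openMetadataServerConfig:
    hostPort: https://your-openmetadata-server-url/endpoint   # openmetadata_host_port
    authProvider: openmetadata
    securityConfig:
      jwtToken: your-jwt-token                                # openmetadata_jwt_token
```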
## Prerequisites

1. **dbt project setup**: You must have a dbt project with a valid `dbt_project.yml` file
2. **dbt artifacts**: Run `dbt compile` or `dbt run` to generate required artifacts in the `target/` directory
3. **OpenMetadata service**: Your database service must already be configured in OpenMetadata
4. **OpenMetadata Python package**: Install the OpenMetadata ingestion package

```bash
pip install "openmetadata-ingestion[dbt]"
```

{% note %}
**Dependencies**: The package includes `python-dotenv>=0.19.0` for automatic `.env` file support, so no additional setup is required for environment variable functionality.
{% /note %}

## Quick Start

### 1. Configure your dbt_project.yml

Add the following variables to the `vars` section of your `dbt_project.yml` file:

```yaml
vars:
  # Required OpenMetadata configuration
  openmetadata_host_port: "https://your-openmetadata-server-url/endpoint"
  openmetadata_jwt_token: "your-jwt-token-here"
  openmetadata_service_name: "your-database-service-name"
```

{% note %}
**Environment Variables**: For security, you can use environment variables instead of hardcoding sensitive values. See the [Environment Variables](#environment-variables) section below for supported patterns.
{% /note %}

### 2. Generate dbt artifacts

```bash
cd your-dbt-project
dbt compile  # or dbt run
```

### 3. Run the ingestion

If you're already in your dbt project directory:
```bash
metadata ingest-dbt
```

Or if you're in a different directory:
```bash
metadata ingest-dbt -c /path/to/your/dbt-project
```

## Environment Variables

For security and flexibility, you can use environment variables in your `dbt_project.yml` configuration instead of hardcoding sensitive values like JWT tokens. The system supports three different environment variable patterns:

### Supported Patterns

| Pattern | Description | Example |
|---------|-------------|---------|
| `${VAR}` | Shell-style variable substitution | `"${OPENMETADATA_TOKEN}"` |
| `{{ env_var("VAR") }}` | dbt-style without default | `"{{ env_var('OPENMETADATA_HOST') }}"` |
| `{{ env_var("VAR", "default") }}` | dbt-style with default value | `"{{ env_var('SERVICE_NAME', 'default-service') }}"` |

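A quick way to see these patterns resolve is to call the substitution helper that ships with the CLI (`substitute_env_vars` in `metadata.cli.ingest_dbt`); the snippet below is only illustrative and assumes the `openmetadata-ingestion` package is installed:

```python
import os

from metadata.cli.ingest_dbt import substitute_env_vars

os.environ["OPENMETADATA_JWT_TOKEN"] = "demo-token"

raw = """
vars:
  openmetadata_jwt_token: "${OPENMETADATA_JWT_TOKEN}"
  openmetadata_service_name: "{{ env_var('OPENMETADATA_SERVICE_NAME', 'default-service') }}"
"""

# ${VAR} is replaced from the environment; the dbt-style call falls back to
# its default whenever OPENMETADATA_SERVICE_NAME is not set.
print(substitute_env_vars(raw))
```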
### Environment Variables Example

```yaml
# dbt_project.yml
vars:
  # Using shell-style variables
  openmetadata_host_port: "${OPENMETADATA_HOST_PORT}"
  openmetadata_jwt_token: "${OPENMETADATA_JWT_TOKEN}"

  # Using dbt-style variables
  openmetadata_service_name: "{{ env_var('OPENMETADATA_SERVICE_NAME') }}"

  # Using dbt-style with defaults
  openmetadata_dbt_classification_name: "{{ env_var('DBT_CLASSIFICATION', 'dbt_tags') }}"
  openmetadata_search_across_databases: "{{ env_var('SEARCH_ACROSS_DB', 'false') }}"
```

Then set your environment variables:
```bash
export OPENMETADATA_HOST_PORT="https://your-openmetadata-server-url/endpoint"
export OPENMETADATA_JWT_TOKEN="your-jwt-token"
export OPENMETADATA_SERVICE_NAME="your-database-service"
```

**Alternative: Using .env Files**

For local development, you can create a `.env` file in your dbt project directory:

```bash
# .env file in your dbt project root
OPENMETADATA_HOST_PORT=https://your-openmetadata-server-url/endpoint
OPENMETADATA_JWT_TOKEN=your-jwt-token
OPENMETADATA_SERVICE_NAME=your-database-service
```

{% note %}
**Note**: The system automatically loads environment variables from `.env` files in both the dbt project directory and the current working directory. Environment variables set in the shell take precedence over `.env` file values.
{% /note %}

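For reference, the load order comes straight from the implementation added in this commit (`find_dbt_project_config`); the path below is a placeholder:

```python
from pathlib import Path

from dotenv import load_dotenv

dbt_project_path = Path(".")  # illustrative

# Project-level .env first, then the current working directory; override=False
# means variables already present in the shell are never overwritten.
load_dotenv(dbt_project_path / ".env", override=False)
load_dotenv(override=False)
```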
{% note %}
|
||||
**Error Handling**: If a required environment variable is not set and no default is provided, the ingestion will fail with a clear error message indicating which variable is missing.
|
||||
{% /note %}
|
||||
|
||||
## Configuration Options
|
||||
|
||||
### Required Parameters
|
||||
|
||||
| Parameter | Description |
|
||||
|-----------|-------------|
|
||||
| `openmetadata_host_port` | OpenMetadata server URL (must start with `https://`) |
|
||||
| `openmetadata_jwt_token` | JWT token for authentication |
|
||||
| `openmetadata_service_name` | Name of the database service in OpenMetadata |
|
||||
|
||||
### Optional Parameters
|
||||
|
||||
| Parameter | Default | Description |
|
||||
|-----------|---------|-------------|
|
||||
| `openmetadata_dbt_update_descriptions` | `true` | Update table/column descriptions from dbt |
|
||||
| `openmetadata_dbt_update_owners` | `true` | Update model owners from dbt |
|
||||
| `openmetadata_include_tags` | `true` | Include dbt tags as OpenMetadata tags |
|
||||
| `openmetadata_search_across_databases` | `false` | Search for tables across multiple databases |
|
||||
| `openmetadata_dbt_classification_name` | `null` | Custom classification name for dbt tags |
|
||||
|
||||
### Filter Patterns
|
||||
|
||||
Control which databases, schemas, and tables to include or exclude:
|
||||
|
||||
```yaml
|
||||
vars:
|
||||
# ... required config above ...
|
||||
|
||||
# Filter patterns using regex
|
||||
openmetadata_database_filter_pattern:
|
||||
includes: ["production_*", "analytics_*"]
|
||||
excludes: ["temp_*", "test_*"]
|
||||
|
||||
openmetadata_schema_filter_pattern:
|
||||
includes: ["public", "marts", "staging"]
|
||||
excludes: ["information_schema", "temp_*"]
|
||||
|
||||
openmetadata_table_filter_pattern:
|
||||
includes: [".*"]
|
||||
excludes: ["temp_.*", "tmp_.*", "dbt_.*"]
|
||||
```
|
||||
|
||||
## Complete Example
|
||||
|
||||
```yaml
|
||||
# dbt_project.yml
|
||||
name: 'my_analytics_project'
|
||||
version: '1.0.0'
|
||||
config-version: 2
|
||||
|
||||
profile: 'my_analytics_project'
|
||||
model-paths: ["models"]
|
||||
# ... other dbt settings ...
|
||||
|
||||
vars:
|
||||
# OpenMetadata Configuration - Using Environment Variables
|
||||
openmetadata_host_port: "${OPENMETADATA_HOST_PORT}"
|
||||
openmetadata_jwt_token: "{{ env_var('OPENMETADATA_JWT_TOKEN') }}"
|
||||
openmetadata_service_name: "{{ env_var('OPENMETADATA_SERVICE_NAME', 'postgres_analytics') }}"
|
||||
|
||||
# Optional Settings
|
||||
openmetadata_dbt_update_descriptions: true
|
||||
openmetadata_dbt_update_owners: true
|
||||
openmetadata_include_tags: true
|
||||
openmetadata_dbt_classification_name: "{{ env_var('DBT_CLASSIFICATION', 'dbt_analytics_tags') }}"
|
||||
|
||||
# Filtering
|
||||
openmetadata_database_filter_pattern:
|
||||
includes: ["analytics", "data_warehouse"]
|
||||
excludes: ["temp_db", "test_db"]
|
||||
|
||||
openmetadata_table_filter_pattern:
|
||||
includes: [".*"]
|
||||
excludes: ["temp_.*", "tmp_.*", "test_.*"]
|
||||
```
|
||||
|
||||
## Command Options

```bash
metadata ingest-dbt [OPTIONS]

Options:
  -h, --help                   Show help message and exit
  -c, --dbt-project-path PATH  Path to the dbt project directory (default: current directory)
```

**Note**: Global options like `--version`, `--log-level`, and `--debug` are available at the main `metadata` command level:

```bash
metadata --version                                         # Show version information
metadata --log-level DEBUG ingest-dbt -c /path/to/project  # Set log level
metadata --debug ingest-dbt -c /path/to/project            # Enable debug mode
```

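For reference, the documented options map onto a very small argument parser. The sketch below is illustrative only; the real command is registered under the `metadata` CLI entrypoint rather than built as a standalone script.

```python
# Illustrative sketch of the documented options -- not the CLI's actual wiring.
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="metadata ingest-dbt",
        description="Ingest dbt artifacts into OpenMetadata using dbt_project.yml configuration",
    )
    parser.add_argument(
        "-c",
        "--dbt-project-path",
        default=".",  # default: current directory, as documented above
        help="Path to the dbt project directory",
    )
    return parser


# build_parser().parse_args(["-c", "/path/to/your/dbt-project"])
```
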
## Artifacts Discovery

The command automatically discovers artifacts from your dbt project's `target/` directory:

| Artifact | Required | Description |
|----------|----------|-------------|
| `manifest.json` | ✅ Yes | Model definitions, relationships, and metadata |
| `catalog.json` | ❌ Optional | Table and column statistics from `dbt docs generate` |
| `run_results.json` | ❌ Optional | Test results from `dbt test` |

### Generate All Artifacts

```bash
dbt compile          # Generate manifest.json
dbt docs generate    # Generate catalog.json (requires database connection)
dbt test             # Generate run_results.json
```

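Discovery itself amounts to checking for `manifest.json` and picking up the optional files when they exist. The helper below is an illustrative sketch with an assumed name, not the CLI's internal function.

```python
# Illustrative sketch of artifact discovery in the dbt target/ directory.
from pathlib import Path
from typing import Dict, Optional


def discover_dbt_artifacts(project_dir: str = ".") -> Dict[str, Optional[Path]]:
    target = Path(project_dir) / "target"
    manifest = target / "manifest.json"
    if not manifest.exists():
        raise FileNotFoundError(
            "manifest.json not found - run `dbt compile` or `dbt run` first"
        )
    artifacts: Dict[str, Optional[Path]] = {"manifest.json": manifest}
    for name in ("catalog.json", "run_results.json"):
        path = target / name
        artifacts[name] = path if path.exists() else None  # optional artifacts
    return artifacts
```
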
## What Gets Ingested

- **Model Definitions**: Queries, configurations, and relationships
- **Lineage**: Table-to-table and column-level lineage
- **Documentation**: Model and column descriptions
- **Data Quality**: dbt test definitions and results
- **Tags & Classification**: Model and column tags
- **Ownership**: Model owners and team assignments

## Error Handling & Troubleshooting

### Common Issues

| Issue | Solution |
|-------|----------|
| `dbt_project.yml not found` | Ensure you're in a valid dbt project directory |
| `Required configuration not found` | Add the `openmetadata_*` variables to your `dbt_project.yml` |
| `manifest.json not found` | Run `dbt compile` or `dbt run` first |
| `Invalid URL format` | Ensure `openmetadata_host_port` includes the protocol (`https://`) |
| `Environment variable 'VAR' is not set` | Set the required environment variable or provide a default value |
| `Environment variable not set and no default` | Either set the environment variable or use the `{{ env_var('VAR', 'default') }}` pattern |

### Debug Mode

Enable detailed logging:

```bash
metadata --debug ingest-dbt -c .
```

## Best Practices

### Security
- **Always use environment variables** for sensitive data like JWT tokens
- **Multiple patterns supported** for flexibility:

  ```yaml
  vars:
    # Shell-style (simple and widely supported)
    openmetadata_host_port: "${OPENMETADATA_HOST_PORT}"
    openmetadata_jwt_token: "${OPENMETADATA_JWT_TOKEN}"

    # dbt-style (consistent with dbt conventions)
    openmetadata_service_name: "{{ env_var('OPENMETADATA_SERVICE_NAME') }}"

    # dbt-style with fallbacks (recommended for optional settings)
    openmetadata_dbt_classification_name: "{{ env_var('DBT_CLASSIFICATION', 'dbt_tags') }}"
  ```

- **Never commit** sensitive values directly to version control

### Filtering
- Use specific patterns to exclude temporary/test tables
- Filter based on your organization's naming conventions
- Exclude system schemas and databases

### Automation
- Integrate into CI/CD pipelines
- Run after successful dbt builds
- Set up scheduled ingestion for regular updates

## CI/CD Integration

```yaml
# .github/workflows/dbt-ingestion.yml
name: dbt and OpenMetadata Ingestion

on:
  push:
    branches: [main]

jobs:
  dbt-run-and-ingest:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install dbt-core dbt-postgres
          pip install "openmetadata-ingestion[dbt]"

      - name: Run dbt
        run: |
          dbt deps
          dbt compile
          dbt run
          dbt test
          dbt docs generate
        env:
          DBT_PROFILES_DIR: .

      - name: Ingest to OpenMetadata
        run: metadata ingest-dbt -c .
        env:
          OPENMETADATA_HOST_PORT: ${{ secrets.OPENMETADATA_HOST_PORT }}
          OPENMETADATA_JWT_TOKEN: ${{ secrets.OPENMETADATA_JWT_TOKEN }}
          OPENMETADATA_SERVICE_NAME: ${{ secrets.OPENMETADATA_SERVICE_NAME }}
```

## Next Steps

After successful ingestion:

1. **Explore your data** in the OpenMetadata UI
2. **Configure additional dbt features** like [tags](/connectors/ingestion/workflows/dbt/ingest-dbt-tags), [tiers](/connectors/ingestion/workflows/dbt/ingest-dbt-tier), and [glossary](/connectors/ingestion/workflows/dbt/ingest-dbt-glossary)
3. **Set up data governance** policies and workflows
4. **Schedule regular ingestion** to keep metadata up to date

For additional troubleshooting, refer to the [dbt Troubleshooting Guide](/connectors/ingestion/workflows/dbt/dbt-troubleshooting).

@ -22,6 +22,14 @@ Configure the dbt Workflow from the UI.
Configure the dbt Workflow from the CLI.
{%/inlineCallout%}

{%inlineCallout
  icon="celebration"
  bold="Auto Ingest DBT Artifacts (dbt-core)"
  href="/connectors/ingestion/workflows/dbt/auto-ingest-dbt-core"%}
Configure the auto dbt ingestion for dbt-core.
{%/inlineCallout%}

{%/inlineCalloutContainer%}

# dbt Integration

@ -815,6 +815,8 @@ site_menu:
  url: /connectors/ingestion/workflows/dbt
- category: Connectors / Ingestion / Workflows / dbt / Configure dbt workflow from OpenMetadata UI
  url: /connectors/ingestion/workflows/dbt/configure-dbt-workflow-from-ui
- category: Connectors / Ingestion / Workflows / dbt / Auto Ingest DBT Core Artifacts
  url: /connectors/ingestion/workflows/dbt/auto-ingest-dbt-core
- category: Connectors / Ingestion / Workflows / dbt / Run Externally
  url: /connectors/ingestion/workflows/dbt/run-dbt-workflow-externally
- category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Owner

@ -673,6 +673,8 @@ site_menu:
  url: /connectors/ingestion/workflows/dbt
- category: Connectors / Ingestion / Workflows / dbt / Configure dbt workflow from OpenMetadata UI
  url: /connectors/ingestion/workflows/dbt/configure-dbt-workflow-from-ui
- category: Connectors / Ingestion / Workflows / dbt / Auto Ingest DBT Core Artifacts
  url: /connectors/ingestion/workflows/dbt/auto-ingest-dbt-core
- category: Connectors / Ingestion / Workflows / dbt / Run Externally
  url: /connectors/ingestion/workflows/dbt/run-dbt-workflow-externally
- category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Owner

@ -22,6 +22,14 @@ Configure the dbt Workflow from the UI.
Configure the dbt Workflow from the CLI.
{%/inlineCallout%}

{%inlineCallout
  icon="celebration"
  bold="Auto Ingest DBT Artifacts (dbt-core)"
  href="/connectors/ingestion/workflows/dbt/auto-ingest-dbt-core"%}
Configure the auto dbt ingestion for dbt-core.
{%/inlineCallout%}

{%/inlineCalloutContainer%}

# dbt Integration

@ -821,6 +821,8 @@ site_menu:
  url: /connectors/ingestion/workflows/dbt
- category: Connectors / Ingestion / Workflows / dbt / Configure dbt workflow from OpenMetadata UI
  url: /connectors/ingestion/workflows/dbt/configure-dbt-workflow-from-ui
- category: Connectors / Ingestion / Workflows / dbt / Auto Ingest DBT Core Artifacts
  url: /connectors/ingestion/workflows/dbt/auto-ingest-dbt-core
- category: Connectors / Ingestion / Workflows / dbt / Run Externally
  url: /connectors/ingestion/workflows/dbt/run-dbt-workflow-externally
- category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Owner