mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-04 12:51:23 +00:00 
			
		
		
		
	feat(ingest): support environment variables in recipes (#2306)
This commit is contained in:
		
							parent
							
								
									7afe038a5c
								
							
						
					
					
						commit
						f57c954fc6
					
				@ -156,7 +156,7 @@ source:
 | 
			
		||||
  type: mssql
 | 
			
		||||
  config:
 | 
			
		||||
    username: sa
 | 
			
		||||
    password: test!Password
 | 
			
		||||
    password: ${MSSQL_PASSWORD}
 | 
			
		||||
    database: DemoData
 | 
			
		||||
 | 
			
		||||
sink:
 | 
			
		||||
@ -165,6 +165,10 @@ sink:
 | 
			
		||||
    server: "http://localhost:8080"
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
We automatically expand environment variables in the config,
 | 
			
		||||
similar to variable substitution in GNU bash or in docker-compose files. For details, see
 | 
			
		||||
https://docs.docker.com/compose/compose-file/compose-file-v2/#variable-substitution.
 | 
			
		||||
 | 
			
		||||
Running a recipe is quite easy.
 | 
			
		||||
 | 
			
		||||
```sh
 | 
			
		||||
@ -208,19 +212,20 @@ source:
 | 
			
		||||
    database: dbname
 | 
			
		||||
    host_port: localhost:3306
 | 
			
		||||
    table_pattern:
 | 
			
		||||
      deny:
 | 
			
		||||
        # Note that the deny patterns take precedence over the allow patterns.
 | 
			
		||||
        - "performance_schema"
 | 
			
		||||
      allow:
 | 
			
		||||
        - "schema1.table2"
 | 
			
		||||
      deny:
 | 
			
		||||
        - "performance_schema"
 | 
			
		||||
      # Although the 'table_pattern' enables you to skip everything from certain schemas,
 | 
			
		||||
      # having another option to allow/deny on schema level is an optimization for the case when there is a large number
 | 
			
		||||
      # of schemas that one wants to skip and you want to avoid the time to needlessly fetch those tables only to filter
 | 
			
		||||
      # them out afterwards via the table_pattern.
 | 
			
		||||
    schema_pattern:
 | 
			
		||||
      allow:
 | 
			
		||||
        - "schema1"
 | 
			
		||||
      deny:
 | 
			
		||||
        - "garbage_schema"
 | 
			
		||||
      allow:
 | 
			
		||||
        - "schema1"
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### Microsoft SQL Server Metadata `mssql`
 | 
			
		||||
@ -239,11 +244,11 @@ source:
 | 
			
		||||
    host_port: localhost:1433
 | 
			
		||||
    database: DemoDatabase
 | 
			
		||||
    table_pattern:
 | 
			
		||||
      deny:
 | 
			
		||||
        - "^.*\\.sys_.*" # deny all tables that start with sys_
 | 
			
		||||
      allow:
 | 
			
		||||
        - "schema1.table1"
 | 
			
		||||
        - "schema1.table2"
 | 
			
		||||
      deny:
 | 
			
		||||
        - "^.*\\.sys_.*" # deny all tables that start with sys_
 | 
			
		||||
    options:
 | 
			
		||||
      # Any options specified here will be passed to SQLAlchemy's create_engine as kwargs.
 | 
			
		||||
      # See https://docs.sqlalchemy.org/en/14/core/engines.html for details.
 | 
			
		||||
 | 
			
		||||
@ -20,36 +20,13 @@ mypy_path = src
 | 
			
		||||
plugins =
 | 
			
		||||
    sqlmypy,
 | 
			
		||||
    pydantic.mypy
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
namespace_packages = true
 | 
			
		||||
strict_optional = yes
 | 
			
		||||
check_untyped_defs = yes
 | 
			
		||||
# eventually we'd like to enable these
 | 
			
		||||
disallow_untyped_defs = no
 | 
			
		||||
disallow_incomplete_defs = no
 | 
			
		||||
check_untyped_defs = yes
 | 
			
		||||
 | 
			
		||||
[mypy-confluent_kafka.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-avro.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-ldap.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-sqlalchemy_pytds.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-pyhive]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-pybigquery]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-psycopg2]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-geoalchemy2.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-snowflake.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-pydruid.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-pymongo.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
[mypy-docker.*]
 | 
			
		||||
ignore_missing_imports = yes
 | 
			
		||||
 | 
			
		||||
[isort]
 | 
			
		||||
profile = black
 | 
			
		||||
 | 
			
		||||
@ -38,6 +38,7 @@ framework_common = {
 | 
			
		||||
    "pyyaml>=5.4.1",
 | 
			
		||||
    "toml>=0.10.0",
 | 
			
		||||
    "docker>=4.4",
 | 
			
		||||
    "expandvars>=0.6.5",
 | 
			
		||||
    "avro-gen3==0.3.8",
 | 
			
		||||
    "avro-python3>=1.8.2",
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,34 @@
 | 
			
		||||
import io
 | 
			
		||||
import pathlib
 | 
			
		||||
 | 
			
		||||
from expandvars import expandvars
 | 
			
		||||
 | 
			
		||||
from datahub.configuration.common import ConfigurationError, ConfigurationMechanism
 | 
			
		||||
from datahub.configuration.toml import TomlConfigurationMechanism
 | 
			
		||||
from datahub.configuration.yaml import YamlConfigurationMechanism
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_config_file(config_file: pathlib.Path) -> dict:
    """Load a YAML or TOML config file, expanding environment variables.

    Environment variable references in the raw file text (e.g. ``${VAR}``,
    ``${VAR:-default}``, ``${VAR?error}``) are expanded before parsing,
    following docker-compose style substitution semantics via ``expandvars``.

    :param config_file: path to a ``.yaml``/``.yml`` or ``.toml`` file.
    :return: the parsed configuration as a dict.
    :raises ConfigurationError: if the file does not exist or has an
        unsupported extension.
    """
    if not config_file.is_file():
        raise ConfigurationError(f"Cannot open config file {config_file}")

    config_mech: ConfigurationMechanism
    if config_file.suffix in {".yaml", ".yml"}:
        config_mech = YamlConfigurationMechanism()
    elif config_file.suffix == ".toml":
        config_mech = TomlConfigurationMechanism()
    else:
        # Fix: the previous message omitted .yaml even though it is accepted.
        raise ConfigurationError(
            "Only .toml, .yaml, and .yml are supported. "
            f"Cannot process file type {config_file.suffix}"
        )

    # Read the whole file first so variable expansion sees the raw text
    # exactly as written, before any parsing happens.
    raw_config_file = config_file.read_text()

    # nounset=True makes references to unset variables (without a default)
    # raise instead of silently expanding to the empty string.
    expanded_config_file = expandvars(raw_config_file, nounset=True)

    config_fp = io.StringIO(expanded_config_file)
    return config_mech.load_config(config_fp)
 | 
			
		||||
@ -7,9 +7,7 @@ import click
 | 
			
		||||
from pydantic import ValidationError
 | 
			
		||||
 | 
			
		||||
from datahub.check.check_cli import check
 | 
			
		||||
from datahub.configuration.common import ConfigurationError, ConfigurationMechanism
 | 
			
		||||
from datahub.configuration.toml import TomlConfigurationMechanism
 | 
			
		||||
from datahub.configuration.yaml import YamlConfigurationMechanism
 | 
			
		||||
from datahub.configuration.config_loader import load_config_file
 | 
			
		||||
from datahub.ingestion.run.pipeline import Pipeline
 | 
			
		||||
from datahub.ingestion.sink.sink_registry import sink_registry
 | 
			
		||||
from datahub.ingestion.source.source_registry import source_registry
 | 
			
		||||
@ -54,23 +52,7 @@ def ingest(config: str) -> None:
 | 
			
		||||
    """Main command for ingesting metadata into DataHub"""
 | 
			
		||||
 | 
			
		||||
    config_file = pathlib.Path(config)
 | 
			
		||||
    if not config_file.is_file():
 | 
			
		||||
        raise ConfigurationError(f"Cannot open config file {config}")
 | 
			
		||||
 | 
			
		||||
    config_mech: ConfigurationMechanism
 | 
			
		||||
    if config_file.suffix in [".yaml", ".yml"]:
 | 
			
		||||
        config_mech = YamlConfigurationMechanism()
 | 
			
		||||
    elif config_file.suffix == ".toml":
 | 
			
		||||
        config_mech = TomlConfigurationMechanism()
 | 
			
		||||
    else:
 | 
			
		||||
        raise ConfigurationError(
 | 
			
		||||
            "Only .toml and .yml are supported. Cannot process file type {}".format(
 | 
			
		||||
                config_file.suffix
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    with config_file.open() as fp:
 | 
			
		||||
        pipeline_config = config_mech.load_config(fp)
 | 
			
		||||
    pipeline_config = load_config_file(config_file)
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        logger.info(f"Using config: {pipeline_config}")
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,2 @@
 | 
			
		||||
stuff:
 | 
			
		||||
  filename: ${THIS_IS_UNSET?with an error message}
 | 
			
		||||
							
								
								
									
										5
									
								
								metadata-ingestion/tests/unit/config/basic.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								metadata-ingestion/tests/unit/config/basic.toml
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,5 @@
 | 
			
		||||
foo = "bar"
 | 
			
		||||
 | 
			
		||||
[nested]
 | 
			
		||||
hi = "hello"
 | 
			
		||||
array = [ "one", "two" ]
 | 
			
		||||
							
								
								
									
										7
									
								
								metadata-ingestion/tests/unit/config/basic.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								metadata-ingestion/tests/unit/config/basic.yml
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,7 @@
 | 
			
		||||
---
 | 
			
		||||
foo: bar
 | 
			
		||||
nested:
 | 
			
		||||
  hi: hello
 | 
			
		||||
  array:
 | 
			
		||||
    - one
 | 
			
		||||
    - two
 | 
			
		||||
@ -0,0 +1,5 @@
 | 
			
		||||
---
 | 
			
		||||
normal: sa
 | 
			
		||||
working_dir: $VAR1
 | 
			
		||||
path: ${VAR2}
 | 
			
		||||
server: ${UNSET_VAR3:-http://localhost:8080}
 | 
			
		||||
							
								
								
									
										75
									
								
								metadata-ingestion/tests/unit/config/test_config_loader.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								metadata-ingestion/tests/unit/config/test_config_loader.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,75 @@
 | 
			
		||||
import os
 | 
			
		||||
from unittest import mock
 | 
			
		||||
 | 
			
		||||
import expandvars
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from datahub.configuration.common import ConfigurationError
 | 
			
		||||
from datahub.configuration.config_loader import load_config_file
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
    "filename,golden_config,env,error_type",
    [
        (
            # Basic YAML load
            "tests/unit/config/basic.yml",
            {"foo": "bar", "nested": {"array": ["one", "two"], "hi": "hello"}},
            {},
            None,
        ),
        (
            # Basic TOML load
            "tests/unit/config/basic.toml",
            {"foo": "bar", "nested": {"array": ["one", "two"], "hi": "hello"}},
            {},
            None,
        ),
        # Variable expansion load
        (
            "tests/unit/config/simple_variable_expansion.yml",
            {
                "normal": "sa",
                "path": "stuff2",
                "server": "http://localhost:8080",
                "working_dir": "stuff1",
            },
            {
                "VAR1": "stuff1",
                "VAR2": "stuff2",
            },
            None,
        ),
        (
            # Variable expansion error
            "tests/unit/config/bad_variable_expansion.yml",
            None,
            {},
            expandvars.ParameterNullOrNotSet,
        ),
        (
            # Missing file
            "tests/unit/config/this_file_does_not_exist.yml",
            None,
            {},
            ConfigurationError,
        ),
        (
            # Unknown extension
            "tests/unit/config/bad_extension.whatevenisthis",
            None,
            {},
            ConfigurationError,
        ),
    ],
)
def test_load(pytestconfig, filename, golden_config, env, error_type):
    """Exercise load_config_file across formats, env expansion, and failures."""
    config_path = pytestconfig.rootpath / filename

    # Patch the process environment so expansion sees exactly the
    # variables declared by this parametrized case.
    with mock.patch.dict(os.environ, env):
        if error_type is None:
            # Happy path: the parsed config must match the golden dict.
            assert load_config_file(config_path) == golden_config
        else:
            # Failure path: loading must raise the expected exception type.
            with pytest.raises(error_type):
                load_config_file(config_path)
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user