feat: enables dbt metadata files to be loaded from URIs (#3739)

This commit is contained in:
Sergio Gómez Villamor 2021-12-15 18:11:39 +01:00 committed by GitHub
parent adf9d2ead7
commit c59c63e90d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 2788 additions and 16 deletions

View File

@ -71,7 +71,7 @@ services:
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- ES_JAVA_OPTS=-Xms256m -Xmx256m
- ES_JAVA_OPTS=-Xms256m -Xmx256m -Dlog4j2.formatMsgNoLookups=true
healthcheck:
retries: 4
start_period: 2m

View File

@ -97,7 +97,7 @@ plugins: Dict[str, Set[str]] = {
"bigquery": sql_common | bigquery_common | {"pybigquery >= 0.6.0"},
"bigquery-usage": bigquery_common | {"cachetools"},
"datahub-business-glossary": set(),
"dbt": set(),
"dbt": {"requests"},
"druid": sql_common | {"pydruid>=0.6.2"},
"feast": {"docker"},
"glue": aws_common,

View File

@ -60,9 +60,9 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ------------------------- | -------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| `manifest_path` | ✅ | | Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json |
| `catalog_path` | ✅ | | Path to dbt catalog JSON. See https://docs.getdbt.com/reference/artifacts/catalog-json |
| `sources_path` | | | Path to dbt sources JSON. See https://docs.getdbt.com/reference/artifacts/sources-json. If not specified, last-modified fields will not be populated. |
| `manifest_path` | ✅ | | Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json Note this can be a local file or a URI. |
| `catalog_path` | ✅ | | Path to dbt catalog JSON. See https://docs.getdbt.com/reference/artifacts/catalog-json Note this can be a local file or a URI. |
| `sources_path` | | | Path to dbt sources JSON. See https://docs.getdbt.com/reference/artifacts/sources-json. If not specified, last-modified fields will not be populated. Note this can be a local file or a URI. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `target_platform` | ✅ | | The platform that dbt is loading onto. (e.g. bigquery / redshift / postgres etc.) |
| `use_identifiers` | | `False` | Use model [identifier](https://docs.getdbt.com/reference/resource-properties/identifier) instead of model name if defined (if not, default to model name). |

View File

@ -1,9 +1,11 @@
import json
import logging
import re
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Tuple
import dateutil.parser
import requests
from pydantic import validator
from datahub.configuration import ConfigModel
@ -264,6 +266,14 @@ def extract_dbt_entities(
return dbt_entities
def load_file_as_json(uri: str) -> Any:
if re.match("^https?://", uri):
return json.loads(requests.get(uri).text)
else:
with open(uri, "r") as f:
return json.load(f)
def loadManifestAndCatalog(
manifest_path: str,
catalog_path: str,
@ -282,16 +292,13 @@ def loadManifestAndCatalog(
Optional[str],
Dict[str, Dict[str, Any]],
]:
with open(manifest_path, "r") as manifest:
dbt_manifest_json = json.load(manifest)
dbt_manifest_json = load_file_as_json(manifest_path)
with open(catalog_path, "r") as catalog:
dbt_catalog_json = json.load(catalog)
dbt_catalog_json = load_file_as_json(catalog_path)
if sources_path is not None:
with open(sources_path, "r") as sources:
dbt_sources_json = json.load(sources)
sources_results = dbt_sources_json["results"]
dbt_sources_json = load_file_as_json(sources_path)
sources_results = dbt_sources_json["results"]
else:
sources_results = {}

View File

@ -2,6 +2,7 @@ from os import PathLike
from typing import Any, Dict, Optional, Union
import pytest
import requests_mock
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
@ -11,6 +12,7 @@ class DbtTestConfig:
def __init__(
self,
run_id: str,
dbt_metadata_uri_prefix: str,
test_resources_dir: Union[str, PathLike],
tmp_path: Union[str, PathLike],
output_file: Union[str, PathLike],
@ -27,9 +29,9 @@ class DbtTestConfig:
self.run_id = run_id
self.manifest_path = f"{test_resources_dir}/dbt_manifest.json"
self.catalog_path = f"{test_resources_dir}/dbt_catalog.json"
self.sources_path = f"{test_resources_dir}/dbt_sources.json"
self.manifest_path = f"{dbt_metadata_uri_prefix}/dbt_manifest.json"
self.catalog_path = f"{dbt_metadata_uri_prefix}/dbt_catalog.json"
self.sources_path = f"{dbt_metadata_uri_prefix}/dbt_sources.json"
self.target_platform = "postgres"
self.output_path = f"{tmp_path}/{output_file}"
@ -55,13 +57,30 @@ class DbtTestConfig:
@pytest.mark.integration
def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
@requests_mock.Mocker(kw="req_mock")
def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
with open(test_resources_dir / "dbt_manifest.json", "r") as f:
kwargs["req_mock"].get(
"http://some-external-repo/dbt_manifest.json", text=f.read()
)
with open(test_resources_dir / "dbt_catalog.json", "r") as f:
kwargs["req_mock"].get(
"http://some-external-repo/dbt_catalog.json", text=f.read()
)
with open(test_resources_dir / "dbt_sources.json", "r") as f:
kwargs["req_mock"].get(
"http://some-external-repo/dbt_sources.json", text=f.read()
)
config_variants = [
DbtTestConfig(
"dbt-test-with-schemas",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_with_schemas_mces.json",
"dbt_with_schemas_mces_golden.json",
@ -70,9 +89,22 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
"disable_dbt_node_creation": True,
},
),
DbtTestConfig(
"dbt-test-with-external-metadata-files",
"http://some-external-repo",
test_resources_dir,
tmp_path,
"dbt_with_external_metadata_files_mces.json",
"dbt_with_external_metadata_files_mces_golden.json",
source_config_modifiers={
"load_schemas": True,
"disable_dbt_node_creation": True,
},
),
DbtTestConfig(
"dbt-test-without-schemas",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_without_schemas_mces.json",
"dbt_without_schemas_mces_golden.json",
@ -84,6 +116,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
DbtTestConfig(
"dbt-test-without-schemas-with-filter",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_without_schemas_with_filter_mces.json",
"dbt_without_schemas_with_filter_mces_golden.json",
@ -98,6 +131,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
DbtTestConfig(
"dbt-test-with-schemas-dbt-enabled",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_enabled_with_schemas_mces.json",
"dbt_enabled_with_schemas_mces_golden.json",
@ -106,6 +140,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
DbtTestConfig(
"dbt-test-without-schemas-dbt-enabled",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_enabled_without_schemas_mces.json",
"dbt_enabled_without_schemas_mces_golden.json",
@ -114,6 +149,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
DbtTestConfig(
"dbt-test-without-schemas-with-filter-dbt-enabled",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_enabled_without_schemas_with_filter_mces.json",
"dbt_enabled_without_schemas_with_filter_mces_golden.json",