feat(ingest): test dbt ingestion with and without schemas (#2922)

This commit is contained in:
Kevin Hu 2021-07-21 12:56:31 -07:00 committed by GitHub
parent 7535cf2b85
commit d6875b4f4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 689 additions and 29 deletions

View File

@ -0,0 +1,600 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "model",
"materialization": "ephemeral",
"dbt_file_path": "models/transform/customer_details.sql"
},
"externalUrl": null,
"description": "model.sample_dbt.customer_details",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.city,PROD)",
"type": "TRANSFORMED"
}
]
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.monthly_billing_with_cust,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "model",
"materialization": "table",
"dbt_file_path": "models/billing/monthly_billing_with_cust.sql",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "model.sample_dbt.monthly_billing_with_cust",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.customer_details,PROD)",
"type": "TRANSFORMED"
}
]
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.payments_base,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "model",
"materialization": "view",
"dbt_file_path": "models/base/payments_base.sql",
"catalog_type": "VIEW"
},
"externalUrl": null,
"description": "model.sample_dbt.payments_base",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_01,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_03,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_04,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_05,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_06,PROD)",
"type": "TRANSFORMED"
}
]
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.payments_by_customer_by_month,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "model",
"materialization": "table",
"dbt_file_path": "models/transform/payments_by_customer_by_month.sql",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "model.sample_dbt.payments_by_customer_by_month",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.dbt_postgres.payments_base,PROD)",
"type": "TRANSFORMED"
}
]
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.actor,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"model_maturity": "in dev",
"owner": "@alice",
"some_other_property": "test 1",
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.actor",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.address",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.category,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.category",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.city,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.city",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.country,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"model_maturity": "in prod",
"owner": "@bob",
"some_other_property": "test 2",
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.country",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.customer",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_01,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.payment_p2020_01",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"an_array_property": "['alpha', 'beta', 'charlie']",
"model_maturity": "in prod",
"owner": "@charles",
"some_other_property": "test 3",
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.payment_p2020_02",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_03,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.payment_p2020_03",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_04,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.payment_p2020_04",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_05,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.payment_p2020_05",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_06,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"node_type": "source",
"dbt_file_path": "models/base.yml",
"catalog_type": "BASE TABLE"
},
"externalUrl": null,
"description": "source.sample_dbt.pagila.payment_p2020_06",
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": []
}
}
]
}
},
"proposedDelta": null
}
]

View File

@ -1,37 +1,97 @@
from os import PathLike
from typing import Any, Dict, Optional, Union
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
class DbtTestConfig:
    """Settings for a single dbt ingestion test run.

    Derives the fixture paths, the output/golden file paths, and the
    ``source``/``sink`` config dicts from a resources directory and a
    temporary directory. Optional modifier dicts are layered on top of the
    base configs, so a modifier key overrides a base key of the same name.
    """

    def __init__(
        self,
        run_id: str,
        test_resources_dir: Union[str, PathLike],
        tmp_path: Union[str, PathLike],
        output_file: Union[str, PathLike],
        golden_file: Union[str, PathLike],
        source_config_modifiers: Optional[Dict[str, Any]] = None,
        sink_config_modifiers: Optional[Dict[str, Any]] = None,
    ):
        # Fall back to fresh empty dicts so no mutable default is shared.
        source_extras = (
            {} if source_config_modifiers is None else source_config_modifiers
        )
        sink_extras = {} if sink_config_modifiers is None else sink_config_modifiers

        self.run_id = run_id
        self.target_platform = "dbt"

        # dbt artifacts live next to the golden files in the resources dir.
        self.manifest_path = f"{test_resources_dir}/dbt_manifest.json"
        self.catalog_path = f"{test_resources_dir}/dbt_catalog.json"
        self.sources_path = f"{test_resources_dir}/dbt_sources.json"

        # Output goes under tmp_path; the golden file under the resources dir.
        self.output_path = f"{tmp_path}/{output_file}"
        self.golden_path = f"{test_resources_dir}/{golden_file}"

        base_source_config = {
            "manifest_path": self.manifest_path,
            "catalog_path": self.catalog_path,
            "sources_path": self.sources_path,
            "target_platform": self.target_platform,
        }
        self.source_config = dict(base_source_config, **source_extras)

        base_sink_config = {
            "filename": self.output_path,
        }
        self.sink_config = dict(base_sink_config, **sink_extras)
def test_dbt_ingest(pytestconfig, tmp_path, mock_time):
    """Run dbt ingestion with and without schemas and compare to golden files.

    Builds one ``DbtTestConfig`` per variant (``load_schemas`` on and off),
    runs a dbt-source -> file-sink pipeline for each, and checks the file
    written under ``tmp_path`` against that variant's golden file.

    Args:
        pytestconfig: pytest config fixture; ``rootpath`` locates the test
            resources directory and it is forwarded to the golden-file check.
        tmp_path: pytest temporary directory that receives the output files.
        mock_time: fixture requested but not referenced in the body;
            presumably it pins timestamps so output is reproducible --
            TODO confirm against the fixture definition.
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"

    # Each variant writes to its own output file and has its own golden file,
    # so the two runs cannot clobber each other.
    config_variants = [
        DbtTestConfig(
            "dbt-test-with-schemas",
            test_resources_dir,
            tmp_path,
            "dbt_with_schemas_mces.json",
            "dbt_with_schemas_mces_golden.json",
            source_config_modifiers={"load_schemas": True},
        ),
        DbtTestConfig(
            "dbt-test-without-schemas",
            test_resources_dir,
            tmp_path,
            "dbt_without_schemas_mces.json",
            "dbt_without_schemas_mces_golden.json",
            source_config_modifiers={"load_schemas": False},
        ),
    ]

    for config in config_variants:
        # test manifest, catalog, sources are generated from https://github.com/kevinhu/sample-dbt
        pipeline = Pipeline.create(
            {
                "run_id": config.run_id,
                "source": {"type": "dbt", "config": config.source_config},
                "sink": {
                    "type": "file",
                    "config": config.sink_config,
                },
            }
        )
        pipeline.run()
        # Surface any ingestion failure before comparing output.
        pipeline.raise_from_status()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=config.output_path,
            golden_path=config.golden_path,
        )