mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-05 16:22:17 +00:00
64 lines
2.1 KiB
Python
64 lines
2.1 KiB
Python
from typing import Iterable
|
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.ingestion.api.common import PipelineContext
|
|
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
|
from datahub.ingestion.source.redshift.config import RedshiftConfig
|
|
from datahub.ingestion.source.redshift.redshift import RedshiftSource
|
|
from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable
|
|
from datahub.metadata.schema_classes import (
|
|
MetadataChangeEventClass,
|
|
MetadataChangeProposalClass,
|
|
)
|
|
|
|
|
|
def redshift_source_setup(custom_props_flag: bool) -> Iterable[MetadataWorkUnit]:
|
|
config = RedshiftConfig(
|
|
host_port="localhost:5439",
|
|
database="test",
|
|
patch_custom_properties=custom_props_flag,
|
|
)
|
|
source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test"))
|
|
gen = source.gen_dataset_workunits(
|
|
table=RedshiftTable(
|
|
name="category",
|
|
columns=[],
|
|
created=None,
|
|
comment="",
|
|
),
|
|
database="dev",
|
|
schema="public",
|
|
sub_type="test_sub_type",
|
|
custom_properties={"my_key": "my_value"},
|
|
)
|
|
return gen
|
|
|
|
|
|
def test_gen_dataset_workunits_patch_custom_properties_patch():
|
|
gen = redshift_source_setup(True)
|
|
custom_props_exist = False
|
|
for item in gen:
|
|
mcp = item.metadata
|
|
assert not isinstance(mcp, MetadataChangeEventClass)
|
|
if mcp.aspectName == "datasetProperties":
|
|
assert isinstance(mcp, MetadataChangeProposalClass)
|
|
assert mcp.changeType == "PATCH"
|
|
custom_props_exist = True
|
|
else:
|
|
assert isinstance(mcp, MetadataChangeProposalWrapper)
|
|
|
|
assert custom_props_exist
|
|
|
|
|
|
def test_gen_dataset_workunits_patch_custom_properties_upsert():
|
|
gen = redshift_source_setup(False)
|
|
custom_props_exist = False
|
|
for item in gen:
|
|
assert isinstance(item.metadata, MetadataChangeProposalWrapper)
|
|
mcp = item.metadata
|
|
if mcp.aspectName == "datasetProperties":
|
|
assert mcp.changeType == "UPSERT"
|
|
custom_props_exist = True
|
|
|
|
assert custom_props_exist
|