import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Type, cast
from unittest.mock import patch

import pydantic
import pytest
from botocore.stub import Stubber
from freezegun import freeze_time

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.aws.glue import GlueSource, GlueSourceConfig
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    ArrayTypeClass,
    MapTypeClass,
    RecordTypeClass,
    StringTypeClass,
)
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
    get_current_checkpoint_from_pipeline,
    run_and_get_pipeline,
    validate_all_providers_have_committed_successfully,
)
from tests.test_helpers.type_helpers import PytestConfig
from tests.unit.test_glue_source_stubs import (
    databases_1,
    databases_2,
    get_bucket_tagging,
    get_databases_delta_response,
    get_databases_response,
    get_databases_response_with_resource_link,
    get_dataflow_graph_response_1,
    get_dataflow_graph_response_2,
    get_delta_tables_response_1,
    get_delta_tables_response_2,
    get_jobs_response,
    get_jobs_response_empty,
    get_object_body_1,
    get_object_body_2,
    get_object_response_1,
    get_object_response_2,
    get_object_tagging,
    get_tables_response_1,
    get_tables_response_2,
    get_tables_response_for_target_database,
    resource_link_database,
    tables_1,
    tables_2,
    target_database_tables,
)

FROZEN_TIME = "2020-04-14 07:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
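

# Helper to build a GlueSource wired to a throwaway PipelineContext. The tag
# flag defaults suit the main ingestion test; the delta tests override them.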
def glue_source(
    platform_instance: Optional[str] = None,
    use_s3_bucket_tags: bool = True,
    use_s3_object_tags: bool = True,
    extract_delta_schema_from_parameters: bool = False,
) -> GlueSource:
    return GlueSource(
        ctx=PipelineContext(run_id="glue-source-test"),
        config=GlueSourceConfig(
            aws_region="us-west-2",
            extract_transforms=True,
            platform_instance=platform_instance,
            use_s3_bucket_tags=use_s3_bucket_tags,
            use_s3_object_tags=use_s3_object_tags,
            extract_delta_schema_from_parameters=extract_delta_schema_from_parameters,
        ),
    )
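

# Maps each Hive column type string to the DataHub type class it should
# resolve to; the dict keys double as the pytest test ids.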
column_type_test_cases: Dict[str, Tuple[str, Type]] = {
    "char": ("char", StringTypeClass),
    "array": ("array<int>", ArrayTypeClass),
    "map": ("map<string, int>", MapTypeClass),
    "struct": ("struct<a:int, b:string>", RecordTypeClass),
}
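

# Round-trip each Hive type through the Hive -> Avro -> MCE field conversion
# and check the resulting type class.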
@pytest.mark.parametrize(
    "hive_column_type, expected_type",
    column_type_test_cases.values(),
    ids=column_type_test_cases.keys(),
)
def test_column_type(hive_column_type: str, expected_type: Type) -> None:
    avro_schema = get_avro_schema_for_hive_column(
        f"test_column_{hive_column_type}", hive_column_type
    )
    schema_fields = avro_schema_to_mce_fields(json.dumps(avro_schema))
    actual_schema_field_type = schema_fields[0].type
    assert isinstance(actual_schema_field_type.type, expected_type)
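

# Full ingestion run against stubbed Glue and S3 clients, exercised with and
# without a platform instance and compared against golden MCE files.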
@pytest.mark.parametrize(
    "platform_instance, mce_file, mce_golden_file",
    [
        (None, "glue_mces.json", "glue_mces_golden.json"),
        (
            "some_instance_name",
            "glue_mces_platform_instance.json",
            "glue_mces_platform_instance_golden.json",
        ),
    ],
)
@freeze_time(FROZEN_TIME)
def test_glue_ingest(
    tmp_path: Path,
    pytestconfig: PytestConfig,
    platform_instance: str,
    mce_file: str,
    mce_golden_file: str,
) -> None:
    glue_source_instance = glue_source(platform_instance=platform_instance)
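
    # A botocore Stubber replays responses strictly in the order they are
    # queued, so the order below must match the source's call sequence.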
    with Stubber(glue_source_instance.glue_client) as glue_stubber:
        glue_stubber.add_response("get_databases", get_databases_response, {})
        glue_stubber.add_response(
            "get_tables",
            get_tables_response_1,
            {"DatabaseName": "flights-database"},
        )
        glue_stubber.add_response(
            "get_tables",
            get_tables_response_2,
            {"DatabaseName": "test-database"},
        )
        glue_stubber.add_response(
            "get_tables",
            {"TableList": []},
            {"DatabaseName": "empty-database"},
        )
        glue_stubber.add_response("get_jobs", get_jobs_response, {})
        glue_stubber.add_response(
            "get_dataflow_graph",
            get_dataflow_graph_response_1,
            {"PythonScript": get_object_body_1},
        )
        glue_stubber.add_response(
            "get_dataflow_graph",
            get_dataflow_graph_response_2,
            {"PythonScript": get_object_body_2},
        )
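
        # Both tag flags default to True here, so the source requests one
        # bucket-tagging and one object-tagging response per table.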
        with Stubber(glue_source_instance.s3_client) as s3_stubber:
            for _ in range(
                len(get_tables_response_1["TableList"])
                + len(get_tables_response_2["TableList"])
            ):
                s3_stubber.add_response(
                    "get_bucket_tagging",
                    get_bucket_tagging(),
                )
                s3_stubber.add_response(
                    "get_object_tagging",
                    get_object_tagging(),
                )
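
            # The two job scripts are fetched from S3; their bodies match the
            # PythonScript params of the get_dataflow_graph stubs above.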
            s3_stubber.add_response(
                "get_object",
                get_object_response_1(),
                {
                    "Bucket": "aws-glue-assets-123412341234-us-west-2",
                    "Key": "scripts/job-1.py",
                },
            )
            s3_stubber.add_response(
                "get_object",
                get_object_response_2(),
                {
                    "Bucket": "aws-glue-assets-123412341234-us-west-2",
                    "Key": "scripts/job-2.py",
                },
            )
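
            # Run the source; this should consume every stubbed response.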
            mce_objects = [wu.metadata for wu in glue_source_instance.get_workunits()]

            glue_stubber.assert_no_pending_responses()
            s3_stubber.assert_no_pending_responses()

            write_metadata_file(tmp_path / mce_file, mce_objects)

    # Verify the output.
    test_resources_dir = pytestconfig.rootpath / "tests/unit/glue"
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / mce_file,
        golden_path=test_resources_dir / mce_golden_file,
    )
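

# The source's reported platform can be overridden in config.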
def test_platform_config():
    source = GlueSource(
        ctx=PipelineContext(run_id="glue-source-test"),
        config=GlueSourceConfig(aws_region="us-west-2", platform="athena"),
    )
    assert source.platform == "athena"
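

# Resource links are references to databases shared from another account or
# region; when ignored, neither the link nor its tables should be returned.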
@pytest.mark.parametrize(
    "ignore_resource_links, all_databases_and_tables_result",
    [
        (True, ({}, [])),
        (False, ({"test-database": resource_link_database}, target_database_tables)),
    ],
)
def test_ignore_resource_links(ignore_resource_links, all_databases_and_tables_result):
    source = GlueSource(
        ctx=PipelineContext(run_id="glue-source-test"),
        config=GlueSourceConfig(
            aws_region="eu-west-1",
            ignore_resource_links=ignore_resource_links,
        ),
    )

    with Stubber(source.glue_client) as glue_stubber:
        glue_stubber.add_response(
            "get_databases",
            get_databases_response_with_resource_link,
            {},
        )
        glue_stubber.add_response(
            "get_tables",
            get_tables_response_for_target_database,
            {"DatabaseName": "test-database"},
        )

        assert source.get_all_databases_and_tables() == all_databases_and_tables_result
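

# An unrecognized platform value must be rejected at config-validation time.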
def test_platform_must_be_valid():
    with pytest.raises(pydantic.ValidationError):
        GlueSource(
            ctx=PipelineContext(run_id="glue-source-test"),
            config=GlueSourceConfig(aws_region="us-west-2", platform="data-warehouse"),
        )
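

# Without an explicit platform, the source falls back to "glue".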
def test_config_without_platform():
    source = GlueSource(
        ctx=PipelineContext(run_id="glue-source-test"),
        config=GlueSourceConfig(aws_region="us-west-2"),
    )
    assert source.platform == "glue"
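

# Stateful ingestion: run the pipeline twice over different table listings and
# verify that entities missing from the second run are reported as removed.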
@freeze_time(FROZEN_TIME)
def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    test_resources_dir = pytestconfig.rootpath / "tests/unit/glue"

    deleted_actor_golden_mcs = "{}/glue_deleted_actor_mces_golden.json".format(
        test_resources_dir
    )

    stateful_config = {
        "stateful_ingestion": {
            "enabled": True,
            "remove_stale_metadata": True,
            "fail_safe_threshold": 100.0,
            "state_provider": {
                "type": "datahub",
                "config": {"datahub_api": {"server": GMS_SERVER}},
            },
        },
    }

    source_config_dict: Dict[str, Any] = {
        "extract_transforms": False,
        "aws_region": "eu-east-1",
        **stateful_config,
    }

    pipeline_config_dict: Dict[str, Any] = {
        "source": {
            "type": "glue",
            "config": source_config_dict,
        },
        "sink": {
            # we are not really interested in the resulting events for this test
            "type": "console"
        },
        "pipeline_name": "statefulpipeline",
    }
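
    # Patch the checkpointing graph client and the Glue listing so the two
    # runs observe different sets of databases and tables.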
    with patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint:
        mock_checkpoint.return_value = mock_datahub_graph
        with patch(
            "datahub.ingestion.source.aws.glue.GlueSource.get_all_databases_and_tables",
        ) as mock_get_all_databases_and_tables:
            tables_on_first_call = tables_1
            tables_on_second_call = tables_2
            mock_get_all_databases_and_tables.side_effect = [
                (databases_1, tables_on_first_call),
                (databases_2, tables_on_second_call),
            ]
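
            # First run ingests the initial listing and records a checkpoint.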
            pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
            checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)

            assert checkpoint1
            assert checkpoint1.state

            # Capture MCEs of second run to validate Status(removed=true)
            deleted_mces_path = "{}/{}".format(tmp_path, "glue_deleted_mces.json")
            pipeline_config_dict["sink"]["type"] = "file"
            pipeline_config_dict["sink"]["config"] = {"filename": deleted_mces_path}

            # Do the second run of the pipeline.
            pipeline_run2 = run_and_get_pipeline(pipeline_config_dict)
            checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)

            assert checkpoint2
            assert checkpoint2.state

    # Validate that all providers have committed successfully.
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run1, expected_providers=1
    )
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run2, expected_providers=1
    )

    # Validate against golden MCEs where Status(removed=true)
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=deleted_mces_path,
        golden_path=deleted_actor_golden_mcs,
    )

    # Perform all assertions on the states. The deleted table should not be
    # part of the second state
    state1 = cast(BaseSQLAlchemyCheckpointState, checkpoint1.state)
    state2 = cast(BaseSQLAlchemyCheckpointState, checkpoint2.state)
    difference_urns = set(
        state1.get_urns_not_in(type="*", other_checkpoint_state=state2)
    )
    assert difference_urns == {
        "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
        "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
    }
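

# Delta tables can carry their schema as JSON in the table parameters; with
# extract_delta_schema_from_parameters enabled, it should be parsed into the
# dataset schema.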
def test_glue_with_delta_schema_ingest(
    tmp_path: Path,
    pytestconfig: PytestConfig,
) -> None:
    glue_source_instance = glue_source(
        platform_instance="delta_platform_instance",
        use_s3_bucket_tags=False,
        use_s3_object_tags=False,
        extract_delta_schema_from_parameters=True,
    )

    with Stubber(glue_source_instance.glue_client) as glue_stubber:
        glue_stubber.add_response("get_databases", get_databases_delta_response, {})
        glue_stubber.add_response(
            "get_tables",
            get_delta_tables_response_1,
            {"DatabaseName": "delta-database"},
        )
        glue_stubber.add_response("get_jobs", get_jobs_response_empty, {})

        mce_objects = [wu.metadata for wu in glue_source_instance.get_workunits()]

        glue_stubber.assert_no_pending_responses()

        assert glue_source_instance.get_report().num_dataset_valid_delta_schema == 1

    write_metadata_file(tmp_path / "glue_delta_mces.json", mce_objects)

    # Verify the output.
    test_resources_dir = pytestconfig.rootpath / "tests/unit/glue"
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "glue_delta_mces.json",
        golden_path=test_resources_dir / "glue_delta_mces_golden.json",
    )
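

# A table whose embedded delta schema cannot be parsed should be counted as
# invalid without failing the ingestion run.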
def test_glue_with_malformed_delta_schema_ingest(
    tmp_path: Path,
    pytestconfig: PytestConfig,
) -> None:
    glue_source_instance = glue_source(
        platform_instance="delta_platform_instance",
        use_s3_bucket_tags=False,
        use_s3_object_tags=False,
        extract_delta_schema_from_parameters=True,
    )

    with Stubber(glue_source_instance.glue_client) as glue_stubber:
        glue_stubber.add_response("get_databases", get_databases_delta_response, {})
        glue_stubber.add_response(
            "get_tables",
            get_delta_tables_response_2,
            {"DatabaseName": "delta-database"},
        )
        glue_stubber.add_response("get_jobs", get_jobs_response_empty, {})

        mce_objects = [wu.metadata for wu in glue_source_instance.get_workunits()]

        glue_stubber.assert_no_pending_responses()

        assert glue_source_instance.get_report().num_dataset_invalid_delta_schema == 1

    write_metadata_file(tmp_path / "glue_malformed_delta_mces.json", mce_objects)

    # Verify the output.
    test_resources_dir = pytestconfig.rootpath / "tests/unit/glue"
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "glue_malformed_delta_mces.json",
        golden_path=test_resources_dir / "glue_malformed_delta_mces_golden.json",
    )