79 lines
2.2 KiB
Python

import sys
import pytest
from metadata.generated.schema.entity.data.table import Table
from metadata.workflow.metadata import MetadataWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
@pytest.fixture(scope="module")
def prepare_mongodb(mongodbContainer):
db = mongodbContainer.get_connection_client().test
db.create_collection(
"test_table",
validator={
"$jsonSchema": {
"bsonType": "object",
"required": ["id", "name"],
"properties": {
"id": {
"bsonType": "int",
},
"name": {
"bsonType": "string",
"maxLength": 100,
},
"age": {
"bsonType": "int",
},
},
}
},
)
data = [
{"id": 1, "name": "John Doe", "age": 25},
{"id": 2, "name": "Jane Smith", "age": 30},
]
db.test_table.insert_many(data)
@pytest.mark.parametrize(
"table_fqn,expected_columns",
[
[
"{service}.test.test.test_table",
{
"id": {"type": "int", "nullable": False},
"name": {"type": "string", "nullable": False},
"age": {"type": "int", "nullable": True},
},
]
],
ids=lambda x: x.split(".")[-1] if isinstance(x, str) else "",
)
def test_ingest_metadata(
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
metadata,
table_fqn,
expected_columns,
db_service,
prepare_mongodb,
):
run_workflow(MetadataWorkflow, ingestion_config)
table = metadata.get_by_name(
entity=Table, fqn=table_fqn.format(service=db_service.fullyQualifiedName.root)
)
assert table
assert table.fullyQualifiedName.root.split(".")[-1] == "test_table"
assert len(table.columns) == 4
for name, properties in expected_columns.items():
column = next((col for col in table.columns if col.name.root == name), None)
assert column is not None
assert column.dataType.name.lower() == properties["type"]