MINOR - FQN encoding in ometa_api, TestSuite pipeline creation & serialization of test case results (#18877)

* DOCS - Update ES config

* MINOR - Add missing FQN encoding & force types

* format

* fix tests

Authored by Pere Miquel Brull on 2024-12-02 17:17:21 +01:00; committed by GitHub
parent e715a7c26a
commit 7aacfe032c
16 changed files with 88 additions and 28 deletions

View File

@@ -118,10 +118,10 @@ class BaseTestValidator(ABC):
         failed_rows = (
             failed_rows if failed_rows is not None else (row_count - passed_rows)
         )
-        test_case_result.passedRows = passed_rows
-        test_case_result.failedRows = failed_rows
-        test_case_result.passedRowsPercentage = (passed_rows / row_count) * 100
-        test_case_result.failedRowsPercentage = (failed_rows / row_count) * 100  # type: ignore
+        test_case_result.passedRows = int(passed_rows)
+        test_case_result.failedRows = int(failed_rows)
+        test_case_result.passedRowsPercentage = float(passed_rows / row_count) * 100
+        test_case_result.failedRowsPercentage = float(failed_rows / row_count) * 100  # type: ignore
         return test_case_result
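
The `int()`/`float()` casts are what "serialization of test case results" in the title refers to: Pandas-backed validators can hand back numpy scalars, which the standard JSON encoder rejects. A minimal sketch of the failure mode, assuming numpy scalar inputs (illustrative; not the exact types every validator returns):

```python
import json

import numpy as np

# A Pandas aggregation typically yields numpy scalars rather than builtins
passed_rows = np.int64(42)

try:
    json.dumps({"passedRows": passed_rows})
except TypeError as exc:
    print(exc)  # Object of type int64 is not JSON serializable

# Casting to builtins keeps the test case result payload serializable
print(json.dumps({"passedRows": int(passed_rows)}))  # {"passedRows": 42}
```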

View File

@@ -27,7 +27,7 @@ from metadata.data_quality.validations.table.sqlalchemy.tableDiff import (
 )
 from metadata.generated.schema.entity.data.table import Table
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.sampler.sqlalchemy.sampler import SQASampler
+from metadata.sampler.sampler_interface import SamplerInterface


 def removesuffix(s: str, suffix: str) -> str:

@@ -68,7 +68,7 @@ class RuntimeParameterSetterFactory:
         ometa: OpenMetadata,
         service_connection_config,
         table_entity: Table,
-        sampler: SQASampler,
+        sampler: SamplerInterface,
     ) -> Set[RuntimeParameterSetter]:
         """Get the runtime parameter setter"""
         return {
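
A note on the annotation change: typing the factory against `SamplerInterface` rather than the concrete `SQASampler` lets any sampler backend satisfy the signature. A hypothetical sketch of the principle with stand-in classes (not code from this commit):

```python
from abc import ABC, abstractmethod


class SamplerInterface(ABC):
    """Stand-in for metadata.sampler.sampler_interface.SamplerInterface."""

    @abstractmethod
    def get_dataset(self) -> str: ...


class SQASampler(SamplerInterface):
    def get_dataset(self) -> str:
        return "rows fetched via SQLAlchemy"


class PandasSampler(SamplerInterface):
    def get_dataset(self) -> str:
        return "rows fetched via Pandas"


def run(sampler: SamplerInterface) -> str:
    # Accepts either backend; the old SQASampler annotation would reject
    # a Pandas-based sampler during type checking
    return sampler.get_dataset()
```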

View File

@@ -31,6 +31,7 @@ from metadata.generated.schema.dataInsight.dataInsightChartResult import (
 )
 from metadata.generated.schema.dataInsight.kpi.basic import KpiResult
 from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
+from metadata.ingestion.ometa.utils import quote


 class DataInsightMixin:

@@ -58,7 +59,7 @@ class DataInsightMixin:
             record (ReportData): report data
         """
-        resp = self.client.put(f"/kpi/{fqn}/kpiResult", record.model_dump_json())
+        resp = self.client.put(f"/kpi/{quote(fqn)}/kpiResult", record.model_dump_json())

         return resp

@@ -143,7 +144,7 @@ class DataInsightMixin:
         params = {"startTs": start_ts, "endTs": end_ts}
         resp = self.client.get(
-            f"/kpi/{fqn}/kpiResult",
+            f"/kpi/{quote(fqn)}/kpiResult",
             params,
         )
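
`quote` is the FQN-safe encoder from `metadata.ingestion.ometa.utils`. The sketch below assumes it behaves like `urllib.parse.quote` with no characters marked safe (the real helper may differ in detail), which is what these endpoints need for FQNs containing quotes or slashes:

```python
from urllib.parse import quote as url_quote


def quote(fqn: str) -> str:
    # Assumed behavior: percent-encode every reserved character so the FQN
    # survives as a single URL path segment instead of being split on "/"
    return url_quote(fqn, safe="")


fqn = 'datalake_for_integration_tests.default.my-bucket."users/users.csv"'
print(quote(fqn))
# datalake_for_integration_tests.default.my-bucket.%22users%2Fusers.csv%22
```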

View File

@@ -22,6 +22,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
 )
 from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully
 from metadata.ingestion.ometa.client import REST
+from metadata.ingestion.ometa.utils import quote
 from metadata.utils.logger import ometa_logger

 logger = ometa_logger()

@@ -46,7 +47,7 @@ class OMetaIngestionPipelineMixin:
         :param pipeline_status: Pipeline Status data to add
         """
         resp = self.client.put(
-            f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus",
+            f"{self.get_suffix(IngestionPipeline)}/{quote(ingestion_pipeline_fqn)}/pipelineStatus",
             data=pipeline_status.model_dump_json(),
         )
         logger.debug(

@@ -64,7 +65,8 @@ class OMetaIngestionPipelineMixin:
         :param pipeline_status_run_id: Pipeline Status run id
         """
         resp = self.client.get(
-            f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}"
+            f"{self.get_suffix(IngestionPipeline)}/"
+            f"{quote(ingestion_pipeline_fqn)}/pipelineStatus/{pipeline_status_run_id}"
         )

         if resp:
             return PipelineStatus(**resp)

@@ -99,7 +101,7 @@ class OMetaIngestionPipelineMixin:
         params = {"startTs": start_ts, "endTs": end_ts}
         resp = self.client.get(
-            f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus",
+            f"{self.get_suffix(IngestionPipeline)}/{quote(ingestion_pipeline_fqn)}/pipelineStatus",
             data=params,
         )

View File

@@ -19,7 +19,7 @@ from metadata.generated.schema.entity.feed.suggestion import Suggestion, SuggestionType
 from metadata.generated.schema.type import basic
 from metadata.generated.schema.type.basic import FullyQualifiedEntityName
 from metadata.ingestion.ometa.client import REST
-from metadata.ingestion.ometa.utils import model_str
+from metadata.ingestion.ometa.utils import model_str, quote
 from metadata.utils.logger import ometa_logger

 logger = ometa_logger()

@@ -65,7 +65,7 @@ class OMetaSuggestionsMixin:
         self.client.put(
             f"{self.get_suffix(Suggestion)}/accept-all?"
             f"userId={model_str(user_id)}&"
-            f"entityFQN={model_str(fqn)}&"
+            f"entityFQN={quote(fqn)}&"
             f"suggestionType={suggestion_type.value}",
         )

@@ -79,6 +79,6 @@ class OMetaSuggestionsMixin:
         self.client.put(
             f"{self.get_suffix(Suggestion)}/reject-all?"
             f"userId={model_str(user_id)}&"
-            f"entityFQN={model_str(fqn)}&"
+            f"entityFQN={quote(fqn)}&"
             f"suggestionType={suggestion_type.value}",
         )

View File

@@ -257,7 +257,7 @@ class OMetaTableMixin:
         profile_type_url = profile_type.__name__[0].lower() + profile_type.__name__[1:]
         resp = self.client.get(
-            f"{self.get_suffix(Table)}/{fqn}/{profile_type_url}?limit={limit}{url_after}",
+            f"{self.get_suffix(Table)}/{quote(fqn)}/{profile_type_url}?limit={limit}{url_after}",
             data={"startTs": start_ts, "endTs": end_ts},
         )

View File

@@ -66,6 +66,9 @@ from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
     TestSuitePipeline,
 )
 from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
+from metadata.generated.schema.tests.testSuite import (
+    ServiceType as TestSuiteServiceType,
+)

 SERVICE_TYPE_REF = {
     ServiceType.Api.value: "apiService",

@@ -77,6 +80,8 @@ SERVICE_TYPE_REF = {
     ServiceType.Metadata.value: "metadataService",
     ServiceType.Search.value: "searchService",
     ServiceType.Storage.value: "storageService",
+    # We use test suites as "services" for DQ Ingestion Pipelines
+    TestSuiteServiceType.TestSuite.value: "testSuite",
 }

 SOURCE_CONFIG_TYPE_INGESTION = {

@@ -123,6 +128,8 @@ def get_pipeline_type_from_source_config(source_config: SourceConfig) -> Pipelin
 def _get_service_type_from(  # pylint: disable=inconsistent-return-statements
     service_subtype: str,
 ) -> ServiceType:
+    if service_subtype.lower() == "testsuite":
+        return TestSuiteServiceType.TestSuite
     for service_type in ServiceType:
         if service_subtype.lower() in [
             subtype.value.lower()
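
With the new branch, the `testsuite` subtype short-circuits before the `ServiceType` scan, so data-quality pipelines resolve to the test-suite "service". A usage sketch, assuming `_get_service_type_from` and `SERVICE_TYPE_REF` come from the module patched above (its path is not shown in this diff):

```python
from metadata.generated.schema.tests.testSuite import (
    ServiceType as TestSuiteServiceType,
)

# The subtype comparison is lower-cased, so any casing resolves correctly
assert _get_service_type_from("TestSuite") is TestSuiteServiceType.TestSuite

# DQ Ingestion Pipelines can now build a service reference of kind "testSuite"
assert SERVICE_TYPE_REF[TestSuiteServiceType.TestSuite.value] == "testSuite"
```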

View File

@@ -22,6 +22,7 @@ from metadata.generated.schema.entity.services.connections.serviceConnection imp
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.tests.testSuite import ServiceType, TestSuite
 from metadata.ingestion.api.steps import Processor, Sink
+from metadata.ingestion.ometa.utils import model_str
 from metadata.utils import fqn
 from metadata.utils.importer import import_sink_class
 from metadata.utils.logger import test_suite_logger

@@ -40,6 +41,7 @@ class TestSuiteWorkflow(IngestionWorkflow):
     """

     __test__ = False
+    service_type = ServiceType.TestSuite

     def set_steps(self):
         self.source = TestSuiteSource.create(self.config.model_dump(), self.metadata)

@@ -113,6 +115,8 @@ class TestSuiteWorkflow(IngestionWorkflow):
                 fqn=fqn.build(
                     metadata=None,
                     entity_type=TestSuite,
-                    table_fqn=self.config.source.sourceConfig.config.entityFullyQualifiedName,
+                    table_fqn=model_str(
+                        self.config.source.sourceConfig.config.entityFullyQualifiedName
+                    ),
                 ),
             )
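
`entityFullyQualifiedName` is a generated pydantic type rather than a plain `str`, so `fqn.build` needs the unwrapped value, which is what `model_str` provides. A minimal sketch of the distinction, assuming `FullyQualifiedEntityName` is generated as a pydantic RootModel over `str`:

```python
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.ometa.utils import model_str

field = FullyQualifiedEntityName('service.default.bucket."users/users.csv"')

# The field wraps the string; it is not itself a str
assert not isinstance(field, str)

# model_str hands fqn.build the raw value it expects
assert model_str(field) == 'service.default.bucket."users/users.csv"'
```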

View File

@@ -82,7 +82,12 @@ def test_connection_workflow(metadata, mysql_container):
     assert final_workflow.status == WorkflowStatus.Successful
     assert len(final_workflow.response.steps) == 5
-    assert final_workflow.response.status.value == StatusType.Successful.value
+    # Get queries is not passing since we're not enabling the logs in the container
+    assert final_workflow.response.status.value == StatusType.Failed.value
+    steps = [
+        step for step in final_workflow.response.steps if step.name != "GetQueries"
+    ]
+    assert all(step.passed for step in steps)

     metadata.delete(
         entity=Workflow,

View File

@@ -66,7 +66,7 @@ INGESTION_CONFIG = {
 DATA_QUALITY_CONFIG = {
     "source": {
-        "type": "datalake",
+        "type": "testsuite",
         "serviceName": "datalake_for_integration_tests",
         "serviceConnection": {
             "config": {

@@ -84,7 +84,7 @@ DATA_QUALITY_CONFIG = {
         "sourceConfig": {
             "config": {
                 "type": "TestSuite",
-                "entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
+                "entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv"',
             }
         },
     },

@@ -119,6 +119,13 @@ DATA_QUALITY_CONFIG = {
                     },
                 ],
             },
+            # Helps us ensure that the passedRows and failedRows are proper ints, even when coming from Pandas
+            {
+                "name": "first_name_is_unique",
+                "testDefinitionName": "columnValuesToBeUnique",
+                "columnName": "first_name",
+                "computePassedFailedRowCount": True,
+            },
         ]
     },
 },

@@ -133,9 +140,16 @@ DATA_QUALITY_CONFIG = {
             },
         },
     },
+    # Helps us validate we are properly encoding the names of Ingestion Pipelines when sending status updates
+    "ingestionPipelineFQN": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid',
 }


 @pytest.fixture(scope="session")
+def ingestion_fqn():
+    return f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid'
+
+
+@pytest.fixture(scope="session")
 def minio_container():
     with get_minio_container(MinioContainerConfigs()) as container:

@@ -207,7 +221,7 @@ def run_test_suite_workflow(run_ingestion, ingestion_config):
 @pytest.fixture(scope="class")
 def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config):
     metadata.create_or_update_table_profiler_config(
-        fqn='datalake_for_integration_tests.default.my-bucket."users.csv"',
+        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
         table_profiler_config=TableProfilerConfig(
             profileSampleType=ProfileSampleType.PERCENTAGE,
             profileSample=50.0,

@@ -223,7 +237,7 @@ def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config):
     ingestion_workflow.raise_from_status()
     ingestion_workflow.stop()
     metadata.create_or_update_table_profiler_config(
-        fqn='datalake_for_integration_tests.default.my-bucket."users.csv"',
+        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
         table_profiler_config=TableProfilerConfig(
             profileSampleType=ProfileSampleType.PERCENTAGE,
             profileSample=100.0,

@@ -234,7 +248,7 @@ def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config):
 @pytest.fixture(scope="class")
 def run_partitioned_test_suite_workflow(metadata, run_ingestion, ingestion_config):
     metadata.create_or_update_table_profiler_config(
-        fqn='datalake_for_integration_tests.default.my-bucket."users.csv"',
+        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
         table_profiler_config=TableProfilerConfig(
             partitioning=PartitionProfilerConfig(
                 enablePartitioning=True,

@@ -253,7 +267,7 @@ def run_partitioned_test_suite_workflow(metadata, run_ingestion, ingestion_config):
     ingestion_workflow.raise_from_status()
     ingestion_workflow.stop()
     metadata.create_or_update_table_profiler_config(
-        fqn='datalake_for_integration_tests.default.my-bucket."users.csv"',
+        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
         table_profiler_config=TableProfilerConfig(partitioning=None),
     )


View File

@@ -15,6 +15,10 @@ from typing import List

 import pytest

+from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
+    IngestionPipeline,
+    PipelineState,
+)
 from metadata.generated.schema.tests.basic import TestCaseStatus
 from metadata.generated.schema.tests.testCase import TestCase

@@ -28,7 +32,12 @@ class TestDataQuality:
         ],
     )
     def test_data_quality(
-        self, run_test_suite_workflow, metadata, test_case_name, expected_status
+        self,
+        run_test_suite_workflow,
+        metadata,
+        test_case_name,
+        expected_status,
+        ingestion_fqn,
     ):
         test_cases: List[TestCase] = metadata.list_entities(
             TestCase, fields=["*"], skip_on_failure=True

@@ -39,6 +48,16 @@ class TestDataQuality:
         assert test_case is not None
         assert test_case.testCaseResult.testCaseStatus == expected_status

+        # Check the ingestion pipeline is properly created
+        ingestion_pipeline: IngestionPipeline = metadata.get_by_name(
+            entity=IngestionPipeline, fqn=ingestion_fqn, fields=["pipelineStatuses"]
+        )
+        assert ingestion_pipeline
+        assert ingestion_pipeline.pipelineStatuses
+        assert (
+            ingestion_pipeline.pipelineStatuses.pipelineState == PipelineState.success
+        )
+
     @pytest.mark.parametrize(
         "test_case_name,failed_rows",
         [

View File

@@ -44,7 +44,7 @@ class TestDatalake:
             "names.json",
             "names.jsonl",
             "new_users.parquet",
-            "users.csv",
+            "users/users.csv",
             "profiler_test_.csv",
         } == set(names)

@@ -58,7 +58,7 @@ class TestDatalake:
         """Also excluding the test for parquet files until the above is fixed"""
         csv_ = self.metadata.get_by_name(
             entity=Table,
-            fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
+            fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv"',
             fields=["tableProfilerConfig"],
         )
         # parquet_ = self.metadata.get_by_name(

View File

@@ -28,6 +28,8 @@ caption="Configure Metadata Ingestion Page" /%}
 - **Exclude**: Explicitly exclude search indexes by adding a list of comma-separated regular expressions to the Exclude field. OpenMetadata will exclude all search indexes with names matching one or more of the supplied regular expressions. All other search indexes will be included.
 - **Include Sample Data (toggle)**: Set the Ingest Sample Data toggle to control whether to ingest sample data as part of metadata ingestion.
 - **Sample Size**: If include sample data is enabled, 10 records will be ingested by default. Using this field you can customize the size of sample data.
+- **Include Index Templates (toggle)**: Set the Include Index Templates toggle to control whether to include index templates as part of metadata ingestion.
+- **Override Metadata**: Set the Override Metadata toggle to control whether to override the metadata if it already exists.
 - **Enable Debug Log (toggle)**: Set the Enable Debug Log toggle to set the default log level to debug.

View File

@@ -12,4 +12,8 @@ The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetada

 **searchIndexFilterPattern**: Note that the `searchIndexFilterPattern` supports regex to include or exclude search indexes during the metadata ingestion process.

+**includeIndexTemplate**: Set the Include Index Templates toggle to control whether to include index templates as part of metadata ingestion.
+
+**overrideMetadata**: Set the Override Metadata toggle to control whether to override the metadata if it already exists.
+
 {% /codeInfo %}

View File

@@ -2,8 +2,8 @@
 sourceConfig:
   config:
     type: SearchMetadata
-    # markDeletedSearchIndexes: True
-    # includeSampleData: True
+    # markDeletedSearchIndexes: true
+    # includeSampleData: false
     # sampleSize: 10
     # searchIndexFilterPattern:
     #   includes:

@@ -12,4 +12,6 @@
     #   excludes:
     #     - index4
     #     - index3
+    # includeIndexTemplates: false
+    # overrideMetadata: false
 ```