From 7aacfe032c28a063d7d3a1331dbc06cc95842cd7 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 2 Dec 2024 17:17:21 +0100 Subject: [PATCH] MINOR - FQN encoding in ometa_api, TestSuite pipeline creation & serialization of test case results (#18877) * DOCS - Update ES config * MINOR - Add missing FQN encoding & force types * MINOR - Add missing FQN encoding & force types * format * fix tests --- .../validations/base_test_handler.py | 8 +++--- .../param_setter_factory.py | 4 +-- .../ometa/mixins/data_insight_mixin.py | 5 ++-- .../ometa/mixins/ingestion_pipeline_mixin.py | 8 +++--- .../ometa/mixins/suggestions_mixin.py | 6 ++--- .../ingestion/ometa/mixins/table_mixin.py | 2 +- ingestion/src/metadata/utils/class_helper.py | 7 +++++ .../src/metadata/workflow/data_quality.py | 6 ++++- .../automations/test_connection_automation.py | 7 ++++- .../tests/integration/datalake/conftest.py | 26 ++++++++++++++----- .../datalake/resources/{ => users}/users.csv | 0 .../integration/datalake/test_data_quality.py | 21 ++++++++++++++- .../integration/datalake/test_ingestion.py | 4 +-- .../connectors/search/configure-ingestion.md | 2 ++ .../yaml/search/source-config-def.md | 4 +++ .../connectors/yaml/search/source-config.md | 6 +++-- 16 files changed, 88 insertions(+), 28 deletions(-) rename ingestion/tests/integration/datalake/resources/{ => users}/users.csv (100%) diff --git a/ingestion/src/metadata/data_quality/validations/base_test_handler.py b/ingestion/src/metadata/data_quality/validations/base_test_handler.py index ddd49e1dc9c..feed14dccd0 100644 --- a/ingestion/src/metadata/data_quality/validations/base_test_handler.py +++ b/ingestion/src/metadata/data_quality/validations/base_test_handler.py @@ -118,10 +118,10 @@ class BaseTestValidator(ABC): failed_rows = ( failed_rows if failed_rows is not None else (row_count - passed_rows) ) - test_case_result.passedRows = passed_rows - test_case_result.failedRows = failed_rows - test_case_result.passedRowsPercentage = (passed_rows 
/ row_count) * 100 - test_case_result.failedRowsPercentage = (failed_rows / row_count) * 100 # type: ignore + test_case_result.passedRows = int(passed_rows) + test_case_result.failedRows = int(failed_rows) + test_case_result.passedRowsPercentage = float(passed_rows / row_count) * 100 + test_case_result.failedRowsPercentage = float(failed_rows / row_count) * 100 # type: ignore return test_case_result diff --git a/ingestion/src/metadata/data_quality/validations/runtime_param_setter/param_setter_factory.py b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/param_setter_factory.py index be1a0abc291..5bd6b3180bb 100644 --- a/ingestion/src/metadata/data_quality/validations/runtime_param_setter/param_setter_factory.py +++ b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/param_setter_factory.py @@ -27,7 +27,7 @@ from metadata.data_quality.validations.table.sqlalchemy.tableDiff import ( ) from metadata.generated.schema.entity.data.table import Table from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.sampler.sqlalchemy.sampler import SQASampler +from metadata.sampler.sampler_interface import SamplerInterface def removesuffix(s: str, suffix: str) -> str: @@ -68,7 +68,7 @@ class RuntimeParameterSetterFactory: ometa: OpenMetadata, service_connection_config, table_entity: Table, - sampler: SQASampler, + sampler: SamplerInterface, ) -> Set[RuntimeParameterSetter]: """Get the runtime parameter setter""" return { diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py index 54018f1f0f9..cab9bdf8c66 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py @@ -31,6 +31,7 @@ from metadata.generated.schema.dataInsight.dataInsightChartResult import ( ) from metadata.generated.schema.dataInsight.kpi.basic import KpiResult from 
metadata.generated.schema.dataInsight.kpi.kpi import Kpi +from metadata.ingestion.ometa.utils import quote class DataInsightMixin: @@ -58,7 +59,7 @@ class DataInsightMixin: record (ReportData): report data """ - resp = self.client.put(f"/kpi/{fqn}/kpiResult", record.model_dump_json()) + resp = self.client.put(f"/kpi/{quote(fqn)}/kpiResult", record.model_dump_json()) return resp @@ -143,7 +144,7 @@ class DataInsightMixin: params = {"startTs": start_ts, "endTs": end_ts} resp = self.client.get( - f"/kpi/{fqn}/kpiResult", + f"/kpi/{quote(fqn)}/kpiResult", params, ) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py index 8dda0757a79..8b4e663c27b 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py @@ -22,6 +22,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel ) from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.utils import quote from metadata.utils.logger import ometa_logger logger = ometa_logger() @@ -46,7 +47,7 @@ class OMetaIngestionPipelineMixin: :param pipeline_status: Pipeline Status data to add """ resp = self.client.put( - f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus", + f"{self.get_suffix(IngestionPipeline)}/{quote(ingestion_pipeline_fqn)}/pipelineStatus", data=pipeline_status.model_dump_json(), ) logger.debug( @@ -64,7 +65,8 @@ class OMetaIngestionPipelineMixin: :param pipeline_status_run_id: Pipeline Status run id """ resp = self.client.get( - f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}" + f"{self.get_suffix(IngestionPipeline)}/" + 
f"{quote(ingestion_pipeline_fqn)}/pipelineStatus/{pipeline_status_run_id}" ) if resp: return PipelineStatus(**resp) @@ -99,7 +101,7 @@ class OMetaIngestionPipelineMixin: params = {"startTs": start_ts, "endTs": end_ts} resp = self.client.get( - f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus", + f"{self.get_suffix(IngestionPipeline)}/{quote(ingestion_pipeline_fqn)}/pipelineStatus", data=params, ) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/suggestions_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/suggestions_mixin.py index 08434124940..63b7f5934ec 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/suggestions_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/suggestions_mixin.py @@ -19,7 +19,7 @@ from metadata.generated.schema.entity.feed.suggestion import Suggestion, Suggest from metadata.generated.schema.type import basic from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import model_str +from metadata.ingestion.ometa.utils import model_str, quote from metadata.utils.logger import ometa_logger logger = ometa_logger() @@ -65,7 +65,7 @@ class OMetaSuggestionsMixin: self.client.put( f"{self.get_suffix(Suggestion)}/accept-all?" f"userId={model_str(user_id)}&" - f"entityFQN={model_str(fqn)}&" + f"entityFQN={quote(fqn)}&" f"suggestionType={suggestion_type.value}", ) @@ -79,6 +79,6 @@ class OMetaSuggestionsMixin: self.client.put( f"{self.get_suffix(Suggestion)}/reject-all?" 
f"userId={model_str(user_id)}&" - f"entityFQN={model_str(fqn)}&" + f"entityFQN={quote(fqn)}&" f"suggestionType={suggestion_type.value}", ) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py index caf3d00ff52..3886b4fcbb1 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py @@ -257,7 +257,7 @@ class OMetaTableMixin: profile_type_url = profile_type.__name__[0].lower() + profile_type.__name__[1:] resp = self.client.get( - f"{self.get_suffix(Table)}/{fqn}/{profile_type_url}?limit={limit}{url_after}", + f"{self.get_suffix(Table)}/{quote(fqn)}/{profile_type_url}?limit={limit}{url_after}", data={"startTs": start_ts, "endTs": end_ts}, ) diff --git a/ingestion/src/metadata/utils/class_helper.py b/ingestion/src/metadata/utils/class_helper.py index ce2da245403..3fd0354f5c2 100644 --- a/ingestion/src/metadata/utils/class_helper.py +++ b/ingestion/src/metadata/utils/class_helper.py @@ -66,6 +66,9 @@ from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) from metadata.generated.schema.metadataIngestion.workflow import SourceConfig +from metadata.generated.schema.tests.testSuite import ( + ServiceType as TestSuiteServiceType, +) SERVICE_TYPE_REF = { ServiceType.Api.value: "apiService", @@ -77,6 +80,8 @@ SERVICE_TYPE_REF = { ServiceType.Metadata.value: "metadataService", ServiceType.Search.value: "searchService", ServiceType.Storage.value: "storageService", + # We use test suites as "services" for DQ Ingestion Pipelines + TestSuiteServiceType.TestSuite.value: "testSuite", } SOURCE_CONFIG_TYPE_INGESTION = { @@ -123,6 +128,8 @@ def get_pipeline_type_from_source_config(source_config: SourceConfig) -> Pipelin def _get_service_type_from( # pylint: disable=inconsistent-return-statements service_subtype: str, ) -> ServiceType: + if service_subtype.lower() == "testsuite": + 
return TestSuiteServiceType.TestSuite for service_type in ServiceType: if service_subtype.lower() in [ subtype.value.lower() diff --git a/ingestion/src/metadata/workflow/data_quality.py b/ingestion/src/metadata/workflow/data_quality.py index 9587a727657..b2b5b878b3b 100644 --- a/ingestion/src/metadata/workflow/data_quality.py +++ b/ingestion/src/metadata/workflow/data_quality.py @@ -22,6 +22,7 @@ from metadata.generated.schema.entity.services.connections.serviceConnection imp from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.tests.testSuite import ServiceType, TestSuite from metadata.ingestion.api.steps import Processor, Sink +from metadata.ingestion.ometa.utils import model_str from metadata.utils import fqn from metadata.utils.importer import import_sink_class from metadata.utils.logger import test_suite_logger @@ -40,6 +41,7 @@ class TestSuiteWorkflow(IngestionWorkflow): """ __test__ = False + service_type = ServiceType.TestSuite def set_steps(self): self.source = TestSuiteSource.create(self.config.model_dump(), self.metadata) @@ -113,6 +115,8 @@ class TestSuiteWorkflow(IngestionWorkflow): fqn=fqn.build( metadata=None, entity_type=TestSuite, - table_fqn=self.config.source.sourceConfig.config.entityFullyQualifiedName, + table_fqn=model_str( + self.config.source.sourceConfig.config.entityFullyQualifiedName + ), ), ) diff --git a/ingestion/tests/integration/automations/test_connection_automation.py b/ingestion/tests/integration/automations/test_connection_automation.py index 4026cf35c8d..06fbc0d0e83 100644 --- a/ingestion/tests/integration/automations/test_connection_automation.py +++ b/ingestion/tests/integration/automations/test_connection_automation.py @@ -82,7 +82,12 @@ def test_connection_workflow(metadata, mysql_container): assert final_workflow.status == WorkflowStatus.Successful assert len(final_workflow.response.steps) == 5 - assert final_workflow.response.status.value == 
StatusType.Successful.value + # Get queries is not passing since we're not enabling the logs in the container + assert final_workflow.response.status.value == StatusType.Failed.value + steps = [ + step for step in final_workflow.response.steps if step.name != "GetQueries" + ] + assert all(step.passed for step in steps) metadata.delete( entity=Workflow, diff --git a/ingestion/tests/integration/datalake/conftest.py b/ingestion/tests/integration/datalake/conftest.py index ce4835be5cc..febbc3fbb1f 100644 --- a/ingestion/tests/integration/datalake/conftest.py +++ b/ingestion/tests/integration/datalake/conftest.py @@ -66,7 +66,7 @@ INGESTION_CONFIG = { DATA_QUALITY_CONFIG = { "source": { - "type": "datalake", + "type": "testsuite", "serviceName": "datalake_for_integration_tests", "serviceConnection": { "config": { @@ -84,7 +84,7 @@ DATA_QUALITY_CONFIG = { "sourceConfig": { "config": { "type": "TestSuite", - "entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"', + "entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv"', } }, }, @@ -119,6 +119,13 @@ DATA_QUALITY_CONFIG = { }, ], }, + # Helps us ensure that the passedRows and failedRows are proper ints, even when coming from Pandas + { + "name": "first_name_is_unique", + "testDefinitionName": "columnValuesToBeUnique", + "columnName": "first_name", + "computePassedFailedRowCount": True, + }, ] }, }, @@ -133,9 +140,16 @@ DATA_QUALITY_CONFIG = { }, }, }, + # Helps us validate we are properly encoding the names of Ingestion Pipelines when sending status updates + "ingestionPipelineFQN": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid', } +@pytest.fixture(scope="session") +def ingestion_fqn(): + return f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid' + + @pytest.fixture(scope="session") def minio_container(): with get_minio_container(MinioContainerConfigs()) as 
container: @@ -207,7 +221,7 @@ def run_test_suite_workflow(run_ingestion, ingestion_config): @pytest.fixture(scope="class") def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config): metadata.create_or_update_table_profiler_config( - fqn='datalake_for_integration_tests.default.my-bucket."users.csv"', + fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"', table_profiler_config=TableProfilerConfig( profileSampleType=ProfileSampleType.PERCENTAGE, profileSample=50.0, @@ -223,7 +237,7 @@ def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config): ingestion_workflow.raise_from_status() ingestion_workflow.stop() metadata.create_or_update_table_profiler_config( - fqn='datalake_for_integration_tests.default.my-bucket."users.csv"', + fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"', table_profiler_config=TableProfilerConfig( profileSampleType=ProfileSampleType.PERCENTAGE, profileSample=100.0, @@ -234,7 +248,7 @@ def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config): @pytest.fixture(scope="class") def run_partitioned_test_suite_workflow(metadata, run_ingestion, ingestion_config): metadata.create_or_update_table_profiler_config( - fqn='datalake_for_integration_tests.default.my-bucket."users.csv"', + fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"', table_profiler_config=TableProfilerConfig( partitioning=PartitionProfilerConfig( enablePartitioning=True, @@ -253,7 +267,7 @@ def run_partitioned_test_suite_workflow(metadata, run_ingestion, ingestion_confi ingestion_workflow.raise_from_status() ingestion_workflow.stop() metadata.create_or_update_table_profiler_config( - fqn='datalake_for_integration_tests.default.my-bucket."users.csv"', + fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"', table_profiler_config=TableProfilerConfig(partitioning=None), ) diff --git a/ingestion/tests/integration/datalake/resources/users.csv 
b/ingestion/tests/integration/datalake/resources/users/users.csv similarity index 100% rename from ingestion/tests/integration/datalake/resources/users.csv rename to ingestion/tests/integration/datalake/resources/users/users.csv diff --git a/ingestion/tests/integration/datalake/test_data_quality.py b/ingestion/tests/integration/datalake/test_data_quality.py index 77a6e99d9bc..3f9b91cb937 100644 --- a/ingestion/tests/integration/datalake/test_data_quality.py +++ b/ingestion/tests/integration/datalake/test_data_quality.py @@ -15,6 +15,10 @@ from typing import List import pytest +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + IngestionPipeline, + PipelineState, +) from metadata.generated.schema.tests.basic import TestCaseStatus from metadata.generated.schema.tests.testCase import TestCase @@ -28,7 +32,12 @@ class TestDataQuality: ], ) def test_data_quality( - self, run_test_suite_workflow, metadata, test_case_name, expected_status + self, + run_test_suite_workflow, + metadata, + test_case_name, + expected_status, + ingestion_fqn, ): test_cases: List[TestCase] = metadata.list_entities( TestCase, fields=["*"], skip_on_failure=True @@ -39,6 +48,16 @@ class TestDataQuality: assert test_case is not None assert test_case.testCaseResult.testCaseStatus == expected_status + # Check the ingestion pipeline is properly created + ingestion_pipeline: IngestionPipeline = metadata.get_by_name( + entity=IngestionPipeline, fqn=ingestion_fqn, fields=["pipelineStatuses"] + ) + assert ingestion_pipeline + assert ingestion_pipeline.pipelineStatuses + assert ( + ingestion_pipeline.pipelineStatuses.pipelineState == PipelineState.success + ) + @pytest.mark.parametrize( "test_case_name,failed_rows", [ diff --git a/ingestion/tests/integration/datalake/test_ingestion.py b/ingestion/tests/integration/datalake/test_ingestion.py index e8b2dd38247..fbea3583b9d 100644 --- a/ingestion/tests/integration/datalake/test_ingestion.py +++ 
b/ingestion/tests/integration/datalake/test_ingestion.py @@ -44,7 +44,7 @@ class TestDatalake: "names.json", "names.jsonl", "new_users.parquet", - "users.csv", + "users/users.csv", "profiler_test_.csv", } == set(names) @@ -58,7 +58,7 @@ class TestDatalake: """Also excluding the test for parquet files until the above is fixed""" csv_ = self.metadata.get_by_name( entity=Table, - fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"', + fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv"', fields=["tableProfilerConfig"], ) # parquet_ = self.metadata.get_by_name( diff --git a/openmetadata-docs/content/partials/v1.6/connectors/search/configure-ingestion.md b/openmetadata-docs/content/partials/v1.6/connectors/search/configure-ingestion.md index 5fb2ee72058..d9187138b34 100644 --- a/openmetadata-docs/content/partials/v1.6/connectors/search/configure-ingestion.md +++ b/openmetadata-docs/content/partials/v1.6/connectors/search/configure-ingestion.md @@ -28,6 +28,8 @@ caption="Configure Metadata Ingestion Page" /%} - **Exclude**: Explicitly exclude search index by adding a list of comma-separated regular expressions to the Exclude field. OpenMetadata will exclude all search indexes with names matching one or more of the supplied regular expressions. All other schemas will be included. - **Include Sample Data (toggle)**: Set the Ingest Sample Data toggle to control whether to ingest sample data as part of metadata ingestion. - **Sample Size**: If include sample data is enabled, 10 records will be ingested by default. Using this field you can customize the size of sample data. +- **Include Index Templates (toggle)**: Set the Include Index Templates toggle to control whether to include index templates as part of metadata ingestion. +- **Override Metadata**: Set the Override Metadata toggle to control whether to override the metadata if it already exists. 
- **Enable Debug Log (toggle)**: Set the Enable Debug Log toggle to set the default log level to debug. diff --git a/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config-def.md b/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config-def.md index b6c571a77aa..56b61b49a40 100644 --- a/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config-def.md +++ b/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config-def.md @@ -12,4 +12,8 @@ The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetada **searchIndexFilterPattern**: Note that the `searchIndexFilterPattern` support regex to include or exclude search indexes during metadata ingestion process. +**includeIndexTemplate**: Set the Include Index Templates toggle to control whether to include index templates as part of metadata ingestion. + +**overrideMetadata**: Set the Override Metadata toggle to control whether to override the metadata if it already exists. + {% /codeInfo %} \ No newline at end of file diff --git a/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config.md b/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config.md index f848a0fc654..e0a31ae2c0a 100644 --- a/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config.md +++ b/openmetadata-docs/content/partials/v1.6/connectors/yaml/search/source-config.md @@ -2,8 +2,8 @@ sourceConfig: config: type: SearchMetadata - # markDeletedSearchIndexes: True - # includeSampleData: True + # markDeletedSearchIndexes: true + # includeSampleData: false # sampleSize: 10 # searchIndexFilterPattern: # includes: @@ -12,4 +12,6 @@ # excludes: # - index4 # - index3 + # includeIndexTemplate: false + # overrideMetadata: false ``` \ No newline at end of file