| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  Copyright 2025 Collate | 
					
						
							|  |  |  | #  Licensed under the Collate Community License, Version 1.0 (the "License"); | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Test Datalake Profiler workflow | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | To run this we need OpenMetadata server up and running. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | No sample data is required beforehand | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | import pytest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from metadata.generated.schema.entity.data.table import ColumnProfile, Table | 
					
						
							|  |  |  | from metadata.utils.time_utils import ( | 
					
						
							|  |  |  |     get_beginning_of_day_timestamp_mill, | 
					
						
							|  |  |  |     get_end_of_day_timestamp_mill, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2024-11-19 08:10:45 +01:00
										 |  |  | from metadata.workflow.classification import AutoClassificationWorkflow | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | from metadata.workflow.profiler import ProfilerWorkflow | 
					
						
							|  |  |  | from metadata.workflow.workflow_output_handler import WorkflowResultStatus | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-03-27 13:11:56 +05:30
										 |  |  | from .conftest import BUCKET_NAME | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | @pytest.fixture(scope="class", autouse=True) | 
					
						
							|  |  |  | def before_each(run_ingestion): | 
					
						
							|  |  |  |     pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestDatalakeProfilerTestE2E: | 
					
						
							|  |  |  |     """datalake profiler E2E test""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_datalake_profiler_workflow(self, ingestion_config, metadata): | 
					
						
							|  |  |  |         ingestion_config["source"]["sourceConfig"]["config"].update( | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "type": "Profiler", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         ingestion_config["processor"] = { | 
					
						
							|  |  |  |             "type": "orm-profiler", | 
					
						
							|  |  |  |             "config": {}, | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         profiler_workflow = ProfilerWorkflow.create(ingestion_config) | 
					
						
							|  |  |  |         profiler_workflow.execute() | 
					
						
							|  |  |  |         status = profiler_workflow.result_status() | 
					
						
							|  |  |  |         profiler_workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert status == WorkflowResultStatus.SUCCESS | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         table_profile = metadata.get_profile_data( | 
					
						
							|  |  |  |             f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |             get_beginning_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             get_end_of_day_timestamp_mill(), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         column_profile = metadata.get_profile_data( | 
					
						
							|  |  |  |             f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".first_name', | 
					
						
							|  |  |  |             get_beginning_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             get_end_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             profile_type=ColumnProfile, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert table_profile.entities | 
					
						
							|  |  |  |         assert column_profile.entities | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_values_partitioned_datalake_profiler_workflow( | 
					
						
							|  |  |  |         self, metadata, ingestion_config | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         """Test partitioned datalake profiler workflow""" | 
					
						
							|  |  |  |         ingestion_config["source"]["sourceConfig"]["config"].update( | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "type": "Profiler", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         ingestion_config["processor"] = { | 
					
						
							|  |  |  |             "type": "orm-profiler", | 
					
						
							|  |  |  |             "config": { | 
					
						
							|  |  |  |                 "tableConfig": [ | 
					
						
							|  |  |  |                     { | 
					
						
							|  |  |  |                         "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |                         "partitionConfig": { | 
					
						
							|  |  |  |                             "enablePartitioning": "true", | 
					
						
							|  |  |  |                             "partitionColumnName": "first_name", | 
					
						
							|  |  |  |                             "partitionIntervalType": "COLUMN-VALUE", | 
					
						
							|  |  |  |                             "partitionValues": ["John"], | 
					
						
							|  |  |  |                         }, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 ] | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         profiler_workflow = ProfilerWorkflow.create(ingestion_config) | 
					
						
							|  |  |  |         profiler_workflow.execute() | 
					
						
							|  |  |  |         status = profiler_workflow.result_status() | 
					
						
							|  |  |  |         profiler_workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert status == WorkflowResultStatus.SUCCESS | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         table = metadata.get_by_name( | 
					
						
							|  |  |  |             entity=Table, | 
					
						
							|  |  |  |             fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |             fields=["tableProfilerConfig"], | 
					
						
							|  |  |  |             nullable=False, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  |         profile = metadata.get_latest_table_profile(table.fullyQualifiedName) | 
					
						
							|  |  |  |         table_profile = profile.profile | 
					
						
							|  |  |  |         column_profile = profile.columns[0].profile | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  |         assert table_profile.rowCount == 1.0 | 
					
						
							|  |  |  |         assert column_profile.valuesCount == 1.0 | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_datetime_partitioned_datalake_profiler_workflow( | 
					
						
							|  |  |  |         self, ingestion_config, metadata | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         """Test partitioned datalake profiler workflow""" | 
					
						
							|  |  |  |         ingestion_config["source"]["sourceConfig"]["config"].update( | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "type": "Profiler", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         ingestion_config["processor"] = { | 
					
						
							|  |  |  |             "type": "orm-profiler", | 
					
						
							|  |  |  |             "config": { | 
					
						
							|  |  |  |                 "tableConfig": [ | 
					
						
							|  |  |  |                     { | 
					
						
							|  |  |  |                         "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |                         "partitionConfig": { | 
					
						
							|  |  |  |                             "enablePartitioning": "true", | 
					
						
							|  |  |  |                             "partitionColumnName": "birthdate", | 
					
						
							|  |  |  |                             "partitionIntervalType": "TIME-UNIT", | 
					
						
							|  |  |  |                             "partitionIntervalUnit": "YEAR", | 
					
						
							|  |  |  |                             "partitionInterval": 35, | 
					
						
							|  |  |  |                         }, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         profiler_workflow = ProfilerWorkflow.create(ingestion_config) | 
					
						
							|  |  |  |         profiler_workflow.execute() | 
					
						
							|  |  |  |         status = profiler_workflow.result_status() | 
					
						
							|  |  |  |         profiler_workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert status == WorkflowResultStatus.SUCCESS | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         table = metadata.get_by_name( | 
					
						
							|  |  |  |             entity=Table, | 
					
						
							|  |  |  |             fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |             fields=["tableProfilerConfig"], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  |         profile = metadata.get_latest_table_profile(table.fullyQualifiedName) | 
					
						
							|  |  |  |         table_profile = profile.profile | 
					
						
							|  |  |  |         column_profile = profile.columns[0].profile | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  |         assert table_profile.rowCount == 2.0 | 
					
						
							|  |  |  |         assert column_profile.valuesCount == 2.0 | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_integer_range_partitioned_datalake_profiler_workflow( | 
					
						
							|  |  |  |         self, ingestion_config, metadata | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         """Test partitioned datalake profiler workflow""" | 
					
						
							|  |  |  |         ingestion_config["source"]["sourceConfig"]["config"].update( | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "type": "Profiler", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         ingestion_config["processor"] = { | 
					
						
							|  |  |  |             "type": "orm-profiler", | 
					
						
							|  |  |  |             "config": { | 
					
						
							|  |  |  |                 "tableConfig": [ | 
					
						
							|  |  |  |                     { | 
					
						
							|  |  |  |                         "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |                         "profileSample": 100, | 
					
						
							|  |  |  |                         "partitionConfig": { | 
					
						
							|  |  |  |                             "enablePartitioning": "true", | 
					
						
							|  |  |  |                             "partitionColumnName": "age", | 
					
						
							|  |  |  |                             "partitionIntervalType": "INTEGER-RANGE", | 
					
						
							|  |  |  |                             "partitionIntegerRangeStart": 35, | 
					
						
							|  |  |  |                             "partitionIntegerRangeEnd": 44, | 
					
						
							|  |  |  |                         }, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         profiler_workflow = ProfilerWorkflow.create(ingestion_config) | 
					
						
							|  |  |  |         profiler_workflow.execute() | 
					
						
							|  |  |  |         status = profiler_workflow.result_status() | 
					
						
							|  |  |  |         profiler_workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert status == WorkflowResultStatus.SUCCESS | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         table = metadata.get_by_name( | 
					
						
							|  |  |  |             entity=Table, | 
					
						
							|  |  |  |             fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |             fields=["tableProfilerConfig"], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  |         profile = metadata.get_latest_table_profile(table.fullyQualifiedName) | 
					
						
							|  |  |  |         table_profile = profile.profile | 
					
						
							|  |  |  |         column_profile = profile.columns[0].profile | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  |         assert table_profile.rowCount == 2.0 | 
					
						
							|  |  |  |         assert column_profile.valuesCount == 2.0 | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_datalake_profiler_workflow_with_custom_profiler_config( | 
					
						
							|  |  |  |         self, metadata, ingestion_config | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         """Test custom profiler config return expected sample and metric computation""" | 
					
						
							|  |  |  |         profiler_metrics = [ | 
					
						
							|  |  |  |             "MIN", | 
					
						
							|  |  |  |             "MAX", | 
					
						
							|  |  |  |             "MEAN", | 
					
						
							|  |  |  |             "MEDIAN", | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         id_metrics = ["MIN", "MAX"] | 
					
						
							|  |  |  |         non_metric_values = ["name", "timestamp"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_config["source"]["sourceConfig"]["config"].update( | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "type": "Profiler", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         ingestion_config["processor"] = { | 
					
						
							|  |  |  |             "type": "orm-profiler", | 
					
						
							|  |  |  |             "config": { | 
					
						
							|  |  |  |                 "profiler": { | 
					
						
							|  |  |  |                     "name": "ingestion_profiler", | 
					
						
							|  |  |  |                     "metrics": profiler_metrics, | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |                 "tableConfig": [ | 
					
						
							|  |  |  |                     { | 
					
						
							|  |  |  |                         "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |                         "columnConfig": { | 
					
						
							|  |  |  |                             "includeColumns": [ | 
					
						
							|  |  |  |                                 {"columnName": "id", "metrics": id_metrics}, | 
					
						
							|  |  |  |                                 {"columnName": "age"}, | 
					
						
							|  |  |  |                             ] | 
					
						
							|  |  |  |                         }, | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         profiler_workflow = ProfilerWorkflow.create(ingestion_config) | 
					
						
							|  |  |  |         profiler_workflow.execute() | 
					
						
							|  |  |  |         status = profiler_workflow.result_status() | 
					
						
							|  |  |  |         profiler_workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert status == WorkflowResultStatus.SUCCESS | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         table = metadata.get_by_name( | 
					
						
							|  |  |  |             entity=Table, | 
					
						
							|  |  |  |             fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', | 
					
						
							|  |  |  |             fields=["tableProfilerConfig"], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         id_profile = metadata.get_profile_data( | 
					
						
							|  |  |  |             f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".id', | 
					
						
							|  |  |  |             get_beginning_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             get_end_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             profile_type=ColumnProfile, | 
					
						
							|  |  |  |         ).entities | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         latest_id_profile = max(id_profile, key=lambda o: o.timestamp.root) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         id_metric_ln = 0 | 
					
						
							|  |  |  |         for metric_name, metric in latest_id_profile: | 
					
						
							|  |  |  |             if metric_name.upper() in id_metrics: | 
					
						
							|  |  |  |                 assert metric is not None | 
					
						
							|  |  |  |                 id_metric_ln += 1 | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 assert metric is None if metric_name not in non_metric_values else True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert id_metric_ln == len(id_metrics) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         age_profile = metadata.get_profile_data( | 
					
						
							|  |  |  |             f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".age', | 
					
						
							|  |  |  |             get_beginning_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             get_end_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             profile_type=ColumnProfile, | 
					
						
							|  |  |  |         ).entities | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         latest_age_profile = max(age_profile, key=lambda o: o.timestamp.root) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         age_metric_ln = 0 | 
					
						
							|  |  |  |         for metric_name, metric in latest_age_profile: | 
					
						
							|  |  |  |             if metric_name.upper() in profiler_metrics: | 
					
						
							|  |  |  |                 assert metric is not None | 
					
						
							|  |  |  |                 age_metric_ln += 1 | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 assert metric is None if metric_name not in non_metric_values else True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert age_metric_ln == len(profiler_metrics) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         latest_exc_timestamp = latest_age_profile.timestamp.root | 
					
						
							|  |  |  |         first_name_profile = metadata.get_profile_data( | 
					
						
							|  |  |  |             f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".first_name_profile', | 
					
						
							|  |  |  |             get_beginning_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             get_end_of_day_timestamp_mill(), | 
					
						
							|  |  |  |             profile_type=ColumnProfile, | 
					
						
							|  |  |  |         ).entities | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert not [ | 
					
						
							|  |  |  |             p for p in first_name_profile if p.timestamp.root == latest_exc_timestamp | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-19 08:10:45 +01:00
										 |  |  |         ingestion_config["source"]["sourceConfig"]["config"].update( | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "type": "AutoClassification", | 
					
						
							| 
									
										
										
										
											2025-04-07 15:56:57 +02:00
										 |  |  |                 "storeSampleData": True, | 
					
						
							|  |  |  |                 "enableAutoClassification": False, | 
					
						
							| 
									
										
										
										
											2024-11-19 08:10:45 +01:00
										 |  |  |             } | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         auto_workflow = AutoClassificationWorkflow.create(ingestion_config) | 
					
						
							|  |  |  |         auto_workflow.execute() | 
					
						
							|  |  |  |         status = auto_workflow.result_status() | 
					
						
							|  |  |  |         auto_workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert status == WorkflowResultStatus.SUCCESS | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-12 07:13:01 +02:00
										 |  |  |         sample_data = metadata.get_sample_data(table) | 
					
						
							|  |  |  |         assert sorted([c.root for c in sample_data.sampleData.columns]) == sorted( | 
					
						
							|  |  |  |             ["id", "age"] | 
					
						
							|  |  |  |         ) |