FIX - Airflow tagging long names (#16304)

* FIX - Airflow tagging long names

* FIX - Airflow tagging long names

* FIX - Airflow tagging long names
This commit is contained in:
Pere Miquel Brull 2024-05-16 15:33:42 +02:00 committed by GitHub
parent 46e14b79f4
commit 44181b1ad3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 35 additions and 15 deletions

View File

@ -15,7 +15,7 @@ import json
import uuid import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import partial from functools import partial
from typing import Callable, Union from typing import Callable, Optional, Union
import airflow import airflow
from airflow import DAG from airflow import DAG
@ -36,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.application import (
) )
from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.workflow.workflow_output_handler import print_status from metadata.workflow.workflow_output_handler import print_status
# pylint: disable=ungrouped-imports # pylint: disable=ungrouped-imports
@ -225,6 +226,21 @@ def build_workflow_config_property(
) )
def clean_name_tag(tag: str) -> Optional[str]:
"""
Clean the tag to be used in Airflow
:param tag: tag to be cleaned
:return: cleaned tag
"""
if not tag:
return None
try:
return fqn.split(tag)[-1][:100]
except Exception as exc:
logger.warning("Error cleaning tag: %s", exc)
return tag[:100]
def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
""" """
Prepare kwargs to send to DAG Prepare kwargs to send to DAG
@ -255,9 +271,10 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
"schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval, "schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval,
"tags": [ "tags": [
"OpenMetadata", "OpenMetadata",
ingestion_pipeline.displayName or ingestion_pipeline.name.__root__, clean_name_tag(ingestion_pipeline.displayName)
or clean_name_tag(ingestion_pipeline.name.__root__),
ingestion_pipeline.pipelineType.value, ingestion_pipeline.pipelineType.value,
ingestion_pipeline.service.name, clean_name_tag(ingestion_pipeline.service.name),
], ],
} }

View File

@ -11,21 +11,24 @@
""" """
Test helper functions Test helper functions
""" """
from unittest import TestCase
from openmetadata_managed_apis.api.utils import clean_dag_id from openmetadata_managed_apis.api.utils import clean_dag_id
from openmetadata_managed_apis.workflows.ingestion.common import clean_name_tag
class TestHelpers(TestCase): def test_clean_dag_id():
""" """
Methods to validate helpers on REST APIs To make sure airflow can parse it
""" """
assert clean_dag_id("hello") == "hello"
assert clean_dag_id("hello(world)") == "hello_world_"
assert clean_dag_id("hello-world") == "hello-world"
assert clean_dag_id("%%&^++hello__") == "_hello__"
def test_clean_dag_id(self):
""" def test_clean_tag():
To make sure airflow can parse it """We can properly tag airflow DAGs"""
"""
self.assertEqual(clean_dag_id("hello"), "hello") assert clean_name_tag("hello") == "hello"
self.assertEqual(clean_dag_id("hello(world)"), "hello_world_") assert clean_name_tag("hello(world)") == "hello(world)"
self.assertEqual(clean_dag_id("hello-world"), "hello-world") assert clean_name_tag("service.pipeline") == "pipeline"
self.assertEqual(clean_dag_id("%%&^++hello__"), "_hello__") assert clean_name_tag(f"service.{'a' * 200}") == "a" * 100