# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Metadata DAG common functions
"""
import json
from datetime import datetime, timedelta
from typing import Callable, Optional
import airflow
from airflow import DAG
from openmetadata_managed_apis.api.utils import clean_dag_id
from pydantic import ValidationError
from requests.utils import quote
from metadata.data_insight.api.workflow import DataInsightWorkflow
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type import basic
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.test_suite.api.workflow import TestSuiteWorkflow
from metadata.utils.logger import set_loggers_level
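# PythonOperator's import path moved in Airflow 2.x; fall back to the 1.10.x path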
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from openmetadata_managed_apis.utils.logger import workflow_logger
from openmetadata_managed_apis.utils.parser import (
parse_service_connection,
parse_validation_err,
)
from openmetadata_managed_apis.workflows.ingestion.credentials_builder import (
build_secrets_manager_credentials,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
PipelineState,
)
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.parser import (
InvalidWorkflowException,
ParsingConfigurationError,
)
from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.ometa.utils import model_str
logger = workflow_logger()
class InvalidServiceException(Exception):
"""
    Exception to be raised when the service could not be fetched from the server
"""
class ClientInitializationError(Exception):
"""
    Exception to be raised when the OpenMetadata client could not be initialized
"""
def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
"""
    Use the service EntityReference to build the Source.
    Building the source dynamically means we avoid storing
    any sensitive connection info.
:param ingestion_pipeline: With the service ref
:return: WorkflowSource
"""
secrets_manager = (
ingestion_pipeline.openMetadataServerConnection.secretsManagerProvider
)
ingestion_pipeline.openMetadataServerConnection.secretsManagerCredentials = (
build_secrets_manager_credentials(secrets_manager)
)
try:
metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection)
    except Exception as exc:
        raise ClientInitializationError(
            f"Failed to initialize the OpenMetadata client: {exc}"
        ) from exc
service_type = ingestion_pipeline.service.type
if service_type == "testSuite":
service = metadata.get_by_name(
entity=TestSuite, fqn=ingestion_pipeline.service.name
) # check we are able to access OM server
if not service:
raise InvalidServiceException(
f"Could not get service from type {service_type}"
)
return WorkflowSource(
type=service_type,
serviceName=ingestion_pipeline.service.name,
sourceConfig=ingestion_pipeline.sourceConfig,
)
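    # For the remaining service types, resolve the generated entity class and
    # fetch the service so its connection details can be attached to the source.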
entity_class = None
try:
if service_type == "databaseService":
entity_class = DatabaseService
service: DatabaseService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
elif service_type == "pipelineService":
entity_class = PipelineService
service: PipelineService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
elif service_type == "dashboardService":
entity_class = DashboardService
service: DashboardService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
elif service_type == "messagingService":
entity_class = MessagingService
service: MessagingService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
elif service_type == "mlmodelService":
entity_class = MlModelService
service: MlModelService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
elif service_type == "metadataService":
entity_class = MetadataService
service: MetadataService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
)
else:
raise InvalidServiceException(f"Invalid Service Type: {service_type}")
except ValidationError as original_error:
try:
resp = metadata.client.get(
f"{metadata.get_suffix(entity_class)}/name/{quote(model_str(ingestion_pipeline.service.name), safe='')}"
)
parse_service_connection(resp)
except (ValidationError, InvalidWorkflowException) as scoped_error:
if isinstance(scoped_error, ValidationError):
# Let's catch validations of internal Workflow models, not the Workflow itself
object_error = (
scoped_error.model.__name__
if scoped_error.model is not None
else "workflow"
)
                raise ParsingConfigurationError(
                    f"We encountered an error parsing the configuration of your {object_error}.\n"
                    f"{parse_validation_err(scoped_error)}"
                ) from scoped_error
raise scoped_error
        raise ParsingConfigurationError(
            "We encountered an error parsing the configuration of your workflow.\n"
            f"{parse_validation_err(original_error)}"
        ) from original_error
if not service:
raise InvalidServiceException(f"Could not get service from type {service_type}")
return WorkflowSource(
type=service.serviceType.value.lower(),
serviceName=service.name.__root__,
serviceConnection=service.connection,
sourceConfig=ingestion_pipeline.sourceConfig,
)
def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
"""
Task that creates and runs the ingestion workflow.
    The workflow_config gets cooked from the incoming
ingestionPipeline.
This is the callable used to create the PythonOperator
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = Workflow.create(config)
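    # Workflow lifecycle: execute, raise on a failed status, print the summary
    # and stop. Any error marks the ingestion pipeline as failed before re-raising.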
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
"""
Task that creates and runs the profiler workflow.
    The workflow_config gets cooked from the incoming
ingestionPipeline.
This is the callable used to create the PythonOperator
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = ProfilerWorkflow.create(config)
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
"""
Task that creates and runs the test suite workflow.
    The workflow_config gets cooked from the incoming
ingestionPipeline.
This is the callable used to create the PythonOperator
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = TestSuiteWorkflow.create(config)
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
"""Task that creates and runs the data insight workflow.
    The workflow_config gets created from the incoming
ingestionPipeline.
This is the callable used to create the PythonOperator
Args:
        workflow_config (OpenMetadataWorkflowConfig): config built from the incoming ingestionPipeline
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = DataInsightWorkflow.create(config)
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def date_to_datetime(
date: Optional[basic.DateTime], date_format: str = "%Y-%m-%dT%H:%M:%S%z"
) -> Optional[datetime]:
"""
    Convert a basic.DateTime into a datetime, parsing ISO 8601 by default.
"""
    if date is None:
        return None
return datetime.strptime(str(date.__root__), date_format)
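# Illustrative call (hypothetical value), assuming the serialized datetime keeps
# the ISO 8601 "T" separator, e.g. "2022-11-08T10:00:00+0000":
#   date_to_datetime(date) -> datetime(2022, 11, 8, 10, 0, tzinfo=timezone.utc)
# A different separator in the serialized form would require adjusting `date_format`.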
def build_workflow_config_property(
ingestion_pipeline: IngestionPipeline,
) -> WorkflowConfig:
"""
Prepare the workflow config with logLevels and openMetadataServerConfig
:param ingestion_pipeline: Received payload from REST
:return: WorkflowConfig
"""
return WorkflowConfig(
loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO,
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
)
def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
"""
Prepare kwargs to send to DAG
:param ingestion_pipeline: pipeline configs
:return: dict to use as kwargs
"""
return {
"dag_id": clean_dag_id(ingestion_pipeline.name.__root__),
"description": ingestion_pipeline.description,
"start_date": ingestion_pipeline.airflowConfig.startDate.__root__
if ingestion_pipeline.airflowConfig.startDate
else airflow.utils.dates.days_ago(1),
"end_date": ingestion_pipeline.airflowConfig.endDate.__root__
if ingestion_pipeline.airflowConfig.endDate
else None,
"concurrency": ingestion_pipeline.airflowConfig.concurrency,
"max_active_runs": ingestion_pipeline.airflowConfig.maxActiveRuns,
"default_view": ingestion_pipeline.airflowConfig.workflowDefaultView,
"orientation": ingestion_pipeline.airflowConfig.workflowDefaultViewOrientation,
"dagrun_timeout": timedelta(ingestion_pipeline.airflowConfig.workflowTimeout)
if ingestion_pipeline.airflowConfig.workflowTimeout
else None,
"is_paused_upon_creation": ingestion_pipeline.airflowConfig.pausePipeline
or False,
"catchup": ingestion_pipeline.airflowConfig.pipelineCatchup or False,
"schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval,
}
def build_dag(
task_name: str,
ingestion_pipeline: IngestionPipeline,
workflow_config: OpenMetadataWorkflowConfig,
workflow_fn: Callable,
) -> DAG:
"""
Build a simple metadata workflow DAG
"""
with DAG(**build_dag_configs(ingestion_pipeline)) as dag:
PythonOperator(
task_id=task_name,
python_callable=workflow_fn,
op_kwargs={"workflow_config": workflow_config},
retries=ingestion_pipeline.airflowConfig.retries,
retry_delay=ingestion_pipeline.airflowConfig.retryDelay,
)
return dag
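

# --- Illustrative sketch, not part of the module API ---
# A minimal view of how the helpers above fit together when registering a
# metadata ingestion DAG. The bare OpenMetadataWorkflowConfig construction is
# an assumption for brevity: the real workflow builders also attach sink and
# processor configs where the workflow type requires them.
def _example_build_metadata_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
    workflow_config = OpenMetadataWorkflowConfig(
        source=build_source(ingestion_pipeline),
        workflowConfig=build_workflow_config_property(ingestion_pipeline),
    )
    return build_dag(
        task_name="ingestion_task",
        ingestion_pipeline=ingestion_pipeline,
        workflow_config=workflow_config,
        workflow_fn=metadata_ingestion_workflow,
    )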