mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-23 23:04:23 +00:00
189 lines
7.0 KiB
Python
189 lines
7.0 KiB
Python
# Copyright 2025 Collate
|
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import json
|
|
import pkgutil
|
|
import traceback
|
|
from pathlib import Path
|
|
from typing import Dict
|
|
|
|
from airflow import DAG, settings
|
|
from airflow.models import DagModel
|
|
from flask import escape
|
|
from jinja2 import Template
|
|
from openmetadata_managed_apis.api.config import (
|
|
AIRFLOW_DAGS_FOLDER,
|
|
DAG_GENERATED_CONFIGS,
|
|
PLUGIN_NAME,
|
|
)
|
|
from openmetadata_managed_apis.api.response import ApiResponse
|
|
from openmetadata_managed_apis.api.utils import (
|
|
clean_dag_id,
|
|
get_dagbag,
|
|
import_path,
|
|
scan_dags_job_background,
|
|
)
|
|
from openmetadata_managed_apis.utils.logger import operations_logger
|
|
|
|
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
|
IngestionPipeline,
|
|
)
|
|
from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory
|
|
|
|
logger = operations_logger()
|
|
|
|
|
|
class DeployDagException(Exception):
|
|
"""
|
|
Error when deploying the DAG
|
|
"""
|
|
|
|
|
|
def dump_with_safe_jwt(ingestion_pipeline: IngestionPipeline) -> str:
|
|
"""
|
|
Get the dump of the IngestionPipeline but keeping the JWT token masked.
|
|
|
|
Since Pydantic V2, we had to handle the serialization of secrets when dumping
|
|
the data at model level, since we don't have anymore fine-grained control of
|
|
it at runtime as we did with V1.
|
|
|
|
This means that even if the JWT token is a secret, a model_dump or model_json_dump
|
|
will automatically show the secret value - picking it from the Secrets Manager if enabled.
|
|
|
|
With this workaround, we're dumping the model to JSON and then replacing the JWT token
|
|
with the secret, so that if we are using a Secret Manager, the resulting file
|
|
will have the secret ID `secret:/super/secret` instead of the actual value.
|
|
|
|
Then, the client will pick up the right secret when the workflow is triggered.
|
|
"""
|
|
pipeline_json = ingestion_pipeline.model_dump(mode="json", exclude_defaults=False)
|
|
pipeline_json["openMetadataServerConnection"]["securityConfig"][
|
|
"jwtToken"
|
|
] = ingestion_pipeline.openMetadataServerConnection.securityConfig.jwtToken.get_secret_value(
|
|
skip_secret_manager=True
|
|
)
|
|
return json.dumps(pipeline_json, ensure_ascii=True)
|
|
|
|
|
|
class DagDeployer:
|
|
"""
|
|
Helper class to store DAG config
|
|
and deploy it to Airflow
|
|
"""
|
|
|
|
def __init__(self, ingestion_pipeline: IngestionPipeline):
|
|
logger.info(
|
|
f"Received the following Airflow Configuration: {ingestion_pipeline.airflowConfig}"
|
|
)
|
|
# we need to instantiate the secret manager in case secrets are passed
|
|
SecretsManagerFactory(
|
|
ingestion_pipeline.openMetadataServerConnection.secretsManagerProvider,
|
|
ingestion_pipeline.openMetadataServerConnection.secretsManagerLoader,
|
|
)
|
|
self.ingestion_pipeline = ingestion_pipeline
|
|
self.dag_id = clean_dag_id(self.ingestion_pipeline.name.root)
|
|
|
|
def store_airflow_pipeline_config(
|
|
self, dag_config_file_path: Path
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Store the airflow pipeline config in a JSON file and
|
|
return the path for the Jinja rendering.
|
|
"""
|
|
|
|
logger.info(f"Saving file to {dag_config_file_path}")
|
|
with open(dag_config_file_path, "w") as outfile:
|
|
outfile.write(dump_with_safe_jwt(self.ingestion_pipeline))
|
|
|
|
return {"workflow_config_file": str(dag_config_file_path)}
|
|
|
|
def store_and_validate_dag_file(self, dag_runner_config: Dict[str, str]) -> str:
|
|
"""
|
|
Stores the Python file generating the DAG and returns
|
|
the rendered strings
|
|
"""
|
|
|
|
dag_py_file = Path(AIRFLOW_DAGS_FOLDER) / f"{self.dag_id}.py"
|
|
|
|
# Open the template and render
|
|
raw_template = pkgutil.get_data(PLUGIN_NAME, "resources/dag_runner.j2").decode()
|
|
template = Template(raw_template, autoescape=True)
|
|
|
|
rendered_dag = template.render(dag_runner_config)
|
|
|
|
# Create the DAGs path if it does not exist
|
|
if not dag_py_file.parent.is_dir():
|
|
dag_py_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(dag_py_file, "w") as f:
|
|
f.write(rendered_dag)
|
|
|
|
try:
|
|
dag_file = import_path(str(dag_py_file))
|
|
except Exception as exc:
|
|
logger.debug(traceback.format_exc())
|
|
logger.error(f"Failed to import dag_file [{dag_py_file}]: {exc}")
|
|
raise exc
|
|
|
|
if dag_file is None:
|
|
raise DeployDagException(f"Failed to import dag_file [{dag_py_file}]")
|
|
|
|
return str(dag_py_file)
|
|
|
|
def refresh_session_dag(self, dag_py_file: str):
|
|
"""
|
|
Get the stored python DAG file and update the
|
|
Airflow DagBag and sync it to the db.
|
|
|
|
In Airflow 2.3.3, we also need to add a call
|
|
to the Scheduler job, to make sure that all
|
|
the pieces are being properly picked up.
|
|
"""
|
|
# Refresh dag into session
|
|
with settings.Session() as session:
|
|
try:
|
|
dag_bag = get_dagbag()
|
|
logger.info("dagbag size {}".format(dag_bag.size()))
|
|
found_dags = dag_bag.process_file(dag_py_file)
|
|
logger.info("processed dags {}".format(found_dags))
|
|
dag: DAG = dag_bag.get_dag(self.dag_id, session=session)
|
|
# Sync to DB
|
|
dag.sync_to_db(session=session)
|
|
dag_model = (
|
|
session.query(DagModel)
|
|
.filter(DagModel.dag_id == self.dag_id)
|
|
.first()
|
|
)
|
|
logger.info("dag_model:" + str(dag_model))
|
|
except Exception as exc:
|
|
msg = f"Workflow [{self.dag_id}] failed to refresh due to [{exc}]"
|
|
logger.debug(traceback.format_exc())
|
|
logger.error(msg)
|
|
return ApiResponse.server_error({f"message": msg})
|
|
|
|
scan_dags_job_background()
|
|
|
|
return ApiResponse.success(
|
|
{"message": f"Workflow [{escape(self.dag_id)}] has been created"}
|
|
)
|
|
|
|
def deploy(self):
|
|
"""
|
|
Run all methods to deploy the DAG
|
|
"""
|
|
dag_config_file_path = Path(DAG_GENERATED_CONFIGS) / f"{self.dag_id}.json"
|
|
logger.info(f"Config file under {dag_config_file_path}")
|
|
|
|
dag_runner_config = self.store_airflow_pipeline_config(dag_config_file_path)
|
|
dag_py_file = self.store_and_validate_dag_file(dag_runner_config)
|
|
response = self.refresh_session_dag(dag_py_file)
|
|
|
|
return response
|