Fix #12313 - Airflow lineage parsing and schedules (#12339)

* Fix docs

* Fix lineage and intervals

* Format
Pere Miquel Brull 2023-07-11 07:35:22 +02:00 committed by GitHub
parent c3e37f4f55
commit 6a1b1190a7
8 changed files with 294 additions and 131 deletions


@@ -71,7 +71,6 @@ def clean_str(raw: str, engine: Engine) -> str:
@singledispatch
# pylint: disable=line-too-long
def clean_col(column_raw: Optional[Union[dict, str]], engine: Engine) -> str:
return (
single_quote_wrap(clean_str(str(column_raw), engine))


@@ -104,7 +104,7 @@ class DomodashboardSource(DashboardServiceSource):
if user:
return EntityReference(id=user.id.__root__, type="user")
logger.warning(
f"No user for found for email {owner_details['email']} in OMD"
f"No user found with email [{owner_details['email']}] in OMD"
)
except Exception as exc:
logger.warning(


@@ -88,6 +88,10 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:
Parse airflow xlets for V1
:param xlet: airflow v2 xlet dict
:return: dictionary of xlet list or None
[{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']},
'__type': 'dict'}]
"""
# This branch is for lineage parser op
if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict):
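For context, a minimal sketch (not part of the diff) of how the V1 xlet shape documented above can be unwrapped; the helper name `_xlet_tables` and its simplifications are illustrative, and the real `parse_xlets` may handle more cases:

from typing import Dict, List, Optional

def _xlet_tables(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:
    # Hypothetical, simplified take on the branch above: unwrap the
    # first entry's "__var" payload, which holds the table FQN list.
    if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict):
        return xlet[0].get("__var")
    return None

example = [
    {"__var": {"tables": ["sample_data.ecommerce_db.shopify.fact_order"]},
     "__type": "dict"}
]
assert _xlet_tables(example) == {"tables": ["sample_data.ecommerce_db.shopify.fact_order"]}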


@@ -12,8 +12,8 @@
Airflow source to extract metadata from OM UI
"""
import traceback
from datetime import datetime, timedelta
from typing import Any, Iterable, List, Optional, cast
from datetime import datetime
from typing import Iterable, List, Optional, cast
from airflow.models import BaseOperator, DagRun, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
@@ -49,10 +49,9 @@ from metadata.ingestion.source.pipeline.airflow.models import (
AirflowDag,
AirflowDagDetails,
)
from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.constants import TIMEDELTA
from metadata.utils.helpers import clean_uri, datetime_to_ts
from metadata.utils.importer import import_from_module
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -64,21 +63,6 @@ STATUS_MAP = {
}
class OMSerializedDagDetails(BaseModel):
"""
Custom model we get from the Airflow db
as a scoped SELECT from SerializedDagModel
"""
dag_id: str
data: Any
fileloc: str
# We don't have a validator for SerializedDag
class Config:
arbitrary_types_allowed = True
class OMTaskInstance(BaseModel):
"""
Custom model we get from the Airflow db
@@ -206,7 +190,7 @@ class AirflowSource(PipelineServiceSource):
]
def yield_pipeline_status(
self, pipeline_details: SerializedDAG
self, pipeline_details: AirflowDagDetails
) -> OMetaPipelineStatus:
try:
dag_run_list = self.get_pipeline_status(pipeline_details.dag_id)
@@ -251,39 +235,7 @@
" Skipping status ingestion."
)
def get_schedule_interval(self, pipeline_data) -> Optional[str]:
"""
Fetch Schedule Intervals from Airflow Dags
"""
schedule_interval_timetable_val = pipeline_data.get("timetable", {}).get(
"__var", {}
)
if schedule_interval_timetable_val:
# Fetch Cron as String
return schedule_interval_timetable_val.get("expression", None)
schedule_interval_val = pipeline_data.get("schedule_interval", {})
if schedule_interval_val:
type_value = schedule_interval_val.get("__type", {})
if type_value == TIMEDELTA:
var_value = schedule_interval_val.get("__var", {})
# types of schedule interval with timedelta
# timedelta(days=1) = `1 day, 0:00:00`
return str(timedelta(seconds=var_value))
try:
# If the Schedule interval is a const value like @once, @yearly etc
# __type sends the module path, and once instantiated
return import_from_module(
pipeline_data.get("timetable", {}).get("__type", {})
)().summary
except Exception:
logger.debug(traceback.format_exc())
logger.warning(
f"Couldn't fetch schedule interval for dag {pipeline_data.get('_dag_id')}"
)
return None
def get_pipelines_list(self) -> Iterable[OMSerializedDagDetails]:
def get_pipelines_list(self) -> Iterable[AirflowDagDetails]:
"""
List all DAGs from the metadata db.
@@ -302,20 +254,21 @@
SerializedDagModel.fileloc,
).all():
try:
data = serialized_dag[1]["dag"]
dag = AirflowDagDetails(
dag_id=serialized_dag[0],
fileloc=serialized_dag[2],
data=AirflowDag(**serialized_dag[1]),
max_active_runs=data.get("max_active_runs", None),
description=data.get("_description", None),
start_date=data.get("start_date", None),
tasks=data.get("tasks", []),
schedule_interval=self.get_schedule_interval(data),
owners=self.fetch_owners(data),
)
if serialized_dag[0] in ("example_subdag_operator"):
data = serialized_dag[1]["dag"]
dag = AirflowDagDetails(
dag_id=serialized_dag[0],
fileloc=serialized_dag[2],
data=AirflowDag.parse_obj(serialized_dag[1]),
max_active_runs=data.get("max_active_runs", None),
description=data.get("_description", None),
start_date=data.get("start_date", None),
tasks=data.get("tasks", []),
schedule_interval=get_schedule_interval(data),
owners=self.fetch_owners(data),
)
yield dag
yield dag
except ValidationError as err:
logger.debug(traceback.format_exc())
logger.warning(
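A minimal sketch (not part of the diff; values are illustrative) of one row returned by the `SerializedDagModel` query in `get_pipelines_list`, to make the tuple indexing above easier to follow:

# Columns queried: SerializedDagModel.dag_id, the serialized JSON data, and fileloc
serialized_dag = (
    "my_dag",                                                     # serialized_dag[0]
    {"__version": 1, "dag": {"_dag_id": "my_dag", "tasks": []}},  # serialized_dag[1]
    "/opt/airflow/dags/my_dag.py",                                # serialized_dag[2]
)

data = serialized_dag[1]["dag"]  # inner "dag" dict: max_active_runs, tasks, schedule, owners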
@@ -343,7 +296,8 @@
def get_tasks_from_dag(dag: AirflowDagDetails, host_port: str) -> List[Task]:
"""
Obtain the tasks from a SerializedDAG
:param dag: SerializedDAG
:param dag: AirflowDagDetails
:param host_port: service host
:return: List of tasks
"""
return [
@@ -363,18 +317,6 @@ class AirflowSource(PipelineServiceSource):
for task in cast(Iterable[BaseOperator], dag.tasks)
]
@staticmethod
def _build_dag(data: Any) -> SerializedDAG:
"""
Use the queried data to fetch the DAG
:param data: from SQA query
:return: SerializedDAG
"""
if isinstance(data, dict):
return SerializedDAG.from_dict(data)
return SerializedDAG.from_json(data)
def get_user_details(self, email) -> Optional[EntityReference]:
user = self.metadata.get_user_by_email(email=email)
if user:
@@ -390,7 +332,7 @@ class AirflowSource(PipelineServiceSource):
for owner in owners or []:
return self.get_user_details(email=owner)
logger.debug(f"No user for found for email {owners} in OMD")
logger.debug(f"No user found with email [{owners}] in OMD")
except Exception as exc:
logger.warning(f"Error while getting details of user {owners} - {exc}")
return None
@@ -439,52 +381,6 @@
logger.debug(traceback.format_exc())
logger.warning(f"Wild error ingesting pipeline {pipeline_details} - {err}")
@staticmethod
def parse_xlets(xlet: Optional[List[Any]]) -> Optional[List[str]]:
"""
Parse airflow xlets for 2.1.4. E.g.,
[{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']},
'__type': 'dict'}]
:param xlet: airflow v2 xlet dict
:return: table FQN list or None
"""
if xlet and len(xlet) and isinstance(xlet[0], dict):
tables = xlet[0].get("__var").get("tables")
if tables and isinstance(tables, list):
return tables
return None
def get_inlets(self, task: BaseOperator) -> Optional[List[str]]:
"""
Get inlets from serialised operator
:param task: SerializedBaseOperator
:return: maybe an inlet list
"""
inlets = task.get_inlet_defs()
try:
return self.parse_xlets(inlets)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error trying to parse inlets: {exc}")
return None
def get_outlets(self, task: BaseOperator) -> Optional[List[str]]:
"""
Get outlets from serialised operator
:param task: SerializedBaseOperator
:return: maybe an inlet list
"""
outlets = task.get_outlet_defs()
try:
return self.parse_xlets(outlets)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error trying to parse outlets: {exc}")
return None
def yield_pipeline_lineage_details(
self, pipeline_details: AirflowDagDetails
) -> Optional[Iterable[AddLineageRequest]]:


@@ -42,6 +42,10 @@ class Task(BaseModel):
start_date: Optional[datetime]
end_date: Optional[datetime]
# Allow picking up data from key `inlets` and `_inlets`
class Config:
allow_population_by_field_name = True
class TaskList(BaseModel):
__root__: List[Task]
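For context, a minimal pydantic v1 sketch (not part of the diff; the `_inlets` alias and field shape are assumptions for illustration) of what `allow_population_by_field_name` enables for the `Task` model:

from typing import List, Optional

from pydantic import BaseModel, Field

class TaskSketch(BaseModel):
    # Serialized DAG payloads may carry either `inlets` or `_inlets`; pairing an
    # alias with allow_population_by_field_name lets both keys fill the same field.
    inlets: Optional[List[dict]] = Field(None, alias="_inlets")

    class Config:
        allow_population_by_field_name = True

print(TaskSketch.parse_obj({"inlets": [{"__type": "dict"}]}).inlets)   # via field name
print(TaskSketch.parse_obj({"_inlets": [{"__type": "dict"}]}).inlets)  # via alias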


@@ -0,0 +1,63 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Airflow metadata utils
"""
import traceback
from datetime import timedelta
from typing import Any, Dict, Optional
from metadata.utils.constants import TIMEDELTA
from metadata.utils.importer import import_from_module
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def get_schedule_interval(pipeline_data: Dict[str, Any]) -> Optional[str]:
"""
Fetch Schedule Intervals from Airflow Dags
"""
try:
timetable, schedule = pipeline_data.get("timetable", {}), pipeline_data.get(
"schedule_interval", {}
)
if timetable:
# Fetch Cron as String
expression = timetable.get("__var", {}).get("expression")
if expression:
return expression
expression_class = timetable.get("__type")
if expression_class:
return import_from_module(expression_class)().summary
if schedule:
if isinstance(schedule, str):
return schedule
type_value = schedule.get("__type")
if type_value == TIMEDELTA:
var_value = schedule.get("__var", {})
# types of schedule interval with timedelta
# timedelta(days=1) = `1 day, 0:00:00`
return str(timedelta(seconds=var_value))
# If no timetable nor schedule, the DAG has no interval set
return None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Couldn't fetch schedule interval for dag {pipeline_data.get('_dag_id'): [{exc}]}"
)
return None
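A quick usage sketch of the helper above; the same payload shapes are exercised by the new tests further down:

from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval

# timedelta-style interval: 86400 seconds renders as "1 day, 0:00:00"
print(get_schedule_interval({"schedule_interval": {"__type": "timedelta", "__var": 86400.0}}))

# timetable carrying an explicit cron expression
print(get_schedule_interval({
    "timetable": {
        "__type": "airflow.timetables.interval.CronDataIntervalTimetable",
        "__var": {"expression": "*/2 * * * *", "timezone": "UTC"},
    }
}))  # -> "*/2 * * * *"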


@@ -0,0 +1,200 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Airflow processing
"""
from unittest import TestCase
from metadata.ingestion.source.pipeline.airflow.models import (
AirflowDag,
AirflowDagDetails,
)
from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval
class TestAirflow(TestCase):
"""
Test Airflow model processing
"""
def test_parsing(self):
"""
We can properly pick up Airflow's payload and convert
it to our models
"""
serialized_dag = {
"__version": 1,
"dag": {
"_dag_id": "test-lineage-253",
"fileloc": "/opt/airflow/dags/lineage-test.py",
"default_args": {
"__var": {
"owner": "airflow",
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": {"__var": 1, "__type": "timedelta"},
},
"__type": "dict",
},
"timezone": "UTC",
"catchup": False,
"edge_info": {},
"dataset_triggers": [],
"_description": "An example DAG which simulate dbt run of fct_application_summary for airflow lineage backend",
"_task_group": {
"_group_id": None,
"prefix_group_id": True,
"tooltip": "",
"ui_color": "CornflowerBlue",
"ui_fgcolor": "#000",
"children": {
"task0": ["operator", "task0"],
"task1": ["operator", "task1"],
},
"upstream_group_ids": [],
"downstream_group_ids": [],
"upstream_task_ids": [],
"downstream_task_ids": [],
},
"is_paused_upon_creation": False,
"start_date": 1688860800,
"schedule_interval": None,
"_processor_dags_folder": "/opt/airflow/dags",
"tasks": [
{
"owner": "airflow",
"retry_delay": 1,
"retries": 1,
"ui_color": "#e8f7e4",
"email": ["airflow@example.com"],
"task_id": "task0",
"email_on_failure": False,
"email_on_retry": False,
"pool": "default_pool",
"downstream_task_ids": ["task1"],
"template_ext": [],
"template_fields_renderers": {},
"inlets": [
{
"__var": {
"tables": [
"sample_data.ecommerce_db.shopify.dim_location"
]
},
"__type": "dict",
}
],
"template_fields": [],
"ui_fgcolor": "#000",
"_task_type": "EmptyOperator",
"_task_module": "airflow.operators.empty",
"_is_empty": True,
},
{
"outlets": [
{
"__var": {
"tables": [
"sample_data.ecommerce_db.shopify.dim_staff"
]
},
"__type": "dict",
}
],
"owner": "airflow",
"retry_delay": 1,
"retries": 1,
"ui_color": "#e8f7e4",
"email": ["airflow@example.com"],
"task_id": "task1",
"email_on_failure": False,
"email_on_retry": False,
"pool": "default_pool",
"downstream_task_ids": [],
"template_ext": [],
"template_fields_renderers": {},
"template_fields": [],
"ui_fgcolor": "#000",
"_task_type": "EmptyOperator",
"_task_module": "airflow.operators.empty",
"_is_empty": True,
},
],
"dag_dependencies": [],
"params": {},
},
}
data = serialized_dag["dag"]
dag = AirflowDagDetails(
dag_id="id",
fileloc="loc",
data=AirflowDag.parse_obj(serialized_dag),
max_active_runs=data.get("max_active_runs", None),
description=data.get("_description", None),
start_date=data.get("start_date", None),
tasks=data.get("tasks", []),
schedule_interval=None,
owners=None,
)
self.assertEquals(
dag.tasks[0].inlets,
[
{
"__var": {
"tables": ["sample_data.ecommerce_db.shopify.dim_location"]
},
"__type": "dict",
}
],
)
self.assertEquals(
dag.tasks[1].outlets,
[
{
"__var": {"tables": ["sample_data.ecommerce_db.shopify.dim_staff"]},
"__type": "dict",
}
],
)
def test_get_schedule_interval(self):
"""
Check the shape of different DAGs
"""
pipeline_data = {"schedule_interval": None}
self.assertEquals(get_schedule_interval(pipeline_data), None)
pipeline_data = {"schedule_interval": {"__var": 86400.0, "__type": "timedelta"}}
self.assertEquals(get_schedule_interval(pipeline_data), "1 day, 0:00:00")
pipeline_data = {
"timetable": {
"__type": "airflow.timetables.simple.OnceTimetable",
"__var": {},
}
}
self.assertEquals(get_schedule_interval(pipeline_data), "@once")
pipeline_data = {
"timetable": {
"__type": "airflow.timetables.interval.CronDataIntervalTimetable",
"__var": {"expression": "*/2 * * * *", "timezone": "UTC"},
}
}
self.assertEquals(get_schedule_interval(pipeline_data), "*/2 * * * *")


@@ -579,11 +579,8 @@ For Airflow providers, you will want to pull the provider versions from [the mat
Also note that the ingestion workflow function must be entirely self contained as it will run by itself in the virtualenv. Any imports it needs, including the configuration, must exist within the function itself.
<<<<<<<< HEAD:openmetadata-docs/content/v1.1.0/connectors/ingestion/deployment/running-from-mwaa.md
### Extracting MWAA Metadata
========
>>>>>>>> docs-sso-airflow:openmetadata-docs/content/v1.1.0/connectors/pipeline/airflow/mwaa.md
As the ingestion process will be happening locally in MWAA, we can prepare a DAG with the following YAML
configuration: