#11799 - Fix Airflow ownership & add pipeline tasks (#14510)

* Fix airflow owner and add tasks

* Add pipeline tasks ownership

* MINOR - Fix py CI

* Add pipeline tasks ownership

* Add pipeline tasks ownership

* MINOR - Fix py CI

* MINOR - Fix py CI

* Add pipeline tasks ownership

* patch team

* patch team

* Format
This commit is contained in:
Pere Miquel Brull 2023-12-28 19:25:00 +01:00 committed by GitHub
parent 95b90bc510
commit b84ce33b80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 575 additions and 218 deletions

View File

@ -79,6 +79,23 @@ class ESMixin(Generic[T]):
return None
def _get_entity_from_es(
    self, entity: Type[T], query_string: str, fields: Optional[list] = None
) -> Optional[T]:
    """Run an ES query and return the first matching entity instance.

    Lookups are best-effort: any failure is logged and swallowed and the
    method falls back to None so ingestion can proceed without the data.
    """
    try:
        matches = self._search_es_entity(
            entity_type=entity, query_string=query_string, fields=fields
        )
        # Hand back the first hit, or None when the search came up empty
        return next(iter(matches or []), None)
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.warning(f"Could not get {entity.__name__} info from ES due to {err}")
        return None
def es_search_from_fqn(
self,
entity_type: Type[T],

View File

@ -13,12 +13,15 @@ Mixin class containing User specific methods
To be used by OpenMetadata class
"""
import traceback
from functools import lru_cache
from typing import Optional
from typing import Optional, Type
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import T
from metadata.ingestion.ometa.client import REST
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.logger import ometa_logger
@ -34,42 +37,134 @@ class OMetaUserMixin:
client: REST
email_search = (
"/search/query?q=email.keyword:{email}&from={from_}&size={size}&index="
+ ES_INDEX_MAP[User.__name__]
)
@staticmethod
def email_search_query_es(entity: Type[T]) -> str:
    """Build the ES query URL template for an exact email lookup.

    The returned string still contains `{email}`, `{from_}` and `{size}`
    placeholders for the caller to `.format(...)`; the index is picked per
    entity type from ENTITY's ES_INDEX_MAP entry.
    """
    return (
        "/search/query?q=email.keyword:{email}&from={from_}&size={size}&index="
        + ES_INDEX_MAP[entity.__name__]
    )
@lru_cache(maxsize=None)
def get_user_by_email(
@staticmethod
def name_search_query_es(entity: Type[T]) -> str:
    """Build the ES query URL template for a free-text name lookup.

    We deliberately avoid a strict `q=name:{name}` filter and mimic the UI
    search instead: a user stored as `random.user` but displayed as
    `Random User` should be found by either spelling.
    """
    index = ES_INDEX_MAP[entity.__name__]
    return "/search/query?q={name}&from={from_}&size={size}&index=" + index
def _search_by_email(
self,
entity: Type[T],
email: Optional[str],
from_count: int = 0,
size: int = 1,
fields: Optional[list] = None,
) -> Optional[User]:
) -> Optional[T]:
"""
GET user entity by name
GET user or team entity by mail
Args:
email: user email to search
from_count: records to expect
size: number of records
fields: Optional field list to pass to ES request
"""
if email:
query_string = self.email_search.format(
query_string = self.email_search_query_es(entity=entity).format(
email=email, from_=from_count, size=size
)
try:
entity_list = self._search_es_entity(
entity_type=User, query_string=query_string, fields=fields
)
for user in entity_list or []:
return user
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not get user info from ES for user email {email} due to {err}"
)
return self._get_entity_from_es(
entity=entity, query_string=query_string, fields=fields
)
return None
def _search_by_name(
    self,
    entity: Type[T],
    name: Optional[str],
    from_count: int = 0,
    size: int = 1,
    fields: Optional[list] = None,
) -> Optional[T]:
    """Look up a single entity in ES by (fuzzy) name.

    Args:
        entity: entity class to search for
        name: name to search; a falsy value short-circuits to None
        from_count: pagination offset for the ES request
        size: number of records to request
        fields: Optional field list to pass to the ES request
    Returns:
        The first matching entity instance, or None.
    """
    if not name:
        return None
    query_string = self.name_search_query_es(entity=entity).format(
        name=name, from_=from_count, size=size
    )
    return self._get_entity_from_es(
        entity=entity, query_string=query_string, fields=fields
    )
@lru_cache(maxsize=None)
def get_reference_by_email(
    self,
    email: Optional[str],
    from_count: int = 0,
    size: int = 1,
    fields: Optional[list] = None,
) -> Optional[EntityReference]:
    """
    Get a User or Team Entity Reference by searching by its mail.

    Users take precedence over Teams when both match the email.
    NOTE(review): results are memoized per argument tuple, so `fields`
    must be hashable (e.g. None) for lru_cache to accept the call.
    """
    for candidate_type in (User, Team):
        match = self._search_by_email(
            entity=candidate_type,
            email=email,
            from_count=from_count,
            size=size,
            fields=fields,
        )
        if match:
            return EntityReference(
                id=match.id.__root__,
                type=ENTITY_REFERENCE_TYPE_MAP[candidate_type.__name__],
            )
    return None
@lru_cache(maxsize=None)
def get_reference_by_name(
    self,
    name: Optional[str],
    from_count: int = 0,
    size: int = 1,
    fields: Optional[list] = None,
) -> Optional[EntityReference]:
    """
    Get a User or Team Entity Reference by searching by its name.

    Users take precedence over Teams when both match the name.
    NOTE(review): results are memoized per argument tuple, so `fields`
    must be hashable (e.g. None) for lru_cache to accept the call.
    """
    for candidate_type in (User, Team):
        match = self._search_by_name(
            entity=candidate_type,
            name=name,
            from_count=from_count,
            size=size,
            fields=fields,
        )
        if match:
            return EntityReference(
                id=match.id.__root__,
                type=ENTITY_REFERENCE_TYPE_MAP[candidate_type.__name__],
            )
    return None

View File

@ -100,12 +100,7 @@ class DomodashboardSource(DashboardServiceSource):
try:
owner_details = self.client.domo.users_get(owner.id)
if owner_details.get("email"):
user = self.metadata.get_user_by_email(owner_details["email"])
if user:
return EntityReference(id=user.id.__root__, type="user")
logger.warning(
f"No user found with email [{owner_details['email']}] in OMD"
)
return self.metadata.get_reference_by_email(owner_details["email"])
except Exception as exc:
logger.warning(
f"Error while getting details of user {owner.displayName} - {exc}"

View File

@ -634,9 +634,7 @@ class LookerSource(DashboardServiceSource):
try:
if dashboard_details.user_id is not None:
dashboard_owner = self.client.user(dashboard_details.user_id)
user = self.metadata.get_user_by_email(dashboard_owner.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(dashboard_owner.email)
except Exception as err:
logger.debug(traceback.format_exc())

View File

@ -109,11 +109,9 @@ class RedashSource(DashboardServiceSource):
def get_owner_details(self, dashboard_details) -> Optional[EntityReference]:
"""Get owner from mail"""
if dashboard_details.get("user") and dashboard_details["user"].get("email"):
user = self.metadata.get_user_by_email(
return self.metadata.get_reference_by_email(
dashboard_details["user"].get("email")
)
if user:
return EntityReference(id=user.id.__root__, type="user")
return None
def get_dashboard_url(self, dashboard_details: dict) -> str:

View File

@ -97,10 +97,7 @@ class SupersetSourceMixin(DashboardServiceSource):
def _get_user_by_email(self, email: Optional[str]) -> Optional[EntityReference]:
if email:
user = self.metadata.get_user_by_email(email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(email)
return None
def get_owner_details(

View File

@ -158,9 +158,7 @@ class TableauSource(DashboardServiceSource):
) -> Optional[EntityReference]:
"""Get dashboard owner from email"""
if dashboard_details.owner and dashboard_details.owner.email:
user = self.metadata.get_user_by_email(dashboard_details.owner.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(dashboard_details.owner.email)
return None
def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]:

View File

@ -166,9 +166,7 @@ class DomodatabaseSource(DatabaseServiceSource):
try:
owner_details = User(**self.domo_client.users_get(owner.id))
if owner_details.email:
user = self.metadata.get_user_by_email(owner_details.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(owner_details.email)
except Exception as exc:
logger.warning(f"Error while getting details of user {owner.name} - {exc}")
return None

View File

@ -1054,9 +1054,9 @@ class SampleDataSource(
for pipeline in self.pipelines["pipelines"]:
owner = None
if pipeline.get("owner"):
user = self.metadata.get_user_by_email(email=pipeline.get("owner"))
if user:
owner = EntityReference(id=user.id.__root__, type="user")
owner = self.metadata.get_reference_by_email(
email=pipeline.get("owner")
)
pipeline_ev = CreatePipelineRequest(
name=pipeline["name"],
displayName=pipeline["displayName"],

View File

@ -12,6 +12,7 @@
Airflow source to extract metadata from OM UI
"""
import traceback
from collections import Counter
from datetime import datetime
from typing import Iterable, List, Optional, cast
@ -98,7 +99,7 @@ class AirflowSource(PipelineServiceSource):
self._session = None
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
def create(cls, config_dict, metadata: OpenMetadata) -> "AirflowSource":
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: AirflowConnection = config.serviceConnection.__root__.config
if not isinstance(connection, AirflowConnection):
@ -283,7 +284,7 @@ class AirflowSource(PipelineServiceSource):
start_date=data.get("start_date", None),
tasks=data.get("tasks", []),
schedule_interval=get_schedule_interval(data),
owners=self.fetch_owners(data),
owner=self.fetch_dag_owners(data),
)
yield dag
@ -296,12 +297,29 @@ class AirflowSource(PipelineServiceSource):
logger.debug(traceback.format_exc())
logger.warning(f"Wild error yielding dag {serialized_dag} - {err}")
def fetch_owners(self, data) -> Optional[str]:
def fetch_dag_owners(self, data) -> Optional[str]:
"""
In Airflow, ownership is defined as:
- `default_args`: Applied to all tasks and available on the DAG payload
- `owners`: Applied at the tasks. In Airflow's source code, DAG ownership is then a
list joined with the owners of all the tasks.
We will pick the owner from the tasks that appears in most tasks.
"""
try:
if self.source_config.includeOwners and data.get("default_args"):
return data.get("default_args", [])["__var"].get("email", [])
except TypeError:
pass
if self.source_config.includeOwners:
task_owners = [
task.get("owner")
for task in data.get("tasks", [])
if task.get("owner") is not None
]
if task_owners:
most_common_owner, _ = Counter(task_owners).most_common(1)[0]
return most_common_owner
except Exception as exc:
self.status.warning(
data.get("dag_id"), f"Could not extract owner information due to {exc}"
)
return None
def get_pipeline_name(self, pipeline_details: SerializedDAG) -> str:
@ -310,8 +328,7 @@ class AirflowSource(PipelineServiceSource):
"""
return pipeline_details.dag_id
@staticmethod
def get_tasks_from_dag(dag: AirflowDagDetails, host_port: str) -> List[Task]:
def get_tasks_from_dag(self, dag: AirflowDagDetails, host_port: str) -> List[Task]:
"""
Obtain the tasks from a SerializedDAG
:param dag: AirflowDagDetails
@ -332,28 +349,26 @@ class AirflowSource(PipelineServiceSource):
startDate=task.start_date.isoformat() if task.start_date else None,
endDate=task.end_date.isoformat() if task.end_date else None,
taskType=task.task_type,
owner=self.get_owner(task.owner),
)
for task in cast(Iterable[BaseOperator], dag.tasks)
]
def get_user_details(self, email) -> Optional[EntityReference]:
user = self.metadata.get_user_by_email(email=email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return None
def get_owner(self, owner) -> Optional[EntityReference]:
"""
Fetching users by name via ES to keep things as fast as possible.
def get_owner(self, owners) -> Optional[EntityReference]:
We use the `owner` field since it's the onw used by Airflow to showcase
the info in its UI. In other connectors we might use the mail (e.g., in Looker),
but we use name here to be consistent with Airflow itself.
If data is not indexed, we can live without this information
until the next run.
"""
try:
if isinstance(owners, str) and owners:
return self.get_user_details(email=owners)
if isinstance(owners, List) and owners:
for owner in owners or []:
return self.get_user_details(email=owner)
logger.debug(f"No user found with email [{owners}] in OMD")
return self.metadata.get_reference_by_name(name=owner)
except Exception as exc:
logger.warning(f"Error while getting details of user {owners} - {exc}")
logger.warning(f"Error while getting details of user {owner} - {exc}")
return None
def yield_pipeline(
@ -380,7 +395,7 @@ class AirflowSource(PipelineServiceSource):
pipeline_details, self.service_connection.hostPort
),
service=self.context.pipeline_service,
owner=self.get_owner(pipeline_details.owners),
owner=self.get_owner(pipeline_details.owner),
scheduleInterval=pipeline_details.schedule_interval,
)
yield Either(right=pipeline_request)

View File

@ -31,7 +31,7 @@ class AirflowBaseModel(BaseModel):
dag_id: str
class Task(BaseModel):
class AirflowTask(BaseModel):
pool: Optional[str]
doc_md: Optional[str]
inlets: Optional[List[Any]] = Field(alias="_inlets")
@ -41,6 +41,7 @@ class Task(BaseModel):
downstream_task_ids: Optional[List[str]]
start_date: Optional[datetime]
end_date: Optional[datetime]
owner: Optional[str]
# Allow picking up data from key `inlets` and `_inlets`
class Config:
@ -48,7 +49,7 @@ class Task(BaseModel):
class TaskList(BaseModel):
__root__: List[Task]
__root__: List[AirflowTask]
class Dag(BaseModel):
@ -68,6 +69,6 @@ class AirflowDagDetails(AirflowBaseModel):
max_active_runs: Optional[int]
description: Optional[str]
start_date: Optional[datetime]
tasks: List[Task]
owners: Optional[Any]
tasks: List[AirflowTask]
owner: Optional[str]
schedule_interval: Optional[str]

View File

@ -32,6 +32,8 @@ from metadata.generated.schema.entity.services.mlmodelService import MlModelServ
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.searchService import SearchService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
DOT = "_DOT_"
TEN_MIN = 10 * 60
@ -92,4 +94,7 @@ ENTITY_REFERENCE_TYPE_MAP = {
SearchIndex.__name__: "searchIndex",
MlModel.__name__: "mlmodel",
Container.__name__: "container",
# User Entities
User.__name__: "user",
Team.__name__: "team",
}

View File

@ -15,15 +15,12 @@ import logging
import time
from unittest import TestCase
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.teams.team import Team, TeamType
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from ..integration_base import int_admin_ometa
class OMetaUserTest(TestCase):
@ -32,16 +29,7 @@ class OMetaUserTest(TestCase):
Install the ingestion package before running the tests
"""
server_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check()
metadata = int_admin_ometa()
@classmethod
def check_es_index(cls) -> None:
@ -67,9 +55,15 @@ class OMetaUserTest(TestCase):
Prepare ingredients
"""
cls.team: Team = cls.metadata.create_or_update(
data=CreateTeamRequest(
teamType=TeamType.Group, name="ops.team", email="ops.team@getcollate.io"
)
)
cls.user_1: User = cls.metadata.create_or_update(
data=CreateUserRequest(
name="random.user", email="random.user@getcollate.io"
name="random.user.es", email="random.user.es@getcollate.io"
),
)
@ -102,34 +96,92 @@ class OMetaUserTest(TestCase):
hard_delete=True,
)
cls.metadata.delete(
entity=User,
entity_id=cls.user_3.id,
hard_delete=True,
)
cls.metadata.delete(
entity=Team,
entity_id=cls.team.id,
hard_delete=True,
)
def test_es_search_from_email(self):
"""
We can fetch users by its email
"""
# No email returns None
self.assertIsNone(self.metadata.get_user_by_email(email=None))
self.assertIsNone(self.metadata.get_reference_by_email(email=None))
# Non existing email returns None
self.assertIsNone(
self.metadata.get_user_by_email(email="idonotexist@random.com")
self.metadata.get_reference_by_email(email="idonotexist@random.com")
)
# Non existing email returns, even if they have the same domain
# To get this fixed, we had to update the `email` field in the
# index as a `keyword` and search by `email.keyword` in ES.
self.assertIsNone(
self.metadata.get_user_by_email(email="idonotexist@getcollate.io")
self.metadata.get_reference_by_email(email="idonotexist@getcollate.io")
)
# I can get User 1, who has the name equal to its email
self.assertEqual(
self.user_1.id,
self.metadata.get_user_by_email(email="random.user@getcollate.io").id,
self.metadata.get_reference_by_email(
email="random.user.es@getcollate.io"
).id,
)
# I can get User 2, who has an email not matching the name
self.assertEqual(
self.user_2.id,
self.metadata.get_user_by_email(email="user2.1234@getcollate.io").id,
self.metadata.get_reference_by_email(email="user2.1234@getcollate.io").id,
)
# I can get the team by its mail
self.assertEqual(
self.team.id,
self.metadata.get_reference_by_email(email="ops.team@getcollate.io").id,
)
def test_es_search_from_name(self):
    """
    We can fetch users and teams by their name
    """
    # A missing name returns None
    self.assertIsNone(self.metadata.get_reference_by_name(name=None))
    # A non-existing name returns None
    self.assertIsNone(self.metadata.get_reference_by_name(name="idonotexist"))
    # We can get the user matching its name
    self.assertEqual(
        self.user_1.id,
        self.metadata.get_reference_by_name(name="random.user.es").id,
    )
    # Casing does not matter
    self.assertEqual(
        self.user_2.id,
        self.metadata.get_reference_by_name(name="levy").id,
    )
    self.assertEqual(
        self.user_2.id,
        self.metadata.get_reference_by_name(name="Levy").id,
    )
    # The fuzzy query also matches the user's display-style name
    self.assertEqual(
        self.user_1.id,
        self.metadata.get_reference_by_name(name="Random User Es").id,
    )
    # I can get the team by its name
    self.assertEqual(
        self.team.id,
        self.metadata.get_reference_by_name(name="OPS Team").id,
    )

View File

@ -253,9 +253,8 @@ class LookerUnitTest(TestCase):
ref = EntityReference(id=uuid.uuid4(), type="user")
with patch.object(Looker40SDK, "user", return_value=MOCK_USER), patch.object(
# This does not really return a ref, but for simplicity
OpenMetadata,
"get_user_by_email",
"get_reference_by_email",
return_value=ref,
):
self.assertEqual(self.looker.get_owner_details(MOCK_LOOKER_DASHBOARD), ref)

View File

@ -12,143 +12,188 @@
Test Airflow processing
"""
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.airflow.metadata import AirflowSource
from metadata.ingestion.source.pipeline.airflow.models import (
AirflowDag,
AirflowDagDetails,
)
from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval
MOCK_CONFIG = {
"source": {
"type": "airflow",
"serviceName": "test_airflow",
"serviceConnection": {
"config": {
"type": "Airflow",
"hostPort": "https://localhost:8080",
"connection": {"type": "Backend"},
}
},
"sourceConfig": {
"config": {
"type": "PipelineMetadata",
"includeOwners": True,
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "token"},
},
},
}
SERIALIZED_DAG = {
"__version": 1,
"dag": {
"_dag_id": "test-lineage-253",
"fileloc": "/opt/airflow/dags/lineage-test.py",
"default_args": {
"__var": {
"owner": "my_owner",
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": {"__var": 1, "__type": "timedelta"},
},
"__type": "dict",
},
"timezone": "UTC",
"catchup": False,
"edge_info": {},
"dataset_triggers": [],
"_description": "An example DAG which simulate dbt run of fct_application_summary for airflow lineage backend",
"_task_group": {
"_group_id": None,
"prefix_group_id": True,
"tooltip": "",
"ui_color": "CornflowerBlue",
"ui_fgcolor": "#000",
"children": {
"task0": ["operator", "task0"],
"task1": ["operator", "task1"],
},
"upstream_group_ids": [],
"downstream_group_ids": [],
"upstream_task_ids": [],
"downstream_task_ids": [],
},
"is_paused_upon_creation": False,
"start_date": 1688860800,
"schedule_interval": None,
"_processor_dags_folder": "/opt/airflow/dags",
"tasks": [
{
"owner": "another_owner",
"retry_delay": 1,
"retries": 1,
"ui_color": "#e8f7e4",
"email": ["airflow@example.com"],
"task_id": "task0",
"email_on_failure": False,
"email_on_retry": False,
"pool": "default_pool",
"downstream_task_ids": ["task1"],
"template_ext": [],
"template_fields_renderers": {},
"inlets": [
{
"__var": {
"tables": ["sample_data.ecommerce_db.shopify.dim_location"]
},
"__type": "dict",
}
],
"template_fields": [],
"ui_fgcolor": "#000",
"_task_type": "EmptyOperator",
"_task_module": "airflow.operators.empty",
"_is_empty": True,
},
{
"outlets": [
{
"__var": {
"tables": ["sample_data.ecommerce_db.shopify.dim_staff"]
},
"__type": "dict",
}
],
"owner": "another_owner",
"retry_delay": 1,
"retries": 1,
"ui_color": "#e8f7e4",
"email": ["airflow@example.com"],
"task_id": "task1",
"email_on_failure": False,
"email_on_retry": False,
"pool": "default_pool",
"downstream_task_ids": [],
"template_ext": [],
"template_fields_renderers": {},
"template_fields": [],
"ui_fgcolor": "#000",
"_task_type": "EmptyOperator",
"_task_module": "airflow.operators.empty",
"_is_empty": True,
},
],
"dag_dependencies": [],
"params": {},
},
}
class TestAirflow(TestCase):
"""
Test Airflow model processing
"""
@patch(
"metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
)
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(MOCK_CONFIG)
# This already validates that the source can be initialized
self.airflow: AirflowSource = AirflowSource.create(
MOCK_CONFIG["source"],
OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
)
def test_parsing(self):
"""
We can properly pick up Airflow's payload and convert
it to our models
"""
serialized_dag = {
"__version": 1,
"dag": {
"_dag_id": "test-lineage-253",
"fileloc": "/opt/airflow/dags/lineage-test.py",
"default_args": {
"__var": {
"owner": "airflow",
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": {"__var": 1, "__type": "timedelta"},
},
"__type": "dict",
},
"timezone": "UTC",
"catchup": False,
"edge_info": {},
"dataset_triggers": [],
"_description": "An example DAG which simulate dbt run of fct_application_summary for airflow lineage backend",
"_task_group": {
"_group_id": None,
"prefix_group_id": True,
"tooltip": "",
"ui_color": "CornflowerBlue",
"ui_fgcolor": "#000",
"children": {
"task0": ["operator", "task0"],
"task1": ["operator", "task1"],
},
"upstream_group_ids": [],
"downstream_group_ids": [],
"upstream_task_ids": [],
"downstream_task_ids": [],
},
"is_paused_upon_creation": False,
"start_date": 1688860800,
"schedule_interval": None,
"_processor_dags_folder": "/opt/airflow/dags",
"tasks": [
{
"owner": "airflow",
"retry_delay": 1,
"retries": 1,
"ui_color": "#e8f7e4",
"email": ["airflow@example.com"],
"task_id": "task0",
"email_on_failure": False,
"email_on_retry": False,
"pool": "default_pool",
"downstream_task_ids": ["task1"],
"template_ext": [],
"template_fields_renderers": {},
"inlets": [
{
"__var": {
"tables": [
"sample_data.ecommerce_db.shopify.dim_location"
]
},
"__type": "dict",
}
],
"template_fields": [],
"ui_fgcolor": "#000",
"_task_type": "EmptyOperator",
"_task_module": "airflow.operators.empty",
"_is_empty": True,
},
{
"outlets": [
{
"__var": {
"tables": [
"sample_data.ecommerce_db.shopify.dim_staff"
]
},
"__type": "dict",
}
],
"owner": "airflow",
"retry_delay": 1,
"retries": 1,
"ui_color": "#e8f7e4",
"email": ["airflow@example.com"],
"task_id": "task1",
"email_on_failure": False,
"email_on_retry": False,
"pool": "default_pool",
"downstream_task_ids": [],
"template_ext": [],
"template_fields_renderers": {},
"template_fields": [],
"ui_fgcolor": "#000",
"_task_type": "EmptyOperator",
"_task_module": "airflow.operators.empty",
"_is_empty": True,
},
],
"dag_dependencies": [],
"params": {},
},
}
data = serialized_dag["dag"]
data = SERIALIZED_DAG["dag"]
dag = AirflowDagDetails(
dag_id="id",
fileloc="loc",
data=AirflowDag.parse_obj(serialized_dag),
data=AirflowDag.parse_obj(SERIALIZED_DAG),
max_active_runs=data.get("max_active_runs", None),
description=data.get("_description", None),
start_date=data.get("start_date", None),
tasks=data.get("tasks", []),
schedule_interval=None,
owners=None,
owner=None,
)
self.assertEqual(
@ -172,6 +217,28 @@ class TestAirflow(TestCase):
],
)
def test_get_dag_owners(self):
    """fetch_dag_owners picks the owner appearing in most tasks, or None."""
    data = SERIALIZED_DAG["dag"]
    # The owner will be the one appearing as owner in most of the tasks
    self.assertEqual("another_owner", self.airflow.fetch_dag_owners(data))
    # if we monkey-patch the data dict with tasks with different owner counts...
    data = {
        "tasks": [
            {"owner": "my_owner"},
            {"owner": "my_owner"},
            {"owner": "another_owner"},
        ]
    }
    self.assertEqual("my_owner", self.airflow.fetch_dag_owners(data))
    # If there are no owners, return None
    data = {
        "tasks": [{"something": None}, {"another_thing": None}, {"random": None}]
    }
    self.assertIsNone(self.airflow.fetch_dag_owners(data))
def test_get_schedule_interval(self):
"""
Check the shape of different DAGs

View File

@ -139,6 +139,7 @@ public final class Entity {
public static final String DASHBOARD = "dashboard";
public static final String DASHBOARD_DATA_MODEL = "dashboardDataModel";
public static final String PIPELINE = "pipeline";
public static final String TASK = "task";
public static final String CHART = "chart";
public static final String APPLICATION = "app";
public static final String APP_MARKET_PLACE_DEF = "appMarketPlaceDefinition";

View File

@ -17,7 +17,9 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.schema.type.Include.NON_DELETED;
import static org.openmetadata.schema.type.Relationship.OWNS;
import static org.openmetadata.service.Entity.CONTAINER;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.Entity.FIELD_TAGS;
import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTags;
import static org.openmetadata.service.resources.tags.TagLabelUtil.checkMutuallyExclusive;
@ -25,12 +27,14 @@ import static org.openmetadata.service.util.EntityUtil.taskMatch;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.Triple;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.entity.data.PipelineStatus;
import org.openmetadata.schema.entity.services.PipelineService;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Status;
@ -133,6 +137,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
pipeline.setService(getContainer(pipeline.getId()));
pipeline.setSourceHash(fields.contains("sourceHash") ? pipeline.getSourceHash() : null);
getTaskTags(fields.contains(FIELD_TAGS), pipeline.getTasks());
getTaskOwners(fields.contains(FIELD_OWNER), pipeline.getTasks());
pipeline.withPipelineStatus(
fields.contains("pipelineStatus")
? getPipelineStatus(pipeline)
@ -146,6 +151,14 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
fields.contains("pipelineStatus") ? pipeline.getPipelineStatus() : null);
}
/** After a pipeline is deleted, drop the ownership field relationships of its tasks. */
@Override
protected void postDelete(Pipeline entity) {
  // Cleanup all the fieldRelationship for task ownership. User -[owns]-> Task
  for (Task task : listOrEmpty(entity.getTasks())) {
    deleteTaskOwnerRelationship(task);
  }
}
private PipelineStatus getPipelineStatus(Pipeline pipeline) {
return JsonUtils.readValue(
getLatestExtensionFromTimeSeries(
@ -232,6 +245,11 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
@Override
public void prepare(Pipeline pipeline, boolean update) {
  populateService(pipeline);
  // Tasks can have owners: validate each declared owner reference before storing
  for (Task task : listOrEmpty(pipeline.getTasks())) {
    EntityReference owner = validateOwner(task.getOwner());
    task.setOwner(owner);
  }
}
@Override
@ -241,15 +259,32 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
pipeline.withService(null);
// Don't store column tags as JSON but build it on the fly based on relationships
List<Task> taskWithTags = pipeline.getTasks();
pipeline.setTasks(cloneWithoutTags(taskWithTags));
List<Task> taskWithTagsAndOwners = pipeline.getTasks();
pipeline.setTasks(cloneWithoutTagsAndOwners(taskWithTagsAndOwners));
store(pipeline, update);
pipeline.withService(service).withTasks(taskWithTags);
pipeline.withService(service).withTasks(taskWithTagsAndOwners);
}
@Override
public void storeRelationships(Pipeline pipeline) {
  addServiceRelationship(pipeline, pipeline.getService());
  // Persist per-task ownership as field relationships: owner FQN -[owns]-> task FQN
  for (Task task : listOrEmpty(pipeline.getTasks())) {
    if (task.getOwner() != null) {
      daoCollection
          .fieldRelationshipDAO()
          .insert(
              FullyQualifiedName.buildHash(
                  task.getOwner().getFullyQualifiedName()), // from FQN hash
              FullyQualifiedName.buildHash(task.getFullyQualifiedName()), // to FQN hash
              task.getOwner().getFullyQualifiedName(), // from FQN
              task.getFullyQualifiedName(), // to FQN
              task.getOwner().getType(), // from type
              Entity.TASK, // to type
              OWNS.ordinal(),
              null); // no relationship json payload
    }
  }
}
@Override
@ -288,6 +323,41 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
}
}
/** Resolve each task's owner from field relationships when the owner field was requested. */
private void getTaskOwners(boolean setOwner, List<Task> tasks) {
  for (Task t : listOrEmpty(tasks)) {
    // Only look up the relationship when the task doesn't already carry an owner
    if (t.getOwner() == null) {
      t.setOwner(setOwner ? getTaskOwner(t.getFullyQualifiedName()) : t.getOwner());
    }
  }
}
/**
 * Fetch a task's owner by reading the User -[owns]-> Task field relationship.
 *
 * @param taskFullyQualifiedName FQN of the task whose owner is being resolved
 * @return an EntityReference to the owning user, or null if no user owns the task
 */
private EntityReference getTaskOwner(String taskFullyQualifiedName) {
  EntityReference ownerRef = null;
  List<Triple<String, String, String>> owners =
      daoCollection
          .fieldRelationshipDAO()
          .findFrom(
              FullyQualifiedName.buildHash(taskFullyQualifiedName), Entity.TASK, OWNS.ordinal());
  // Triple<fromFQN, fromType, json>
  for (Triple<String, String, String> owner : owners) {
    // Only USER ownership is materialized here; other relationship sources are ignored
    if (owner.getMiddle().equals(Entity.USER)) {
      User user = daoCollection.userDAO().findEntityByName(owner.getLeft(), Include.NON_DELETED);
      ownerRef =
          new EntityReference()
              .withId(user.getId())
              .withName(user.getName())
              .withFullyQualifiedName(user.getFullyQualifiedName())
              .withDescription(user.getDescription())
              .withDisplayName(user.getDisplayName())
              .withHref(user.getHref())
              .withDeleted(user.getDeleted());
    }
  }
  return ownerRef;
}
private void setTaskFQN(String parentFQN, List<Task> tasks) {
if (tasks != null) {
tasks.forEach(
@ -320,16 +390,16 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
pipeline.setServiceType(service.getServiceType());
}
private List<Task> cloneWithoutTags(List<Task> tasks) {
private List<Task> cloneWithoutTagsAndOwners(List<Task> tasks) {
if (nullOrEmpty(tasks)) {
return tasks;
}
List<Task> copy = new ArrayList<>();
tasks.forEach(t -> copy.add(cloneWithoutTags(t)));
tasks.forEach(t -> copy.add(cloneWithoutTagsAndOwners(t)));
return copy;
}
private Task cloneWithoutTags(Task task) {
private Task cloneWithoutTagsAndOwners(Task task) {
return new Task()
.withDescription(task.getDescription())
.withName(task.getName())
@ -343,6 +413,20 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
.withEndDate(task.getEndDate());
}
/** Remove the User -[owns]-> Task field relationship of a task being deleted. */
protected void deleteTaskOwnerRelationship(Task task) {
  // If the deleted task has owners, we need to remove the field relationship
  if (task.getOwner() != null) {
    daoCollection
        .fieldRelationshipDAO()
        .delete(
            FullyQualifiedName.buildHash(task.getOwner().getFullyQualifiedName()),
            FullyQualifiedName.buildHash(task.getFullyQualifiedName()),
            task.getOwner().getType(),
            Entity.TASK,
            OWNS.ordinal());
  }
}
/** Handles entity updated from PUT and POST operation. */
public class PipelineUpdater extends EntityUpdater {
public PipelineUpdater(Pipeline original, Pipeline updated, Operation operation) {
@ -400,7 +484,10 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
recordListChange(TASKS_FIELD, origTasks, updatedTasks, added, deleted, taskMatch);
applyTaskTags(added);
deleted.forEach(
d -> daoCollection.tagUsageDAO().deleteTagsByTarget(d.getFullyQualifiedName()));
d -> {
daoCollection.tagUsageDAO().deleteTagsByTarget(d.getFullyQualifiedName());
deleteTaskOwnerRelationship(d);
});
}
}

View File

@ -94,6 +94,15 @@
"href": {
"type": "text"
},
"email": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"domain" : {
"properties": {
"id": {

View File

@ -227,6 +227,27 @@ public class PipelineResourceTest extends EntityResourceTest<Pipeline, CreatePip
assertEquals("ta.sk", actualTask.getName());
}
@Test
void post_pipelineWithTaskWithOwner(TestInfo test) throws IOException {
  // Create a pipeline whose single task declares USER1 as its owner
  CreatePipeline create = createRequest(test);
  Task task =
      new Task()
          .withName("task")
          .withDescription("description")
          .withSourceUrl("http://localhost:0")
          .withOwner(USER1_REF);
  create.setTasks(List.of(task));
  Pipeline entity = createAndCheckEntity(create, ADMIN_AUTH_HEADERS);
  Task actualTask = entity.getTasks().get(0);
  // The created pipeline echoes the task owner back
  assertEquals(USER1_REF.getName(), actualTask.getOwner().getName());
  // We can GET the task retrieving the owner info
  Pipeline storedPipeline =
      getPipelineByName(entity.getFullyQualifiedName(), "owner,tasks", ADMIN_AUTH_HEADERS);
  Task storedTask = storedPipeline.getTasks().get(0);
  assertEquals(USER1_REF.getName(), storedTask.getOwner().getName());
}
@Test
void put_PipelineUrlUpdate_200(TestInfo test) throws IOException {
CreatePipeline request =

View File

@ -117,6 +117,10 @@
"$ref": "../../type/tagLabel.json"
},
"default": null
},
"owner": {
"description": "Owner of this task.",
"$ref": "../../type/entityReference.json"
}
},
"required": ["name"],