From b84ce33b80cb43e885540a928f439a75771acbf6 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 28 Dec 2023 19:25:00 +0100 Subject: [PATCH] #11799 - Fix Airfow ownership & add pipeline tasks (#14510) * Fix airflow owner and add tasks * Add pipeline tasks ownership * MINOR - Fix py CI * Add pipeline tasks ownership * Add pipeline tasks ownership * MINOR - Fix py CI * MINOR - Fix py CI * Add pipeline tasks ownership * patch team * patch team * Format --- .../ingestion/ometa/mixins/es_mixin.py | 17 ++ .../ingestion/ometa/mixins/user_mixin.py | 141 +++++++-- .../dashboard/domodashboard/metadata.py | 7 +- .../source/dashboard/looker/metadata.py | 4 +- .../source/dashboard/redash/metadata.py | 4 +- .../source/dashboard/superset/mixin.py | 5 +- .../source/dashboard/tableau/metadata.py | 4 +- .../source/database/domodatabase/metadata.py | 4 +- .../ingestion/source/database/sample_data.py | 6 +- .../source/pipeline/airflow/metadata.py | 65 ++-- .../source/pipeline/airflow/models.py | 9 +- ingestion/src/metadata/utils/constants.py | 5 + .../integration/ometa/test_ometa_user_api.py | 98 ++++-- .../unit/topology/dashboard/test_looker.py | 3 +- .../unit/topology/pipeline/test_airflow.py | 285 +++++++++++------- .../java/org/openmetadata/service/Entity.java | 1 + .../service/jdbi3/PipelineRepository.java | 101 ++++++- .../elasticsearch/en/team_index_mapping.json | 9 + .../pipelines/PipelineResourceTest.java | 21 ++ .../json/schema/entity/data/pipeline.json | 4 + 20 files changed, 575 insertions(+), 218 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 7034053820d..0e2540b1fd6 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -79,6 +79,23 @@ class ESMixin(Generic[T]): return None + def _get_entity_from_es( + self, entity: Type[T], query_string: str, fields: Optional[list] = None + ) -> Optional[T]: + """Fetch an entity instance from ES""" + + try: + entity_list = self._search_es_entity( + entity_type=entity, query_string=query_string, fields=fields + ) + for instance in entity_list or []: + return instance + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Could not get {entity.__name__} info from ES due to {err}") + + return None + def es_search_from_fqn( self, entity_type: Type[T], diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py index 4ec851a5ae9..7c450d11f82 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py @@ -13,12 +13,15 @@ Mixin class containing User specific methods To be used by OpenMetadata class """ -import traceback from functools import lru_cache -from typing import Optional +from typing import Optional, Type +from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.common import T from metadata.ingestion.ometa.client import REST +from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP from metadata.utils.elasticsearch import ES_INDEX_MAP from metadata.utils.logger import ometa_logger @@ -34,42 +37,134 @@ class OMetaUserMixin: client: REST - email_search = ( - "/search/query?q=email.keyword:{email}&from={from_}&size={size}&index=" - + ES_INDEX_MAP[User.__name__] - ) + @staticmethod + def email_search_query_es(entity: Type[T]) -> str: + return ( + "/search/query?q=email.keyword:{email}&from={from_}&size={size}&index=" + + ES_INDEX_MAP[entity.__name__] + ) - @lru_cache(maxsize=None) - def get_user_by_email( + @staticmethod + def name_search_query_es(entity: Type[T]) -> str: + """ + Allow for more flexible lookup following what the UI is doing when searching users. + + We don't want to stick to `q=name:{name}` since in case a user is named `random.user` + but looked as `Random User`, we want to find this match. + """ + return ( + "/search/query?q={name}&from={from_}&size={size}&index=" + + ES_INDEX_MAP[entity.__name__] + ) + + def _search_by_email( self, + entity: Type[T], email: Optional[str], from_count: int = 0, size: int = 1, fields: Optional[list] = None, - ) -> Optional[User]: + ) -> Optional[T]: """ - GET user entity by name + GET user or team entity by mail Args: email: user email to search from_count: records to expect size: number of records + fields: Optional field list to pass to ES request """ if email: - query_string = self.email_search.format( + query_string = self.email_search_query_es(entity=entity).format( email=email, from_=from_count, size=size ) - - try: - entity_list = self._search_es_entity( - entity_type=User, query_string=query_string, fields=fields - ) - for user in entity_list or []: - return user - except Exception as err: - logger.debug(traceback.format_exc()) - logger.warning( - f"Could not get user info from ES for user email {email} due to {err}" - ) + return self._get_entity_from_es( + entity=entity, query_string=query_string, fields=fields + ) + + return None + + def _search_by_name( + self, + entity: Type[T], + name: Optional[str], + from_count: int = 0, + size: int = 1, + fields: Optional[list] = None, + ) -> Optional[T]: + """ + GET entity by name + + Args: + name: user name to search + from_count: records to expect + size: number of records + fields: Optional field list to pass to ES request + """ + if name: + query_string = self.name_search_query_es(entity=entity).format( + name=name, from_=from_count, size=size + ) + return self._get_entity_from_es( + entity=entity, query_string=query_string, fields=fields + ) + + return None + + @lru_cache(maxsize=None) + def get_reference_by_email( + self, + email: Optional[str], + from_count: int = 0, + size: int = 1, + fields: Optional[list] = None, + ) -> Optional[EntityReference]: + """ + Get a User or Team Entity Reference by searching by its mail + """ + maybe_user = self._search_by_email( + entity=User, email=email, from_count=from_count, size=size, fields=fields + ) + if maybe_user: + return EntityReference( + id=maybe_user.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[User.__name__] + ) + + maybe_team = self._search_by_email( + entity=Team, email=email, from_count=from_count, size=size, fields=fields + ) + if maybe_team: + return EntityReference( + id=maybe_team.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[Team.__name__] + ) + + return None + + @lru_cache(maxsize=None) + def get_reference_by_name( + self, + name: Optional[str], + from_count: int = 0, + size: int = 1, + fields: Optional[list] = None, + ) -> Optional[EntityReference]: + """ + Get a User or Team Entity Reference by searching by its name + """ + maybe_user = self._search_by_name( + entity=User, name=name, from_count=from_count, size=size, fields=fields + ) + if maybe_user: + return EntityReference( + id=maybe_user.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[User.__name__] + ) + + maybe_team = self._search_by_name( + entity=Team, name=name, from_count=from_count, size=size, fields=fields + ) + if maybe_team: + return EntityReference( + id=maybe_team.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[Team.__name__] + ) return None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py index 6f5cebb44da..85d4554004f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py @@ -100,12 +100,7 @@ class DomodashboardSource(DashboardServiceSource): try: owner_details = self.client.domo.users_get(owner.id) if owner_details.get("email"): - user = self.metadata.get_user_by_email(owner_details["email"]) - if user: - return EntityReference(id=user.id.__root__, type="user") - logger.warning( - f"No user found with email [{owner_details['email']}] in OMD" - ) + return self.metadata.get_reference_by_email(owner_details["email"]) except Exception as exc: logger.warning( f"Error while getting details of user {owner.displayName} - {exc}" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index 287568c2746..be5fc575f04 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -634,9 +634,7 @@ class LookerSource(DashboardServiceSource): try: if dashboard_details.user_id is not None: dashboard_owner = self.client.user(dashboard_details.user_id) - user = self.metadata.get_user_by_email(dashboard_owner.email) - if user: - return EntityReference(id=user.id.__root__, type="user") + return self.metadata.get_reference_by_email(dashboard_owner.email) except Exception as err: logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py index 583d3e78f9b..45c4f07e6c6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py @@ -109,11 +109,9 @@ class RedashSource(DashboardServiceSource): def get_owner_details(self, dashboard_details) -> Optional[EntityReference]: """Get owner from mail""" if dashboard_details.get("user") and dashboard_details["user"].get("email"): - user = self.metadata.get_user_by_email( + return self.metadata.get_reference_by_email( dashboard_details["user"].get("email") ) - if user: - return EntityReference(id=user.id.__root__, type="user") return None def get_dashboard_url(self, dashboard_details: dict) -> str: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py index 7bbd54124b5..a39d1e8ef42 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py @@ -97,10 +97,7 @@ class SupersetSourceMixin(DashboardServiceSource): def _get_user_by_email(self, email: Optional[str]) -> Optional[EntityReference]: if email: - user = self.metadata.get_user_by_email(email) - if user: - return EntityReference(id=user.id.__root__, type="user") - + return self.metadata.get_reference_by_email(email) return None def get_owner_details( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index 1e3379552cd..ff247e7861a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -158,9 +158,7 @@ class TableauSource(DashboardServiceSource): ) -> Optional[EntityReference]: """Get dashboard owner from email""" if dashboard_details.owner and dashboard_details.owner.email: - user = self.metadata.get_user_by_email(dashboard_details.owner.email) - if user: - return EntityReference(id=user.id.__root__, type="user") + return self.metadata.get_reference_by_email(dashboard_details.owner.email) return None def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]: diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py index 59a0db8a427..6b288524428 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py @@ -166,9 +166,7 @@ class DomodatabaseSource(DatabaseServiceSource): try: owner_details = User(**self.domo_client.users_get(owner.id)) if owner_details.email: - user = self.metadata.get_user_by_email(owner_details.email) - if user: - return EntityReference(id=user.id.__root__, type="user") + return self.metadata.get_reference_by_email(owner_details.email) except Exception as exc: logger.warning(f"Error while getting details of user {owner.name} - {exc}") return None diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index 7824a95e904..1a1fe42d101 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -1054,9 +1054,9 @@ class SampleDataSource( for pipeline in self.pipelines["pipelines"]: owner = None if pipeline.get("owner"): - user = self.metadata.get_user_by_email(email=pipeline.get("owner")) - if user: - owner = EntityReference(id=user.id.__root__, type="user") + owner = self.metadata.get_reference_by_email( + email=pipeline.get("owner") + ) pipeline_ev = CreatePipelineRequest( name=pipeline["name"], displayName=pipeline["displayName"], diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 89291f6e4cc..ee31de6609c 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -12,6 +12,7 @@ Airflow source to extract metadata from OM UI """ import traceback +from collections import Counter from datetime import datetime from typing import Iterable, List, Optional, cast @@ -98,7 +99,7 @@ class AirflowSource(PipelineServiceSource): self._session = None @classmethod - def create(cls, config_dict, metadata: OpenMetadata): + def create(cls, config_dict, metadata: OpenMetadata) -> "AirflowSource": config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: AirflowConnection = config.serviceConnection.__root__.config if not isinstance(connection, AirflowConnection): @@ -283,7 +284,7 @@ class AirflowSource(PipelineServiceSource): start_date=data.get("start_date", None), tasks=data.get("tasks", []), schedule_interval=get_schedule_interval(data), - owners=self.fetch_owners(data), + owner=self.fetch_dag_owners(data), ) yield dag @@ -296,12 +297,29 @@ class AirflowSource(PipelineServiceSource): logger.debug(traceback.format_exc()) logger.warning(f"Wild error yielding dag {serialized_dag} - {err}") - def fetch_owners(self, data) -> Optional[str]: + def fetch_dag_owners(self, data) -> Optional[str]: + """ + In Airflow, ownership is defined as: + - `default_args`: Applied to all tasks and available on the DAG payload + - `owners`: Applied at the tasks. In Airflow's source code, DAG ownership is then a + list joined with the owners of all the tasks. + + We will pick the owner from the tasks that appears in most tasks. + """ try: - if self.source_config.includeOwners and data.get("default_args"): - return data.get("default_args", [])["__var"].get("email", []) - except TypeError: - pass + if self.source_config.includeOwners: + task_owners = [ + task.get("owner") + for task in data.get("tasks", []) + if task.get("owner") is not None + ] + if task_owners: + most_common_owner, _ = Counter(task_owners).most_common(1)[0] + return most_common_owner + except Exception as exc: + self.status.warning( + data.get("dag_id"), f"Could not extract owner information due to {exc}" + ) return None def get_pipeline_name(self, pipeline_details: SerializedDAG) -> str: @@ -310,8 +328,7 @@ class AirflowSource(PipelineServiceSource): """ return pipeline_details.dag_id - @staticmethod - def get_tasks_from_dag(dag: AirflowDagDetails, host_port: str) -> List[Task]: + def get_tasks_from_dag(self, dag: AirflowDagDetails, host_port: str) -> List[Task]: """ Obtain the tasks from a SerializedDAG :param dag: AirflowDagDetails @@ -332,28 +349,26 @@ class AirflowSource(PipelineServiceSource): startDate=task.start_date.isoformat() if task.start_date else None, endDate=task.end_date.isoformat() if task.end_date else None, taskType=task.task_type, + owner=self.get_owner(task.owner), ) for task in cast(Iterable[BaseOperator], dag.tasks) ] - def get_user_details(self, email) -> Optional[EntityReference]: - user = self.metadata.get_user_by_email(email=email) - if user: - return EntityReference(id=user.id.__root__, type="user") - return None + def get_owner(self, owner) -> Optional[EntityReference]: + """ + Fetching users by name via ES to keep things as fast as possible. - def get_owner(self, owners) -> Optional[EntityReference]: + We use the `owner` field since it's the onw used by Airflow to showcase + the info in its UI. In other connectors we might use the mail (e.g., in Looker), + but we use name here to be consistent with Airflow itself. + + If data is not indexed, we can live without this information + until the next run. + """ try: - if isinstance(owners, str) and owners: - return self.get_user_details(email=owners) - - if isinstance(owners, List) and owners: - for owner in owners or []: - return self.get_user_details(email=owner) - - logger.debug(f"No user found with email [{owners}] in OMD") + return self.metadata.get_reference_by_name(name=owner) except Exception as exc: - logger.warning(f"Error while getting details of user {owners} - {exc}") + logger.warning(f"Error while getting details of user {owner} - {exc}") return None def yield_pipeline( @@ -380,7 +395,7 @@ class AirflowSource(PipelineServiceSource): pipeline_details, self.service_connection.hostPort ), service=self.context.pipeline_service, - owner=self.get_owner(pipeline_details.owners), + owner=self.get_owner(pipeline_details.owner), scheduleInterval=pipeline_details.schedule_interval, ) yield Either(right=pipeline_request) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py index a9dae7fbd6e..38f9e908ec3 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py @@ -31,7 +31,7 @@ class AirflowBaseModel(BaseModel): dag_id: str -class Task(BaseModel): +class AirflowTask(BaseModel): pool: Optional[str] doc_md: Optional[str] inlets: Optional[List[Any]] = Field(alias="_inlets") @@ -41,6 +41,7 @@ class Task(BaseModel): downstream_task_ids: Optional[List[str]] start_date: Optional[datetime] end_date: Optional[datetime] + owner: Optional[str] # Allow picking up data from key `inlets` and `_inlets` class Config: @@ -48,7 +49,7 @@ class Task(BaseModel): class TaskList(BaseModel): - __root__: List[Task] + __root__: List[AirflowTask] class Dag(BaseModel): @@ -68,6 +69,6 @@ class AirflowDagDetails(AirflowBaseModel): max_active_runs: Optional[int] description: Optional[str] start_date: Optional[datetime] - tasks: List[Task] - owners: Optional[Any] + tasks: List[AirflowTask] + owner: Optional[str] schedule_interval: Optional[str] diff --git a/ingestion/src/metadata/utils/constants.py b/ingestion/src/metadata/utils/constants.py index 12011e34b1e..f45e06ebec3 100644 --- a/ingestion/src/metadata/utils/constants.py +++ b/ingestion/src/metadata/utils/constants.py @@ -32,6 +32,8 @@ from metadata.generated.schema.entity.services.mlmodelService import MlModelServ from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.services.searchService import SearchService from metadata.generated.schema.entity.services.storageService import StorageService +from metadata.generated.schema.entity.teams.team import Team +from metadata.generated.schema.entity.teams.user import User DOT = "_DOT_" TEN_MIN = 10 * 60 @@ -92,4 +94,7 @@ ENTITY_REFERENCE_TYPE_MAP = { SearchIndex.__name__: "searchIndex", MlModel.__name__: "mlmodel", Container.__name__: "container", + # User Entities + User.__name__: "user", + Team.__name__: "team", } diff --git a/ingestion/tests/integration/ometa/test_ometa_user_api.py b/ingestion/tests/integration/ometa/test_ometa_user_api.py index cf7f77b8aec..d492609fa98 100644 --- a/ingestion/tests/integration/ometa/test_ometa_user_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_user_api.py @@ -15,15 +15,12 @@ import logging import time from unittest import TestCase +from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) +from metadata.generated.schema.entity.teams.team import Team, TeamType from metadata.generated.schema.entity.teams.user import User -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) -from metadata.ingestion.ometa.ometa_api import OpenMetadata + +from ..integration_base import int_admin_ometa class OMetaUserTest(TestCase): @@ -32,16 +29,7 @@ class OMetaUserTest(TestCase): Install the ingestion package before running the tests """ - server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig( - jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - ), - ) - metadata = OpenMetadata(server_config) - - assert metadata.health_check() + metadata = int_admin_ometa() @classmethod def check_es_index(cls) -> None: @@ -67,9 +55,15 @@ class OMetaUserTest(TestCase): Prepare ingredients """ + cls.team: Team = cls.metadata.create_or_update( + data=CreateTeamRequest( + teamType=TeamType.Group, name="ops.team", email="ops.team@getcollate.io" + ) + ) + cls.user_1: User = cls.metadata.create_or_update( data=CreateUserRequest( - name="random.user", email="random.user@getcollate.io" + name="random.user.es", email="random.user.es@getcollate.io" ), ) @@ -102,34 +96,92 @@ class OMetaUserTest(TestCase): hard_delete=True, ) + cls.metadata.delete( + entity=User, + entity_id=cls.user_3.id, + hard_delete=True, + ) + + cls.metadata.delete( + entity=Team, + entity_id=cls.team.id, + hard_delete=True, + ) + def test_es_search_from_email(self): """ We can fetch users by its email """ # No email returns None - self.assertIsNone(self.metadata.get_user_by_email(email=None)) + self.assertIsNone(self.metadata.get_reference_by_email(email=None)) # Non existing email returns None self.assertIsNone( - self.metadata.get_user_by_email(email="idonotexist@random.com") + self.metadata.get_reference_by_email(email="idonotexist@random.com") ) # Non existing email returns, even if they have the same domain # To get this fixed, we had to update the `email` field in the # index as a `keyword` and search by `email.keyword` in ES. self.assertIsNone( - self.metadata.get_user_by_email(email="idonotexist@getcollate.io") + self.metadata.get_reference_by_email(email="idonotexist@getcollate.io") ) # I can get User 1, who has the name equal to its email self.assertEqual( self.user_1.id, - self.metadata.get_user_by_email(email="random.user@getcollate.io").id, + self.metadata.get_reference_by_email( + email="random.user.es@getcollate.io" + ).id, ) # I can get User 2, who has an email not matching the name self.assertEqual( self.user_2.id, - self.metadata.get_user_by_email(email="user2.1234@getcollate.io").id, + self.metadata.get_reference_by_email(email="user2.1234@getcollate.io").id, + ) + + # I can get the team by its mail + self.assertEqual( + self.team.id, + self.metadata.get_reference_by_email(email="ops.team@getcollate.io").id, + ) + + def test_es_search_from_name(self): + """ + We can fetch users by its name + """ + # No email returns None + self.assertIsNone(self.metadata.get_reference_by_name(name=None)) + + # Non existing email returns None + self.assertIsNone(self.metadata.get_reference_by_name(name="idonotexist")) + + # We can get the user matching its name + self.assertEqual( + self.user_1.id, + self.metadata.get_reference_by_name(name="random.user.es").id, + ) + + # Casing does not matter + self.assertEqual( + self.user_2.id, + self.metadata.get_reference_by_name(name="levy").id, + ) + + self.assertEqual( + self.user_2.id, + self.metadata.get_reference_by_name(name="Levy").id, + ) + + self.assertEqual( + self.user_1.id, + self.metadata.get_reference_by_name(name="Random User Es").id, + ) + + # I can get the team by its name + self.assertEqual( + self.team.id, + self.metadata.get_reference_by_name(name="OPS Team").id, ) diff --git a/ingestion/tests/unit/topology/dashboard/test_looker.py b/ingestion/tests/unit/topology/dashboard/test_looker.py index c231010543a..70dab3451a4 100644 --- a/ingestion/tests/unit/topology/dashboard/test_looker.py +++ b/ingestion/tests/unit/topology/dashboard/test_looker.py @@ -253,9 +253,8 @@ class LookerUnitTest(TestCase): ref = EntityReference(id=uuid.uuid4(), type="user") with patch.object(Looker40SDK, "user", return_value=MOCK_USER), patch.object( - # This does not really return a ref, but for simplicity OpenMetadata, - "get_user_by_email", + "get_reference_by_email", return_value=ref, ): self.assertEqual(self.looker.get_owner_details(MOCK_LOOKER_DASHBOARD), ref) diff --git a/ingestion/tests/unit/topology/pipeline/test_airflow.py b/ingestion/tests/unit/topology/pipeline/test_airflow.py index 47282ea896c..5a9a11dcb04 100644 --- a/ingestion/tests/unit/topology/pipeline/test_airflow.py +++ b/ingestion/tests/unit/topology/pipeline/test_airflow.py @@ -12,143 +12,188 @@ Test Airflow processing """ from unittest import TestCase +from unittest.mock import patch +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.pipeline.airflow.metadata import AirflowSource from metadata.ingestion.source.pipeline.airflow.models import ( AirflowDag, AirflowDagDetails, ) from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval +MOCK_CONFIG = { + "source": { + "type": "airflow", + "serviceName": "test_airflow", + "serviceConnection": { + "config": { + "type": "Airflow", + "hostPort": "https://localhost:8080", + "connection": {"type": "Backend"}, + } + }, + "sourceConfig": { + "config": { + "type": "PipelineMetadata", + "includeOwners": True, + } + }, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": {"jwtToken": "token"}, + }, + }, +} + + +SERIALIZED_DAG = { + "__version": 1, + "dag": { + "_dag_id": "test-lineage-253", + "fileloc": "/opt/airflow/dags/lineage-test.py", + "default_args": { + "__var": { + "owner": "my_owner", + "depends_on_past": False, + "email": ["airflow@example.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": {"__var": 1, "__type": "timedelta"}, + }, + "__type": "dict", + }, + "timezone": "UTC", + "catchup": False, + "edge_info": {}, + "dataset_triggers": [], + "_description": "An example DAG which simulate dbt run of fct_application_summary for airflow lineage backend", + "_task_group": { + "_group_id": None, + "prefix_group_id": True, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "children": { + "task0": ["operator", "task0"], + "task1": ["operator", "task1"], + }, + "upstream_group_ids": [], + "downstream_group_ids": [], + "upstream_task_ids": [], + "downstream_task_ids": [], + }, + "is_paused_upon_creation": False, + "start_date": 1688860800, + "schedule_interval": None, + "_processor_dags_folder": "/opt/airflow/dags", + "tasks": [ + { + "owner": "another_owner", + "retry_delay": 1, + "retries": 1, + "ui_color": "#e8f7e4", + "email": ["airflow@example.com"], + "task_id": "task0", + "email_on_failure": False, + "email_on_retry": False, + "pool": "default_pool", + "downstream_task_ids": ["task1"], + "template_ext": [], + "template_fields_renderers": {}, + "inlets": [ + { + "__var": { + "tables": ["sample_data.ecommerce_db.shopify.dim_location"] + }, + "__type": "dict", + } + ], + "template_fields": [], + "ui_fgcolor": "#000", + "_task_type": "EmptyOperator", + "_task_module": "airflow.operators.empty", + "_is_empty": True, + }, + { + "outlets": [ + { + "__var": { + "tables": ["sample_data.ecommerce_db.shopify.dim_staff"] + }, + "__type": "dict", + } + ], + "owner": "another_owner", + "retry_delay": 1, + "retries": 1, + "ui_color": "#e8f7e4", + "email": ["airflow@example.com"], + "task_id": "task1", + "email_on_failure": False, + "email_on_retry": False, + "pool": "default_pool", + "downstream_task_ids": [], + "template_ext": [], + "template_fields_renderers": {}, + "template_fields": [], + "ui_fgcolor": "#000", + "_task_type": "EmptyOperator", + "_task_module": "airflow.operators.empty", + "_is_empty": True, + }, + ], + "dag_dependencies": [], + "params": {}, + }, +} + class TestAirflow(TestCase): """ Test Airflow model processing """ + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection" + ) + def __init__(self, methodName, test_connection) -> None: + super().__init__(methodName) + test_connection.return_value = False + self.config = OpenMetadataWorkflowConfig.parse_obj(MOCK_CONFIG) + + # This already validates that the source can be initialized + self.airflow: AirflowSource = AirflowSource.create( + MOCK_CONFIG["source"], + OpenMetadata(self.config.workflowConfig.openMetadataServerConfig), + ) + def test_parsing(self): """ We can properly pick up Airflow's payload and convert it to our models """ - serialized_dag = { - "__version": 1, - "dag": { - "_dag_id": "test-lineage-253", - "fileloc": "/opt/airflow/dags/lineage-test.py", - "default_args": { - "__var": { - "owner": "airflow", - "depends_on_past": False, - "email": ["airflow@example.com"], - "email_on_failure": False, - "email_on_retry": False, - "retries": 1, - "retry_delay": {"__var": 1, "__type": "timedelta"}, - }, - "__type": "dict", - }, - "timezone": "UTC", - "catchup": False, - "edge_info": {}, - "dataset_triggers": [], - "_description": "An example DAG which simulate dbt run of fct_application_summary for airflow lineage backend", - "_task_group": { - "_group_id": None, - "prefix_group_id": True, - "tooltip": "", - "ui_color": "CornflowerBlue", - "ui_fgcolor": "#000", - "children": { - "task0": ["operator", "task0"], - "task1": ["operator", "task1"], - }, - "upstream_group_ids": [], - "downstream_group_ids": [], - "upstream_task_ids": [], - "downstream_task_ids": [], - }, - "is_paused_upon_creation": False, - "start_date": 1688860800, - "schedule_interval": None, - "_processor_dags_folder": "/opt/airflow/dags", - "tasks": [ - { - "owner": "airflow", - "retry_delay": 1, - "retries": 1, - "ui_color": "#e8f7e4", - "email": ["airflow@example.com"], - "task_id": "task0", - "email_on_failure": False, - "email_on_retry": False, - "pool": "default_pool", - "downstream_task_ids": ["task1"], - "template_ext": [], - "template_fields_renderers": {}, - "inlets": [ - { - "__var": { - "tables": [ - "sample_data.ecommerce_db.shopify.dim_location" - ] - }, - "__type": "dict", - } - ], - "template_fields": [], - "ui_fgcolor": "#000", - "_task_type": "EmptyOperator", - "_task_module": "airflow.operators.empty", - "_is_empty": True, - }, - { - "outlets": [ - { - "__var": { - "tables": [ - "sample_data.ecommerce_db.shopify.dim_staff" - ] - }, - "__type": "dict", - } - ], - "owner": "airflow", - "retry_delay": 1, - "retries": 1, - "ui_color": "#e8f7e4", - "email": ["airflow@example.com"], - "task_id": "task1", - "email_on_failure": False, - "email_on_retry": False, - "pool": "default_pool", - "downstream_task_ids": [], - "template_ext": [], - "template_fields_renderers": {}, - "template_fields": [], - "ui_fgcolor": "#000", - "_task_type": "EmptyOperator", - "_task_module": "airflow.operators.empty", - "_is_empty": True, - }, - ], - "dag_dependencies": [], - "params": {}, - }, - } - - data = serialized_dag["dag"] + data = SERIALIZED_DAG["dag"] dag = AirflowDagDetails( dag_id="id", fileloc="loc", - data=AirflowDag.parse_obj(serialized_dag), + data=AirflowDag.parse_obj(SERIALIZED_DAG), max_active_runs=data.get("max_active_runs", None), description=data.get("_description", None), start_date=data.get("start_date", None), tasks=data.get("tasks", []), schedule_interval=None, - owners=None, + owner=None, ) self.assertEqual( @@ -172,6 +217,28 @@ class TestAirflow(TestCase): ], ) + def test_get_dag_owners(self): + data = SERIALIZED_DAG["dag"] + + # The owner will be the one appearing as owner in most of the tasks + self.assertEqual("another_owner", self.airflow.fetch_dag_owners(data)) + + # if we monkey-patch the data dict with tasks with different owner counts... + data = { + "tasks": [ + {"owner": "my_owner"}, + {"owner": "my_owner"}, + {"owner": "another_owner"}, + ] + } + self.assertEqual("my_owner", self.airflow.fetch_dag_owners(data)) + + # If there are no owners, return None + data = { + "tasks": [{"something": None}, {"another_thing": None}, {"random": None}] + } + self.assertIsNone(self.airflow.fetch_dag_owners(data)) + def test_get_schedule_interval(self): """ Check the shape of different DAGs diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index b3b1d47c8d4..0d2f9481da2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -139,6 +139,7 @@ public final class Entity { public static final String DASHBOARD = "dashboard"; public static final String DASHBOARD_DATA_MODEL = "dashboardDataModel"; public static final String PIPELINE = "pipeline"; + public static final String TASK = "task"; public static final String CHART = "chart"; public static final String APPLICATION = "app"; public static final String APP_MARKET_PLACE_DEF = "appMarketPlaceDefinition"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java index 5b3bc758140..b65981af067 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java @@ -17,7 +17,9 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.schema.type.Include.ALL; import static org.openmetadata.schema.type.Include.NON_DELETED; +import static org.openmetadata.schema.type.Relationship.OWNS; import static org.openmetadata.service.Entity.CONTAINER; +import static org.openmetadata.service.Entity.FIELD_OWNER; import static org.openmetadata.service.Entity.FIELD_TAGS; import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTags; import static org.openmetadata.service.resources.tags.TagLabelUtil.checkMutuallyExclusive; @@ -25,12 +27,14 @@ import static org.openmetadata.service.util.EntityUtil.taskMatch; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.tuple.Triple; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.Pipeline; import org.openmetadata.schema.entity.data.PipelineStatus; import org.openmetadata.schema.entity.services.PipelineService; +import org.openmetadata.schema.entity.teams.User; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Status; @@ -133,6 +137,7 @@ public class PipelineRepository extends EntityRepository { pipeline.setService(getContainer(pipeline.getId())); pipeline.setSourceHash(fields.contains("sourceHash") ? pipeline.getSourceHash() : null); getTaskTags(fields.contains(FIELD_TAGS), pipeline.getTasks()); + getTaskOwners(fields.contains(FIELD_OWNER), pipeline.getTasks()); pipeline.withPipelineStatus( fields.contains("pipelineStatus") ? getPipelineStatus(pipeline) @@ -146,6 +151,14 @@ public class PipelineRepository extends EntityRepository { fields.contains("pipelineStatus") ? pipeline.getPipelineStatus() : null); } + @Override + protected void postDelete(Pipeline entity) { + // Cleanup all the fieldRelationship for task ownership. User -[owns]-> Task + for (Task task : listOrEmpty(entity.getTasks())) { + deleteTaskOwnerRelationship(task); + } + } + private PipelineStatus getPipelineStatus(Pipeline pipeline) { return JsonUtils.readValue( getLatestExtensionFromTimeSeries( @@ -232,6 +245,11 @@ public class PipelineRepository extends EntityRepository { @Override public void prepare(Pipeline pipeline, boolean update) { populateService(pipeline); + // Tasks can have owners + for (Task task : listOrEmpty(pipeline.getTasks())) { + EntityReference owner = validateOwner(task.getOwner()); + task.setOwner(owner); + } } @Override @@ -241,15 +259,32 @@ public class PipelineRepository extends EntityRepository { pipeline.withService(null); // Don't store column tags as JSON but build it on the fly based on relationships - List taskWithTags = pipeline.getTasks(); - pipeline.setTasks(cloneWithoutTags(taskWithTags)); + List taskWithTagsAndOwners = pipeline.getTasks(); + pipeline.setTasks(cloneWithoutTagsAndOwners(taskWithTagsAndOwners)); store(pipeline, update); - pipeline.withService(service).withTasks(taskWithTags); + pipeline.withService(service).withTasks(taskWithTagsAndOwners); } @Override public void storeRelationships(Pipeline pipeline) { addServiceRelationship(pipeline, pipeline.getService()); + + for (Task task : listOrEmpty(pipeline.getTasks())) { + if (task.getOwner() != null) { + daoCollection + .fieldRelationshipDAO() + .insert( + FullyQualifiedName.buildHash( + task.getOwner().getFullyQualifiedName()), // from FQN hash + FullyQualifiedName.buildHash(task.getFullyQualifiedName()), // to FQN hash + task.getOwner().getFullyQualifiedName(), // from FQN + task.getFullyQualifiedName(), // to FQN + task.getOwner().getType(), // from type + Entity.TASK, // to type + OWNS.ordinal(), + null); + } + } } @Override @@ -288,6 +323,41 @@ public class PipelineRepository extends EntityRepository { } } + private void getTaskOwners(boolean setOwner, List tasks) { + for (Task t : listOrEmpty(tasks)) { + if (t.getOwner() == null) { + t.setOwner(setOwner ? getTaskOwner(t.getFullyQualifiedName()) : t.getOwner()); + } + } + } + + private EntityReference getTaskOwner(String taskFullyQualifiedName) { + EntityReference ownerRef = null; + + List> owners = + daoCollection + .fieldRelationshipDAO() + .findFrom( + FullyQualifiedName.buildHash(taskFullyQualifiedName), Entity.TASK, OWNS.ordinal()); + + // Triple + for (Triple owner : owners) { + if (owner.getMiddle().equals(Entity.USER)) { + User user = daoCollection.userDAO().findEntityByName(owner.getLeft(), Include.NON_DELETED); + ownerRef = + new EntityReference() + .withId(user.getId()) + .withName(user.getName()) + .withFullyQualifiedName(user.getFullyQualifiedName()) + .withDescription(user.getDescription()) + .withDisplayName(user.getDisplayName()) + .withHref(user.getHref()) + .withDeleted(user.getDeleted()); + } + } + return ownerRef; + } + private void setTaskFQN(String parentFQN, List tasks) { if (tasks != null) { tasks.forEach( @@ -320,16 +390,16 @@ public class PipelineRepository extends EntityRepository { pipeline.setServiceType(service.getServiceType()); } - private List cloneWithoutTags(List tasks) { + private List cloneWithoutTagsAndOwners(List tasks) { if (nullOrEmpty(tasks)) { return tasks; } List copy = new ArrayList<>(); - tasks.forEach(t -> copy.add(cloneWithoutTags(t))); + tasks.forEach(t -> copy.add(cloneWithoutTagsAndOwners(t))); return copy; } - private Task cloneWithoutTags(Task task) { + private Task cloneWithoutTagsAndOwners(Task task) { return new Task() .withDescription(task.getDescription()) .withName(task.getName()) @@ -343,6 +413,20 @@ public class PipelineRepository extends EntityRepository { .withEndDate(task.getEndDate()); } + protected void deleteTaskOwnerRelationship(Task task) { + // If the deleted task has owners, we need to remove the field relationship + if (task.getOwner() != null) { + daoCollection + .fieldRelationshipDAO() + .delete( + FullyQualifiedName.buildHash(task.getOwner().getFullyQualifiedName()), + FullyQualifiedName.buildHash(task.getFullyQualifiedName()), + task.getOwner().getType(), + Entity.TASK, + OWNS.ordinal()); + } + } + /** Handles entity updated from PUT and POST operation. */ public class PipelineUpdater extends EntityUpdater { public PipelineUpdater(Pipeline original, Pipeline updated, Operation operation) { @@ -400,7 +484,10 @@ public class PipelineRepository extends EntityRepository { recordListChange(TASKS_FIELD, origTasks, updatedTasks, added, deleted, taskMatch); applyTaskTags(added); deleted.forEach( - d -> daoCollection.tagUsageDAO().deleteTagsByTarget(d.getFullyQualifiedName())); + d -> { + daoCollection.tagUsageDAO().deleteTagsByTarget(d.getFullyQualifiedName()); + deleteTaskOwnerRelationship(d); + }); } } diff --git a/openmetadata-service/src/main/resources/elasticsearch/en/team_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/en/team_index_mapping.json index 08fce212025..a10894757a0 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/en/team_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/en/team_index_mapping.json @@ -94,6 +94,15 @@ "href": { "type": "text" }, + "email": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, "domain" : { "properties": { "id": { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java index 6b09d7a9d4d..483c7424f5d 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/pipelines/PipelineResourceTest.java @@ -227,6 +227,27 @@ public class PipelineResourceTest extends EntityResourceTest