From b8e989af6ca6c7c19b381d98e7348e5ba89e73b6 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Mon, 10 Oct 2022 10:16:31 +0530 Subject: [PATCH] Pipeline Source Lint (#8002) --- .../metadata/ingestion/source/pipeline/airbyte.py | 9 ++++++--- .../metadata/ingestion/source/pipeline/airflow.py | 14 ++++++-------- .../metadata/ingestion/source/pipeline/dagster.py | 14 +++++--------- .../ingestion/source/pipeline/gluepipeline.py | 9 +++++++++ .../ingestion/source/pipeline/pipeline_service.py | 5 ++--- 5 files changed, 28 insertions(+), 23 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py index 1d9cd1e34bb..e6559ff3880 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py @@ -111,7 +111,10 @@ class AirbyteSource(PipelineServiceSource): :param pipeline_details: pipeline_details object from airbyte :return: Create Pipeline request with tasks """ - connection_url = f"/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/{pipeline_details.connection.get('connectionId')}" + connection_url = ( + f"/workspaces/{pipeline_details.workspace.get('workspaceId')}" + f"/connections/{pipeline_details.connection.get('connectionId')}" + ) yield CreatePipelineRequest( name=pipeline_details.connection.get("connectionId"), displayName=pipeline_details.connection.get("name"), @@ -134,8 +137,8 @@ class AirbyteSource(PipelineServiceSource): # Airbyte does not offer specific attempt link, just at pipeline level log_link = ( - f"{self.service_connection.hostPort}/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/" - f"{pipeline_details.connection.get('connectionId')}/status" + f"{self.service_connection.hostPort}/workspaces/{pipeline_details.workspace.get('workspaceId')}" + 
f"/connections/{pipeline_details.connection.get('connectionId')}/status" ) for job in self.client.list_jobs( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index d50772a49ad..182679bd18d 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -121,7 +121,9 @@ class AirflowSource(PipelineServiceSource): return self._session def get_pipeline_status(self, dag_id: str) -> List[DagRun]: - + """ + Return the DagRuns of given dag + """ dag_run_list = ( self.session.query( DagRun.dag_id, @@ -153,9 +155,7 @@ class AirflowSource(PipelineServiceSource): for elem in dag_run_dict ] - def get_task_instances( - self, dag_id: str, run_id: str, execution_date: datetime - ) -> List[OMTaskInstance]: + def get_task_instances(self, dag_id: str, run_id: str) -> List[OMTaskInstance]: """ We are building our own scoped TaskInstance class to only focus on core properties required @@ -211,9 +211,7 @@ class AirflowSource(PipelineServiceSource): dag_run.run_id ): # Airflow dags can have old task which are turned off/commented out in code tasks = self.get_task_instances( - dag_id=dag_run.dag_id, - run_id=dag_run.run_id, - execution_date=dag_run.execution_date, # Used for Airflow 2.1.4 query fallback + dag_id=dag_run.dag_id, run_id=dag_run.run_id ) task_statuses = [ @@ -257,7 +255,7 @@ class AirflowSource(PipelineServiceSource): """ json_data_column = ( - SerializedDagModel._data # For 2.3.0 onwards + SerializedDagModel._data # For 2.3.0 onwards # pylint: disable=protected-access if hasattr(SerializedDagModel, "_data") else SerializedDagModel.data # For 2.2.5 and 2.1.4 ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dagster.py b/ingestion/src/metadata/ingestion/source/pipeline/dagster.py index 91e8523f6c3..f297c6bce39 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dagster.py +++ 
b/ingestion/src/metadata/ingestion/source/pipeline/dagster.py @@ -12,13 +12,8 @@ Dagster source to extract metadata from OM UI """ import traceback -from collections.abc import Iterable from typing import Dict, Iterable, List, Optional -from dagster_graphql import DagsterGraphQLClient -from sqlalchemy import text -from sqlalchemy.orm import Session - from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import ( @@ -42,7 +37,6 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils.connections import get_connection, test_connection from metadata.utils.graphql_queries import DAGSTER_PIPELINE_DETAILS_GRAPHQL -from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -84,10 +78,12 @@ class DagsterSource(PipelineServiceSource): def get_run_list(self): try: - result = self.client.client._execute(DAGSTER_PIPELINE_DETAILS_GRAPHQL) + result = self.client.client._execute( + DAGSTER_PIPELINE_DETAILS_GRAPHQL + ) # pylint: disable=protected-access except ConnectionError as conerr: - logger.error("Cannot connect to dagster client", conerr) - logger.debug("Failed due to : ", traceback.format_exc()) + logger.error(f"Cannot connect to dagster client {conerr}") + logger.debug(f"Failed due to : {traceback.format_exc()}") return result["assetNodes"] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline.py index 124b5953c17..09d6ab51dc3 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline.py @@ -9,6 +9,10 @@ # See the License for the specific language 
governing permissions and # limitations under the License. +""" +Glue pipeline source to extract metadata +""" + import traceback from typing import Any, Iterable, List, Optional @@ -52,6 +56,11 @@ STATUS_MAP = { class GluepipelineSource(PipelineServiceSource): + """ + Implements the necessary methods to extract + Pipeline metadata from Glue Pipeline's metadata db + """ + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) self.task_id_mapping = {} diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 72097678368..f5e59ef5bef 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -40,7 +40,6 @@ from metadata.ingestion.models.topology import ( create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils import fqn from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_pipeline from metadata.utils.logger import ingestion_logger @@ -103,8 +102,8 @@ class PipelineSourceStatus(SourceStatus): Reports the source status after ingestion """ - pipelines_scanned: List[str] = list() - filtered: List[str] = list() + pipelines_scanned: List[str] = [] + filtered: List[str] = [] def pipeline_scanned(self, topic: str) -> None: self.pipelines_scanned.append(topic)