Pipeline Source Lint (#8002)

This commit is contained in:
Mayur Singal 2022-10-10 10:16:31 +05:30 committed by GitHub
parent 4e51c70dcc
commit b8e989af6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 28 additions and 23 deletions

View File

@ -111,7 +111,10 @@ class AirbyteSource(PipelineServiceSource):
:param pipeline_details: pipeline_details object from airbyte :param pipeline_details: pipeline_details object from airbyte
:return: Create Pipeline request with tasks :return: Create Pipeline request with tasks
""" """
connection_url = f"/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/{pipeline_details.connection.get('connectionId')}" connection_url = (
f"/workspaces/{pipeline_details.workspace.get('workspaceId')}"
f"/connections/{pipeline_details.connection.get('connectionId')}"
)
yield CreatePipelineRequest( yield CreatePipelineRequest(
name=pipeline_details.connection.get("connectionId"), name=pipeline_details.connection.get("connectionId"),
displayName=pipeline_details.connection.get("name"), displayName=pipeline_details.connection.get("name"),
@ -134,8 +137,8 @@ class AirbyteSource(PipelineServiceSource):
# Airbyte does not offer specific attempt link, just at pipeline level # Airbyte does not offer specific attempt link, just at pipeline level
log_link = ( log_link = (
f"{self.service_connection.hostPort}/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/" f"{self.service_connection.hostPort}/workspaces/{pipeline_details.workspace.get('workspaceId')}"
f"{pipeline_details.connection.get('connectionId')}/status" f"/connections/{pipeline_details.connection.get('connectionId')}/status"
) )
for job in self.client.list_jobs( for job in self.client.list_jobs(

View File

@ -121,7 +121,9 @@ class AirflowSource(PipelineServiceSource):
return self._session return self._session
def get_pipeline_status(self, dag_id: str) -> List[DagRun]: def get_pipeline_status(self, dag_id: str) -> List[DagRun]:
"""
Return the DagRuns of given dag
"""
dag_run_list = ( dag_run_list = (
self.session.query( self.session.query(
DagRun.dag_id, DagRun.dag_id,
@ -153,9 +155,7 @@ class AirflowSource(PipelineServiceSource):
for elem in dag_run_dict for elem in dag_run_dict
] ]
def get_task_instances( def get_task_instances(self, dag_id: str, run_id: str) -> List[OMTaskInstance]:
self, dag_id: str, run_id: str, execution_date: datetime
) -> List[OMTaskInstance]:
""" """
We are building our own scoped TaskInstance We are building our own scoped TaskInstance
class to only focus on core properties required class to only focus on core properties required
@ -211,9 +211,7 @@ class AirflowSource(PipelineServiceSource):
dag_run.run_id dag_run.run_id
): # Airflow dags can have old task which are turned off/commented out in code ): # Airflow dags can have old task which are turned off/commented out in code
tasks = self.get_task_instances( tasks = self.get_task_instances(
dag_id=dag_run.dag_id, dag_id=dag_run.dag_id, run_id=dag_run.run_id
run_id=dag_run.run_id,
execution_date=dag_run.execution_date, # Used for Airflow 2.1.4 query fallback
) )
task_statuses = [ task_statuses = [
@ -257,7 +255,7 @@ class AirflowSource(PipelineServiceSource):
""" """
json_data_column = ( json_data_column = (
SerializedDagModel._data # For 2.3.0 onwards SerializedDagModel._data # For 2.3.0 onwards # pylint: disable=protected-access
if hasattr(SerializedDagModel, "_data") if hasattr(SerializedDagModel, "_data")
else SerializedDagModel.data # For 2.2.5 and 2.1.4 else SerializedDagModel.data # For 2.2.5 and 2.1.4
) )

View File

@ -12,13 +12,8 @@
Dagster source to extract metadata from OM UI Dagster source to extract metadata from OM UI
""" """
import traceback import traceback
from collections.abc import Iterable
from typing import Dict, Iterable, List, Optional from typing import Dict, Iterable, List, Optional
from dagster_graphql import DagsterGraphQLClient
from sqlalchemy import text
from sqlalchemy.orm import Session
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import ( from metadata.generated.schema.entity.data.pipeline import (
@ -42,7 +37,6 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.connections import get_connection, test_connection from metadata.utils.connections import get_connection, test_connection
from metadata.utils.graphql_queries import DAGSTER_PIPELINE_DETAILS_GRAPHQL from metadata.utils.graphql_queries import DAGSTER_PIPELINE_DETAILS_GRAPHQL
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
@ -84,10 +78,12 @@ class DagsterSource(PipelineServiceSource):
def get_run_list(self): def get_run_list(self):
try: try:
result = self.client.client._execute(DAGSTER_PIPELINE_DETAILS_GRAPHQL) result = self.client.client._execute(
DAGSTER_PIPELINE_DETAILS_GRAPHQL
) # pylint: disable=protected-access
except ConnectionError as conerr: except ConnectionError as conerr:
logger.error("Cannot connect to dagster client", conerr) logger.error(f"Cannot connect to dagster client {conerr}")
logger.debug("Failed due to : ", traceback.format_exc()) logger.debug(f"Failed due to : {traceback.format_exc()}")
return result["assetNodes"] return result["assetNodes"]

View File

@ -9,6 +9,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""
Glue pipeline source to extract metadata
"""
import traceback import traceback
from typing import Any, Iterable, List, Optional from typing import Any, Iterable, List, Optional
@ -52,6 +56,11 @@ STATUS_MAP = {
class GluepipelineSource(PipelineServiceSource): class GluepipelineSource(PipelineServiceSource):
"""
Implements the necessary methods to extract
Pipeline metadata from Glue Pipeline's metadata db
"""
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config) super().__init__(config, metadata_config)
self.task_id_mapping = {} self.task_id_mapping = {}

View File

@ -40,7 +40,6 @@ from metadata.ingestion.models.topology import (
create_source_context, create_source_context,
) )
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.connections import get_connection, test_connection from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_pipeline from metadata.utils.filters import filter_by_pipeline
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -103,8 +102,8 @@ class PipelineSourceStatus(SourceStatus):
Reports the source status after ingestion Reports the source status after ingestion
""" """
pipelines_scanned: List[str] = list() pipelines_scanned: List[str] = []
filtered: List[str] = list() filtered: List[str] = []
def pipeline_scanned(self, topic: str) -> None: def pipeline_scanned(self, topic: str) -> None:
self.pipelines_scanned.append(topic) self.pipelines_scanned.append(topic)