Fix #5410: Topology for Pipeline Services (#6085)

Mayur Singal 2022-07-22 17:29:14 +05:30 committed by GitHub
parent 41d0c182cb
commit 75ccc803ad
8 changed files with 830 additions and 321 deletions


@@ -205,7 +205,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
Yields lineage if config is enabled
"""
if self.source_config.dbServiceName:
yield from self.yield_dashboard_lineage_details(dashboard_details)
yield from self.yield_dashboard_lineage_details(dashboard_details) or []
def yield_tag(self, *args, **kwargs) -> Optional[Iterable[OMetaTagAndCategory]]:
"""


@@ -243,40 +243,6 @@ class GlueSource(Source[Entity]):
return None
def get_downstream_tasks(self, task_unique_id, tasks):
downstream_tasks = []
for edges in tasks["Edges"]:
if (
edges["SourceId"] == task_unique_id
and edges["DestinationId"] in self.task_id_mapping.values()
):
downstream_tasks.append(
list(self.task_id_mapping.keys())[
list(self.task_id_mapping.values()).index(
edges["DestinationId"]
)
][:128]
)
return downstream_tasks
def get_tasks(self, tasks):
task_list = []
for task in tasks["Graph"]["Nodes"]:
task_name = task["Name"][:128]
self.task_id_mapping[task_name] = task["UniqueId"]
for task in tasks["Graph"]["Nodes"]:
task_list.append(
Task(
name=task["Name"],
displayName=task["Name"],
taskType=task["Type"],
downstreamTasks=self.get_downstream_tasks(
task["UniqueId"], tasks["Graph"]
),
)
)
return task_list
def close(self):
pass


@@ -11,14 +11,14 @@
"""
Airbyte source to extract metadata
"""
import traceback
from dataclasses import dataclass, field
from typing import Iterable, List, Optional
from typing import Iterable, Optional
from pydantic import BaseModel
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
StatusType,
Task,
@@ -32,39 +32,21 @@ from metadata.generated.schema.entity.services.connections.pipeline.airbyteConne
AirbyteConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.airbyte_client import AirbyteClient
from metadata.utils.filters import filter_by_pipeline
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@dataclass
class AirbyteSourceStatus(SourceStatus):
pipelines_scanned: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def pipeline_scanned(self, topic: str) -> None:
self.pipelines_scanned.append(topic)
def dropped(self, topic: str) -> None:
self.filtered.append(topic)
STATUS_MAP = {
"cancelled": StatusType.Failed,
"succeeded": StatusType.Successful,
@@ -75,33 +57,27 @@ STATUS_MAP = {
}
class AirbyteSource(Source[CreatePipelineRequest]):
class AirbytePipelineDetails(BaseModel):
"""
Wrapper class to combine the workspace with its connection
"""
workspace: dict
connection: dict
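The wrapper exists because the topology runner hands each produced element to every stage processor as a single pipeline_details argument, so the workspace and its connection travel together. An illustrative construction, with sample values shaped like the Airbyte API responses:

details = AirbytePipelineDetails(
    workspace={"workspaceId": "af5680ec"},  # sample values only
    connection={"connectionId": "a10f6d82", "name": "MSSQL <> Postgres"},
)
assert details.connection.get("connectionId") == "a10f6d82"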
class AirbyteSource(PipelineServiceSource):
"""
Implements the necessary methods to extract
Pipeline metadata from Airbyte's API
"""
config: WorkflowSource
report: AirbyteSourceStatus
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.source_config: PipelineServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.service_connection = self.config.serviceConnection.__root__.config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(self.metadata_config)
self.status = AirbyteSourceStatus()
self.service: PipelineService = self.metadata.get_service_or_create(
entity=PipelineService, config=config
)
super().__init__(config, metadata_config)
self.client = AirbyteClient(self.service_connection)
@classmethod
@@ -114,9 +90,6 @@ class AirbyteSource(Source[CreatePipelineRequest]):
)
return cls(config, metadata_config)
def prepare(self):
pass
def get_connections_jobs(self, connection: dict, connection_url: str):
"""
Returns the list of tasks linked to connection
@@ -130,46 +103,51 @@ class AirbyteSource(Source[CreatePipelineRequest]):
)
]
def fetch_pipeline(
self, connection: dict, workspace: dict
def yield_pipeline(
self, pipeline_details: AirbytePipelineDetails
) -> Iterable[CreatePipelineRequest]:
"""
Convert a Connection into a Pipeline Entity
:param connection: connection object from airbyte
:param connection: workspace object from airbyte
:param pipeline_details: pipeline_details object from airbyte
:return: Create Pipeline request with tasks
"""
connection_url = f"/workspaces/{workspace.get('workspaceId')}/connections/{connection.get('connectionId')}"
connection_url = f"/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/{pipeline_details.connection.get('connectionId')}"
yield CreatePipelineRequest(
name=connection.get("connectionId"),
displayName=connection.get("name"),
name=pipeline_details.connection.get("connectionId"),
displayName=pipeline_details.connection.get("name"),
description="",
pipelineUrl=connection_url,
tasks=self.get_connections_jobs(connection, connection_url),
service=EntityReference(id=self.service.id, type="pipelineService"),
tasks=self.get_connections_jobs(
pipeline_details.connection, connection_url
),
service=EntityReference(
id=self.context.pipeline_service.id.__root__, type="pipelineService"
),
)
def fetch_pipeline_status(
self, workspace: dict, connection: dict, pipeline_fqn: str
) -> OMetaPipelineStatus:
def yield_pipeline_status(
self, pipeline_details: AirbytePipelineDetails
) -> Optional[OMetaPipelineStatus]:
"""
Method to get task & pipeline status
"""
# Airbyte does not offer specific attempt link, just at pipeline level
log_link = (
f"{self.service_connection.hostPort}/workspaces/{workspace.get('workspaceId')}/connections/"
f"{connection.get('connectionId')}/status"
f"{self.service_connection.hostPort}/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/"
f"{pipeline_details.connection.get('connectionId')}/status"
)
for job in self.client.list_jobs(connection.get("connectionId")):
for job in self.client.list_jobs(
pipeline_details.connection.get("connectionId")
):
if not job or not job.get("attempts"):
continue
for attempt in job["attempts"]:
task_status = [
TaskStatus(
name=str(connection.get("connectionId")),
name=str(pipeline_details.connection.get("connectionId")),
executionStatus=STATUS_MAP.get(
attempt["status"].lower(), StatusType.Pending
).value,
@@ -186,21 +164,23 @@ class AirbyteSource(Source[CreatePipelineRequest]):
executionDate=attempt["createdAt"],
)
yield OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
pipeline_status=pipeline_status,
)
def fetch_lineage(
self, connection: dict, pipeline_entity: Pipeline
def yield_pipeline_lineage_details(
self, pipeline_details: AirbytePipelineDetails
) -> Optional[Iterable[AddLineageRequest]]:
"""
Parse all the streams available in the connection and create lineage between them
:param connection: connection object from airbyte
:param pipeline_entity: Pipeline we just ingested
:param pipeline_details: pipeline_details object from airbyte
:return: Lineage from inlets and outlets
"""
source_connection = self.client.get_source(connection.get("sourceId"))
source_connection = self.client.get_source(
pipeline_details.connection.get("sourceId")
)
destination_connection = self.client.get_destination(
connection.get("destinationId")
pipeline_details.connection.get("destinationId")
)
source_service = self.metadata.get_by_name(
entity=DatabaseService, fqn=source_connection.get("name")
@@ -211,7 +191,9 @@ class AirbyteSource(Source[CreatePipelineRequest]):
if not source_service or not destination_service:
return
for task in connection.get("syncCatalog", {}).get("streams") or []:
for task in (
pipeline_details.connection.get("syncCatalog", {}).get("streams") or []
):
stream = task.get("stream")
from_fqn = fqn.build(
self.metadata,
@@ -239,57 +221,32 @@ class AirbyteSource(Source[CreatePipelineRequest]):
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=from_entity.id, type="table"),
toEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
toEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
)
)
yield AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=to_entity.id, type="table"),
fromEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
fromEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
)
)
def next_record(self) -> Iterable[Entity]:
def get_pipelines_list(self) -> Iterable[AirbytePipelineDetails]:
"""
Extract metadata information to create Pipelines with Tasks
Get List of all pipelines
"""
for workspace in self.client.list_workspaces():
for connection in self.client.list_connections(
workflow_id=workspace.get("workspaceId")
):
try:
if filter_by_pipeline(
self.source_config.pipelineFilterPattern,
connection.get("connectionId"),
):
continue
yield from self.fetch_pipeline(connection, workspace)
pipeline_fqn = fqn.build(
self.metadata,
entity_type=Pipeline,
service_name=self.service.name.__root__,
pipeline_name=connection.get("connectionId"),
)
yield from self.fetch_pipeline_status(
workspace, connection, pipeline_fqn
)
if self.source_config.includeLineage:
pipeline_entity: Pipeline = self.metadata.get_by_name(
entity=Pipeline,
fqn=pipeline_fqn,
)
yield from self.fetch_lineage(connection, pipeline_entity) or []
yield AirbytePipelineDetails(workspace=workspace, connection=connection)
except Exception as err:
logger.error(repr(err))
logger.debug(traceback.format_exc())
self.status.failure(connection.get("connectionId"), repr(err))
def get_status(self):
return self.status
def close(self):
pass
def test_connection(self) -> None:
pass
def get_pipeline_name(self, pipeline_details: AirbytePipelineDetails) -> str:
"""
Get Pipeline Name
"""
return pipeline_details.connection.get("connectionId")
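With next_record gone, the per-connection orchestration now lives in the shared base class. Roughly, and simplifying what TopologyRunnerMixin actually does, the runner drives these hooks in this order (a sketch, not the real implementation):

def run_pipeline_node(source):
    # simplified approximation of the runner loop
    for details in source.get_pipelines_list():            # AirbytePipelineDetails
        source.get_pipeline_name(details)                  # checked against the filter pattern
        yield from source.yield_pipeline(details)          # CreatePipelineRequest
        yield from source.yield_pipeline_status(details) or []
        yield from source.yield_pipeline_lineage(details)  # honours includeLineage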


@@ -11,15 +11,12 @@
"""
Airflow source to extract metadata from OM UI
"""
import sys
import traceback
from collections.abc import Iterable
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Optional, cast
from airflow.models import BaseOperator, DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.serialized_objects import SerializedDAG
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
@@ -40,44 +37,24 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
AirflowConnection,
)
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.connections import (
create_and_bind_session,
get_connection,
test_connection,
)
from metadata.utils.filters import filter_by_pipeline
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@dataclass
class AirflowSourceStatus(SourceStatus):
pipelines_scanned: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def pipeline_scanned(self, topic: str) -> None:
self.pipelines_scanned.append(topic)
def dropped(self, topic: str) -> None:
self.filtered.append(topic)
STATUS_MAP = {
"success": StatusType.Successful.value,
"failed": StatusType.Failed.value,
@@ -85,38 +62,24 @@ STATUS_MAP = {
}
class AirflowSource(Source[CreatePipelineRequest]):
class AirflowSource(PipelineServiceSource):
"""
Implements the necessary methods to extract
Pipeline metadata from Airflow's metadata db
"""
config: WorkflowSource
report: AirflowSourceStatus
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.source_config: PipelineServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.service_connection = self.config.serviceConnection.__root__.config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(self.metadata_config)
self.status = AirflowSourceStatus()
self.service: PipelineService = self.metadata.get_service_or_create(
entity=PipelineService, config=config
)
self.numberOfStatus = (
self.config.serviceConnection.__root__.config.numberOfStatus
)
# Create the connection to the database
self._session = None
self.service_connection = config.serviceConnection.__root__.config
self.engine: Engine = get_connection(self.service_connection.connection)
super().__init__(config, metadata_config)
# Create the connection to the database
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@@ -138,22 +101,20 @@ class AirflowSource(Source[CreatePipelineRequest]):
return self._session
def prepare(self):
pass
def get_pipeline_status(self, dag_id: str) -> DagRun:
dag_run_list: DagRun = (
self.session.query(DagRun)
.filter(DagRun.dag_id == dag_id)
.order_by(DagRun.execution_date.desc())
.limit(self.numberOfStatus)
.limit(self.config.serviceConnection.__root__.config.numberOfStatus)
)
return dag_run_list
def fetch_pipeline_status(
self, serialized_dag: SerializedDAG, pipeline_fqn: str
def yield_pipeline_status(
self, pipeline_details: SerializedDAG
) -> OMetaPipelineStatus:
dag_run_list = self.get_pipeline_status(serialized_dag.dag_id)
dag_run_list = self.get_pipeline_status(pipeline_details.dag_id)
for dag in dag_run_list:
if isinstance(dag.task_instances, Iterable):
tasks = dag.task_instances
@@ -179,10 +140,11 @@ class AirflowSource(Source[CreatePipelineRequest]):
executionDate=dag.execution_date.timestamp(),
)
yield OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
pipeline_status=pipeline_status,
)
def list_dags(self) -> Iterable[SerializedDagModel]:
def get_pipelines_list(self) -> Iterable[SerializedDagModel]:
"""
List all DAGs from the metadata db.
@@ -192,6 +154,12 @@ class AirflowSource(Source[CreatePipelineRequest]):
for serialized_dag in self.session.query(SerializedDagModel).all():
yield serialized_dag
def get_pipeline_name(self, pipeline_details: SerializedDAG) -> str:
"""
Get Pipeline Name
"""
return pipeline_details.dag_id
@staticmethod
def get_tasks_from_dag(dag: SerializedDAG) -> List[Task]:
"""
@@ -213,25 +181,26 @@ class AirflowSource(Source[CreatePipelineRequest]):
for task in cast(Iterable[BaseOperator], dag.tasks)
]
def fetch_pipeline(
self, serialized_dag: SerializedDagModel
def yield_pipeline(
self, pipeline_details: SerializedDagModel
) -> Iterable[CreatePipelineRequest]:
"""
Convert a DAG into a Pipeline Entity
:param serialized_dag: SerializedDAG from airflow metadata DB
:return: Create Pipeline request with tasks
"""
dag: SerializedDAG = serialized_dag.dag
dag: SerializedDAG = pipeline_details.dag
yield CreatePipelineRequest(
name=serialized_dag.dag_id,
name=pipeline_details.dag_id,
description=dag.description,
pipelineUrl=f"/tree?dag_id={dag.dag_id}", # Just the suffix
concurrency=dag.concurrency,
pipelineLocation=serialized_dag.fileloc,
pipelineLocation=pipeline_details.fileloc,
startDate=dag.start_date.isoformat() if dag.start_date else None,
tasks=self.get_tasks_from_dag(dag),
service=EntityReference(id=self.service.id, type="pipelineService"),
service=EntityReference(
id=self.context.pipeline_service.id.__root__, type="pipelineService"
),
)
@staticmethod
@@ -280,16 +249,15 @@ class AirflowSource(Source[CreatePipelineRequest]):
logger.warn(f"Error trying to parse outlets - {err}")
return None
def fetch_lineage(
self, serialized_dag: SerializedDagModel, pipeline_entity: Pipeline
) -> Iterable[AddLineageRequest]:
def yield_pipeline_lineage_details(
self, pipeline_details: SerializedDagModel
) -> Optional[Iterable[AddLineageRequest]]:
"""
Parse xlets and add lineage between Pipelines and Tables
:param serialized_dag: SerializedDAG from airflow metadata DB
:param pipeline_entity: Pipeline we just ingested
:param pipeline_details: SerializedDAG from airflow metadata DB
:return: Lineage from inlets and outlets
"""
dag: SerializedDAG = serialized_dag.dag
dag: SerializedDAG = pipeline_details.dag
for task in dag.tasks:
for table_fqn in self.get_inlets(task) or []:
@@ -303,14 +271,14 @@ class AirflowSource(Source[CreatePipelineRequest]):
id=table_entity.id, type="table"
),
toEntity=EntityReference(
id=pipeline_entity.id, type="pipeline"
id=self.context.pipeline.id.__root__, type="pipeline"
),
)
)
else:
logger.warn(
f"Could not find Table [{table_fqn}] from "
f"[{pipeline_entity.fullyQualifiedName.__root__}] inlets"
f"[{self.context.pipeline.fullyQualifiedName.__root__}] inlets"
)
for table_fqn in self.get_outlets(task) or []:
@@ -321,7 +289,7 @@ class AirflowSource(Source[CreatePipelineRequest]):
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=pipeline_entity.id, type="pipeline"
id=self.context.pipeline.id.__root__, type="pipeline"
),
toEntity=EntityReference(id=table_entity.id, type="table"),
)
@@ -329,45 +297,9 @@ class AirflowSource(Source[CreatePipelineRequest]):
else:
logger.warn(
f"Could not find Table [{table_fqn}] from "
f"[{pipeline_entity.fullyQualifiedName.__root__}] outlets"
f"[{self.context.pipeline.fullyQualifiedName.__root__}] outlets"
)
def next_record(self) -> Iterable[Entity]:
"""
Extract metadata information to create Pipelines with Tasks
and lineage
"""
for serialized_dag in self.list_dags():
try:
if not filter_by_pipeline(
self.source_config.pipelineFilterPattern, serialized_dag.dag_id
):
yield from self.fetch_pipeline(serialized_dag)
pipeline_fqn = fqn.build(
self.metadata,
entity_type=Pipeline,
service_name=self.service.name.__root__,
pipeline_name=serialized_dag.dag_id,
)
yield from self.fetch_pipeline_status(serialized_dag, pipeline_fqn)
if self.source_config.includeLineage:
pipeline_entity: Pipeline = self.metadata.get_by_name(
entity=Pipeline,
fqn=pipeline_fqn,
)
yield from self.fetch_lineage(serialized_dag, pipeline_entity)
else:
self.status.dropped(serialized_dag.dag_id)
except Exception as err:
logger.error(repr(err))
logger.debug(traceback.format_exc())
logger.debug(sys.exc_info()[2])
self.status.failure(serialized_dag.dag_id, repr(err))
def get_status(self):
return self.status
def close(self):
self.session.close()


@@ -8,35 +8,30 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
import uuid
from typing import Iterable
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
GlueConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
GlueConnection as GluePipelineConnection,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils.connections import get_connection, test_connection
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -44,31 +39,22 @@ logger = ingestion_logger()
GRAPH = "Graph"
NODES = "Nodes"
NAME = "Name"
JOB_TYPE = "JOB"
STATUS_MAP = {
"cancelled": StatusType.Failed,
"succeeded": StatusType.Successful,
"failed": StatusType.Failed,
"running": StatusType.Pending,
"incomplete": StatusType.Failed,
"pending": StatusType.Pending,
}
class GlueSource(Source[Entity]):
class GlueSource(PipelineServiceSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__()
self.status = SQLSourceStatus()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
self.pipeline_service = self.metadata.get_service_or_create(
entity=PipelineService,
config=WorkflowSource(
type="glue",
serviceName=self.config.serviceName,
serviceConnection=PipelineConnection(
config=GluePipelineConnection(
awsConfig=self.service_connection.awsConfig
),
),
sourceConfig={},
),
)
self.connection = get_connection(self.service_connection)
super().__init__(config, metadata_config)
self.task_id_mapping = {}
self.job_name_list = set()
self.glue = self.connection.client
@classmethod
@@ -81,55 +67,101 @@ class GlueSource(Source[Entity]):
)
return cls(config, metadata_config)
def prepare(self):
pass
def get_pipelines_list(self) -> Iterable[dict]:
"""
Get List of all pipelines
"""
for workflow in self.glue.list_workflows()["Workflows"]:
jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)["Workflow"]
yield jobs
def next_record(self) -> Iterable[Entity]:
def get_pipeline_name(self, pipeline_details: dict) -> str:
"""
Get Pipeline Name
"""
return pipeline_details[NAME]
yield from self.ingest_pipelines()
def yield_pipeline(self, pipeline_details: Any) -> Iterable[CreatePipelineRequest]:
"""
Method to Get Pipeline Entity
"""
self.job_name_list = set()
pipeline_ev = CreatePipelineRequest(
name=pipeline_details[NAME],
displayName=pipeline_details[NAME],
description="",
tasks=self.get_tasks(pipeline_details),
service=EntityReference(
id=self.context.pipeline_service.id.__root__, type="pipelineService"
),
)
yield pipeline_ev
def get_tasks(self, tasks):
def get_tasks(self, pipeline_details: Any) -> List[Task]:
task_list = []
for task in tasks[GRAPH][NODES]:
for task in pipeline_details["Graph"]["Nodes"]:
self.task_id_mapping[task["UniqueId"]] = task["Name"][:128]
if task["Type"] == JOB_TYPE:
self.job_name_list.add(task[NAME])
for task in pipeline_details[GRAPH][NODES]:
task_list.append(
Task(
name=task[NAME],
displayName=task[NAME],
taskType=task["Type"],
downstreamTasks=self.get_downstream_tasks(
task["UniqueId"], tasks[GRAPH]
task["UniqueId"], pipeline_details[GRAPH]
),
)
)
return task_list
def ingest_pipelines(self) -> Iterable[OMetaDatabaseAndTable]:
try:
for workflow in self.glue.list_workflows()["Workflows"]:
jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)[
"Workflow"
]
tasks = self.get_tasks(jobs)
pipeline_ev = Pipeline(
id=uuid.uuid4(),
name=jobs[NAME],
displayName=jobs[NAME],
description="",
tasks=tasks,
service=EntityReference(
id=self.pipeline_service.id, type="pipelineService"
),
)
yield pipeline_ev
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def get_downstream_tasks(self, task_unique_id, tasks):
downstream_tasks = []
for edges in tasks["Edges"]:
if edges["SourceId"] == task_unique_id and self.task_id_mapping.get(
edges["DestinationId"]
):
downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]])
return downstream_tasks
def close(self):
pass
def yield_pipeline_status(
self, pipeline_details: Any
) -> Iterable[OMetaPipelineStatus]:
for job in self.job_name_list:
try:
runs = self.glue.get_job_runs(JobName=job)
runs = runs.get("JobRuns", [])
for attempt in runs:
task_status = []
task_status.append(
TaskStatus(
name=attempt["JobName"],
executionStatus=STATUS_MAP.get(
attempt["JobRunState"].lower(), StatusType.Pending
).value,
startTime=attempt["StartedOn"].timestamp(),
endTime=attempt["CompletedOn"].timestamp(),
)
)
pipeline_status = PipelineStatus(
taskStatus=task_status,
executionDate=attempt["StartedOn"].timestamp(),
executionStatus=STATUS_MAP.get(
attempt["JobRunState"].lower(), StatusType.Pending
).value,
)
yield OMetaPipelineStatus(
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
pipeline_status=pipeline_status,
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def get_status(self) -> SourceStatus:
return self.status
def test_connection(self) -> None:
test_connection(self.connection)
def yield_pipeline_lineage_details(
self, pipeline_details: Any
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between pipeline and data sources
"""


@@ -0,0 +1,227 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for ingesting pipeline services
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
)
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_pipeline
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class PipelineServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Pipeline Services.
We could add a topology validator: a node can only consume
data that has been produced by one of its parent nodes.
"""
root = TopologyNode(
producer="get_services",
stages=[
NodeStage(
type_=PipelineService,
context="pipeline_service",
processor="yield_pipeline_service",
),
],
children=["pipeline"],
)
pipeline = TopologyNode(
producer="get_pipeline",
stages=[
NodeStage(
type_=Pipeline,
context="pipeline",
processor="yield_pipeline",
consumer=["pipeline_service"],
),
NodeStage(
type_=OMetaPipelineStatus,
context="pipeline_status",
processor="yield_pipeline_status",
consumer=["pipeline_service"],
nullable=True,
ack_sink=False,
),
NodeStage(
type_=AddLineageRequest,
context="lineage",
processor="yield_pipeline_lineage",
consumer=["pipeline_service"],
ack_sink=False,
nullable=True,
),
],
)
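The wiring above is declarative: producer and processor are method names on the source, which the runner is assumed to resolve by attribute lookup, along these lines (illustrative only; the real logic lives in TopologyRunnerMixin):

def run_node(source, node):
    # e.g. node = PipelineServiceTopology().pipeline
    for element in getattr(source, node.producer)():      # get_pipeline()
        for stage in node.stages:
            processor = getattr(source, stage.processor)  # e.g. yield_pipeline
            for request in processor(element) or []:
                pass  # sink the request; store it in the context under stage.context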
@dataclass
class PipelineSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
pipelines_scanned: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def pipeline_scanned(self, topic: str) -> None:
self.pipelines_scanned.append(topic)
def dropped(self, topic: str) -> None:
self.filtered.append(topic)
class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
"""
Base class for Pipeline Services.
It implements the topology and context.
"""
@abstractmethod
def yield_pipeline(self, pipeline_details: Any) -> Iterable[CreatePipelineRequest]:
"""
Method to Get Pipeline Entity
"""
@abstractmethod
def yield_pipeline_lineage_details(
self, pipeline_details: Any
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between pipeline and data sources
"""
@abstractmethod
def get_pipelines_list(self) -> Optional[List[Any]]:
"""
Get List of all pipelines
"""
@abstractmethod
def get_pipeline_name(self, pipeline_details: Any) -> str:
"""
Get Pipeline Name
"""
@abstractmethod
def yield_pipeline_status(
self, pipeline_details: Any
) -> Optional[OMetaPipelineStatus]:
"""
Get Pipeline Status
"""
def yield_pipeline_lineage(
self, pipeline_details: Any
) -> Iterable[AddLineageRequest]:
"""
Yields lineage if config is enabled
"""
if self.source_config.includeLineage:
yield from self.yield_pipeline_lineage_details(pipeline_details) or []
status: PipelineSourceStatus
source_config: PipelineServiceMetadataPipeline
config: WorkflowSource
metadata: OpenMetadata
# Big union of types we want to fetch dynamically
service_connection: PipelineConnection.__fields__["config"].type_
topology = PipelineServiceTopology()
context = create_source_context(topology)
@abstractmethod
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config: PipelineServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.connection = get_connection(self.service_connection)
self.test_connection()
self.status = PipelineSourceStatus()
def get_status(self) -> SourceStatus:
return self.status
def close(self):
"""
Method to implement any required logic after the ingestion process is completed
"""
def get_services(self) -> Iterable[WorkflowSource]:
yield self.config
def yield_pipeline_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=PipelineService, config=config
)
def get_pipeline(self) -> Any:
for pipeline_detail in self.get_pipelines_list():
if filter_by_pipeline(
self.source_config.pipelineFilterPattern,
self.get_pipeline_name(pipeline_detail),
):
self.status.filter(
self.get_pipeline_name(pipeline_detail),
"Pipeline Pattern not Allowed",
)
continue
yield pipeline_detail
def test_connection(self) -> None:
test_connection(self.connection)
def prepare(self):
"""
Method to implement any required logic before starting the ingestion process
"""


@@ -0,0 +1,218 @@
{
"workspace": [
{
"workspaceId": "af5680ec-2687-4fe0-bd55-5ad5f020a603",
"customerId": "de633783-f0f2-4248-9ea2-092c4b5e1165",
"email": "mayur@getcollate.io",
"name": "af5680ec-2687-4fe0-bd55-5ad5f020a603",
"slug": "af5680ec-2687-4fe0-bd55-5ad5f020a603",
"initialSetupComplete": true,
"displaySetupWizard": false,
"anonymousDataCollection": false,
"news": false,
"securityUpdates": false,
"notifications": []
}
],
"connection": [
{
"connectionId": "a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
"name": "MSSQL <> Postgres",
"namespaceDefinition": "source",
"namespaceFormat": "${SOURCE_NAMESPACE}",
"prefix": "",
"sourceId": "ed3783a9-8479-4be0-80aa-2075ccc6e310",
"destinationId": "4181764f-3ed8-44cb-9b8e-fefff7fb5ea9",
"operationIds": ["942463dd-f37c-4794-8931-88da6c166a04"],
"syncCatalog": {
"streams": [
{
"stream": {
"name": "brands",
"jsonSchema": {
"type": "object",
"properties": {
"brand_id": {"type": "number"},
"brand_name": {"type": "string"}
}
},
"supportedSyncModes": ["full_refresh", "incremental"],
"defaultCursorField": [],
"sourceDefinedPrimaryKey": [["brand_id"]],
"namespace": "production"
},
"config": {
"syncMode": "full_refresh",
"cursorField": [],
"destinationSyncMode": "overwrite",
"primaryKey": [["brand_id"]],
"aliasName": "brands",
"selected": true
}
},
{
"stream": {
"name": "categories",
"jsonSchema": {
"type": "object",
"properties": {
"category_id": {"type": "number"},
"category_name": {"type": "string"}
}
},
"supportedSyncModes": ["full_refresh", "incremental"],
"defaultCursorField": [],
"sourceDefinedPrimaryKey": [["category_id"]],
"namespace": "production"
},
"config": {
"syncMode": "full_refresh",
"cursorField": [],
"destinationSyncMode": "overwrite",
"primaryKey": [["category_id"]],
"aliasName": "categories",
"selected": true
}
},
{
"stream": {
"name": "products",
"jsonSchema": {
"type": "object",
"properties": {
"brand_id": {"type": "number"},
"list_price": {"type": "number"},
"model_year": {"type": "number"},
"product_id": {"type": "number"},
"category_id": {"type": "number"},
"product_name": {"type": "string"}
}
},
"supportedSyncModes": ["full_refresh", "incremental"],
"defaultCursorField": [],
"sourceDefinedPrimaryKey": [["product_id"]],
"namespace": "production"
},
"config": {
"syncMode": "full_refresh",
"cursorField": [],
"destinationSyncMode": "overwrite",
"primaryKey": [["product_id"]],
"aliasName": "products",
"selected": true
}
},
{
"stream": {
"name": "stocks",
"jsonSchema": {
"type": "object",
"properties": {
"quantity": {"type": "number"},
"store_id": {"type": "number"},
"product_id": {"type": "number"}
}
},
"supportedSyncModes": ["full_refresh", "incremental"],
"defaultCursorField": [],
"sourceDefinedPrimaryKey": [["store_id"], ["product_id"]],
"namespace": "production"
},
"config": {
"syncMode": "full_refresh",
"cursorField": [],
"destinationSyncMode": "overwrite",
"primaryKey": [["store_id"], ["product_id"]],
"aliasName": "stocks",
"selected": true
}
}
]
},
"schedule": {"units": 24, "timeUnit": "hours"},
"status": "inactive"
}
],
"jobs": [
{
"job": {
"id": 14,
"configType": "sync",
"configId": "a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
"createdAt": 1655482894,
"updatedAt": 1655482894,
"status": "running"
},
"attempts": [
{
"id": 0,
"status": "running",
"createdAt": 1655482894,
"updatedAt": 1655482894,
"streamStats": []
}
]
},
{
"job": {
"id": 13,
"configType": "sync",
"configId": "a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
"createdAt": 1655393914,
"updatedAt": 1655394054,
"status": "succeeded"
},
"attempts": [
{
"id": 0,
"status": "succeeded",
"createdAt": 1655393914,
"updatedAt": 1655394054,
"endedAt": 1655394054,
"bytesSynced": 85607,
"recordsSynced": 1276,
"totalStats": {
"recordsEmitted": 1276,
"bytesEmitted": 85607,
"stateMessagesEmitted": 0,
"recordsCommitted": 1276
},
"streamStats": [
{
"streamName": "brands",
"stats": {
"recordsEmitted": 9,
"bytesEmitted": 333,
"recordsCommitted": 9
}
},
{
"streamName": "categories",
"stats": {
"recordsEmitted": 7,
"bytesEmitted": 359,
"recordsCommitted": 7
}
},
{
"streamName": "stocks",
"stats": {
"recordsEmitted": 939,
"bytesEmitted": 41608,
"recordsCommitted": 939
}
},
{
"streamName": "products",
"stats": {
"recordsEmitted": 321,
"bytesEmitted": 43307,
"recordsCommitted": 321
}
}
]
}
]
}
]
}


@@ -0,0 +1,177 @@
import json
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.airbyte import (
AirbytePipelineDetails,
AirbyteSource,
)
mock_data_file = open("ingestion/tests/unit/resources/datasets/airbyte_dataset.json")
mock_data: dict = json.load(mock_data_file)
mock_airbyte_config = {
"source": {
"type": "airbyte",
"serviceName": "airbyte_source",
"serviceConnection": {
"config": {"type": "Airbyte", "hostPort": "http://localhost:1234"}
},
"sourceConfig": {"config": {"type": "PipelineMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth",
}
},
}
EXPECTED_AIRBYTE_DETAILS = AirbytePipelineDetails(
workspace=mock_data["workspace"][0], connection=mock_data["connection"][0]
)
MOCK_CONNECTION_URI_PATH = "/workspaces/af5680ec-2687-4fe0-bd55-5ad5f020a603/connections/a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f"
MOCK_LOG_URL = f"http://localhost:1234{MOCK_CONNECTION_URI_PATH}"
EXPECTED_PIPELINE_STATUS = [
OMetaPipelineStatus(
pipeline_fqn="airbyte_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
pipeline_status=PipelineStatus(
executionStatus=StatusType.Pending.value,
taskStatus=[
TaskStatus(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
executionStatus=StatusType.Pending.value,
startTime=1655482894,
endTime=None,
logLink=f"{MOCK_LOG_URL}/status",
)
],
executionDate=1655482894,
),
),
OMetaPipelineStatus(
pipeline_fqn="airbyte_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
pipeline_status=PipelineStatus(
executionStatus=StatusType.Successful.value,
taskStatus=[
TaskStatus(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
executionStatus=StatusType.Successful.value,
startTime=1655393914,
endTime=1655394054,
logLink=f"{MOCK_LOG_URL}/status",
)
],
executionDate=1655393914,
),
),
]
EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
displayName="MSSQL <> Postgres",
description="",
pipelineUrl=MOCK_CONNECTION_URI_PATH,
tasks=[
Task(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
displayName="MSSQL <> Postgres",
description="",
taskUrl=f"{MOCK_CONNECTION_URI_PATH}/status",
)
],
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
),
)
MOCK_PIPELINE_SERVICE = PipelineService(
id="85811038-099a-11ed-861d-0242ac120002",
name="airbyte_source",
connection=PipelineConnection(),
serviceType=PipelineServiceType.Airbyte,
)
MOCK_PIPELINE = Pipeline(
id="2aaa012e-099a-11ed-861d-0242ac120002",
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
fullyQualifiedName="airbyte_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
displayName="MSSQL <> Postgres",
description="",
pipelineUrl=MOCK_CONNECTION_URI_PATH,
tasks=[
Task(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
displayName="MSSQL <> Postgres",
description="",
taskUrl=f"{MOCK_CONNECTION_URI_PATH}/status",
)
],
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
),
)
class AirbyteUnitTest(TestCase):
def __init__(self, methodName: str = ...) -> None:
super().__init__(methodName)
self.init_workflow()
@patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection")
@patch("metadata.ingestion.source.pipeline.airbyte.AirbyteClient")
def init_workflow(self, airbyte_client, test_connection):
test_connection.return_value = False
config = OpenMetadataWorkflowConfig.parse_obj(mock_airbyte_config)
self.airbyte = AirbyteSource.create(
mock_airbyte_config["source"],
config.workflowConfig.openMetadataServerConfig,
)
self.airbyte.context.__dict__["pipeline"] = MOCK_PIPELINE
self.airbyte.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE
self.client = airbyte_client.return_value
self.client.list_jobs.return_value = mock_data.get("jobs")
self.client.list_workspaces.return_value = mock_data.get("workspace")
self.client.list_connections.return_value = mock_data.get("connection")
def test_pipeline_list(self):
assert list(self.airbyte.get_pipelines_list())[0] == EXPECTED_AIRBYTE_DETAILS
def test_pipeline_name(self):
assert self.airbyte.get_pipeline_name(
EXPECTED_AIRBYTE_DETAILS
) == mock_data.get("connection")[0].get("connectionId")
def test_pipelines(self):
pipeline = list(self.airbyte.yield_pipeline(EXPECTED_AIRBYTE_DETAILS))[0]
assert pipeline == EXPECTED_CREATED_PIPELINES
def test_pipeline_status(self):
assert (
list(self.airbyte.yield_pipeline_status(EXPECTED_AIRBYTE_DETAILS))
== EXPECTED_PIPELINE_STATUS
)
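Note that the tests never run the full topology: the Airbyte client is patched and the pipeline/pipeline_service context entries are injected directly, so each hook is exercised in isolation. Assuming the tests live under the same unit-test tree as the fixture loaded above, the suite can be selected with pytest's keyword filter:

# equivalent to running: pytest ingestion/tests/unit -k airbyte
import pytest
pytest.main(["ingestion/tests/unit", "-k", "airbyte"])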