Fix #8495: Update pipeline sources for pipeline as edge lineage (#8556)

This commit is contained in:
Mayur Singal 2022-11-08 14:13:49 +05:30 committed by GitHub
parent abc9247899
commit 710e3d785f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 62 deletions

View File

@ -18,6 +18,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.airbyteConne
AirbyteConnection,
)
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
from metadata.utils.credentials import generate_http_basic_token
class AirbyteClient:
@ -33,6 +34,15 @@ class AirbyteClient:
auth_header="Authorization",
auth_token=lambda: ("no_token", 0),
)
if self.config.username:
client_config.auth_token_mode = "Basic"
client_config.auth_token = lambda: (
generate_http_basic_token(
self.config.username, self.config.password.get_secret_value()
),
0,
)
self.client = REST(client_config)
def list_workspaces(self) -> List[dict]:

View File

@ -36,7 +36,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
@ -216,25 +216,23 @@ class AirbyteSource(PipelineServiceSource):
service_name=destination_connection.get("name"),
)
if not from_fqn and not to_fqn:
continue
from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
if not from_entity and not to_entity:
continue
lineage_details = LineageDetails(
pipeline=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
)
)
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=from_entity.id, type="table"),
toEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
)
)
yield AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=to_entity.id, type="table"),
fromEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
lineageDetails=lineage_details,
)
)

View File

@ -39,7 +39,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.airflowConne
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
@ -404,46 +404,40 @@ class AirflowSource(PipelineServiceSource):
:return: Lineage from inlets and outlets
"""
dag: SerializedDAG = self._build_dag(pipeline_details.data)
lineage_details = LineageDetails(
pipeline=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
)
)
for task in dag.tasks:
for table_fqn in self.get_inlets(task) or []:
table_entity: Table = self.metadata.get_by_name(
entity=Table, fqn=table_fqn
)
if table_entity:
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=table_entity.id, type="table"
),
toEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
)
)
for from_fqn in self.get_inlets(task) or []:
from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
if from_entity:
for to_fqn in self.get_outlets(task) or []:
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
if to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id, type="table"
),
toEntity=EntityReference(
id=to_entity.id, type="table"
),
lineageDetails=lineage_details,
)
)
yield lineage
else:
logger.warning(
f"Could not find Table [{to_fqn}] from "
f"[{self.context.pipeline_entity.fullyQualifiedName.__root__}] outlets"
)
else:
logger.warning(
f"Could not find Table [{table_fqn}] from "
f"[{self.context.pipeline.fullyQualifiedName.__root__}] inlets"
)
for table_fqn in self.get_outlets(task) or []:
table_entity: Table = self.metadata.get_by_name(
entity=Table, fqn=table_fqn
)
if table_entity:
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
toEntity=EntityReference(id=table_entity.id, type="table"),
)
)
else:
logger.warning(
f"Could not find Table [{table_fqn}] from "
f"[{self.context.pipeline.fullyQualifiedName.__root__}] outlets"
f"Could not find Table [{from_fqn}] from "
f"[{self.context.pipeline_entity.fullyQualifiedName.__root__}] inlets"
)
def close(self):

View File

@ -31,7 +31,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
@ -170,20 +170,17 @@ class FivetranSource(PipelineServiceSource):
if not from_entity or not to_entity:
logger.info(f"Lineage Skipped for {from_fqn} - {to_fqn}")
continue
lineage_details = LineageDetails(
pipeline=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
)
)
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=from_entity.id, type="table"),
toEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
)
)
yield AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=to_entity.id, type="table"),
fromEntity=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
),
lineageDetails=lineage_details,
)
)

View File

@ -11,6 +11,7 @@
"""
Credentials helper module
"""
import base64
import json
import os
import tempfile
@ -130,3 +131,12 @@ def set_google_credentials(gcs_credentials: GCSCredentials) -> None:
f"Error trying to set GCS credentials with {gcs_credentials}."
" Check https://docs.open-metadata.org/openmetadata/connectors/database/bigquery "
)
def generate_http_basic_token(username, password):
"""
Generates a HTTP basic token from username and password
Returns a token string (not a byte)
"""
token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
return token

View File

@ -26,6 +26,17 @@
"type": "string",
"format": "uri"
},
"username": {
"title": "Username",
"description": "Username to connect to Airbyte.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Password to connect to Airbyte.",
"type": "string",
"format": "password"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"