diff --git a/ingestion/src/metadata/clients/airbyte_client.py b/ingestion/src/metadata/clients/airbyte_client.py index 521bfcb2065..7ef9d83096e 100644 --- a/ingestion/src/metadata/clients/airbyte_client.py +++ b/ingestion/src/metadata/clients/airbyte_client.py @@ -18,6 +18,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.airbyteConne AirbyteConnection, ) from metadata.ingestion.ometa.client import REST, APIError, ClientConfig +from metadata.utils.credentials import generate_http_basic_token class AirbyteClient: @@ -33,6 +34,15 @@ class AirbyteClient: auth_header="Authorization", auth_token=lambda: ("no_token", 0), ) + if self.config.username: + client_config.auth_token_mode = "Basic" + client_config.auth_token = lambda: ( + generate_http_basic_token( + self.config.username, self.config.password.get_secret_value() + ), + 0, + ) + self.client = REST(client_config) def list_workspaces(self) -> List[dict]: diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py index e6559ff3880..a660604e8ce 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py @@ -36,7 +36,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus @@ -216,25 +216,23 @@ class AirbyteSource(PipelineServiceSource): service_name=destination_connection.get("name"), ) - if not from_fqn and not to_fqn: - continue - from_entity = 
self.metadata.get_by_name(entity=Table, fqn=from_fqn) to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn) + + if not from_entity or not to_entity: + continue + + lineage_details = LineageDetails( + pipeline=EntityReference( + id=self.context.pipeline.id.__root__, type="pipeline" + ) + ) + yield AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference(id=from_entity.id, type="table"), - toEntity=EntityReference( - id=self.context.pipeline.id.__root__, type="pipeline" - ), - ) - ) - yield AddLineageRequest( - edge=EntitiesEdge( toEntity=EntityReference(id=to_entity.id, type="table"), - fromEntity=EntityReference( - id=self.context.pipeline.id.__root__, type="pipeline" - ), + lineageDetails=lineage_details, ) ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index e36e9819475..08d96dd597f 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -39,7 +39,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.airflowConne from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus @@ -404,46 +404,40 @@ class AirflowSource(PipelineServiceSource): :return: Lineage from inlets and outlets """ dag: SerializedDAG = self._build_dag(pipeline_details.data) + lineage_details = LineageDetails( + pipeline=EntityReference( + id=self.context.pipeline.id.__root__, type="pipeline" + ) + ) for task in dag.tasks: - for table_fqn in self.get_inlets(task) or []: - table_entity: 
Table = self.metadata.get_by_name( - entity=Table, fqn=table_fqn - ) - if table_entity: - yield AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=table_entity.id, type="table" - ), - toEntity=EntityReference( - id=self.context.pipeline.id.__root__, type="pipeline" - ), - ) - ) + for from_fqn in self.get_inlets(task) or []: + from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn) + if from_entity: + for to_fqn in self.get_outlets(task) or []: + to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn) + if to_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id, type="table" + ), + toEntity=EntityReference( + id=to_entity.id, type="table" + ), + lineageDetails=lineage_details, + ) + ) + yield lineage + else: + logger.warning( + f"Could not find Table [{to_fqn}] from " + f"[{self.context.pipeline.fullyQualifiedName.__root__}] outlets" + ) else: logger.warning( - f"Could not find Table [{table_fqn}] from " - f"[{self.context.pipeline.fullyQualifiedName.__root__}] inlets" - ) - - for table_fqn in self.get_outlets(task) or []: - table_entity: Table = self.metadata.get_by_name( - entity=Table, fqn=table_fqn - ) - if table_entity: - yield AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=self.context.pipeline.id.__root__, type="pipeline" - ), - toEntity=EntityReference(id=table_entity.id, type="table"), - ) - ) - else: - logger.warning( - f"Could not find Table [{table_fqn}] from " - f"[{self.context.pipeline.fullyQualifiedName.__root__}] outlets" + f"Could not find Table [{from_fqn}] from " + f"[{self.context.pipeline.fullyQualifiedName.__root__}] inlets" ) def close(self): diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py index 4d174cdf04b..e6cacbeccad 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py +++ 
b/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py @@ -31,7 +31,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus @@ -170,20 +170,17 @@ class FivetranSource(PipelineServiceSource): if not from_entity or not to_entity: logger.info(f"Lineage Skipped for {from_fqn} - {to_fqn}") continue + lineage_details = LineageDetails( + pipeline=EntityReference( + id=self.context.pipeline.id.__root__, type="pipeline" + ) + ) + yield AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference(id=from_entity.id, type="table"), - toEntity=EntityReference( - id=self.context.pipeline.id.__root__, type="pipeline" - ), - ) - ) - yield AddLineageRequest( - edge=EntitiesEdge( toEntity=EntityReference(id=to_entity.id, type="table"), - fromEntity=EntityReference( - id=self.context.pipeline.id.__root__, type="pipeline" - ), + lineageDetails=lineage_details, ) ) diff --git a/ingestion/src/metadata/utils/credentials.py b/ingestion/src/metadata/utils/credentials.py index 36b32e4c359..94630240c1a 100644 --- a/ingestion/src/metadata/utils/credentials.py +++ b/ingestion/src/metadata/utils/credentials.py @@ -11,6 +11,7 @@ """ Credentials helper module """ +import base64 import json import os import tempfile @@ -130,3 +131,12 @@ def set_google_credentials(gcs_credentials: GCSCredentials) -> None: f"Error trying to set GCS credentials with {gcs_credentials}." 
" Check https://docs.open-metadata.org/openmetadata/connectors/database/bigquery " ) + + +def generate_http_basic_token(username, password) -> str: + """ + Generates an HTTP Basic token from username and password + Returns the token as a str (not bytes) + """ + token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8") + return token diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json index 035d0ca01e0..867292cfe4e 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json @@ -26,6 +26,17 @@ "type": "string", "format": "uri" }, + "username": { + "title": "Username", + "description": "Username to connect to Airbyte.", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Password to connect to Airbyte.", + "type": "string", + "format": "password" + }, "supportsMetadataExtraction": { "title": "Supports Metadata Extraction", "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"