diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/runner.py b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py index f6375335716..557838e3279 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/runner.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py @@ -47,7 +47,7 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDe from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.airflow.lineage_parser import XLets -from metadata.utils.helpers import datetime_to_ts +from metadata.utils.helpers import clean_uri, datetime_to_ts class SimpleEdge(BaseModel): @@ -93,6 +93,8 @@ class AirflowLineageRunner: self.dag = dag self.xlets = xlets + self.host_port = conf.get("webserver", "base_url") + def get_or_create_pipeline_service(self) -> PipelineService: """ Fetch the Pipeline Service from OM. If it does not exist, @@ -111,7 +113,7 @@ class AirflowLineageRunner: serviceType=PipelineServiceType.Airflow, connection=PipelineConnection( config=AirflowConnection( - hostPort=conf.get("webserver", "base_url"), + hostPort=self.host_port, connection=BackendConnection(), ), ), @@ -124,7 +126,10 @@ class AirflowLineageRunner: return pipeline_service def get_task_url(self, task: "Operator"): - return f"/taskinstance/list/?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}" + return ( + f"{clean_uri(self.host_port)}/taskinstance/list/" + f"?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}" + ) def get_om_tasks(self) -> List[Task]: """ @@ -153,7 +158,7 @@ class AirflowLineageRunner: pipeline_request = CreatePipelineRequest( name=self.dag.dag_id, description=self.dag.description, - pipelineUrl=f"/tree?dag_id={self.dag.dag_id}", + pipelineUrl=f"{clean_uri(self.host_port)}/tree?dag_id={self.dag.dag_id}", concurrency=self.dag.max_active_tasks, pipelineLocation=self.dag.fileloc, startDate=self.dag.start_date.isoformat() if self.dag.start_date else None,