Fixes #11955: Modify URL to register with Airflow Lineage Backend (#11956)

* fix: include host_port in url

* fix: run make py_format
This commit is contained in:
YAMADA Yutaka 2023-06-14 21:06:01 +09:00 committed by GitHub
parent d409c339e4
commit 63e8aa57ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -47,7 +47,7 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDe
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.airflow.lineage_parser import XLets
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.helpers import clean_uri, datetime_to_ts
class SimpleEdge(BaseModel):
@ -93,6 +93,8 @@ class AirflowLineageRunner:
self.dag = dag
self.xlets = xlets
self.host_port = conf.get("webserver", "base_url")
def get_or_create_pipeline_service(self) -> PipelineService:
"""
Fetch the Pipeline Service from OM. If it does not exist,
@ -111,7 +113,7 @@ class AirflowLineageRunner:
serviceType=PipelineServiceType.Airflow,
connection=PipelineConnection(
config=AirflowConnection(
hostPort=conf.get("webserver", "base_url"),
hostPort=self.host_port,
connection=BackendConnection(),
),
),
@ -124,7 +126,10 @@ class AirflowLineageRunner:
return pipeline_service
def get_task_url(self, task: "Operator"):
return f"/taskinstance/list/?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}"
return (
f"{clean_uri(self.host_port)}/taskinstance/list/"
f"?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}"
)
def get_om_tasks(self) -> List[Task]:
"""
@ -153,7 +158,7 @@ class AirflowLineageRunner:
pipeline_request = CreatePipelineRequest(
name=self.dag.dag_id,
description=self.dag.description,
pipelineUrl=f"/tree?dag_id={self.dag.dag_id}",
pipelineUrl=f"{clean_uri(self.host_port)}/tree?dag_id={self.dag.dag_id}",
concurrency=self.dag.max_active_tasks,
pipelineLocation=self.dag.fileloc,
startDate=self.dag.start_date.isoformat() if self.dag.start_date else None,