Fix OpenLineage ingestor (#16416)

* Fix OpenLineage ingestor

* py format

---------

Co-authored-by: ulixius9 <mayursingal9@gmail.com>
This commit is contained in:
Maxim Martynov 2024-05-27 11:17:01 +03:00 committed by GitHub
parent 732a137eb6
commit 18b2a22958
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -77,7 +77,9 @@ class OpenlineageSource(PipelineServiceSource):
"""
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: OpenLineageConnection = config.serviceConnection.__root__.config
@ -379,7 +381,7 @@ class OpenlineageSource(PipelineServiceSource):
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
request = CreatePipelineRequest(
name=pipeline_name,
service=self.context.pipeline_service,
service=self.context.get().pipeline_service,
description=description,
)
@ -433,8 +435,8 @@ class OpenlineageSource(PipelineServiceSource):
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.pipeline_service,
pipeline_name=self.context.pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
pipeline_entity = self.metadata.get_by_name(entity=Pipeline, fqn=pipeline_fqn)