diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 2e1b63a36a5..9804cce2690 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -77,7 +77,9 @@ class OpenlineageSource(PipelineServiceSource): """ @classmethod - def create(cls, config_dict, metadata: OpenMetadata): + def create( + cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None + ): """Create class instance""" config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: OpenLineageConnection = config.serviceConnection.__root__.config @@ -379,7 +381,7 @@ class OpenlineageSource(PipelineServiceSource): {json.dumps(pipeline_details.run_facet, indent=4).strip()}```""" request = CreatePipelineRequest( name=pipeline_name, - service=self.context.pipeline_service, + service=self.context.get().pipeline_service, description=description, ) @@ -433,8 +435,8 @@ class OpenlineageSource(PipelineServiceSource): pipeline_fqn = fqn.build( metadata=self.metadata, entity_type=Pipeline, - service_name=self.context.pipeline_service, - pipeline_name=self.context.pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, ) pipeline_entity = self.metadata.get_by_name(entity=Pipeline, fqn=pipeline_fqn)