diff --git a/ingestion/pipelines/metadata_to_es.yaml b/ingestion/pipelines/metadata_to_es.yaml new file mode 100644 index 00000000000..3b704b7982e --- /dev/null +++ b/ingestion/pipelines/metadata_to_es.yaml @@ -0,0 +1,18 @@ +source: + type: metadata_elasticsearch + serviceName: openMetadata + serviceConnection: + config: + type: MetadataES + sourceConfig: + config: {} +sink: + type: elasticsearch + config: + es_host: localhost + es_port: 9200 + recreate_indexes: true +workflowConfig: + openMetadataServerConfig: + hostPort: 'http://localhost:8585/api' + authProvider: no-auth diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index 6554fc2af84..dbbef9e6c5a 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -39,6 +39,8 @@ logger = ingestion_logger() T = TypeVar("T") +SIMPLE_SOURCE = {"sample-data", "sample-usage", "metadata_elasticsearch"} + class InvalidWorkflowJSONException(Exception): """ @@ -271,4 +273,4 @@ class Workflow: @staticmethod def _is_sample_source(service_type): - return service_type == "sample-data" or service_type == "sample-usage" + return service_type in SIMPLE_SOURCE diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata.py index 97d27278a52..6706d5ddf57 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata.py @@ -74,6 +74,7 @@ class MetadataSource(Source[Entity]): self.config = config self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) + self.service_connection = config.serviceConnection.__root__.config self.status = MetadataSourceStatus() self.wrote_something = False self.tables = None diff --git a/ingestion/src/metadata/utils/class_helper.py b/ingestion/src/metadata/utils/class_helper.py index 99d16e176df..783488e0a06 100644 --- a/ingestion/src/metadata/utils/class_helper.py +++ b/ingestion/src/metadata/utils/class_helper.py @@ -4,6 +4,9 @@ from typing import Type from pydantic import BaseModel from metadata.generated.schema.entity.services.serviceType import ServiceType +from metadata.utils.logger import utils_logger + +logger = utils_logger() def _clean(source_type: str): @@ -12,6 +15,8 @@ def _clean(source_type: str): source_type = source_type.replace("_", "") if source_type == "sample": source_type = "sampledata" + if source_type == "metadataelasticsearch": + source_type = "metadataes" return source_type