# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Main ingestion entrypoint to run OM workflows
"""
import os

import yaml

from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.utils.logger import set_loggers_level
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow
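
# NOTE: lineage, dbt and elasticSearchReindex ingestions are run by the generic
# MetadataWorkflow; profiler, usage and TestSuite have dedicated Workflow classes.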
WORKFLOW_MAP = {
    PipelineType.metadata.value: MetadataWorkflow,
    PipelineType.usage.value: UsageWorkflow,
    PipelineType.lineage.value: MetadataWorkflow,
    PipelineType.profiler.value: ProfilerWorkflow,
    PipelineType.TestSuite.value: TestSuiteWorkflow,
    PipelineType.elasticSearchReindex.value: MetadataWorkflow,
    PipelineType.dbt.value: MetadataWorkflow,
}


def main():
    """
    Ingestion entrypoint. Get the right Workflow class
    and execute the ingestion.

    This image is expected to be used and run in environments
    such as Airflow's KubernetesPodOperator:

    ```
    config = '''
    source:
      type: ...
      serviceName: ...
      serviceConnection:
        ...
      sourceConfig:
        ...
    sink:
      ...
    workflowConfig:
      ...
    '''

    KubernetesPodOperator(
        task_id="ingest",
        name="ingest",
        cmds=["python", "main.py"],
        image="openmetadata/ingestion-base:0.13.2",
        namespace='default',
        env_vars={"config": config, "pipelineType": "metadata"},
        dag=dag,
    )
    ```

    Note how we are expecting the env variables to be sent, with the `config` being the str
    representation of the ingestion YAML.
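
    For illustration only, the same entrypoint can also be driven without Kubernetes,
    e.g. from a small Python wrapper (a minimal sketch, assuming `config` holds the
    same YAML string as above):

    ```
    os.environ["config"] = config
    os.environ["pipelineType"] = "metadata"
    main()
    ```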

    We will also set the `pipelineRunId` value if it comes from the environment.
    """

    # DockerOperator expects an env var called config
    config = os.getenv("config")
    if not config:
        raise RuntimeError(
            "Missing environment variable `config`. This is needed to configure the Workflow."
        )

    pipeline_type = os.getenv("pipelineType")
    if not pipeline_type:
        raise RuntimeError(
            "Missing environment variable `pipelineType`. This is needed to load the Workflow class."
        )
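
    # Optional run id: when present, it is forwarded into the workflow config below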
    pipeline_run_id = os.getenv("pipelineRunId")
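
    # Pick the Workflow implementation that matches the requested pipelineType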
    workflow_class = WORKFLOW_MAP.get(pipeline_type)
    if workflow_class is None:
        raise ValueError(
            f"Could not load a Workflow class for pipelineType `{pipeline_type}`"
        )

    # Load the config string representation
    workflow_config = yaml.safe_load(config)
    if pipeline_run_id:
        workflow_config["pipelineRunId"] = pipeline_run_id
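
    # Honor the loggerLevel from the workflowConfig, defaulting to INFO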
    logger_level = workflow_config.get("workflowConfig", {}).get("loggerLevel")
    set_loggers_level(logger_level or LogLevels.INFO.value)
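
    # Create the workflow from the config, run it, and surface any failures
    # via raise_from_status before printing the status and stopping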
    workflow = workflow_class.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    main()