diff --git a/docs/install/metadata-ingestion/airflow.md b/docs/install/metadata-ingestion/airflow.md index 87737abed55..b8e26a25ef5 100644 --- a/docs/install/metadata-ingestion/airflow.md +++ b/docs/install/metadata-ingestion/airflow.md @@ -1,2 +1,70 @@ # Airflow +We highly recommend using Airflow or similar schedulers to run Metadata Connectors. +Below is a sample DAG you can refer to when integrating with Airflow. + + +## Airflow Example for Hive + +```py +import pathlib +from datetime import timedelta +from airflow import DAG +from airflow.utils.dates import days_ago + +try: + from airflow.operators.python import PythonOperator +except ModuleNotFoundError: + from airflow.operators.python_operator import PythonOperator + +from metadata.config.common import load_config_file +from metadata.ingestion.api.workflow import Workflow + +default_args = { + "owner": "user_name", + "email": ["username@org.com"], + "email_on_failure": True, + "retries": 3, + "retry_delay": timedelta(minutes=5), + "execution_timeout": timedelta(minutes=60), +} + + +def metadata_ingestion_workflow(): + config = load_config_file(pathlib.Path("examples/workflows/hive.json")) + workflow = Workflow.create(config) + workflow.run() + workflow.raise_from_status() + workflow.print_status() + workflow.stop() + + +with DAG( + "hive_metadata_ingestion_workflow", + default_args=default_args, + description="An example DAG which runs a OpenMetadata ingestion workflow", + schedule_interval=timedelta(days=1), + start_date=days_ago(30), + catchup=False, +) as dag: + ingest_task = PythonOperator( + task_id="ingest_using_recipe", + python_callable=metadata_ingestion_workflow, + ) +``` + +The DAG runs a Python function like the one below: + +```py +def metadata_ingestion_workflow(): + config = load_config_file(pathlib.Path("examples/workflows/hive.json")) + workflow = Workflow.create(config) + workflow.run() + workflow.raise_from_status() + workflow.print_status() + workflow.stop() +``` + +Create a Workflow instance and pass it a Hive configuration, which will read metadata from Hive +and ingest it into 
OpenMetadata Server. You can customize this configuration or add different connectors; please refer to our [examples](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/examples/workflows) and to the Metadata Connectors documentation. + diff --git a/ingestion/examples/airflow/sample_tables_airflow_example.py b/ingestion/examples/airflow/sample_tables_airflow_example.py new file mode 100644 index 00000000000..e3ab749a847 --- /dev/null +++ b/ingestion/examples/airflow/sample_tables_airflow_example.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pathlib +from datetime import timedelta +from airflow import DAG +from airflow.utils.dates import days_ago + +try: + from airflow.operators.python import PythonOperator +except ModuleNotFoundError: + from airflow.operators.python_operator import PythonOperator + +from metadata.config.common import load_config_file +from metadata.ingestion.api.workflow import Workflow + +default_args = { + "owner": "user_name", + "email": ["username@org.com"], + "email_on_failure": True, + "retries": 3, + "retry_delay": timedelta(minutes=5), + "execution_timeout": timedelta(minutes=60), +} + + +def metadata_ingestion_workflow(): + """Load the Hive workflow config and run the ingestion workflow.""" + config = load_config_file(pathlib.Path("examples/workflows/hive.json")) + workflow = Workflow.create(config) + workflow.run() + workflow.raise_from_status() + workflow.print_status() + workflow.stop() + + +with DAG( + "hive_metadata_ingestion_workflow", + default_args=default_args, + description="An example DAG which runs a OpenMetadata ingestion workflow", + schedule_interval=timedelta(days=1), + start_date=days_ago(1), + catchup=False, +) as dag: + ingest_task = PythonOperator( + task_id="ingest_using_recipe", + python_callable=metadata_ingestion_workflow, + ) \ No newline at end of file diff --git a/ingestion/ingestion_scheduler/jobs.py b/ingestion/ingestion_scheduler/jobs.py index 5c53921914a..eb8c8b3aadd 100644 --- a/ingestion/ingestion_scheduler/jobs.py +++ b/ingestion/ingestion_scheduler/jobs.py @@ -14,7 +14,7 @@ # limitations under the License. import json -from metadata.ingestion.workflow.workflow import Workflow +from metadata.ingestion.api.workflow import Workflow from simplescheduler.corescheduler import job diff --git a/ingestion/src/metadata/check/__init__.py b/ingestion/src/metadata/check/__init__.py deleted file mode 100644 index dc408941a6d..00000000000 --- a/ingestion/src/metadata/check/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index 60d8da43aaf..90a0672ea21 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -21,8 +21,8 @@ import sys import click from pydantic import ValidationError -from metadata.config.config_loader import load_config_file -from metadata.ingestion.workflow.workflow import Workflow +from metadata.config.common import load_config_file +from metadata.ingestion.api.workflow import Workflow logger = logging.getLogger(__name__) @@ -34,10 +34,12 @@ BASE_LOGGING_FORMAT = ( ) logging.basicConfig(format=BASE_LOGGING_FORMAT) + @click.group() def check() -> None: pass + @click.group() @click.option("--debug/--no-debug", default=False) def metadata(debug: bool) -> None: diff --git a/ingestion/src/metadata/config/common.py b/ingestion/src/metadata/config/common.py index 419c264df27..f95e8219bd1 100644 --- a/ingestion/src/metadata/config/common.py +++ b/ingestion/src/metadata/config/common.py @@ -12,11 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import re +import pathlib from abc import ABC, abstractmethod -from typing import IO, Any, List, Optional +from typing import IO, Any, Optional, Union from pydantic import BaseModel +from expandvars import expandvars +import io +import json class ConfigModel(BaseModel): @@ -26,34 +28,14 @@ class ConfigModel(BaseModel): class DynamicTypedConfig(ConfigModel): type: str - # This config type is declared Optional[Any] here. The eventual parser for the - # specified type is responsible for further validation. config: Optional[Any] -class MetaError(Exception): - """A base class for all meta exceptions""" - - -class WorkflowExecutionError(MetaError): +class WorkflowExecutionError(Exception): """An error occurred when executing the workflow""" -class OperationalError(WorkflowExecutionError): - """An error occurred because of client-provided metadata""" - - message: str - info: dict - - def __init__(self, message: str, info: dict = None): - self.message = message - if info: - self.info = info - else: - self.info = {} - - -class ConfigurationError(MetaError): +class ConfigurationError(Exception): """A configuration error has happened""" @@ -63,23 +45,26 @@ class ConfigurationMechanism(ABC): pass -class AllowDenyPattern(ConfigModel): - """ A class to store allow deny regexes""" +def load_config_file(config_file: Union[str, pathlib.Path]) -> dict: + """Load a ``.json`` config file, expand environment variables in its text, and return the parsed dict.""" + # Accept plain string paths as well as Path objects so the pathlib calls below work for both. + config_file = pathlib.Path(config_file) + if not config_file.is_file(): + raise ConfigurationError(f"Cannot open config file {config_file}") - allow: List[str] = [".*"] - deny: List[str] = [] + if config_file.suffix not in [".json"]: + raise ConfigurationError( + "Only json are supported. 
Cannot process file type {}".format( + config_file.suffix + ) + ) - @classmethod - def allow_all(cls): - return AllowDenyPattern() + with config_file.open() as raw_config_file: + raw_config = raw_config_file.read() - def allowed(self, string: str) -> bool: - for deny_pattern in self.deny: - if re.match(deny_pattern, string): - return False + expanded_config_file = expandvars(raw_config, nounset=True) + config_fp = io.StringIO(expanded_config_file) + config = json.load(config_fp) - for allow_pattern in self.allow: - if re.match(allow_pattern, string): - return True + return config - return False diff --git a/ingestion/src/metadata/config/config_loader.py b/ingestion/src/metadata/config/config_loader.py deleted file mode 100644 index f778a941fee..00000000000 --- a/ingestion/src/metadata/config/config_loader.py +++ /dev/null @@ -1,45 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import io -import pathlib -from expandvars import expandvars - -from .common import ConfigurationError, ConfigurationMechanism -from .json import JsonConfigurationMechanism - - -def load_config_file(config_file: pathlib.Path) -> dict: - if not config_file.is_file(): - raise ConfigurationError(f"Cannot open config file {config_file}") - - config_mech: ConfigurationMechanism - if config_file.suffix in [".json"]: - config_mech = JsonConfigurationMechanism() - else: - raise ConfigurationError( - "Only json are supported. Cannot process file type {}".format( - config_file.suffix - ) - ) - - with config_file.open() as raw_config_fp: - raw_config_file = raw_config_fp.read() - - expanded_config_file = expandvars(raw_config_file, nounset=True) - config_fp = io.StringIO(expanded_config_file) - config = config_mech.load_config(config_fp) - - return config diff --git a/ingestion/src/metadata/config/json.py b/ingestion/src/metadata/config/json.py deleted file mode 100644 index 10445083627..00000000000 --- a/ingestion/src/metadata/config/json.py +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from typing import IO - -import json -from .common import ConfigurationMechanism - - -class JsonConfigurationMechanism(ConfigurationMechanism): - """Ability to load configuration from yaml files""" - - def load_config(self, config_fp: IO): - config = json.load(config_fp) - return config diff --git a/ingestion/src/metadata/ingestion/workflow/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py similarity index 98% rename from ingestion/src/metadata/ingestion/workflow/workflow.py rename to ingestion/src/metadata/ingestion/api/workflow.py index d02e6c15e4b..26ed464e6e3 100644 --- a/ingestion/src/metadata/ingestion/workflow/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -54,7 +54,9 @@ class Workflow: ctx: WorkflowContext source: Source processor: Processor + stage: Stage sink: Sink + bulk_sink: BulkSink report = {} def __init__(self, config: WorkflowConfig): @@ -112,7 +114,7 @@ class Workflow: return MyClass @classmethod - def create(cls, config_dict: dict) -> "Pipeline": + def create(cls, config_dict: dict) -> "Workflow": config = WorkflowConfig.parse_obj(config_dict) return cls(config) diff --git a/ingestion/src/metadata/ingestion/workflow/__init__.py b/ingestion/src/metadata/ingestion/workflow/__init__.py deleted file mode 100644 index dc408941a6d..00000000000 --- a/ingestion/src/metadata/ingestion/workflow/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -