Ingestion: Refactor, docs, airflow example

This commit is contained in:
Suresh Srinivas 2021-08-14 10:18:52 -07:00
parent ca5869c277
commit 8256de30e9
10 changed files with 153 additions and 146 deletions

View File

@ -1,2 +1,68 @@
# Airflow
We highly recommend using Airflow or similar schedulers to run Metadata Connectors.
Below is a sample DAG that you can use as a reference when integrating with Airflow.
## Airflow Example for Hive
```py
import pathlib
from datetime import timedelta

from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    # Older Airflow releases expose the operator under the legacy module path.
    from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow

# Defaults applied to every task in the DAG.
default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": True,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}


def metadata_ingestion_workflow():
    """Load the Hive workflow configuration and run the ingestion workflow.

    The configuration path is relative, so it is resolved against the
    working directory of the process that executes the task.
    """
    # load_config_file calls Path methods (is_file, suffix, open) on its
    # argument, so it needs a pathlib.Path rather than a plain string.
    config = load_config_file(pathlib.Path("examples/workflows/hive.json"))
    workflow = Workflow.create(config)
    workflow.run()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "hive_metadata_ingestion_workflow",
    default_args=default_args,
    description="An example DAG which runs a OpenMetadata ingestion workflow",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(30),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        # Pass the function object itself. Calling it here would run the
        # ingestion while the DAG file is being parsed and hand the operator
        # the function's None return value instead of a callable.
        python_callable=metadata_ingestion_workflow,
    )
```
The DAG above runs the following Python function as its task:
```py
def metadata_ingestion_workflow():
    """Load the Hive workflow configuration and run the ingestion workflow."""
    # Imported here so the snippet is self-contained.
    import pathlib

    # load_config_file calls Path methods (is_file, suffix, open) on its
    # argument, so it needs a pathlib.Path rather than a plain string.
    config = load_config_file(pathlib.Path("examples/workflows/hive.json"))
    workflow = Workflow.create(config)
    workflow.run()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```
Create a Workflow instance and pass it a Hive configuration, which reads metadata from Hive
and ingests it into the OpenMetadata Server. You can customize this configuration or add different connectors; please refer to our [examples](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/examples/workflows) and to [Metadata Connectors](

View File

@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib
from datetime import timedelta

from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    # Older Airflow releases expose the operator under the legacy module path.
    from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow

# Defaults applied to every task in the DAG.
default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": True,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}


def metadata_ingestion_workflow():
    """Load the Hive workflow configuration and run the ingestion workflow.

    The configuration path is relative, so it is resolved against the
    working directory of the process that executes the task.
    """
    # load_config_file calls Path methods (is_file, suffix, open) on its
    # argument, so it needs a pathlib.Path rather than a plain string.
    config = load_config_file(pathlib.Path("examples/workflows/hive.json"))
    workflow = Workflow.create(config)
    workflow.run()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "hive_metadata_ingestion_workflow",
    default_args=default_args,
    description="An example DAG which runs a OpenMetadata ingestion workflow",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        # Pass the function object itself. Calling it here would run the
        # ingestion while the DAG file is being parsed and hand the operator
        # the function's None return value instead of a callable.
        python_callable=metadata_ingestion_workflow,
    )

View File

@ -14,7 +14,7 @@
# limitations under the License.
import json
from metadata.ingestion.workflow.workflow import Workflow
from metadata.ingestion.api.workflow import Workflow
from simplescheduler.corescheduler import job

View File

@ -1,15 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -21,8 +21,8 @@ import sys
import click
from pydantic import ValidationError
from metadata.config.config_loader import load_config_file
from metadata.ingestion.workflow.workflow import Workflow
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
logger = logging.getLogger(__name__)
@ -34,10 +34,12 @@ BASE_LOGGING_FORMAT = (
)
logging.basicConfig(format=BASE_LOGGING_FORMAT)
@click.group()
def check() -> None:
    # Click command group whose callback has nothing to do. A comment is used
    # instead of a docstring because click would show a docstring as the
    # group's help text.
    return None
@click.group()
@click.option("--debug/--no-debug", default=False)
def metadata(debug: bool) -> None:

View File

@ -12,11 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import pathlib
from abc import ABC, abstractmethod
from typing import IO, Any, List, Optional
from typing import IO, Any, Optional
from pydantic import BaseModel
from expandvars import expandvars
import io
import json
class ConfigModel(BaseModel):
@ -26,34 +28,14 @@ class ConfigModel(BaseModel):
class DynamicTypedConfig(ConfigModel):
    # Config model pairing a `type` string with a free-form `config` payload.

    # NOTE(review): presumably names the component/parser that should
    # interpret `config` -- confirm against the code that consumes this model.
    type: str
    # This config type is declared Optional[Any] here. The eventual parser for the
    # specified type is responsible for further validation.
    config: Optional[Any]
class MetaError(Exception):
"""A base class for all meta exceptions"""
class WorkflowExecutionError(MetaError):
class WorkflowExecutionError(Exception):
"""An error occurred when executing the workflow"""
class OperationalError(WorkflowExecutionError):
    """Workflow execution error caused by client-provided metadata.

    Carries a human-readable ``message`` and an ``info`` dict with any extra
    details supplied by the raiser.
    """

    message: str
    info: dict

    def __init__(self, message: str, info: dict = None):
        """Store ``message`` and ``info`` on the exception instance.

        A missing or empty ``info`` is replaced by a fresh empty dict.
        """
        self.message = message
        self.info = info or {}
class ConfigurationError(MetaError):
class ConfigurationError(Exception):
"""A configuration error has happened"""
@ -63,23 +45,23 @@ class ConfigurationMechanism(ABC):
pass
class AllowDenyPattern(ConfigModel):
""" A class to store allow deny regexes"""
def load_config_file(config_file: pathlib.Path) -> dict:
if not config_file.is_file():
raise ConfigurationError(f"Cannot open config file {config_file}")
allow: List[str] = [".*"]
deny: List[str] = []
if config_file.suffix not in [".json"]:
raise ConfigurationError(
"Only json are supported. Cannot process file type {}".format(
config_file.suffix
)
)
@classmethod
def allow_all(cls):
return AllowDenyPattern()
with config_file.open() as raw_config_file:
raw_config = raw_config_file.read()
def allowed(self, string: str) -> bool:
for deny_pattern in self.deny:
if re.match(deny_pattern, string):
return False
expanded_config_file = expandvars(raw_config, nounset=True)
config_fp = io.StringIO(expanded_config_file)
config = json.load(config_fp)
for allow_pattern in self.allow:
if re.match(allow_pattern, string):
return True
return config
return False

View File

@ -1,45 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import pathlib
from expandvars import expandvars
from .common import ConfigurationError, ConfigurationMechanism
from .json import JsonConfigurationMechanism
def load_config_file(config_file: pathlib.Path) -> dict:
    """Read a JSON workflow configuration file and return its parsed content.

    The raw file text is passed through ``expandvars`` (with ``nounset=True``)
    before it is parsed, so environment-variable references in the file are
    expanded first.

    Raises:
        ConfigurationError: if ``config_file`` is not an existing file, or its
            suffix is anything other than ``.json``.
    """
    if not config_file.is_file():
        raise ConfigurationError(f"Cannot open config file {config_file}")

    # Guard clause: only the JSON mechanism exists, so reject everything else.
    suffix = config_file.suffix
    if suffix not in [".json"]:
        raise ConfigurationError(
            "Only json are supported. Cannot process file type {}".format(suffix)
        )
    loader: ConfigurationMechanism = JsonConfigurationMechanism()

    with config_file.open() as handle:
        raw_text = handle.read()

    # The mechanism expects a file-like object, so wrap the expanded text.
    expanded_text = expandvars(raw_text, nounset=True)
    return loader.load_config(io.StringIO(expanded_text))

View File

@ -1,27 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import IO
import json
from .common import ConfigurationMechanism
class JsonConfigurationMechanism(ConfigurationMechanism):
    """Configuration mechanism that loads configuration from JSON input."""

    def load_config(self, config_fp: IO):
        """Parse JSON from the open file object ``config_fp``.

        Returns whatever ``json.load`` decodes from the stream.
        """
        return json.load(config_fp)

View File

@ -54,7 +54,9 @@ class Workflow:
ctx: WorkflowContext
source: Source
processor: Processor
stage: Stage
sink: Sink
bulk_sink: BulkSink
report = {}
def __init__(self, config: WorkflowConfig):
@ -112,7 +114,7 @@ class Workflow:
return MyClass
@classmethod
def create(cls, config_dict: dict) -> "Pipeline":
def create(cls, config_dict: dict) -> "Workflow":
config = WorkflowConfig.parse_obj(config_dict)
return cls(config)

View File

@ -1,15 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.