Fix #466: Fix airflow example dag (#467)

This commit is contained in:
Sriharsha Chintalapani 2021-09-10 22:28:23 -07:00 committed by GitHub
parent 480ff3e780
commit ed698dc40f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib
from datetime import timedelta
from airflow import DAG
@ -23,28 +23,32 @@ except ModuleNotFoundError:
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago
default_args = {
"owner": "user_name",
"email": ["username@org.com"],
"email_on_failure": True,
"email_on_failure": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=60),
"execution_timeout": timedelta(minutes=60)
}
def metadata_ingestion_workflow():
config = load_config_file("examples/workflows/hive.json")
workflow = Workflow.create(config)
workflow.run()
config_file = pathlib.Path("/tmp/sample_data.json")
workflow_config = load_config_file(config_file)
if workflow_config.get('cron'):
del workflow_config['cron']
workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
with DAG(
"hive_metadata_ingestion_workflow"
"sample_metadata_ingestion_workflow",
default_args=default_args,
description="An example DAG which runs a OpenMetadata ingestion workflow",
schedule_interval=timedelta(days=1),
@ -53,5 +57,5 @@ with DAG(
) as dag:
ingest_task = PythonOperator(
task_id="ingest_using_recipe",
python_callable=metadata_ingestion_workflow(),
python_callable=metadata_ingestion_workflow,
)