From cdb14f30ffbb5aee463cb6e5891fe8540ad4fa36 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 20 Mar 2023 18:14:27 +0100 Subject: [PATCH] Create stage location in the workflow, not the apis (#10676) * Create stage location in the workflow, not the apis * Format --- .../metadata/ingestion/stage/table_usage.py | 21 +++++++++++++------ .../workflows/ingestion/data_insight.py | 1 - .../workflows/ingestion/usage.py | 5 ----- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index a9ec8006304..d4b3fa074a0 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -17,6 +17,7 @@ import json import os import shutil import traceback +from pathlib import Path from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createQuery import CreateQueryRequest @@ -61,12 +62,9 @@ class TableUsageStage(Stage[QueryParserData]): self.status = StageStatus() self.table_usage = {} self.table_queries = {} - isdir = os.path.isdir(self.config.filename) - if not isdir: - os.mkdir(self.config.filename) - else: - shutil.rmtree(self.config.filename) - os.mkdir(self.config.filename) + + self.init_location() + self.wrote_something = False @classmethod @@ -74,6 +72,17 @@ class TableUsageStage(Stage[QueryParserData]): config = TableStageConfig.parse_obj(config_dict) return cls(config, metadata_config) + def init_location(self) -> None: + """ + Prepare the usage location + """ + location = Path(self.config.filename) + if location.is_dir(): + logger.info("Location exists, cleaning it up") + shutil.rmtree(self.config.filename) + logger.info(f"Creating the directory to store staging data in {location}") + location.mkdir(parents=True, exist_ok=True) + def _get_user_entity(self, username: str): if username: user = self.metadata.get_by_name(entity=User, fqn=username) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py index 4df804b4fe5..229740a4752 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py @@ -12,7 +12,6 @@ Data Insights DAG function builder """ import json -from typing import cast from airflow import DAG from openmetadata_managed_apis.utils.logger import set_operator_logger diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py index b7912012327..437cb00e57c 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py @@ -12,7 +12,6 @@ Metadata DAG function builder """ import tempfile -from pathlib import Path from airflow import DAG from openmetadata_managed_apis.workflows.ingestion.common import ( @@ -76,10 +75,6 @@ def build_usage_workflow_config( workflow_config = build_usage_config_from_file(ingestion_pipeline, tmp_file) else: - # If dir does not exist, create it - if not Path(location).parent.is_dir(): - Path(location).parent.mkdir(parents=True, exist_ok=True) - workflow_config = build_usage_config_from_file(ingestion_pipeline, location) return workflow_config