Create stage location in the workflow, not the apis (#10676)

* Create stage location in the workflow, not the apis

* Format
This commit is contained in:
Pere Miquel Brull 2023-03-20 18:14:27 +01:00 committed by GitHub
parent d5afb864ab
commit cdb14f30ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 12 deletions

View File

@ -17,6 +17,7 @@ import json
import os
import shutil
import traceback
from pathlib import Path
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
@ -61,12 +62,9 @@ class TableUsageStage(Stage[QueryParserData]):
self.status = StageStatus()
self.table_usage = {}
self.table_queries = {}
isdir = os.path.isdir(self.config.filename)
if not isdir:
os.mkdir(self.config.filename)
else:
shutil.rmtree(self.config.filename)
os.mkdir(self.config.filename)
self.init_location()
self.wrote_something = False
@classmethod
@ -74,6 +72,17 @@ class TableUsageStage(Stage[QueryParserData]):
config = TableStageConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def init_location(self) -> None:
    """
    Reset the staging directory named by ``self.config.filename``.

    If a directory is already present at that path it is deleted
    together with its contents. The directory is then created,
    including any missing parent directories.
    """
    staging_dir = Path(self.config.filename)

    # Drop whatever is already sitting in the staging directory
    already_present = staging_dir.is_dir()
    if already_present:
        logger.info("Location exists, cleaning it up")
        shutil.rmtree(self.config.filename)

    # parents=True builds the full path; exist_ok=True tolerates an existing dir
    logger.info(f"Creating the directory to store staging data in {staging_dir}")
    staging_dir.mkdir(parents=True, exist_ok=True)
def _get_user_entity(self, username: str):
if username:
user = self.metadata.get_by_name(entity=User, fqn=username)

View File

@ -12,7 +12,6 @@
Data Insights DAG function builder
"""
import json
from typing import cast
from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger

View File

@ -12,7 +12,6 @@
Metadata DAG function builder
"""
import tempfile
from pathlib import Path
from airflow import DAG
from openmetadata_managed_apis.workflows.ingestion.common import (
@ -76,10 +75,6 @@ def build_usage_workflow_config(
workflow_config = build_usage_config_from_file(ingestion_pipeline, tmp_file)
else:
# If dir does not exist, create it
if not Path(location).parent.is_dir():
Path(location).parent.mkdir(parents=True, exist_ok=True)
workflow_config = build_usage_config_from_file(ingestion_pipeline, location)
return workflow_config