# 2025-04-03 10:39:47 +05:30

# 83 lines
# 2.7 KiB
# Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Optional, Union
from airflow.models import DagRun
from flask import Response
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineStatus,
)
from metadata.utils.helpers import datetime_to_ts
class ApiResponse:
    """
    Helper class to respond API calls

    Every helper builds a flask ``Response`` whose body is the JSON-encoded
    payload and whose mimetype is ``application/json``.
    """

    STATUS_OK = 200
    STATUS_BAD_REQUEST = 400
    STATUS_UNAUTHORIZED = 401
    STATUS_NOT_FOUND = 404
    STATUS_SERVER_ERROR = 500

    @staticmethod
    def standard_response(status, response_obj):
        """
        Serialize ``response_obj`` with ``json.dumps`` and wrap it in a Response.

        :param status: HTTP status handed to the Response, e.g. one of the
            ``STATUS_*`` constants of this class
        :param response_obj: payload; it has to be serializable by ``json.dumps``
        :return: Response carrying the JSON body
        """
        return Response(
            json.dumps(response_obj), status=status, mimetype="application/json"
        )

    @staticmethod
    def success(response_obj: Optional[Union[dict, list]] = None):
        """
        Build a ``STATUS_OK`` response.

        :param response_obj: dict or list to send back; ``None`` is sent as ``{}``
        :return: Response carrying the JSON body
        """
        # Only None falls back to {}: an empty list must still be sent as [].
        response_body = {} if response_obj is None else response_obj
        return ApiResponse.standard_response(ApiResponse.STATUS_OK, response_body)

    @staticmethod
    def error(status, error):
        """
        Build an error response whose body is ``{"error": error}``.

        :param status: HTTP status of the response
        :param error: value stored under the ``"error"`` key
        :return: Response carrying the JSON body
        """
        return ApiResponse.standard_response(status, {"error": error})

    @staticmethod
    def bad_request(error):
        """
        Build a ``STATUS_BAD_REQUEST`` error response for ``error``.
        """
        return ApiResponse.error(ApiResponse.STATUS_BAD_REQUEST, error)

    @staticmethod
    def not_found(error="Resource not found"):
        """
        Build a ``STATUS_NOT_FOUND`` error response for ``error``.
        """
        return ApiResponse.error(ApiResponse.STATUS_NOT_FOUND, error)

    @staticmethod
    def unauthorized(error="Not authorized to access this resource"):
        """
        Build a ``STATUS_UNAUTHORIZED`` error response for ``error``.
        """
        return ApiResponse.error(ApiResponse.STATUS_UNAUTHORIZED, error)

    @staticmethod
    def server_error(error="An unexpected problem occurred"):
        """
        Build a ``STATUS_SERVER_ERROR`` error response for ``error``.
        """
        return ApiResponse.error(ApiResponse.STATUS_SERVER_ERROR, error)
class ResponseFormat:
    """
    Handle how to manage responses
    """

    @staticmethod
    def format_dag_run_state(dag_run: DagRun) -> PipelineStatus:
        """
        Build the pipeline status

        Copies the run's state and run id, and converts its start, end and
        execution dates with ``datetime_to_ts``.

        :param dag_run: DAG run to describe
        :return: PipelineStatus populated from ``dag_run``
        """
        # NOTE(review): `execution_date` looks deprecated in newer Airflow
        # releases in favour of `logical_date` - confirm the supported versions.
        return PipelineStatus(
            pipelineState=dag_run.get_state(),
            runId=dag_run.run_id,
            startDate=datetime_to_ts(dag_run.start_date),
            endDate=datetime_to_ts(dag_run.end_date),
            timestamp=datetime_to_ts(dag_run.execution_date),
        )