From 53930a9253b245a461cb80a321290733bc60ca2a Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 27 Jul 2022 07:47:25 +0200 Subject: [PATCH] Fix #6283 #6281 #6279 - Send compressed log, backend logs and Airflow fix (#6288) Fix #6283 #6281 #6279 - Send compressed log, backend logs and Airflow fix (#6288) --- .../catalog/airflow/AirflowRESTClient.java | 15 +++++---- .../ingestion/source/pipeline/airflow.py | 5 +-- ingestion/src/metadata/utils/helpers.py | 4 +-- .../src/openmetadata/api/apis_metadata.py | 6 ++++ .../src/openmetadata/api/rest_api.py | 7 +++-- .../openmetadata/operations/last_dag_logs.py | 24 +++++++++----- .../test_workflow_creation.py | 10 ------ .../tests/unit/test_helpers.py | 31 +++++++++++++++++++ .../src/main/resources/ui/package.json | 2 ++ .../IngestionLogsModal.test.tsx | 4 +++ .../IngestionLogsModal/IngestionLogsModal.tsx | 7 +++-- .../resources/ui/src/utils/ingestionutils.ts | 22 +++++++++++++ .../src/main/resources/ui/yarn.lock | 10 ++++++ 13 files changed, 114 insertions(+), 33 deletions(-) create mode 100644 openmetadata-airflow-apis/tests/unit/test_helpers.py create mode 100644 openmetadata-ui/src/main/resources/ui/src/utils/ingestionutils.ts diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java index 73d83be3a7c..6b01cacb282 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/airflow/AirflowRESTClient.java @@ -187,15 +187,16 @@ public class AirflowRESTClient extends PipelineServiceClient { @Override public HttpResponse getServiceStatus() { + HttpResponse response; try { - HttpResponse response = requestNoAuthForJsonContent("%s/rest_api/health", serviceURL); + response = requestNoAuthForJsonContent("%s/rest_api/health", serviceURL); if (response.statusCode() == 200) { return response; } } catch (Exception e) { - throw new PipelineServiceClientException("Failed to get REST status."); + throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage()); } - throw new PipelineServiceClientException("Failed to get REST status."); + throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", response.body())); } @Override @@ -236,17 +237,19 @@ public class AirflowRESTClient extends PipelineServiceClient { @Override public Map getLastIngestionLogs(IngestionPipeline ingestionPipeline) { + HttpResponse response; try { - HttpResponse response = + response = requestAuthenticatedForJsonContent( "%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, ingestionPipeline.getName()); if (response.statusCode() == 200) { return JsonUtils.readValue(response.body(), new TypeReference<>() {}); } } catch (Exception e) { - throw new PipelineServiceClientException("Failed to get last ingestion logs."); + throw PipelineServiceClientException.byMessage("Failed to get last ingestion logs.", e.getMessage()); } - throw new PipelineServiceClientException("Failed to get last ingestion logs."); + throw new PipelineServiceClientException( + String.format("Failed to get last ingestion logs due to %s", response.body())); } private HttpResponse requestAuthenticatedForJsonContent(String stringUrlFormat, Object... stringReplacement) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index 1163437b63b..205a2dd50f6 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -12,7 +12,6 @@ Airflow source to extract metadata from OM UI """ import traceback -from collections.abc import Iterable from typing import Any, Iterable, List, Optional, cast from airflow.models import BaseOperator, DagRun @@ -127,7 +126,9 @@ class AirflowSource(PipelineServiceSource): task.state, StatusType.Pending.value ), startTime=datetime_to_ts(task.start_date), - endTime=datetime_to_ts(task.end_date), + endTime=datetime_to_ts( + task.end_date + ), # Might be None for running tasks logLink=task.log_url, ) for task in tasks diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 4a04b7a5fd3..651251739bd 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -201,11 +201,11 @@ def get_storage_service_or_create(service_json, metadata_config) -> StorageServi return created_service -def datetime_to_ts(date: datetime) -> int: +def datetime_to_ts(date: Optional[datetime]) -> Optional[int]: """ Convert a given date to a timestamp as an Int in milliseconds """ - return int(date.timestamp() * 1_000) + return int(date.timestamp() * 1_000) if date else None def get_formatted_entity_name(name: str) -> Optional[str]: diff --git a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py index 1ce13ec703e..089eb5a36d8 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/apis_metadata.py @@ -92,6 +92,12 @@ APIS_METADATA = [ "form_input_type": "text", "required": True, }, + { + "name": "compress", + "description": "Return the logs as gzip", + "form_input_type": "bool", + "required": False, + }, ], }, { diff --git a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py index d0b79c9a68a..afe97b8f3eb 100644 --- a/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py +++ b/openmetadata-airflow-apis/src/openmetadata/api/rest_api.py @@ -14,7 +14,7 @@ Airflow REST API definition import logging import traceback -from typing import Optional +from typing import Any, Optional from airflow import settings from airflow.models import DagBag, DagModel @@ -63,7 +63,7 @@ class REST_API(AppBuilderBaseView): return dagbag @staticmethod - def get_request_arg(req, arg) -> Optional[str]: + def get_request_arg(req, arg) -> Optional[Any]: return req.args.get(arg) or req.form.get(arg) def get_arg_dag_id(self) -> Optional[str]: @@ -292,6 +292,7 @@ class REST_API(AppBuilderBaseView): Retrieve all logs from the task instances of a last DAG run """ raw_dag_id: str = self.get_request_arg(request, "dag_id") + compress: bool = self.get_request_arg(request, "compress") if not raw_dag_id: ApiResponse.bad_request("Missing dag_id parameter in the request") @@ -299,7 +300,7 @@ class REST_API(AppBuilderBaseView): dag_id = clean_dag_id(raw_dag_id) try: - return last_dag_logs(dag_id) + return last_dag_logs(dag_id, compress or True) except Exception as exc: logging.info(f"Failed to get last run logs for '{dag_id}'") diff --git a/openmetadata-airflow-apis/src/openmetadata/operations/last_dag_logs.py b/openmetadata-airflow-apis/src/openmetadata/operations/last_dag_logs.py index 88677e5fa7d..faea4904db7 100644 --- a/openmetadata-airflow-apis/src/openmetadata/operations/last_dag_logs.py +++ b/openmetadata-airflow-apis/src/openmetadata/operations/last_dag_logs.py @@ -11,20 +11,23 @@ """ Module containing the logic to retrieve all logs from the tasks of a last DAG run """ +import base64 import glob +import gzip import os from pathlib import Path -from airflow.models import DagModel, DagRun +from airflow.models import DagModel from flask import Response -from openmetadata.api.response import ApiResponse, ResponseFormat +from openmetadata.api.response import ApiResponse -def last_dag_logs(dag_id: str) -> Response: +def last_dag_logs(dag_id: str, compress: bool = True) -> Response: """ Validate that the DAG is registered by Airflow and have at least one Run. If exists, returns all logs for each task instance of the last DAG run. :param dag_id: DAG to find + :param compress: to compress the results or not :return: API Response """ @@ -54,11 +57,18 @@ def last_dag_logs(dag_id: str) -> Response: filter(os.path.isfile, glob.glob(f"{dir_path}/*.log")), key=os.path.getmtime, ) - response[ - task_instance.task_id - ] = f"\n*** Reading local file: {task_instance.log_filepath}\n".join( - [Path(log).read_text() for log in sorted_logs] + + log_res = f"\n*** Reading local file: {task_instance.log_filepath}\n".join( + [Path(sorted_logs[-1]).read_text()] ) + + # Return the base64 encoding of the response, removing the b'...' trailers + response[task_instance.task_id] = ( + str(base64.b64encode(gzip.compress(bytes(log_res, "utf-8"))))[2:-1] + if compress + else log_res + ) + else: return ApiResponse.not_found( f"Logs for task instance '{task_instance}' of DAG '{dag_id}' not found." diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index 751637b31dd..c2f1fb53d2f 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -16,7 +16,6 @@ import json import uuid from unittest import TestCase -from openmetadata.helpers import clean_dag_id from openmetadata.workflows.ingestion.metadata import build_metadata_workflow_config from openmetadata.workflows.ingestion.profiler import build_profiler_workflow_config from openmetadata.workflows.ingestion.usage import build_usage_workflow_config @@ -124,15 +123,6 @@ class OMetaServiceTest(TestCase): hard_delete=True, ) - def test_clean_dag_id(self): - """ - Validate dag_id clean - """ - self.assertEqual(clean_dag_id("hello"), "hello") - self.assertEqual(clean_dag_id("hello(world)"), "hello_world_") - self.assertEqual(clean_dag_id("hello-world"), "hello-world") - self.assertEqual(clean_dag_id("%%&^++hello__"), "_hello__") - def test_ingestion_workflow(self): """ Validate that the ingestionPipeline can be parsed diff --git a/openmetadata-airflow-apis/tests/unit/test_helpers.py b/openmetadata-airflow-apis/tests/unit/test_helpers.py new file mode 100644 index 00000000000..dc5fa2375ff --- /dev/null +++ b/openmetadata-airflow-apis/tests/unit/test_helpers.py @@ -0,0 +1,31 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test helper functions +""" +from unittest import TestCase + +from openmetadata.helpers import clean_dag_id + + +class TestHelpers(TestCase): + """ + Methods to validate helpers on REST APIs + """ + + def test_clean_dag_id(self): + """ + To make sure airflow can parse it + """ + self.assertEqual(clean_dag_id("hello"), "hello") + self.assertEqual(clean_dag_id("hello(world)"), "hello_world_") + self.assertEqual(clean_dag_id("hello-world"), "hello-world") + self.assertEqual(clean_dag_id("%%&^++hello__"), "_hello__") diff --git a/openmetadata-ui/src/main/resources/ui/package.json b/openmetadata-ui/src/main/resources/ui/package.json index ff2a2612ac5..75a91801fc7 100644 --- a/openmetadata-ui/src/main/resources/ui/package.json +++ b/openmetadata-ui/src/main/resources/ui/package.json @@ -52,6 +52,7 @@ "mobx-react": "6.1.4", "moment": "^2.29.1", "oidc-client": "^1.11.5", + "pako": "^2.0.4", "path-browserify": "^1.0.1", "postcss": "^8.3.2", "process": "^0.11.10", @@ -147,6 +148,7 @@ "@types/jest": "^26.0.23", "@types/lodash": "^4.14.167", "@types/node": "^15.6.1", + "@types/pako": "^2.0.0", "@types/react": "^17.0.8", "@types/react-copy-to-clipboard": "^5.0.2", "@types/react-dom": "^17.0.11", diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.test.tsx index 487afc2dd84..197738131b9 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.test.tsx @@ -31,6 +31,10 @@ jest.mock('../../../axiosAPIs/ingestionPipelineAPI', () => ({ ), })); +jest.mock('../../../utils/ingestionutils', () => ({ + gzipToStringConverter: jest.fn().mockImplementation(() => 'logs'), +})); + jest.mock('../../buttons/CopyToClipboardButton/CopyToClipboardButton', () => jest.fn().mockReturnValue() ); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.tsx index 0ce2451c0be..b77eb03c2b4 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Modals/IngestionLogsModal/IngestionLogsModal.tsx @@ -19,6 +19,7 @@ import { isNil } from 'lodash'; import React, { FC, Fragment, useEffect, useState } from 'react'; import { getIngestionPipelineLogById } from '../../../axiosAPIs/ingestionPipelineAPI'; import { PipelineType } from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline'; +import { gzipToStringConverter } from '../../../utils/ingestionutils'; import { showErrorToast } from '../../../utils/ToastUtils'; import CopyToClipboardButton from '../../buttons/CopyToClipboardButton/CopyToClipboardButton'; import Loader from '../../Loader/Loader'; @@ -48,15 +49,15 @@ const IngestionLogsModal: FC = ({ .then((res: AxiosResponse) => { switch (pipelineType) { case PipelineType.Metadata: - setLogs(res.data?.ingestion_task || ''); + setLogs(gzipToStringConverter(res.data?.ingestion_task || '')); break; case PipelineType.Profiler: - setLogs(res.data?.profiler_task || ''); + setLogs(gzipToStringConverter(res.data?.profiler_task || '')); break; case PipelineType.Usage: - setLogs(res.data?.usage_task || ''); + setLogs(gzipToStringConverter(res.data?.usage_task || '')); break; diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ingestionutils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/ingestionutils.ts new file mode 100644 index 00000000000..9374450e093 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/utils/ingestionutils.ts @@ -0,0 +1,22 @@ +/* + * Copyright 2022 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Buffer } from 'buffer'; +import { ungzip } from 'pako'; + +export const gzipToStringConverter = (data: string) => { + const gzipedDataBuffer = Buffer.from(data, 'base64'); + const ungzipedData = ungzip(gzipedDataBuffer); + + return new TextDecoder().decode(ungzipedData); +}; diff --git a/openmetadata-ui/src/main/resources/ui/yarn.lock b/openmetadata-ui/src/main/resources/ui/yarn.lock index d5c36b76e34..ae88a4cdeab 100644 --- a/openmetadata-ui/src/main/resources/ui/yarn.lock +++ b/openmetadata-ui/src/main/resources/ui/yarn.lock @@ -3126,6 +3126,11 @@ resolved "https://registry.yarnpkg.com/@types/normalize-package-data/-/normalize-package-data-2.4.1.tgz#d3357479a0fdfdd5907fe67e17e0a85c906e1301" integrity sha512-Gj7cI7z+98M282Tqmp2K5EIsoouUEzbBJhQQzDE3jSIRk6r9gsz0oUokqIUR4u1R3dMHo0pDHM7sNOHyhulypw== +"@types/pako@^2.0.0": + version "2.0.0" + resolved "https://registry.yarnpkg.com/@types/pako/-/pako-2.0.0.tgz#12ab4c19107528452e73ac99132c875ccd43bdfb" + integrity sha512-10+iaz93qR5WYxTo+PMifD5TSxiOtdRaxBf7INGGXMQgTCu8Z/7GYWYFUOS3q/G0nE5boj1r4FEB+WSy7s5gbA== + "@types/parse-json@^4.0.0": version "4.0.0" resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.0.tgz#2f8bb441434d163b35fb8ffdccd7138927ffb8c0" @@ -11415,6 +11420,11 @@ p-try@^2.0.0: resolved "https://registry.yarnpkg.com/p-try/-/p-try-2.2.0.tgz#cb2868540e313d61de58fafbe35ce9004d5540e6" integrity sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ== +pako@^2.0.4: + version "2.0.4" + resolved "https://registry.yarnpkg.com/pako/-/pako-2.0.4.tgz#6cebc4bbb0b6c73b0d5b8d7e8476e2b2fbea576d" + integrity sha512-v8tweI900AUkZN6heMU/4Uy4cXRc2AYNRggVmTR+dEncawDJgCdLMximOVA2p4qO57WMynangsfGRb5WD6L1Bg== + param-case@^3.0.3: version "3.0.4" resolved "https://registry.yarnpkg.com/param-case/-/param-case-3.0.4.tgz#7d17fe4aa12bde34d4a77d91acfb6219caad01c5"