Fix #6283 #6281 #6279 - Send compressed log, backend logs and Airflow fix (#6288)

Fix #6283 #6281 #6279 - Send compressed log, backend logs and Airflow fix (#6288)
This commit is contained in:
Pere Miquel Brull 2022-07-27 07:47:25 +02:00 committed by GitHub
parent aabdb3c5f7
commit 53930a9253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 114 additions and 33 deletions

View File

@@ -187,15 +187,16 @@ public class AirflowRESTClient extends PipelineServiceClient {
@Override
public HttpResponse<String> getServiceStatus() {
  // Declared outside the try so the final throw can report the failing body.
  HttpResponse<String> response;
  try {
    response = requestNoAuthForJsonContent("%s/rest_api/health", serviceURL);
    if (response.statusCode() == 200) {
      return response;
    }
  } catch (Exception e) {
    // Network / parsing failure: surface the underlying cause instead of a generic message.
    throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage());
  }
  // Non-200 answer: include the service's response body to aid debugging.
  throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", response.body()));
}
@Override
@@ -236,17 +237,19 @@ public class AirflowRESTClient extends PipelineServiceClient {
@Override
public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline) {
  // Declared outside the try so the final throw can report the failing body.
  HttpResponse<String> response;
  try {
    response =
        requestAuthenticatedForJsonContent(
            "%s/rest_api/api?api=last_dag_logs&dag_id=%s", serviceURL, ingestionPipeline.getName());
    if (response.statusCode() == 200) {
      // Body is a JSON object mapping task ids to their (possibly compressed) log payloads.
      return JsonUtils.readValue(response.body(), new TypeReference<>() {});
    }
  } catch (Exception e) {
    // Request / deserialization failure: surface the underlying cause.
    throw PipelineServiceClientException.byMessage("Failed to get last ingestion logs.", e.getMessage());
  }
  // Non-200 answer: include the service's response body to aid debugging.
  throw new PipelineServiceClientException(
      String.format("Failed to get last ingestion logs due to %s", response.body()));
}
private HttpResponse<String> requestAuthenticatedForJsonContent(String stringUrlFormat, Object... stringReplacement)

View File

@ -12,7 +12,6 @@
Airflow source to extract metadata from OM UI
"""
import traceback
from collections.abc import Iterable
from typing import Any, Iterable, List, Optional, cast
from airflow.models import BaseOperator, DagRun
@ -127,7 +126,9 @@ class AirflowSource(PipelineServiceSource):
task.state, StatusType.Pending.value
),
startTime=datetime_to_ts(task.start_date),
endTime=datetime_to_ts(task.end_date),
endTime=datetime_to_ts(
task.end_date
), # Might be None for running tasks
logLink=task.log_url,
)
for task in tasks

View File

@@ -201,11 +201,11 @@ def get_storage_service_or_create(service_json, metadata_config) -> StorageService:
return created_service
def datetime_to_ts(date: Optional[datetime]) -> Optional[int]:
    """
    Convert a given date to a timestamp as an Int in milliseconds.

    :param date: datetime to convert; may be None (e.g. the end_date of a
        task that is still running)
    :return: Unix timestamp in milliseconds, or None when no date is given
    """
    return int(date.timestamp() * 1_000) if date else None
def get_formatted_entity_name(name: str) -> Optional[str]:

View File

@ -92,6 +92,12 @@ APIS_METADATA = [
"form_input_type": "text",
"required": True,
},
{
"name": "compress",
"description": "Return the logs as gzip",
"form_input_type": "bool",
"required": False,
},
],
},
{

View File

@ -14,7 +14,7 @@ Airflow REST API definition
import logging
import traceback
from typing import Optional
from typing import Any, Optional
from airflow import settings
from airflow.models import DagBag, DagModel
@ -63,7 +63,7 @@ class REST_API(AppBuilderBaseView):
return dagbag
@staticmethod
def get_request_arg(req, arg) -> Optional[str]:
def get_request_arg(req, arg) -> Optional[Any]:
return req.args.get(arg) or req.form.get(arg)
def get_arg_dag_id(self) -> Optional[str]:
@ -292,6 +292,7 @@ class REST_API(AppBuilderBaseView):
Retrieve all logs from the task instances of a last DAG run
"""
raw_dag_id: str = self.get_request_arg(request, "dag_id")
compress: bool = self.get_request_arg(request, "compress")
if not raw_dag_id:
ApiResponse.bad_request("Missing dag_id parameter in the request")
@ -299,7 +300,7 @@ class REST_API(AppBuilderBaseView):
dag_id = clean_dag_id(raw_dag_id)
try:
return last_dag_logs(dag_id)
return last_dag_logs(dag_id, compress or True)
except Exception as exc:
logging.info(f"Failed to get last run logs for '{dag_id}'")

View File

@ -11,20 +11,23 @@
"""
Module containing the logic to retrieve all logs from the tasks of a last DAG run
"""
import base64
import glob
import gzip
import os
from pathlib import Path
from airflow.models import DagModel, DagRun
from airflow.models import DagModel
from flask import Response
from openmetadata.api.response import ApiResponse, ResponseFormat
from openmetadata.api.response import ApiResponse
def last_dag_logs(dag_id: str) -> Response:
def last_dag_logs(dag_id: str, compress: bool = True) -> Response:
"""
Validate that the DAG is registered by Airflow and have at least one Run.
If exists, returns all logs for each task instance of the last DAG run.
:param dag_id: DAG to find
:param compress: to compress the results or not
:return: API Response
"""
@ -54,11 +57,18 @@ def last_dag_logs(dag_id: str) -> Response:
filter(os.path.isfile, glob.glob(f"{dir_path}/*.log")),
key=os.path.getmtime,
)
response[
task_instance.task_id
] = f"\n*** Reading local file: {task_instance.log_filepath}\n".join(
[Path(log).read_text() for log in sorted_logs]
log_res = f"\n*** Reading local file: {task_instance.log_filepath}\n".join(
[Path(sorted_logs[-1]).read_text()]
)
# Return the base64 encoding of the response, removing the b'...' trailers
response[task_instance.task_id] = (
str(base64.b64encode(gzip.compress(bytes(log_res, "utf-8"))))[2:-1]
if compress
else log_res
)
else:
return ApiResponse.not_found(
f"Logs for task instance '{task_instance}' of DAG '{dag_id}' not found."

View File

@ -16,7 +16,6 @@ import json
import uuid
from unittest import TestCase
from openmetadata.helpers import clean_dag_id
from openmetadata.workflows.ingestion.metadata import build_metadata_workflow_config
from openmetadata.workflows.ingestion.profiler import build_profiler_workflow_config
from openmetadata.workflows.ingestion.usage import build_usage_workflow_config
@ -124,15 +123,6 @@ class OMetaServiceTest(TestCase):
hard_delete=True,
)
def test_clean_dag_id(self):
"""
Validate dag_id clean
"""
self.assertEqual(clean_dag_id("hello"), "hello")
self.assertEqual(clean_dag_id("hello(world)"), "hello_world_")
self.assertEqual(clean_dag_id("hello-world"), "hello-world")
self.assertEqual(clean_dag_id("%%&^++hello__"), "_hello__")
def test_ingestion_workflow(self):
"""
Validate that the ingestionPipeline can be parsed

View File

@ -0,0 +1,31 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test helper functions
"""
from unittest import TestCase
from openmetadata.helpers import clean_dag_id
class TestHelpers(TestCase):
    """
    Methods to validate helpers on REST APIs
    """

    def test_clean_dag_id(self):
        """
        To make sure airflow can parse it
        """
        expectations = {
            "hello": "hello",
            "hello(world)": "hello_world_",
            "hello-world": "hello-world",
            "%%&^++hello__": "_hello__",
        }
        # Every raw id must be sanitized to its Airflow-parseable form
        for raw_id, cleaned_id in expectations.items():
            self.assertEqual(clean_dag_id(raw_id), cleaned_id)

View File

@ -52,6 +52,7 @@
"mobx-react": "6.1.4",
"moment": "^2.29.1",
"oidc-client": "^1.11.5",
"pako": "^2.0.4",
"path-browserify": "^1.0.1",
"postcss": "^8.3.2",
"process": "^0.11.10",
@ -147,6 +148,7 @@
"@types/jest": "^26.0.23",
"@types/lodash": "^4.14.167",
"@types/node": "^15.6.1",
"@types/pako": "^2.0.0",
"@types/react": "^17.0.8",
"@types/react-copy-to-clipboard": "^5.0.2",
"@types/react-dom": "^17.0.11",

View File

@ -31,6 +31,10 @@ jest.mock('../../../axiosAPIs/ingestionPipelineAPI', () => ({
),
}));
jest.mock('../../../utils/ingestionutils', () => ({
gzipToStringConverter: jest.fn().mockImplementation(() => 'logs'),
}));
jest.mock('../../buttons/CopyToClipboardButton/CopyToClipboardButton', () =>
jest.fn().mockReturnValue(<button data-testid="copy">copy</button>)
);

View File

@ -19,6 +19,7 @@ import { isNil } from 'lodash';
import React, { FC, Fragment, useEffect, useState } from 'react';
import { getIngestionPipelineLogById } from '../../../axiosAPIs/ingestionPipelineAPI';
import { PipelineType } from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { gzipToStringConverter } from '../../../utils/ingestionutils';
import { showErrorToast } from '../../../utils/ToastUtils';
import CopyToClipboardButton from '../../buttons/CopyToClipboardButton/CopyToClipboardButton';
import Loader from '../../Loader/Loader';
@ -48,15 +49,15 @@ const IngestionLogsModal: FC<IngestionLogsModalProps> = ({
.then((res: AxiosResponse) => {
switch (pipelineType) {
case PipelineType.Metadata:
setLogs(res.data?.ingestion_task || '');
setLogs(gzipToStringConverter(res.data?.ingestion_task || ''));
break;
case PipelineType.Profiler:
setLogs(res.data?.profiler_task || '');
setLogs(gzipToStringConverter(res.data?.profiler_task || ''));
break;
case PipelineType.Usage:
setLogs(res.data?.usage_task || '');
setLogs(gzipToStringConverter(res.data?.usage_task || ''));
break;

View File

@ -0,0 +1,22 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Buffer } from 'buffer';
import { ungzip } from 'pako';
/**
 * Decode a base64-encoded, gzip-compressed payload back into a plain string.
 *
 * @param data base64 string carrying gzipped bytes
 * @returns the decompressed text, decoded as UTF-8
 */
export const gzipToStringConverter = (data: string) => {
  // base64 -> raw compressed bytes
  const compressedBytes = Buffer.from(data, 'base64');

  // inflate and decode back into text
  return new TextDecoder().decode(ungzip(compressedBytes));
};

View File

@ -3126,6 +3126,11 @@
resolved "https://registry.yarnpkg.com/@types/normalize-package-data/-/normalize-package-data-2.4.1.tgz#d3357479a0fdfdd5907fe67e17e0a85c906e1301"
integrity sha512-Gj7cI7z+98M282Tqmp2K5EIsoouUEzbBJhQQzDE3jSIRk6r9gsz0oUokqIUR4u1R3dMHo0pDHM7sNOHyhulypw==
"@types/pako@^2.0.0":
version "2.0.0"
resolved "https://registry.yarnpkg.com/@types/pako/-/pako-2.0.0.tgz#12ab4c19107528452e73ac99132c875ccd43bdfb"
integrity sha512-10+iaz93qR5WYxTo+PMifD5TSxiOtdRaxBf7INGGXMQgTCu8Z/7GYWYFUOS3q/G0nE5boj1r4FEB+WSy7s5gbA==
"@types/parse-json@^4.0.0":
version "4.0.0"
resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.0.tgz#2f8bb441434d163b35fb8ffdccd7138927ffb8c0"
@ -11415,6 +11420,11 @@ p-try@^2.0.0:
resolved "https://registry.yarnpkg.com/p-try/-/p-try-2.2.0.tgz#cb2868540e313d61de58fafbe35ce9004d5540e6"
integrity sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==
pako@^2.0.4:
version "2.0.4"
resolved "https://registry.yarnpkg.com/pako/-/pako-2.0.4.tgz#6cebc4bbb0b6c73b0d5b8d7e8476e2b2fbea576d"
integrity sha512-v8tweI900AUkZN6heMU/4Uy4cXRc2AYNRggVmTR+dEncawDJgCdLMximOVA2p4qO57WMynangsfGRb5WD6L1Bg==
param-case@^3.0.3:
version "3.0.4"
resolved "https://registry.yarnpkg.com/param-case/-/param-case-3.0.4.tgz#7d17fe4aa12bde34d4a77d91acfb6219caad01c5"