MINOR - Add specific endpoint for log downloads in streaming (#23021)

* MINOR - Add specific endpoint for log downloads in streaming

* Fix unit tests

* Fix unit test

---------

Co-authored-by: Aniket Katkar <aniketkatkar97@gmail.com>
(cherry picked from commit 6b075e3c1853a4ae8a5e9ad1453a7f17389791ac)
This commit is contained in:
Pere Miquel Brull 2025-08-27 08:25:38 +02:00 committed by OpenMetadata Release Bot
parent dc548189eb
commit 5fefbe3ca1
5 changed files with 146 additions and 64 deletions

View File

@ -48,7 +48,9 @@ import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import jakarta.ws.rs.core.StreamingOutput;
import jakarta.ws.rs.core.UriInfo;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -920,6 +922,77 @@ public class IngestionPipelineResource
return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build();
}
@GET
@Path("/logs/{id}/last/download")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Operation(
operationId = "downloadLastIngestionLogs",
summary = "Download all logs from last ingestion pipeline run as a stream",
description = "Stream all logs from last ingestion pipeline run by `Id` for download.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Log content as a downloadable stream",
content = @Content(mediaType = "application/octet-stream")),
@ApiResponse(responseCode = "404", description = "Logs for instance {id} is not found")
})
public Response downloadLastIngestionLogs(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the ingestion pipeline", schema = @Schema(type = "UUID"))
@PathParam("id")
UUID id) {
try {
if (pipelineServiceClient == null) {
return Response.status(200).entity("Pipeline Client Disabled").build();
}
IngestionPipeline ingestionPipeline =
getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
String filename =
String.format(
"ingestion_logs_%s_%d.txt", ingestionPipeline.getName(), System.currentTimeMillis());
StreamingOutput streamingOutput =
output -> {
String cursor = null;
boolean hasMoreData = true;
while (hasMoreData) {
Map<String, String> logChunk =
pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, cursor);
if (logChunk == null || logChunk.isEmpty()) {
break;
}
for (Map.Entry<String, String> entry : logChunk.entrySet()) {
if (entry.getValue() != null
&& !entry.getKey().equals("after")
&& !entry.getKey().equals("total")) {
output.write(entry.getValue().getBytes(StandardCharsets.UTF_8));
output.write("\n".getBytes(StandardCharsets.UTF_8));
}
}
output.flush();
cursor = logChunk.get("after");
if (cursor == null) {
hasMoreData = false;
}
}
};
return Response.ok(streamingOutput)
.header("Content-Disposition", "attachment; filename=\"" + filename + "\"")
.build();
} catch (Exception e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity("Error downloading logs: " + e.getMessage())
.build();
}
}
@PUT
@Path("/{fqn}/pipelineStatus")
@Operation(

View File

@ -15,7 +15,7 @@ import { DownloadOutlined } from '@ant-design/icons';
import { LazyLog } from '@melloware/react-logviewer';
import { Button, Col, Progress, Row, Space, Tooltip, Typography } from 'antd';
import { AxiosError } from 'axios';
import { isEmpty, isNil, isUndefined, round, toNumber } from 'lodash';
import { isEmpty, isNil, isUndefined, toNumber } from 'lodash';
import {
Fragment,
useCallback,
@ -296,32 +296,31 @@ const LogsViewerPage = () => {
const handleIngestionDownloadClick = async () => {
try {
reset();
const progress = round(
(Number(paging?.after) * 100) / Number(paging?.total)
);
updateProgress(paging?.after ? progress : 1);
let logs = '';
updateProgress(1);
let fileName = `${getEntityName(ingestionDetails)}-${
ingestionDetails?.pipelineType
}.log`;
if (isApplicationType) {
logs = await downloadAppLogs(ingestionName);
fileName = `${ingestionName}.log`;
} else {
logs = await downloadIngestionLog(
ingestionDetails?.id,
ingestionDetails?.pipelineType
);
}
const element = document.createElement('a');
const file = new Blob([logs || ''], { type: 'text/plain' });
element.href = URL.createObjectURL(file);
element.download = fileName;
document.body.appendChild(element);
element.click();
document.body.removeChild(element);
if (isApplicationType) {
const logs = await downloadAppLogs(ingestionName);
fileName = `${ingestionName}.log`;
const element = document.createElement('a');
const file = new Blob([logs || ''], { type: 'text/plain' });
element.href = URL.createObjectURL(file);
element.download = fileName;
document.body.appendChild(element);
element.click();
document.body.removeChild(element);
} else {
const logsBlob = await downloadIngestionLog(ingestionDetails?.id);
const element = document.createElement('a');
element.href = URL.createObjectURL(logsBlob as Blob);
element.download = fileName;
document.body.appendChild(element);
element.click();
document.body.removeChild(element);
}
} catch (err) {
showErrorToast(err as AxiosError);
} finally {

View File

@ -165,3 +165,12 @@ export const getRunHistoryForPipeline = async (
return response.data;
};
export const downloadIngestionPipelineLogsById = (id: string) => {
return APIClient.get(
`/services/ingestionPipelines/logs/${id}/last/download`,
{
responseType: 'blob',
}
);
};

View File

@ -13,7 +13,10 @@
import { PipelineType } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { useDownloadProgressStore } from '../../hooks/useDownloadProgressStore';
import { IngestionPipelineLogByIdInterface } from '../../pages/LogsViewerPage/LogsViewerPage.interfaces';
import { getIngestionPipelineLogById } from '../../rest/ingestionPipelineAPI';
import {
downloadIngestionPipelineLogsById,
getIngestionPipelineLogById,
} from '../../rest/ingestionPipelineAPI';
import { showErrorToast } from '../ToastUtils';
import {
downloadIngestionLog,
@ -23,7 +26,23 @@ import {
const mockUpdateProgress = jest.fn();
jest.mock('../../rest/ingestionPipelineAPI');
jest.mock('../../rest/ingestionPipelineAPI', () => ({
getIngestionPipelineLogById: jest.fn().mockImplementation(() =>
Promise.resolve({
data: {
ingestion_task: 'metadata_logs_1',
total: '100',
after: '50',
},
})
),
downloadIngestionPipelineLogsById: jest.fn().mockImplementation(() =>
Promise.resolve({
data: 'downloaded_logs',
})
),
}));
jest.mock('../ToastUtils');
jest.mock('../../hooks/useDownloadProgressStore');
jest.mock('../../hooks/useDownloadProgressStore', () => ({
@ -144,56 +163,36 @@ describe('LogsUtils', () => {
describe('downloadIngestionLog', () => {
const ingestionId = '123';
const pipelineType = PipelineType.Metadata;
let mockFetchLogsRecursively: jest.SpyInstance;
beforeAll(() => {
mockFetchLogsRecursively = jest.spyOn(utils, 'fetchLogsRecursively');
});
afterAll(() => {
jest.clearAllMocks();
});
it('should return the downloaded logs', async () => {
const logs = 'metadata_logs';
const result = await downloadIngestionLog(ingestionId);
mockFetchLogsRecursively.mockResolvedValueOnce(logs);
const result = await downloadIngestionLog(ingestionId, pipelineType);
expect(mockFetchLogsRecursively).toHaveBeenCalledWith(
ingestionId,
pipelineType
expect(downloadIngestionPipelineLogsById).toHaveBeenCalledWith(
ingestionId
);
expect(result).toBe(logs);
expect(result).toBe('downloaded_logs');
});
it('should show error toast and return empty string if an error occurs', async () => {
const error = new Error('Failed to fetch logs');
(downloadIngestionPipelineLogsById as jest.Mock).mockRejectedValueOnce(
error
);
mockFetchLogsRecursively.mockRejectedValueOnce(error);
const result = await downloadIngestionLog(ingestionId);
const result = await downloadIngestionLog(ingestionId, pipelineType);
expect(mockFetchLogsRecursively).toHaveBeenCalledWith(
ingestionId,
pipelineType
expect(downloadIngestionPipelineLogsById).toHaveBeenCalledWith(
ingestionId
);
expect(showErrorToast).toHaveBeenCalledWith(error);
expect(result).toBe('');
});
it('should return empty string if ingestionId or pipelineType is not provided', async () => {
const result = await downloadIngestionLog(undefined, pipelineType);
const result = await downloadIngestionLog(undefined);
expect(mockFetchLogsRecursively).not.toHaveBeenCalled();
expect(downloadIngestionPipelineLogsById).not.toHaveBeenCalled();
expect(result).toBe('');
const result2 = await downloadIngestionLog(ingestionId, undefined);
expect(mockFetchLogsRecursively).not.toHaveBeenCalled();
expect(result2).toBe('');
});
});
});

View File

@ -16,7 +16,10 @@ import { PipelineType } from '../../generated/entity/services/ingestionPipelines
import { useDownloadProgressStore } from '../../hooks/useDownloadProgressStore';
import { IngestionPipelineLogByIdInterface } from '../../pages/LogsViewerPage/LogsViewerPage.interfaces';
import { getApplicationLogs } from '../../rest/applicationAPI';
import { getIngestionPipelineLogById } from '../../rest/ingestionPipelineAPI';
import {
downloadIngestionPipelineLogsById,
getIngestionPipelineLogById,
} from '../../rest/ingestionPipelineAPI';
import { showErrorToast } from '../ToastUtils';
export const getLogsFromResponse = (
@ -82,16 +85,15 @@ export const fetchLogsRecursively = async (
return logs;
};
export const downloadIngestionLog = async (
ingestionId?: string,
pipelineType?: PipelineType
) => {
if (!ingestionId || !pipelineType) {
export const downloadIngestionLog = async (ingestionId?: string) => {
if (!ingestionId) {
return '';
}
try {
return await fetchLogsRecursively(ingestionId, pipelineType);
const response = await downloadIngestionPipelineLogsById(ingestionId);
return response.data;
} catch (err) {
showErrorToast(err as AxiosError);