diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java index 1b293fd37d7..14f11d700d8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -48,7 +48,9 @@ import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; +import jakarta.ws.rs.core.StreamingOutput; import jakarta.ws.rs.core.UriInfo; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.UUID; @@ -920,6 +922,77 @@ public class IngestionPipelineResource return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build(); } + @GET + @Path("/logs/{id}/last/download") + @Produces(MediaType.APPLICATION_OCTET_STREAM) + @Operation( + operationId = "downloadLastIngestionLogs", + summary = "Download all logs from last ingestion pipeline run as a stream", + description = "Stream all logs from last ingestion pipeline run by `Id` for download.", + responses = { + @ApiResponse( + responseCode = "200", + description = "Log content as a downloadable stream", + content = @Content(mediaType = "application/octet-stream")), + @ApiResponse(responseCode = "404", description = "Logs for instance {id} is not found") + }) + public Response downloadLastIngestionLogs( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the ingestion pipeline", schema = @Schema(type = "UUID")) + @PathParam("id") + UUID id) { + try { + if (pipelineServiceClient == null) { + return Response.status(200).entity("Pipeline Client Disabled").build(); + } + IngestionPipeline ingestionPipeline = + getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED); + + String filename = + String.format( + "ingestion_logs_%s_%d.txt", ingestionPipeline.getName(), System.currentTimeMillis()); + + StreamingOutput streamingOutput = + output -> { + String cursor = null; + boolean hasMoreData = true; + + while (hasMoreData) { + Map logChunk = + pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, cursor); + + if (logChunk == null || logChunk.isEmpty()) { + break; + } + + for (Map.Entry entry : logChunk.entrySet()) { + if (entry.getValue() != null + && !entry.getKey().equals("after") + && !entry.getKey().equals("total")) { + output.write(entry.getValue().getBytes(StandardCharsets.UTF_8)); + output.write("\n".getBytes(StandardCharsets.UTF_8)); + } + } + output.flush(); + + cursor = logChunk.get("after"); + if (cursor == null) { + hasMoreData = false; + } + } + }; + + return Response.ok(streamingOutput) + .header("Content-Disposition", "attachment; filename=\"" + filename + "\"") + .build(); + } catch (Exception e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("Error downloading logs: " + e.getMessage()) + .build(); + } + } + @PUT @Path("/{fqn}/pipelineStatus") @Operation( diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx index c07318f3e23..26b751e6148 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx @@ -15,7 +15,7 @@ import { DownloadOutlined } from '@ant-design/icons'; import { LazyLog } from '@melloware/react-logviewer'; import { Button, Col, Progress, Row, Space, Tooltip, Typography } from 'antd'; import { AxiosError } from 'axios'; -import { isEmpty, isNil, isUndefined, round, toNumber } from 'lodash'; +import { isEmpty, isNil, isUndefined, toNumber } from 'lodash'; import { Fragment, useCallback, @@ -296,32 +296,31 @@ const LogsViewerPage = () => { const handleIngestionDownloadClick = async () => { try { reset(); - const progress = round( - (Number(paging?.after) * 100) / Number(paging?.total) - ); - - updateProgress(paging?.after ? progress : 1); - let logs = ''; + updateProgress(1); let fileName = `${getEntityName(ingestionDetails)}-${ ingestionDetails?.pipelineType }.log`; - if (isApplicationType) { - logs = await downloadAppLogs(ingestionName); - fileName = `${ingestionName}.log`; - } else { - logs = await downloadIngestionLog( - ingestionDetails?.id, - ingestionDetails?.pipelineType - ); - } - const element = document.createElement('a'); - const file = new Blob([logs || ''], { type: 'text/plain' }); - element.href = URL.createObjectURL(file); - element.download = fileName; - document.body.appendChild(element); - element.click(); - document.body.removeChild(element); + if (isApplicationType) { + const logs = await downloadAppLogs(ingestionName); + fileName = `${ingestionName}.log`; + const element = document.createElement('a'); + const file = new Blob([logs || ''], { type: 'text/plain' }); + element.href = URL.createObjectURL(file); + element.download = fileName; + document.body.appendChild(element); + element.click(); + document.body.removeChild(element); + } else { + const logsBlob = await downloadIngestionLog(ingestionDetails?.id); + + const element = document.createElement('a'); + element.href = URL.createObjectURL(logsBlob as Blob); + element.download = fileName; + document.body.appendChild(element); + element.click(); + document.body.removeChild(element); + } } catch (err) { showErrorToast(err as AxiosError); } finally { diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts index bf5d0b08d64..94f3c9772c0 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts @@ -165,3 +165,12 @@ export const getRunHistoryForPipeline = async ( return response.data; }; + +export const downloadIngestionPipelineLogsById = (id: string) => { + return APIClient.get( + `/services/ingestionPipelines/logs/${id}/last/download`, + { + responseType: 'blob', + } + ); +}; diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.test.ts b/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.test.ts index 15d5cd2c1d2..7428d787f95 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.test.ts +++ b/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.test.ts @@ -13,7 +13,10 @@ import { PipelineType } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline'; import { useDownloadProgressStore } from '../../hooks/useDownloadProgressStore'; import { IngestionPipelineLogByIdInterface } from '../../pages/LogsViewerPage/LogsViewerPage.interfaces'; -import { getIngestionPipelineLogById } from '../../rest/ingestionPipelineAPI'; +import { + downloadIngestionPipelineLogsById, + getIngestionPipelineLogById, +} from '../../rest/ingestionPipelineAPI'; import { showErrorToast } from '../ToastUtils'; import { downloadIngestionLog, @@ -23,7 +26,23 @@ import { const mockUpdateProgress = jest.fn(); -jest.mock('../../rest/ingestionPipelineAPI'); +jest.mock('../../rest/ingestionPipelineAPI', () => ({ + getIngestionPipelineLogById: jest.fn().mockImplementation(() => + Promise.resolve({ + data: { + ingestion_task: 'metadata_logs_1', + total: '100', + after: '50', + }, + }) + ), + downloadIngestionPipelineLogsById: jest.fn().mockImplementation(() => + Promise.resolve({ + data: 'downloaded_logs', + }) + ), +})); + jest.mock('../ToastUtils'); jest.mock('../../hooks/useDownloadProgressStore'); jest.mock('../../hooks/useDownloadProgressStore', () => ({ @@ -144,56 +163,36 @@ describe('LogsUtils', () => { describe('downloadIngestionLog', () => { const ingestionId = '123'; - const pipelineType = PipelineType.Metadata; - let mockFetchLogsRecursively: jest.SpyInstance; - - beforeAll(() => { - mockFetchLogsRecursively = jest.spyOn(utils, 'fetchLogsRecursively'); - }); - - afterAll(() => { - jest.clearAllMocks(); - }); it('should return the downloaded logs', async () => { - const logs = 'metadata_logs'; + const result = await downloadIngestionLog(ingestionId); - mockFetchLogsRecursively.mockResolvedValueOnce(logs); - - const result = await downloadIngestionLog(ingestionId, pipelineType); - - expect(mockFetchLogsRecursively).toHaveBeenCalledWith( - ingestionId, - pipelineType + expect(downloadIngestionPipelineLogsById).toHaveBeenCalledWith( + ingestionId ); - expect(result).toBe(logs); + expect(result).toBe('downloaded_logs'); }); it('should show error toast and return empty string if an error occurs', async () => { const error = new Error('Failed to fetch logs'); + (downloadIngestionPipelineLogsById as jest.Mock).mockRejectedValueOnce( + error + ); - mockFetchLogsRecursively.mockRejectedValueOnce(error); + const result = await downloadIngestionLog(ingestionId); - const result = await downloadIngestionLog(ingestionId, pipelineType); - - expect(mockFetchLogsRecursively).toHaveBeenCalledWith( - ingestionId, - pipelineType + expect(downloadIngestionPipelineLogsById).toHaveBeenCalledWith( + ingestionId ); expect(showErrorToast).toHaveBeenCalledWith(error); expect(result).toBe(''); }); it('should return empty string if ingestionId or pipelineType is not provided', async () => { - const result = await downloadIngestionLog(undefined, pipelineType); + const result = await downloadIngestionLog(undefined); - expect(mockFetchLogsRecursively).not.toHaveBeenCalled(); + expect(downloadIngestionPipelineLogsById).not.toHaveBeenCalled(); expect(result).toBe(''); - - const result2 = await downloadIngestionLog(ingestionId, undefined); - - expect(mockFetchLogsRecursively).not.toHaveBeenCalled(); - expect(result2).toBe(''); }); }); }); diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts index d0f794dad2f..5e22a9e955e 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts +++ b/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts @@ -16,7 +16,10 @@ import { PipelineType } from '../../generated/entity/services/ingestionPipelines import { useDownloadProgressStore } from '../../hooks/useDownloadProgressStore'; import { IngestionPipelineLogByIdInterface } from '../../pages/LogsViewerPage/LogsViewerPage.interfaces'; import { getApplicationLogs } from '../../rest/applicationAPI'; -import { getIngestionPipelineLogById } from '../../rest/ingestionPipelineAPI'; +import { + downloadIngestionPipelineLogsById, + getIngestionPipelineLogById, +} from '../../rest/ingestionPipelineAPI'; import { showErrorToast } from '../ToastUtils'; export const getLogsFromResponse = ( @@ -82,16 +85,15 @@ export const fetchLogsRecursively = async ( return logs; }; -export const downloadIngestionLog = async ( - ingestionId?: string, - pipelineType?: PipelineType -) => { - if (!ingestionId || !pipelineType) { +export const downloadIngestionLog = async (ingestionId?: string) => { + if (!ingestionId) { return ''; } try { - return await fetchLogsRecursively(ingestionId, pipelineType); + const response = await downloadIngestionPipelineLogsById(ingestionId); + + return response.data; } catch (err) { showErrorToast(err as AxiosError);