Separate Status and Logs Endpoints (#13709)

* Separate Status and Logs Endpoints

* fix: change endpoints from UI

* Handle Quartz to Unix Conversion in Backend

---------

Co-authored-by: karanh37 <karanh37@gmail.com>
This commit is contained in:
Mohit Yadav 2023-10-26 02:00:27 +05:30 committed by GitHub
parent c6297b9cdf
commit 528ea335f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 198 additions and 84 deletions

View File

@ -458,6 +458,11 @@
<artifactId>woodstox-core</artifactId>
<version>${woodstox.version}</version>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>9.2.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

View File

@ -1,11 +1,16 @@
package org.openmetadata.service.apps;
import static com.cronutils.model.CronType.QUARTZ;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.COLLECTION_DAO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.SEARCH_CLIENT_KEY;
import static org.openmetadata.service.exception.CatalogExceptionMessage.INVALID_APP_TYPE;
import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR;
import com.cronutils.mapper.CronMapper;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.AppRuntime;
@ -23,10 +28,12 @@ import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.quartz.JobExecutionContext;
@ -37,6 +44,8 @@ public class AbstractNativeApplication implements NativeApplication {
protected CollectionDAO collectionDAO;
private App app;
protected SearchRepository searchRepository;
private final CronMapper cronMapper = CronMapper.fromQuartzToUnix();
private final CronParser cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
@Override
public void init(App app, CollectionDAO dao, SearchRepository searchRepository) {
@ -74,21 +83,36 @@ public class AbstractNativeApplication implements NativeApplication {
@Override
public void initializeExternalApp() {
if (app.getAppType() == AppType.External && app.getScheduleType().equals(ScheduleType.Scheduled)) {
// Init Application Code for Some Initialization
List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO
.relationshipDAO()
.findTo(app.getId(), Entity.APPLICATION, Relationship.CONTAINS.ordinal(), Entity.INGESTION_PIPELINE);
if (!records.isEmpty()) {
return;
}
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
ExternalAppIngestionConfig ingestionConfig =
JsonUtils.convertValue(app.getAppConfiguration(), ExternalAppIngestionConfig.class);
try {
ExternalAppIngestionConfig ingestionConfig =
JsonUtils.convertValue(app.getAppConfiguration(), ExternalAppIngestionConfig.class);
// Check if the Pipeline Already Exists
String fqn = FullyQualifiedName.add(ingestionConfig.getService().getName(), ingestionConfig.getName());
IngestionPipeline storedPipeline =
ingestionPipelineRepository.getByName(null, fqn, ingestionPipelineRepository.getFields("id"));
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
// Init Application Code for Some Initialization
List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO
.relationshipDAO()
.findTo(app.getId(), Entity.APPLICATION, Relationship.HAS.ordinal(), Entity.INGESTION_PIPELINE);
if (records.isEmpty()) {
// Add Ingestion Pipeline to Application
collectionDAO
.relationshipDAO()
.insert(
app.getId(),
storedPipeline.getId(),
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
}
} catch (EntityNotFoundException ex) {
// Pipeline needs to be created
EntityRepository<?> serviceRepository =
Entity.getServiceEntityRepository(ServiceType.fromValue(ingestionConfig.getService().getType()));
EntityReference service =
@ -96,6 +120,8 @@ public class AbstractNativeApplication implements NativeApplication {
.getByName(null, ingestionConfig.getService().getName(), serviceRepository.getFields("id"))
.getEntityReference();
Cron quartzCron = cronParser.parse(app.getAppSchedule().getCronExpression());
CreateIngestionPipeline createPipelineRequest =
new CreateIngestionPipeline()
.withName(ingestionConfig.getName())
@ -103,7 +129,8 @@ public class AbstractNativeApplication implements NativeApplication {
.withDescription(ingestionConfig.getDescription())
.withPipelineType(ingestionConfig.getPipelineType())
.withSourceConfig(ingestionConfig.getSourceConfig())
.withAirflowConfig(ingestionConfig.getAirflowConfig())
.withAirflowConfig(
ingestionConfig.getAirflowConfig().withScheduleInterval(cronMapper.map(quartzCron).asString()))
.withService(service);
// Get Pipeline
@ -122,16 +149,10 @@ public class AbstractNativeApplication implements NativeApplication {
Entity.APPLICATION,
Entity.INGESTION_PIPELINE,
Relationship.HAS.ordinal());
} catch (Exception ex) {
LOG.error("[IngestionPipelineResource] Failed in Creating Reindex and Insight Pipeline", ex);
LOG.error("Failed to initialize DataInsightApp", ex);
throw new RuntimeException(ex);
}
return;
} else {
throw new IllegalArgumentException(INVALID_APP_TYPE);
}
throw new IllegalArgumentException(INVALID_APP_TYPE);
}
protected void validateServerExecutableApp(AppRuntime context) {

View File

@ -53,6 +53,7 @@ public class AppRepository extends EntityRepository<App> {
return entity.withBot(getBotUser(entity));
}
@Override
protected List<EntityReference> getIngestionPipelines(App service) {
List<CollectionDAO.EntityRelationshipRecord> pipelines =
findToRecords(service.getId(), entityType, Relationship.HAS, Entity.INGESTION_PIPELINE);

View File

@ -15,9 +15,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.json.JsonPatch;
import javax.validation.Valid;
@ -52,7 +50,6 @@ import org.openmetadata.schema.entity.app.CreateApp;
import org.openmetadata.schema.entity.app.ScheduleType;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.EntityReference;
@ -205,7 +202,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
}
@GET
@Path("/name/{name}/runs")
@Path("/name/{name}/status")
@Operation(
operationId = "listAppRunRecords",
summary = "List App Run Records",
@ -219,7 +216,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
description = "List of Installed Applications Runs",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = AppRunList.class)))
})
public ResultList<AppRunRecord> listAppRuns(
public Response listAppRuns(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string")) @PathParam("name") String name,
@ -234,24 +231,54 @@ public class AppResource extends EntityResource<App, AppRepository> {
@QueryParam("offset")
@Min(0)
@Max(1000000)
int offset) {
App installation = repository.getByName(uriInfo, name, repository.getFields("id"));
return repository.listAppRuns(installation.getId(), limitParam, offset);
int offset,
@Parameter(
description = "Filter pipeline status after the given start timestamp",
schema = @Schema(type = "number"))
@QueryParam("startTs")
Long startTs,
@Parameter(
description = "Filter pipeline status before the given end timestamp",
schema = @Schema(type = "number"))
@QueryParam("endTs")
Long endTs) {
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
if (installation.getAppType().equals(AppType.Internal)) {
return Response.status(Response.Status.OK)
.entity(repository.listAppRuns(installation.getId(), limitParam, offset))
.build();
} else {
if (!installation.getPipelines().isEmpty()) {
EntityReference pipelineRef = installation.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNER));
return Response.ok(
ingestionPipelineRepository.listPipelineStatus(
ingestionPipeline.getFullyQualifiedName(), startTs, endTs),
MediaType.APPLICATION_JSON_TYPE)
.build();
} else {
throw new RuntimeException("App does not have an associated pipeline.");
}
}
}
@GET
@Path("/name/{name}/runs/latest")
@Path("/name/{name}/logs")
@Operation(
operationId = "latestAppRunRecord",
summary = "Get Latest App Run Record",
description = "Get a latest applications Run Record.",
summary = "Retrieve all logs from last ingestion pipeline run for the application",
description = "Get all logs from last ingestion pipeline run by `Id`.",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of Installed Applications Runs",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = AppRunRecord.class)))
description = "JSON object with the task instance name of the ingestion on each key and log in the value",
content = @Content(mediaType = "application/json")),
@ApiResponse(responseCode = "404", description = "Logs for instance {id} is not found")
})
public Response listLatestAppRun(
public Response getLastLogs(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string")) @PathParam("name") String name,
@ -269,12 +296,9 @@ public class AppResource extends EntityResource<App, AppRepository> {
IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNER));
PipelineStatus latestPipelineStatus = ingestionPipelineRepository.getLatestPipelineStatus(ingestionPipeline);
Map<String, String> lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after);
Map<String, Object> appRun = new HashMap<>();
appRun.put("pipelineStatus", latestPipelineStatus);
appRun.put("lastIngestionLogs", lastIngestionLogs);
return Response.ok(appRun, MediaType.APPLICATION_JSON_TYPE).build();
return Response.ok(
pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after), MediaType.APPLICATION_JSON_TYPE)
.build();
}
}
throw new BadRequestException("Failed to Get Logs for the Installation.");

View File

@ -30,6 +30,6 @@
},
"appSchedule": {
"scheduleType": "Custom",
"cronExpression": "0 0 * * *"
"cronExpression": "0 0 0 * * ?"
}
}

View File

@ -318,7 +318,7 @@ const AppDetails = () => {
key: ApplicationTabs.HISTORY,
children: (
<div className="p-y-md">
<AppRunsHistory />
<AppRunsHistory appData={appData} />
</div>
),
},

View File

@ -14,9 +14,7 @@
import { PipelineStatus } from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
export interface DataInsightLatestRun {
lastIngestionLogs: {
data_insight_task: string;
total: string;
};
data_insight_task: string;
total: string;
pipelineStatus: PipelineStatus;
}

View File

@ -28,14 +28,21 @@ import { NO_DATA_PLACEHOLDER } from '../../../constants/constants';
import { GlobalSettingOptions } from '../../../constants/GlobalSettings.constants';
import { AppType } from '../../../generated/entity/applications/app';
import { Status } from '../../../generated/entity/applications/appRunRecord';
import {
PipelineState,
PipelineStatus,
} from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { Paging } from '../../../generated/type/paging';
import { usePaging } from '../../../hooks/paging/usePaging';
import { getApplicationRuns } from '../../../rest/applicationAPI';
import {
getApplicationRuns,
getLatestApplicationRuns,
} from '../../../rest/applicationAPI';
import { getStatusTypeForApplication } from '../../../utils/ApplicationUtils';
import { formatDateTime } from '../../../utils/date-time/DateTimeUtils';
getStatusFromPipelineState,
getStatusTypeForApplication,
} from '../../../utils/ApplicationUtils';
import {
formatDateTime,
getEpochMillisForPastDays,
} from '../../../utils/date-time/DateTimeUtils';
import { getLogsViewerPath } from '../../../utils/RouterUtils';
import { showErrorToast } from '../../../utils/ToastUtils';
import ErrorPlaceHolder from '../../common/error-with-placeholder/ErrorPlaceHolder';
@ -109,6 +116,39 @@ const AppRunsHistory = forwardRef(
return record.status === Status.Running;
}, []);
const getActionButton = useCallback(
(record: AppRunRecordWithId, index: number) => {
if (appData?.appType === AppType.Internal) {
return (
<Button
className="p-0"
data-testid="logs"
disabled={showLogAction(record)}
size="small"
type="link"
onClick={() => handleRowExpandable(record.id)}>
{t('label.log-plural')}
</Button>
);
} else if (isExternalApp && index === 0) {
return (
<Button
className="p-0"
data-testid="logs"
disabled={showLogAction(record)}
size="small"
type="link"
onClick={() => handleRowExpandable(record.id)}>
{t('label.log-plural')}
</Button>
);
} else {
return NO_DATA_PLACEHOLDER;
}
},
[showLogAction, appData, isExternalApp]
);
const tableColumn: ColumnsType<AppRunRecordWithId> = useMemo(
() => [
{
@ -147,24 +187,16 @@ const AppRunsHistory = forwardRef(
title: t('label.action-plural'),
dataIndex: 'actions',
key: 'actions',
render: (_, record) => (
<Button
className="p-0"
data-testid="logs"
disabled={showLogAction(record)}
size="small"
type="link"
onClick={() => handleRowExpandable(record.id)}>
{t('label.log-plural')}
</Button>
),
render: (_, record, index) => getActionButton(record, index),
},
],
[
appData,
formatDateTime,
handleRowExpandable,
getStatusTypeForApplication,
showLogAction,
getActionButton,
]
);
@ -174,17 +206,23 @@ const AppRunsHistory = forwardRef(
setIsLoading(true);
if (isExternalApp) {
const res = await getLatestApplicationRuns(fqn);
const currentTime = Date.now();
const oneDayAgo = getEpochMillisForPastDays(1);
setAppRunsHistoryData([
{
...res,
timestamp: res.pipelineStatus.timestamp,
status: (res.pipelineStatus.pipelineState ??
Status.Failed) as Status,
id: `${res.pipelineStatus.runId}-${res.pipelineStatus.timestamp}`,
},
]);
const { data } = await getApplicationRuns(fqn, {
startTs: oneDayAgo,
endTs: currentTime,
});
setAppRunsHistoryData(
data.map((item) => ({
...item,
status: getStatusFromPipelineState(
(item as PipelineStatus).pipelineState ?? PipelineState.Failed
),
id: (item as PipelineStatus).runId ?? '',
}))
);
} else {
const { data, paging } = await getApplicationRuns(fqn, {
offset: pagingOffset?.offset ?? 0,

View File

@ -25,7 +25,6 @@ import React, {
import { useTranslation } from 'react-i18next';
import { LazyLog } from 'react-lazylog';
import { useParams } from 'react-router-dom';
import { DataInsightLatestRun } from '../../components/Applications/AppDetails/AppDetails.interface';
import { CopyToClipboardButton } from '../../components/buttons/CopyToClipboardButton/CopyToClipboardButton';
import TitleBreadcrumb from '../../components/common/title-breadcrumb/title-breadcrumb.component';
import PageLayoutV1 from '../../components/containers/PageLayoutV1';
@ -38,16 +37,19 @@ import { App, AppScheduleClass } from '../../generated/entity/applications/app';
import {
IngestionPipeline,
PipelineState,
PipelineStatus,
} from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { Paging } from '../../generated/type/paging';
import {
getApplicationByName,
getApplicationRuns,
getLatestApplicationRuns,
} from '../../rest/applicationAPI';
import {
getIngestionPipelineByName,
getIngestionPipelineLogById,
} from '../../rest/ingestionPipelineAPI';
import { getEpochMillisForPastDays } from '../../utils/date-time/DateTimeUtils';
import { getLogBreadCrumbs } from '../../utils/LogsViewer.utils';
import { getDecodedFqn } from '../../utils/StringsUtils';
import { showErrorToast } from '../../utils/ToastUtils';
@ -64,7 +66,7 @@ const LogsViewer = () => {
const [logs, setLogs] = useState<string>('');
const [ingestionDetails, setIngestionDetails] = useState<IngestionPipeline>();
const [appData, setAppData] = useState<App>();
const [appLatestRun, setAppLatestRun] = useState<DataInsightLatestRun>();
const [appLatestRun, setAppLatestRun] = useState<PipelineStatus>();
const [paging, setPaging] = useState<Paging>();
const isApplicationType = useMemo(
@ -78,9 +80,16 @@ const LogsViewer = () => {
) => {
try {
if (isApplicationType) {
const res = await getLatestApplicationRuns(ingestionName);
setAppLatestRun(res);
setLogs(res.lastIngestionLogs.data_insight_task);
const currentTime = Date.now();
const oneDayAgo = getEpochMillisForPastDays(1);
const { data } = await getApplicationRuns(ingestionName, {
startTs: oneDayAgo,
endTs: currentTime,
});
const logs = await getLatestApplicationRuns(ingestionName);
setAppLatestRun(data[0]);
setLogs(logs.data_insight_task);
return;
}
@ -260,12 +269,11 @@ const LogsViewer = () => {
className="ingestion-run-badge latest"
color={
PIPELINE_INGESTION_RUN_STATUS[
appLatestRun?.pipelineStatus?.pipelineState ??
PipelineState.Failed
appLatestRun?.pipelineState ?? PipelineState.Failed
]
}
data-testid="pipeline-status">
{startCase(appLatestRun?.pipelineStatus?.pipelineState)}
{startCase(appLatestRun?.pipelineState)}
</Tag>
);
}

View File

@ -25,6 +25,8 @@ const BASE_URL = '/apps';
type AppListParams = ListParams & {
offset?: number;
startTs?: number;
endTs?: number;
};
export const getApplicationList = async (params?: ListParams) => {
@ -60,7 +62,7 @@ export const getApplicationRuns = async (
params?: AppListParams
) => {
const response = await APIClient.get<PagingResponse<AppRunRecord[]>>(
`${BASE_URL}/name/${appName}/runs`,
`${BASE_URL}/name/${appName}/status`,
{
params,
}
@ -71,7 +73,7 @@ export const getApplicationRuns = async (
export const getLatestApplicationRuns = async (appName: string) => {
const response = await APIClient.get<DataInsightLatestRun>(
`${BASE_URL}/name/${appName}/runs/latest`
`${BASE_URL}/name/${appName}/logs`
);
return response.data;

View File

@ -12,6 +12,7 @@
*/
import { StatusType } from '../components/common/StatusBadge/StatusBadge.interface';
import { Status } from '../generated/entity/applications/appRunRecord';
import { PipelineState } from '../generated/entity/services/ingestionPipelines/ingestionPipeline';
export const getStatusTypeForApplication = (status: Status) => {
if (status === Status.Failed) {
@ -24,3 +25,19 @@ export const getStatusTypeForApplication = (status: Status) => {
return StatusType.Failure;
};
export const getStatusFromPipelineState = (status: PipelineState) => {
if (status === PipelineState.Failed) {
return Status.Failed;
} else if (status === PipelineState.Success) {
return Status.Success;
} else if (
status === PipelineState.Running ||
status === PipelineState.PartialSuccess ||
status === PipelineState.Queued
) {
return Status.Running;
}
return Status.Failed;
};