Fix #13023 - Fix ingestionPipeline list parameters (#13289)

* Fix ingestionPipeline list parameters

* Fix test

* support filters from ui

---------

Co-authored-by: Chirag Madlani <12962843+chirag-madlani@users.noreply.github.com>
This commit is contained in:
Pere Miquel Brull 2023-09-26 10:57:14 +02:00 committed by GitHub
parent 6f92c3be1f
commit a4b9a2d2cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 184 additions and 51 deletions

View File

@ -1595,22 +1595,27 @@ public interface CollectionDAO {
@Override
default int listCount(ListFilter filter) {
String serviceType = filter.getQueryParam("serviceType");
String service = filter.getQueryParam("service");
String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId";
String pipelineTypeCondition;
if (filter.getQueryParam("pipelineType") != null) {
String pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null));
condition += pipelineTypeCondition;
}
if (filter.getQueryParam("service") != null) {
String serviceCondition = String.format(" and %s", filter.getServiceCondition(null));
condition += serviceCondition;
}
Map<String, Object> bindMap = new HashMap<>();
String serviceType = filter.getQueryParam("serviceType");
if (!CommonUtil.nullOrEmpty(serviceType)) {
if (filter.getQueryParam("pipelineType") != null) {
pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null));
condition += pipelineTypeCondition;
}
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service",
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation",
condition);
bindMap.put("relation", CONTAINS.ordinal());
bindMap.put("service", service + ".%");
bindMap.put("serviceType", serviceType);
return listIngestionPipelineCount(condition, bindMap);
}
@ -1619,26 +1624,28 @@ public interface CollectionDAO {
@Override
default List<String> listAfter(ListFilter filter, int limit, String after) {
String serviceType = filter.getQueryParam("serviceType");
String service = filter.getQueryParam("service");
String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId";
String pipelineTypeCondition;
if (filter.getQueryParam("pipelineType") != null) {
String pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null));
condition += pipelineTypeCondition;
}
if (filter.getQueryParam("service") != null) {
String serviceCondition = String.format(" and %s", filter.getServiceCondition(null));
condition += serviceCondition;
}
Map<String, Object> bindMap = new HashMap<>();
String serviceType = filter.getQueryParam("serviceType");
if (!CommonUtil.nullOrEmpty(serviceType)) {
if (filter.getQueryParam("pipelineType") != null) {
pipelineTypeCondition = filter.getPipelineTypeCondition(null);
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after and %s order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit",
condition, pipelineTypeCondition);
} else {
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit",
condition);
}
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.name > :after order by ingestion_pipeline_entity.name ASC LIMIT :limit",
condition);
bindMap.put("serviceType", serviceType);
bindMap.put("service", service + ".%");
bindMap.put("relation", CONTAINS.ordinal());
bindMap.put("after", after);
bindMap.put("limit", limit);
@ -1649,26 +1656,27 @@ public interface CollectionDAO {
@Override
default List<String> listBefore(ListFilter filter, int limit, String before) {
String service = filter.getQueryParam("service");
String serviceType = filter.getQueryParam("serviceType");
String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId";
String pipelineTypeCondition;
if (filter.getQueryParam("pipelineType") != null) {
String pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null));
condition += pipelineTypeCondition;
}
if (filter.getQueryParam("service") != null) {
String serviceCondition = String.format(" and %s", filter.getServiceCondition(null));
condition += serviceCondition;
}
Map<String, Object> bindMap = new HashMap<>();
String serviceType = filter.getQueryParam("serviceType");
if (!CommonUtil.nullOrEmpty(serviceType)) {
if (filter.getQueryParam("pipelineType") != null) {
pipelineTypeCondition = filter.getPipelineTypeCondition(null);
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before and %s order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit",
condition, pipelineTypeCondition);
} else {
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit",
condition);
}
condition =
String.format(
"%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.name < :before order by ingestion_pipeline_entity.name DESC LIMIT :limit",
condition);
bindMap.put("serviceType", serviceType);
bindMap.put("service", service + ".%");
bindMap.put("relation", CONTAINS.ordinal());
bindMap.put("before", before);
bindMap.put("limit", limit);
@ -1682,7 +1690,7 @@ public interface CollectionDAO {
@Define("cond") String cond, @BindMap Map<String, Object> bindings);
@SqlQuery(
"SELECT json FROM (SELECT ingestion_pipeline_entity.fullyQualifiedName, ingestion_pipeline_entity.json FROM ingestion_pipeline_entity <cond>) last_rows_subquery ORDER BY fullyQualifiedName")
"SELECT json FROM (SELECT ingestion_pipeline_entity.name, ingestion_pipeline_entity.json FROM ingestion_pipeline_entity <cond>) last_rows_subquery ORDER BY fullyQualifiedName")
List<String> listBeforeIngestionPipelineByserviceType(
@Define("cond") String cond, @BindMap Map<String, Object> bindings);

View File

@ -81,6 +81,7 @@ import org.openmetadata.service.secrets.masker.PasswordEntityMasker;
import org.openmetadata.service.security.SecurityUtil;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.util.TestUtils;
@Slf4j
@ -175,6 +176,39 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
assertCommonFieldChange(fieldName, expected, actual);
}
@Test
void get_listPipelinesFiltered(TestInfo test) throws IOException {
CreateIngestionPipeline createMessaging =
new CreateIngestionPipeline()
.withName(getEntityName(test))
.withPipelineType(PipelineType.METADATA)
.withSourceConfig(MESSAGING_METADATA_CONFIG)
.withService(REDPANDA_REFERENCE)
.withAirflowConfig(new AirflowConfig().withStartDate(START_DATE).withScheduleInterval("5 * * * *"));
createAndCheckEntity(createMessaging, ADMIN_AUTH_HEADERS);
CreateIngestionPipeline createDatabase = createRequest(test);
createAndCheckEntity(createDatabase, ADMIN_AUTH_HEADERS);
// If we filter by service type, we get just one
Map<String, String> paramsMessaging = new HashMap<>();
paramsMessaging.put("serviceType", "messagingService");
ResultList<IngestionPipeline> resList = listEntities(paramsMessaging, ADMIN_AUTH_HEADERS);
assertEquals(1, resList.getData().size());
Map<String, String> paramsType = new HashMap<>();
paramsType.put("pipelineType", "metadata");
ResultList<IngestionPipeline> resListMeta = listEntities(paramsType, ADMIN_AUTH_HEADERS);
// We get at least the 2 pipelines created here
assertTrue(resListMeta.getData().size() >= 2);
Map<String, String> paramsMessagingService = new HashMap<>();
paramsMessagingService.put("service", REDPANDA_REFERENCE.getFullyQualifiedName());
ResultList<IngestionPipeline> redpandaIngestionList = listEntities(paramsMessagingService, ADMIN_AUTH_HEADERS);
assertEquals(1, redpandaIngestionList.getData().size());
}
@Test
void post_validIngestionPipeline_as_admin_200_OK(TestInfo test) throws IOException {
// Create team with different optional fields

View File

@ -0,0 +1,72 @@
/*
* Copyright 2023 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { render, screen } from '@testing-library/react';
import { ServiceCategory } from 'enums/service.enum';
import { useAirflowStatus } from 'hooks/useAirflowStatus';
import React from 'react';
import { IngestionPipelineList } from './IngestionPipelineList.component';
const mockGetIngestinoPipelines = jest.fn();
const mockBulkDeployPipelines = jest.fn();
jest.mock(
'components/common/error-with-placeholder/ErrorPlaceHolderIngestion',
() => {
return jest.fn().mockImplementation(() => <p>Airflow not available</p>);
}
);
jest.mock('hooks/useAirflowStatus', () => ({
...jest.requireActual('hooks/useAirflowStatus'),
useAirflowStatus: jest.fn().mockImplementation(() => ({
isAirflowAvailable: false,
isFetchingStatus: true,
})),
}));
jest.mock('components/Loader/Loader', () => {
return jest.fn().mockReturnValue(<div data-testid="loader">Loader</div>);
});
jest.mock('rest/ingestionPipelineAPI', () => ({
deployIngestionPipelineById: mockBulkDeployPipelines,
getIngestionPipelines: mockGetIngestinoPipelines,
}));
describe('IngestionPipelineList component', () => {
it('component should show loader until get status of airflow', () => {
(useAirflowStatus as jest.Mock).mockImplementationOnce(() => ({
isAirflowAvailable: false,
isFetchingStatus: true,
}));
render(
<IngestionPipelineList serviceName={ServiceCategory.DASHBOARD_SERVICES} />
);
expect(screen.getByText('Loader')).toBeInTheDocument();
});
it('component should show error placeholder for airflow not available', () => {
(useAirflowStatus as jest.Mock).mockImplementationOnce(() => ({
isAirflowAvailable: false,
isFetchingStatus: false,
}));
render(
<IngestionPipelineList serviceName={ServiceCategory.DASHBOARD_SERVICES} />
);
expect(screen.getByText('Airflow not available')).toBeInTheDocument();
});
});

View File

@ -10,20 +10,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Button, Col, Row, Typography } from 'antd';
import Tooltip from 'antd/es/tooltip';
import { Button, Col, Row, Tooltip, Typography } from 'antd';
import { ColumnsType, TableProps } from 'antd/lib/table';
import { AxiosError } from 'axios';
import ErrorPlaceHolder from 'components/common/error-with-placeholder/ErrorPlaceHolder';
import ErrorPlaceHolderIngestion from 'components/common/error-with-placeholder/ErrorPlaceHolderIngestion';
import NextPrevious from 'components/common/next-previous/NextPrevious';
import { PagingHandlerParams } from 'components/common/next-previous/NextPrevious.interface';
import Table from 'components/common/Table/Table';
import Loader from 'components/Loader/Loader';
import { ColumnFilter } from 'components/Table/ColumnFilter/ColumnFilter.component';
import cronstrue from 'cronstrue';
import { ServiceCategory } from 'enums/service.enum';
import {
IngestionPipeline,
PipelineType,
} from 'generated/entity/services/ingestionPipelines/ingestionPipeline';
import { usePaging } from 'hooks/paging/usePaging';
import { useAirflowStatus } from 'hooks/useAirflowStatus';
import { isNil, map, startCase } from 'lodash';
import React, { useEffect, useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
@ -33,6 +37,7 @@ import {
} from 'rest/ingestionPipelineAPI';
import { showPagination } from 'utils/CommonUtils';
import { getEntityName } from 'utils/EntityUtils';
import { getEntityTypeFromServiceCategory } from 'utils/ServiceUtils';
import { FilterIcon } from 'utils/TableUtils';
import { showErrorToast, showSuccessToast } from 'utils/ToastUtils';
import { IngestionRecentRuns } from '../IngestionRecentRun/IngestionRecentRuns.component';
@ -40,12 +45,15 @@ import { IngestionRecentRuns } from '../IngestionRecentRun/IngestionRecentRuns.c
export const IngestionPipelineList = ({
serviceName,
}: {
serviceName: string;
serviceName: ServiceCategory;
}) => {
const [pipelines, setPipelines] = useState<Array<IngestionPipeline>>();
const [selectedPipelines, setSelectedPipelines] =
useState<Array<IngestionPipeline>>();
const { isAirflowAvailable, isFetchingStatus } = useAirflowStatus();
const [selectedPipelines, setSelectedPipelines] = useState<
Array<IngestionPipeline>
>([]);
const [selectedRowKeys, setSelectedRowKeys] = useState<Array<React.Key>>([]);
const [deploying, setDeploying] = useState(false);
const [loading, setLoading] = useState(false);
@ -186,7 +194,7 @@ export const IngestionPipelineList = ({
try {
const { data, paging } = await getIngestionPipelines({
arrQueryFields: ['owner'],
serviceType: serviceName,
serviceType: getEntityTypeFromServiceCategory(serviceName),
paging: cursor,
pipelineType,
limit,
@ -214,8 +222,8 @@ export const IngestionPipelineList = ({
};
useEffect(() => {
fetchPipelines({ limit: pageSize });
}, [serviceName]);
isAirflowAvailable && fetchPipelines({ limit: pageSize });
}, [serviceName, isAirflowAvailable]);
const handleTableChange: TableProps<IngestionPipeline>['onChange'] = (
_pagination,
@ -234,6 +242,14 @@ export const IngestionPipelineList = ({
fetchPipelines({ pipelineType: pipelineTypeFilter, limit: size });
};
if (isFetchingStatus) {
return <Loader />;
}
if (!isAirflowAvailable) {
return <ErrorPlaceHolderIngestion />;
}
return (
<Row gutter={[16, 16]}>
<Col className="text-right" span={24}>
@ -250,6 +266,9 @@ export const IngestionPipelineList = ({
columns={tableColumn}
dataSource={pipelines}
loading={loading}
locale={{
emptyText: <ErrorPlaceHolder className="m-y-md" />,
}}
pagination={false}
rowKey="fullyQualifiedName"
rowSelection={{

View File

@ -100,7 +100,7 @@ export const getIngestionPipelines = async (data: {
service: serviceFilter,
testSuite,
pipelineType: pipelineType?.length ? pipelineType.join(',') : undefined,
type: serviceType,
serviceType,
limit,
});