diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index af29c911d9c..272c7fc9a54 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -1595,22 +1595,27 @@ public interface CollectionDAO { @Override default int listCount(ListFilter filter) { - String serviceType = filter.getQueryParam("serviceType"); - String service = filter.getQueryParam("service"); String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId"; - String pipelineTypeCondition; + + if (filter.getQueryParam("pipelineType") != null) { + String pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null)); + condition += pipelineTypeCondition; + } + + if (filter.getQueryParam("service") != null) { + String serviceCondition = String.format(" and %s", filter.getServiceCondition(null)); + condition += serviceCondition; + } + Map bindMap = new HashMap<>(); + String serviceType = filter.getQueryParam("serviceType"); if (!CommonUtil.nullOrEmpty(serviceType)) { - if (filter.getQueryParam("pipelineType") != null) { - pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null)); - condition += pipelineTypeCondition; - } + condition = String.format( - "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service", + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation", condition); bindMap.put("relation", CONTAINS.ordinal()); - bindMap.put("service", service + ".%"); bindMap.put("serviceType", serviceType); return listIngestionPipelineCount(condition, bindMap); } @@ -1619,26 +1624,28 @@ public interface CollectionDAO { @Override default List listAfter(ListFilter filter, int limit, String after) { - String serviceType = filter.getQueryParam("serviceType"); - String service = filter.getQueryParam("service"); String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId"; - String pipelineTypeCondition; + + if (filter.getQueryParam("pipelineType") != null) { + String pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null)); + condition += pipelineTypeCondition; + } + + if (filter.getQueryParam("service") != null) { + String serviceCondition = String.format(" and %s", filter.getServiceCondition(null)); + condition += serviceCondition; + } + Map bindMap = new HashMap<>(); + String serviceType = filter.getQueryParam("serviceType"); if (!CommonUtil.nullOrEmpty(serviceType)) { - if (filter.getQueryParam("pipelineType") != null) { - pipelineTypeCondition = filter.getPipelineTypeCondition(null); - condition = - String.format( - "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after and %s order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit", - condition, pipelineTypeCondition); - } else { - condition = - String.format( - "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName > :after order by ingestion_pipeline_entity.fullyQualifiedName ASC LIMIT :limit", - condition); - } + + condition = + String.format( + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.name > :after order by ingestion_pipeline_entity.name ASC LIMIT :limit", + condition); + bindMap.put("serviceType", serviceType); - bindMap.put("service", service + ".%"); bindMap.put("relation", CONTAINS.ordinal()); bindMap.put("after", after); bindMap.put("limit", limit); @@ -1649,26 +1656,27 @@ public interface CollectionDAO { @Override default List listBefore(ListFilter filter, int limit, String before) { - String service = filter.getQueryParam("service"); - String serviceType = filter.getQueryParam("serviceType"); String condition = "INNER JOIN entity_relationship ON ingestion_pipeline_entity.id = entity_relationship.toId"; - String pipelineTypeCondition; + + if (filter.getQueryParam("pipelineType") != null) { + String pipelineTypeCondition = String.format(" and %s", filter.getPipelineTypeCondition(null)); + condition += pipelineTypeCondition; + } + + if (filter.getQueryParam("service") != null) { + String serviceCondition = String.format(" and %s", filter.getServiceCondition(null)); + condition += serviceCondition; + } + Map bindMap = new HashMap<>(); + String serviceType = filter.getQueryParam("serviceType"); if (!CommonUtil.nullOrEmpty(serviceType)) { - if (filter.getQueryParam("pipelineType") != null) { - pipelineTypeCondition = filter.getPipelineTypeCondition(null); - condition = - String.format( - "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before and %s order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit", - condition, pipelineTypeCondition); - } else { - condition = - String.format( - "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.fullyQualifiedName LIKE :service and ingestion_pipeline_entity.fullyQualifiedName < :before order by ingestion_pipeline_entity.fullyQualifiedName DESC LIMIT :limit", - condition); - } + condition = + String.format( + "%s WHERE entity_relationship.fromEntity = :serviceType and entity_relationship.relation = :relation and ingestion_pipeline_entity.name < :before order by ingestion_pipeline_entity.name DESC LIMIT :limit", + condition); + bindMap.put("serviceType", serviceType); - bindMap.put("service", service + ".%"); bindMap.put("relation", CONTAINS.ordinal()); bindMap.put("before", before); bindMap.put("limit", limit); @@ -1682,7 +1690,7 @@ public interface CollectionDAO { @Define("cond") String cond, @BindMap Map bindings); @SqlQuery( - "SELECT json FROM (SELECT ingestion_pipeline_entity.fullyQualifiedName, ingestion_pipeline_entity.json FROM ingestion_pipeline_entity ) last_rows_subquery ORDER BY fullyQualifiedName") + "SELECT json FROM (SELECT ingestion_pipeline_entity.name, ingestion_pipeline_entity.json FROM ingestion_pipeline_entity ) last_rows_subquery ORDER BY fullyQualifiedName") List listBeforeIngestionPipelineByserviceType( @Define("cond") String cond, @BindMap Map bindings); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java index 83746db9e12..2c0c4c71e5e 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResourceTest.java @@ -81,6 +81,7 @@ import org.openmetadata.service.secrets.masker.PasswordEntityMasker; import org.openmetadata.service.security.SecurityUtil; import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.TestUtils; @Slf4j @@ -175,6 +176,39 @@ public class IngestionPipelineResourceTest extends EntityResourceTest paramsMessaging = new HashMap<>(); + paramsMessaging.put("serviceType", "messagingService"); + ResultList resList = listEntities(paramsMessaging, ADMIN_AUTH_HEADERS); + assertEquals(1, resList.getData().size()); + + Map paramsType = new HashMap<>(); + paramsType.put("pipelineType", "metadata"); + ResultList resListMeta = listEntities(paramsType, ADMIN_AUTH_HEADERS); + // We get at least the 2 pipelines created here + assertTrue(resListMeta.getData().size() >= 2); + + Map paramsMessagingService = new HashMap<>(); + paramsMessagingService.put("service", REDPANDA_REFERENCE.getFullyQualifiedName()); + ResultList redpandaIngestionList = listEntities(paramsMessagingService, ADMIN_AUTH_HEADERS); + assertEquals(1, redpandaIngestionList.getData().size()); + } + @Test void post_validIngestionPipeline_as_admin_200_OK(TestInfo test) throws IOException { // Create team with different optional fields diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionPipelineList/IngestinoPipelineList.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionPipelineList/IngestinoPipelineList.test.tsx new file mode 100644 index 00000000000..70433380679 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionPipelineList/IngestinoPipelineList.test.tsx @@ -0,0 +1,72 @@ +/* + * Copyright 2023 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { render, screen } from '@testing-library/react'; +import { ServiceCategory } from 'enums/service.enum'; +import { useAirflowStatus } from 'hooks/useAirflowStatus'; +import React from 'react'; +import { IngestionPipelineList } from './IngestionPipelineList.component'; + +const mockGetIngestinoPipelines = jest.fn(); +const mockBulkDeployPipelines = jest.fn(); + +jest.mock( + 'components/common/error-with-placeholder/ErrorPlaceHolderIngestion', + () => { + return jest.fn().mockImplementation(() =>

Airflow not available

); + } +); + +jest.mock('hooks/useAirflowStatus', () => ({ + ...jest.requireActual('hooks/useAirflowStatus'), + useAirflowStatus: jest.fn().mockImplementation(() => ({ + isAirflowAvailable: false, + isFetchingStatus: true, + })), +})); + +jest.mock('components/Loader/Loader', () => { + return jest.fn().mockReturnValue(
Loader
); +}); + +jest.mock('rest/ingestionPipelineAPI', () => ({ + deployIngestionPipelineById: mockBulkDeployPipelines, + getIngestionPipelines: mockGetIngestinoPipelines, +})); + +describe('IngestionPipelineList component', () => { + it('component should show loader until get status of airflow', () => { + (useAirflowStatus as jest.Mock).mockImplementationOnce(() => ({ + isAirflowAvailable: false, + isFetchingStatus: true, + })); + + render( + + ); + + expect(screen.getByText('Loader')).toBeInTheDocument(); + }); + + it('component should show error placeholder for airflow not available', () => { + (useAirflowStatus as jest.Mock).mockImplementationOnce(() => ({ + isAirflowAvailable: false, + isFetchingStatus: false, + })); + + render( + + ); + + expect(screen.getByText('Airflow not available')).toBeInTheDocument(); + }); +}); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionPipelineList/IngestionPipelineList.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionPipelineList/IngestionPipelineList.component.tsx index 3fff1b1edda..73cf2fc85e1 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionPipelineList/IngestionPipelineList.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionPipelineList/IngestionPipelineList.component.tsx @@ -10,20 +10,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Button, Col, Row, Typography } from 'antd'; -import Tooltip from 'antd/es/tooltip'; +import { Button, Col, Row, Tooltip, Typography } from 'antd'; import { ColumnsType, TableProps } from 'antd/lib/table'; import { AxiosError } from 'axios'; +import ErrorPlaceHolder from 'components/common/error-with-placeholder/ErrorPlaceHolder'; +import ErrorPlaceHolderIngestion from 'components/common/error-with-placeholder/ErrorPlaceHolderIngestion'; import NextPrevious from 'components/common/next-previous/NextPrevious'; import { PagingHandlerParams } from 'components/common/next-previous/NextPrevious.interface'; import Table from 'components/common/Table/Table'; +import Loader from 'components/Loader/Loader'; import { ColumnFilter } from 'components/Table/ColumnFilter/ColumnFilter.component'; import cronstrue from 'cronstrue'; +import { ServiceCategory } from 'enums/service.enum'; import { IngestionPipeline, PipelineType, } from 'generated/entity/services/ingestionPipelines/ingestionPipeline'; import { usePaging } from 'hooks/paging/usePaging'; +import { useAirflowStatus } from 'hooks/useAirflowStatus'; import { isNil, map, startCase } from 'lodash'; import React, { useEffect, useMemo, useState } from 'react'; import { useTranslation } from 'react-i18next'; @@ -33,6 +37,7 @@ import { } from 'rest/ingestionPipelineAPI'; import { showPagination } from 'utils/CommonUtils'; import { getEntityName } from 'utils/EntityUtils'; +import { getEntityTypeFromServiceCategory } from 'utils/ServiceUtils'; import { FilterIcon } from 'utils/TableUtils'; import { showErrorToast, showSuccessToast } from 'utils/ToastUtils'; import { IngestionRecentRuns } from '../IngestionRecentRun/IngestionRecentRuns.component'; @@ -40,12 +45,15 @@ import { IngestionRecentRuns } from '../IngestionRecentRun/IngestionRecentRuns.c export const IngestionPipelineList = ({ serviceName, }: { - serviceName: string; + serviceName: ServiceCategory; }) => { const [pipelines, setPipelines] = useState>(); - const [selectedPipelines, setSelectedPipelines] = - useState>(); + const { isAirflowAvailable, isFetchingStatus } = useAirflowStatus(); + + const [selectedPipelines, setSelectedPipelines] = useState< + Array + >([]); const [selectedRowKeys, setSelectedRowKeys] = useState>([]); const [deploying, setDeploying] = useState(false); const [loading, setLoading] = useState(false); @@ -186,7 +194,7 @@ export const IngestionPipelineList = ({ try { const { data, paging } = await getIngestionPipelines({ arrQueryFields: ['owner'], - serviceType: serviceName, + serviceType: getEntityTypeFromServiceCategory(serviceName), paging: cursor, pipelineType, limit, @@ -214,8 +222,8 @@ export const IngestionPipelineList = ({ }; useEffect(() => { - fetchPipelines({ limit: pageSize }); - }, [serviceName]); + isAirflowAvailable && fetchPipelines({ limit: pageSize }); + }, [serviceName, isAirflowAvailable]); const handleTableChange: TableProps['onChange'] = ( _pagination, @@ -234,6 +242,14 @@ export const IngestionPipelineList = ({ fetchPipelines({ pipelineType: pipelineTypeFilter, limit: size }); }; + if (isFetchingStatus) { + return ; + } + + if (!isAirflowAvailable) { + return ; + } + return ( @@ -250,6 +266,9 @@ export const IngestionPipelineList = ({ columns={tableColumn} dataSource={pipelines} loading={loading} + locale={{ + emptyText: , + }} pagination={false} rowKey="fullyQualifiedName" rowSelection={{ diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts index 7ecad1c9844..8e2914fc2f6 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/ingestionPipelineAPI.ts @@ -100,7 +100,7 @@ export const getIngestionPipelines = async (data: { service: serviceFilter, testSuite, pipelineType: pipelineType?.length ? pipelineType.join(',') : undefined, - type: serviceType, + serviceType, limit, });