Fix start datetime and format for create ingestions (#5414)

Co-authored-by: pmbrull <peremiquelbrull@gmail.com>
Co-authored-by: Vivek Ratnavel Subramanian <vivekratnavel90@gmail.com>
This commit is contained in:
darth-coder00 2022-06-15 00:06:46 +05:30 committed by GitHub
parent 04640574bc
commit 4662fa8b6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 74 additions and 65 deletions

View File

@ -54,11 +54,11 @@
}, },
"startDate": { "startDate": {
"description": "Start date of the pipeline.", "description": "Start date of the pipeline.",
"$ref": "../../../type/basic.json#/definitions/date" "$ref": "../../../type/basic.json#/definitions/dateTime"
}, },
"endDate": { "endDate": {
"description": "End Date of the pipeline.", "description": "End Date of the pipeline.",
"$ref": "../../../type/basic.json#/definitions/date" "$ref": "../../../type/basic.json#/definitions/dateTime"
}, },
"pipelineTimezone": { "pipelineTimezone": {
"description": "Timezone in which pipeline going to be scheduled.", "description": "Timezone in which pipeline going to be scheduled.",
@ -109,8 +109,7 @@
"$ref": "../../../type/basic.json#/definitions/email" "$ref": "../../../type/basic.json#/definitions/email"
} }
}, },
"additionalProperties": false, "additionalProperties": false
"required": ["startDate"]
} }
}, },
"properties": { "properties": {

View File

@ -78,6 +78,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
public static SourceConfig MESSAGING_METADATA_CONFIG; public static SourceConfig MESSAGING_METADATA_CONFIG;
public static AirflowConfiguration AIRFLOW_CONFIG; public static AirflowConfiguration AIRFLOW_CONFIG;
public static DatabaseServiceResourceTest DATABASE_SERVICE_RESOURCE_TEST; public static DatabaseServiceResourceTest DATABASE_SERVICE_RESOURCE_TEST;
public static Date START_DATE;
public IngestionPipelineResourceTest() { public IngestionPipelineResourceTest() {
super( super(
@ -112,6 +113,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
AIRFLOW_CONFIG.setUsername("admin"); AIRFLOW_CONFIG.setUsername("admin");
AIRFLOW_CONFIG.setPassword("admin"); AIRFLOW_CONFIG.setPassword("admin");
DATABASE_SERVICE_RESOURCE_TEST = new DatabaseServiceResourceTest(); DATABASE_SERVICE_RESOURCE_TEST = new DatabaseServiceResourceTest();
START_DATE = new DateTime("2022-06-10T15:06:47+00:00").toDate();
} }
@Override @Override
@ -121,7 +123,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(getContainer()) .withService(getContainer())
.withSourceConfig(DATABASE_METADATA_CONFIG) .withSourceConfig(DATABASE_METADATA_CONFIG)
.withAirflowConfig(new AirflowConfig().withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withStartDate(new DateTime("2022-06-10T15:06:47+00:00").toDate()));
} }
@Override @Override
@ -197,7 +199,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService")) .withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService"))
.withDescription("description") .withDescription("description")
.withAirflowConfig(new AirflowConfig().withStartDate("2021-11-21").withScheduleInterval("5 * * * *")); .withAirflowConfig(new AirflowConfig().withStartDate(START_DATE).withScheduleInterval("5 * * * *"));
createAndCheckEntity(request, ADMIN_AUTH_HEADERS); createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
Integer pipelineConcurrency = 110; Integer pipelineConcurrency = 110;
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate(); Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
@ -211,12 +213,12 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName()); String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName());
validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSourceConfig(), ingestion); validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSourceConfig(), ingestion);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -232,7 +234,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService")) .withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService"))
.withDescription("description") .withDescription("description")
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
createAndCheckEntity(request, ADMIN_AUTH_HEADERS); createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
Integer pipelineConcurrency = 110; Integer pipelineConcurrency = 110;
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate(); Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
@ -250,12 +252,12 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName()); String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName());
validateSourceConfig(queryUsageConfig, ingestion.getSourceConfig(), ingestion); validateSourceConfig(queryUsageConfig, ingestion.getSourceConfig(), ingestion);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -269,7 +271,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
createRequest(test) createRequest(test)
.withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService")) .withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService"))
.withDescription("description") .withDescription("description")
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
createAndCheckEntity(request, ADMIN_AUTH_HEADERS); createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
Integer pipelineConcurrency = 110; Integer pipelineConcurrency = 110;
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate(); Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
@ -283,11 +285,11 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName()); String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName());
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -310,10 +312,10 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -330,7 +332,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withService(new EntityReference().withId(SUPERSET_REFERENCE.getId()).withType("dashboardService")) .withService(new EntityReference().withId(SUPERSET_REFERENCE.getId()).withType("dashboardService"))
.withDescription("description") .withDescription("description")
.withSourceConfig(DASHBOARD_METADATA_CONFIG) .withSourceConfig(DASHBOARD_METADATA_CONFIG)
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
createAndCheckEntity(request, ADMIN_AUTH_HEADERS); createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
Integer pipelineConcurrency = 110; Integer pipelineConcurrency = 110;
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate(); Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
@ -344,11 +346,11 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.build(SUPERSET_REFERENCE.getName(), ingestion.getName()); String expectedFQN = FullyQualifiedName.build(SUPERSET_REFERENCE.getName(), ingestion.getName());
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -367,10 +369,10 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -384,7 +386,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withService(new EntityReference().withId(KAFKA_REFERENCE.getId()).withType("messagingService")) .withService(new EntityReference().withId(KAFKA_REFERENCE.getId()).withType("messagingService"))
.withDescription("description") .withDescription("description")
.withSourceConfig(MESSAGING_METADATA_CONFIG) .withSourceConfig(MESSAGING_METADATA_CONFIG)
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
createAndCheckEntity(request, ADMIN_AUTH_HEADERS); createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
Integer pipelineConcurrency = 110; Integer pipelineConcurrency = 110;
Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate(); Date startDate = new DateTime("2021-11-13T20:20:39+00:00").toDate();
@ -398,11 +400,11 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.build(KAFKA_REFERENCE.getName(), ingestion.getName()); String expectedFQN = FullyQualifiedName.build(KAFKA_REFERENCE.getName(), ingestion.getName());
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -420,10 +422,10 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -438,7 +440,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService")) .withService(new EntityReference().withId(BIGQUERY_REFERENCE.getId()).withType("databaseService"))
.withDescription("description") .withDescription("description")
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")) .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE))
.withOwner(USER_OWNER1); .withOwner(USER_OWNER1);
IngestionPipeline ingestionPipeline = createAndCheckEntity(request, ADMIN_AUTH_HEADERS); IngestionPipeline ingestionPipeline = createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
@ -455,13 +457,13 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
new AirflowConfig() new AirflowConfig()
.withConcurrency(pipelineConcurrency) .withConcurrency(pipelineConcurrency)
.withScheduleInterval(expectedScheduleInterval) .withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate.toString())), .withStartDate(startDate)),
OK, OK,
ADMIN_AUTH_HEADERS); ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName()); String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName());
validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSourceConfig(), ingestionPipeline); validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSourceConfig(), ingestionPipeline);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate()); assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency()); assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName()); assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval()); assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
@ -528,7 +530,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(bigqueryRef) .withService(bigqueryRef)
.withDescription("description") .withDescription("description")
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
IngestionPipeline pipelineBigquery1 = createAndCheckEntity(requestPipeline_1, ADMIN_AUTH_HEADERS); IngestionPipeline pipelineBigquery1 = createAndCheckEntity(requestPipeline_1, ADMIN_AUTH_HEADERS);
CreateIngestionPipeline requestPipeline_2 = CreateIngestionPipeline requestPipeline_2 =
createRequest(test) createRequest(test)
@ -536,7 +538,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(bigqueryRef) .withService(bigqueryRef)
.withDescription("description") .withDescription("description")
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
IngestionPipeline pipelineBigquery2 = createAndCheckEntity(requestPipeline_2, ADMIN_AUTH_HEADERS); IngestionPipeline pipelineBigquery2 = createAndCheckEntity(requestPipeline_2, ADMIN_AUTH_HEADERS);
CreateIngestionPipeline requestPipeline_3 = CreateIngestionPipeline requestPipeline_3 =
createRequest(test) createRequest(test)
@ -544,7 +546,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA) .withPipelineType(PipelineType.METADATA)
.withService(snowflakeRef) .withService(snowflakeRef)
.withDescription("description") .withDescription("description")
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate("2021-11-21")); .withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
IngestionPipeline IngestionPipeline3 = createAndCheckEntity(requestPipeline_3, ADMIN_AUTH_HEADERS); IngestionPipeline IngestionPipeline3 = createAndCheckEntity(requestPipeline_3, ADMIN_AUTH_HEADERS);
// List charts by filtering on service name and ensure right charts in the response // List charts by filtering on service name and ensure right charts in the response
Map<String, String> queryParams = new HashMap<>(); Map<String, String> queryParams = new HashMap<>();

View File

@ -15,6 +15,7 @@ import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Callable, Optional, Union from typing import Callable, Optional, Union
import airflow
from airflow import DAG from airflow import DAG
from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.dashboardService import DashboardService
@ -132,10 +133,10 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
def date_to_datetime( def date_to_datetime(
date: Optional[basic.Date], date_format: str = "%Y-%m-%d" date: Optional[basic.DateTime], date_format: str = "%Y-%m-%dT%H:%M:%S%z"
) -> Optional[datetime]: ) -> Optional[datetime]:
""" """
Format a basic.Date to datetime Format a basic.DateTime to datetime. ISO 8601 format by default.
""" """
if date is None: if date is None:
return return
@ -166,8 +167,12 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
return { return {
"dag_id": ingestion_pipeline.name.__root__, "dag_id": ingestion_pipeline.name.__root__,
"description": ingestion_pipeline.description, "description": ingestion_pipeline.description,
"start_date": date_to_datetime(ingestion_pipeline.airflowConfig.startDate), "start_date": ingestion_pipeline.airflowConfig.startDate.__root__
"end_date": date_to_datetime(ingestion_pipeline.airflowConfig.endDate), if ingestion_pipeline.airflowConfig.startDate
else airflow.utils.dates.days_ago(1),
"end_date": ingestion_pipeline.airflowConfig.endDate.__root__
if ingestion_pipeline.airflowConfig.endDate
else None,
"concurrency": ingestion_pipeline.airflowConfig.concurrency, "concurrency": ingestion_pipeline.airflowConfig.concurrency,
"max_active_runs": ingestion_pipeline.airflowConfig.maxActiveRuns, "max_active_runs": ingestion_pipeline.airflowConfig.maxActiveRuns,
"default_view": ingestion_pipeline.airflowConfig.workflowDefaultView, "default_view": ingestion_pipeline.airflowConfig.workflowDefaultView,

View File

@ -29,9 +29,19 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
IngestionPipeline, IngestionPipeline,
PipelineType, PipelineType,
) )
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import (
DatabaseServiceQueryUsagePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.workflow import Workflow from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.models.encoders import show_secrets_encoder
@ -54,7 +64,7 @@ class OMetaServiceTest(TestCase):
data = { data = {
"type": "mysql", "type": "mysql",
"serviceName": "local_mysql", "serviceName": "test-workflow-mysql",
"serviceConnection": { "serviceConnection": {
"config": { "config": {
"type": "Mysql", "type": "Mysql",
@ -92,10 +102,14 @@ class OMetaServiceTest(TestCase):
Mock a db service to build the IngestionPipeline Mock a db service to build the IngestionPipeline
""" """
service: DatabaseService = cls.metadata.get_service_or_create( cls.service: DatabaseService = cls.metadata.get_service_or_create(
entity=DatabaseService, config=cls.workflow_source entity=DatabaseService, config=cls.workflow_source
) )
cls.service_entity_id = service.id
cls.usage_service: DatabaseService = cls.metadata.get_service_or_create(
entity=DatabaseService,
config=cls.usage_workflow_source,
)
@classmethod @classmethod
def tearDownClass(cls) -> None: def tearDownClass(cls) -> None:
@ -104,7 +118,7 @@ class OMetaServiceTest(TestCase):
""" """
cls.metadata.delete( cls.metadata.delete(
entity=DatabaseService, entity=DatabaseService,
entity_id=cls.service_entity_id, entity_id=cls.service.id,
recursive=True, recursive=True,
hard_delete=True, hard_delete=True,
) )
@ -120,14 +134,15 @@ class OMetaServiceTest(TestCase):
name="test_ingestion_workflow", name="test_ingestion_workflow",
pipelineType=PipelineType.metadata, pipelineType=PipelineType.metadata,
fullyQualifiedName="local_mysql.test_ingestion_workflow", fullyQualifiedName="local_mysql.test_ingestion_workflow",
source=self.workflow_source, sourceConfig=SourceConfig(config=DatabaseServiceMetadataPipeline()),
openMetadataServerConnection=self.server_config, openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig( airflowConfig=AirflowConfig(
startDate="2022-04-10", startDate="2022-06-10T15:06:47+00:00",
), ),
service=EntityReference( service=EntityReference(
id=self.service_entity_id, id=self.service.id,
type="databaseService", type="databaseService",
name=self.service.name.__root__,
), ),
) )
@ -147,14 +162,15 @@ class OMetaServiceTest(TestCase):
name="test_usage_workflow", name="test_usage_workflow",
pipelineType=PipelineType.usage, pipelineType=PipelineType.usage,
fullyQualifiedName="local_snowflake.test_usage_workflow", fullyQualifiedName="local_snowflake.test_usage_workflow",
source=self.usage_workflow_source, sourceConfig=SourceConfig(config=DatabaseServiceQueryUsagePipeline()),
openMetadataServerConnection=self.server_config, openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig( airflowConfig=AirflowConfig(
startDate="2022-04-10", startDate="2022-06-10T15:06:47+00:00",
), ),
service=EntityReference( service=EntityReference(
id=self.service_entity_id, id=self.usage_service.id,
type="databaseService", type="databaseService",
name=self.usage_service.name.__root__,
), ),
) )
@ -176,14 +192,15 @@ class OMetaServiceTest(TestCase):
name="test_profiler_workflow", name="test_profiler_workflow",
pipelineType=PipelineType.profiler, pipelineType=PipelineType.profiler,
fullyQualifiedName="local_mysql.test_profiler_workflow", fullyQualifiedName="local_mysql.test_profiler_workflow",
source=self.workflow_source, sourceConfig=SourceConfig(config=DatabaseServiceProfilerPipeline()),
openMetadataServerConnection=self.server_config, openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig( airflowConfig=AirflowConfig(
startDate="2022-04-10", startDate="2022-06-10T15:06:47+00:00",
), ),
service=EntityReference( service=EntityReference(
id=self.service_entity_id, id=self.service.id,
type="databaseService", type="databaseService",
name=self.service.name.__root__,
), ),
) )

View File

@ -11,7 +11,7 @@
* limitations under the License. * limitations under the License.
*/ */
import { isEmpty, isUndefined } from 'lodash'; import { isUndefined } from 'lodash';
import { LoadingState } from 'Models'; import { LoadingState } from 'Models';
import React, { useMemo, useState } from 'react'; import React, { useMemo, useState } from 'react';
import { import {
@ -37,7 +37,7 @@ import {
DatabaseServiceMetadataPipelineClass, DatabaseServiceMetadataPipelineClass,
DbtConfigSource, DbtConfigSource,
} from '../../generated/metadataIngestion/databaseServiceMetadataPipeline'; } from '../../generated/metadataIngestion/databaseServiceMetadataPipeline';
import { getCurrentDate, getCurrentUserId } from '../../utils/CommonUtils'; import { getCurrentUserId } from '../../utils/CommonUtils';
import { getSourceTypeFromConfig } from '../../utils/DBTConfigFormUtil'; import { getSourceTypeFromConfig } from '../../utils/DBTConfigFormUtil';
import { escapeBackwardSlashChar } from '../../utils/JSONSchemaFormUtils'; import { escapeBackwardSlashChar } from '../../utils/JSONSchemaFormUtils';
import { getIngestionName } from '../../utils/ServiceUtils'; import { getIngestionName } from '../../utils/ServiceUtils';
@ -93,11 +93,6 @@ const AddIngestion = ({
const [repeatFrequency, setRepeatFrequency] = useState( const [repeatFrequency, setRepeatFrequency] = useState(
data?.airflowConfig.scheduleInterval ?? INGESTION_SCHEDULER_INITIAL_VALUE data?.airflowConfig.scheduleInterval ?? INGESTION_SCHEDULER_INITIAL_VALUE
); );
const [startDate] = useState(
data?.airflowConfig.startDate ?? getCurrentDate()
);
const [endDate] = useState(data?.airflowConfig?.endDate ?? '');
const [showDashboardFilter, setShowDashboardFilter] = useState( const [showDashboardFilter, setShowDashboardFilter] = useState(
!isUndefined( !isUndefined(
(data?.sourceConfig.config as ConfigClass)?.dashboardFilterPattern (data?.sourceConfig.config as ConfigClass)?.dashboardFilterPattern
@ -474,8 +469,6 @@ const AddIngestion = ({
const createNewIngestion = () => { const createNewIngestion = () => {
const ingestionDetails: CreateIngestionPipeline = { const ingestionDetails: CreateIngestionPipeline = {
airflowConfig: { airflowConfig: {
startDate: startDate as unknown as Date,
endDate: isEmpty(endDate) ? undefined : (endDate as unknown as Date),
scheduleInterval: repeatFrequency, scheduleInterval: repeatFrequency,
}, },
loggerLevel: enableDebugLog ? LogLevels.Debug : LogLevels.Info, loggerLevel: enableDebugLog ? LogLevels.Debug : LogLevels.Info,
@ -522,8 +515,6 @@ const AddIngestion = ({
...data, ...data,
airflowConfig: { airflowConfig: {
...data.airflowConfig, ...data.airflowConfig,
startDate: startDate as unknown as Date,
endDate: (endDate as unknown as Date) || null,
scheduleInterval: repeatFrequency, scheduleInterval: repeatFrequency,
}, },
loggerLevel: enableDebugLog ? LogLevels.Debug : LogLevels.Info, loggerLevel: enableDebugLog ? LogLevels.Debug : LogLevels.Info,

View File

@ -19,7 +19,6 @@ import {
RecentlyViewed, RecentlyViewed,
RecentlyViewedData, RecentlyViewedData,
} from 'Models'; } from 'Models';
import { utc } from 'moment';
import React, { FormEvent } from 'react'; import React, { FormEvent } from 'react';
import { reactLocalStorage } from 'reactjs-localstorage'; import { reactLocalStorage } from 'reactjs-localstorage';
import AppState from '../AppState'; import AppState from '../AppState';
@ -419,10 +418,6 @@ export const getServiceLogo = (
return null; return null;
}; };
export const getCurrentDate = () => {
return `${utc(new Date()).format('YYYY-MM-DD')}`;
};
export const getSvgArrow = (isActive: boolean) => { export const getSvgArrow = (isActive: boolean) => {
return isActive ? ( return isActive ? (
<SVGIcons alt="arrow-down" icon={Icons.ARROW_DOWN_PRIMARY} /> <SVGIcons alt="arrow-down" icon={Icons.ARROW_DOWN_PRIMARY} />