mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-25 17:08:29 +00:00
fix(graphql/upsertIngestionSource): Validate cron schedule; parse error in CLI (#11011)
This commit is contained in:
parent
1f7c92bb99
commit
bc75f7a1b1
@ -22,6 +22,7 @@ dependencies {
|
||||
implementation externalDependency.opentelemetryAnnotations
|
||||
|
||||
implementation externalDependency.slf4jApi
|
||||
implementation externalDependency.springContext
|
||||
compileOnly externalDependency.lombok
|
||||
annotationProcessor externalDependency.lombok
|
||||
|
||||
|
||||
@ -25,12 +25,16 @@ import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import graphql.schema.DataFetcher;
|
||||
import graphql.schema.DataFetchingEnvironment;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.DateTimeException;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.support.CronExpression;
|
||||
|
||||
/** Creates or updates an ingestion source. Requires the MANAGE_INGESTION privilege. */
|
||||
@Slf4j
|
||||
@ -46,55 +50,51 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
|
||||
public CompletableFuture<String> get(final DataFetchingEnvironment environment) throws Exception {
|
||||
final QueryContext context = environment.getContext();
|
||||
|
||||
if (!IngestionAuthUtils.canManageIngestion(context)) {
|
||||
throw new AuthorizationException(
|
||||
"Unauthorized to perform this action. Please contact your DataHub administrator.");
|
||||
}
|
||||
final Optional<String> ingestionSourceUrn = Optional.ofNullable(environment.getArgument("urn"));
|
||||
final UpdateIngestionSourceInput input =
|
||||
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);
|
||||
|
||||
// Create the policy info.
|
||||
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
|
||||
final MetadataChangeProposal proposal;
|
||||
if (ingestionSourceUrn.isPresent()) {
|
||||
// Update existing ingestion source
|
||||
try {
|
||||
proposal =
|
||||
buildMetadataChangeProposalWithUrn(
|
||||
Urn.createFromString(ingestionSourceUrn.get()), INGESTION_INFO_ASPECT_NAME, info);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
} else {
|
||||
// Create new ingestion source
|
||||
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
|
||||
final UUID uuid = UUID.randomUUID();
|
||||
final String uuidStr = uuid.toString();
|
||||
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
|
||||
key.setId(uuidStr);
|
||||
proposal =
|
||||
buildMetadataChangeProposalWithKey(
|
||||
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
|
||||
}
|
||||
|
||||
return GraphQLConcurrencyUtils.supplyAsync(
|
||||
() -> {
|
||||
if (IngestionAuthUtils.canManageIngestion(context)) {
|
||||
|
||||
final Optional<String> ingestionSourceUrn =
|
||||
Optional.ofNullable(environment.getArgument("urn"));
|
||||
final UpdateIngestionSourceInput input =
|
||||
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);
|
||||
|
||||
// Create the policy info.
|
||||
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
|
||||
final MetadataChangeProposal proposal;
|
||||
if (ingestionSourceUrn.isPresent()) {
|
||||
// Update existing ingestion source
|
||||
try {
|
||||
proposal =
|
||||
buildMetadataChangeProposalWithUrn(
|
||||
Urn.createFromString(ingestionSourceUrn.get()),
|
||||
INGESTION_INFO_ASPECT_NAME,
|
||||
info);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
} else {
|
||||
// Create new ingestion source
|
||||
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
|
||||
final UUID uuid = UUID.randomUUID();
|
||||
final String uuidStr = uuid.toString();
|
||||
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
|
||||
key.setId(uuidStr);
|
||||
proposal =
|
||||
buildMetadataChangeProposalWithKey(
|
||||
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
|
||||
}
|
||||
|
||||
try {
|
||||
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Failed to perform update against ingestion source with urn %s",
|
||||
input.toString()),
|
||||
e);
|
||||
}
|
||||
try {
|
||||
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Failed to perform update against ingestion source with urn %s",
|
||||
input.toString()),
|
||||
e);
|
||||
}
|
||||
throw new AuthorizationException(
|
||||
"Unauthorized to perform this action. Please contact your DataHub administrator.");
|
||||
},
|
||||
this.getClass().getSimpleName(),
|
||||
"get");
|
||||
@ -137,9 +137,38 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
|
||||
|
||||
private DataHubIngestionSourceSchedule mapSchedule(
|
||||
final UpdateIngestionSourceScheduleInput input) {
|
||||
|
||||
final String modifiedCronInterval = adjustCronInterval(input.getInterval());
|
||||
try {
|
||||
CronExpression.parse(modifiedCronInterval);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Invalid cron schedule `%s`: %s", input.getInterval(), e.getMessage()),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
try {
|
||||
ZoneId.of(input.getTimezone());
|
||||
} catch (DateTimeException e) {
|
||||
throw new DataHubGraphQLException(
|
||||
String.format("Invalid timezone `%s`: %s", input.getTimezone(), e.getMessage()),
|
||||
DataHubGraphQLErrorCode.BAD_REQUEST);
|
||||
}
|
||||
|
||||
final DataHubIngestionSourceSchedule result = new DataHubIngestionSourceSchedule();
|
||||
result.setInterval(input.getInterval());
|
||||
result.setTimezone(input.getTimezone());
|
||||
return result;
|
||||
}
|
||||
|
||||
// Copied from IngestionScheduler.java
|
||||
private String adjustCronInterval(final String origCronInterval) {
|
||||
Objects.requireNonNull(origCronInterval, "origCronInterval must not be null");
|
||||
// Typically we support 5-character cron. Spring's lib only supports 6 character cron so we make
|
||||
// an adjustment here.
|
||||
final String[] originalCronParts = origCronInterval.split(" ");
|
||||
if (originalCronParts.length == 5) {
|
||||
return String.format("0 %s", origCronInterval);
|
||||
}
|
||||
return origCronInterval;
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
import com.linkedin.datahub.graphql.QueryContext;
|
||||
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
|
||||
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput;
|
||||
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput;
|
||||
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput;
|
||||
@ -22,14 +23,17 @@ import org.testng.annotations.Test;
|
||||
|
||||
public class UpsertIngestionSourceResolverTest {
|
||||
|
||||
private static final UpdateIngestionSourceInput TEST_INPUT =
|
||||
new UpdateIngestionSourceInput(
|
||||
"Test source",
|
||||
"mysql",
|
||||
"Test source description",
|
||||
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
|
||||
new UpdateIngestionSourceConfigInput(
|
||||
"my test recipe", "0.8.18", "executor id", false, null));
|
||||
private static final UpdateIngestionSourceInput TEST_INPUT = makeInput();
|
||||
|
||||
private static UpdateIngestionSourceInput makeInput() {
|
||||
return new UpdateIngestionSourceInput(
|
||||
"Test source",
|
||||
"mysql",
|
||||
"Test source description",
|
||||
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
|
||||
new UpdateIngestionSourceConfigInput(
|
||||
"my test recipe", "0.8.18", "executor id", false, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSuccess() throws Exception {
|
||||
@ -104,4 +108,54 @@ public class UpsertIngestionSourceResolverTest {
|
||||
|
||||
assertThrows(RuntimeException.class, () -> resolver.get(mockEnv).join());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertWithInvalidCron() throws Exception {
|
||||
final UpdateIngestionSourceInput input = makeInput();
|
||||
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * 123", "UTC"));
|
||||
|
||||
// Create resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
|
||||
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
|
||||
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
|
||||
|
||||
input.setSchedule(new UpdateIngestionSourceScheduleInput("null", "UTC"));
|
||||
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
|
||||
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertWithInvalidTimezone() throws Exception {
|
||||
final UpdateIngestionSourceInput input = makeInput();
|
||||
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "Invalid"));
|
||||
|
||||
// Create resolver
|
||||
EntityClient mockClient = Mockito.mock(EntityClient.class);
|
||||
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);
|
||||
|
||||
// Execute resolver
|
||||
QueryContext mockContext = getMockAllowContext();
|
||||
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
|
||||
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
|
||||
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
|
||||
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
|
||||
|
||||
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
|
||||
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
|
||||
|
||||
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "America/Los_Angel"));
|
||||
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
|
||||
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,7 +16,7 @@ from tabulate import tabulate
|
||||
import datahub as datahub_package
|
||||
from datahub.cli import cli_utils
|
||||
from datahub.cli.config_utils import CONDENSED_DATAHUB_CONFIG_PATH
|
||||
from datahub.configuration.common import ConfigModel
|
||||
from datahub.configuration.common import ConfigModel, GraphError
|
||||
from datahub.configuration.config_loader import load_config_file
|
||||
from datahub.emitter.mce_builder import datahub_guid
|
||||
from datahub.ingestion.graph.client import get_default_graph
|
||||
@ -372,7 +372,19 @@ def deploy(
|
||||
"""
|
||||
)
|
||||
|
||||
response = datahub_graph.execute_graphql(graphql_query, variables=variables)
|
||||
try:
|
||||
response = datahub_graph.execute_graphql(
|
||||
graphql_query, variables=variables, format_exception=False
|
||||
)
|
||||
except GraphError as graph_error:
|
||||
try:
|
||||
error = json.loads(str(graph_error).replace('"', '\\"').replace("'", '"'))
|
||||
click.secho(error[0]["message"], fg="red", err=True)
|
||||
except Exception:
|
||||
click.secho(
|
||||
f"Could not create ingestion source:\n{graph_error}", fg="red", err=True
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
click.echo(
|
||||
f"✅ Successfully wrote data ingestion source metadata for recipe {deploy_options.name}:"
|
||||
|
||||
@ -1111,6 +1111,7 @@ class DataHubGraph(DatahubRestEmitter):
|
||||
query: str,
|
||||
variables: Optional[Dict] = None,
|
||||
operation_name: Optional[str] = None,
|
||||
format_exception: bool = True,
|
||||
) -> Dict:
|
||||
url = f"{self.config.server}/api/graphql"
|
||||
|
||||
@ -1127,7 +1128,10 @@ class DataHubGraph(DatahubRestEmitter):
|
||||
)
|
||||
result = self._post_generic(url, body)
|
||||
if result.get("errors"):
|
||||
raise GraphError(f"Error executing graphql query: {result['errors']}")
|
||||
if format_exception:
|
||||
raise GraphError(f"Error executing graphql query: {result['errors']}")
|
||||
else:
|
||||
raise GraphError(result["errors"])
|
||||
|
||||
return result["data"]
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user