Ingestion Pipeline deployed, Athena tests and pydantic extras (#8682)

* Always run python tests

* Fix athena tests and types

* Update deployed prop in IngestionPipeline

* Fix #8554

* Format

* Use true as default deployed migration

* Remove repeated req

* Pydantic wiggle room
This commit is contained in:
Pere Miquel Brull 2022-11-13 11:59:43 +01:00 committed by GitHub
parent 6306c9ac8d
commit 34ba9d95c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 42 additions and 21 deletions

View File

@ -15,17 +15,12 @@ on:
branches:
- main
- '0.[0-9]+.[0-9]+'
paths:
- ingestion/**
- openmetadata-service/**
paths-ignore:
- 'openmetadata-docs/**'
pull_request_target:
types: [labeled, opened, synchronize, reopened]
branches:
- main
- '0.[0-9]+.[0-9]+'
paths:
- ingestion/**
- openmetadata-service/**
paths-ignore:
- 'openmetadata-docs/**'
permissions:
contents: read

View File

@ -109,4 +109,11 @@ CREATE TABLE IF NOT EXISTS metadata_service_entity (
deleted BOOLEAN GENERATED ALWAYS AS (json -> '$.deleted'),
PRIMARY KEY (id),
UNIQUE (name)
);
);
-- We are starting to store the current deployed flag. Let's mark it as false by default
UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json ,'$.deployed');
UPDATE ingestion_pipeline_entity
SET json = JSON_INSERT(json ,'$.deployed', 'true');

View File

@ -136,3 +136,10 @@ CREATE TABLE IF NOT EXISTS metadata_service_entity (
PRIMARY KEY (id),
UNIQUE (name)
);
-- We are starting to store the current deployed flag. Let's mark it as false by default
UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{deployed}';
UPDATE ingestion_pipeline_entity
SET json = jsonb_set(json::jsonb, '{deployed}', 'true'::jsonb, true);

View File

@ -27,11 +27,11 @@ base_requirements = {
"idna<3,>=2.5",
"mypy_extensions>=0.4.3",
"typing-inspect",
"pydantic[email]==1.9.0",
"pydantic~=1.9.0",
"email-validator>=1.0.3",
"google>=3.0.0",
"google-auth>=1.33.0",
"python-dateutil>=2.8.1",
"email-validator>=1.0.3",
"wheel~=0.36.2",
"python-jose==3.3.0",
"sqlalchemy>=1.4.0",

View File

@ -464,7 +464,9 @@ def _(connection: AthenaConnection):
url += ":"
url += f"@athena.{connection.awsConfig.awsRegion}.amazonaws.com:443"
url += f"?s3_staging_dir={quote_plus(connection.s3StagingDir)}"
staging_url = connection.s3StagingDir.scheme + "://" + str(connection.s3StagingDir)
url += f"?s3_staging_dir={quote_plus(staging_url)}"
if connection.workgroup:
url += f"&work_group={connection.workgroup}"
if connection.awsConfig.awsSessionToken:

View File

@ -11,6 +11,8 @@
from unittest import TestCase
from pydantic import AnyUrl
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
AthenaConnection,
AthenaScheme,
@ -766,20 +768,20 @@ class SouceConnectionTest(TestCase):
awsAccessKeyId="key", awsRegion="us-east-2", awsSecretAccessKey="secret_key"
)
expected_url = "awsathena+rest://key:secret_key@athena.us-east-2.amazonaws.com:443?s3_staging_dir=s3athena-postgres&work_group=primary"
expected_url = "awsathena+rest://key:secret_key@athena.us-east-2.amazonaws.com:443?s3_staging_dir=s3%3A%2F%2Fs3athena-postgres&work_group=primary"
athena_conn_obj = AthenaConnection(
awsConfig=awsCreds,
s3StagingDir="s3athena-postgres",
s3StagingDir=AnyUrl("s3athena-postgres", scheme="s3"),
workgroup="primary",
scheme=AthenaScheme.awsathena_rest,
)
assert expected_url == get_connection_url(athena_conn_obj)
# connection arguments witho db
expected_url = "awsathena+rest://key:secret_key@athena.us-east-2.amazonaws.com:443?s3_staging_dir=s3athena-postgres&work_group=primary"
# connection arguments with db
expected_url = "awsathena+rest://key:secret_key@athena.us-east-2.amazonaws.com:443?s3_staging_dir=s3%3A%2F%2Fs3athena-postgres&work_group=primary"
athena_conn_obj = AthenaConnection(
awsConfig=awsCreds,
s3StagingDir="s3athena-postgres",
s3StagingDir=AnyUrl("s3athena-postgres", scheme="s3"),
workgroup="primary",
scheme=AthenaScheme.awsathena_rest,
)

View File

@ -45,8 +45,8 @@ import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
private static final String UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled";
private static final String PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled";
private static final String UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled,deployed";
private static final String PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled,deployed";
private static final String PIPELINE_STATUS_JSON_SCHEMA = "pipelineStatus";
private static PipelineServiceClient pipelineServiceClient;
@ -230,6 +230,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
original.getOpenMetadataServerConnection(), updated.getOpenMetadataServerConnection());
updateLogLevel(original.getLoggerLevel(), updated.getLoggerLevel());
updateEnabled(original.getEnabled(), updated.getEnabled());
updateDeployed(original.getDeployed(), updated.getDeployed());
}
private void updateSourceConfig() throws JsonProcessingException {
@ -266,6 +267,12 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
}
}
private void updateDeployed(Boolean origDeployed, Boolean updatedDeployed) throws JsonProcessingException {
if (updatedDeployed != null && !origDeployed.equals(updatedDeployed)) {
recordChange("deployed", origDeployed, updatedDeployed);
}
}
private void updateEnabled(Boolean origEnabled, Boolean updatedEnabled) throws JsonProcessingException {
if (updatedEnabled != null && !origEnabled.equals(updatedEnabled)) {
recordChange("enabled", origEnabled, updatedEnabled);

View File

@ -170,7 +170,8 @@
},
"deployed": {
"description": "Indicates if the workflow has been successfully deployed to Airflow.",
"type": "boolean"
"type": "boolean",
"default": false
},
"enabled": {
"description": "True if the pipeline is ready to be run in the next schedule. False if it is paused.",