diff --git a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql index 1e3b1b4417e..ee2c5125bf5 100644 --- a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql @@ -261,3 +261,16 @@ ALTER TABLE entity_extension_time_series DROP COLUMN temp; ALTER TABLE entity_extension_time_series MODIFY COLUMN entityFQNHash VARCHAR(768) COLLATE ascii_bin, MODIFY COLUMN jsonSchema VARCHAR(256) COLLATE ascii_bin, MODIFY COLUMN extension VARCHAR(256) COLLATE ascii_bin, ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp); + +-- Airflow pipeline status: multiply the JSON timestamp by 1000 so it is stored in epoch milliseconds (only 'pipeline.pipelineStatus' rows of pipelines whose serviceType is 'Airflow') +UPDATE entity_extension_time_series ts +JOIN pipeline_entity p + ON ts.entityFQNHash = p.fqnHash +SET ts.json = JSON_INSERT( + JSON_REMOVE(ts.json, '$.timestamp'), + '$.timestamp', + JSON_EXTRACT(ts.json, '$.timestamp') * 1000 + ) +WHERE ts.extension = 'pipeline.pipelineStatus' + AND JSON_EXTRACT(p.json, '$.serviceType') = 'Airflow' +; diff --git a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql index 58068702d02..6cb0f366722 100644 --- a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql @@ -276,3 +276,16 @@ ALTER TABLE entity_extension_time_series DROP COLUMN temp; ALTER TABLE entity_extension_time_series ALTER COLUMN entityFQNHash TYPE VARCHAR(768), ALTER COLUMN jsonSchema TYPE VARCHAR(256) , ALTER COLUMN extension TYPE VARCHAR(256), ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp); + +-- Airflow pipeline status: multiply the JSON timestamp by 1000 so it is stored in epoch milliseconds (only 'pipeline.pipelineStatus' rows of pipelines whose serviceType is 'Airflow') +UPDATE entity_extension_time_series ts +SET json = jsonb_set( + ts.json, + '{timestamp}', + to_jsonb(cast(ts.json #> '{timestamp}' as int8) *1000) +) +FROM pipeline_entity p +WHERE 
ts.entityFQNHash = p.fqnHash + and ts.extension = 'pipeline.pipelineStatus' + AND p.json #>> '{serviceType}' = 'Airflow' +; \ No newline at end of file diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/backend.py b/ingestion/src/airflow_provider_openmetadata/lineage/backend.py index de056eab9e2..af770d753b5 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/backend.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/backend.py @@ -35,20 +35,13 @@ class OpenMetadataLineageBackend(LineageBackend): """ Sends lineage data from tasks to OpenMetadata. - Configurable via ``airflow_provider_openmetadata.cfg`` as follows: :: + Configurable via `airflow.cfg` as follows: + [lineage] - backend = airflow_provider_openmetadata.lineage.OpenMetadataLineageBackend - airflow_service_name = airflow #make sure this service_name matches - the one configured in openMetadata - openmetadata_api_endpoint = http://localhost:8585 - auth_provider_type = no-auth # use google here if you are - configuring google as SSO - secret_key = google-client-secret-key # it needs to be configured - only if you are using google as SSO the one configured in openMetadata - openmetadata_api_endpoint = http://localhost:8585 - auth_provider_type = no-auth # use google here if you are configuring google as SSO - secret_key = google-client-secret-key # it needs to be configured - only if you are using google as SSO + backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend + airflow_service_name = airflow + openmetadata_api_endpoint = http://localhost:8585/api + jwt_token = <JWT token used to authenticate against the OpenMetadata API> """ def send_lineage( diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py b/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py index e4756ad1582..59ec3caec6a 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py +++ 
b/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py @@ -19,14 +19,13 @@ from airflow.configuration import AirflowConfigParser from pydantic import BaseModel from airflow_provider_openmetadata.lineage.config.commons import LINEAGE -from airflow_provider_openmetadata.lineage.config.providers import ( - InvalidAirflowProviderException, - provider_config_registry, -) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( AuthProvider, OpenMetadataConnection, ) +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) class AirflowLineageConfig(BaseModel): @@ -43,21 +42,6 @@ def parse_airflow_config( Get airflow config from airflow.cfg and parse it to the config model """ - auth_provider_type = conf.get( - LINEAGE, "auth_provider_type", fallback=AuthProvider.no_auth.value - ) - - if auth_provider_type == AuthProvider.no_auth.value: - security_config = None - else: - load_security_config_fn = provider_config_registry.registry.get( - auth_provider_type - ) - if not load_security_config_fn: - raise InvalidAirflowProviderException( - f"Cannot find {auth_provider_type} in airflow providers registry." 
- ) - security_config = load_security_config_fn(conf) return AirflowLineageConfig( airflow_service_name=airflow_service_name, @@ -73,8 +57,14 @@ def parse_airflow_config( "openmetadata_api_endpoint", fallback="http://localhost:8585/api", ), - authProvider=auth_provider_type, - securityConfig=security_config, + authProvider=AuthProvider.openmetadata.value, + securityConfig=OpenMetadataJWTClientConfig( + jwtToken=conf.get( + LINEAGE, + "jwt_token", + fallback=None, + ), + ), verifySSL=conf.get(LINEAGE, "verify_ssl", fallback="no-ssl"), ), ) @@ -102,10 +92,5 @@ def get_lineage_config() -> AirflowLineageConfig: config = json.load(config_file) return AirflowLineageConfig.parse_obj(config) - # If nothing is configured, let's use a default for local - return AirflowLineageConfig( - airflow_service_name="airflow", - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - ), - ) + # If nothing is configured, raise + raise ValueError("Missing lineage backend configuration") diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config/providers.py b/ingestion/src/airflow_provider_openmetadata/lineage/config/providers.py deleted file mode 100644 index b268cd0e464..00000000000 --- a/ingestion/src/airflow_provider_openmetadata/lineage/config/providers.py +++ /dev/null @@ -1,123 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -OpenMetadata Airflow Lineage Backend security providers config -""" -import json - -from airflow.configuration import AirflowConfigParser - -from airflow_provider_openmetadata.lineage.config.commons import LINEAGE -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - AuthProvider, -) -from metadata.generated.schema.security.client.auth0SSOClientConfig import ( - Auth0SSOClientConfig, -) -from metadata.generated.schema.security.client.azureSSOClientConfig import ( - AzureSSOClientConfig, -) -from metadata.generated.schema.security.client.customOidcSSOClientConfig import ( - CustomOIDCSSOClientConfig, -) -from metadata.generated.schema.security.client.googleSSOClientConfig import ( - GoogleSSOClientConfig, -) -from metadata.generated.schema.security.client.oktaSSOClientConfig import ( - OktaSSOClientConfig, -) -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) -from metadata.utils.dispatch import enum_register - -provider_config_registry = enum_register() - - -class InvalidAirflowProviderException(Exception): - """ - Raised when we cannot find the provider - in Airflow config - """ - - -@provider_config_registry.add(AuthProvider.google.value) -def load_google_auth(conf: AirflowConfigParser) -> GoogleSSOClientConfig: - """ - Load config for Google Auth - """ - return GoogleSSOClientConfig( - secretKey=conf.get(LINEAGE, "secret_key"), - audience=conf.get( - LINEAGE, "audience", fallback="https://www.googleapis.com/oauth2/v4/token" - ), - ) - - -@provider_config_registry.add(AuthProvider.okta.value) -def load_okta_auth(conf: AirflowConfigParser) -> OktaSSOClientConfig: - """ - Load config for Google Auth - """ - return OktaSSOClientConfig( - clientId=conf.get(LINEAGE, "client_id"), - orgURL=conf.get(LINEAGE, "org_url"), - privateKey=conf.get(LINEAGE, "private_key"), - email=conf.get(LINEAGE, "email"), - # conf.getjson only available for airflow 
+2.3. Manually casting for lower versions - scopes=json.loads(conf.get(LINEAGE, "scopes", fallback="[]")), - ) - - -@provider_config_registry.add(AuthProvider.auth0.value) -def load_auth0_auth(conf: AirflowConfigParser) -> Auth0SSOClientConfig: - """ - Load config for Google Auth - """ - return Auth0SSOClientConfig( - clientId=conf.get(LINEAGE, "client_id"), - secretKey=conf.get(LINEAGE, "secret_key"), - domain=conf.get(LINEAGE, "domain"), - ) - - -@provider_config_registry.add(AuthProvider.azure.value) -def load_azure_auth(conf: AirflowConfigParser) -> AzureSSOClientConfig: - """ - Load config for Azure Auth - """ - return AzureSSOClientConfig( - clientSecret=conf.get(LINEAGE, "client_secret"), - authority=conf.get(LINEAGE, "authority"), - clientId=conf.get(LINEAGE, "client_id"), - scopes=json.loads(conf.get(LINEAGE, "scopes", fallback="[]")), - ) - - -@provider_config_registry.add(AuthProvider.openmetadata.value) -def load_om_auth(conf: AirflowConfigParser) -> OpenMetadataJWTClientConfig: - """ - Load config for Azure Auth - """ - return OpenMetadataJWTClientConfig(jwtToken=conf.get(LINEAGE, "jwt_token")) - - -@provider_config_registry.add(AuthProvider.custom_oidc.value) -def load_custom_oidc_auth(conf: AirflowConfigParser) -> CustomOIDCSSOClientConfig: - """ - Load config for Custom OIDC Auth - """ - return CustomOIDCSSOClientConfig( - clientId=conf.get(LINEAGE, "client_id"), - secretKey=conf.get(LINEAGE, "secret_key"), - tokenEndpoint=conf.get(LINEAGE, "token_endpoint"), - ) diff --git a/ingestion/tests/unit/airflow/test_lineage_providers.py b/ingestion/tests/unit/airflow/test_lineage_providers.py deleted file mode 100644 index c37c778619a..00000000000 --- a/ingestion/tests/unit/airflow/test_lineage_providers.py +++ /dev/null @@ -1,324 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Mock providers and check custom load -""" -from unittest import TestCase - -from airflow.configuration import AirflowConfigParser - -from airflow_provider_openmetadata.lineage.config.loader import ( - AirflowLineageConfig, - parse_airflow_config, -) -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - AuthProvider, - OpenMetadataConnection, -) -from metadata.generated.schema.security.client.auth0SSOClientConfig import ( - Auth0SSOClientConfig, -) -from metadata.generated.schema.security.client.azureSSOClientConfig import ( - AzureSSOClientConfig, -) -from metadata.generated.schema.security.client.customOidcSSOClientConfig import ( - CustomOIDCSSOClientConfig, -) -from metadata.generated.schema.security.client.googleSSOClientConfig import ( - GoogleSSOClientConfig, -) -from metadata.generated.schema.security.client.oktaSSOClientConfig import ( - OktaSSOClientConfig, -) -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) - -AIRFLOW_SERVICE_NAME = "test-service" - - -class TestAirflowAuthProviders(TestCase): - """ - Make sure we are properly loading all required classes - """ - - def test_google_sso(self): - sso_config = """ - [lineage] - backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = google - secret_key = path/to/key - """ - - # mock the conf object - conf = 
AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.google.value, - securityConfig=GoogleSSOClientConfig(secretKey="path/to/key"), - ), - ), - ) - - def test_okta_sso(self): - sso_config = """ - [lineage] - backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = okta - client_id = client_id - org_url = org_url - private_key = private_key - email = email - scopes = ["scope1", "scope2"] - """ - - # mock the conf object - conf = AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.okta.value, - securityConfig=OktaSSOClientConfig( - clientId="client_id", - orgURL="org_url", - privateKey="private_key", - email="email", - scopes=["scope1", "scope2"], - ), - ), - ), - ) - - # Validate default scopes - sso_config = """ - [lineage] - backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = okta - client_id = client_id - org_url = org_url - private_key = private_key - email = email - """ - - # mock the conf object - conf = AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - 
airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.okta.value, - securityConfig=OktaSSOClientConfig( - clientId="client_id", - orgURL="org_url", - privateKey="private_key", - email="email", - scopes=[], - ), - ), - ), - ) - - def test_auth0_sso(self): - sso_config = """ - [lineage] - backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = auth0 - client_id = client_id - secret_key = secret_key - domain = domain - """ - - # mock the conf object - conf = AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.auth0.value, - securityConfig=Auth0SSOClientConfig( - clientId="client_id", - secretKey="secret_key", - domain="domain", - ), - ), - ), - ) - - def test_azure_sso(self): - sso_config = """ - [lineage] - backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = azure - client_id = client_id - client_secret = client_secret - authority = authority - scopes = ["scope1", "scope2"] - """ - - # mock the conf object - conf = AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.azure.value, - 
securityConfig=AzureSSOClientConfig( - clientId="client_id", - clientSecret="client_secret", - authority="authority", - scopes=["scope1", "scope2"], - ), - ), - ), - ) - - # Validate default scopes - sso_config = """ - [lineage] - backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = azure - client_id = client_id - client_secret = client_secret - authority = authority - """ - - # mock the conf object - conf = AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.azure.value, - securityConfig=AzureSSOClientConfig( - clientId="client_id", - clientSecret="client_secret", - authority="authority", - scopes=[], - ), - ), - ), - ) - - def test_om_sso(self): - sso_config = """ - [lineage] - backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = openmetadata - jwt_token = jwt_token - """ - - # mock the conf object - conf = AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.openmetadata.value, - securityConfig=OpenMetadataJWTClientConfig( - jwtToken="jwt_token", - ), - ), - ), - ) - - def test_custom_oidc_sso(self): - sso_config = """ - [lineage] - backend = 
airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend - airflow_service_name = local_airflow - openmetadata_api_endpoint = http://localhost:8585/api - - auth_provider_type = custom-oidc - client_id = client_id - secret_key = secret_key - token_endpoint = token_endpoint - """ - - # mock the conf object - conf = AirflowConfigParser(default_config=sso_config) - - lineage_config = parse_airflow_config(AIRFLOW_SERVICE_NAME, conf) - - self.assertEqual( - lineage_config, - AirflowLineageConfig( - airflow_service_name=AIRFLOW_SERVICE_NAME, - metadata_config=OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider=AuthProvider.custom_oidc.value, - securityConfig=CustomOIDCSSOClientConfig( - clientId="client_id", - secretKey="secret_key", - tokenEndpoint="token_endpoint", - ), - ), - ), - ) diff --git a/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/lineage-backend.md b/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/lineage-backend.md index 6159cd7a2f6..9b4de654d5f 100644 --- a/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/lineage-backend.md +++ b/openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/pipeline/airflow/lineage-backend.md @@ -54,7 +54,6 @@ After the installation, we need to update the Airflow configuration. 
This can be backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend airflow_service_name = local_airflow openmetadata_api_endpoint = http://localhost:8585/api -auth_provider_type = openmetadata jwt_token = ``` @@ -64,7 +63,6 @@ Or we can directly provide environment variables: AIRFLOW__LINEAGE__BACKEND="airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend" AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME="local_airflow" AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT="http://localhost:8585/api" -AIRFLOW__LINEAGE__AUTH_PROVIDER_TYPE="openmetadata" AIRFLOW__LINEAGE__JWT_TOKEN="" ``` diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Execution/Execution.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Execution/Execution.component.tsx index 7a43906f133..b332885f957 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Execution/Execution.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Execution/Execution.component.tsx @@ -40,9 +40,7 @@ import { PipelineStatus, Task } from '../../generated/entity/data/pipeline'; import { getPipelineStatus } from '../../rest/pipelineAPI'; import { getCurrentMillis, - getCurrentUnixInteger, getEpochMillisForPastDays, - getUnixSecondsForPastDays, } from '../../utils/date-time/DateTimeUtils'; import { showErrorToast } from '../../utils/ToastUtils'; import './Execution.style.less'; @@ -60,9 +58,9 @@ const ExecutionsTab = ({ pipelineFQN, tasks }: ExecutionProps) => { const [executions, setExecutions] = useState>(); const [datesSelected, setDatesSelected] = useState(false); const [startTime, setStartTime] = useState( - getUnixSecondsForPastDays(EXECUTION_FILTER_RANGE.last365days.days) + getEpochMillisForPastDays(EXECUTION_FILTER_RANGE.last365days.days) ); - const [endTime, setEndTime] = useState(getCurrentUnixInteger()); + const [endTime, setEndTime] = useState(getCurrentMillis()); const [isClickedCalendar, setIsClickedCalendar] = 
useState(false); const [status, setStatus] = useState(MenuOptions.all); const [isLoading, setIsLoading] = useState(false);