diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config.py b/ingestion/src/airflow_provider_openmetadata/lineage/config.py deleted file mode 100644 index 66d6223ccac..00000000000 --- a/ingestion/src/airflow_provider_openmetadata/lineage/config.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -OpenMetadata Airflow Lineage Backend -""" -import json -import os -from typing import Optional - -from airflow.configuration import conf - - -# TODO refactor https://github.com/open-metadata/OpenMetadata/issues/3844 -class OpenMetadataLineageConfig(MetadataServerConfig): - """ - Base class for OpenMetada lineage config - - Attributes - airflow_service_name (str): name of the service - api_endpoint (str): the endpoint for the API - auth_provider_type (str): - secret_key (str): - """ - - airflow_service_name: str = "airflow" - api_endpoint: str = "http://localhost:8585" - auth_provider_type: str = "no-auth" - secret_key: Optional[str] = None - - -def get_lineage_config() -> OpenMetadataLineageConfig: - """ - Load the lineage config from airflow_provider_openmetadata.cfg. - """ - airflow_service_name = conf.get("lineage", "airflow_service_name", fallback=None) - if airflow_service_name: - api_endpoint = conf.get( - "lineage", "openmetadata_api_endpoint", fallback="http://localhost:8585" - ) - auth_provider_type = conf.get( - "lineage", "auth_provider_type", fallback="no-auth" - ) - secret_key = conf.get("lineage", "secret_key", fallback=None) - api_version = conf.get("lineage", "api_version", fallback="v1") - retry = conf.get("lineage", "retry", fallback=3) - retry_wait = conf.get("lineage", "retry_wait", fallback=3) - org_url = conf.get("lineage", "org_url", fallback=None) - client_id = conf.get("lineage", "client_id", fallback=None) - private_key = conf.get("lineage", "private_key", fallback=None) - domain = conf.get("lineage", "domain", fallback=None) - email = conf.get("lineage", "email", fallback=None) - audience = conf.get( - "lineage", "audience", fallback="https://www.googleapis.com/oauth2/v4/token" - ) - auth_header = conf.get("lineage", "auth_header", fallback="Authorization") - authority = conf.get("lineage", "authority", fallback="") - scopes = conf.get("lineage", "scopes", fallback=[]) - return OpenMetadataLineageConfig.parse_obj( - { - "airflow_service_name": airflow_service_name, - "api_endpoint": api_endpoint, - "auth_provider_type": auth_provider_type, - "secret_key": secret_key, - "audience": audience, - "auth_header": auth_header, - "email": email, - "domain": domain, - "private_key": private_key, - "client_id": client_id, - "org_url": org_url, - "retry_wait": retry_wait, - "retry": retry, - "api_version": api_version, - "authority": authority, - "scopes": scopes, - } - ) - - openmetadata_config_file = os.getenv("OPENMETADATA_LINEAGE_CONFIG") - if openmetadata_config_file: - with open(openmetadata_config_file, encoding="utf-8") as config_file: - config = json.load(config_file) - return OpenMetadataLineageConfig.parse_obj(config) - - return OpenMetadataLineageConfig.parse_obj( - { - "airflow_service_name": "airflow", - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth", - } - ) - - -def get_metadata_config(config: OpenMetadataLineageConfig) -> MetadataServerConfig: - """ - Return MetadataServerConfig to interact with the API. - :param config: get_lineage_config() - """ - - return MetadataServerConfig.parse_obj( - { - "api_endpoint": config.api_endpoint, - "auth_provider_type": config.auth_provider_type, - "secret_key": config.secret_key, - } - ) diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config/__init__.py b/ingestion/src/airflow_provider_openmetadata/lineage/config/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config/commons.py b/ingestion/src/airflow_provider_openmetadata/lineage/config/commons.py new file mode 100644 index 00000000000..e69de81eb9e --- /dev/null +++ b/ingestion/src/airflow_provider_openmetadata/lineage/config/commons.py @@ -0,0 +1,16 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenMetadata Airflow Lineage Backend config commons & cte +""" + +LINEAGE = "lineage" diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py b/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py new file mode 100644 index 00000000000..9dcf1b9d712 --- /dev/null +++ b/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py @@ -0,0 +1,89 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenMetadata Airflow Lineage Backend +""" +import json +import os + +from airflow.configuration import conf +from pydantic import BaseModel + +from airflow_provider_openmetadata.lineage.config.commons import LINEAGE +from airflow_provider_openmetadata.lineage.config.providers import ( + provider_config_registry, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + AuthProvider, + OpenMetadataServerConfig, +) + + +class AirflowLineageConfig(BaseModel): + airflow_service_name: str + metadata_config: OpenMetadataServerConfig + + +def parse_airflow_config(airflow_service_name: str) -> AirflowLineageConfig: + """ + Get airflow config from airflow.cfg and parse it + to the config model + """ + auth_provider_type = conf.get( + LINEAGE, "auth_provider_type", fallback=AuthProvider.no_auth.value + ) + + if auth_provider_type == AuthProvider.no_auth.value: + security_config = None + else: + load_security_config_fn = provider_config_registry.registry(auth_provider_type) + security_config = load_security_config_fn() + + return AirflowLineageConfig( + airflow_service_name=airflow_service_name, + metadata_config=OpenMetadataServerConfig( + hostPort=conf.get( + LINEAGE, + "openmetadata_api_endpoint", + fallback="http://localhost:8585/api", + ), + authProvider=auth_provider_type, + securityConfig=security_config, + ), + ) + + +def get_lineage_config() -> AirflowLineageConfig: + """ + Load the lineage config from airflow.cfg, from + a JSON file path configures as env in OPENMETADATA_LINEAGE_CONFIG + or return a default config. + """ + airflow_service_name = conf.get(LINEAGE, "airflow_service_name", fallback=None) + if airflow_service_name: + return parse_airflow_config(airflow_service_name) + + openmetadata_config_file = os.getenv("OPENMETADATA_LINEAGE_CONFIG") + + # If config file, parse the JSON config, that should conform to AirflowLineageConfig + if openmetadata_config_file: + with open(openmetadata_config_file, encoding="utf-8") as config_file: + config = json.load(config_file) + return AirflowLineageConfig.parse_obj(config) + + # If nothing is configured, let's use a default for local + return AirflowLineageConfig( + airflow_service_name="airflow", + metadata_config=OpenMetadataServerConfig( + hostPort="http://localhost:8585/api", + ), + ) diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config/providers.py b/ingestion/src/airflow_provider_openmetadata/lineage/config/providers.py new file mode 100644 index 00000000000..f7289565ac9 --- /dev/null +++ b/ingestion/src/airflow_provider_openmetadata/lineage/config/providers.py @@ -0,0 +1,85 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenMetadata Airflow Lineage Backend security providers config +""" + +from collections import namedtuple + +from airflow.configuration import conf + +from airflow_provider_openmetadata.lineage.config.commons import LINEAGE +from metadata.generated.schema.metadataIngestion.workflow import ( + Auth0SSOConfig, + AuthProvider, + GoogleSSOConfig, + OktaSSOConfig, +) + + +def register_provider_config(): + """ + Helps us register custom functions to parse provider config + """ + registry = dict() + + def add(provider: str): + def inner(fn): + registry[provider] = fn + return fn + + return inner + + Register = namedtuple("Register", ["add", "registry"]) + return Register(add, registry) + + +provider_config_registry = register_provider_config() + + +@provider_config_registry.add(AuthProvider.google.value) +def load_google_auth() -> GoogleSSOConfig: + """ + Load config for Google Auth + """ + return GoogleSSOConfig( + secretKey=conf.get(LINEAGE, "secret_key"), + audience=conf.get( + LINEAGE, "audience", fallback="https://www.googleapis.com/oauth2/v4/token" + ), + ) + + +@provider_config_registry.add(AuthProvider.okta.value) +def load_okta_auth() -> OktaSSOConfig: + """ + Load config for Google Auth + """ + return OktaSSOConfig( + clientId=conf.get(LINEAGE, "client_id"), + orgURL=conf.get(LINEAGE, "org_url"), + privateKey=conf.get(LINEAGE, "private_key"), + email=conf.get(LINEAGE, "email"), + scopes=conf.get(LINEAGE, "scopes", fallback=[]), + ) + + +@provider_config_registry.add(AuthProvider.auth0.value) +def load_auth0_auth() -> Auth0SSOConfig: + """ + Load config for Google Auth + """ + return Auth0SSOConfig( + clientId=conf.get(LINEAGE, "client_id"), + secretKey=conf.get(LINEAGE, "secret_key"), + domain=conf.get(LINEAGE, "domain"), + ) diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py index 177df3e8ede..048621ff165 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py @@ -18,9 +18,9 @@ from typing import TYPE_CHECKING, Dict, List, Optional from airflow.lineage.backend import LineageBackend -from airflow_provider_openmetadata.lineage.config import ( +from airflow_provider_openmetadata.lineage.config.loader import ( + AirflowLineageConfig, get_lineage_config, - get_metadata_config, ) from airflow_provider_openmetadata.lineage.utils import get_xlets, parse_lineage from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -78,9 +78,8 @@ class OpenMetadataLineageBackend(LineageBackend): """ try: - config = get_lineage_config() - metadata_config = get_metadata_config(config) - client = OpenMetadata(metadata_config) + config: AirflowLineageConfig = get_lineage_config() + client = OpenMetadata(config.metadata_config) op_inlets = get_xlets(operator, "_inlets") op_outlets = get_xlets(operator, "_outlets") diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index 3a354bacbbc..b339f959e5f 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional from airflow.configuration import conf -from airflow_provider_openmetadata.lineage.config import OpenMetadataLineageConfig +from airflow_provider_openmetadata.lineage.config.loader import AirflowLineageConfig from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.services.createPipelineService import ( @@ -308,7 +308,7 @@ def add_status( # pylint: disable=too-many-arguments,too-many-locals def parse_lineage( - config: OpenMetadataLineageConfig, + config: AirflowLineageConfig, context: Dict, operator: "BaseOperator", inlets: List, @@ -382,7 +382,7 @@ def parse_lineage( def get_or_create_pipeline_service( - operator: "BaseOperator", client: OpenMetadata, config: OpenMetadataLineageConfig + operator: "BaseOperator", client: OpenMetadata, config: AirflowLineageConfig ) -> PipelineService: """ Check if we already have the airflow instance as a PipelineService, diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index e6d64721a3a..381e1604b4e 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -11,7 +11,6 @@ """ Generic source to build SQL connectors. """ -import copy import json import logging import re @@ -209,7 +208,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]): inspectors = self.get_databases() for inspector in inspectors: schema_names = inspector.get_schema_names() - print(schema_names) for schema in schema_names: # clear any previous source database state self.database_source_state.clear() diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index 97f3aef4f95..f47d2be4665 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -17,6 +17,9 @@ from urllib.parse import quote_plus from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( MysqlConnection, ) +from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( + SQLiteConnection, +) @singledispatch @@ -53,3 +56,12 @@ def _(connection: MysqlConnection): url = f"{url}?{params}" return url + + +@get_connection_url.register +def _(connection: SQLiteConnection): + """ + SQLite is only used for testing with the in-memory db + """ + + return f"{connection.scheme.value}:///:memory:"