diff --git a/ingestion/src/metadata/__version__.py b/ingestion/src/metadata/__version__.py index 146f0b7c0cb..4254edf0f76 100644 --- a/ingestion/src/metadata/__version__.py +++ b/ingestion/src/metadata/__version__.py @@ -8,6 +8,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +""" +Module for getting versions of OpenMetadata and python +""" import os import sys @@ -18,14 +21,14 @@ version = pkg_resources.require("openmetadata-ingestion")[0].version def get_metadata_version() -> str: + """ + Return the OpenMetadata version + """ + metadata_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..") metadata_pkg_dir = os.path.abspath(metadata_pkg_dir) - return "metadata {} from {} (python {})".format( - version, - metadata_pkg_dir, - get_major_minor_version(), - ) + return f"metadata {version} from {metadata_pkg_dir} (python {get_major_minor_version()})" def get_major_minor_version() -> str: diff --git a/ingestion/src/metadata/cli/db_dump.py b/ingestion/src/metadata/cli/db_dump.py index accf5cbd092..9bf792e044d 100644 --- a/ingestion/src/metadata/cli/db_dump.py +++ b/ingestion/src/metadata/cli/db_dump.py @@ -1,3 +1,18 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Database Dumping utility for the metadata CLI +""" + from pathlib import Path from typing import List @@ -116,7 +131,7 @@ def dump(engine: Engine, output: Path, schema: str = None) -> None: for table in tables if table not in TABLES_DUMP_ALL and table not in NOT_MIGRATE - and table not in CUSTOM_TABLES.keys() + and table not in CUSTOM_TABLES ] dump_all(tables=list(TABLES_DUMP_ALL), engine=engine, output=output) diff --git a/ingestion/src/metadata/cli/docker.py b/ingestion/src/metadata/cli/docker.py index 587712545ad..993293d88b6 100644 --- a/ingestion/src/metadata/cli/docker.py +++ b/ingestion/src/metadata/cli/docker.py @@ -21,7 +21,7 @@ from base64 import b64encode from datetime import timedelta import click -import requests as requests +import requests from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( @@ -36,8 +36,8 @@ from metadata.utils.client_version import get_client_version from metadata.utils.logger import cli_logger, ometa_logger logger = cli_logger() -calc_gb = 1024 * 1024 * 1024 -min_memory_limit = 6 * calc_gb +CALC_GB = 1024 * 1024 * 1024 +MIN_MEMORY_LIMIT = 6 * CALC_GB RELEASE_BRANCH_VERSION = get_client_version() DOCKER_URL_ROOT = f"https://raw.githubusercontent.com/open-metadata/OpenMetadata/{RELEASE_BRANCH_VERSION}/docker/metadata/" @@ -51,6 +51,10 @@ DEFUALT_JWT_TOKEN = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ def start_docker(docker, start_time, file_path, ingest_sample_data: bool): + """ + Method for starting up the docker containers + """ + logger.info("Running docker compose for OpenMetadata..") click.secho("It may take some time on the first run", fg="bright_yellow") if file_path: @@ -120,15 +124,20 @@ It helps OpenMetadata reach wider audience and helps our community.\n""", def env_file_check(env_file_path): + """ + Method for checking if the env file path is valid + """ if env_file_path is not None: if env_file_path == "": raise ValueError("Please provide path to env file") - else: - logger.info(f"Using env file from {env_file_path}") - return pathlib.Path(env_file_path) + logger.info(f"Using env file from {env_file_path}") + return pathlib.Path(env_file_path) def file_path_check(file_path, database: str): + """ + Method for checking if the file path is valid + """ docker_compose_file_name = BACKEND_DATABASES.get(database) or DEFAULT_COMPOSE_FILE @@ -140,16 +149,15 @@ def file_path_check(file_path, database: str): logger.info( f"Downloading latest docker compose file {docker_compose_file_name} from openmetadata repository..." 
) - r = requests.get(f"{DOCKER_URL_ROOT}{docker_compose_file_name}") + resp = requests.get(f"{DOCKER_URL_ROOT}{docker_compose_file_name}") with open(docker_compose_file_path, "wb") as docker_compose_file_handle: - docker_compose_file_handle.write(r.content) + docker_compose_file_handle.write(resp.content) docker_compose_file_handle.close() else: if file_path == "": raise ValueError("Please Provide Path to local docker-compose.yml file") - else: - logger.info(f"Using docker compose file from {file_path}") - docker_compose_file_path = pathlib.Path(file_path) + logger.info(f"Using docker compose file from {file_path}") + docker_compose_file_path = pathlib.Path(file_path) return docker_compose_file_path @@ -165,6 +173,9 @@ def run_docker( ingest_sample_data: bool, database: str, ): + """ + Main method for the OpenMetadata docker commands + """ try: from python_on_whales import DockerClient @@ -181,7 +192,7 @@ def run_docker( raise Exception("Docker Service is not up and running.") logger.info("Checking openmetadata memory constraints...") - if docker_info.mem_total < min_memory_limit: + if docker_info.mem_total < MIN_MEMORY_LIMIT: raise MemoryError # Check for -f @@ -225,7 +236,7 @@ def run_docker( logger.debug(traceback.format_exc()) click.secho( f"Please Allocate More memory to Docker.\nRecommended: 6GB+\nCurrent: " - f"{round(float(dict(docker_info).get('mem_total')) / calc_gb)}", + f"{round(float(dict(docker_info).get('mem_total')) / CALC_GB)}", fg="red", ) except Exception as exc: @@ -234,6 +245,10 @@ def run_docker( def reset_db_om(docker): + """ + Reset the OpenMetadata Database and clear everything in the DB + """ + if docker.container.inspect("openmetadata_server").state.running: click.secho( f"Resetting OpenMetadata.\nThis will clear out all the data", fg="red", @@ -263,10 +278,9 @@ def wait_for_containers(docker) -> None: ) if running: break - else: - sys.stdout.write(".") - sys.stdout.flush() - time.sleep(5) + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(5) def run_sample_data() -> None: @@ -293,7 +307,7 @@ def run_sample_data() -> None: resp = client.get("/dags") if resp: break - elif time.time() > timeout: + if time.time() > timeout: raise TimeoutError("Ingestion container timed out") except TimeoutError as err: logger.debug(traceback.format_exc()) @@ -304,4 +318,4 @@ def run_sample_data() -> None: time.sleep(5) for dag in dags: json_sample_data = {"is_paused": False} - client.patch("/dags/{}".format(dag), data=json.dumps(json_sample_data)) + client.patch(f"/dags/{dag}", data=json.dumps(json_sample_data)) diff --git a/ingestion/src/metadata/cli/ingest.py b/ingestion/src/metadata/cli/ingest.py index 8a3947f1760..f7cd82faf38 100644 --- a/ingestion/src/metadata/cli/ingest.py +++ b/ingestion/src/metadata/cli/ingest.py @@ -39,7 +39,7 @@ def run_ingest(config_path: str) -> None: logger.debug(f"Using config: {workflow.config}") except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, config_dict, WorkflowType.ingest) + print_init_error(exc, config_dict, WorkflowType.INGEST) sys.exit(1) workflow.execute() diff --git a/ingestion/src/metadata/cli/restore.py b/ingestion/src/metadata/cli/restore.py index 8a53392420b..978a89d444c 100644 --- a/ingestion/src/metadata/cli/restore.py +++ b/ingestion/src/metadata/cli/restore.py @@ -25,9 +25,12 @@ logger = cli_logger() def execute_sql_file(engine: Engine, input: Path, schema: str = None) -> None: + """ + Method to execute the sql queries from the input file with the given engine + """ - with open(input, encoding="utf-8") as f: - for
query in f.readlines(): + with open(input, encoding="utf-8") as file: + for query in file.readlines(): # `%` is a reserved syntax in SQLAlchemy to bind parameters. Escaping it with `%%` clean_query = query.replace("%", "%%") diff --git a/ingestion/src/metadata/cli/utils.py b/ingestion/src/metadata/cli/utils.py index 014a6ad81f4..140752255aa 100644 --- a/ingestion/src/metadata/cli/utils.py +++ b/ingestion/src/metadata/cli/utils.py @@ -1,3 +1,18 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utils module for the metadata backup and restore process +""" + from sqlalchemy.engine import Engine from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( @@ -11,6 +26,9 @@ from metadata.utils.helpers import list_to_dict def get_engine(host, port, user, password, options, arguments, schema, database): + """ + Get the database connection engine + """ connection_options = list_to_dict(options) connection_arguments = list_to_dict(arguments) diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index 980fabd95ff..6ed654b48ff 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -9,6 +9,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
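# Aside: `get_engine` above leans on `list_to_dict` (metadata.utils.helpers) to turn
# repeated "key=value" CLI options into connection kwargs. A minimal sketch of what
# such a helper plausibly looks like; the real implementation may differ:
from typing import Dict, List, Optional

def list_to_dict(values: Optional[List[str]]) -> Dict[str, str]:
    # Split each "key=value" entry on the first "=" only, so values may contain "=".
    if not values:
        return {}
    return dict(value.split("=", 1) for value in values)

assert list_to_dict(["ssl=true", "charset=utf8mb4"]) == {"ssl": "true", "charset": "utf8mb4"}
assert list_to_dict(None) == {}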
+""" +This module defines the CLI commands for OpenMetada +""" + import logging import os import pathlib @@ -54,6 +58,7 @@ def check() -> None: required=False, ) def metadata(debug: bool, log_level: str) -> None: + """Method to set logger information""" if debug: set_loggers_level(logging.DEBUG) elif log_level: @@ -96,7 +101,7 @@ def test(config: str) -> None: workflow = TestSuiteWorkflow.create(workflow_test_config_dict) except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, workflow_test_config_dict, WorkflowType.profile) + print_init_error(exc, workflow_test_config_dict, WorkflowType.PROFILE) sys.exit(1) workflow.execute() @@ -124,7 +129,7 @@ def profile(config: str) -> None: workflow = ProfilerWorkflow.create(workflow_config_dict) except Exception as exc: logger.debug(traceback.format_exc()) - print_init_error(exc, workflow_config_dict, WorkflowType.profile) + print_init_error(exc, workflow_config_dict, WorkflowType.PROFILE) sys.exit(1) workflow.execute() @@ -141,7 +146,10 @@ def webhook(host: str, port: int) -> None: """Simple Webserver to test webhook metadata events""" class WebhookHandler(BaseHTTPRequestHandler): + """WebhookHandler class to define the rest API methods""" + def do_GET(self): + """WebhookHandler GET API method""" self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() @@ -150,6 +158,7 @@ def webhook(host: str, port: int) -> None: self.wfile.write(bytes(message, "utf8")) def do_POST(self): + """WebhookHandler POST API method""" content_len = int(self.headers.get("Content-Length")) post_body = self.rfile.read(content_len) self.send_response(200) @@ -347,8 +356,8 @@ def openmetadata_imports_migration( """Update DAG files generated after creating workflow in 0.11 and before. In 0.12 the airflow managed API package name changed from `openmetadata` to `openmetadata_managed_apis` - hence breaking existing DAGs. The `dag_generated_config` folder also changed location in Docker. This small CLI - utility allows you to update both elements. + hence breaking existing DAGs. The `dag_generated_config` folder also changed location in Docker. + This small CLI utility allows you to update both elements. 
""" run_openmetadata_imports_migration(dir_path, change_config_file_path) diff --git a/ingestion/src/metadata/config/common.py b/ingestion/src/metadata/config/common.py index 15e5f961ddd..5324dd14e32 100644 --- a/ingestion/src/metadata/config/common.py +++ b/ingestion/src/metadata/config/common.py @@ -21,11 +21,15 @@ from pydantic import BaseModel class ConfigModel(BaseModel): + """Class definition for config model""" + class Config: extra = "forbid" class DynamicTypedConfig(ConfigModel): + """Class definition for Dynamic Typed Config""" + type: str config: Optional[Any] @@ -39,15 +43,28 @@ class ConfigurationError(Exception): class ConfigurationMechanism(ABC): + """ + Class definition for configuration mechanism + """ + @abstractmethod def load_config(self, config_fp: IO) -> dict: + """ + Abstract method to load configuration from yaml files + """ pass class YamlConfigurationMechanism(ConfigurationMechanism): - """load configuration from yaml files""" + """ + load configuration from yaml files + """ def load_config(self, config_fp: IO) -> dict: + """ + Method to load configuration from yaml files + """ + try: config = yaml.safe_load(config_fp) return config @@ -58,7 +75,9 @@ class YamlConfigurationMechanism(ConfigurationMechanism): class JsonConfigurationMechanism(ConfigurationMechanism): - """load configuration from json files""" + """ + load configuration from json files + """ def load_config(self, config_fp: IO) -> dict: try: @@ -71,6 +90,10 @@ class JsonConfigurationMechanism(ConfigurationMechanism): def load_config_file(config_file: pathlib.Path) -> dict: + """ + Method to load configuration from json or yaml,yml files + """ + if not config_file.is_file(): raise ConfigurationError(f"Cannot open config file {config_file}") @@ -81,9 +104,7 @@ def load_config_file(config_file: pathlib.Path) -> dict: config_mech = JsonConfigurationMechanism() else: raise ConfigurationError( - "Only .json and .yml are supported. Cannot process file type {}".format( - config_file.suffix - ) + f"Only .json and .yml are supported. Cannot process file type {config_file.suffix}" ) with config_file.open() as raw_config_file: raw_config = raw_config_file.read() diff --git a/ingestion/src/metadata/config/workflow.py b/ingestion/src/metadata/config/workflow.py index 1cd7cd52fc5..f5b35d5dcd3 100644 --- a/ingestion/src/metadata/config/workflow.py +++ b/ingestion/src/metadata/config/workflow.py @@ -38,8 +38,7 @@ def fetch_type_class(_type: str, is_file: bool): """ if is_file: return _type.replace("-", "_") - else: - return "".join([i.title() for i in _type.replace("-", "_").split("_")]) + return "".join([i.title() for i in _type.replace("-", "_").split("_")]) def get_class(key: str) -> Type[T]: @@ -71,11 +70,7 @@ def get_sink( :param _from: From where do we load the sink class. Ingestion by default. """ sink_class = get_class( - "metadata.{}.sink.{}.{}Sink".format( - _from, - fetch_type_class(sink_type, is_file=True), - fetch_type_class(sink_type, is_file=False), - ) + f"metadata.{_from}.sink.{fetch_type_class(sink_type, is_file=True)}.{fetch_type_class(sink_type, is_file=False)}Sink" ) sink: Sink = sink_class.create( @@ -109,11 +104,7 @@ def get_processor( :param _from: From where do we load the sink class. Ingestion by default. 
""" processor_class = get_class( - "metadata.{}.processor.{}.{}Processor".format( - _from, - fetch_type_class(processor_type, is_file=True), - fetch_type_class(processor_type, is_file=False), - ) + f"metadata.{_from}.processor.{fetch_type_class(processor_type, is_file=True)}.{fetch_type_class(processor_type, is_file=False)}Processor" ) processor: Processor = processor_class.create( diff --git a/ingestion/src/metadata/utils/class_helper.py b/ingestion/src/metadata/utils/class_helper.py index 791c8279045..a429be23059 100644 --- a/ingestion/src/metadata/utils/class_helper.py +++ b/ingestion/src/metadata/utils/class_helper.py @@ -1,3 +1,18 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Helper module to process the service type from the config +""" + from pydoc import locate from typing import Type @@ -32,10 +47,18 @@ def _get_service_type_from(service_subtype: str) -> ServiceType: def get_service_type_from_source_type(source_type: str) -> ServiceType: + """ + Method to get service type from source type + """ + return _get_service_type_from(_clean(source_type)) def get_service_class_from_service_type(service_type: ServiceType) -> Type[BaseModel]: + """ + Method to get service class from service type + """ + return locate( f"metadata.generated.schema.entity.services.{service_type.name.lower()}Service.{service_type.name}Service" ) diff --git a/ingestion/src/metadata/utils/client_version.py b/ingestion/src/metadata/utils/client_version.py index 48c50377a59..369c136fe34 100644 --- a/ingestion/src/metadata/utils/client_version.py +++ b/ingestion/src/metadata/utils/client_version.py @@ -39,7 +39,7 @@ def get_version_from_string(raw_version: str) -> str: except AttributeError as err: raise VersionParsingException( f"Can't extract version from {raw_version}: {err}" - ) + ) from err def get_client_version() -> str: diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index dfe4830196f..de51485f2d7 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -126,7 +126,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.fivetranConn FivetranConnection, ) from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import ( - GluePipelineConnection as GluePipelineConnection, + GluePipelineConnection, ) from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import ( NifiConnection, @@ -146,11 +146,19 @@ class SourceConnectionException(Exception): def render_query_header(ometa_version: str) -> str: + """ + Render the query header for OpenMetadata Queries + """ + header_obj = {"app": "OpenMetadata", "version": ometa_version} return f"/* {json.dumps(header_obj)} */" def inject_query_header(conn, cursor, statement, parameters, context, executemany): + """ + Inject the query header for OpenMetadata Queries + """ + version = pkg_resources.require("openmetadata-ingestion")[0].version statement_with_header = 
render_query_header(version) + "\n" + statement return statement_with_header, parameters @@ -234,7 +242,7 @@ def _(connection: SnowflakeConnection, verbose: bool = False) -> Engine: ) if connection.privateKey: - connection.connectionArguments = dict() + connection.connectionArguments = {} connection.connectionArguments["private_key"] = pkb return create_generic_connection(connection, verbose) @@ -428,10 +436,10 @@ def test_connection(connection) -> None: conn.execute(ConnTestFn()) except OperationalError as err: msg = f"Connection error for {connection}: {err}. Check the connection details." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from err except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -447,10 +455,10 @@ def _(connection: DynamoClient) -> None: connection.client.tables.all() except ClientError as err: msg = f"Connection error for {connection}: {err}. Check the connection details." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from err except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -468,10 +476,10 @@ def _(connection: GlueDBClient) -> None: except ClientError as err: msg = f"Connection error for {connection}: {err}. Check the connection details." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from err except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -487,10 +495,10 @@ def _(connection: GluePipelineClient) -> None: connection.client.list_workflows() except ClientError as err: msg = f"Connection error for {connection}: {err}. Check the connection details." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from err except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -501,10 +509,10 @@ def _(connection: SalesforceClient) -> None: connection.client.describe() except SalesforceAuthenticationFailed as err: msg = f"Connection error for {connection}: {err}. Check the connection details." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from err except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -520,7 +528,7 @@ def _(connection: KafkaClient) -> None: _ = connection.schema_registry_client.get_subjects() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -529,13 +537,13 @@ def _(connection: DeltaLakeClient) -> None: connection.client.catalog.listDatabases() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." 
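# Why every handler in this file gains "from err" / "from exc": explicit exception
# chaining records the original error as __cause__, so tracebacks read "The above
# exception was the direct cause of the following exception" instead of the
# misleading implicit-context message. A self-contained illustration with stand-in
# connect logic:
class SourceConnectionException(Exception):
    pass

def connect():
    raise ConnectionError("network unreachable")

try:
    try:
        connect()
    except Exception as exc:
        # The chained raise preserves the original ConnectionError.
        raise SourceConnectionException("Unknown error connecting") from exc
except SourceConnectionException as wrapped:
    assert isinstance(wrapped.__cause__, ConnectionError)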
- raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register def _(connection: MetabaseConnection, verbose: bool = False): try: - params = dict() + params = {} params["username"] = connection.username params["password"] = connection.password.get_secret_value() @@ -554,7 +562,7 @@ def _(connection: MetabaseConnection, verbose: bool = False): except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -566,7 +574,7 @@ def _(connection: MetabaseClient) -> None: ) except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -575,7 +583,7 @@ def _(connection: AirflowConnection) -> None: test_connection(connection.connection) except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -584,7 +592,7 @@ def _(connection: AirflowConnection) -> None: return get_connection(connection.connection) except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -600,7 +608,7 @@ def _(connection: AirByteClient) -> None: connection.client.list_workspaces() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -616,7 +624,7 @@ def _(connection: FivetranClient) -> None: connection.client.list_groups() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -631,7 +639,7 @@ def _(connection: RedashConnection, verbose: bool = False): except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -640,7 +648,7 @@ def _(connection: RedashClient) -> None: connection.client.dashboards() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -658,7 +666,7 @@ def _(connection: SupersetClient) -> None: connection.client.fetch_menu() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -708,7 +716,7 @@ def _(connection: TableauClient) -> None: except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -724,7 +732,7 @@ def _(connection: PowerBiClient) -> None: connection.client.fetch_dashboards() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." 
- raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -749,7 +757,7 @@ def _(connection: LookerClient) -> None: connection.client.me() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -777,7 +785,7 @@ def _(connection: DatalakeClient) -> None: except ClientError as err: msg = f"Connection error for {connection}: {err}. Check the connection details." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from err @singledispatch @@ -823,7 +831,7 @@ def _(connection: ModeClient) -> None: connection.client.get_user_account() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -844,7 +852,7 @@ def _(connection: MlflowClientWrapper) -> None: connection.client.list_registered_models() except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @get_connection.register @@ -867,7 +875,7 @@ def _(connection: NifiClientWrapper) -> None: except Exception as err: raise SourceConnectionException( f"Unknown error connecting with {connection} - {err}." - ) + ) from err @get_connection.register @@ -896,7 +904,7 @@ def _(connection: DagsterConnection) -> None: return DagsterClient(connection) except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." - raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc @test_connection.register @@ -907,4 +915,4 @@ def _(connection: DagsterClient) -> None: connection.client._execute(TEST_QUERY_GRAPHQL) except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." 
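# All of the @test_connection.register / @get_connection.register hunks above extend
# one @functools.singledispatch pair: a generic fallback plus per-client overloads
# dispatched on the annotated argument type. A stripped-down sketch with stand-in
# client classes (not the real OpenMetadata ones):
from functools import singledispatch

class KafkaClient:
    pass

class RedashClient:
    pass

@singledispatch
def test_connection(connection) -> None:
    # Generic fallback when no overload is registered for the argument type.
    raise NotImplementedError(f"No connection test for {type(connection).__name__}")

@test_connection.register
def _(connection: KafkaClient) -> None:
    print("listing Kafka topics...")  # stand-in for the real admin-client call

@test_connection.register
def _(connection: RedashClient) -> None:
    print("listing Redash dashboards...")  # stand-in for connection.client.dashboards()

test_connection(KafkaClient())  # dispatch happens on the runtime type of the argument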
- raise SourceConnectionException(msg) + raise SourceConnectionException(msg) from exc diff --git a/ingestion/src/metadata/utils/credentials.py b/ingestion/src/metadata/utils/credentials.py index 8b81ecb1438..36b32e4c359 100644 --- a/ingestion/src/metadata/utils/credentials.py +++ b/ingestion/src/metadata/utils/credentials.py @@ -52,7 +52,7 @@ def validate_private_key(private_key: str) -> None: serialization.load_pem_private_key(private_key.encode(), password=None) except ValueError as err: msg = f"Cannot serialise key: {err}" - raise InvalidPrivateKeyException(msg) + raise InvalidPrivateKeyException(msg) from err def create_credential_tmp_file(credentials: dict) -> str: @@ -61,11 +61,11 @@ def create_credential_tmp_file(credentials: dict) -> str: :param credentials: dictionary to store :return: path to find the file """ - with tempfile.NamedTemporaryFile(delete=False) as fp: + with tempfile.NamedTemporaryFile(delete=False) as temp_file: cred_json = json.dumps(credentials, indent=4, separators=(",", ": ")) - fp.write(cred_json.encode()) + temp_file.write(cred_json.encode()) - return fp.name + return temp_file.name def build_google_credentials_dict(gcs_values: GCSValues) -> Dict[str, str]: diff --git a/ingestion/src/metadata/utils/dbt_config.py b/ingestion/src/metadata/utils/dbt_config.py index 1c3add106dd..823dc7c7935 100644 --- a/ingestion/src/metadata/utils/dbt_config.py +++ b/ingestion/src/metadata/utils/dbt_config.py @@ -37,6 +37,10 @@ DBT_RUN_RESULTS_FILE_NAME = "run_results.json" @singledispatch def get_dbt_details(config): + """ + Single dispatch method to get the DBT files from different sources + """ + if config: raise NotImplementedError( f"Config not implemented for type {type(config)}: {config}" @@ -61,7 +65,7 @@ def _(config: DbtLocalConfig): dbt_manifest = manifest.read() if ( config.dbtRunResultsFilePath is not None - and config.dbtRunResultsFilePath is not "" + and config.dbtRunResultsFilePath != "" ): logger.debug( f"Reading [dbtRunResultsFilePath] from: {config.dbtRunResultsFilePath}" @@ -91,7 +95,7 @@ def _(config: DbtHttpConfig): dbt_run_results = None if ( config.dbtRunResultsHttpPath is not None - and config.dbtRunResultsHttpPath is not "" + and config.dbtRunResultsHttpPath != "" ): logger.debug( f"Requesting [dbtRunResultsHttpPath] to: {config.dbtRunResultsHttpPath}" diff --git a/ingestion/src/metadata/utils/dispatch.py b/ingestion/src/metadata/utils/dispatch.py index 10a169d7886..8795ef2e1a8 100644 --- a/ingestion/src/metadata/utils/dispatch.py +++ b/ingestion/src/metadata/utils/dispatch.py @@ -25,7 +25,7 @@ def enum_register(): """ Helps us register custom function for enum values """ - registry = dict() + registry = {} def add(name: str): def inner(fn): @@ -42,7 +42,7 @@ def class_register(): """ Helps us register custom functions for classes based on their name """ - registry = dict() + registry = {} def add(entity_type: Type[T]): def inner(fn): diff --git a/ingestion/src/metadata/utils/entity_link.py b/ingestion/src/metadata/utils/entity_link.py index 53abd7e86f2..bffe7da8589 100644 --- a/ingestion/src/metadata/utils/entity_link.py +++ b/ingestion/src/metadata/utils/entity_link.py @@ -33,6 +33,10 @@ class EntityLinkBuildingException(Exception): def split(s: str) -> List[str]: + """ + Method to handle the splitting logic + """ + lexer = EntityLinkLexer(InputStream(s)) stream = CommonTokenStream(lexer) parser = EntityLinkParser(stream) diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py index 3a52698cca6..7c3278c65b1 
100644 --- a/ingestion/src/metadata/utils/filters.py +++ b/ingestion/src/metadata/utils/filters.py @@ -18,13 +18,7 @@ code. import re from typing import List, Optional -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.type.filterPattern import FilterPattern -from metadata.ingestion.api.source import SourceStatus -from metadata.ingestion.models.topology import TopologyContext -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils import fqn class InvalidPatternException(Exception): @@ -43,7 +37,7 @@ def validate_regex(regex_list: List[str]) -> None: re.compile(regex) except re.error as err: msg = f"Invalid regex [{regex}]: {err}" - raise InvalidPatternException(msg) + raise InvalidPatternException(msg) from err def _filter(filter_pattern: Optional[FilterPattern], name: str) -> bool: diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index d45df69aafa..2c665c6e173 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -115,12 +115,12 @@ def build(metadata: OpenMetadata, entity_type: Type[T], **kwargs) -> Optional[st :param kwargs: required to build the FQN :return: FQN as a string """ - fn = fqn_build_registry.registry.get(entity_type.__name__) - if not fn: + func = fqn_build_registry.registry.get(entity_type.__name__) + if not func: raise FQNBuildingException( f"Invalid Entity Type {entity_type.__name__}. FQN builder not implemented." ) - return fn(metadata, **kwargs) + return func(metadata, **kwargs) @fqn_build_registry.add(Table) @@ -394,14 +394,13 @@ def _( column_name, test_case_name, ) - else: - return _build( - service_name, - database_name, - schema_name, - table_name, - test_case_name, - ) + return _build( + service_name, + database_name, + schema_name, + table_name, + test_case_name, + ) def split_table_name(table_name: str) -> Dict[str, Optional[str]]: diff --git a/ingestion/src/metadata/utils/gcs_utils.py b/ingestion/src/metadata/utils/gcs_utils.py index 2e49c1baaeb..7be944971f1 100644 --- a/ingestion/src/metadata/utils/gcs_utils.py +++ b/ingestion/src/metadata/utils/gcs_utils.py @@ -9,6 +9,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
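# For context on fqn.build above: builder functions are registered per entity class
# name and looked up at runtime, raising FQNBuildingException when no builder exists.
# A condensed, self-contained sketch of that registry pattern (names illustrative):
from typing import Callable, Dict

registry: Dict[str, Callable[..., str]] = {}

def add(entity_type: type) -> Callable:
    def inner(fn: Callable[..., str]) -> Callable[..., str]:
        registry[entity_type.__name__] = fn  # key the builder by class name
        return fn
    return inner

class Table:
    pass

@add(Table)
def _table_fqn(service: str, database: str, schema: str, table: str) -> str:
    return ".".join((service, database, schema, table))

func = registry.get(Table.__name__)
print(func("mysql_prod", "shop", "public", "orders"))  # mysql_prod.shop.public.orders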
+""" +Utils module to convert different file types from gcs buckets into a dataframe +""" + import json import traceback from typing import Any @@ -24,6 +28,10 @@ logger = utils_logger() def read_csv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> DataFrame: + """ + Read the csv file from the gcs bucket and return a dataframe + """ + try: return pd.read_csv(f"gs://{bucket_name}/{key}", sep=",", nrows=sample_size + 1) except Exception as exc: @@ -32,6 +40,10 @@ def read_csv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> Dat def read_tsv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> DataFrame: + """ + Read the tsv file from the gcs bucket and return a dataframe + """ + try: return pd.read_csv(f"gs://{bucket_name}/{key}", sep="\t", nrows=sample_size + 1) except Exception as exc: @@ -42,15 +54,18 @@ def read_tsv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> Dat def read_json_from_gcs( client: Any, key: str, bucket_name: str, sample_size=100 ) -> DataFrame: + """ + Read the json file from the gcs bucket and return a dataframe + """ + try: bucket = client.get_bucket(bucket_name) data = json.loads(bucket.get_blob(key).download_as_string()) if isinstance(data, list): return pd.DataFrame.from_records(data, nrows=sample_size) - else: - return pd.DataFrame.from_dict( - dict([(k, pd.Series(v)) for k, v in data.items()]) - ) + return pd.DataFrame.from_dict( + dict([(k, pd.Series(v)) for k, v in data.items()]) + ) except ValueError as verr: logger.debug(traceback.format_exc()) @@ -58,6 +73,10 @@ def read_json_from_gcs( def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame: + """ + Read the parquet file from the gcs bucket and return a dataframe + """ + gcs = gcsfs.GCSFileSystem() - f = gcs.open(f"gs://{bucket_name}/{key}") - return ParquetFile(f).schema.to_arrow_schema().empty_table().to_pandas() + file = gcs.open(f"gs://{bucket_name}/{key}") + return ParquetFile(file).schema.to_arrow_schema().empty_table().to_pandas() diff --git a/ingestion/src/metadata/utils/graphql_queries.py b/ingestion/src/metadata/utils/graphql_queries.py index 0094e6e83ea..3e372537653 100644 --- a/ingestion/src/metadata/utils/graphql_queries.py +++ b/ingestion/src/metadata/utils/graphql_queries.py @@ -1,3 +1,18 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +GraphQL queries used during ingestion +""" + DAGSTER_PIPELINE_DETAILS_GRAPHQL = """ query AssetNodeQuery { assetNodes { diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index b2aead8d9eb..adab09bba14 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -9,6 +9,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+""" +Helpers module for ingestion related methods +""" + import re from datetime import datetime, timedelta from functools import wraps @@ -67,6 +71,10 @@ om_chart_type_dict = { def calculate_execution_time(func): + """ + Method to calculate workflow execution time + """ + @wraps(func) def calculate_debug_time(*args, **kwargs): start = perf_counter() @@ -80,6 +88,10 @@ def calculate_execution_time(func): def calculate_execution_time_generator(func): + """ + Generator method to calculate workflow execution time + """ + def calculate_debug_time(*args, **kwargs): start = perf_counter() yield from func(*args, **kwargs) @@ -92,6 +104,10 @@ def calculate_execution_time_generator(func): def pretty_print_time_duration(duration: int) -> str: + """ + Method to format and display the time + """ + days = divmod(duration, 86400)[0] hours = divmod(duration, 3600)[0] minutes = divmod(duration, 60)[0] @@ -106,6 +122,10 @@ def pretty_print_time_duration(duration: int) -> str: def get_start_and_end(duration): + """ + Method to return start and end time based on duration + """ + today = datetime.utcnow() start = (today + timedelta(0 - duration)).replace( hour=0, minute=0, second=0, microsecond=0 @@ -115,17 +135,24 @@ def get_start_and_end(duration): return start, end -def snake_to_camel(s): - a = s.split("_") - a[0] = a[0].capitalize() - if len(a) > 1: - a[1:] = [u.title() for u in a[1:]] - return "".join(a) +def snake_to_camel(snake_str): + """ + Method to convert snake case text to camel case + """ + split_str = snake_str.split("_") + split_str[0] = split_str[0].capitalize() + if len(split_str) > 1: + split_str[1:] = [u.title() for u in split_str[1:]] + return "".join(split_str) def get_database_service_or_create( config: WorkflowSource, metadata_config, service_name=None ) -> DatabaseService: + """ + Get an existing database service or create a new one based on the config provided + """ + metadata = OpenMetadata(metadata_config) if not service_name: service_name = config.serviceName @@ -190,19 +217,22 @@ def get_messaging_service_or_create( config: dict, metadata_config, ) -> MessagingService: + """ + Get an existing messaging service or create a new one based on the config provided + """ + metadata = OpenMetadata(metadata_config) service: MessagingService = metadata.get_by_name( entity=MessagingService, fqn=service_name ) if service is not None: return service - else: - created_service = metadata.create_or_update( - CreateMessagingServiceRequest( - name=service_name, serviceType=message_service_type, connection=config - ) + created_service = metadata.create_or_update( + CreateMessagingServiceRequest( + name=service_name, serviceType=message_service_type, connection=config ) - return created_service + ) + return created_service def get_dashboard_service_or_create( @@ -211,36 +241,42 @@ def get_dashboard_service_or_create( config: dict, metadata_config, ) -> DashboardService: + """ + Get an existing dashboard service or create a new one based on the config provided + """ + metadata = OpenMetadata(metadata_config) service: DashboardService = metadata.get_by_name( entity=DashboardService, fqn=service_name ) if service is not None: return service - else: - dashboard_config = {"config": config} - created_service = metadata.create_or_update( - CreateDashboardServiceRequest( - name=service_name, - serviceType=dashboard_service_type, - connection=dashboard_config, - ) + dashboard_config = {"config": config} + created_service = metadata.create_or_update( + CreateDashboardServiceRequest( + name=service_name, + 
serviceType=dashboard_service_type, + connection=dashboard_config, ) - return created_service + ) + return created_service def get_storage_service_or_create(service_json, metadata_config) -> StorageService: + """ + Get an existing storage service or create a new one based on the config provided + """ + metadata = OpenMetadata(metadata_config) service: StorageService = metadata.get_by_name( entity=StorageService, fqn=service_json["name"] ) if service is not None: return service - else: - created_service = metadata.create_or_update( - CreateStorageServiceRequest(**service_json) - ) - return created_service + created_service = metadata.create_or_update( + CreateStorageServiceRequest(**service_json) + ) + return created_service def datetime_to_ts(date: Optional[datetime]) -> Optional[int]: @@ -251,6 +287,10 @@ def datetime_to_ts(date: Optional[datetime]) -> Optional[int]: def get_formatted_entity_name(name: str) -> Optional[str]: + """ + Method to get formatted entity name + """ + return ( name.replace("[", "").replace("]", "").replace(".", "") if name @@ -290,12 +330,16 @@ def get_standard_chart_type(raw_chart_type: str) -> str: def get_chart_entities_from_id( chart_ids: List[str], metadata: OpenMetadata, service_name: str ) -> List[EntityReferenceList]: + """ + Method to get the chart entity using get_by_name api + """ + entities = [] - for id in chart_ids: + for chart_id in chart_ids: chart: Chart = metadata.get_by_name( entity=Chart, fqn=fqn.build( - metadata, Chart, chart_name=str(id), service_name=service_name + metadata, Chart, chart_name=str(chart_id), service_name=service_name ), ) if chart: diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index f9324b28464..222668571fa 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -25,6 +25,10 @@ logging.basicConfig(format=BASE_LOGGING_FORMAT, datefmt="%Y-%m-%d %H:%M:%S") class Loggers(Enum): + """ + Enum for loggers + """ + OMETA = "OMetaAPI" CLI = "Metadata" PROFILER = "Profiler" @@ -36,34 +40,66 @@ class Loggers(Enum): def ometa_logger(): + """ + Method to get the OMETA logger + """ + return logging.getLogger(Loggers.OMETA.value) def cli_logger(): + """ + Method to get the CLI logger + """ + return logging.getLogger(Loggers.CLI.value) def profiler_logger(): + """ + Method to get the PROFILER logger + """ + return logging.getLogger(Loggers.PROFILER.value) def test_suite_logger(): + """ + Method to get the TEST SUITE logger + """ + return logging.getLogger(Loggers.TEST_SUITE.value) def sqa_interface_registry_logger(): + """ + Method to get the SQA PROFILER INTERFACE logger + """ + return logging.getLogger(Loggers.SQA_PROFILER_INTERFACE.value) def ingestion_logger(): + """ + Method to get the INGESTION logger + """ + return logging.getLogger(Loggers.INGESTION.value) def utils_logger(): + """ + Method to get the UTILS logger + """ + return logging.getLogger(Loggers.UTILS.value) def great_expectations_logger(): + """ + Method to get the GREAT EXPECTATIONS logger + """ + return logging.getLogger(Loggers.GREAT_EXPECTATIONS.value) diff --git a/ingestion/src/metadata/utils/lru_cache.py b/ingestion/src/metadata/utils/lru_cache.py index 0964e6d4ca3..cbc83e26903 100644 --- a/ingestion/src/metadata/utils/lru_cache.py +++ b/ingestion/src/metadata/utils/lru_cache.py @@ -1,3 +1,14 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """ LRU cache """ diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py index 2180cbf6731..c4470aa81c6 100644 --- a/ingestion/src/metadata/utils/s3_utils.py +++ b/ingestion/src/metadata/utils/s3_utils.py @@ -9,6 +9,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Utils module to convert different file types from s3 buckets into a dataframe +""" + import json import os from typing import Any @@ -22,6 +26,10 @@ from pyarrow.parquet import ParquetFile def read_csv_from_s3( client: Any, key: str, bucket_name: str, sep: str = ",", sample_size: int = 100 ) -> DataFrame: + """ + Read the csv file from the s3 bucket and return a dataframe + """ + stream = client.get_object(Bucket=bucket_name, Key=key)["Body"] return pd.read_csv(stream, sep=sep, nrows=sample_size + 1) @@ -29,20 +37,32 @@ def read_csv_from_s3( def read_tsv_from_s3( client, key: str, bucket_name: str, sample_size: int = 100 ) -> DataFrame: + """ + Read the tsv file from the s3 bucket and return a dataframe + """ + read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size) def read_json_from_s3( client: Any, key: str, bucket_name: str, sample_size=100 ) -> DataFrame: + """ + Read the json file from the s3 bucket and return a dataframe + """ + line_stream = client.get_object(Bucket=bucket_name, Key=key)["Body"].iter_lines() return pd.DataFrame.from_records(map(json.loads, line_stream), nrows=sample_size) def read_parquet_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame: - s3 = fs.S3FileSystem(region=client.meta.region_name) + """ + Read the parquet file from the s3 bucket and return a dataframe + """ + + s3_file = fs.S3FileSystem(region=client.meta.region_name) return ( - ParquetFile(s3.open_input_file(os.path.join(bucket_name, key))) + ParquetFile(s3_file.open_input_file(os.path.join(bucket_name, key))) .schema.to_arrow_schema() .empty_table() .to_pandas() diff --git a/ingestion/src/metadata/utils/secrets/aws_based_secrets_manager.py b/ingestion/src/metadata/utils/secrets/aws_based_secrets_manager.py index f5a87df0915..f9f790e4fce 100644 --- a/ingestion/src/metadata/utils/secrets/aws_based_secrets_manager.py +++ b/ingestion/src/metadata/utils/secrets/aws_based_secrets_manager.py @@ -29,6 +29,10 @@ NULL_VALUE = "null" class AWSBasedSecretsManager(ExternalSecretsManager, ABC): + """ + AWS Secrets Manager class + """ + def __init__( self, credentials: Optional[AWSCredentials], diff --git a/ingestion/src/metadata/utils/secrets/aws_secrets_manager.py b/ingestion/src/metadata/utils/secrets/aws_secrets_manager.py index bf9dc1babfc..f43070dbfe8 100644 --- a/ingestion/src/metadata/utils/secrets/aws_secrets_manager.py +++ b/ingestion/src/metadata/utils/secrets/aws_secrets_manager.py @@ -29,6 +29,10 @@ from metadata.utils.secrets.secrets_manager import logger class AWSSecretsManager(AWSBasedSecretsManager): + """ + Secrets Manager Implementation Class + """ + def __init__(self, credentials: Optional[AWSCredentials], cluster_prefix: str): super().__init__( credentials, "secretsmanager", 
SecretsManagerProvider.aws, cluster_prefix @@ -59,7 +63,6 @@ class AWSSecretsManager(AWSBasedSecretsManager): if response["SecretString"] != NULL_VALUE else None ) - else: - raise ValueError( - f"SecretString for secret [{name}] not present in the response." - ) + raise ValueError( + f"SecretString for secret [{name}] not present in the response." + ) diff --git a/ingestion/src/metadata/utils/secrets/aws_ssm_secrets_manager.py b/ingestion/src/metadata/utils/secrets/aws_ssm_secrets_manager.py index 2fc39fbe2c4..9643d44b6b7 100644 --- a/ingestion/src/metadata/utils/secrets/aws_ssm_secrets_manager.py +++ b/ingestion/src/metadata/utils/secrets/aws_ssm_secrets_manager.py @@ -29,6 +29,10 @@ from metadata.utils.secrets.secrets_manager import logger class AWSSSMSecretsManager(AWSBasedSecretsManager): + """ + AWS SSM Parameter Store Secret Manager Class + """ + def __init__(self, credentials: Optional[AWSCredentials], cluster_prefix: str): super().__init__(credentials, "ssm", SecretsManagerProvider.aws, cluster_prefix) @@ -55,7 +59,6 @@ class AWSSSMSecretsManager(AWSBasedSecretsManager): if response["Parameter"]["Value"] != NULL_VALUE else None ) - else: - raise ValueError( - f"Parameter for parameter name [{name}] not present in the response." - ) + raise ValueError( + f"Parameter for parameter name [{name}] not present in the response." + ) diff --git a/ingestion/src/metadata/utils/secrets/external_secrets_manager.py b/ingestion/src/metadata/utils/secrets/external_secrets_manager.py index 308fe81d86a..1b3b1360552 100644 --- a/ingestion/src/metadata/utils/secrets/external_secrets_manager.py +++ b/ingestion/src/metadata/utils/secrets/external_secrets_manager.py @@ -25,11 +25,7 @@ from metadata.generated.schema.entity.services.connections.metadata.secretsManag from metadata.generated.schema.entity.services.connections.serviceConnection import ( ServiceConnection, ) -from metadata.generated.schema.entity.teams.authN.jwtAuth import JWTAuthMechanism from metadata.generated.schema.metadataIngestion.workflow import SourceConfig -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) from metadata.utils.logger import utils_logger from metadata.utils.secrets.secrets_manager import ( AUTH_PROVIDER_MAPPING, @@ -48,6 +44,10 @@ NULL_VALUE = "null" class ExternalSecretsManager(SecretsManager, ABC): + """ + Abstract class for third party secrets' manager implementations + """ + def __init__( self, cluster_prefix: str, @@ -115,7 +115,7 @@ class ExternalSecretsManager(SecretsManager, ABC): ).parse_obj(config_object) except KeyError as err: msg = f"No client implemented for auth provider [{config.authProvider}]: {err}" - raise NotImplementedError(msg) + raise NotImplementedError(msg) from err def retrieve_dbt_source_config( self, source_config: SourceConfig, pipeline_name: str diff --git a/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py b/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py index 978803687a4..bf1f7e825de 100644 --- a/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py +++ b/ingestion/src/metadata/utils/secrets/secrets_manager_factory.py @@ -61,9 +61,8 @@ def get_secrets_manager( or secrets_manager_provider == SecretsManagerProvider.noop ): return NoopSecretsManager(cluster_name) - elif secrets_manager_provider == SecretsManagerProvider.aws: + if secrets_manager_provider == SecretsManagerProvider.aws: return AWSSecretsManager(credentials, cluster_name) - elif secrets_manager_provider == 
SecretsManagerProvider.aws_ssm: + if secrets_manager_provider == SecretsManagerProvider.aws_ssm: return AWSSSMSecretsManager(credentials, cluster_name) - else: - raise NotImplementedError(f"[{secrets_manager_provider}] is not implemented.") + raise NotImplementedError(f"[{secrets_manager_provider}] is not implemented.") diff --git a/ingestion/src/metadata/utils/singleton.py b/ingestion/src/metadata/utils/singleton.py index 99fb51a383c..0d16a62ecc2 100644 --- a/ingestion/src/metadata/utils/singleton.py +++ b/ingestion/src/metadata/utils/singleton.py @@ -1,7 +1,26 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Singleton Module +""" + from abc import ABCMeta class Singleton(ABCMeta): + """ + Singleton class + """ + _instances = {} def __call__(cls, *args, **kwargs): @@ -11,4 +30,7 @@ class Singleton(ABCMeta): @classmethod def clear_all(cls): + """ + Method to clear all singleton instances + """ Singleton._instances = {} diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index b1c498755e3..5ecaae2be82 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -93,6 +93,10 @@ CX_ORACLE_LIB_VERSION = "8.3.0" def get_connection_url_common(connection): + """ + Common method for building the source connection urls + """ + url = f"{connection.scheme.value}://" if connection.username: @@ -128,7 +132,11 @@ def get_connection_url_common(connection): @singledispatch def get_connection_url(connection): - raise NotImplemented( + """ + Single dispatch method to get the source connection url + """ + + raise NotImplementedError( f"Connection URL build not implemented for type {type(connection)}: {connection}" ) @@ -256,6 +264,10 @@ def _(connection: PrestoConnection): @singledispatch def get_connection_args(connection): + """ + Single dispatch method to get the connection arguments + """ + return connection.connectionArguments or {} @@ -268,10 +280,8 @@ def _(connection: TrinoConnection): connection_args = connection.connectionArguments.dict() connection_args.update({"http_session": session}) return connection_args - else: - return {"http_session": session} - else: - return connection.connectionArguments if connection.connectionArguments else {} + return {"http_session": session} + return connection.connectionArguments if connection.connectionArguments else {} @get_connection_url.register diff --git a/ingestion/src/metadata/utils/uuid_encoder.py b/ingestion/src/metadata/utils/uuid_encoder.py index 2755469e726..00d5b8c8a8d 100644 --- a/ingestion/src/metadata/utils/uuid_encoder.py +++ b/ingestion/src/metadata/utils/uuid_encoder.py @@ -1,8 +1,27 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +UUID Encoder Module +""" + import json from uuid import UUID class UUIDEncoder(json.JSONEncoder): + """ + UUID Encoder class + """ + def default(self, obj): if isinstance(obj, UUID): # if the obj is uuid, we simply return the value of uuid diff --git a/ingestion/src/metadata/utils/workflow_output_handler.py b/ingestion/src/metadata/utils/workflow_output_handler.py index 4adf7a7fd35..23a75c922e3 100644 --- a/ingestion/src/metadata/utils/workflow_output_handler.py +++ b/ingestion/src/metadata/utils/workflow_output_handler.py @@ -9,6 +9,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Module handles the output messages from different workflows +""" + import time from enum import Enum from pathlib import Path @@ -25,29 +29,33 @@ from metadata.utils.helpers import pretty_print_time_duration class WorkflowType(Enum): - ingest = "ingest" - profile = "profile" - test = "test" - lineage = "lineage" - usage = "usage" + """ + Workflow type enums + """ + + INGEST = "ingest" + PROFILE = "profile" + TEST = "test" + LINEAGE = "lineage" + USAGE = "usage" EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" URLS = { - WorkflowType.ingest: "https://docs.open-metadata.org/openmetadata/ingestion", - WorkflowType.profile: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/profiler", - WorkflowType.test: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/data-quality", - WorkflowType.lineage: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/lineage", - WorkflowType.usage: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/usage", + WorkflowType.INGEST: "https://docs.open-metadata.org/openmetadata/ingestion", + WorkflowType.PROFILE: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/profiler", + WorkflowType.TEST: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/data-quality", + WorkflowType.LINEAGE: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/lineage", + WorkflowType.USAGE: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/usage", } DEFAULT_EXAMPLE_FILE = { - WorkflowType.ingest: "bigquery", - WorkflowType.profile: "bigquery_profiler", - WorkflowType.test: "test_suite", - WorkflowType.lineage: "bigquery_lineage", - WorkflowType.usage: "bigquery_usage", + WorkflowType.INGEST: "bigquery", + WorkflowType.PROFILE: "bigquery_profiler", + WorkflowType.TEST: "test_suite", + WorkflowType.LINEAGE: "bigquery_lineage", + WorkflowType.USAGE: "bigquery_usage", } @@ -85,22 +93,21 @@ def calculate_ingestion_type(source_type_name: str) -> WorkflowType: Calculates the ingestion type depending on the source type name """ if source_type_name.endswith("lineage"): - return WorkflowType.lineage - elif source_type_name.endswith("usage"): - return WorkflowType.usage - return WorkflowType.ingest + return WorkflowType.LINEAGE + if source_type_name.endswith("usage"): + return WorkflowType.USAGE + return WorkflowType.INGEST def calculate_example_file(source_type_name: str, workflow_type: WorkflowType) -> str: """ 
Calculates the ingestion type depending on the source type name and workflow_type """ - if workflow_type == WorkflowType.profile: + if workflow_type == WorkflowType.PROFILE: return f"{source_type_name}_profiler" - elif workflow_type == WorkflowType.test: + if workflow_type == WorkflowType.TEST: return DEFAULT_EXAMPLE_FILE[workflow_type] - else: - return source_type_name + return source_type_name def print_file_example(source_type_name: str, workflow_type: WorkflowType): @@ -124,7 +131,7 @@ def print_file_example(source_type_name: str, workflow_type: WorkflowType): def print_init_error( exc: Union[Exception, Type[Exception]], config: dict, - workflow_type: WorkflowType = WorkflowType.ingest, + workflow_type: WorkflowType = WorkflowType.INGEST, ) -> None: """ Click echo print a workflow initialization error @@ -139,7 +146,7 @@ def print_init_error( source_type_name = source_type_name.replace("-", "-") workflow_type = ( calculate_ingestion_type(source_type_name) - if workflow_type == WorkflowType.ingest + if workflow_type == WorkflowType.INGEST else workflow_type ) @@ -147,11 +154,9 @@ def print_init_error( print_error_msg(f"Error loading {workflow_type.name} configuration: {exc}") print_file_example(source_type_name, workflow_type) print_more_info(workflow_type) - elif isinstance(exc, ConfigurationError) or isinstance( - exc, InvalidWorkflowException - ): + elif isinstance(exc, (ConfigurationError, InvalidWorkflowException)): print_error_msg(f"Error loading {workflow_type.name} configuration: {exc}") - if workflow_type == WorkflowType.usage: + if workflow_type == WorkflowType.USAGE: print_file_example(source_type_name, workflow_type) print_more_info(workflow_type) else: