Added fixes according to pylint (#8009)

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
Onkar Ravgan 2022-10-10 16:23:47 +05:30 committed by GitHub
parent 9571ffc127
commit 107eeef8c7
33 changed files with 512 additions and 196 deletions

View File

@ -8,6 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module for getting versions of OpenMetadata and python
"""
import os
import sys
@ -18,14 +21,14 @@ version = pkg_resources.require("openmetadata-ingestion")[0].version
def get_metadata_version() -> str:
"""
Return the OpenMetadata version
"""
metadata_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
metadata_pkg_dir = os.path.abspath(metadata_pkg_dir)
return "metadata {} from {} (python {})".format(
version,
metadata_pkg_dir,
get_major_minor_version(),
)
return f"metadata {version} from {metadata_pkg_dir} (python {get_major_minor_version()})"
def get_major_minor_version() -> str:

View File

@ -1,3 +1,18 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Database Dumping utility for the metadata CLI
"""
from pathlib import Path
from typing import List
@ -116,7 +131,7 @@ def dump(engine: Engine, output: Path, schema: str = None) -> None:
for table in tables
if table not in TABLES_DUMP_ALL
and table not in NOT_MIGRATE
and table not in CUSTOM_TABLES.keys()
and table not in CUSTOM_TABLES
]
dump_all(tables=list(TABLES_DUMP_ALL), engine=engine, output=output)

View File

@ -21,7 +21,7 @@ from base64 import b64encode
from datetime import timedelta
import click
import requests as requests
import requests
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -36,8 +36,8 @@ from metadata.utils.client_version import get_client_version
from metadata.utils.logger import cli_logger, ometa_logger
logger = cli_logger()
calc_gb = 1024 * 1024 * 1024
min_memory_limit = 6 * calc_gb
CALC_GB = 1024 * 1024 * 1024
MIN_MEMORY_LIMIT = 6 * CALC_GB
RELEASE_BRANCH_VERSION = get_client_version()
DOCKER_URL_ROOT = f"https://raw.githubusercontent.com/open-metadata/OpenMetadata/{RELEASE_BRANCH_VERSION}/docker/metadata/"
@ -51,6 +51,10 @@ DEFUALT_JWT_TOKEN = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ
def start_docker(docker, start_time, file_path, ingest_sample_data: bool):
"""
Method for starting up the docker containers
"""
logger.info("Running docker compose for OpenMetadata..")
click.secho("It may take some time on the first run", fg="bright_yellow")
if file_path:
@ -120,15 +124,20 @@ It helps OpenMetadata reach wider audience and helps our community.\n""",
def env_file_check(env_file_path):
"""
Method for checking if the env file path is valid
"""
if env_file_path is not None:
if env_file_path == "":
raise ValueError("Please provide path to env file")
else:
logger.info(f"Using env file from {env_file_path}")
return pathlib.Path(env_file_path)
logger.info(f"Using env file from {env_file_path}")
return pathlib.Path(env_file_path)
def file_path_check(file_path, database: str):
"""
Method for checking if the file path is valid
"""
docker_compose_file_name = BACKEND_DATABASES.get(database) or DEFAULT_COMPOSE_FILE
@ -140,16 +149,15 @@ def file_path_check(file_path, database: str):
logger.info(
f"Downloading latest docker compose file {docker_compose_file_name} from openmetadata repository..."
)
r = requests.get(f"{DOCKER_URL_ROOT}{docker_compose_file_name}")
resp = requests.get(f"{DOCKER_URL_ROOT}{docker_compose_file_name}")
with open(docker_compose_file_path, "wb") as docker_compose_file_handle:
docker_compose_file_handle.write(r.content)
docker_compose_file_handle.write(resp.content)
docker_compose_file_handle.close()
else:
if file_path == "":
raise ValueError("Please Provide Path to local docker-compose.yml file")
else:
logger.info(f"Using docker compose file from {file_path}")
docker_compose_file_path = pathlib.Path(file_path)
logger.info(f"Using docker compose file from {file_path}")
docker_compose_file_path = pathlib.Path(file_path)
return docker_compose_file_path
@ -165,6 +173,9 @@ def run_docker(
ingest_sample_data: bool,
database: str,
):
"""
Main method for the OpenMetadata docker commands
"""
try:
from python_on_whales import DockerClient
@ -181,7 +192,7 @@ def run_docker(
raise Exception("Docker Service is not up and running.")
logger.info("Checking openmetadata memory constraints...")
if docker_info.mem_total < min_memory_limit:
if docker_info.mem_total < MIN_MEMORY_LIMIT:
raise MemoryError
# Check for -f <Path>
@ -225,7 +236,7 @@ def run_docker(
logger.debug(traceback.format_exc())
click.secho(
f"Please Allocate More memory to Docker.\nRecommended: 6GB+\nCurrent: "
f"{round(float(dict(docker_info).get('mem_total')) / calc_gb)}",
f"{round(float(dict(docker_info).get('mem_total')) / CALC_GB)}",
fg="red",
)
except Exception as exc:
@ -234,6 +245,10 @@ def run_docker(
def reset_db_om(docker):
"""
Reset the OpenMetadata Database and clear everything in the DB
"""
if docker.container.inspect("openmetadata_server").state.running:
click.secho(
f"Resetting OpenMetadata.\nThis will clear out all the data",
@ -263,10 +278,9 @@ def wait_for_containers(docker) -> None:
)
if running:
break
else:
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(5)
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(5)
def run_sample_data() -> None:
@ -293,7 +307,7 @@ def run_sample_data() -> None:
resp = client.get("/dags")
if resp:
break
elif time.time() > timeout:
if time.time() > timeout:
raise TimeoutError("Ingestion container timed out")
except TimeoutError as err:
logger.debug(traceback.format_exc())
@ -304,4 +318,4 @@ def run_sample_data() -> None:
time.sleep(5)
for dag in dags:
json_sample_data = {"is_paused": False}
client.patch("/dags/{}".format(dag), data=json.dumps(json_sample_data))
client.patch(f"/dags/{dag}", data=json.dumps(json_sample_data))

View File

@ -39,7 +39,7 @@ def run_ingest(config_path: str) -> None:
logger.debug(f"Using config: {workflow.config}")
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, config_dict, WorkflowType.ingest)
print_init_error(exc, config_dict, WorkflowType.INGEST)
sys.exit(1)
workflow.execute()

View File

@ -25,9 +25,12 @@ logger = cli_logger()
def execute_sql_file(engine: Engine, input: Path, schema: str = None) -> None:
"""
Method to create the connection and execute the sql query
"""
with open(input, encoding="utf-8") as f:
for query in f.readlines():
with open(input, encoding="utf-8") as file:
for query in file.readlines():
# `%` is a reserved syntax in SQLAlchemy to bind parameters. Escaping it with `%%`
clean_query = query.replace("%", "%%")
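
A tiny sketch of the escaping above (the query string is hypothetical): a literal `%` in raw SQL collides with %-style DBAPI parameter binding, so it is doubled before execution and the driver turns it back into a single `%`.

query = "SELECT * FROM audit_log WHERE message LIKE '%failed%'"  # hypothetical query
clean_query = query.replace("%", "%%")
# clean_query == "SELECT * FROM audit_log WHERE message LIKE '%%failed%%'"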

View File

@ -1,3 +1,18 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module for the metadata backup and restore process
"""
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
@ -11,6 +26,9 @@ from metadata.utils.helpers import list_to_dict
def get_engine(host, port, user, password, options, arguments, schema, database):
"""
Get the database connection engine
"""
connection_options = list_to_dict(options)
connection_arguments = list_to_dict(arguments)

View File

@ -9,6 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module defines the CLI commands for OpenMetadata
"""
import logging
import os
import pathlib
@ -54,6 +58,7 @@ def check() -> None:
required=False,
)
def metadata(debug: bool, log_level: str) -> None:
"""Method to set logger information"""
if debug:
set_loggers_level(logging.DEBUG)
elif log_level:
@ -96,7 +101,7 @@ def test(config: str) -> None:
workflow = TestSuiteWorkflow.create(workflow_test_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_test_config_dict, WorkflowType.profile)
print_init_error(exc, workflow_test_config_dict, WorkflowType.PROFILE)
sys.exit(1)
workflow.execute()
@ -124,7 +129,7 @@ def profile(config: str) -> None:
workflow = ProfilerWorkflow.create(workflow_config_dict)
except Exception as exc:
logger.debug(traceback.format_exc())
print_init_error(exc, workflow_config_dict, WorkflowType.profile)
print_init_error(exc, workflow_config_dict, WorkflowType.PROFILE)
sys.exit(1)
workflow.execute()
@ -141,7 +146,10 @@ def webhook(host: str, port: int) -> None:
"""Simple Webserver to test webhook metadata events"""
class WebhookHandler(BaseHTTPRequestHandler):
"""WebhookHandler class to define the rest API methods"""
def do_GET(self):
"""WebhookHandler GET API method"""
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
@ -150,6 +158,7 @@ def webhook(host: str, port: int) -> None:
self.wfile.write(bytes(message, "utf8"))
def do_POST(self):
"""WebhookHandler POST API method"""
content_len = int(self.headers.get("Content-Length"))
post_body = self.rfile.read(content_len)
self.send_response(200)
@ -347,8 +356,8 @@ def openmetadata_imports_migration(
"""Update DAG files generated after creating workflow in 0.11 and before.
In 0.12 the airflow managed API package name changed from `openmetadata` to `openmetadata_managed_apis`
hence breaking existing DAGs. The `dag_generated_config` folder also changed location in Docker. This small CLI
utility allows you to update both elements.
hence breaking existing DAGs. The `dag_generated_config` folder also changed location in Docker.
This small CLI utility allows you to update both elements.
"""
run_openmetadata_imports_migration(dir_path, change_config_file_path)

View File

@ -21,11 +21,15 @@ from pydantic import BaseModel
class ConfigModel(BaseModel):
"""Class definition for config model"""
class Config:
extra = "forbid"
class DynamicTypedConfig(ConfigModel):
"""Class definition for Dynamic Typed Config"""
type: str
config: Optional[Any]
@ -39,15 +43,28 @@ class ConfigurationError(Exception):
class ConfigurationMechanism(ABC):
"""
Class definition for configuration mechanism
"""
@abstractmethod
def load_config(self, config_fp: IO) -> dict:
"""
Abstract method to load configuration from yaml files
"""
pass
class YamlConfigurationMechanism(ConfigurationMechanism):
"""load configuration from yaml files"""
"""
load configuration from yaml files
"""
def load_config(self, config_fp: IO) -> dict:
"""
Method to load configuration from yaml files
"""
try:
config = yaml.safe_load(config_fp)
return config
@ -58,7 +75,9 @@ class YamlConfigurationMechanism(ConfigurationMechanism):
class JsonConfigurationMechanism(ConfigurationMechanism):
"""load configuration from json files"""
"""
load configuration from json files
"""
def load_config(self, config_fp: IO) -> dict:
try:
@ -71,6 +90,10 @@ class JsonConfigurationMechanism(ConfigurationMechanism):
def load_config_file(config_file: pathlib.Path) -> dict:
"""
Method to load configuration from json or yaml/yml files
"""
if not config_file.is_file():
raise ConfigurationError(f"Cannot open config file {config_file}")
@ -81,9 +104,7 @@ def load_config_file(config_file: pathlib.Path) -> dict:
config_mech = JsonConfigurationMechanism()
else:
raise ConfigurationError(
"Only .json and .yml are supported. Cannot process file type {}".format(
config_file.suffix
)
f"Only .json and .yml are supported. Cannot process file type {config_file.suffix}"
)
with config_file.open() as raw_config_file:
raw_config = raw_config_file.read()

View File

@ -38,8 +38,7 @@ def fetch_type_class(_type: str, is_file: bool):
"""
if is_file:
return _type.replace("-", "_")
else:
return "".join([i.title() for i in _type.replace("-", "_").split("_")])
return "".join([i.title() for i in _type.replace("-", "_").split("_")])
def get_class(key: str) -> Type[T]:
@ -71,11 +70,7 @@ def get_sink(
:param _from: From where do we load the sink class. Ingestion by default.
"""
sink_class = get_class(
"metadata.{}.sink.{}.{}Sink".format(
_from,
fetch_type_class(sink_type, is_file=True),
fetch_type_class(sink_type, is_file=False),
)
f"metadata.{_from}.sink.{fetch_type_class(sink_type, is_file=True)}.{fetch_type_class(sink_type, is_file=False)}Sink"
)
sink: Sink = sink_class.create(
@ -109,11 +104,7 @@ def get_processor(
:param _from: From where do we load the sink class. Ingestion by default.
"""
processor_class = get_class(
"metadata.{}.processor.{}.{}Processor".format(
_from,
fetch_type_class(processor_type, is_file=True),
fetch_type_class(processor_type, is_file=False),
)
f"metadata.{_from}.processor.{fetch_type_class(processor_type, is_file=True)}.{fetch_type_class(processor_type, is_file=False)}Processor"
)
processor: Processor = processor_class.create(

View File

@ -1,3 +1,18 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to process the service type from the config
"""
from pydoc import locate
from typing import Type
@ -32,10 +47,18 @@ def _get_service_type_from(service_subtype: str) -> ServiceType:
def get_service_type_from_source_type(source_type: str) -> ServiceType:
"""
Method to get service type from source type
"""
return _get_service_type_from(_clean(source_type))
def get_service_class_from_service_type(service_type: ServiceType) -> Type[BaseModel]:
"""
Method to get service class from service type
"""
return locate(
f"metadata.generated.schema.entity.services.{service_type.name.lower()}Service.{service_type.name}Service"
)

View File

@ -39,7 +39,7 @@ def get_version_from_string(raw_version: str) -> str:
except AttributeError as err:
raise VersionParsingException(
f"Can't extract version from {raw_version}: {err}"
)
) from err
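
A minimal, self-contained sketch of the `raise ... from` pattern applied throughout this commit (pylint W0707, raise-missing-from); the helper below is hypothetical:

def parse_port(raw: str) -> int:
    try:
        return int(raw)
    except ValueError as err:
        # "from err" chains the original ValueError as __cause__, so the root
        # cause still shows up in the traceback instead of being swallowed
        raise RuntimeError(f"Invalid port value: {raw!r}") from err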
def get_client_version() -> str:

View File

@ -126,7 +126,7 @@ from metadata.generated.schema.entity.services.connections.pipeline.fivetranConn
FivetranConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import (
GluePipelineConnection as GluePipelineConnection,
GluePipelineConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.nifiConnection import (
NifiConnection,
@ -146,11 +146,19 @@ class SourceConnectionException(Exception):
def render_query_header(ometa_version: str) -> str:
"""
Render the query header for OpenMetadata Queries
"""
header_obj = {"app": "OpenMetadata", "version": ometa_version}
return f"/* {json.dumps(header_obj)} */"
def inject_query_header(conn, cursor, statement, parameters, context, executemany):
"""
Inject the query header for OpenMetadata Queries
"""
version = pkg_resources.require("openmetadata-ingestion")[0].version
statement_with_header = render_query_header(version) + "\n" + statement
return statement_with_header, parameters
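
One plausible way (not shown in this diff) to attach inject_query_header is SQLAlchemy's before_cursor_execute event with retval=True, so the returned statement replaces the original; `engine` is an assumed Engine instance:

from sqlalchemy import event

event.listen(engine, "before_cursor_execute", inject_query_header, retval=True)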
@ -234,7 +242,7 @@ def _(connection: SnowflakeConnection, verbose: bool = False) -> Engine:
)
if connection.privateKey:
connection.connectionArguments = dict()
connection.connectionArguments = {}
connection.connectionArguments["private_key"] = pkb
return create_generic_connection(connection, verbose)
@ -428,10 +436,10 @@ def test_connection(connection) -> None:
conn.execute(ConnTestFn())
except OperationalError as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -447,10 +455,10 @@ def _(connection: DynamoClient) -> None:
connection.client.tables.all()
except ClientError as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -468,10 +476,10 @@ def _(connection: GlueDBClient) -> None:
except ClientError as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -487,10 +495,10 @@ def _(connection: GluePipelineClient) -> None:
connection.client.list_workflows()
except ClientError as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -501,10 +509,10 @@ def _(connection: SalesforceClient) -> None:
connection.client.describe()
except SalesforceAuthenticationFailed as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -520,7 +528,7 @@ def _(connection: KafkaClient) -> None:
_ = connection.schema_registry_client.get_subjects()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -529,13 +537,13 @@ def _(connection: DeltaLakeClient) -> None:
connection.client.catalog.listDatabases()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
def _(connection: MetabaseConnection, verbose: bool = False):
try:
params = dict()
params = {}
params["username"] = connection.username
params["password"] = connection.password.get_secret_value()
@ -554,7 +562,7 @@ def _(connection: MetabaseConnection, verbose: bool = False):
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -566,7 +574,7 @@ def _(connection: MetabaseClient) -> None:
)
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -575,7 +583,7 @@ def _(connection: AirflowConnection) -> None:
test_connection(connection.connection)
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -584,7 +592,7 @@ def _(connection: AirflowConnection) -> None:
return get_connection(connection.connection)
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -600,7 +608,7 @@ def _(connection: AirByteClient) -> None:
connection.client.list_workspaces()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -616,7 +624,7 @@ def _(connection: FivetranClient) -> None:
connection.client.list_groups()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -631,7 +639,7 @@ def _(connection: RedashConnection, verbose: bool = False):
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -640,7 +648,7 @@ def _(connection: RedashClient) -> None:
connection.client.dashboards()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -658,7 +666,7 @@ def _(connection: SupersetClient) -> None:
connection.client.fetch_menu()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -708,7 +716,7 @@ def _(connection: TableauClient) -> None:
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -724,7 +732,7 @@ def _(connection: PowerBiClient) -> None:
connection.client.fetch_dashboards()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -749,7 +757,7 @@ def _(connection: LookerClient) -> None:
connection.client.me()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -777,7 +785,7 @@ def _(connection: DatalakeClient) -> None:
except ClientError as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from err
@singledispatch
@ -823,7 +831,7 @@ def _(connection: ModeClient) -> None:
connection.client.get_user_account()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -844,7 +852,7 @@ def _(connection: MlflowClientWrapper) -> None:
connection.client.list_registered_models()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@get_connection.register
@ -867,7 +875,7 @@ def _(connection: NifiClientWrapper) -> None:
except Exception as err:
raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}."
)
) from err
@get_connection.register
@ -896,7 +904,7 @@ def _(connection: DagsterConnection) -> None:
return DagsterClient(connection)
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc
@test_connection.register
@ -907,4 +915,4 @@ def _(connection: DagsterClient) -> None:
connection.client._execute(TEST_QUERY_GRAPHQL)
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
raise SourceConnectionException(msg) from exc

View File

@ -52,7 +52,7 @@ def validate_private_key(private_key: str) -> None:
serialization.load_pem_private_key(private_key.encode(), password=None)
except ValueError as err:
msg = f"Cannot serialise key: {err}"
raise InvalidPrivateKeyException(msg)
raise InvalidPrivateKeyException(msg) from err
def create_credential_tmp_file(credentials: dict) -> str:
@ -61,11 +61,11 @@ def create_credential_tmp_file(credentials: dict) -> str:
:param credentials: dictionary to store
:return: path to find the file
"""
with tempfile.NamedTemporaryFile(delete=False) as fp:
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
cred_json = json.dumps(credentials, indent=4, separators=(",", ": "))
fp.write(cred_json.encode())
temp_file.write(cred_json.encode())
return fp.name
return temp_file.name
def build_google_credentials_dict(gcs_values: GCSValues) -> Dict[str, str]:

View File

@ -37,6 +37,10 @@ DBT_RUN_RESULTS_FILE_NAME = "run_results.json"
@singledispatch
def get_dbt_details(config):
"""
Single dispatch method to get the DBT files from different sources
"""
if config:
raise NotImplementedError(
f"Config not implemented for type {type(config)}: {config}"
@ -61,7 +65,7 @@ def _(config: DbtLocalConfig):
dbt_manifest = manifest.read()
if (
config.dbtRunResultsFilePath is not None
and config.dbtRunResultsFilePath is not ""
and config.dbtRunResultsFilePath != ""
):
logger.debug(
f"Reading [dbtRunResultsFilePath] from: {config.dbtRunResultsFilePath}"
@ -91,7 +95,7 @@ def _(config: DbtHttpConfig):
dbt_run_results = None
if (
config.dbtRunResultsHttpPath is not None
and config.dbtRunResultsHttpPath is not ""
and config.dbtRunResultsHttpPath != ""
):
logger.debug(
f"Requesting [dbtRunResultsHttpPath] to: {config.dbtRunResultsHttpPath}"

View File

@ -25,7 +25,7 @@ def enum_register():
"""
Helps us register custom function for enum values
"""
registry = dict()
registry = {}
def add(name: str):
def inner(fn):
@ -42,7 +42,7 @@ def class_register():
"""
Helps us register custom functions for classes based on their name
"""
registry = dict()
registry = {}
def add(entity_type: Type[T]):
def inner(fn):

View File

@ -33,6 +33,10 @@ class EntityLinkBuildingException(Exception):
def split(s: str) -> List[str]:
"""
Method to handle the splitting logic
"""
lexer = EntityLinkLexer(InputStream(s))
stream = CommonTokenStream(lexer)
parser = EntityLinkParser(stream)

View File

@ -18,13 +18,7 @@ code.
import re
from typing import List, Optional
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.models.topology import TopologyContext
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
class InvalidPatternException(Exception):
@ -43,7 +37,7 @@ def validate_regex(regex_list: List[str]) -> None:
re.compile(regex)
except re.error as err:
msg = f"Invalid regex [{regex}]: {err}"
raise InvalidPatternException(msg)
raise InvalidPatternException(msg) from err
def _filter(filter_pattern: Optional[FilterPattern], name: str) -> bool:

View File

@ -115,12 +115,12 @@ def build(metadata: OpenMetadata, entity_type: Type[T], **kwargs) -> Optional[st
:param kwargs: required to build the FQN
:return: FQN as a string
"""
fn = fqn_build_registry.registry.get(entity_type.__name__)
if not fn:
func = fqn_build_registry.registry.get(entity_type.__name__)
if not func:
raise FQNBuildingException(
f"Invalid Entity Type {entity_type.__name__}. FQN builder not implemented."
)
return fn(metadata, **kwargs)
return func(metadata, **kwargs)
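
An illustrative sketch of how the registry consulted above is fed: builders are registered per entity type with `@fqn_build_registry.add(...)` and looked up by class name (the Dashboard builder and its keyword arguments are hypothetical):

@fqn_build_registry.add(Dashboard)  # hypothetical registration
def _(metadata: OpenMetadata, *, service_name: str, dashboard_name: str) -> str:
    return _build(service_name, dashboard_name)

fqn.build(metadata, Dashboard, service_name="looker_prod", dashboard_name="sales")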
@fqn_build_registry.add(Table)
@ -394,14 +394,13 @@ def _(
column_name,
test_case_name,
)
else:
return _build(
service_name,
database_name,
schema_name,
table_name,
test_case_name,
)
return _build(
service_name,
database_name,
schema_name,
table_name,
test_case_name,
)
def split_table_name(table_name: str) -> Dict[str, Optional[str]]:

View File

@ -9,6 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module to convert different file types from gcs buckets into a dataframe
"""
import json
import traceback
from typing import Any
@ -24,6 +28,10 @@ logger = utils_logger()
def read_csv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> DataFrame:
"""
Read the csv file from the gcs bucket and return a dataframe
"""
try:
return pd.read_csv(f"gs://{bucket_name}/{key}", sep=",", nrows=sample_size + 1)
except Exception as exc:
@ -32,6 +40,10 @@ def read_csv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> Dat
def read_tsv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> DataFrame:
"""
Read the tsv file from the gcs bucket and return a dataframe
"""
try:
return pd.read_csv(f"gs://{bucket_name}/{key}", sep="\t", nrows=sample_size + 1)
except Exception as exc:
@ -42,15 +54,18 @@ def read_tsv_from_gcs(key: str, bucket_name: str, sample_size: int = 100) -> Dat
def read_json_from_gcs(
client: Any, key: str, bucket_name: str, sample_size=100
) -> DataFrame:
"""
Read the json file from the gcs bucket and return a dataframe
"""
try:
bucket = client.get_bucket(bucket_name)
data = json.loads(bucket.get_blob(key).download_as_string())
if isinstance(data, list):
return pd.DataFrame.from_records(data, nrows=sample_size)
else:
return pd.DataFrame.from_dict(
dict([(k, pd.Series(v)) for k, v in data.items()])
)
return pd.DataFrame.from_dict(
dict([(k, pd.Series(v)) for k, v in data.items()])
)
except ValueError as verr:
logger.debug(traceback.format_exc())
@ -58,6 +73,10 @@ def read_json_from_gcs(
def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame:
"""
Read the parquet file from the gcs bucket and return a dataframe
"""
gcs = gcsfs.GCSFileSystem()
f = gcs.open(f"gs://{bucket_name}/{key}")
return ParquetFile(f).schema.to_arrow_schema().empty_table().to_pandas()
file = gcs.open(f"gs://{bucket_name}/{key}")
return ParquetFile(file).schema.to_arrow_schema().empty_table().to_pandas()

View File

@ -1,3 +1,18 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
GraphQL queries used during ingestion
"""
DAGSTER_PIPELINE_DETAILS_GRAPHQL = """
query AssetNodeQuery {
assetNodes {

View File

@ -9,6 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helpers module for ingestion related methods
"""
import re
from datetime import datetime, timedelta
from functools import wraps
@ -67,6 +71,10 @@ om_chart_type_dict = {
def calculate_execution_time(func):
"""
Method to calculate workflow execution time
"""
@wraps(func)
def calculate_debug_time(*args, **kwargs):
start = perf_counter()
@ -80,6 +88,10 @@ def calculate_execution_time(func):
def calculate_execution_time_generator(func):
"""
Generator method to calculate workflow execution time
"""
def calculate_debug_time(*args, **kwargs):
start = perf_counter()
yield from func(*args, **kwargs)
@ -92,6 +104,10 @@ def calculate_execution_time_generator(func):
def pretty_print_time_duration(duration: int) -> str:
"""
Method to format and display the time
"""
days = divmod(duration, 86400)[0]
hours = divmod(duration, 3600)[0]
minutes = divmod(duration, 60)[0]
@ -106,6 +122,10 @@ def pretty_print_time_duration(duration: int) -> str:
def get_start_and_end(duration):
"""
Method to return start and end time based on duration
"""
today = datetime.utcnow()
start = (today + timedelta(0 - duration)).replace(
hour=0, minute=0, second=0, microsecond=0
@ -115,17 +135,24 @@ def get_start_and_end(duration):
return start, end
def snake_to_camel(s):
a = s.split("_")
a[0] = a[0].capitalize()
if len(a) > 1:
a[1:] = [u.title() for u in a[1:]]
return "".join(a)
def snake_to_camel(snake_str):
"""
Method to convert snake case text to camel case
"""
split_str = snake_str.split("_")
split_str[0] = split_str[0].capitalize()
if len(split_str) > 1:
split_str[1:] = [u.title() for u in split_str[1:]]
return "".join(split_str)
def get_database_service_or_create(
config: WorkflowSource, metadata_config, service_name=None
) -> DatabaseService:
"""
Get an existing database service or create a new one based on the config provided
"""
metadata = OpenMetadata(metadata_config)
if not service_name:
service_name = config.serviceName
@ -190,19 +217,22 @@ def get_messaging_service_or_create(
config: dict,
metadata_config,
) -> MessagingService:
"""
Get an existing messaging service or create a new one based on the config provided
"""
metadata = OpenMetadata(metadata_config)
service: MessagingService = metadata.get_by_name(
entity=MessagingService, fqn=service_name
)
if service is not None:
return service
else:
created_service = metadata.create_or_update(
CreateMessagingServiceRequest(
name=service_name, serviceType=message_service_type, connection=config
)
created_service = metadata.create_or_update(
CreateMessagingServiceRequest(
name=service_name, serviceType=message_service_type, connection=config
)
return created_service
)
return created_service
def get_dashboard_service_or_create(
@ -211,36 +241,42 @@ def get_dashboard_service_or_create(
config: dict,
metadata_config,
) -> DashboardService:
"""
Get an existing dashboard service or create a new one based on the config provided
"""
metadata = OpenMetadata(metadata_config)
service: DashboardService = metadata.get_by_name(
entity=DashboardService, fqn=service_name
)
if service is not None:
return service
else:
dashboard_config = {"config": config}
created_service = metadata.create_or_update(
CreateDashboardServiceRequest(
name=service_name,
serviceType=dashboard_service_type,
connection=dashboard_config,
)
dashboard_config = {"config": config}
created_service = metadata.create_or_update(
CreateDashboardServiceRequest(
name=service_name,
serviceType=dashboard_service_type,
connection=dashboard_config,
)
return created_service
)
return created_service
def get_storage_service_or_create(service_json, metadata_config) -> StorageService:
"""
Get an existing storage service or create a new one based on the config provided
"""
metadata = OpenMetadata(metadata_config)
service: StorageService = metadata.get_by_name(
entity=StorageService, fqn=service_json["name"]
)
if service is not None:
return service
else:
created_service = metadata.create_or_update(
CreateStorageServiceRequest(**service_json)
)
return created_service
created_service = metadata.create_or_update(
CreateStorageServiceRequest(**service_json)
)
return created_service
def datetime_to_ts(date: Optional[datetime]) -> Optional[int]:
@ -251,6 +287,10 @@ def datetime_to_ts(date: Optional[datetime]) -> Optional[int]:
def get_formatted_entity_name(name: str) -> Optional[str]:
"""
Method to get formatted entity name
"""
return (
name.replace("[", "").replace("]", "").replace("<default>.", "")
if name
@ -290,12 +330,16 @@ def get_standard_chart_type(raw_chart_type: str) -> str:
def get_chart_entities_from_id(
chart_ids: List[str], metadata: OpenMetadata, service_name: str
) -> List[EntityReferenceList]:
"""
Method to get the chart entity using get_by_name api
"""
entities = []
for id in chart_ids:
for chart_id in chart_ids:
chart: Chart = metadata.get_by_name(
entity=Chart,
fqn=fqn.build(
metadata, Chart, chart_name=str(id), service_name=service_name
metadata, Chart, chart_name=str(chart_id), service_name=service_name
),
)
if chart:

View File

@ -25,6 +25,10 @@ logging.basicConfig(format=BASE_LOGGING_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")
class Loggers(Enum):
"""
Enum for loggers
"""
OMETA = "OMetaAPI"
CLI = "Metadata"
PROFILER = "Profiler"
@ -36,34 +40,66 @@ class Loggers(Enum):
def ometa_logger():
"""
Method to get the OMETA logger
"""
return logging.getLogger(Loggers.OMETA.value)
def cli_logger():
"""
Method to get the CLI logger
"""
return logging.getLogger(Loggers.CLI.value)
def profiler_logger():
"""
Method to get the PROFILER logger
"""
return logging.getLogger(Loggers.PROFILER.value)
def test_suite_logger():
"""
Method to get the TEST SUITE logger
"""
return logging.getLogger(Loggers.TEST_SUITE.value)
def sqa_interface_registry_logger():
"""
Method to get the SQA PROFILER INTERFACE logger
"""
return logging.getLogger(Loggers.SQA_PROFILER_INTERFACE.value)
def ingestion_logger():
"""
Method to get the INGESTION logger
"""
return logging.getLogger(Loggers.INGESTION.value)
def utils_logger():
"""
Method to get the UTILS logger
"""
return logging.getLogger(Loggers.UTILS.value)
def great_expectations_logger():
"""
Method to get the GREAT EXPECTATIONS logger
"""
return logging.getLogger(Loggers.GREAT_EXPECTATIONS.value)

View File

@ -1,3 +1,14 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
LRU cache
"""

View File

@ -9,6 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module to convert different file types from s3 buckets into a dataframe
"""
import json
import os
from typing import Any
@ -22,6 +26,10 @@ from pyarrow.parquet import ParquetFile
def read_csv_from_s3(
client: Any, key: str, bucket_name: str, sep: str = ",", sample_size: int = 100
) -> DataFrame:
"""
Read the csv file from the s3 bucket and return a dataframe
"""
stream = client.get_object(Bucket=bucket_name, Key=key)["Body"]
return pd.read_csv(stream, sep=sep, nrows=sample_size + 1)
@ -29,20 +37,32 @@ def read_csv_from_s3(
def read_tsv_from_s3(
client, key: str, bucket_name: str, sample_size: int = 100
) -> DataFrame:
"""
Read the tsv file from the s3 bucket and return a dataframe
"""
read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
def read_json_from_s3(
client: Any, key: str, bucket_name: str, sample_size=100
) -> DataFrame:
"""
Read the json file from the s3 bucket and return a dataframe
"""
line_stream = client.get_object(Bucket=bucket_name, Key=key)["Body"].iter_lines()
return pd.DataFrame.from_records(map(json.loads, line_stream), nrows=sample_size)
def read_parquet_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
s3 = fs.S3FileSystem(region=client.meta.region_name)
"""
Read the parquet file from the s3 bucket and return a dataframe
"""
s3_file = fs.S3FileSystem(region=client.meta.region_name)
return (
ParquetFile(s3.open_input_file(os.path.join(bucket_name, key)))
ParquetFile(s3_file.open_input_file(os.path.join(bucket_name, key)))
.schema.to_arrow_schema()
.empty_table()
.to_pandas()

View File

@ -29,6 +29,10 @@ NULL_VALUE = "null"
class AWSBasedSecretsManager(ExternalSecretsManager, ABC):
"""
AWS Secrets Manager class
"""
def __init__(
self,
credentials: Optional[AWSCredentials],

View File

@ -29,6 +29,10 @@ from metadata.utils.secrets.secrets_manager import logger
class AWSSecretsManager(AWSBasedSecretsManager):
"""
Secrets Manager Implementation Class
"""
def __init__(self, credentials: Optional[AWSCredentials], cluster_prefix: str):
super().__init__(
credentials, "secretsmanager", SecretsManagerProvider.aws, cluster_prefix
@ -59,7 +63,6 @@ class AWSSecretsManager(AWSBasedSecretsManager):
if response["SecretString"] != NULL_VALUE
else None
)
else:
raise ValueError(
f"SecretString for secret [{name}] not present in the response."
)
raise ValueError(
f"SecretString for secret [{name}] not present in the response."
)

View File

@ -29,6 +29,10 @@ from metadata.utils.secrets.secrets_manager import logger
class AWSSSMSecretsManager(AWSBasedSecretsManager):
"""
AWS SSM Parameter Store Secret Manager Class
"""
def __init__(self, credentials: Optional[AWSCredentials], cluster_prefix: str):
super().__init__(credentials, "ssm", SecretsManagerProvider.aws, cluster_prefix)
@ -55,7 +59,6 @@ class AWSSSMSecretsManager(AWSBasedSecretsManager):
if response["Parameter"]["Value"] != NULL_VALUE
else None
)
else:
raise ValueError(
f"Parameter for parameter name [{name}] not present in the response."
)
raise ValueError(
f"Parameter for parameter name [{name}] not present in the response."
)

View File

@ -25,11 +25,7 @@ from metadata.generated.schema.entity.services.connections.metadata.secretsManag
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.teams.authN.jwtAuth import JWTAuthMechanism
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.utils.logger import utils_logger
from metadata.utils.secrets.secrets_manager import (
AUTH_PROVIDER_MAPPING,
@ -48,6 +44,10 @@ NULL_VALUE = "null"
class ExternalSecretsManager(SecretsManager, ABC):
"""
Abstract class for third party secrets' manager implementations
"""
def __init__(
self,
cluster_prefix: str,
@ -115,7 +115,7 @@ class ExternalSecretsManager(SecretsManager, ABC):
).parse_obj(config_object)
except KeyError as err:
msg = f"No client implemented for auth provider [{config.authProvider}]: {err}"
raise NotImplementedError(msg)
raise NotImplementedError(msg) from err
def retrieve_dbt_source_config(
self, source_config: SourceConfig, pipeline_name: str

View File

@ -61,9 +61,8 @@ def get_secrets_manager(
or secrets_manager_provider == SecretsManagerProvider.noop
):
return NoopSecretsManager(cluster_name)
elif secrets_manager_provider == SecretsManagerProvider.aws:
if secrets_manager_provider == SecretsManagerProvider.aws:
return AWSSecretsManager(credentials, cluster_name)
elif secrets_manager_provider == SecretsManagerProvider.aws_ssm:
if secrets_manager_provider == SecretsManagerProvider.aws_ssm:
return AWSSSMSecretsManager(credentials, cluster_name)
else:
raise NotImplementedError(f"[{secrets_manager_provider}] is not implemented.")
raise NotImplementedError(f"[{secrets_manager_provider}] is not implemented.")

View File

@ -1,7 +1,26 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Singleton Module
"""
from abc import ABCMeta
class Singleton(ABCMeta):
"""
Singleton class
"""
_instances = {}
def __call__(cls, *args, **kwargs):
@ -11,4 +30,7 @@ class Singleton(ABCMeta):
@classmethod
def clear_all(cls):
"""
Method to clear all singleton instances
"""
Singleton._instances = {}
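
A minimal usage sketch, assuming the truncated __call__ body does the usual instance caching: any class that uses Singleton as its metaclass hands back the same object on every call.

class MetadataCache(metaclass=Singleton):  # hypothetical consumer class
    def __init__(self):
        self.entries = {}

assert MetadataCache() is MetadataCache()  # same cached instance both times
Singleton.clear_all()                      # wipes the cache for every singleton class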

View File

@ -93,6 +93,10 @@ CX_ORACLE_LIB_VERSION = "8.3.0"
def get_connection_url_common(connection):
"""
Common method for building the source connection urls
"""
url = f"{connection.scheme.value}://"
if connection.username:
@ -128,7 +132,11 @@ def get_connection_url_common(connection):
@singledispatch
def get_connection_url(connection):
raise NotImplemented(
"""
Single dispatch method to get the source connection url
"""
raise NotImplementedError(
f"Connection URL build not implemented for type {type(connection)}: {connection}"
)
@ -256,6 +264,10 @@ def _(connection: PrestoConnection):
@singledispatch
def get_connection_args(connection):
"""
Single dispatch method to get the connection arguments
"""
return connection.connectionArguments or {}
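
The @singledispatch generics above only provide the fallback behaviour; concrete connection types plug in their own builders via .register. A minimal sketch of such a registration, where the connection class name is an assumption for illustration:

@get_connection_url.register
def _(connection: MysqlConnection):  # assumed connection type
    return get_connection_url_common(connection)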
@ -268,10 +280,8 @@ def _(connection: TrinoConnection):
connection_args = connection.connectionArguments.dict()
connection_args.update({"http_session": session})
return connection_args
else:
return {"http_session": session}
else:
return connection.connectionArguments if connection.connectionArguments else {}
return {"http_session": session}
return connection.connectionArguments if connection.connectionArguments else {}
@get_connection_url.register

View File

@ -1,8 +1,27 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
UUID Encoder Module
"""
import json
from uuid import UUID
class UUIDEncoder(json.JSONEncoder):
"""
UUID Encoder class
"""
def default(self, obj):
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
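
A small usage sketch (the payload is hypothetical): passing the encoder to json.dumps lets UUID values serialize as strings instead of raising TypeError.

import json
from uuid import uuid4

payload = {"id": uuid4(), "table": "orders"}
json.dumps(payload, cls=UUIDEncoder)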

View File

@ -9,6 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module handles the output messages from different workflows
"""
import time
from enum import Enum
from pathlib import Path
@ -25,29 +29,33 @@ from metadata.utils.helpers import pretty_print_time_duration
class WorkflowType(Enum):
ingest = "ingest"
profile = "profile"
test = "test"
lineage = "lineage"
usage = "usage"
"""
Workflow type enums
"""
INGEST = "ingest"
PROFILE = "profile"
TEST = "test"
LINEAGE = "lineage"
USAGE = "usage"
EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows"
URLS = {
WorkflowType.ingest: "https://docs.open-metadata.org/openmetadata/ingestion",
WorkflowType.profile: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/profiler",
WorkflowType.test: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/data-quality",
WorkflowType.lineage: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/lineage",
WorkflowType.usage: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/usage",
WorkflowType.INGEST: "https://docs.open-metadata.org/openmetadata/ingestion",
WorkflowType.PROFILE: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/profiler",
WorkflowType.TEST: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/data-quality",
WorkflowType.LINEAGE: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/lineage",
WorkflowType.USAGE: "https://docs.open-metadata.org/openmetadata/ingestion/workflows/usage",
}
DEFAULT_EXAMPLE_FILE = {
WorkflowType.ingest: "bigquery",
WorkflowType.profile: "bigquery_profiler",
WorkflowType.test: "test_suite",
WorkflowType.lineage: "bigquery_lineage",
WorkflowType.usage: "bigquery_usage",
WorkflowType.INGEST: "bigquery",
WorkflowType.PROFILE: "bigquery_profiler",
WorkflowType.TEST: "test_suite",
WorkflowType.LINEAGE: "bigquery_lineage",
WorkflowType.USAGE: "bigquery_usage",
}
@ -85,22 +93,21 @@ def calculate_ingestion_type(source_type_name: str) -> WorkflowType:
Calculates the ingestion type depending on the source type name
"""
if source_type_name.endswith("lineage"):
return WorkflowType.lineage
elif source_type_name.endswith("usage"):
return WorkflowType.usage
return WorkflowType.ingest
return WorkflowType.LINEAGE
if source_type_name.endswith("usage"):
return WorkflowType.USAGE
return WorkflowType.INGEST
def calculate_example_file(source_type_name: str, workflow_type: WorkflowType) -> str:
"""
Calculates the ingestion type depending on the source type name and workflow_type
"""
if workflow_type == WorkflowType.profile:
if workflow_type == WorkflowType.PROFILE:
return f"{source_type_name}_profiler"
elif workflow_type == WorkflowType.test:
if workflow_type == WorkflowType.TEST:
return DEFAULT_EXAMPLE_FILE[workflow_type]
else:
return source_type_name
return source_type_name
def print_file_example(source_type_name: str, workflow_type: WorkflowType):
@ -124,7 +131,7 @@ def print_file_example(source_type_name: str, workflow_type: WorkflowType):
def print_init_error(
exc: Union[Exception, Type[Exception]],
config: dict,
workflow_type: WorkflowType = WorkflowType.ingest,
workflow_type: WorkflowType = WorkflowType.INGEST,
) -> None:
"""
Click echo print a workflow initialization error
@ -139,7 +146,7 @@ def print_init_error(
source_type_name = source_type_name.replace("-", "-")
workflow_type = (
calculate_ingestion_type(source_type_name)
if workflow_type == WorkflowType.ingest
if workflow_type == WorkflowType.INGEST
else workflow_type
)
@ -147,11 +154,9 @@ def print_init_error(
print_error_msg(f"Error loading {workflow_type.name} configuration: {exc}")
print_file_example(source_type_name, workflow_type)
print_more_info(workflow_type)
elif isinstance(exc, ConfigurationError) or isinstance(
exc, InvalidWorkflowException
):
elif isinstance(exc, (ConfigurationError, InvalidWorkflowException)):
print_error_msg(f"Error loading {workflow_type.name} configuration: {exc}")
if workflow_type == WorkflowType.usage:
if workflow_type == WorkflowType.USAGE:
print_file_example(source_type_name, workflow_type)
print_more_info(workflow_type)
else: