mirror of https://github.com/datahub-project/datahub.git
synced 2025-12-28 18:38:17 +00:00
dev: enable ruff rule (#12742)
This commit is contained in:
parent 79178e50b0
commit 0fc074b50a
@@ -42,7 +42,6 @@ extend-ignore = [
     "RUF015", # unnecessary-iterable-allocation-for-first-element
     # TODO: Enable these later
     "B006", # Mutable args
-    "B904", # Checks for raise statements in exception handlers that lack a from clause
 ]

 [tool.ruff.lint.mccabe]
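Note: B904 flags `raise` statements inside `except` blocks that lack a `from` clause. A minimal sketch of the two fixes this diff applies throughout (illustrative only, not DataHub code):

from typing import Dict

def parse_port(raw: str) -> int:
    try:
        return int(raw)
    except ValueError as e:
        # `from e` chains the original error as __cause__, so the real
        # root cause still appears in the traceback.
        raise RuntimeError(f"invalid port: {raw!r}") from e

def lookup_flavor(name: str, flavors: Dict[str, str]) -> str:
    try:
        return flavors[name]
    except KeyError:
        # `from None` suppresses the implicit exception context when the
        # original KeyError would only add noise to the message.
        raise ValueError(f"unknown flavor: {name}") from None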
@@ -231,7 +231,7 @@ def _docker_compose_v2() -> List[str]:
     # docker-compose v1 is not installed either.
     raise DockerComposeVersionError(
         "You don't have Docker Compose installed. Please install Docker Compose. See https://docs.docker.com/compose/install/.",
-    )
+    ) from None


 def _attempt_stop(quickstart_compose_file: List[pathlib.Path]) -> None:
@@ -298,7 +298,7 @@ def search(
     except KeyError:
         raise click.UsageError(
             f"Failed to find a matching query flavor for {flavor}. Valid values are {[x.lower() for x in SearchFlavor._member_names_]}"
-        )
+        ) from None
     catalog = _get_datahub_lite(read_only=True)
     # sanitize query
     result_ids = set()
@@ -49,7 +49,7 @@ def _abort_if_non_existent_urn(graph: DataHubGraph, urn: str, operation: str) ->
         entity_type = parsed_urn.get_type()
     except Exception:
         click.secho(f"Provided urn {urn} does not seem valid", fg="red")
-        raise click.Abort()
+        raise click.Abort() from None
     else:
         if not graph.exists(urn):
             click.secho(
@@ -44,7 +44,7 @@ class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
         try:
             value = CallableConsumerConfig(value).callable_config()
         except Exception as e:
-            raise ConfigurationError(e)
+            raise ConfigurationError(e) from e
         return value

@@ -48,12 +48,12 @@ class S3ListIterator(Iterator):
     def __next__(self) -> FileInfo:
         try:
             return next(self._file_statuses)
-        except StopIteration:
+        except StopIteration as e:
             if self._token:
                 self.fetch()
                 return next(self._file_statuses)
             else:
-                raise StopIteration()
+                raise e

     def fetch(self):
         params = dict(Bucket=self._bucket, Prefix=self._prefix, MaxKeys=self._max_keys)
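Note: binding the exception (`except StopIteration as e`) and re-raising it with `raise e`, as in the hunk above, satisfies B904 while keeping the original exception object and traceback; a fresh `raise StopIteration()` would discard both. A small sketch of the same refill-then-reraise shape, with hypothetical names:

from typing import Iterator, List

def next_or_refill(it: Iterator[int], buffer: List[int]) -> int:
    try:
        return next(it)
    except StopIteration as e:
        if buffer:
            return buffer.pop()
        raise e  # re-raise the same exception, traceback intact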
@@ -1685,9 +1685,10 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
         self,
         entity_name: str,
         urns: List[str],
-        aspects: List[str] = [],
+        aspects: Optional[List[str]] = None,
         with_system_metadata: bool = False,
     ) -> Dict[str, Any]:
+        aspects = aspects or []
         payload = {
             "urns": urns,
             "aspectNames": aspects,
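Note: the `aspects: List[str] = []` to `Optional[List[str]] = None` rewrite above is the standard fix for B006: a mutable default is evaluated once, at function definition, and then shared by every call. A minimal sketch of the hazard and the `x = x or []` sentinel idiom used throughout this diff (illustrative, not DataHub code):

from typing import List, Optional

def bad_append(item: int, acc: List[int] = []) -> List[int]:  # B006
    acc.append(item)
    return acc  # the same list object is reused across calls

def good_append(item: int, acc: Optional[List[int]] = None) -> List[int]:
    acc = acc or []  # fresh list per call when the caller passes nothing
    acc.append(item)
    return acc

assert bad_append(1) == [1]
assert bad_append(2) == [1, 2]  # state leaked from the previous call
assert good_append(1) == [1]
assert good_append(2) == [2]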
@@ -93,7 +93,7 @@ class EntityVersioningAPI(DataHubGraphProtocol):
         try:
             return response["linkAssetVersion"]["urn"]
         except KeyError:
-            raise ValueError(f"Unexpected response: {response}")
+            raise ValueError(f"Unexpected response: {response}") from None

     def link_asset_to_versioned_asset(
         self,
@@ -165,7 +165,7 @@ class EntityVersioningAPI(DataHubGraphProtocol):
         try:
             return response["unlinkAssetVersion"]["urn"]
         except KeyError:
-            raise ValueError(f"Unexpected response: {response}")
+            raise ValueError(f"Unexpected response: {response}") from None

     def unlink_latest_asset_from_version_set(
         self, version_set_urn: str
@@ -198,4 +198,4 @@ class EntityVersioningAPI(DataHubGraphProtocol):
         try:
             return response["unlinkAssetVersion"]["urn"]
         except KeyError:
-            raise ValueError(f"Unexpected response: {response}")
+            raise ValueError(f"Unexpected response: {response}") from None
@@ -159,7 +159,8 @@ class CassandraAPI:
             self.report.failure(message="Failed to authenticate to Cassandra", exc=e)
             return False

-    def get(self, query: str, parameters: Optional[List] = []) -> List:
+    def get(self, query: str, parameters: Optional[List] = None) -> List:
+        parameters = parameters or []
         if not self._cassandra_session:
             return []
@@ -640,8 +640,8 @@ class CSVEnricherSource(Source):
                 )
             except Exception as e:
                 raise ConfigurationError(
-                    f"Cannot read remote file {self.config.filename}, error:{e}"
-                )
+                    f"Cannot read remote file {self.config.filename}: {e}"
+                ) from e
         else:
             with open(pathlib.Path(self.config.filename), encoding="utf-8-sig") as f:
                 rows = list(csv.DictReader(f, delimiter=self.config.delimiter))
@@ -271,12 +271,12 @@ class DremioAPIOperations:
                 self.cancel_query(job_id)
                 raise DremioAPIException(
                     f"Query execution timed out after {timeout} seconds"
-                )
+                ) from None
             except RuntimeError as e:
-                raise DremioAPIException(f"{str(e)}")
+                raise DremioAPIException() from e

         except requests.RequestException as e:
-            raise DremioAPIException(f"Error executing query: {str(e)}")
+            raise DremioAPIException("Error executing query") from e

     def fetch_results(self, job_id: str) -> List[Dict]:
         """Fetch job results with status checking"""
@@ -168,8 +168,9 @@ class DremioAspects:
         )

     def get_container_urn(
-        self, name: Optional[str] = None, path: Optional[List[str]] = []
+        self, name: Optional[str] = None, path: Optional[List[str]] = None
     ) -> str:
+        path = path or []
         container_key = self.get_container_key(name, path)
         return container_key.as_urn()
@@ -130,8 +130,9 @@ class DatahubExecutionRequestCleanup:
         )

     def _scroll_execution_requests(
-        self, overrides: Dict[str, Any] = {}
+        self, overrides: Optional[Dict[str, Any]] = None
     ) -> Iterator[CleanupRecord]:
+        overrides = overrides or {}
         headers: Dict[str, Any] = {
             "Accept": "application/json",
             "Content-Type": "application/json",
@@ -272,7 +272,7 @@ class KafkaSource(StatefulIngestionSourceBase, TestableSource):
             return schema_registry_class.create(config, report)
         except Exception as e:
             logger.debug(e, exc_info=e)
-            raise ImportError(config.schema_registry_class)
+            raise ImportError(config.schema_registry_class) from e

     def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
         super().__init__(config, ctx)
@@ -33,14 +33,14 @@ class LookerViewFileLoader:
         base_projects_folder: Dict[str, pathlib.Path],
         reporter: LookMLSourceReport,
         source_config: LookMLSourceConfig,
-        manifest_constants: Dict[str, LookerConstant] = {},
+        manifest_constants: Optional[Dict[str, LookerConstant]] = None,
     ) -> None:
         self.viewfile_cache: Dict[str, Optional[LookerViewFile]] = {}
         self._root_project_name = root_project_name
         self._base_projects_folder = base_projects_folder
         self.reporter = reporter
         self.source_config = source_config
-        self.manifest_constants = manifest_constants
+        self.manifest_constants = manifest_constants or {}

     def _load_viewfile(
         self, project_name: str, path: str, reporter: LookMLSourceReport
@@ -501,7 +501,7 @@ class LookMLSource(StatefulIngestionSourceBase):
             raise ValueError(
                 f"Could not locate a project name for model {model_name}. Consider configuring a static project name "
                 f"in your config file"
-            )
+            ) from None

     def get_manifest_if_present(self, folder: pathlib.Path) -> Optional[LookerManifest]:
         manifest_file = folder / "manifest.lkml"
@@ -1494,7 +1494,7 @@ class ModeSource(StatefulIngestionSourceBase):
                 sleep_time = error_response.headers.get("retry-after")
                 if sleep_time is not None:
                     time.sleep(float(sleep_time))
-                raise HTTPError429
+                raise HTTPError429 from None

             raise http_error

@@ -230,8 +230,8 @@ class PulsarSource(StatefulIngestionSourceBase):
             self.report.report_warning("HTTPError", message)
         except requests.exceptions.RequestException as e:
             raise Exception(
-                f"An ambiguous exception occurred while handling the request: {e}"
-            )
+                "An ambiguous exception occurred while handling the request"
+            ) from e

     @classmethod
     def create(cls, config_dict, ctx):
@@ -124,7 +124,7 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
         try:
             self.sigma_api = SigmaAPI(self.config, self.reporter)
         except Exception as e:
-            raise ConfigurationError(f"Unable to connect sigma API. Exception: {e}")
+            raise ConfigurationError("Unable to connect sigma API") from e

     @staticmethod
     def test_connection(config_dict: dict) -> TestConnectionReport:
@@ -312,7 +312,7 @@ class SnowflakeConnectionConfig(ConfigModel):
             raise ValueError(
                 f"access_token not found in response {response}. "
                 "Please check your OAuth configuration."
-            )
+            ) from None
         connect_args = self.get_options()["connect_args"]
         return snowflake.connector.connect(
             user=self.username,
@@ -1562,8 +1562,9 @@ class TableauSiteSource:
         query: str,
         connection_type: str,
         page_size: int,
-        query_filter: dict = {},
+        query_filter: Optional[dict] = None,
     ) -> Iterable[dict]:
+        query_filter = query_filter or {}
         query_filter = optimize_query_filter(query_filter)

         # Calls the get_connection_object_page function to get the objects,
@@ -514,7 +514,8 @@ FIELD_TYPE_MAPPING = {
 }


-def get_tags_from_params(params: List[str] = []) -> GlobalTagsClass:
+def get_tags_from_params(params: Optional[List[str]] = None) -> GlobalTagsClass:
+    params = params or []
     tags = [
         TagAssociationClass(tag=builder.make_tag_urn(tag.upper()))
         for tag in params
@@ -284,9 +284,10 @@ class DuckDBLite(DataHubLiteLocal[DuckDBLiteConfig]):
         self,
         query: str,
         flavor: SearchFlavor,
-        aspects: List[str] = [],
+        aspects: Optional[List[str]] = None,
         snippet: bool = True,
     ) -> Iterable[Searchable]:
+        aspects = aspects or []
         if flavor == SearchFlavor.FREE_TEXT:
             base_query = f"SELECT distinct(urn), 'urn', NULL from metadata_aspect_v2 where urn ILIKE '%{query}%' UNION SELECT urn, aspect_name, metadata from metadata_aspect_v2 where metadata->>'$.name' ILIKE '%{query}%'"
             for r in self.duckdb_client.execute(base_query).fetchall():
@@ -90,7 +90,7 @@ class DataHubLiteLocal(Generic[LiteConfig], Closeable, metaclass=ABCMeta):
         self,
         query: str,
         flavor: SearchFlavor,
-        aspects: List[str] = [],
+        aspects: Optional[List[str]] = None,
         snippet: bool = True,
     ) -> Iterable[Searchable]:
         pass
@@ -70,9 +70,10 @@ class DataHubLiteWrapper(DataHubLiteLocal):
         self,
         query: str,
         flavor: SearchFlavor,
-        aspects: List[str] = [],
+        aspects: Optional[List[str]] = None,
         snippet: bool = True,
     ) -> Iterable[Searchable]:
+        aspects = aspects or []
         yield from self.lite.search(query, flavor, aspects, snippet)

     def ls(self, path: str) -> List[Browseable]:
@@ -96,10 +97,10 @@ def get_datahub_lite(config_dict: dict, read_only: bool = False) -> "DataHubLite
     lite_type = lite_local_config.type
     try:
         lite_class = lite_registry.get(lite_type)
-    except KeyError:
+    except KeyError as e:
         raise Exception(
             f"Failed to find a registered lite implementation for {lite_type}. Valid values are {[k for k in lite_registry.mapping.keys()]}"
-        )
+        ) from e

     lite_specific_config = lite_class.get_config_class().parse_obj(
         lite_local_config.config
@@ -1,10 +1,10 @@
 from collections import deque
 from itertools import chain
 from sys import getsizeof
-from typing import Any, Iterator
+from typing import Any, Iterator, Optional


-def total_size(o: Any, handlers: Any = {}) -> int:
+def total_size(o: Any, handlers: Optional[Any] = None) -> int:
     """Returns the approximate memory footprint an object and all of its contents.
     Automatically finds the contents of the following builtin containers and
     their subclasses: tuple, list, deque, dict, set and frozenset.
@@ -14,6 +14,7 @@ def total_size(o: Any, handlers: Any = {}) -> int:

     Based on https://github.com/ActiveState/recipe-577504-compute-mem-footprint/blob/master/recipe.py
     """
+    handlers = handlers or {}

     def dict_handler(d: dict) -> Iterator[Any]:
         return chain.from_iterable(d.items())
@@ -54,7 +54,8 @@ def random_email():
     )


-def recipe(mcp_output_path: str, source_config_override: dict = {}) -> dict:
+def recipe(mcp_output_path: str, source_config_override: Optional[dict] = None) -> dict:
+    source_config_override = source_config_override or {}
     return {
         "source": {
             "type": "bigquery",