Mirror of https://github.com/datahub-project/datahub.git (synced 2025-12-27 09:58:14 +00:00)
chore(ingest): enable flake8 bugbear linting (#7763)
Parent: f563695bc6
Commit: e99875cac6
@@ -8,7 +8,18 @@ ignore =
     # See https://stackoverflow.com/a/57074416.
     W503,
     # See https://github.com/psf/black/issues/315.
-    E203
+    E203,
+    # Allow usages of functools.lru_cache.
+    B019,
+    # This rule flags the use of function calls in argument defaults.
+    # There's some good reasons to do this, so we're ok with it.
+    B008,
+    # TODO: However, we should enable B006 to catch issues with mutable args.
+    B006,
+    # TODO: Enable B007 - unused loop control variable.
+    B007
+    # TODO: Enable B902 - require self/cls naming.
+    # TODO: Enable B904 - use raise from in except clauses.
 exclude =
     .git,
     src/datahub/metadata,
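The B006/B008 entries above concern mutable and function-call defaults in argument lists, a pattern several hunks below clean up. A minimal sketch of the B006 pitfall and the usual None-default fix, using made-up function names:

from typing import List, Optional

def collect(item: str, seen: List[str] = []) -> List[str]:
    # B006: the [] default is created once, so every call shares the same list.
    seen.append(item)
    return seen

def collect_fixed(item: str, seen: Optional[List[str]] = None) -> List[str]:
    # Conventional fix: default to None and build a fresh list per call.
    if seen is None:
        seen = []
    seen.append(item)
    return seen

print(collect("a"))        # ['a']
print(collect("b"))        # ['a', 'b'] - state leaked from the previous call
print(collect_fixed("a"))  # ['a']
print(collect_fixed("b"))  # ['b']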
@@ -24,10 +35,10 @@ ban-relative-imports = true
 plugins =
     ./tests/test_helpers/sqlalchemy_mypy_plugin.py,
     pydantic.mypy
-exclude = ^(venv|build|dist)/
+exclude = ^(venv/|build/|dist/|examples/transforms/setup.py)
 ignore_missing_imports = yes
 namespace_packages = no
-implicit_optional = no
+# implicit_optional = no
 strict_optional = yes
 check_untyped_defs = yes
 disallow_incomplete_defs = yes
@@ -407,8 +407,9 @@ base_dev_requirements = {
     # We should make an effort to keep it up to date.
     "black==22.12.0",
     "coverage>=5.1",
-    "flake8>=3.8.3",
+    "flake8>=3.8.3",  # DEPRECATION: Once we drop Python 3.7, we can pin to 6.x.
     "flake8-tidy-imports>=4.3.0",
+    "flake8-bugbear==23.3.12",
     "isort>=5.7.0",
     "mypy==1.0.0",
     # pydantic 1.8.2 is incompatible with mypy 0.910.
@@ -21,4 +21,5 @@ if sys.version_info < (3, 7):
         "DataHub requires Python 3.7 or newer. "
         "Please upgrade your Python version to continue using DataHub.",
         FutureWarning,
+        stacklevel=2,
     )
@@ -653,7 +653,7 @@ def get_latest_timeseries_aspect_values(

 def get_aspects_for_entity(
     entity_urn: str,
-    aspects: List[str] = [],
+    aspects: List[str],
     typed: bool = False,
     cached_session_host: Optional[Tuple[Session, str]] = None,
 ) -> Dict[str, Union[dict, _Aspect]]:
@@ -2,7 +2,7 @@ import logging
 import os
 import platform
 import sys
-from typing import Optional
+from typing import ContextManager, Optional

 import click
@@ -31,7 +31,7 @@ from datahub.utilities.logging_manager import configure_logging
 from datahub.utilities.server_config_util import get_gms_config

 logger = logging.getLogger(__name__)
-_logging_configured = None
+_logging_configured: Optional[ContextManager] = None

 MAX_CONTENT_WIDTH = 120
@@ -97,8 +97,11 @@ def datahub(
     # clean it up before those error handlers are processed.
     # So why is this ok? Because we're leaking a context manager, this will
     # still get cleaned up automatically when the memory is reclaimed, which is
-    # worse-case at program exit.
+    # worse-case at program exit. In a slightly better case, the context manager's
+    # exit call will be triggered by the finally clause of the main() function.
     global _logging_configured
+    if _logging_configured is not None:
+        _logging_configured.__exit__(None, None, None)
+        _logging_configured = None  # see if we can force python to GC this
     _logging_configured = configure_logging(debug=debug, log_file=log_file)
     _logging_configured.__enter__()
@@ -201,3 +204,8 @@ def main(**kwargs):
         if gms_config:
             logger.debug(f"GMS config {gms_config}")
         sys.exit(1)
+    finally:
+        global _logging_configured
+        if _logging_configured:
+            _logging_configured.__exit__(None, None, None)
+            _logging_configured = None
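The two hunks above manage the logging context manager by hand so it can outlive the Click command and be torn down in main()'s new finally block. A rough sketch of the same pattern, with a hypothetical configure_logging stand-in:

import contextlib
from typing import ContextManager, Optional

@contextlib.contextmanager
def configure_logging_stub():
    # hypothetical stand-in for datahub's configure_logging()
    print("logging configured")
    try:
        yield
    finally:
        print("logging torn down")

_logging_ctx: Optional[ContextManager] = None

def start_logging() -> None:
    global _logging_ctx
    if _logging_ctx is not None:
        _logging_ctx.__exit__(None, None, None)  # close any previous instance
        _logging_ctx = None
    _logging_ctx = configure_logging_stub()
    _logging_ctx.__enter__()  # entered manually, so it must be exited manually

def main() -> None:
    start_logging()
    try:
        print("doing CLI work")
    finally:
        global _logging_ctx
        if _logging_ctx is not None:
            _logging_ctx.__exit__(None, None, None)
            _logging_ctx = None

main()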
@@ -4,6 +4,7 @@ from typing import TYPE_CHECKING, Dict, Generic, Iterable, Optional, Tuple, Type

 import requests

+from datahub.configuration.common import ConfigurationError
 from datahub.emitter.mce_builder import set_dataset_urn_to_lower
 from datahub.ingestion.api.committable import Committable
 from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
@@ -59,7 +60,7 @@ class PipelineContext:
         self.checkpointers: Dict[str, Committable] = {}
         try:
             self.graph = DataHubGraph(datahub_api) if datahub_api is not None else None
-        except requests.exceptions.ConnectionError as e:
+        except (requests.exceptions.ConnectionError, ConfigurationError) as e:
             raise Exception("Failed to connect to DataHub") from e
         except Exception as e:
             raise Exception(
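The raise ... from e form used here (and called out by the B904 TODO in the lint config above) chains the original error as __cause__ instead of discarding it. A small illustration with made-up exception types:

class GraphConnectionError(Exception):
    pass

def connect() -> None:
    raise TimeoutError("gms did not respond")  # hypothetical low-level failure

try:
    try:
        connect()
    except TimeoutError as e:
        # Chaining keeps the TimeoutError attached, so tracebacks read
        # "The above exception was the direct cause of the following exception".
        raise GraphConnectionError("Failed to connect to DataHub") from e
except GraphConnectionError as err:
    assert isinstance(err.__cause__, TimeoutError)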
@@ -15,12 +15,12 @@ def config_class(config_cls: Type) -> Callable[[Type], Type]:

     def wrapper(cls: Type) -> Type:
         # add a get_config_class method
-        setattr(cls, "get_config_class", lambda: config_cls)
+        cls.get_config_class = lambda: config_cls
         if not hasattr(cls, "create") or (
-            getattr(cls, "create").__func__ == getattr(Source, "create").__func__
+            cls.create.__func__ == Source.create.__func__  # type: ignore
         ):
             # add the create method only if it has not been overridden from the base Source.create method
-            setattr(cls, "create", classmethod(default_create))
+            cls.create = classmethod(default_create)

         return cls
@@ -33,13 +33,9 @@ def platform_name(
     """Adds a get_platform_name method to the decorated class"""

     def wrapper(cls: Type) -> Type:
-        setattr(cls, "get_platform_name", lambda: platform_name)
-        setattr(
-            cls,
-            "get_platform_id",
-            lambda: id or platform_name.lower().replace(" ", "-"),
-        )
-        setattr(cls, "get_platform_doc_order", lambda: doc_order or None)
+        cls.get_platform_name = lambda: platform_name
+        cls.get_platform_id = lambda: id or platform_name.lower().replace(" ", "-")
+        cls.get_platform_doc_order = lambda: doc_order or None

         return cls
@@ -76,7 +72,7 @@ def support_status(
     """Adds a get_support_status method to the decorated class"""

     def wrapper(cls: Type) -> Type:
-        setattr(cls, "get_support_status", lambda: support_status)
+        cls.get_support_status = lambda: support_status
         return cls

     return wrapper
@@ -98,8 +94,8 @@ def capability(

     def wrapper(cls: Type) -> Type:
         if not hasattr(cls, "__capabilities"):
-            setattr(cls, "__capabilities", {})
-            setattr(cls, "get_capabilities", lambda: cls.__capabilities.values())
+            cls.__capabilities = {}
+            cls.get_capabilities = lambda: cls.__capabilities.values()

         cls.__capabilities[capability_name] = CapabilitySetting(
             capability=capability_name, description=description, supported=supported
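The decorator edits above are driven by bugbear B009/B010: getattr/setattr with a constant attribute name is just a slower, less checkable spelling of plain attribute access. A tiny before/after sketch:

class Thing:
    pass

t = Thing()

# Flagged (B010 / B009): the attribute name is a string literal.
setattr(t, "label", "widget")
value = getattr(t, "label")

# Equivalent, and visible to linters and type checkers.
t.label = "widget"
value = t.label

# getattr/setattr remain appropriate when the name really is dynamic.
attr_name = "label"
value = getattr(t, attr_name)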
@@ -36,7 +36,10 @@ from datahub.ingestion.source.source_registry import source_registry
 from datahub.ingestion.transformer.transform_registry import transform_registry
 from datahub.metadata.schema_classes import MetadataChangeProposalClass
 from datahub.telemetry import stats, telemetry
-from datahub.utilities.global_warning_util import get_global_warnings
+from datahub.utilities.global_warning_util import (
+    clear_global_warnings,
+    get_global_warnings,
+)
 from datahub.utilities.lossy_collections import LossyDict, LossyList

 logger = logging.getLogger(__name__)
@@ -389,6 +392,8 @@ class Pipeline:
                 logger.error("Caught error", exc_info=e)
                 raise
+            finally:
+                clear_global_warnings()

         if callback and hasattr(callback, "close"):
             callback.close()  # type: ignore
@@ -199,9 +199,12 @@ class JobProcessor:
         self,
         model_data_url: str,
         job_key: JobKey,
-        metrics: Dict[str, Any] = {},
-        hyperparameters: Dict[str, Any] = {},
+        metrics: Optional[Dict[str, Any]] = None,
+        hyperparameters: Optional[Dict[str, Any]] = None,
     ) -> None:
+        metrics = metrics or {}
+        hyperparameters = hyperparameters or {}
+
         model_jobs = self.model_image_to_jobs[model_data_url]

         # if model doesn't have job yet, init
@@ -215,9 +218,12 @@ class JobProcessor:
         self,
         model_name: str,
         job_key: JobKey,
-        metrics: Dict[str, Any] = {},
-        hyperparameters: Dict[str, Any] = {},
+        metrics: Optional[Dict[str, Any]] = None,
+        hyperparameters: Optional[Dict[str, Any]] = None,
     ) -> None:
+        metrics = metrics or {}
+        hyperparameters = hyperparameters or {}
+
         model_jobs = self.model_name_to_jobs[model_name]

         # if model doesn't have job yet, init
@@ -845,7 +851,7 @@ class JobProcessor:
                 "channel_name": config.get("ChannelName"),
             }

-        output_s3_uri = job.get("OutputDataConfig", {}).get("S3OutputPath")
+        output_data_s3_uri = job.get("OutputDataConfig", {}).get("S3OutputPath")
         checkpoint_s3_uri = job.get("CheckpointConfig", {}).get("S3Uri")
         debug_s3_path = job.get("DebugHookConfig", {}).get("S3OutputPath")
         tensorboard_output_path = job.get("TensorBoardOutputConfig", {}).get(
@@ -866,7 +872,7 @@ class JobProcessor:

         # process all output datasets at once
         for output_s3_uri in [
-            output_s3_uri,
+            output_data_s3_uri,
             checkpoint_s3_uri,
             debug_s3_path,
             tensorboard_output_path,
@@ -142,7 +142,8 @@ class NamingPattern(ConfigModel):

     def replace_variables(self, values: Union[Dict[str, Optional[str]], object]) -> str:
         if not isinstance(values, dict):
-            assert dataclasses.is_dataclass(values)
+            # Check that this is a dataclass instance (not a dataclass type).
+            assert dataclasses.is_dataclass(values) and not isinstance(values, type)
             values = dataclasses.asdict(values)
         values = {k: v for k, v in values.items() if v is not None}
         return self.pattern.format(**values)
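The extra isinstance(values, type) guard matters because dataclasses.is_dataclass returns True for both a dataclass type and an instance of one, while dataclasses.asdict only accepts instances. A quick check:

import dataclasses

@dataclasses.dataclass
class Point:
    x: int = 0
    y: int = 0

assert dataclasses.is_dataclass(Point)        # True for the class object itself
assert dataclasses.is_dataclass(Point(1, 2))  # True for an instance as well

obj = Point(1, 2)
if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
    # Safe: obj is definitely an instance, so asdict() will not raise TypeError.
    print(dataclasses.asdict(obj))  # {'x': 1, 'y': 2}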
@@ -1,6 +1,6 @@
 import json
+import logging
 import re
-import warnings
 from typing import Any, Dict, Generator, List, Optional, Tuple

 import requests
@@ -14,6 +14,8 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
 )
 from datahub.metadata.schema_classes import SchemaFieldDataTypeClass, StringTypeClass

+logger = logging.getLogger(__name__)
+

 def flatten(d: dict, prefix: str = "") -> Generator:
     for k, v in d.items():
@@ -162,7 +164,7 @@ def get_endpoints(sw_dict: dict) -> dict: # noqa: C901
                             ex_field
                         ][0]
                     else:
-                        warnings.warn(
+                        logger.warning(
                             f"Field in swagger file does not give consistent data --- {p_k}"
                         )
             elif "text/csv" in res_cont.keys():
@@ -296,12 +298,12 @@ def extract_fields(
     dict_data = json.loads(response.content)
     if isinstance(dict_data, str):
         # no sense
-        warnings.warn(f"Empty data --- {dataset_name}")
+        logger.warning(f"Empty data --- {dataset_name}")
         return [], {}
     elif isinstance(dict_data, list):
         # it's maybe just a list
         if len(dict_data) == 0:
-            warnings.warn(f"Empty data --- {dataset_name}")
+            logger.warning(f"Empty data --- {dataset_name}")
             return [], {}
         # so we take the fields of the first element,
         # if it's a dict
@@ -330,7 +332,7 @@ def extract_fields(
         else:
             return [], {}  # it's empty!
     else:
-        warnings.warn(f"Unable to get the attributes --- {dataset_name}")
+        logger.warning(f"Unable to get the attributes --- {dataset_name}")
         return [], {}
@@ -326,8 +326,8 @@ class SnowflakeV2Source(
             else:
                 test_report.internal_failure = True
                 test_report.internal_failure_reason = f"{e}"
-        finally:
-            return test_report
+
+        return test_report

     @staticmethod
     def check_capabilities(
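Moving the return out of the finally block follows bugbear B012: a return inside finally silently swallows any in-flight exception. A small demonstration:

def swallowed() -> str:
    try:
        raise RuntimeError("boom")
    finally:
        # B012: the RuntimeError is discarded and "ok" is returned instead.
        return "ok"

def surfaced() -> str:
    report = "ok"
    try:
        raise RuntimeError("boom")
    except RuntimeError:
        report = "failed"
    return report  # exceptions are either handled explicitly or propagate

print(swallowed())  # "ok" - the error never escapes
print(surfaced())   # "failed"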
@@ -14,6 +14,7 @@ source_registry.register_alias(
     lambda: warnings.warn(
         "source type snowflake-beta is deprecated, use snowflake instead",
         ConfigurationWarning,
+        stacklevel=3,
     ),
 )
 source_registry.register_alias(
@@ -22,6 +23,7 @@ source_registry.register_alias(
     lambda: warnings.warn(
         "source type bigquery-beta is deprecated, use bigquery instead",
         ConfigurationWarning,
+        stacklevel=3,
     ),
 )
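The added stacklevel arguments address bugbear's no-explicit-stacklevel check (B028 in recent releases): with the default stacklevel of 1, the warning is attributed to the warnings.warn() call itself rather than to the caller that actually used the deprecated alias. A minimal sketch:

import warnings

def deprecated_alias() -> None:
    # stacklevel=2 points the reported file/line at the caller of
    # deprecated_alias(), which is the code the user should fix.
    warnings.warn(
        "deprecated_alias is deprecated, use new_alias instead",
        DeprecationWarning,
        stacklevel=2,
    )

deprecated_alias()  # the warning names this line, not the warn() call above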
@@ -44,7 +44,7 @@ class DruidConfig(BasicSQLAlchemyConfig):

 @platform_name("Druid")
 @config_class(DruidConfig)
-@support_status(SupportStatus.CERTIFIED)
+@support_status(SupportStatus.INCUBATING)
 @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
 class DruidSource(SQLAlchemySource):
     """
@@ -699,10 +699,11 @@ class RedshiftSource(SQLAlchemySource):
         return sources

     def get_db_name(self, inspector: Optional[Inspector] = None) -> str:
-        db_name = getattr(self.config, "database")
-        db_alias = getattr(self.config, "database_alias")
+        db_name = self.config.database
+        db_alias = self.config.database_alias
         if db_alias:
             db_name = db_alias
+        assert db_name
         return db_name

     def _get_s3_path(self, path: str) -> str:
@@ -292,7 +292,7 @@ def get_schema_metadata(
     columns: List[dict],
     pk_constraints: Optional[dict] = None,
     foreign_keys: Optional[List[ForeignKeyConstraint]] = None,
-    canonical_schema: List[SchemaField] = [],
+    canonical_schema: Optional[List[SchemaField]] = None,
 ) -> SchemaMetadata:
     schema_metadata = SchemaMetadata(
         schemaName=dataset_name,
@@ -300,7 +300,7 @@ def get_schema_metadata(
         version=0,
         hash="",
         platformSchema=MySqlDDL(tableSchema=""),
-        fields=canonical_schema,
+        fields=canonical_schema or [],
     )
     if foreign_keys is not None and foreign_keys != []:
         schema_metadata.foreignKeys = foreign_keys
@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Generic, Optional, TypeVar
+from typing import Optional

 from pydantic.fields import Field
@@ -26,11 +26,8 @@ class BaseColumn:
     comment: Optional[str]


-SqlTableColumn = TypeVar("SqlTableColumn", bound="BaseColumn")
-
-
 @dataclass
-class BaseTable(Generic[SqlTableColumn]):
+class BaseTable:
     name: str
     comment: Optional[str]
     created: datetime
@@ -42,7 +39,7 @@ class BaseTable(Generic[SqlTableColumn]):


 @dataclass
-class BaseView(Generic[SqlTableColumn]):
+class BaseView:
     name: str
     comment: Optional[str]
     created: Optional[datetime]
@@ -9,3 +9,7 @@ def add_global_warning(warn: str) -> None:

 def get_global_warnings() -> List:
     return global_warnings
+
+
+def clear_global_warnings() -> None:
+    global_warnings.clear()
@@ -11,7 +11,7 @@ class DomainRegistry:

     def __init__(
         self,
-        cached_domains: Optional[List[str]] = [],
+        cached_domains: Optional[List[str]] = None,
         graph: Optional[DataHubGraph] = None,
     ):
         self.domain_registry = {}
@@ -2,10 +2,9 @@ import contextlib
 import logging
 import multiprocessing
 import re
-import sys
 import traceback
 from multiprocessing import Process, Queue
-from typing import Any, List, Optional, Tuple, Type
+from typing import Any, List, Optional, Tuple

 from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
 from datahub.utilities.sql_parser_base import SQLParser
@@ -80,24 +79,23 @@ def sql_lineage_parser_impl_func_wrapper(
     :param use_raw_names: Parameter used to ignore sqllineage's default lowercasing.
     :return: None.
     """
-    exception_details: Optional[Tuple[Optional[Type[BaseException]], str]] = None
+    exception_details: Optional[Tuple[BaseException, str]] = None
     tables: List[str] = []
     columns: List[str] = []
     try:
         parser = SqlLineageSQLParserImpl(sql_query, use_raw_names)
         tables = parser.get_tables()
         columns = parser.get_columns()
-    except BaseException:
-        exc_info = sys.exc_info()
-        exc_msg: str = str(exc_info[1]) + "".join(traceback.format_tb(exc_info[2]))
-        exception_details = (exc_info[0], exc_msg)
-    finally:
-        if queue is not None:
-            queue.put((tables, columns, exception_details))
-            return None
-        else:
-            return (tables, columns, exception_details)
+    except BaseException as e:
+        exc_msg = traceback.format_exc()
+        exception_details = (e, exc_msg)
+        logger.debug(exc_msg)
+
+    if queue is not None:
+        queue.put((tables, columns, exception_details))
+        return None
+    else:
+        return (tables, columns, exception_details)


 class SqlLineageSQLParser(SQLParser):
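Replacing the sys.exc_info() plumbing with except BaseException as e plus traceback.format_exc() keeps the same information with less code: format_exc() renders the exception currently being handled, traceback included. A quick sketch:

import traceback

def parse(sql: str) -> None:
    raise ValueError(f"cannot parse {sql!r}")  # stand-in for a parser failure

exception_details = None
try:
    parse("SELECT *")
except BaseException as e:
    # Captures the active exception and traceback as one string, replacing
    # the sys.exc_info() / traceback.format_tb() combination.
    exception_details = (e, traceback.format_exc())

if exception_details is not None:
    err, msg = exception_details
    print(type(err).__name__, msg.splitlines()[-1])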
@@ -335,7 +335,8 @@ def _wrap_task_policy(policy):
         policy(task)
         task_policy(task)

-    setattr(custom_task_policy, "_task_policy_patched_by", "datahub_plugin")
+    # Add a flag to the policy to indicate that we've patched it.
+    custom_task_policy._task_policy_patched_by = "datahub_plugin"  # type: ignore[attr-defined]
     return custom_task_policy
@@ -78,9 +78,7 @@ class AirflowGenerator:
                 x for x in dag.parent_dag.task_dict.values() if x.subdag is not None
             ]
             matched_subdags = [
-                x
-                for x in subdags
-                if getattr(getattr(x, "subdag"), "dag_id") == dag.dag_id
+                x for x in subdags if x.subdag and x.subdag.dag_id == dag.dag_id
             ]

             # id of the task containing the subdag
@@ -51,10 +51,11 @@ def test_base_url_guessing():
         url_template="https://gitea.com/gitea/tea/src/branch/{branch}/{file_path}",
         repo_ssh_locator="https://gitea.com/gitea/tea.git",
     )
-    config.get_url_for_file_path(
-        "cmd/admin.go"
-    ) == "https://gitea.com/gitea/tea/src/branch/main/cmd/admin.go"
-    config.repo_ssh_locator == "https://gitea.com/gitea/tea.git"
+    assert (
+        config.get_url_for_file_path("cmd/admin.go")
+        == "https://gitea.com/gitea/tea/src/branch/main/cmd/admin.go"
+    )
+    assert config.repo_ssh_locator == "https://gitea.com/gitea/tea.git"

     # Deprecated: base_url.
     with pytest.warns(ConfigurationWarning, match="base_url is deprecated"):
@@ -13,7 +13,7 @@ services:
       - mysqldb
       - mongo
     ports:
-      - "28083:58083"
+      - "28083:28083"
    # volumes:
    #   - ./../kafka-connect/setup/confluentinc-kafka-connect-jdbc-10.2.5:/usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-10.2.5
    #   - ./../kafka-connect/setup/confluentinc-connect-transforms-1.4.1:/usr/local/share/kafka/plugins/confluentinc-connect-transforms-1.4.1
@@ -1,5 +1,5 @@
 CONNECT_BOOTSTRAP_SERVERS=test_broker:9092
-CONNECT_REST_PORT=58083
+CONNECT_REST_PORT=28083
 CONNECT_GROUP_ID=kafka-connect
 CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
 CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
@@ -63,7 +63,7 @@ def kafka_connect_runner(docker_compose_runner, pytestconfig, test_resources_dir
     # calls to the docker_compose_runner and the first one sets cleanup=False.

     wait_for_port(docker_services, "test_broker", 29092, timeout=120)
-    wait_for_port(docker_services, "test_connect", 58083, timeout=120)
+    wait_for_port(docker_services, "test_connect", 28083, timeout=120)
     docker_services.wait_until_responsive(
         timeout=30,
         pause=1,
@@ -1,4 +1,5 @@
 import json
+import pathlib
 from unittest.mock import patch

 from freezegun import freeze_time
@@ -25,7 +26,7 @@ JSON_RESPONSE_MAP = {

 RESPONSE_ERROR_LIST = ["http://localhost:3000/api/dashboard"]

-test_resources_dir = None
+test_resources_dir = pathlib.Path(__file__).parent


 class MockResponse:
@@ -134,9 +135,6 @@ def test_mode_ingest_failure(pytestconfig, tmp_path):
         "datahub.ingestion.source.metabase.requests.delete",
         side_effect=mocked_requests_session_delete,
     ):
-        global test_resources_dir
-        test_resources_dir = pytestconfig.rootpath / "tests/integration/metabase"
-
         pipeline = Pipeline.create(
             {
                 "run_id": "metabase-test",
@@ -1,4 +1,5 @@
 import json
+import pathlib
 from unittest.mock import patch

 from freezegun import freeze_time
@@ -24,7 +25,7 @@ JSON_RESPONSE_MAP = {

 RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"]

-test_resources_dir = None
+test_resources_dir = pathlib.Path(__file__).parent


 class MockResponse:
@@ -71,9 +72,6 @@ def test_mode_ingest_success(pytestconfig, tmp_path):
         "datahub.ingestion.source.mode.requests.session",
         side_effect=mocked_requests_sucess,
     ):
-        global test_resources_dir
-        test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"
-
         pipeline = Pipeline.create(
             {
                 "run_id": "mode-test",
@@ -54,7 +54,7 @@ def s3_populate(pytestconfig, s3_resource, s3_client, bucket_names):
     test_resources_dir = (
         pytestconfig.rootpath / "tests/integration/s3/test_data/local_system/"
     )
-    for root, dirs, files in os.walk(test_resources_dir):
+    for root, _dirs, files in os.walk(test_resources_dir):
        for file in files:
            full_path = os.path.join(root, file)
            rel_path = os.path.relpath(full_path, test_resources_dir)
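Renaming dirs to _dirs satisfies B007 (unused loop control variable, one of the TODOs in the lint config above): the underscore prefix marks the value as deliberately ignored. A small sketch:

import os

# B007 would flag `dirs`: the loop body never reads it.
for root, dirs, files in os.walk("."):
    print(root, len(files))

# Underscore-prefixing documents the intent and silences the check.
for root, _dirs, files in os.walk("."):
    print(root, len(files))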
@@ -9,7 +9,7 @@ from tests.test_helpers import mce_helpers

 FROZEN_TIME = "2022-05-12 11:00:00"

-test_resources_dir = None
+test_resources_dir = pathlib.Path(__file__).parent


 def _read_response(file_name: str) -> dict:
@@ -57,11 +57,6 @@ def side_effect_call_salesforce(type, url):

 @freeze_time(FROZEN_TIME)
 def test_salesforce_ingest(pytestconfig, tmp_path):
-    global test_resources_dir
-    test_resources_dir = pathlib.Path(
-        pytestconfig.rootpath / "tests/integration/salesforce"
-    )
-
     with mock.patch("simple_salesforce.Salesforce") as mock_sdk:
         mock_sf = mock.Mock()
         mocked_call = mock.Mock()
@@ -29,7 +29,7 @@ FROZEN_TIME = "2021-12-07 07:00:00"
 GMS_PORT = 8080
 GMS_SERVER = f"http://localhost:{GMS_PORT}"

-test_resources_dir = None
+test_resources_dir = pathlib.Path(__file__).parent

 config_source_default = {
     "username": "username",
@@ -67,9 +67,6 @@ def enable_logging():


 def read_response(pytestconfig, file_name):
-    test_resources_dir = pathlib.Path(
-        pytestconfig.rootpath / "tests/integration/tableau"
-    )
     response_json_path = f"{test_resources_dir}/setup/{file_name}"
     with open(response_json_path) as file:
         data = json.loads(file.read())
@@ -154,9 +151,6 @@ def tableau_ingest_common(
     sign_out_side_effect=lambda: None,
     pipeline_name="tableau-test-pipeline",
 ):
-    test_resources_dir = pathlib.Path(
-        pytestconfig.rootpath / "tests/integration/tableau"
-    )
     with mock.patch(
         "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
         mock_datahub_graph,
@@ -365,9 +365,9 @@ def assert_for_each_entity(
     if success:
         print(f"Succeeded on assertion for urns {success}")
     if failures:
-        assert (
-            False
-        ), f"Failed to find aspect_name {aspect_name} for urns {json.dumps(failures, indent=2)}"
+        raise AssertionError(
+            f"Failed to find aspect_name {aspect_name} for urns {json.dumps(failures, indent=2)}"
+        )

     return len(success)
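Replacing assert False with raise AssertionError(...) is bugbear B011: assert statements are stripped entirely under python -O, so a bare assert False is not a reliable failure path. A short illustration:

def check_found(failures: list) -> None:
    if failures:
        # B011: under `python -O` this line disappears and the failure is silent.
        assert False, f"missing aspects for {failures}"

def check_found_fixed(failures: list) -> None:
    if failures:
        # Raising explicitly fails regardless of optimization flags.
        raise AssertionError(f"missing aspects for {failures}")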
@@ -134,7 +134,8 @@ def test_registry():
         "console-alias",
         "console",
         lambda: warnings.warn(
-            ConfigurationWarning("console-alias is deprecated, use console instead")
+            ConfigurationWarning("console-alias is deprecated, use console instead"),
+            stacklevel=2,
         ),
     )
     with pytest.warns(ConfigurationWarning):
@@ -111,7 +111,7 @@ def test_account_id_with_snowflake_host_suffix():
             "role": "sysadmin",
         }
     )
-    config.account_id == "acctname"
+    assert config.account_id == "acctname"


 def test_snowflake_uri_default_authentication():
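The bare comparison removed above was a no-op check (bugbear B015, pointless comparison): the expression is evaluated and discarded, so the test passes even when the value is wrong. A tiny sketch:

def get_account_id() -> str:
    return "wrong-value"  # hypothetical buggy implementation

# B015: this evaluates to False and is silently thrown away.
get_account_id() == "acctname"

# With assert, the regression actually fails.
try:
    assert get_account_id() == "acctname"
except AssertionError:
    print("caught the regression")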
@@ -80,8 +80,11 @@ from datahub.utilities.urns.urn import Urn

 def make_generic_dataset(
     entity_urn: str = "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
-    aspects: List[Any] = [models.StatusClass(removed=False)],
+    aspects: Optional[List[Any]] = None,
 ) -> models.MetadataChangeEventClass:
+    if aspects is None:
+        # Default to a status aspect if none is provided.
+        aspects = [models.StatusClass(removed=False)]
     return models.MetadataChangeEventClass(
         proposedSnapshot=models.DatasetSnapshotClass(
             urn=entity_urn,
@@ -1382,20 +1385,23 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path):

     # check on browsePaths aspect
     for i in range(0, 10):
-        tests.test_helpers.mce_helpers.assert_entity_mcp_aspect(
-            entity_urn=str(
-                DatasetUrn.create_from_ids(
-                    platform_id="elasticsearch",
-                    table_name=f"fooBarIndex{i}",
-                    env="PROD",
-                )
-            ),
-            aspect_name="browsePaths",
-            aspect_field_matcher={
-                "paths": [f"/prod/elasticsearch/EsComments/fooBarIndex{i}"]
-            },
-            file=events_file,
-        ) == 1
+        assert (
+            tests.test_helpers.mce_helpers.assert_entity_mcp_aspect(
+                entity_urn=str(
+                    DatasetUrn.create_from_ids(
+                        platform_id="elasticsearch",
+                        table_name=f"fooBarIndex{i}",
+                        env="PROD",
+                    )
+                ),
+                aspect_name="browsePaths",
+                aspect_field_matcher={
+                    "paths": [f"/prod/elasticsearch/EsComments/fooBarIndex{i}"]
+                },
+                file=events_file,
+            )
+            == 1
+        )


 class SuppressingTransformer(BaseTransformer, SingleAspectTransformer):
@@ -81,7 +81,7 @@ def test_file_dict() -> None:

     # Test close.
     cache.close()
-    with pytest.raises(Exception):
+    with pytest.raises(AttributeError):
         cache["a"] = 1
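Narrowing pytest.raises(Exception) to the concrete AttributeError follows bugbear's broad-exception check (B017): asserting on a blanket Exception passes for any failure, including unrelated bugs. A small sketch:

import pytest

def parse_port(value: str) -> int:
    return int(value)

# Too broad (B017): passes even if parse_port fails for the wrong reason.
with pytest.raises(Exception):
    parse_port("not-a-number")

# Precise: only the expected failure mode satisfies the test.
with pytest.raises(ValueError):
    parse_port("not-a-number")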