mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-13 01:38:13 +00:00
71 lines
1.6 KiB
Python
71 lines
1.6 KiB
Python
![]() |
import re
|
||
|
from abc import ABC, abstractmethod
|
||
|
from typing import IO, Any, List, Optional
|
||
|
|
||
|
from pydantic import BaseModel
|
||
|
|
||
|
|
||
|
class ConfigModel(BaseModel):
    """Base pydantic model for configuration objects defined in this module."""

    class Config:
        # Reject any field that the model does not declare.
        extra = "forbid"
|
||
|
|
||
|
|
||
|
class DynamicTypedConfig(ConfigModel):
    """A configuration entry made of a type name and an untyped payload."""

    # Name of the type this entry refers to.
    type: str
    # Deliberately annotated as Optional[Any]: this model does not inspect the
    # payload. Whatever parser eventually handles the given `type` is the one
    # responsible for validating it further.
    config: Optional[Any]
|
||
|
|
||
|
|
||
|
class MetaError(Exception):
    """Root of the exception hierarchy; every meta exception derives from it."""
|
||
|
|
||
|
|
||
|
class WorkflowExecutionError(MetaError):
    """Raised when something goes wrong while a workflow is being executed."""
|
||
|
|
||
|
|
||
|
class OperationalError(WorkflowExecutionError):
    """An error occurred because of client-provided metadata"""

    # Description of the failure, exactly as passed to the constructor.
    message: str
    # Additional context passed to the constructor; always a dict, never None.
    info: dict

    def __init__(self, message: str, info: Optional[dict] = None):
        """Create the error.

        Args:
            message: Description of what went wrong.
            info: Optional extra context. ``None`` (or any other falsy value,
                such as an empty dict) is stored as a new empty dict.
        """
        # Initialise the base exception with the message so that ``str(exc)``
        # and ``exc.args`` are the same whether the error was built with
        # positional or keyword arguments, and so the instance can be
        # re-created from ``args`` (e.g. when unpickled).
        super().__init__(message)
        self.message = message
        self.info = info if info else {}
|
||
|
|
||
|
|
||
|
class ConfigurationError(MetaError):
    """Raised to signal that a configuration problem was encountered."""
|
||
|
|
||
|
|
||
|
class ConfigurationMechanism(ABC):
    """Abstract interface for objects that load a configuration."""

    @abstractmethod
    def load_config(self, config_fp: IO) -> dict:
        """Load a configuration from the file object ``config_fp``.

        Concrete subclasses must override this method; the base
        implementation does nothing and returns ``None``.
        """
|
||
|
|
||
|
|
||
|
class AllowDenyPattern(ConfigModel):
    """Pair of regex lists deciding whether a string is accepted or rejected."""

    # A string is accepted when it matches one of these; the default matches
    # everything.
    allow: List[str] = [".*"]
    # A string matching any of these is rejected, regardless of `allow`.
    deny: List[str] = []

    @classmethod
    def allow_all(cls):
        """Return a pattern built from the defaults (allow ".*", deny nothing)."""
        return AllowDenyPattern()

    def allowed(self, string: str) -> bool:
        """Tell whether ``string`` passes the allow/deny filters.

        Deny patterns are evaluated first and win over allow patterns.
        Matching uses ``re.match``, so each pattern is anchored at the start
        of ``string`` only.

        Returns:
            ``False`` if any deny pattern matches; otherwise ``True`` if at
            least one allow pattern matches, else ``False``.
        """
        if any(re.match(pattern, string) for pattern in self.deny):
            return False
        return any(re.match(pattern, string) for pattern in self.allow)
|