mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-21 07:38:13 +00:00
adding allow deny patterns to sql config
This commit is contained in:
parent
62bb7f012f
commit
4b83fc6591
@ -10,7 +10,10 @@
|
|||||||
|
|
||||||
# Run tests
|
# Run tests
|
||||||
- pip install -r test_requirements.txt
|
- pip install -r test_requirements.txt
|
||||||
- pytest
|
# Run Unit tests
|
||||||
|
- pytest tests/unit
|
||||||
|
# Run Integration tests
|
||||||
|
- pytest tests/integration
|
||||||
|
|
||||||
# Sanity check code before checkin (currently broken)
|
# Sanity check code before checkin (currently broken)
|
||||||
- flake8 src test && mypy -p gometa && black --check -l 120 src test && isort --check-only src test && pytest
|
- flake8 src test && mypy -p gometa && black --check -l 120 src test && isort --check-only src test && pytest
|
||||||
|
10
metadata-ingestion/recipes/mssql_to_console.yaml
Normal file
10
metadata-ingestion/recipes/mssql_to_console.yaml
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
---
|
||||||
|
source:
|
||||||
|
type: mssql
|
||||||
|
mssql:
|
||||||
|
username: sa
|
||||||
|
password: test!Password
|
||||||
|
database: DemoData
|
||||||
|
|
||||||
|
sink:
|
||||||
|
type: console
|
@ -7,4 +7,6 @@ source:
|
|||||||
database: DemoData
|
database: DemoData
|
||||||
|
|
||||||
sink:
|
sink:
|
||||||
type: console
|
type: "datahub-rest"
|
||||||
|
datahub-rest:
|
||||||
|
server: 'http://localhost:8080'
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from typing import TypeVar, Type
|
from typing import TypeVar, Type, List
|
||||||
from pydantic import BaseModel, ValidationError
|
from pydantic import BaseModel, ValidationError
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
class ConfigModel(BaseModel):
|
class ConfigModel(BaseModel):
|
||||||
@ -29,6 +30,27 @@ class ConfigurationMechanism(ABC):
|
|||||||
def load_config(self, cls: Type[T], config_file: Path) -> T:
|
def load_config(self, cls: Type[T], config_file: Path) -> T:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class AllowDenyPattern(BaseModel):
|
||||||
|
""" A class to store allow deny regexes"""
|
||||||
|
allow: List[str] = [".*"]
|
||||||
|
deny: List[str] = []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def allow_all(cls):
|
||||||
|
return AllowDenyPattern()
|
||||||
|
|
||||||
|
def allowed(self, string: str) -> bool:
|
||||||
|
for deny_pattern in self.deny:
|
||||||
|
if re.match(deny_pattern, string):
|
||||||
|
return False
|
||||||
|
|
||||||
|
for allow_pattern in self.allow:
|
||||||
|
if re.match(allow_pattern, string):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class DynamicFactory:
|
class DynamicFactory:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -7,10 +7,11 @@ from gometa.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata, MyS
|
|||||||
from gometa.metadata.com.linkedin.pegasus2avro.common import AuditStamp
|
from gometa.metadata.com.linkedin.pegasus2avro.common import AuditStamp
|
||||||
|
|
||||||
from gometa.ingestion.api.source import WorkUnit
|
from gometa.ingestion.api.source import WorkUnit
|
||||||
|
from gometa.configuration.common import AllowDenyPattern
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
from typing import Optional
|
from typing import Optional, List
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -23,6 +24,7 @@ class SQLAlchemyConfig(BaseModel):
|
|||||||
database: str = ""
|
database: str = ""
|
||||||
scheme: str
|
scheme: str
|
||||||
options: Optional[dict] = {}
|
options: Optional[dict] = {}
|
||||||
|
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
|
||||||
|
|
||||||
def get_sql_alchemy_url(self):
|
def get_sql_alchemy_url(self):
|
||||||
url=f'{self.scheme}://{self.username}:{self.password}@{self.host_port}/{self.database}'
|
url=f'{self.scheme}://{self.username}:{self.password}@{self.host_port}/{self.database}'
|
||||||
@ -64,6 +66,7 @@ def get_schema_metadata(dataset_name, platform, columns) -> SchemaMetadata:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_column_type(column_type):
|
def get_column_type(column_type):
|
||||||
"""
|
"""
|
||||||
Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
|
Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
|
||||||
@ -95,6 +98,7 @@ def get_sql_workunits(sql_config:SQLAlchemyConfig, platform: str, env: str = "PR
|
|||||||
database = sql_config.database
|
database = sql_config.database
|
||||||
for schema in inspector.get_schema_names():
|
for schema in inspector.get_schema_names():
|
||||||
for table in inspector.get_table_names(schema):
|
for table in inspector.get_table_names(schema):
|
||||||
|
if sql_config.table_pattern.allowed(f'{schema}.{table}'):
|
||||||
columns = inspector.get_columns(table, schema)
|
columns = inspector.get_columns(table, schema)
|
||||||
mce = MetadataChangeEvent()
|
mce = MetadataChangeEvent()
|
||||||
if database != "":
|
if database != "":
|
||||||
@ -110,5 +114,7 @@ def get_sql_workunits(sql_config:SQLAlchemyConfig, platform: str, env: str = "PR
|
|||||||
dataset_snapshot.aspects.append(schema_metadata)
|
dataset_snapshot.aspects.append(schema_metadata)
|
||||||
mce.proposedSnapshot = dataset_snapshot
|
mce.proposedSnapshot = dataset_snapshot
|
||||||
yield SqlWorkUnit(id=dataset_name, mce = mce)
|
yield SqlWorkUnit(id=dataset_name, mce = mce)
|
||||||
|
else:
|
||||||
|
logger.debug(f"Found table: {schema}.{table}, but skipping due to allow-deny patterns")
|
||||||
|
|
||||||
|
|
||||||
|
@ -10,6 +10,8 @@ def test_ingest(sql_server, pytestconfig):
|
|||||||
ret = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
ret = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
assert ret.returncode == 0
|
assert ret.returncode == 0
|
||||||
config_file=os.path.join(str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_console.yml")
|
config_file=os.path.join(str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_console.yml")
|
||||||
|
# delete the output directory. TODO: move to a better way to create an output test fixture
|
||||||
|
os.system("rm -rf output")
|
||||||
ingest_command=f'gometa-ingest -c {config_file}'
|
ingest_command=f'gometa-ingest -c {config_file}'
|
||||||
ret = os.system(ingest_command)
|
ret = os.system(ingest_command)
|
||||||
assert ret == 0
|
assert ret == 0
|
||||||
|
15
metadata-ingestion/tests/unit/test_allow_deny.py
Normal file
15
metadata-ingestion/tests/unit/test_allow_deny.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
from gometa.configuration.common import AllowDenyPattern
|
||||||
|
|
||||||
|
|
||||||
|
def test_allow_all():
|
||||||
|
pattern = AllowDenyPattern.allow_all()
|
||||||
|
assert pattern.allowed("foo.table") == True
|
||||||
|
|
||||||
|
def test_deny_all():
|
||||||
|
pattern = AllowDenyPattern(allow=[], deny=[".*"])
|
||||||
|
assert pattern.allowed("foo.table") == False
|
||||||
|
|
||||||
|
def test_single_table():
|
||||||
|
pattern = AllowDenyPattern(allow=["foo.mytable"])
|
||||||
|
assert pattern.allowed("foo.mytable") == True
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user