diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index fc113b7981..8761cf2b97 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -10,7 +10,10 @@ # Run tests - pip install -r test_requirements.txt -- pytest +# Run Unit tests +- pytest tests/unit +# Run Integration tests +- pytest tests/integration # Sanity check code before checkin (currently broken) - flake8 src test && mypy -p gometa && black --check -l 120 src test && isort --check-only src test && pytest diff --git a/metadata-ingestion/recipes/mssql_to_console.yaml b/metadata-ingestion/recipes/mssql_to_console.yaml new file mode 100644 index 0000000000..096a79e9f4 --- /dev/null +++ b/metadata-ingestion/recipes/mssql_to_console.yaml @@ -0,0 +1,10 @@ +--- +source: + type: mssql + mssql: + username: sa + password: test!Password + database: DemoData + +sink: + type: console diff --git a/metadata-ingestion/recipes/mssql_to_datahub.yaml b/metadata-ingestion/recipes/mssql_to_datahub.yaml index 096a79e9f4..7c6e1883fc 100644 --- a/metadata-ingestion/recipes/mssql_to_datahub.yaml +++ b/metadata-ingestion/recipes/mssql_to_datahub.yaml @@ -7,4 +7,6 @@ source: database: DemoData sink: - type: console + type: "datahub-rest" + datahub-rest: + server: 'http://localhost:8080' diff --git a/metadata-ingestion/src/gometa/configuration/common.py b/metadata-ingestion/src/gometa/configuration/common.py index 63128958f4..db65f4627e 100644 --- a/metadata-ingestion/src/gometa/configuration/common.py +++ b/metadata-ingestion/src/gometa/configuration/common.py @@ -1,7 +1,8 @@ from abc import ABC, abstractmethod -from typing import TypeVar, Type +from typing import TypeVar, Type, List from pydantic import BaseModel, ValidationError from pathlib import Path +import re class ConfigModel(BaseModel): @@ -29,6 +30,27 @@ class ConfigurationMechanism(ABC): def load_config(self, cls: Type[T], config_file: Path) -> T: pass +class AllowDenyPattern(BaseModel): + """ A class to store allow deny regexes""" + allow: List[str] = [".*"] + deny: List[str] = [] + + @classmethod + def allow_all(cls): + return AllowDenyPattern() + + def allowed(self, string: str) -> bool: + for deny_pattern in self.deny: + if re.match(deny_pattern, string): + return False + + for allow_pattern in self.allow: + if re.match(allow_pattern, string): + return True + + return False + + class DynamicFactory: def __init__(self): diff --git a/metadata-ingestion/src/gometa/ingestion/source/sql_common.py b/metadata-ingestion/src/gometa/ingestion/source/sql_common.py index 1b5c9cbf72..3bd87fb2f8 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/sql_common.py +++ b/metadata-ingestion/src/gometa/ingestion/source/sql_common.py @@ -7,10 +7,11 @@ from gometa.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata, MyS from gometa.metadata.com.linkedin.pegasus2avro.common import AuditStamp from gometa.ingestion.api.source import WorkUnit +from gometa.configuration.common import AllowDenyPattern from pydantic import BaseModel import logging import time -from typing import Optional +from typing import Optional, List from dataclasses import dataclass logger = logging.getLogger(__name__) @@ -23,6 +24,7 @@ class SQLAlchemyConfig(BaseModel): database: str = "" scheme: str options: Optional[dict] = {} + table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() def get_sql_alchemy_url(self): url=f'{self.scheme}://{self.username}:{self.password}@{self.host_port}/{self.database}' @@ -64,6 +66,7 @@ def get_schema_metadata(dataset_name, platform, columns) -> SchemaMetadata: + def get_column_type(column_type): """ Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types @@ -95,20 +98,23 @@ def get_sql_workunits(sql_config:SQLAlchemyConfig, platform: str, env: str = "PR database = sql_config.database for schema in inspector.get_schema_names(): for table in inspector.get_table_names(schema): - columns = inspector.get_columns(table, schema) - mce = MetadataChangeEvent() - if database != "": - dataset_name = f'{database}.{schema}.{table}' - else: - dataset_name = f'{schema}.{table}' + if sql_config.table_pattern.allowed(f'{schema}.{table}'): + columns = inspector.get_columns(table, schema) + mce = MetadataChangeEvent() + if database != "": + dataset_name = f'{database}.{schema}.{table}' + else: + dataset_name = f'{schema}.{table}' - dataset_snapshot = DatasetSnapshot() - dataset_snapshot.urn=( - f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})" - ) - schema_metadata = get_schema_metadata(dataset_name, platform, columns) - dataset_snapshot.aspects.append(schema_metadata) - mce.proposedSnapshot = dataset_snapshot - yield SqlWorkUnit(id=dataset_name, mce = mce) + dataset_snapshot = DatasetSnapshot() + dataset_snapshot.urn=( + f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})" + ) + schema_metadata = get_schema_metadata(dataset_name, platform, columns) + dataset_snapshot.aspects.append(schema_metadata) + mce.proposedSnapshot = dataset_snapshot + yield SqlWorkUnit(id=dataset_name, mce = mce) + else: + logger.debug(f"Found table: {schema}.{table}, but skipping due to allow-deny patterns") diff --git a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py index c590590359..078a7c93b8 100644 --- a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py +++ b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py @@ -10,6 +10,8 @@ def test_ingest(sql_server, pytestconfig): ret = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) assert ret.returncode == 0 config_file=os.path.join(str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_console.yml") + # delete the output directory. TODO: move to a better way to create an output test fixture + os.system("rm -rf output") ingest_command=f'gometa-ingest -c {config_file}' ret = os.system(ingest_command) assert ret == 0 diff --git a/metadata-ingestion/tests/unit/test_allow_deny.py b/metadata-ingestion/tests/unit/test_allow_deny.py new file mode 100644 index 0000000000..b981f571ef --- /dev/null +++ b/metadata-ingestion/tests/unit/test_allow_deny.py @@ -0,0 +1,15 @@ +from gometa.configuration.common import AllowDenyPattern + + +def test_allow_all(): + pattern = AllowDenyPattern.allow_all() + assert pattern.allowed("foo.table") == True + +def test_deny_all(): + pattern = AllowDenyPattern(allow=[], deny=[".*"]) + assert pattern.allowed("foo.table") == False + +def test_single_table(): + pattern = AllowDenyPattern(allow=["foo.mytable"]) + assert pattern.allowed("foo.mytable") == True +