feat(mssql): add multi database ingest support (#5516)

* feat(mssql): add multi database ingest support

* Delete older golden file.

* Update s3.md

* fix test setup
Mugdha Hardikar 2022-08-16 10:07:47 +05:30 committed by GitHub
parent dfd0d1581b
commit a449e8ba7d
10 changed files with 5256 additions and 27 deletions
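With this change, leaving `database` unset makes the mssql source enumerate every non-system database on the server and filter the list through the new `database_pattern` option. A minimal recipe sketch, assuming illustrative credentials and an illustrative `Demo.*` allow pattern:

source:
  type: mssql
  config:
    username: sa
    password: example_password   # placeholder credentials
    host_port: localhost:1433
    # New in this change: regex filter applied when `database` is left unset.
    database_pattern:
      allow:
        - "Demo.*"
sink:
  type: file
  config:
    filename: "./mssql_mces.json"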


@ -19,7 +19,7 @@ s3://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # ta
- s3://my-bucket/hr/**
- **/tests/*.csv
- s3://my-bucket/foo/*/my_table/**
-
### Notes
- {table} represents the folder for which a dataset will be created.


@ -1,15 +1,18 @@
import logging
import urllib.parse
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple
import pydantic
# This import verifies that the dependencies are available.
import sqlalchemy_pytds # noqa: F401
from pydantic.fields import Field
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.result import ResultProxy, RowProxy
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@ -22,8 +25,11 @@ from datahub.ingestion.api.decorators import (
from datahub.ingestion.source.sql.sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
make_sqlalchemy_uri,
)
logger: logging.Logger = logging.getLogger(__name__)
class SQLServerConfig(BasicSQLAlchemyConfig):
# defaults
@ -37,6 +43,19 @@ class SQLServerConfig(BasicSQLAlchemyConfig):
default={},
description="Arguments to URL-encode when connecting. See https://docs.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver15.",
)
database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for databases to filter in ingestion.",
)
database: Optional[str] = Field(
default=None,
description="database (catalog). If set to Null, all databases will be considered for ingestion.",
)
database_alias: Optional[str] = Field(
default=None,
description="Alias to apply to database when ingesting. Ignored when `database` is not set.",
)
@pydantic.validator("uri_args")
def passwords_match(cls, v, values, **kwargs):
@ -46,26 +65,29 @@ class SQLServerConfig(BasicSQLAlchemyConfig):
raise ValueError("uri_args is not supported when ODBC is disabled")
return v
def get_sql_alchemy_url(self, uri_opts: Optional[Dict[str, Any]] = None) -> str:
def get_sql_alchemy_url(
self,
uri_opts: Optional[Dict[str, Any]] = None,
current_db: Optional[str] = None,
) -> str:
if self.use_odbc:
# Ensure that the import is available.
import pyodbc # noqa: F401
self.scheme = "mssql+pyodbc"
uri: str = super().get_sql_alchemy_url(uri_opts=uri_opts)
uri: str = self.sqlalchemy_uri or make_sqlalchemy_uri(
self.scheme, # type: ignore
self.username,
self.password.get_secret_value() if self.password else None,
self.host_port, # type: ignore
current_db if current_db else self.database,
uri_opts=uri_opts,
)
if self.use_odbc:
uri = f"{uri}?{urllib.parse.urlencode(self.uri_args)}"
return uri
def get_identifier(self, schema: str, table: str) -> str:
regular = f"{schema}.{table}"
if self.database_alias:
return f"{self.database_alias}.{regular}"
if self.database:
return f"{self.database}.{regular}"
return regular
@platform_name("Microsoft SQL Server", id="mssql")
@config_class(SQLServerConfig)
@ -93,8 +115,9 @@ class SQLServerSource(SQLAlchemySource):
def __init__(self, config: SQLServerConfig, ctx: PipelineContext):
super().__init__(config, ctx, "mssql")
# Cache the table and column descriptions
self.config: SQLServerConfig = config
self.current_database = None
self.table_descriptions: Dict[str, str] = {}
self.column_descriptions: Dict[str, str] = {}
for inspector in self.get_inspectors():
@ -183,3 +206,40 @@ class SQLServerSource(SQLAlchemySource):
if description:
column["comment"] = description
return columns
def get_inspectors(self) -> Iterable[Inspector]:
# Override of SQLAlchemySource.get_inspectors: when no database is configured,
# enumerate all non-system databases and yield one inspector per database.
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
with engine.connect() as conn:
if self.config.database and self.config.database != "":
inspector = inspect(conn)
yield inspector
else:
databases = conn.execute(
"SELECT name FROM master.sys.databases WHERE name NOT IN \
('master', 'model', 'msdb', 'tempdb', 'Resource', \
'distribution' , 'reportserver', 'reportservertempdb'); "
)
for db in databases:
if self.config.database_pattern.allowed(db["name"]):
url = self.config.get_sql_alchemy_url(current_db=db["name"])
inspector = inspect(
create_engine(url, **self.config.options).connect()
)
self.current_database = db["name"]
yield inspector
def get_identifier(
self, *, schema: str, entity: str, inspector: Inspector, **kwargs: Any
) -> str:
regular = f"{schema}.{entity}"
if self.config.database:
if self.config.database_alias:
return f"{self.config.database_alias}.{regular}"
return f"{self.config.database}.{regular}"
if self.current_database:
return f"{self.current_database}.{regular}"
return regular
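The `get_identifier` override above keeps the existing single-database naming: `database_alias` takes precedence over `database`, and only when neither is configured does the per-database `current_database` captured in `get_inspectors` apply. A sketch of a single-database recipe using the alias (database name and alias are illustrative):

source:
  type: mssql
  config:
    username: sa
    password: example_password   # placeholder credentials
    host_port: localhost:1433
    database: DemoData           # pin ingestion to one catalog
    database_alias: demo_prod    # illustrative alias; datasets are named demo_prod.<schema>.<table>
sink:
  type: file
  config:
    filename: "./mssql_mces.json"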


@ -887,6 +887,8 @@
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -897,6 +899,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
@ -904,6 +907,8 @@
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
@ -914,6 +919,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
@ -1100,6 +1106,8 @@
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -1110,6 +1118,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
@ -1117,6 +1126,8 @@
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
@ -1127,6 +1138,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
@ -1237,6 +1249,8 @@
"jsonPath": null,
"nullable": false,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -1247,6 +1261,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": true,
"isPartitioningKey": null,
"jsonProps": null
},
{
@ -1254,6 +1269,8 @@
"jsonPath": null,
"nullable": false,
"description": "Description for column LastName of table Persons of schema Foo.",
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
@ -1264,6 +1281,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
@ -1271,6 +1289,8 @@
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
@ -1281,6 +1301,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
@ -1288,6 +1309,8 @@
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -1298,6 +1321,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
@ -1408,6 +1432,8 @@
"jsonPath": null,
"nullable": false,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -1418,6 +1444,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": true,
"isPartitioningKey": null,
"jsonProps": null
},
{
@ -1425,6 +1452,8 @@
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
@ -1435,6 +1464,7 @@
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],


@ -1,3 +1,20 @@
CREATE DATABASE NewData;
GO
USE NewData;
GO
CREATE TABLE ProductsNew (ID int, ProductName nvarchar(max));
GO
CREATE SCHEMA FooNew;
GO
CREATE TABLE FooNew.ItemsNew (ID int, ItemName nvarchar(max));
GO
CREATE TABLE FooNew.PersonsNew (
ID int NOT NULL PRIMARY KEY,
LastName varchar(255) NOT NULL,
FirstName varchar(255),
Age int
);
CREATE DATABASE DemoData;
GO
USE DemoData;


@ -0,0 +1,13 @@
run_id: mssql-test
source:
type: mssql
config:
username: sa
password: test!Password
host_port: localhost:51433
sink:
type: file
config:
filename: "./mssql_mces.json"


@ -0,0 +1,16 @@
run_id: mssql-test
source:
type: mssql
config:
username: sa
password: test!Password
host_port: localhost:51433
database_pattern:
deny:
- NewData
sink:
type: file
config:
filename: "./mssql_mces.json"


@ -1,3 +1,4 @@
import os
import subprocess
import time
@ -8,10 +9,9 @@ from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port
@pytest.mark.integration
def test_mssql_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
@pytest.fixture(scope="module")
def mssql_runner(docker_compose_runner, pytestconfig):
test_resources_dir = pytestconfig.rootpath / "tests/integration/sql_server"
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "sql-server"
) as docker_services:
@ -28,16 +28,27 @@ def test_mssql_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
assert ret.returncode == 0
yield docker_services
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mssql_to_file.yml").resolve()
run_datahub_cmd(
["ingest", "-c", f"{config_file}"], tmp_path=tmp_path, check_result=True
)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "mssql_mces.json",
golden_path=test_resources_dir / "mssql_mces_golden.json",
)
SOURCE_FILES_PATH = "./tests/integration/sql_server/source_files"
config_file = os.listdir(SOURCE_FILES_PATH)
@pytest.mark.parametrize("config_file", config_file)
@pytest.mark.integration
def test_mssql_ingest(mssql_runner, pytestconfig, tmp_path, mock_time, config_file):
test_resources_dir = pytestconfig.rootpath / "tests/integration/sql_server"
# Run the metadata ingestion pipeline.
config_file_path = (test_resources_dir / f"source_files/{config_file}").resolve()
run_datahub_cmd(
["ingest", "-c", f"{config_file_path}"], tmp_path=tmp_path, check_result=True
)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "mssql_mces.json",
golden_path=test_resources_dir
/ f"golden_files/golden_mces_{config_file.replace('yml','json')}",
)