Adding mysql integration test + source

Shirshanka Das 2021-02-09 01:02:05 -08:00 committed by Shirshanka Das
parent e03a9e25f8
commit 4e17a24dd4
15 changed files with 378 additions and 101 deletions

View File

@@ -2,14 +2,14 @@ from typing import Dict, Type
 from gometa.ingestion.api.source import Source
 from .mssql import SQLServerSource
-# from .mysql import MySQLSource
+from .mysql import MySQLSource
 from .kafka import KafkaSource
 # from .ldap import LDAPSource
 from .mce_file import MetadataFileSource

 source_class_mapping: Dict[str, Type[Source]] = {
     "mssql": SQLServerSource,
-    # "mysql": MySQLSource,
+    "mysql": MySQLSource,
     "kafka": KafkaSource,
     # "ldap": LDAPSource,
     "file": MetadataFileSource,

View File

@@ -1,27 +1,17 @@
-from pydantic import BaseModel
-from typing import Optional
-from .sql_common import SQLAlchemyConfig, get_sql_workunits
-from gometa.ingestion.api.source import Source
+from .sql_common import SQLAlchemySource, SQLAlchemyConfig

 class SQLServerConfig(SQLAlchemyConfig):
     #defaults
     host_port = "localhost:1433"
     scheme = "mssql+pytds"

-class SQLServerSource(Source):
+class SQLServerSource(SQLAlchemySource):
     def __init__(self, config, ctx):
-        super().__init__(ctx)
-        self.config = config
+        super().__init__(config, ctx, "mssql")

     @classmethod
     def create(cls, config_dict, ctx):
         config = SQLServerConfig.parse_obj(config_dict)
         return cls(config, ctx)
-
-    def get_workunits(self):
-        return get_sql_workunits(self.config, "mssql")
-
-    def close(self):
-        pass

View File

@@ -1,19 +1,15 @@
-from pydantic import BaseModel
-from typing import Optional
-from .sql_common import SQLAlchemyConfig, get_sql_workunits
-from gometa.ingestion.api.source import Source
+from .sql_common import SQLAlchemySource, SQLAlchemyConfig

 class MySQLConfig(SQLAlchemyConfig):
     #defaults
     host_port = "localhost:3306"
     scheme = "mysql+pymysql"

-class MySQLSource(Source):
-    def configure(self, config_dict):
-        self.config = MySQLConfig.parse_obj(config_dict)
+class MySQLSource(SQLAlchemySource):
+    def __init__(self, config, ctx):
+        super().__init__(config, ctx, "mysql")

-    def get_workunits(self):
-        return get_sql_workunits(self.config, "mysql")
-
-    def close(self):
-        pass
+    @classmethod
+    def create(cls, config_dict, ctx):
+        config = MySQLConfig.parse_obj(config_dict)
+        return cls(config, ctx)
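After the refactor each source is a thin wrapper over SQLAlchemySource. A minimal usage sketch, assuming `ctx` is the pipeline context object the ingestion framework normally supplies:

# A minimal sketch, assuming `ctx` is provided by the framework.
config_dict = {
    "username": "root",
    "password": "example",
    "database": "metagalaxy",
    "host_port": "localhost:53306",
}
source = MySQLSource.create(config_dict, ctx)
for workunit in source.get_workunits():
    print(workunit.id)  # e.g. "metagalaxy.<schema>.<table>"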

View File

@@ -6,7 +6,7 @@ from gometa.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetS
 from gometa.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata, MySqlDDL
 from gometa.metadata.com.linkedin.pegasus2avro.common import AuditStamp
-from gometa.ingestion.api.source import WorkUnit
+from gometa.ingestion.api.source import WorkUnit, Source
 from gometa.configuration.common import AllowDenyPattern
 from pydantic import BaseModel
 import logging
@@ -40,16 +40,42 @@ class SqlWorkUnit(WorkUnit):
         return {'mce': self.mce}

+def get_column_type(column_type):
+    """
+    Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
+    """
+    if isinstance(column_type, (types.Integer, types.Numeric)):
+        return ("com.linkedin.pegasus2avro.schema.NumberType", {})
+    if isinstance(column_type, (types.Boolean)):
+        return ("com.linkedin.pegasus2avro.schema.BooleanType", {})
+    if isinstance(column_type, (types.Enum)):
+        return ("com.linkedin.pegasus2avro.schema.EnumType", {})
+    if isinstance(column_type, (types._Binary, types.PickleType)):
+        return ("com.linkedin.pegasus2avro.schema.BytesType", {})
+    if isinstance(column_type, (types.ARRAY)):
+        return ("com.linkedin.pegasus2avro.schema.ArrayType", {})
+    if isinstance(column_type, (types.String)):
+        return ("com.linkedin.pegasus2avro.schema.StringType", {})
+    return ("com.linkedin.pegasus2avro.schema.NullType", {})
+
 def get_schema_metadata(dataset_name, platform, columns) -> SchemaMetadata:
     canonical_schema = [ {
         "fieldPath": column["name"],
         "nativeDataType": repr(column["type"]),
-        "type": { "type":get_column_type(column["type"]) },
+        "type": { "type": get_column_type(column["type"]) },
         "description": column.get("comment", None)
     } for column in columns ]
     actor, sys_time = "urn:li:corpuser:etl", int(time.time()) * 1000
     schema_metadata = SchemaMetadata(
         schemaName=dataset_name,
@@ -67,54 +93,44 @@ def get_schema_metadata(dataset_name, platform, columns) -> SchemaMetadata:
-def get_column_type(column_type):
-    """
-    Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
-    """
-    if isinstance(column_type, (types.Integer, types.Numeric)):
-        return ("com.linkedin.pegasus2avro.schema.NumberType", {})
-    if isinstance(column_type, (types.Boolean)):
-        return ("com.linkedin.pegasus2avro.schema.BooleanType", {})
-    if isinstance(column_type, (types.Enum)):
-        return ("com.linkedin.pegasus2avro.schema.EnumType", {})
-    if isinstance(column_type, (types._Binary, types.PickleType)):
-        return ("com.linkedin.pegasus2avro.schema.BytesType", {})
-    if isinstance(column_type, (types.ARRAY)):
-        return ("com.linkedin.pegasus2avro.schema.ArrayType", {})
-    if isinstance(column_type, (types.String)):
-        return ("com.linkedin.pegasus2avro.schema.StringType", {})
-    return ("com.linkedin.pegasus2avro.schema.NullType", {})
-
-def get_sql_workunits(sql_config:SQLAlchemyConfig, platform: str, env: str = "PROD"):
-    url = sql_config.get_sql_alchemy_url()
-    engine = create_engine(url, **sql_config.options)
-    inspector = reflection.Inspector.from_engine(engine)
-    database = sql_config.database
-    for schema in inspector.get_schema_names():
-        for table in inspector.get_table_names(schema):
-            if sql_config.table_pattern.allowed(f'{schema}.{table}'):
-                columns = inspector.get_columns(table, schema)
-                mce = MetadataChangeEvent()
-                if database != "":
-                    dataset_name = f'{database}.{schema}.{table}'
-                else:
-                    dataset_name = f'{schema}.{table}'
-                dataset_snapshot = DatasetSnapshot()
-                dataset_snapshot.urn=(
-                    f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})"
-                )
-                schema_metadata = get_schema_metadata(dataset_name, platform, columns)
-                dataset_snapshot.aspects.append(schema_metadata)
-                mce.proposedSnapshot = dataset_snapshot
-                yield SqlWorkUnit(id=dataset_name, mce = mce)
-            else:
-                logger.debug(f"Found table: {schema}.{table}, but skipping due to allow-deny patterns")
+class SQLAlchemySource(Source):
+    """A Base class for all SQL Sources that use SQLAlchemy to extend"""
+
+    def __init__(self, config, ctx, platform: str):
+        super().__init__(ctx)
+        self.config = config
+        self.platform = platform
+
+    def get_workunits(self):
+        env:str = "PROD"
+        sql_config = self.config
+        platform = self.platform
+        url = sql_config.get_sql_alchemy_url()
+        engine = create_engine(url, **sql_config.options)
+        inspector = reflection.Inspector.from_engine(engine)
+        database = sql_config.database
+        for schema in inspector.get_schema_names():
+            for table in inspector.get_table_names(schema):
+                if sql_config.table_pattern.allowed(f'{schema}.{table}'):
+                    columns = inspector.get_columns(table, schema)
+                    mce = MetadataChangeEvent()
+                    if database != "":
+                        dataset_name = f'{database}.{schema}.{table}'
+                    else:
+                        dataset_name = f'{schema}.{table}'
+                    dataset_snapshot = DatasetSnapshot()
+                    dataset_snapshot.urn=(
+                        f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})"
+                    )
+                    schema_metadata = get_schema_metadata(dataset_name, platform, columns)
+                    dataset_snapshot.aspects.append(schema_metadata)
+                    mce.proposedSnapshot = dataset_snapshot
+                    yield SqlWorkUnit(id=dataset_name, mce = mce)
+                else:
+                    logger.debug(f"Found table: {schema}.{table}, but skipping due to allow-deny patterns")
+
+    def close(self):
+        pass
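Note the fallthrough in get_column_type: anything that is not an Integer/Numeric, Boolean, Enum, binary, ARRAY, or String subtype maps to NullType. A small illustration of that logic:

# Illustration of the mapping above; DateTime has no branch yet, so it
# falls through to NullType.
from sqlalchemy import types

assert get_column_type(types.INTEGER())[0].endswith("NumberType")
assert get_column_type(types.VARCHAR(50))[0].endswith("StringType")
assert get_column_type(types.DateTime())[0].endswith("NullType")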

View File

@@ -1,5 +1,5 @@
 pytest_plugins = [
-    "tests.integration.fixtures.sql_server"
+    "tests.integration.fixtures.sql_fixtures"
 ]

View File

@@ -10,3 +10,15 @@ services:
       - 51433:1433
     volumes:
       - ./sql_server/setup:/setup
+  testmysql:
+    image: mysql
+    container_name: "testmysql"
+    command: --default-authentication-plugin=mysql_native_password
+    environment:
+      MYSQL_ROOT_PASSWORD: example
+    ports:
+      - 53306:3306
+    volumes:
+      - ./mysql/setup:/setup
+      - ./mysql/setup/setup.sql:/docker-entrypoint-initdb.d/setup.sql

View File

@@ -0,0 +1,31 @@
+import os
+import pytest
+
+def is_responsive(container: str, port: int):
+    ret = os.system(f"docker exec {container} /setup/wait-for-it.sh localhost:{port}")
+    return ret == 0
+
+@pytest.fixture(scope="session")
+def docker_compose_file(pytestconfig):
+    return os.path.join(str(pytestconfig.rootdir), "tests/integration/", "docker-compose.yml")
+
+def wait_for_db(docker_services, container_name, container_port):
+    port = docker_services.port_for(container_name, container_port)
+    docker_services.wait_until_responsive(
+        timeout=30.0, pause=0.1, check=lambda: is_responsive(container_name, container_port))
+    import time
+    time.sleep(5)
+    return port
+
+@pytest.fixture(scope="session")
+def sql_server(docker_ip, docker_services):
+    return wait_for_db(docker_services, "testsqlserver", 1433)
+
+@pytest.fixture(scope="session")
+def mysql(docker_ip, docker_services):
+    return wait_for_db(docker_services, "testmysql", 3306)
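Both fixtures funnel through wait_for_db, so a test only has to declare the fixture it needs. A minimal sketch of a consuming test:

# A minimal sketch: declaring the `mysql` fixture blocks until the container
# answers, then yields the mapped host port (53306 per the compose file above).
def test_mysql_port(mysql):
    assert mysql == 53306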

View File

@@ -1,23 +0,0 @@
-import os
-import pytest
-
-def is_responsive(container: str):
-    ready_string="SQL Server is now ready for client connections."
-    ret = os.system(f"docker exec {container} /setup/wait-for-it.sh localhost:1433")
-    return ret == 0
-
-@pytest.fixture(scope="session")
-def docker_compose_file(pytestconfig):
-    return os.path.join(str(pytestconfig.rootdir), "tests/integration/", "docker-compose.yml")
-
-@pytest.fixture(scope="session")
-def sql_server(docker_ip, docker_services):
-    port = docker_services.port_for("testsqlserver", 1433)
-    docker_services.wait_until_responsive(
-        timeout=30.0, pause=0.1, check=lambda: is_responsive("testsqlserver"))
-    import time
-    time.sleep(5)
-    return port

View File

@@ -0,0 +1,12 @@
+---
+source:
+  type: mysql
+  mysql:
+    username: root
+    password: example
+    database: metagalaxy
+    host_port: localhost:53306
+sink:
+  type: file
+run_id: test

View File

@@ -0,0 +1,3 @@
+[client]
+user=root
+password=example

View File

@@ -0,0 +1,45 @@
+create database metagalaxy;
+use metagalaxy;
+
+-- create metadata aspect table
+create table metadata_aspect (
+  urn varchar(500) not null,
+  aspect varchar(200) not null,
+  version bigint(20) not null,
+  metadata longtext not null,
+  createdon datetime(6) not null,
+  createdby varchar(255) not null,
+  createdfor varchar(255),
+  constraint pk_metadata_aspect primary key (urn,aspect,version)
+);
+
+-- create default records for datahub user
+insert into metadata_aspect (urn, aspect, version, metadata, createdon, createdby) values(
+  'urn:li:corpuser:datahub',
+  'com.linkedin.identity.CorpUserInfo',
+  0,
+  '{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}',
+  now(),
+  'urn:li:principal:datahub'
+), (
+  'urn:li:corpuser:datahub',
+  'com.linkedin.identity.CorpUserEditableInfo',
+  0,
+  '{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web/packages/data-portal/public/assets/images/default_avatar.png"}',
+  now(),
+  'urn:li:principal:datahub'
+);
+
+-- create metadata index table
+CREATE TABLE metadata_index (
+  `id` BIGINT NOT NULL AUTO_INCREMENT,
+  `urn` VARCHAR(200) NOT NULL,
+  `aspect` VARCHAR(150) NOT NULL,
+  `path` VARCHAR(150) NOT NULL,
+  `longVal` BIGINT,
+  `stringVal` VARCHAR(200),
+  `doubleVal` DOUBLE,
+  CONSTRAINT id_pk PRIMARY KEY (id),
+  INDEX longIndex (`urn`,`aspect`,`path`,`longVal`),
+  INDEX stringIndex (`urn`,`aspect`,`path`,`stringVal`),
+  INDEX doubleIndex (`urn`,`aspect`,`path`,`doubleVal`)
+);
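Because setup.sql is mounted into docker-entrypoint-initdb.d, this schema exists as soon as the container reports ready. A quick sanity check of the seed data (a sketch; pymysql is already pulled in by the "mysql+pymysql" scheme):

# Sketch of a manual sanity check against the seeded container.
import pymysql

conn = pymysql.connect(host="localhost", port=53306, user="root",
                       password="example", database="metagalaxy")
with conn.cursor() as cursor:
    cursor.execute("SELECT COUNT(*) FROM metadata_aspect")
    (count,) = cursor.fetchone()
    assert count == 2  # the two corpuser aspects inserted above
conn.close()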

View File

@@ -0,0 +1,182 @@
+#!/usr/bin/env bash
+# Use this script to test if a given TCP host/port are available
+
+WAITFORIT_cmdname=${0##*/}
+
+echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }
+
+usage()
+{
+    cat << USAGE >&2
+Usage:
+    $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
+    -h HOST | --host=HOST       Host or IP under test
+    -p PORT | --port=PORT       TCP port under test
+                                Alternatively, you specify the host and port as host:port
+    -s | --strict               Only execute subcommand if the test succeeds
+    -q | --quiet                Don't output any status messages
+    -t TIMEOUT | --timeout=TIMEOUT
+                                Timeout in seconds, zero for no timeout
+    -- COMMAND ARGS             Execute command with args after the test finishes
+USAGE
+    exit 1
+}
+
+wait_for()
+{
+    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
+        echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
+    else
+        echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
+    fi
+    WAITFORIT_start_ts=$(date +%s)
+    while :
+    do
+        if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
+            nc -z $WAITFORIT_HOST $WAITFORIT_PORT
+            WAITFORIT_result=$?
+        else
+            (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
+            WAITFORIT_result=$?
+        fi
+        if [[ $WAITFORIT_result -eq 0 ]]; then
+            WAITFORIT_end_ts=$(date +%s)
+            echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
+            break
+        fi
+        sleep 1
+    done
+    return $WAITFORIT_result
+}
+
+wait_for_wrapper()
+{
+    # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
+    if [[ $WAITFORIT_QUIET -eq 1 ]]; then
+        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
+    else
+        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
+    fi
+    WAITFORIT_PID=$!
+    trap "kill -INT -$WAITFORIT_PID" INT
+    wait $WAITFORIT_PID
+    WAITFORIT_RESULT=$?
+    if [[ $WAITFORIT_RESULT -ne 0 ]]; then
+        echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
+    fi
+    return $WAITFORIT_RESULT
+}
+
+# process arguments
+while [[ $# -gt 0 ]]
+do
+    case "$1" in
+        *:* )
+        WAITFORIT_hostport=(${1//:/ })
+        WAITFORIT_HOST=${WAITFORIT_hostport[0]}
+        WAITFORIT_PORT=${WAITFORIT_hostport[1]}
+        shift 1
+        ;;
+        --child)
+        WAITFORIT_CHILD=1
+        shift 1
+        ;;
+        -q | --quiet)
+        WAITFORIT_QUIET=1
+        shift 1
+        ;;
+        -s | --strict)
+        WAITFORIT_STRICT=1
+        shift 1
+        ;;
+        -h)
+        WAITFORIT_HOST="$2"
+        if [[ $WAITFORIT_HOST == "" ]]; then break; fi
+        shift 2
+        ;;
+        --host=*)
+        WAITFORIT_HOST="${1#*=}"
+        shift 1
+        ;;
+        -p)
+        WAITFORIT_PORT="$2"
+        if [[ $WAITFORIT_PORT == "" ]]; then break; fi
+        shift 2
+        ;;
+        --port=*)
+        WAITFORIT_PORT="${1#*=}"
+        shift 1
+        ;;
+        -t)
+        WAITFORIT_TIMEOUT="$2"
+        if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
+        shift 2
+        ;;
+        --timeout=*)
+        WAITFORIT_TIMEOUT="${1#*=}"
+        shift 1
+        ;;
+        --)
+        shift
+        WAITFORIT_CLI=("$@")
+        break
+        ;;
+        --help)
+        usage
+        ;;
+        *)
+        echoerr "Unknown argument: $1"
+        usage
+        ;;
+    esac
+done
+
+if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
+    echoerr "Error: you need to provide a host and port to test."
+    usage
+fi
+
+WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
+WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
+WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
+WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}
+
+# Check to see if timeout is from busybox?
+WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
+WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)
+
+WAITFORIT_BUSYTIMEFLAG=""
+if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
+    WAITFORIT_ISBUSY=1
+    # Check if busybox timeout uses -t flag
+    # (recent Alpine versions don't support -t anymore)
+    if timeout &>/dev/stdout | grep -q -e '-t '; then
+        WAITFORIT_BUSYTIMEFLAG="-t"
+    fi
+else
+    WAITFORIT_ISBUSY=0
+fi
+
+if [[ $WAITFORIT_CHILD -gt 0 ]]; then
+    wait_for
+    WAITFORIT_RESULT=$?
+    exit $WAITFORIT_RESULT
+else
+    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
+        wait_for_wrapper
+        WAITFORIT_RESULT=$?
+    else
+        wait_for
+        WAITFORIT_RESULT=$?
+    fi
+fi
+
+if [[ $WAITFORIT_CLI != "" ]]; then
+    if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
+        echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
+        exit $WAITFORIT_RESULT
+    fi
+    exec "${WAITFORIT_CLI[@]}"
+else
+    exit $WAITFORIT_RESULT
+fi
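This is the stock wait-for-it script, vendored so the fixtures can poll readiness from inside the container. The fixtures above drive it along these lines (a sketch; `-t 30` caps the wait):

# How the fixtures invoke the script inside the container.
import os
ret = os.system("docker exec testmysql /setup/wait-for-it.sh localhost:3306 -t 30")
assert ret == 0  # zero exit status means the port accepted a connection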

View File

@@ -0,0 +1,13 @@
+import os
+import pytest
+import subprocess
+
+def test_ingest(mysql, pytestconfig):
+    config_file=os.path.join(str(pytestconfig.rootdir), "tests/integration/mysql", "mysql_to_file.yml")
+    # delete the output directory. TODO: move to a better way to create an output test fixture
+    os.system("rm -rf output")
+    ingest_command=f'gometa-ingest -c {config_file}'
+    ret = os.system(ingest_command)
+    assert ret == 0
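To run only this test, something like the following works (a sketch, assuming the test file lives under tests/integration/mysql; plain `pytest tests/integration/mysql -v` from the repo root is the shell equivalent):

# Run just the MySQL integration test programmatically.
import pytest
raise SystemExit(pytest.main(["tests/integration/mysql", "-v"]))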

View File

@@ -9,7 +9,7 @@ def test_ingest(sql_server, pytestconfig):
     command = f"{docker} exec testsqlserver /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql"
     ret = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     assert ret.returncode == 0
-    config_file=os.path.join(str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_console.yml")
+    config_file=os.path.join(str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_file.yml")
     # delete the output directory. TODO: move to a better way to create an output test fixture
     os.system("rm -rf output")
     ingest_command=f'gometa-ingest -c {config_file}'