mirror of https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-04 03:24:55 +00:00

Remove old tests (#11505)

Co-authored-by: Shailesh Parmar <shailesh.parmar.webdev@gmail.com>

This commit is contained in:
parent 93e97674c5
commit f22d604c54
@@ -1,42 +0,0 @@
import json
from unittest import TestCase

from metadata.ingestion.api.workflow import Workflow

config = """
{
    "source": {
        "type": "glue",
        "config": {
            "region_name": "us-west-2",
            "service_name": "glue_db",
            "pipeline_service_name": "glue_pipeline",
            "storage_service_name": "s3"
        }
    },
    "sink": {
        "type": "metadata-rest",
        "config": {}
    },
    "metadata_server": {
        "type": "metadata-server",
        "config": {
            "api_endpoint": "http://localhost:8585/api",
            "auth_provider_type": "no-auth"
        }
    }
}
"""


class WorkflowTest(TestCase):
    def test_execute_200(self):
        """
        stage/file.py must be compatible with source/sample_data.py;
        this test tries to catch one becoming incompatible with the other
        by running a workflow that includes both of them.
        """
        workflow = Workflow.create(json.loads(config))
        workflow.execute()
        workflow.stop()
        self.assertTrue(True)
@@ -1,166 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import socket
import time
from typing import List
from urllib.parse import urlparse

import pytest
import requests
# Added: without this import, the bare ConnectionError below is the builtin, not requests' error.
from requests.exceptions import ConnectionError
from sqlalchemy.engine import create_engine
from sqlalchemy.inspection import inspect

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.ometa.ometa_api import OpenMetadata
# Added so test_check_table runs; assumed historical location of MetadataServerConfig.
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.logger import log_ansi_encoded_string


def is_responsive(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return True
    except ConnectionError:
        return False


def is_port_open(url):
    url_parts = urlparse(url)
    hostname = url_parts.hostname
    port = url_parts.port
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((hostname, port))
        return True
    except socket.error:
        return False
    finally:
        s.close()


def sleep(timeout_s):
    log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
    n = len(str(timeout_s))
    for i in range(timeout_s, 0, -1):
        log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
        time.sleep(1)
    log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)


def status(r):
    if r.status_code == 200 or r.status_code == 201:
        return 1
    else:
        return 0


def create_delete_table(client: OpenMetadata, databases: List[Database]):
    columns = [
        Column(name="id", dataType="INT", dataLength=1),
        Column(name="name", dataType="VARCHAR", dataLength=1),
    ]
    table = CreateTableRequest(
        name="test1", columns=columns, database=databases[0].fullyQualifiedName
    )
    created_table = client.create_or_update(table)
    if table.name.__root__ == created_table.name.__root__:
        client.delete(entity=Table, entity_id=str(created_table.id.__root__))
        return 1
    else:
        client.delete(entity=Table, entity_id=str(created_table.id.__root__))
        return 0


def create_delete_database(client: OpenMetadata, databases: List[Database]):
    data = {
        "databaseConnection": {"hostPort": "localhost"},
        "name": "temp_local_hive",
        "serviceType": "Hive",
        "description": "local hive env",
    }
    create_hive_service = CreateDatabaseServiceRequest(**data)
    hive_service = client.create_or_update(create_hive_service)
    create_database_request = CreateDatabaseRequest(
        name="dwh",
        service=hive_service.fullyQualifiedName,
    )
    created_database = client.create_or_update(create_database_request)
    resp = create_delete_table(client, databases)
    log_ansi_encoded_string(message=resp)
    client.delete(entity=Database, entity_id=str(created_database.id.__root__))
    client.delete(entity=DatabaseService, entity_id=str(hive_service.id.__root__))
    return resp


@pytest.fixture(scope="session")
def hive_service(docker_ip, docker_services):
    """Ensure that Docker service is up and responsive."""
    port = docker_services.port_for("hive-server", 10000)
    log_ansi_encoded_string(message=f"HIVE is running on port {port}")
    timeout_s = 120
    sleep(timeout_s)
    url = "hive://localhost:10000/"
    docker_services.wait_until_responsive(
        timeout=timeout_s, pause=0.1, check=lambda: is_port_open(url)
    )
    engine = create_engine(url)
    inspector = inspect(engine)
    return inspector


def test_check_schema(hive_service):
    inspector = hive_service
    schemas = []
    for schema in inspector.get_schema_names():
        schemas.append(schema)
    if "default" in schemas:
        assert 1
    else:
        assert 0


def test_read_tables(hive_service):
    inspector = hive_service
    check_tables = [
        "metadata_array_struct_test",
        "metadata_struct_test",
        "metadata_test_table",
        "test_check",
    ]
    tables = []
    for schema in inspector.get_schema_names():
        for table in inspector.get_table_names(schema):
            tables.append(table)
    if set(tables) == set(check_tables):
        assert 1
    else:
        assert 0


def test_check_table():
    is_responsive("http://localhost:8586/healthcheck")
    metadata_config = MetadataServerConfig.parse_obj(
        {"api_endpoint": "http://localhost:8585/api", "auth_provider_type": "no-auth"}
    )
    client = OpenMetadata(metadata_config)
    databases = client.list_entities(entity=Database).entities
    if len(databases) > 0:
        assert create_delete_table(client, databases)
    else:
        assert create_delete_database(client, databases)
@@ -1,65 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - hadoop-hive.env
    ports:
      - "50070:50070"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - hadoop-hive.env
    command: "sh /setup/setup.sh"
    volumes:
      - ./setup:/setup
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
  # presto-coordinator:
  #   image: shawnzhu/prestodb:0.181
  #   ports:
  #     - "8080:8080"

volumes:
  namenode:
  datanode:
@@ -1,30 +0,0 @@
HIVE_SITE_CONF_javax_jdo_option_ConnectionURL=jdbc:postgresql://hive-metastore-postgresql/metastore
HIVE_SITE_CONF_javax_jdo_option_ConnectionDriverName=org.postgresql.Driver
HIVE_SITE_CONF_javax_jdo_option_ConnectionUserName=hive
HIVE_SITE_CONF_javax_jdo_option_ConnectionPassword=hive
HIVE_SITE_CONF_datanucleus_autoCreateSchema=false
HIVE_SITE_CONF_hive_metastore_uris=thrift://hive-metastore:9083
HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false

CORE_CONF_fs_defaultFS=hdfs://namenode:8020
CORE_CONF_hadoop_http_staticuser_user=root
CORE_CONF_hadoop_proxyuser_hue_hosts=*
CORE_CONF_hadoop_proxyuser_hue_groups=*

HDFS_CONF_dfs_webhdfs_enabled=true
HDFS_CONF_dfs_permissions_enabled=false

YARN_CONF_yarn_log___aggregation___enable=true
YARN_CONF_yarn_resourcemanager_recovery_enabled=true
YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate
YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/
YARN_CONF_yarn_timeline___service_enabled=true
YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true
YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true
YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
YARN_CONF_yarn_timeline___service_hostname=historyserver
YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031
@@ -1,30 +0,0 @@
CREATE TABLE test_check (foo INT, bar STRING);
LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE test_check;
CREATE TABLE `metadata_test_table` (foo INT, bar STRING);
CREATE TABLE metadata_struct_test
(
    test_id INT,
    service STRUCT<
        type: STRING
        ,provider: ARRAY<INT>
    >
);

CREATE TABLE metadata_array_struct_test
(
    test_id INT,
    service array<STRUCT<
        type: STRING
        ,provider: ARRAY<INT>
    >>
);

WITH
test_data as (
    SELECT 100 test_id, array(NAMED_STRUCT('type','Logistics','provider', ARRAY(550, 870)),
        NAMED_STRUCT('type','Inventory','provider', ARRAY(900))
    ) AS service
)
INSERT INTO TABLE metadata_array_struct_test
select * from test_data;
CREATE TABLE union_test(foo UNIONTYPE<int, double, array<string>, struct<a:int,b:string>>);
@@ -1,15 +0,0 @@
#!/bin/sh
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

sleep 120 && \
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -f /setup/hive_setup.sql &
/bin/bash /usr/local/bin/entrypoint.sh /bin/sh -c startup.sh
@@ -1,138 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time
from datetime import datetime

import pytest
import requests
from ldap3 import ALL, Connection, Server

from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.utils.logger import log_ansi_encoded_string

headers = {"Content-type": "application/json"}
url = "http://localhost:8585/api/v1/users"


def sleep(timeout_s):
    log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
    n = len(str(timeout_s))
    for i in range(timeout_s, 0, -1):
        log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
        time.sleep(1)
    log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)


def read_user_by_name(name: str):
    r = requests.get(url + "/name/" + name)
    r.raise_for_status()
    ok = status(r)  # renamed from `bool`, which shadowed the builtin
    return [ok, r.json()]


def status(r):
    if r.status_code == 200 or r.status_code == 201:
        return 1
    else:
        return 0


def ldap_connection():
    s = Server("ldap://localhost:389", get_info=ALL)
    c = Connection(s, user="cn=admin,dc=example,dc=com", password="ldappassword")
    c.open()
    if not c.bind():
        log_ansi_encoded_string(message="LDAP Connection Unsuccessful")
        return False
    return [True, c]


def is_ldap_listening(openldap_service):
    c = openldap_service
    if "listening" in str(c):
        return True


@pytest.fixture(scope="session")
def openldap_service(docker_ip, docker_services):
    """Ensure that Docker service is up and responsive."""
    port = docker_services.port_for("openldap", 389)
    log_ansi_encoded_string(message=f"LDAP is running on port {port}")
    timeout_s = 10
    sleep(timeout_s)
    conn = ldap_connection()[1]
    docker_services.wait_until_responsive(
        timeout=timeout_s, pause=0.1, check=lambda: is_ldap_listening(conn)
    )
    return conn


@pytest.fixture(scope="session")
def ldap_user_entry(openldap_service):
    c = openldap_service
    c.search(
        "cn=John Doe,ou=users,dc=example,dc=com",
        "(objectclass=person)",
        attributes=["*"],
    )
    if c.entries:
        return c.entries[0]
    else:
        logging.error("OpenLDAP not running")
        assert 0


@pytest.fixture(scope="session")
def datetime_suffix():
    # OpenMetadata doesn't delete users; it deactivates them.
    # Without changing the user's name with every run of this test,
    # the test would fail due to a duplicate user in metadata.
    # Use a datetime suffix to get a new user name with every run.
    now = datetime.now()
    return now.strftime("_%Y%m%d%H%M%S")


def test_insert_user(ldap_user_entry, datetime_suffix):
    metadata_user = CreateUserRequest(
        name=str(ldap_user_entry["uid"]) + datetime_suffix,
        displayName=str(ldap_user_entry["cn"]),
        email=str(ldap_user_entry["mail"]),
    )
    r = requests.post(url, data=metadata_user.json(), headers=headers)
    r.raise_for_status()
    if r.status_code == 200 or r.status_code == 201:
        assert 1
    else:
        assert 0


def test_read_user(ldap_user_entry, datetime_suffix):
    assert read_user_by_name(str(ldap_user_entry["uid"]) + datetime_suffix)[0]


def test_update_user(ldap_user_entry, datetime_suffix):
    user = read_user_by_name(str(ldap_user_entry["uid"]) + datetime_suffix)
    user[1]["displayName"] = "Jane Doe"
    metadata_user = CreateUserRequest(
        name=user[1]["name"],
        displayName=user[1]["displayName"],
        email=user[1]["email"],
    )
    r = requests.patch(url, data=metadata_user.json(), headers=headers)


def test_delete_user(ldap_user_entry, datetime_suffix):
    r = read_user_by_name(str(ldap_user_entry["uid"]) + datetime_suffix)
    r = requests.delete(url + "/{}".format(r[1]["id"]))
    r.raise_for_status()
    assert 1
@@ -1,42 +0,0 @@
dn: ou=users,dc=example,dc=com
objectClass: top
objectClass: person
objectClass: organizationalPerson
objectClass: inetOrgPerson
sn: surname
cn: firstname
ou: users
mail: email
uid: github_username

dn: cn=group,dc=example,dc=com
objectClass: posixGroup
objectClass: top
cn: groupname
gidNumber: 500

dn: cn=John Doe,ou=users,dc=example,dc=com
givenName: John
sn: Doe
cn: John Doe
uid: dummy_user
mail: example@gmail.com
uidNumber: 1000
gidNumber: 500
homeDirectory: /home/users/dummy_user
objectClass: inetOrgPerson
objectClass: posixAccount
objectClass: top

dn: cn=Joe Doe,ou=users,dc=example,dc=com
givenName: Joe
sn: Doe
cn: Joe Doe
uid: dummy2_user
mail: example2@gmail.com
uidNumber: 1001
gidNumber: 500
homeDirectory: /home/users/dummy2_user
objectClass: inetOrgPerson
objectClass: posixAccount
objectClass: top
@@ -1,26 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: "3.3"
services:
  openldap:
    image: osixia/openldap:1.5.0
    command: --copy-service
    environment:
      LDAP_ORGANISATION: "example"
      LDAP_DOMAIN: "example.com"
      LDAP_ADMIN_PASSWORD: "ldappassword"
    ports:
      - 389:389
      - 636:636
    volumes:
      - ./bootstrap.ldif:/container/service/slapd/assets/config/bootstrap/ldif/bootstrap.ldif
@@ -1,7 +0,0 @@
FROM python:3.8.13-slim-buster

RUN python -m pip install --upgrade pip

RUN pip install cryptography mlflow boto3 pymysql

EXPOSE 5000
@@ -1,4 +0,0 @@
build:
	docker-compose -f docker-compose.yml up --force-recreate --build -d
	mc config host add minio http://localhost:9001 minio password --api S3v4
	mc mb -p minio/mlops.local.com
@@ -1,17 +0,0 @@
# MlFlow Integration Test

We have prepared a small test to check the MlFlow ingestion.

We have used a decoupled architecture for MlFlow with:
- `mlflow` running in a remote server
- `minio` as the artifact store
- `mysql` as the registry

To run this test:

- `cd` into this directory
- `make build`
- `pip install mlflow-skinny sklearn`. We use the skinny one for the client.
- `python experiment.py` should show new experiments in http://localhost:5000
- `python train.py` will register a new model
- `metadata ingest -c examples/workflows/mlflow.yaml` will run the workflow.
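
Consolidated, the steps above look like this (a minimal sketch, assuming you run it from this test's directory and have `make`, `pip`, and the `metadata` CLI on your PATH):

```sh
make build                          # build and start mlflow, minio and mysql; create the bucket
pip install mlflow-skinny sklearn   # skinny client for the scripts, sklearn for train.py
python experiment.py                # a new experiment should appear at http://localhost:5000
python train.py                     # registers the ElasticnetWineModel
metadata ingest -c examples/workflows/mlflow.yaml   # ingest the model metadata
```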
@@ -1,48 +0,0 @@
version: '3'

services:
  mlflow-db:
    container_name: mlflow-db
    platform: linux/x86_64
    image: mysql
    restart: always
    command: --port=3307
    ports:
      - "3307:3307"
    environment:
      - MYSQL_DATABASE=experiments
      - MYSQL_USER=mlflow
      - MYSQL_PASSWORD=password
      - MYSQL_ROOT_PASSWORD=root
      - MYSQL_ROOT_HOST=0.0.0.0
    volumes:
      # - ./db:/var/lib/mysql
      - ./mysqldata:/var/lib/mysql:rw,delegated
    cap_add:
      - SYS_NICE  # to remove "mbind: Operation not permitted" warning (https://stackoverflow.com/questions/55559386/how-to-fix-mbind-operation-not-permitted-in-mysql-error-log)

  mlflow-artifact:
    image: minio/minio
    environment:
      - MINIO_ACCESS_KEY=minio
      - MINIO_SECRET_KEY=password
    ports:
      - "9000:9000"
      - "9001:9001"
    command: [ "minio", "server", "--address", ":9001", "--console-address", ":9000", "/data" ]

  mlflow:
    container_name: mlflow
    build: .
    image: mlflow
    restart: always
    depends_on:
      - mlflow-db
      - mlflow-artifact
    ports:
      - "5000:5000"
    environment:
      - MLFLOW_S3_ENDPOINT_URL=http://localhost:9000
      - AWS_ACCESS_KEY_ID="minio"
      - AWS_SECRET_ACCESS_KEY="password"
    command: mlflow server --backend-store-uri mysql+pymysql://mlflow:password@mlflow-db:3307/experiments --default-artifact-root s3://mlops.local.com --host 0.0.0.0
@@ -1,22 +0,0 @@
"""
This is a simple script for testing the environment.

You can run it as `python experiment.py`, and a new
experiment should appear in the remote server.
"""

import mlflow

# Create an experiment on the remote server
mlflow_uri = "http://localhost:5000"
mlflow.set_tracking_uri(mlflow_uri)

mlflow.set_experiment("mlflow_test1")

mlflow.log_param("param1", "value1")
mlflow.log_param("param2", "value2")

for i in range(10):
    mlflow.log_metric("metric1", value=1 / (i + 1), step=i)

mlflow.end_run()
@@ -1,105 +0,0 @@
"""
Example extracted from https://www.mlflow.org/docs/latest/tutorials-and-examples/tutorial.html

To run this you need to have installed `sklearn` in your environment.
"""

import logging
import os
import sys
import warnings
from urllib.parse import urlparse

import mlflow.sklearn
import numpy as np
import pandas as pd
from mlflow.models import infer_signature
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

from metadata.utils.logger import log_ansi_encoded_string

logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)


def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2


if __name__ == "__main__":
    mlflow_uri = "http://localhost:5000"
    mlflow.set_tracking_uri(mlflow_uri)

    os.environ["AWS_ACCESS_KEY_ID"] = "minio"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "password"
    os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://localhost:9001"

    warnings.filterwarnings("ignore")
    np.random.seed(40)

    # Read the wine-quality csv file from the URL
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    try:
        data = pd.read_csv(csv_url, sep=";")
    except Exception as e:
        logger.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s",
            e,
        )

    # Split the data into training and test sets. (0.75, 0.25) split.
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
    l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

    with mlflow.start_run():
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        signature = infer_signature(train_x, lr.predict(train_x))

        predicted_qualities = lr.predict(test_x)

        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        log_ansi_encoded_string(
            message="Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio)
        )
        log_ansi_encoded_string(message="  RMSE: %s" % rmse)
        log_ansi_encoded_string(message="  MAE: %s" % mae)
        log_ansi_encoded_string(message="  R2: %s" % r2)

        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

        # Model registry does not work with file store
        if tracking_url_type_store != "file":
            # Register the model
            # There are other ways to use the Model Registry, which depends on the use case,
            # please refer to the doc for more information:
            # https://mlflow.org/docs/latest/model-registry.html#api-workflow
            mlflow.sklearn.log_model(
                lr,
                "model",
                registered_model_name="ElasticnetWineModel",
                signature=signature,
            )
        else:
            mlflow.sklearn.log_model(lr, "model")
@@ -1,123 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

import pytest
import requests
# Added: without this import, the bare ConnectionError below is the builtin, not requests' error.
from requests.exceptions import ConnectionError
from sqlalchemy.engine import create_engine
from sqlalchemy.inspection import inspect

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.table import Column
from metadata.ingestion.ometa.client import REST
from metadata.utils.logger import log_ansi_encoded_string

headers = {"Content-type": "application/json"}
url = "http://localhost:8585/api/v1/"


def is_responsive(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return True
    except ConnectionError:
        return False


def status(r):
    if r.status_code == 200 or r.status_code == 201:
        return 1
    else:
        return 0


@pytest.fixture(scope="session")
def mssql_service(docker_ip, docker_services):
    """Ensure that Docker service is up and responsive."""
    port = docker_services.port_for("sqlserver", 1433)
    log_ansi_encoded_string(message="Mssql is running on port {}".format(port))
    url = "http://localhost:8585"
    time.sleep(180)
    docker_services.wait_until_responsive(
        timeout=120.0, pause=0.5, check=lambda: is_responsive(url)
    )
    return url


def create_delete_table(client):
    databases = client.list_databases()
    columns = [
        Column(name="id", columnDataType="INT"),
        Column(name="name", columnDataType="VARCHAR"),
    ]
    table = CreateTableRequest(
        name="test1", columns=columns, database=databases[0].fullyQualifiedName
    )
    created_table = client.create_or_update_table(table)
    if table.name.__root__ == created_table.name.__root__:
        requests.delete(
            "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__)
        )
        return 1
    else:
        requests.delete(
            "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__)
        )
        return 0


def create_delete_database(client):
    data = {
        "databaseConnection": {"hostPort": "localhost"},
        "name": "temp_local_mssql",
        "serviceType": "Mssql",
        "description": "local mssql env",
    }
    create_mssql_service = CreateDatabaseServiceRequest(**data)
    mssql_service = client.create_database_service(create_mssql_service)
    create_database_request = CreateDatabaseRequest(
        name="dwh",
        service=mssql_service.fullyQualifiedName,
    )
    created_database = client.create_database(create_database_request)
    resp = create_delete_table(client)
    log_ansi_encoded_string(message=resp)
    client.delete_database(created_database.id.__root__)
    client.delete_database_service(mssql_service.id.__root__)
    return resp


def test_check_tables(mssql_service):
    client = REST("{}/api".format(mssql_service), "test", "test")
    databases = client.list_databases()
    if len(databases) > 0:
        assert create_delete_table(client)
    else:
        assert create_delete_database(client)


def test_read_schema(mssql_service):
    url = "mssql+pytds://sa:test!Password@localhost:51433/catalog_test"
    engine = create_engine(url)
    inspector = inspect(engine)
    schemas = []
    for schema in inspector.get_schema_names():
        schemas.append(schema)
    if "catalog_test_check" in schemas:
        assert 1
    else:
        assert 0
@@ -1,23 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: "3.9"
services:
  sqlserver:
    image: "mcr.microsoft.com/mssql/server:latest"
    environment:
      ACCEPT_EULA: "Y"
      SA_PASSWORD: "test!Password"
    command: "./setup/setup.sh"
    volumes:
      - ./setup:/setup
    ports:
      - 1433:1433
@@ -1,15 +0,0 @@
#!/bin/sh
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

sleep 120 && \
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql \
& /opt/mssql/bin/sqlservr
@@ -1,10 +0,0 @@
CREATE DATABASE catalog_test;
GO
USE catalog_test;
GO
CREATE TABLE SampleData (ID int, DataName nvarchar(max));
GO
CREATE SCHEMA catalog_test_check;
GO
CREATE TABLE catalog_test_check.SampleItems (ID int, SampleItemName nvarchar(max));
GO
@@ -1,118 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

import pytest
import requests
from requests.exceptions import ConnectionError
from sqlalchemy.engine import create_engine
from sqlalchemy.inspection import inspect

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
# Added so test_check_tables runs; assumed historical location of MetadataServerConfig.
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.logger import log_ansi_encoded_string


def is_responsive(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return True
    except ConnectionError:
        return False


def create_delete_table(client: OpenMetadata):
    databases = client.list_entities(entity=Database).entities
    columns = [
        Column(name="id", dataType="INT", dataLength=1),
        Column(name="name", dataType="VARCHAR", dataLength=1),
    ]
    db_ref = EntityReference(
        id=databases[0].id, name=databases[0].name.__root__, type="database"
    )
    table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
    created_table = client.create_or_update(table)
    if table.name.__root__ == created_table.name.__root__:
        requests.delete(
            "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__)
        )
        return 1
    else:
        requests.delete(
            "http://localhost:8585/api/v1/tables/{}".format(created_table.id.__root__)
        )
        return 0


def create_delete_database(client: OpenMetadata):
    data = {
        "databaseConnection": {"hostPort": "localhost"},
        "name": "temp_local_mysql",
        "serviceType": "Mysql",
        "description": "local mysql env",
    }
    create_mysql_service = CreateDatabaseServiceRequest(**data)
    mysql_service = client.create_or_update(create_mysql_service)
    create_database_request = CreateDatabaseRequest(
        name="dwh", service=mysql_service.fullyQualifiedName
    )
    created_database = client.create_or_update(create_database_request)
    resp = create_delete_table(client)
    log_ansi_encoded_string(message=resp)
    client.delete(entity=Database, entity_id=str(created_database.id.__root__))
    client.delete(entity=DatabaseService, entity_id=str(mysql_service.id.__root__))
    return resp


@pytest.fixture(scope="session")
def catalog_service(docker_ip, docker_services):
    """Ensure that Docker service is up and responsive."""
    port = docker_services.port_for("db", 3306)
    log_ansi_encoded_string(message="Mysql is running on port {}".format(port))
    url = "http://localhost:8585"
    time.sleep(30)
    docker_services.wait_until_responsive(
        timeout=30.0, pause=0.5, check=lambda: is_responsive(url)
    )
    return url


def test_check_tables(catalog_service):
    metadata_config = MetadataServerConfig.parse_obj(
        {"api_endpoint": catalog_service + "/api", "auth_provider_type": "no-auth"}
    )
    client = OpenMetadata(metadata_config)
    assert create_delete_database(client)


def test_read_schema():
    url = "mysql+pymysql://catalog_user:catalog_password@localhost:3307"
    # pool_recycle to avoid the occasional "Lost connection to MySQL server during query" error
    # when the host machine is slow
    engine = create_engine(url, pool_recycle=1)
    inspector = inspect(engine)
    schemas = []
    for schema in inspector.get_schema_names():
        schemas.append(schema)
    if "catalog_test" in schemas:
        assert 1
    else:
        assert 0
@@ -1,27 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: "3.9"
services:
  db:
    image: mysql/mysql-server:latest
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_USER: catalog_user
      MYSQL_PASSWORD: catalog_password
      MYSQL_DATABASE: catalog_test
    volumes:
      - ./setup/setup.sql:/docker-entrypoint-initdb.d/setup.sql
    expose:
      - 3306
    ports:
      - 3307:3306
@@ -1,15 +0,0 @@
CREATE DATABASE IF NOT EXISTS catalog_test;
use catalog_test;
DROP TABLE IF EXISTS `user_entity`;
CREATE TABLE `user_entity` (
  `id` varchar(36) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,_utf8mb4'$.id'))) STORED NOT NULL,
  `name` varchar(256) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,_utf8mb4'$.name'))) VIRTUAL NOT NULL,
  `deactivated` varchar(8) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,_utf8mb4'$.deactivated'))) VIRTUAL,
  `json` json NOT NULL,
  `timestamp` bigint DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
LOCK TABLES `user_entity` WRITE;
INSERT INTO `user_entity` (`json`, `timestamp`) VALUES
  ('{\"id\": \"07c8bc6c-fac2-4d38-84bf-e25ce6705f4d\", \"name\": \"john_fleming6\", \"timezone\": \"PST\", \"displayName\": \"John Fleming\"}',NULL),
  ('{\"id\": \"183de69f-5001-4752-8659-895f4fa5df26\", \"name\": \"geoffrey_edwards9\", \"timezone\": \"PST\", \"displayName\": \"Geoffrey Edwards\"}',NULL),
  ('{\"id\": \"1ad02818-8985-48d6-9597-85b485d67bb8\", \"name\": \"cheryl_blackburn2\", \"timezone\": \"PST\", \"displayName\": \"Cheryl Blackburn\"}',NULL),
  ('{\"id\": \"4eff3c8d-f9c2-4f4e-9a48-3aa30ceb2956\", \"name\": \"tracie_sandoval3\", \"timezone\": \"PST\", \"displayName\": \"Tracie Sandoval\"}',NULL),
  ('{\"id\": \"bf63aa8e-d4fb-40ce-8d8c-7c7fd79f84ce\", \"name\": \"katie_wilson5\", \"timezone\": \"PST\", \"displayName\": \"Katie Wilson\"}',NULL);
UNLOCK TABLES;
@@ -1,114 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import requests
# Added: without this import, the bare ConnectionError below is the builtin, not requests' error.
from requests.exceptions import ConnectionError

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.client import REST
from metadata.utils.logger import log_ansi_encoded_string

headers = {"Content-type": "application/json"}
service_name = "temp_local_postgres"
database_name = "Test_Postgres"
table_name = "test1"


def is_responsive(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return True
    except ConnectionError:
        return False


@pytest.fixture(scope="session")
def catalog_service(docker_ip, docker_services):
    """Ensure that Docker service is up and responsive."""
    port = docker_services.port_for("postgres", 5432)
    log_ansi_encoded_string(message="Postgres is running on port {}".format(port))
    url = "http://localhost:8585"
    docker_services.wait_until_responsive(
        timeout=60.0, pause=0.5, check=lambda: is_responsive(url)
    )
    return url


def test_create_database_service(catalog_service):
    client = REST(catalog_service + "/api", "test", "test")
    data = {
        "databaseConnection": {"hostPort": "localhost:5432"},
        "name": "temp_local_postgres",
        "serviceType": "POSTGRES",
        "description": "local postgres env",
    }
    create_postgres_service = CreateDatabaseServiceRequest(**data)
    database_service = client.create_database_service(create_postgres_service)
    if database_service:
        assert 1
    else:
        assert 0


def test_create_table_service(catalog_service):
    client = REST(catalog_service + "/api", "test", "test")
    postgres_dbservice = client.get_database_service(service_name)
    columns = [
        Column(
            name="test",
            description="test_desc",
            columnDataType="VARCHAR",
            ordinalPosition=0,
        ),
        Column(
            name="test2",
            description="test_desc2",
            columnDataType="INT",
            ordinalPosition=1,
        ),
    ]

    create_database_request = CreateDatabaseRequest(
        name=database_name,
        service=postgres_dbservice.fullyQualifiedName,
    )
    created_database = client.create_database(create_database_request)
    db_ref = EntityReference(
        id=created_database.id.__root__,
        name=created_database.name.__root__,
        type="database",
    )
    table = CreateTableRequest(name=table_name, columns=columns, database=db_ref)
    created_table = client.create_or_update_table(table)
    if created_database and created_table:
        assert 1
    else:
        assert 0


def test_check_and_delete_ingest(catalog_service):
    client = REST(catalog_service + "/api", "test", "test")
    postgres_dbservice = client.get_database_service(service_name)
    database = client.get_database_by_name("{}.{}".format(service_name, database_name))
    table = client.get_table_by_name(f"{service_name}.{database_name}.{table_name}")
    r = requests.delete(
        "http://localhost:8585/api/v1/tables/{}".format(table.id.__root__)
    )
    r.raise_for_status()
    client.delete_database(database.id.__root__)
    client.delete_database_service(postgres_dbservice.id.__root__)
@@ -1,22 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: "3.9"
services:
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: catalog_user
      POSTGRES_PASSWORD: catalog_password
      POSTGRES_DB: pagila
    ports:
      - 5433:5432
@@ -1,174 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import socket
import time
from typing import List
from urllib.parse import urlparse

import pytest
import requests
# Added: without this import, the bare ConnectionError below is the builtin, not requests' error.
from requests.exceptions import ConnectionError
from sqlalchemy.engine import create_engine
from sqlalchemy.inspection import inspect

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.ometa.ometa_api import OpenMetadata
# Added so test_check_table runs; assumed historical location of MetadataServerConfig.
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.logger import log_ansi_encoded_string


def is_responsive(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return True
    except ConnectionError:
        return False


def is_port_open(url):
    url_parts = urlparse(url)
    hostname = url_parts.hostname
    port = url_parts.port
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((hostname, port))
        return True
    except socket.error:
        return False
    finally:
        s.close()


def sleep(timeout_s):
    log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
    n = len(str(timeout_s))
    for i in range(timeout_s, 0, -1):
        log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
        time.sleep(1)
    log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)


def status(r):
    if r.status_code == 200 or r.status_code == 201:
        return 1
    else:
        return 0


def create_delete_table(client: OpenMetadata, databases: List[Database]):
    columns = [
        Column(name="id", dataType="INT", dataLength=1),
        Column(name="name", dataType="VARCHAR", dataLength=1),
    ]
    log_ansi_encoded_string(message=databases[0])
    table = CreateTableRequest(
        name="test1", columns=columns, database=databases[0].fullyQualifiedName
    )
    created_table = client.create_or_update(table)
    if table.name.__root__ == created_table.name.__root__:
        client.delete(entity=Table, entity_id=str(created_table.id.__root__))
        return 1
    else:
        client.delete(entity=Table, entity_id=str(created_table.id.__root__))
        return 0


def create_delete_database(client: OpenMetadata, databases: List[Database]):
    data = {
        "databaseConnection": {"hostPort": "localhost:0000"},
        "name": "temp_local_trino",
        "serviceType": "Trino",
        "description": "local trino env",
    }
    create_trino_service = CreateDatabaseServiceRequest(**data)
    trino_service = client.create_or_update(create_trino_service)
    create_database_request = CreateDatabaseRequest(
        name="dwh",
        service=trino_service.fullyQualifiedName,
    )
    created_database = client.create_or_update(create_database_request)
    resp = create_delete_table(client, databases)
    log_ansi_encoded_string(message=resp)
    client.delete(entity=Database, entity_id=str(created_database.id.__root__))
    client.delete(entity=DatabaseService, entity_id=str(trino_service.id.__root__))
    return resp


@pytest.fixture(scope="session")
def trino_service(docker_ip, docker_services):
    """Ensure that Docker service is up and responsive."""
    port = docker_services.port_for("trino-server", 8080)
    log_ansi_encoded_string(message=f"trino is running on port {port}")
    timeout_s = 120
    sleep(timeout_s)
    url = "trino://localhost:8080/"
    docker_services.wait_until_responsive(
        timeout=timeout_s, pause=0.1, check=lambda: is_port_open(url)
    )
    engine = create_engine(url)
    inspector = inspect(engine)
    return inspector


def test_check_schema(trino_service):
    inspector = trino_service
    schemas = []
    for schema in inspector.get_schema_names():
        schemas.append(schema)
    if "metadata" in schemas:
        assert 1
    else:
        assert 0


def test_read_tables(trino_service):
    inspector = trino_service
    check_tables = [
        "analyze_properties",
        "catalogs",
        "column_properties",
        "materialized_view_properties",
        "materialized_views",
        "schema_properties",
        "table_comments",
        "table_properties",
    ]
    tables = []
    for table in inspector.get_table_names("metadata"):
        tables.append(table)
    if set(tables) == set(check_tables):
        assert 1
    else:
        assert 0


def test_check_table():
    is_responsive("http://localhost:8586/healthcheck")
    metadata_config = MetadataServerConfig.parse_obj(
        {"api_endpoint": "http://localhost:8585/api", "auth_provider_type": "no-auth"}
    )
    client = OpenMetadata(metadata_config)
    databases = client.list_entities(entity=Database).entities
    if len(databases) > 0:
        assert create_delete_table(client, databases)
    else:
        assert create_delete_database(client, databases)
@@ -1,22 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: "3"
services:
  trino-server:
    image: trinodb/trino
    restart: always
    ports:
      - 8080:8080
@@ -1,43 +0,0 @@
import json
from unittest import TestCase

from metadata.ingestion.api.workflow import Workflow

config = """
{
    "source": {
        "type": "trino",
        "config": {
            "service_name": "local_trino",
            "host_port": "localhost:8080",
            "username": "user",
            "catalog": "tpcds",
            "database": "tiny"
        }
    },
    "sink": {
        "type": "metadata-rest",
        "config": {}
    },
    "metadata_server": {
        "type": "metadata-server",
        "config": {
            "api_endpoint": "http://localhost:8585/api",
            "auth_provider_type": "no-auth"
        }
    }
}
"""


class WorkflowTest(TestCase):
    def test_execute_200(self):
        """
        stage/file.py must be compatible with source/sample_data.py;
        this test tries to catch one becoming incompatible with the other
        by running a workflow that includes both of them.
        """
        workflow = Workflow.create(json.loads(config))
        workflow.execute()
        workflow.stop()
        self.assertTrue(True)