mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-30 08:57:09 +00:00
Fix Python test (#4095)
This commit is contained in:
parent
68b53551a5
commit
1f3667b46f
@ -20,7 +20,7 @@
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"serviceType": {
|
||||
"type": {
|
||||
"description": "Service Type",
|
||||
"$ref": "#/definitions/athenaType",
|
||||
"default": "Athena"
|
||||
|
||||
@ -16,7 +16,8 @@
|
||||
"properties": {
|
||||
"type": {
|
||||
"description": "Service Type",
|
||||
"$ref": "#/definitions/sampleDataType"
|
||||
"$ref": "#/definitions/sampleDataType",
|
||||
"default": "SampleData"
|
||||
},
|
||||
"sampleDataFolder": {
|
||||
"description": "Sample Data File Path",
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
"description": "This **mock** database contains tables related to the Glue service",
|
||||
"service": {
|
||||
"id": "b946d870-03b2-4d33-a075-13665a7a76b9",
|
||||
"type": "GLUE"
|
||||
"type": "Glue"
|
||||
}
|
||||
}
|
||||
|
||||
@ -156,7 +156,7 @@ test = {
|
||||
build_options = {"includes": ["_cffi_backend"]}
|
||||
setup(
|
||||
name="openmetadata-ingestion",
|
||||
version="0.9.2.dev1",
|
||||
version="0.10.0.dev0",
|
||||
url="https://open-metadata.org/",
|
||||
author="OpenMetadata Committers",
|
||||
license="Apache License 2.0",
|
||||
|
||||
@ -25,6 +25,7 @@ from sqlalchemy_bigquery._types import (
|
||||
from metadata.generated.schema.api.tags.createTagCategory import (
|
||||
CreateTagCategoryRequest,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.table import TableData
|
||||
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
|
||||
BigQueryConnection,
|
||||
@ -35,6 +36,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
Source as WorkflowSource,
|
||||
)
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.source import InvalidSourceException
|
||||
from metadata.ingestion.source.sql_source import SQLSource
|
||||
from metadata.utils.column_type_parser import create_sqlalchemy_type
|
||||
@ -103,12 +105,15 @@ class BigquerySource(SQLSource):
|
||||
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
|
||||
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
|
||||
connection: BigQueryConnection = config.serviceConnection.__root__.config
|
||||
options = connection.connectionOptions.dict()
|
||||
if not isinstance(connection, BigQueryConnection):
|
||||
raise InvalidSourceException(
|
||||
f"Expected BigQueryConnection, but got {connection}"
|
||||
)
|
||||
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
|
||||
if (
|
||||
not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
|
||||
and connection.connectionOptions
|
||||
):
|
||||
options = connection.connectionOptions.dict()
|
||||
if options.get("credentials_path"):
|
||||
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options[
|
||||
"credentials_path"
|
||||
@ -177,6 +182,16 @@ class BigquerySource(SQLSource):
|
||||
|
||||
super().fetch_sample_data(schema, table)
|
||||
|
||||
def _get_database(self, database: Optional[str]) -> Database:
|
||||
if not database:
|
||||
database = self.service_connection.projectID
|
||||
return Database(
|
||||
name=database,
|
||||
service=EntityReference(
|
||||
id=self.service.id, type=self.service_connection.type.value
|
||||
),
|
||||
)
|
||||
|
||||
def parse_raw_data_type(self, raw_data_type):
|
||||
return raw_data_type.replace(", ", ",").replace(" ", ":").lower()
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
# This import verifies that the dependencies are available.
|
||||
from typing import Optional
|
||||
|
||||
import cx_Oracle # noqa: F401
|
||||
import pydantic
|
||||
@ -47,7 +48,7 @@ class OracleSource(SQLSource):
|
||||
)
|
||||
return cls(config, metadata_config)
|
||||
|
||||
def _get_database(self, database: str) -> Database:
|
||||
def _get_database(self, database: Optional[str]) -> Database:
|
||||
if not database:
|
||||
database = self.service_connection.oracleServiceName
|
||||
return Database(
|
||||
|
||||
@ -51,37 +51,34 @@ class PostgresSource(SQLSource):
|
||||
connection: PostgresConnection = config.serviceConnection.__root__.config
|
||||
if not isinstance(connection, PostgresConnection):
|
||||
raise InvalidSourceException(
|
||||
f"Expected MysqlConnection, but got {connection}"
|
||||
f"Expected PostgresConnection, but got {connection}"
|
||||
)
|
||||
|
||||
return cls(config, metadata_config)
|
||||
|
||||
def get_databases(self) -> Iterable[Inspector]:
|
||||
if self.config.database != None:
|
||||
if self.service_connection.database:
|
||||
yield from super().get_databases()
|
||||
else:
|
||||
query = "select datname from pg_catalog.pg_database;"
|
||||
|
||||
results = self.connection.execute(query)
|
||||
|
||||
for res in results:
|
||||
|
||||
row = list(res)
|
||||
try:
|
||||
|
||||
logger.info(f"Ingesting from database: {row[0]}")
|
||||
self.config.database = row[0]
|
||||
self.service_connection.database = row[0]
|
||||
self.engine = get_engine(self.config.serviceConnection)
|
||||
self.connection = self.engine.connect()
|
||||
self.engine.connect()
|
||||
yield inspect(self.engine)
|
||||
|
||||
except Exception as err:
|
||||
logger.error(f"Failed to Connect: {row[0]} due to error {err}")
|
||||
|
||||
def _get_database(self, schema: str) -> Database:
|
||||
def _get_database(self, database: str) -> Database:
|
||||
if database:
|
||||
self.service_connection.database = database
|
||||
return Database(
|
||||
name=self.config.database + FQDN_SEPARATOR + schema,
|
||||
service=EntityReference(id=self.service.id, type=self.config.service_type),
|
||||
name=self.service_connection.database,
|
||||
service=EntityReference(id=self.service.id, type="database"),
|
||||
)
|
||||
|
||||
def get_status(self) -> SourceStatus:
|
||||
|
||||
@ -549,7 +549,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
|
||||
|
||||
return columns
|
||||
|
||||
def _get_database(self, database: str) -> Database:
|
||||
def _get_database(self, database: Optional[str]) -> Database:
|
||||
if not database:
|
||||
database = "default"
|
||||
return Database(
|
||||
|
||||
@ -12,14 +12,15 @@
|
||||
"""
|
||||
OpenMetadata API initialization
|
||||
"""
|
||||
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
|
||||
|
||||
def test_init_ometa():
|
||||
|
||||
server_config = OpenMetadataServerConfig(hostPort="http://localhost:8585/api")
|
||||
server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api")
|
||||
metadata = OpenMetadata(server_config)
|
||||
|
||||
assert metadata.health_check()
|
||||
|
||||
@ -7,9 +7,14 @@ config = """
|
||||
{
|
||||
"source": {
|
||||
"type": "sample-data",
|
||||
"config": {
|
||||
"sample_data_folder": "ingestion/examples/sample_data"
|
||||
}
|
||||
"serviceName": "sample_data",
|
||||
"serviceConnection": {
|
||||
"config": {
|
||||
"type": "SampleData",
|
||||
"sampleDataFolder": "ingestion/examples/sample_data"
|
||||
}
|
||||
},
|
||||
"sourceConfig": {}
|
||||
},
|
||||
"stage": {
|
||||
"type": "file",
|
||||
@ -17,11 +22,10 @@ config = """
|
||||
"filename": "/tmp/stage_test"
|
||||
}
|
||||
},
|
||||
"metadata_server": {
|
||||
"type": "metadata-server",
|
||||
"config": {
|
||||
"api_endpoint": "http://localhost:8585/api",
|
||||
"auth_provider_type": "no-auth"
|
||||
"workflowConfig": {
|
||||
"openMetadataServerConfig": {
|
||||
"hostPort": "http://localhost:8585/api",
|
||||
"authProvider": "no-auth"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,57 +27,49 @@ CONFIG = """
|
||||
{
|
||||
"source": {
|
||||
"type": "bigquery",
|
||||
"config": {
|
||||
"service_name": "test_bigquery",
|
||||
"project_id": "project_id",
|
||||
"host_port": "host_port",
|
||||
"username": "username",
|
||||
"service_name": "gcp_bigquery",
|
||||
"connect_args":{},
|
||||
"partition_query": "select * from {}.{} WHERE DATE({}) = '{}' LIMIT 1000",
|
||||
"enable_policy_tags": "True",
|
||||
"duration":1,
|
||||
"options":{
|
||||
"credentials":{
|
||||
"type": "service_account",
|
||||
"project_id": "project_id",
|
||||
"private_key_id": "private_key_id",
|
||||
"private_key": "",
|
||||
"client_email": "gcpuser@project_id.iam.gserviceaccount.com",
|
||||
"client_id": "",
|
||||
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
|
||||
"token_uri": "https://oauth2.googleapis.com/token",
|
||||
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
|
||||
"client_x509_cert_url": ""
|
||||
"serviceName": "local_bigquery",
|
||||
"serviceConnection": {
|
||||
"config": {
|
||||
"type": "BigQuery",
|
||||
"projectID": "project_id",
|
||||
"enablePolicyTagImport": true,
|
||||
"connectionOptions": {
|
||||
"credentials": {
|
||||
"type": "service_account",
|
||||
"project_id": "project_id",
|
||||
"private_key_id": "private_key_id",
|
||||
"private_key": "private_key",
|
||||
"client_email": "gcpuser@project_id.iam.gserviceaccount.com",
|
||||
"client_id": "client_id",
|
||||
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
|
||||
"token_uri": "https://oauth2.googleapis.com/token",
|
||||
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
|
||||
"client_x509_cert_url": ""
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"sourceConfig": {"config": {"enableDataProfiler": false}}
|
||||
},
|
||||
"sink": {
|
||||
"type": "file",
|
||||
"config": {
|
||||
"filename": "/var/tmp/datasets.json"
|
||||
}
|
||||
},
|
||||
"metadata_server": {
|
||||
"type": "metadata-server",
|
||||
"config": {
|
||||
"api_endpoint": "http://localhost:8585/api",
|
||||
"auth_provider_type": "no-auth"
|
||||
},
|
||||
"workflowConfig": {
|
||||
"openMetadataServerConfig": {
|
||||
"hostPort": "http://localhost:8585/api",
|
||||
"authProvider": "no-auth"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
|
||||
MOCK_GET_TABLE_NAMES = [
|
||||
"open_metadata.test1",
|
||||
"open_metadata.cloudaudit_googleapis_com_data_access_20220122",
|
||||
]
|
||||
MOCK_GET_TABLE_NAMES = ["test_schema_1.test_table_1", "test_schema_1.test_table_2"]
|
||||
|
||||
GET_TABLE_DESCRIPTIONS = {"text": "Test"}
|
||||
MOCK_GET_SCHEMA_NAMES = ["open_metadata"]
|
||||
MOCK_GET_SCHEMA_NAMES = ["test_schema_1"]
|
||||
MOCK_UNIQUE_CONSTRAINTS = []
|
||||
MOCK_PK_CONSTRAINT = {"constrained_columns": []}
|
||||
MOCK_GET_COLUMN = [
|
||||
@ -209,7 +201,6 @@ MOCK_GET_COLUMN = [
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
MOCK_GET_VIEW_NAMES = []
|
||||
MOCK_GET_VIEW_DEFINITION = ""
|
||||
|
||||
@ -274,14 +265,18 @@ class BigQueryIngestionTest(TestCase):
|
||||
def test_file_sink(self):
|
||||
config = json.loads(CONFIG)
|
||||
file_data = open(config["sink"]["config"]["filename"])
|
||||
data = json.load(file_data)
|
||||
for i in data:
|
||||
table = i.get("table")
|
||||
omdtable_obj: OMetaDatabaseAndTable = OMetaDatabaseAndTable.parse_obj(i)
|
||||
file_sink = json.load(file_data)
|
||||
for ometa_data in file_sink:
|
||||
table = ometa_data.get("table")
|
||||
omdtable_obj: OMetaDatabaseAndTable = OMetaDatabaseAndTable.parse_obj(
|
||||
ometa_data
|
||||
)
|
||||
table_obj: Table = Table.parse_obj(table)
|
||||
|
||||
assert table.get("description") == GET_TABLE_DESCRIPTIONS.get("text")
|
||||
table_name = f"{i.get('database').get('name')}.{table.get('name')}"
|
||||
table_name = (
|
||||
f"{ometa_data.get('database_schema').get('name')}.{table.get('name')}"
|
||||
)
|
||||
if table.get("tableType") == TableType.Regular.value:
|
||||
assert table_name in MOCK_GET_TABLE_NAMES
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ CONFIG = """
|
||||
"sink": {
|
||||
"type": "file",
|
||||
"config": {
|
||||
"filename": "/var/tmp/datasets.json"
|
||||
"filename": "/var/tmp/vertica.json"
|
||||
}
|
||||
},
|
||||
"workflowConfig": {
|
||||
@ -213,7 +213,6 @@ class VerticaIngestionTest(TestCase):
|
||||
get_columns.return_value = MOCK_GET_COLUMN
|
||||
get_view_names.return_value = MOCK_GET_VIEW_NAMES
|
||||
get_view_definition.return_value = MOCK_GET_VIEW_DEFINITION
|
||||
|
||||
execute_workflow()
|
||||
|
||||
def test_file_sink(self):
|
||||
|
||||
@ -25,7 +25,7 @@ config = """
|
||||
"serviceConnection": {
|
||||
"config": {
|
||||
"type": "SampleData",
|
||||
"sampleDataFolder": "./examples/sample_data"
|
||||
"sampleDataFolder": "ingestion/examples/sample_data"
|
||||
}
|
||||
},
|
||||
"sourceConfig": {}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user