Feature/1 fix and add lineage to exasol connector (#21399)

* Add lineage to Exasol connector

* Update test_connection to return TestConnectionResult

* Add exasol tests & dependencies to tests in setup.py

* Opensearch is required for testing, so add it there

* Modify metadata

* Update documentation for lineage

* Apply formatting changes to code

* Apply make py_format
This commit is contained in:
Ariel Schulz 2025-08-06 20:19:38 +02:00 committed by ulixius9
parent 395203b589
commit 8d98833622
16 changed files with 440 additions and 7 deletions

View File

@ -18,7 +18,7 @@ on:
e2e-tests:
description: "E2E Tests to run"
required: True
default: '["bigquery", "dbt_redshift", "metabase", "mssql", "mysql", "redash", "snowflake", "tableau", "powerbi", "vertica", "python", "redshift", "quicksight", "datalake_s3", "postgres", "oracle", "athena", "bigquery_multiple_project"]'
default: '["bigquery", "dbt_redshift", "metabase", "mssql", "mysql", "redash", "snowflake", "tableau", "powerbi", "vertica", "python", "redshift", "quicksight", "datalake_s3", "postgres", "oracle", "athena", "bigquery_multiple_project", "exasol"]'
debug:
description: "If Debugging the Pipeline, Slack and Sonar events won't be triggered [default, true or false]. Default will trigger only on main branch."
required: False
@ -45,7 +45,7 @@ jobs:
strategy:
fail-fast: false
matrix:
e2e-test: ${{ fromJSON(inputs.e2e-tests || '["bigquery", "dbt_redshift", "metabase", "mssql", "mysql", "redash", "snowflake", "tableau", "powerbi", "vertica", "python", "redshift", "quicksight", "datalake_s3", "postgres", "oracle", "athena", "bigquery_multiple_project"]') }}
e2e-test: ${{ fromJSON(inputs.e2e-tests || '["bigquery", "dbt_redshift", "metabase", "mssql", "mysql", "redash", "snowflake", "tableau", "powerbi", "vertica", "python", "redshift", "quicksight", "datalake_s3", "postgres", "oracle", "athena", "bigquery_multiple_project", "exasol"]') }}
environment: test
steps:

View File

@ -254,7 +254,10 @@ plugins: Dict[str, Set[str]] = {
"httpx>=0.23.0",
}, # also requires requests-aws4auth which is in base
"opensearch": {VERSIONS["opensearch"]},
"exasol": {"sqlalchemy_exasol>=5,<6"},
"exasol": {
"sqlalchemy_exasol>=5,<6",
"exasol-integration-test-docker-environment>=3.1.0,<4",
},
"glue": {VERSIONS["boto3"]},
"great-expectations": {VERSIONS["great-expectations"]},
"great-expectations-1xx": {VERSIONS["great-expectations-1xx"]},
@ -456,6 +459,8 @@ test = {
VERSIONS["google-cloud-bigtable"],
*plugins["bigquery"],
"faker==37.1.0", # The version needs to be fixed to prevent flaky tests!
*plugins["exasol"],
VERSIONS["opensearch"],
}
if sys.version_info >= (3, 9):

View File

@ -0,0 +1,25 @@
# Workflow definition that ingests metadata from an Exasol instance on localhost.
source:
type: exasol
serviceName: local_exasol
serviceConnection:
config:
type: Exasol
username: openmetadata_user
password: openmetadata_password
# Host and port of the Exasol database.
hostPort: localhost:8563
# TLS is switched off for this connection.
tls: disable-tls
connectionOptions: {}
connectionArguments: {}
sourceConfig:
config:
# Selects the metadata ingestion pipeline.
type: DatabaseMetadata
# The sink type is the metadata REST sink; it takes no extra configuration here.
sink:
type: metadata-rest
config: {}
workflowConfig:
# loggerLevel: INFO # DEBUG, INFO, WARN or ERROR
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -19,6 +19,7 @@ from metadata.ingestion.connections.builders import (
)
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.exasol.queries import EXASOL_TEST_GET_QUERIES
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger
@ -89,10 +90,15 @@ def test_connection(
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
queries = {
"GetQueries": EXASOL_TEST_GET_QUERIES,
}
return test_connection_db_common(
metadata=metadata,
engine=engine,
service_connection=service_connection,
automation_workflow=automation_workflow,
queries=queries,
timeout_seconds=timeout_seconds,
)

View File

@ -0,0 +1,24 @@
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.source.database.exasol.queries import EXASOL_SQL_STATEMENT
from metadata.ingestion.source.database.exasol.query_parser import (
ExasolQueryParserSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class ExasolLineageSource(ExasolQueryParserSource, LineageSource):
    """
    Lineage source for Exasol.

    Combines the Exasol query-parser base class with the generic lineage
    source and supplies the Exasol-specific dialect, query-log statement and
    additional WHERE-clause conditions as class attributes.
    """

    # Query-log statement imported from the Exasol queries module.
    sql_stmt = EXASOL_SQL_STATEMENT

    # SQL dialect associated with this source.
    dialect = Dialect.EXASOL

    # Extra conditions that keep only statements able to produce lineage:
    # MERGE, UPDATE, CREATE TABLE AS, CREATE VIEW and INSERT ... SELECT.
    # NOTE(review): presumably substituted into the ``{filters}`` placeholder
    # of ``sql_stmt`` by the base classes - confirm.
    filters = """
AND (
s.command_name IN ('MERGE', 'UPDATE', 'CREATE TABLE AS', 'CREATE VIEW')
OR (s.command_name = 'INSERT' AND LOWER(s.sql_text) LIKE '%insert%into%select%from%')
)
"""

View File

@ -1,5 +1,7 @@
from typing import Optional, cast
from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.entity.services.connections.database.exasolConnection import (
ExasolConnection,
)
@ -9,6 +11,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.utils.sqlalchemy_utils import get_all_table_ddls, get_table_ddl
# Attach the shared DDL helper functions to SQLAlchemy's Inspector class so
# they can be called as Inspector methods.
# NOTE(review): presumably needed for DDL extraction by the common DB source - confirm.
Inspector.get_all_table_ddls = get_all_table_ddls
Inspector.get_table_ddl = get_table_ddl
class ExasolSource(CommonDbSourceService):

View File

@ -1,3 +1,25 @@
import textwrap
# Template for reading executed statements from Exasol's audit tables.
#
# It joins EXA_DBA_AUDIT_SQL with EXA_DBA_AUDIT_SESSIONS on SESSION_ID to get
# the user name for each statement, and skips statements whose text starts
# with an OpenMetadata or dbt marker comment.
#
# Placeholders to fill in: {start_time}, {end_time}, {filters}, {result_limit}.
# Literal braces are doubled ({{ }}), so the template is meant for str.format().
# NOTE(review): the doubled percent signs (%%) presumably survive until the
# statement is executed - confirm how the driver treats them.
#
# The body is indented for readability; textwrap.dedent removes the common
# indentation, so the resulting string has no leading whitespace on any line.
EXASOL_SQL_STATEMENT = textwrap.dedent(
    """
    SELECT
    s.sql_text "query_text",
    s.command_name "query_type",
    se.user_name "user_name",
    s.start_time "start_time",
    s.stop_time "end_time",
    s.duration "duration"
    FROM EXA_DBA_AUDIT_SQL s
    JOIN EXA_DBA_AUDIT_SESSIONS se
    ON s.SESSION_ID = se.SESSION_ID
    WHERE s.sql_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
    AND s.sql_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
    AND start_time between TO_TIMESTAMP('{start_time}') and TO_TIMESTAMP('{end_time}')
    {filters}
    LIMIT {result_limit}
    """
)
EXASOL_TEST_GET_QUERIES = """
SELECT
s.sql_text,

View File

@ -0,0 +1,33 @@
from abc import ABC
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.exasolConnection import (
ExasolConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class ExasolQueryParserSource(QueryParserSource, ABC):
    """
    Shared Exasol base class for the Usage and Lineage sources.
    """

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        """
        Build the source from a raw workflow ``source`` configuration.

        Args:
            config_dict: Raw configuration, validated into a ``WorkflowSource``.
            metadata: OpenMetadata client handed to the constructor.
            pipeline_name: Accepted for signature compatibility; not used here.

        Returns:
            An instance of ``cls`` built from the validated configuration.

        Raises:
            InvalidSourceException: If the configured service connection is
                not an ``ExasolConnection``.
        """
        workflow_source: WorkflowSource = WorkflowSource.model_validate(config_dict)
        connection = workflow_source.serviceConnection.root.config
        if isinstance(connection, ExasolConnection):
            return cls(workflow_source, metadata)
        raise InvalidSourceException(
            f"Expected ExasolConnection, but got {connection}"
        )

View File

@ -1,4 +1,8 @@
from metadata.ingestion.source.database.exasol.lineage import ExasolLineageSource
from metadata.ingestion.source.database.exasol.metadata import ExasolSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec
ServiceSpec = DefaultDatabaseSpec(metadata_source_class=ExasolSource)
ServiceSpec = DefaultDatabaseSpec(
metadata_source_class=ExasolSource,
lineage_source_class=ExasolLineageSource,
)

View File

@ -0,0 +1,29 @@
# Workflow definition that ingests metadata from an Exasol instance on localhost.
source:
type: exasol
serviceName: local_exasol
serviceConnection:
config:
type: Exasol
username: sys
password: exasol
# Host and port of the Exasol database.
hostPort: localhost:8563
# TLS is switched off for this connection.
tls: disable-tls
connectionOptions: {}
connectionArguments: {}
sourceConfig:
config:
# Selects the metadata ingestion pipeline.
type: DatabaseMetadata
# Tables, views and their DDL are all included; deleted tables are marked.
markDeletedTables: true
includeTables: true
includeViews: true
includeDDL: true
# The sink type is the metadata REST sink; it takes no extra configuration here.
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -0,0 +1,185 @@
# Copyright 2022 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Exasol connector with CLI
"""
import subprocess
from typing import List
import pytest
from .base.e2e_types import E2EType
from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods
# First segment of the fully qualified names the tests expect.
SERVICE_NAME = "local_exasol"
# Schema created in setUpClass; it holds the test table and view.
SCHEMA_NAME = "openmetadata_schema"
# Table created by the test, and the view selecting from it.
TABLE_NAME = "datatypes"
VIEW_NAME = f"view_{TABLE_NAME}"
# Port passed to itde as --database-port-forward.
DB_PORT = 8563
# Exasol docker-db image version passed to itde.
DB_VERSION = "7.1.26"
# Environment name passed to itde when spawning the test environment.
CONTAINER_SUFFIX = "exasoaddl"
# Container killed in tearDownClass.
# NOTE(review): presumably itde names the database container
# "db_container_<environment name>" - confirm.
CONTAINER_NAME = f"db_container_{CONTAINER_SUFFIX}"
class ExasolCliTest(CliCommonDB.TestSuite, SQACommonMethods):
    """
    Exasol CLI end-to-end tests.

    Before any test runs, the suite spawns an Exasol test environment with the
    ``itde`` command-line tool and creates the schemas and tables the tests
    rely on. After the last test it kills the database container.
    """

    # SQL fixtures.
    # NOTE(review): presumably consumed by SQACommonMethods - confirm.
    create_table_query: str = f"""
CREATE TABLE IF NOT EXISTS {SCHEMA_NAME}.{TABLE_NAME} (
col_boolean BOOLEAN,
col_decimal DECIMAL(18,0),
col_date DATE,
col_timestamp TIMESTAMP,
col_timestamp_local TIMESTAMP WITH LOCAL TIME ZONE,
col_char CHAR(1),
col_varchar VARCHAR(1)
);
"""

    create_view_query: str = f"""
CREATE VIEW {SCHEMA_NAME}.{VIEW_NAME} AS
SELECT *
FROM {SCHEMA_NAME}.{TABLE_NAME}
"""

    insert_data_queries: List[str] = [
        f"""
INSERT INTO {SCHEMA_NAME}.{TABLE_NAME} (col_boolean, col_decimal, col_date, col_timestamp, col_timestamp_local, col_char, col_varchar) VALUES
(TRUE, 18.5, '2023-07-13', '2023-07-13 06:04:45', '2023-07-13 04:04:45', 'a', 'b');
""",
        f"""
INSERT INTO {SCHEMA_NAME}.{TABLE_NAME} (col_boolean, col_decimal, col_date, col_timestamp, col_timestamp_local, col_char, col_varchar) VALUES
(TRUE, -18.5, '2023-09-13', '2023-09-13 06:04:45', '2023-09-13 04:04:45', 'c', 'd');
""",
    ]

    drop_table_query: str = f"""
DROP TABLE IF EXISTS {SCHEMA_NAME}.{TABLE_NAME};
"""

    drop_view_query: str = f"""
DROP VIEW IF EXISTS {SCHEMA_NAME}.{VIEW_NAME};
"""

    @classmethod
    def setUpClass(cls):
        """
        Spawn the Exasol test environment and create the database fixtures.

        Raises:
            subprocess.CalledProcessError: If ``itde`` exits with a non-zero
                status, i.e. the test environment could not be spawned.
        """
        # check=True matches tearDownClass: fail right here with the real
        # cause, not later with a connection error against a missing database.
        subprocess.run(
            [
                "itde",
                "spawn-test-environment",
                "--environment-name",
                CONTAINER_SUFFIX,
                "--database-port-forward",
                f"{DB_PORT}",
                "--bucketfs-port-forward",
                "2580",
                "--docker-db-image-version",
                DB_VERSION,
                "--db-mem-size",
                "4GB",
            ],
            check=True,
        )
        super().setUpClass()
        # The context manager closes the connection; no explicit close() needed.
        with cls.engine.connect() as connection:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME}")
            connection.execute("CREATE SCHEMA IF NOT EXISTS IGNORE_SCHEMA")
            connection.execute(cls.create_table_query)
            # Copy of the test table, used by the exclude-table filter tests.
            connection.execute(
                f"CREATE OR REPLACE TABLE {SCHEMA_NAME}.IGNORE_TABLE AS SELECT * FROM {SCHEMA_NAME}.{TABLE_NAME}"
            )

    @classmethod
    def tearDownClass(cls):
        """
        Run the inherited teardown, then kill the database container.

        The container is killed even when the inherited teardown raises, so a
        failing teardown does not leave the database running.
        """
        try:
            super().tearDownClass()
        finally:
            subprocess.run(
                ["docker", "kill", CONTAINER_NAME], check=True, encoding="utf-8"
            )

    @pytest.mark.order(8)
    def test_table_filter_excludes(self) -> None:
        """Vanilla ingestion + exclude table filter pattern

        We will perform the following steps:
            1. build config file for ingest with filters
            2. run ingest `self.run_command()` defaults to `ingestion`
        """
        self.build_config_file(
            E2EType.INGEST_DB_FILTER_TABLE, {"excludes": self.get_excludes_tables()}
        )
        result = self.run_command()
        sink_status, source_status = self.retrieve_statuses(result)
        self.assert_filtered_tables_excludes(source_status, sink_status)

    @staticmethod
    def get_connector_name() -> str:
        """Return the connector name used by this suite."""
        return "exasol"

    def create_table_and_view(self) -> None:
        """Delegate to ``SQACommonMethods.create_table_and_view``."""
        SQACommonMethods.create_table_and_view(self)

    def delete_table_and_view(self) -> None:
        """Delegate to ``SQACommonMethods.delete_table_and_view``."""
        SQACommonMethods.delete_table_and_view(self)

    @staticmethod
    def get_includes_schemas() -> List[str]:
        """Return the schema include patterns."""
        return [f"{SCHEMA_NAME}.*"]

    @classmethod
    def get_excludes_schemas(cls) -> List[str]:
        """Return the schema exclude patterns."""
        return ["IGNORE_SCHEMA.*"]

    @staticmethod
    def get_includes_tables() -> List[str]:
        """Return the table include patterns."""
        return [f"{TABLE_NAME}"]

    @staticmethod
    def get_excludes_tables() -> List[str]:
        """Return the table exclude patterns."""
        return ["IGNORE_TABLE"]

    @staticmethod
    def expected_tables() -> int:
        """Return the expected number of tables."""
        return 1

    def expected_sample_size(self) -> int:
        """Return the expected sample size: one row per insert statement."""
        return len(self.insert_data_queries)

    def view_column_lineage_count(self) -> int:
        """Return the expected number of column lineage entries for the view."""
        # NOTE(review): the test table defined above has 7 columns; confirm
        # that 22 is the intended value.
        return 22

    def expected_lineage_node(self) -> str:
        """Return the fully qualified name of the view expected in the lineage."""
        return f"{SERVICE_NAME}.default.{SCHEMA_NAME}.{VIEW_NAME}"

    @staticmethod
    def fqn_created_table() -> str:
        """Return the fully qualified name of the table created by the test."""
        return f"{SERVICE_NAME}.default.{SCHEMA_NAME}.{TABLE_NAME}"

    @staticmethod
    def expected_filtered_schema_includes() -> int:
        """Return the expected filtered count for the schema include filter."""
        return 1

    @staticmethod
    def expected_filtered_schema_excludes() -> int:
        """Return the expected filtered count for the schema exclude filter."""
        return 1

    @staticmethod
    def expected_filtered_table_includes() -> int:
        """Return the expected filtered count for the table include filter."""
        return 1

    @staticmethod
    def expected_filtered_table_excludes() -> int:
        """Return the expected filtered count for the table exclude filter."""
        return 1

    @staticmethod
    def expected_filtered_mix() -> int:
        """Return the expected filtered count for the mixed filter case."""
        return 2

View File

@ -0,0 +1,75 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Exasol using the topology
"""
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.source.database.exasol.metadata import ExasolSource
# Minimal workflow configuration used to build the Exasol source in the tests:
# an Exasol service connection, the metadata pipeline, the REST sink and an
# OpenMetadata server configuration with a placeholder JWT token.
mock_exasol_config = {
    "source": {
        "type": "exasol",
        "serviceName": "local_exasol1",
        "serviceConnection": {
            "config": {
                "type": "Exasol",
                "username": "username",
                "password": "password",
                "hostPort": "localhost:8563",
                "tls": "disable-tls",
            }
        },
        "sourceConfig": {
            "config": {
                "type": "DatabaseMetadata",
            }
        },
    },
    "sink": {
        "type": "metadata-rest",
        "config": {},
    },
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "exasol"},
        }
    },
}
class ExasolUnitTest(TestCase):
    """
    Unit test for the Exasol metadata source.

    The source is built from ``mock_exasol_config`` with
    ``CommonDbSourceService.test_connection`` patched out.
    """

    @patch(
        "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection"
    )
    def __init__(self, methodName, test_connection) -> None:
        """
        Create the test case and the Exasol source under test.

        Args:
            methodName: Name of the test method to run, as passed by unittest.
            test_connection: Mock injected by ``@patch`` in place of
                ``CommonDbSourceService.test_connection``.
        """
        super().__init__(methodName)
        test_connection.return_value = False
        self.config = OpenMetadataWorkflowConfig.model_validate(mock_exasol_config)
        self.exasol_source = ExasolSource.create(
            mock_exasol_config["source"],
            self.config.workflowConfig.openMetadataServerConfig,
        )

    @patch("sqlalchemy.engine.base.Engine")
    @patch(
        "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.connection"
    )
    def test_close_connection(self, connection, engine):
        """Closing the source must not raise when the connection is mocked."""
        # Stacked @patch decorators inject their mocks bottom-up: the first
        # argument is the mock for CommonDbSourceService.connection (closest
        # to the function) and the second is the Engine mock.
        connection.return_value = True
        self.exasol_source.close()

View File

@ -8,8 +8,8 @@ slug: /connectors/database/exasol
name="Exasol"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata"]
unavailableFeatures=["Query Usage", "Lineage", "Column-level Lineage", "Data Profiler", "Data Quality", "Owners", "dbt", "Tags", "Stored Procedures", "Sample Data", "Auto-Classification"]
availableFeatures=["Metadata", "Lineage", "Column-level Lineage"]
unavailableFeatures=["Query Usage", "Data Profiler", "Data Quality", "Owners", "dbt", "Tags", "Stored Procedures", "Sample Data", "Auto-Classification"]
/ %}
@ -19,6 +19,7 @@ Configure and schedule Exasol metadata from the OpenMetadata UI:
- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Lineage](/connectors/ingestion/lineage)
- [Troubleshooting](/connectors/database/exasol/troubleshooting)
{% partial file="/v1.8/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/exasol/yaml"} /%}
@ -33,6 +34,13 @@ To deploy OpenMetadata, check the Deployment guides.
The connector requires **Exasol version 7.1 or higher** to function correctly. Ensure your Exasol instance meets this minimum version requirement before proceeding.
To ingest basic metadata, an Exasol user must have the following privileges:
- `USAGE` privilege on Schema
- `SELECT` privilege on Tables
OpenMetadata fetches the query logs for the lineage workflow by querying the `EXA_STATISTICS.EXA_DBA_AUDIT_SQL` table.
For this, an Exasol user should be granted the `SELECT ANY DICTIONARY` system privilege.
## Metadata Ingestion
{% partial

View File

@ -26,6 +26,13 @@
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false
},
{
"name": "GetQueries",
"description": "Check if we can access the EXA_DBA_AUDIT_SQL table to get query logs. These queries are analyzed in the usage & lineage workflow.",
"errorMessage": "Failed to fetch queries, please validate if Exasol instance has EXA_DBA_AUDIT_SQL extension installed and the user has at least select privileges for EXA_DBA_AUDIT_SQL table.",
"mandatory": false
}
]
}
}

View File

@ -90,6 +90,9 @@
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -36,6 +36,7 @@ export interface ExasolConnection {
* SQLAlchemy driver scheme options.
*/
scheme?: ExasolScheme;
supportsLineageExtraction?: boolean;
supportsMetadataExtraction?: boolean;
/**
* Regex to only include/exclude tables that matches the pattern.