mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-27 00:31:42 +00:00
qlikcloud get script tables (#22022)
This commit is contained in:
parent
d029ceab0a
commit
10b377590c
@ -12,6 +12,7 @@
|
||||
REST Auth & Client for QlikCloud
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
import traceback
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
|
||||
@ -23,12 +24,14 @@ from metadata.ingestion.source.dashboard.qlikcloud.constants import (
|
||||
APP_LOADMODEL_REQ,
|
||||
CREATE_SHEET_SESSION,
|
||||
GET_LOADMODEL_LAYOUT,
|
||||
GET_SCRIPT,
|
||||
GET_SHEET_LAYOUT,
|
||||
OPEN_DOC_REQ,
|
||||
)
|
||||
from metadata.ingestion.source.dashboard.qlikcloud.models import (
|
||||
QlikApp,
|
||||
QlikAppResponse,
|
||||
QlikScriptResult,
|
||||
QlikSpace,
|
||||
QlikSpaceResponse,
|
||||
)
|
||||
@ -173,12 +176,18 @@ class QlikCloudClient:
|
||||
models = self._websocket_send_request(GET_LOADMODEL_LAYOUT, response=True)
|
||||
data_models = QlikDataModelResult(**models)
|
||||
layout = data_models.result.qLayout
|
||||
parsed_datamodels = []
|
||||
if isinstance(layout, list):
|
||||
tables = []
|
||||
for layout in data_models.result.qLayout:
|
||||
tables.extend(layout.value.tables)
|
||||
return tables
|
||||
return layout.tables
|
||||
parsed_datamodels.extend(tables)
|
||||
else:
|
||||
parsed_datamodels.extend(layout.tables)
|
||||
script_tables = self.get_script_tables()
|
||||
if script_tables:
|
||||
parsed_datamodels.extend(script_tables)
|
||||
return parsed_datamodels
|
||||
except Exception:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning("Failed to fetch the dashboard datamodels")
|
||||
@ -204,3 +213,26 @@ class QlikCloudClient:
|
||||
except Exception:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning("Failed to fetch the space list")
|
||||
|
||||
def get_script_tables(self) -> Optional[List[QlikTable]]:
|
||||
"""Get script tables from the dashboard script"""
|
||||
script_tables = []
|
||||
try:
|
||||
script_response = self._websocket_send_request(GET_SCRIPT, response=True)
|
||||
script_result = QlikScriptResult(**script_response)
|
||||
if script_result.result.qScript:
|
||||
script_value = script_result.result.qScript
|
||||
matches = re.findall(
|
||||
r'FROM\s+["\']?([a-zA-Z0-9_.]+)["\']?', script_value, re.IGNORECASE
|
||||
)
|
||||
if isinstance(matches, list):
|
||||
for table in matches:
|
||||
table_name = table.split(".")[-1]
|
||||
script_tables.append(QlikTable(tableName=table_name))
|
||||
if not script_tables:
|
||||
logger.warning("No script tables found")
|
||||
return script_tables
|
||||
except Exception:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning("Failed to fetch the script tables")
|
||||
return script_tables
|
||||
|
||||
@ -64,3 +64,9 @@ GET_LOADMODEL_LAYOUT = {
|
||||
"id": 5,
|
||||
"jsonrpc": "2.0",
|
||||
}
|
||||
GET_SCRIPT = {
|
||||
"handle": 3,
|
||||
"method": "GetScript",
|
||||
"id": 1,
|
||||
"jsonrpc": "2.0",
|
||||
}
|
||||
|
||||
@ -84,3 +84,11 @@ class QlikAppResponse(BaseModel):
|
||||
|
||||
apps: Optional[List[QlikApp]] = Field(None, alias="data")
|
||||
links: Optional[QlikLinks] = None
|
||||
|
||||
|
||||
class QlikScript(BaseModel):
|
||||
qScript: Optional[str] = None
|
||||
|
||||
|
||||
class QlikScriptResult(BaseModel):
|
||||
result: Optional[QlikScript] = QlikScript()
|
||||
|
||||
@ -417,3 +417,54 @@ class QlikCloudUnitTest(TestCase):
|
||||
== SHARED_APP_DASHBOARD_IN_MOCK_DASHBOARDS
|
||||
+ PERSONAL_APP_DASHBOARD_IN_MOCK_DASHBOARDS
|
||||
)
|
||||
|
||||
@pytest.mark.order(9)
|
||||
def test_get_script_tables(self):
|
||||
"""Test the get_script_tables method that extracts table names from Qlik scripts"""
|
||||
# Mock script content with FROM clauses
|
||||
mock_script = """
|
||||
LOAD * FROM 'mock_schema.sales_data';
|
||||
LOAD column1, column2 FROM database.schema.customers;
|
||||
LEFT JOIN products ON sales_data.product_id = products.id;
|
||||
"""
|
||||
|
||||
mock_script_response = {"result": {"qScript": mock_script}}
|
||||
|
||||
with patch.object(
|
||||
QlikCloudClient,
|
||||
"_websocket_send_request",
|
||||
return_value=mock_script_response,
|
||||
):
|
||||
script_tables = self.qlikcloud.client.get_script_tables()
|
||||
|
||||
# Expected table names extracted from the script
|
||||
expected_table_names = ["sales_data", "customers"]
|
||||
|
||||
# Verify that we got the expected number of tables
|
||||
assert len(script_tables) == len(
|
||||
expected_table_names
|
||||
), f"Expected {len(expected_table_names)} tables, but got {len(script_tables)}"
|
||||
|
||||
# Verify table names are correctly extracted
|
||||
actual_table_names = [table.tableName for table in script_tables]
|
||||
for expected_name in expected_table_names:
|
||||
assert (
|
||||
expected_name in actual_table_names
|
||||
), f"Expected table '{expected_name}' not found in {actual_table_names}"
|
||||
|
||||
@pytest.mark.order(10)
|
||||
def test_get_script_tables_empty(self):
|
||||
"""Test the get_script_tables method with empty script"""
|
||||
mock_script_response = {"result": {"qScript": ""}}
|
||||
|
||||
with patch.object(
|
||||
QlikCloudClient,
|
||||
"_websocket_send_request",
|
||||
return_value=mock_script_response,
|
||||
):
|
||||
script_tables = self.qlikcloud.client.get_script_tables()
|
||||
|
||||
# Should return empty list for empty script
|
||||
assert (
|
||||
len(script_tables) == 0
|
||||
), f"Expected 0 tables for empty script, but got {len(script_tables)}"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user