mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-17 11:43:54 +00:00
MINOR: Implemented SAPERP Connector feedback (#17137)
* Implemented saperp feedback * Fixed pytest
This commit is contained in:
parent
c6e7938e47
commit
b6745d7cf1
@ -13,7 +13,6 @@ Client to interact with SAP ERP APIs
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import math
|
import math
|
||||||
import time
|
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Any, List, Optional, Union
|
from typing import Any, List, Optional, Union
|
||||||
|
|
||||||
@ -21,6 +20,7 @@ from metadata.generated.schema.entity.services.connections.database.sapErpConnec
|
|||||||
SapErpConnection,
|
SapErpConnection,
|
||||||
)
|
)
|
||||||
from metadata.ingestion.ometa.client import REST, ClientConfig
|
from metadata.ingestion.ometa.client import REST, ClientConfig
|
||||||
|
from metadata.ingestion.source.database.saperp.constants import PARAMS_DATA
|
||||||
from metadata.ingestion.source.database.saperp.models import (
|
from metadata.ingestion.source.database.saperp.models import (
|
||||||
SapErpColumn,
|
SapErpColumn,
|
||||||
SapErpColumnResponse,
|
SapErpColumnResponse,
|
||||||
@ -29,6 +29,7 @@ from metadata.ingestion.source.database.saperp.models import (
|
|||||||
)
|
)
|
||||||
from metadata.utils.helpers import clean_uri
|
from metadata.utils.helpers import clean_uri
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
from metadata.utils.ssl_registry import get_verify_ssl_fn
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
|
|
||||||
@ -49,14 +50,17 @@ class SapErpClient:
|
|||||||
def __init__(self, config: SapErpConnection):
|
def __init__(self, config: SapErpConnection):
|
||||||
self.config: SapErpConnection = config
|
self.config: SapErpConnection = config
|
||||||
self.auth_token = self.config.apiKey.get_secret_value()
|
self.auth_token = self.config.apiKey.get_secret_value()
|
||||||
|
get_verify_ssl = get_verify_ssl_fn(config.verifySSL)
|
||||||
client_config: ClientConfig = ClientConfig(
|
client_config: ClientConfig = ClientConfig(
|
||||||
base_url=clean_uri(config.hostPort),
|
base_url=clean_uri(config.hostPort),
|
||||||
auth_header="APIKey",
|
auth_header="APIKey",
|
||||||
auth_token_mode="",
|
auth_token_mode="",
|
||||||
auth_token=lambda: (self.auth_token, 0),
|
auth_token=lambda: (self.auth_token, 0),
|
||||||
api_version="v1",
|
api_version="v1",
|
||||||
verify=False,
|
|
||||||
allow_redirects=True,
|
allow_redirects=True,
|
||||||
|
retry_codes=[500, 504],
|
||||||
|
retry_wait=5,
|
||||||
|
verify=get_verify_ssl(config.sslConfig),
|
||||||
)
|
)
|
||||||
self.client = REST(client_config)
|
self.client = REST(client_config)
|
||||||
|
|
||||||
@ -64,7 +68,7 @@ class SapErpClient:
|
|||||||
"""
|
"""
|
||||||
Check metadata connection to SAS ERP tables API
|
Check metadata connection to SAS ERP tables API
|
||||||
"""
|
"""
|
||||||
params_data = {"$top": "1", "$format": "json", "$inlinecount": "allpages"}
|
params_data = PARAMS_DATA
|
||||||
response_data = self.client._request( # pylint: disable=protected-access
|
response_data = self.client._request( # pylint: disable=protected-access
|
||||||
method="GET",
|
method="GET",
|
||||||
path="/ECC/DDIC/ZZ_I_DDIC_TAB_CDS/",
|
path="/ECC/DDIC/ZZ_I_DDIC_TAB_CDS/",
|
||||||
@ -81,7 +85,7 @@ class SapErpClient:
|
|||||||
"""
|
"""
|
||||||
Check metadata connection to SAP ERP columns API
|
Check metadata connection to SAP ERP columns API
|
||||||
"""
|
"""
|
||||||
params_data = {"$top": "1", "$format": "json", "$inlinecount": "allpages"}
|
params_data = PARAMS_DATA
|
||||||
response_data = self.client._request( # pylint: disable=protected-access
|
response_data = self.client._request( # pylint: disable=protected-access
|
||||||
method="GET",
|
method="GET",
|
||||||
path="/ECC/DDIC/ZZ_I_DDIC_COL_CDS/",
|
path="/ECC/DDIC/ZZ_I_DDIC_COL_CDS/",
|
||||||
@ -101,7 +105,7 @@ class SapErpClient:
|
|||||||
Method to paginate the APIs
|
Method to paginate the APIs
|
||||||
"""
|
"""
|
||||||
entities_list = []
|
entities_list = []
|
||||||
params_data.update({"$top": "1", "$format": "json", "$inlinecount": "allpages"})
|
params_data.update(PARAMS_DATA)
|
||||||
response_data = self.client._request( # pylint: disable=protected-access
|
response_data = self.client._request( # pylint: disable=protected-access
|
||||||
method="GET", path=api_url, headers=HEADERS, data=params_data
|
method="GET", path=api_url, headers=HEADERS, data=params_data
|
||||||
)
|
)
|
||||||
@ -123,9 +127,6 @@ class SapErpClient:
|
|||||||
)
|
)
|
||||||
response = model_class(**response_data)
|
response = model_class(**response_data)
|
||||||
entities_list.extend(response.d.results)
|
entities_list.extend(response.d.results)
|
||||||
# Adding a delay before sending the requests to server
|
|
||||||
# due to server throwing error when too many requests are sent
|
|
||||||
time.sleep(0.5)
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug(traceback.format_exc())
|
logger.debug(traceback.format_exc())
|
||||||
logger.warning(f"Error fetching entities for pagination: {exc}")
|
logger.warning(f"Error fetching entities for pagination: {exc}")
|
||||||
@ -152,7 +153,10 @@ class SapErpClient:
|
|||||||
List all the columns on the SAP ERP instance
|
List all the columns on the SAP ERP instance
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
params_data = {"$filter": f"tabname eq '{table_name}'"}
|
logger.debug(f"Fetching columns for table {table_name}")
|
||||||
|
params_data = {
|
||||||
|
"$filter": f"tabname eq '{table_name}' and fieldname ne '.INCLUDE'"
|
||||||
|
}
|
||||||
table_columns = self.paginate(
|
table_columns = self.paginate(
|
||||||
api_url="/ECC/DDIC/ZZ_I_DDIC_COL_CDS/",
|
api_url="/ECC/DDIC/ZZ_I_DDIC_COL_CDS/",
|
||||||
params_data=params_data,
|
params_data=params_data,
|
||||||
|
@ -23,3 +23,5 @@ TABLE_TYPE_MAP = {
|
|||||||
"VIEW": TableType.View,
|
"VIEW": TableType.View,
|
||||||
"APPEND": TableType.View,
|
"APPEND": TableType.View,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PARAMS_DATA = {"$top": "1", "$format": "json", "$inlinecount": "allpages"}
|
||||||
|
@ -126,32 +126,24 @@ class SaperpSource(CommonDbSourceService):
|
|||||||
f"Unable to process table information for table: {str(table_name)} - {err}"
|
f"Unable to process table information for table: {str(table_name)} - {err}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _check_col_length( # pylint: disable=arguments-renamed
|
def _check_col_length( # pylint: disable=arguments-differ
|
||||||
self, datatype: str
|
self, datatype: str, col_length: Optional[str]
|
||||||
) -> Optional[int]:
|
) -> Optional[int]:
|
||||||
"""
|
"""
|
||||||
return the column length for the dataLength attribute
|
return the column length for the dataLength attribute
|
||||||
"""
|
"""
|
||||||
if datatype and datatype.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}:
|
|
||||||
return 1
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _merge_col_descriptions(self, column: SapErpColumn) -> Optional[str]:
|
|
||||||
"""
|
|
||||||
Method to merge the column descriptions from different fields
|
|
||||||
"""
|
|
||||||
description = None
|
|
||||||
try:
|
try:
|
||||||
if column.scrtext_l:
|
if datatype and datatype.upper() in {
|
||||||
description = f"**{column.scrtext_l}**"
|
"CHAR",
|
||||||
if column.i_ddtext and column.scrtext_l != column.i_ddtext:
|
"VARCHAR",
|
||||||
description = f"{description}\n{column.i_ddtext}"
|
"BINARY",
|
||||||
|
"VARBINARY",
|
||||||
|
}:
|
||||||
|
return int(col_length) if col_length else None
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug(traceback.format_exc())
|
logger.debug(traceback.format_exc())
|
||||||
logger.warning(
|
logger.warning(f"Failed to fetch column length: {exc}")
|
||||||
f"Unable to get column descriptions for {column.fieldname}: {exc}"
|
return None
|
||||||
)
|
|
||||||
return description
|
|
||||||
|
|
||||||
def _get_table_constraints(
|
def _get_table_constraints(
|
||||||
self, columns: Optional[List[Column]]
|
self, columns: Optional[List[Column]]
|
||||||
@ -197,6 +189,16 @@ class SaperpSource(CommonDbSourceService):
|
|||||||
return Constraint.PRIMARY_KEY
|
return Constraint.PRIMARY_KEY
|
||||||
return Constraint.NOT_NULL if column.notnull == "X" else Constraint.NULL
|
return Constraint.NOT_NULL if column.notnull == "X" else Constraint.NULL
|
||||||
|
|
||||||
|
def _get_display_datatype( # pylint: disable=arguments-differ
|
||||||
|
self, column_type: str, col_data_length: Optional[int]
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Method to get the display datatype
|
||||||
|
"""
|
||||||
|
if col_data_length:
|
||||||
|
return f"{column_type}({str(col_data_length)})"
|
||||||
|
return column_type
|
||||||
|
|
||||||
def get_columns_and_constraints( # pylint: disable=arguments-differ
|
def get_columns_and_constraints( # pylint: disable=arguments-differ
|
||||||
self, table_name: str
|
self, table_name: str
|
||||||
) -> ColumnsAndConstraints:
|
) -> ColumnsAndConstraints:
|
||||||
@ -209,7 +211,9 @@ class SaperpSource(CommonDbSourceService):
|
|||||||
for sap_column in sap_columns or []:
|
for sap_column in sap_columns or []:
|
||||||
try:
|
try:
|
||||||
column_type = ColumnTypeParser.get_column_type(sap_column.datatype)
|
column_type = ColumnTypeParser.get_column_type(sap_column.datatype)
|
||||||
data_type_display = column_type
|
col_data_length = self._check_col_length(
|
||||||
|
datatype=column_type, col_length=sap_column.leng
|
||||||
|
)
|
||||||
column_name = (
|
column_name = (
|
||||||
f"{sap_column.fieldname}({sap_column.precfield})"
|
f"{sap_column.fieldname}({sap_column.precfield})"
|
||||||
if sap_column.precfield
|
if sap_column.precfield
|
||||||
@ -221,6 +225,10 @@ class SaperpSource(CommonDbSourceService):
|
|||||||
logger.warning(
|
logger.warning(
|
||||||
f"Unknown type {repr(sap_column.datatype)}: {sap_column.fieldname}"
|
f"Unknown type {repr(sap_column.datatype)}: {sap_column.fieldname}"
|
||||||
)
|
)
|
||||||
|
data_type_display = self._get_display_datatype(
|
||||||
|
column_type, col_data_length
|
||||||
|
)
|
||||||
|
col_data_length = 1 if col_data_length is None else col_data_length
|
||||||
om_column = Column(
|
om_column = Column(
|
||||||
name=ColumnName(
|
name=ColumnName(
|
||||||
root=column_name
|
root=column_name
|
||||||
@ -229,15 +237,17 @@ class SaperpSource(CommonDbSourceService):
|
|||||||
if column_name
|
if column_name
|
||||||
else " "
|
else " "
|
||||||
),
|
),
|
||||||
displayName=sap_column.fieldname,
|
displayName=sap_column.scrtext_l
|
||||||
description=self._merge_col_descriptions(column=sap_column),
|
if sap_column.scrtext_l
|
||||||
|
else sap_column.fieldname,
|
||||||
|
description=sap_column.i_ddtext,
|
||||||
dataType=column_type,
|
dataType=column_type,
|
||||||
dataTypeDisplay=data_type_display,
|
dataTypeDisplay=data_type_display,
|
||||||
ordinalPosition=int(sap_column.POS),
|
ordinalPosition=int(sap_column.POS),
|
||||||
constraint=self._get_column_constraint(
|
constraint=self._get_column_constraint(
|
||||||
column=sap_column, pk_columns=table_constraints_model.pk_columns
|
column=sap_column, pk_columns=table_constraints_model.pk_columns
|
||||||
),
|
),
|
||||||
dataLength=self._check_col_length(datatype=column_type),
|
dataLength=col_data_length,
|
||||||
)
|
)
|
||||||
if column_type == DataType.ARRAY.value:
|
if column_type == DataType.ARRAY.value:
|
||||||
om_column.arrayDataType = DataType.UNKNOWN
|
om_column.arrayDataType = DataType.UNKNOWN
|
||||||
@ -295,3 +305,6 @@ class SaperpSource(CommonDbSourceService):
|
|||||||
name=table.tabname, error=error, stackTrace=traceback.format_exc()
|
name=table.tabname, error=error, stackTrace=traceback.format_exc()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.metadata.close()
|
||||||
|
@ -44,6 +44,7 @@ class SapErpColumn(BaseModel):
|
|||||||
scrtext_l: Optional[str] = None
|
scrtext_l: Optional[str] = None
|
||||||
i_ddtext: Optional[str] = None
|
i_ddtext: Optional[str] = None
|
||||||
dd_text: Optional[str] = None
|
dd_text: Optional[str] = None
|
||||||
|
leng: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
class SapErpTableList(BaseModel):
|
class SapErpTableList(BaseModel):
|
||||||
|
@ -111,16 +111,14 @@ EXPECTED_TABLES_AND_COLUMNS = [
|
|||||||
columns=[
|
columns=[
|
||||||
Column(
|
Column(
|
||||||
name=ColumnName(root="BUKRS"),
|
name=ColumnName(root="BUKRS"),
|
||||||
displayName="BUKRS",
|
displayName="Pstng period variant",
|
||||||
dataType="CHAR",
|
dataType="CHAR",
|
||||||
arrayDataType=None,
|
arrayDataType=None,
|
||||||
dataLength=1,
|
dataLength=4,
|
||||||
precision=None,
|
precision=None,
|
||||||
scale=None,
|
scale=None,
|
||||||
dataTypeDisplay="CHAR",
|
dataTypeDisplay="CHAR(4)",
|
||||||
description=Markdown(
|
description=Markdown(root="Posting Period Variant"),
|
||||||
root="**Pstng period variant**\nPosting Period Variant"
|
|
||||||
),
|
|
||||||
fullyQualifiedName=None,
|
fullyQualifiedName=None,
|
||||||
tags=None,
|
tags=None,
|
||||||
constraint=None,
|
constraint=None,
|
||||||
@ -132,16 +130,14 @@ EXPECTED_TABLES_AND_COLUMNS = [
|
|||||||
),
|
),
|
||||||
Column(
|
Column(
|
||||||
name=ColumnName(root="FIELD"),
|
name=ColumnName(root="FIELD"),
|
||||||
displayName="FIELD",
|
displayName="GL Field Name",
|
||||||
dataType="CHAR",
|
dataType="CHAR",
|
||||||
arrayDataType=None,
|
arrayDataType=None,
|
||||||
dataLength=1,
|
dataLength=30,
|
||||||
precision=None,
|
precision=None,
|
||||||
scale=None,
|
scale=None,
|
||||||
dataTypeDisplay="CHAR",
|
dataTypeDisplay="CHAR(30)",
|
||||||
description=Markdown(
|
description=Markdown(root="General Ledger Field Name"),
|
||||||
root="**GL Field Name**\nGeneral Ledger Field Name"
|
|
||||||
),
|
|
||||||
fullyQualifiedName=None,
|
fullyQualifiedName=None,
|
||||||
tags=None,
|
tags=None,
|
||||||
constraint=None,
|
constraint=None,
|
||||||
@ -153,14 +149,14 @@ EXPECTED_TABLES_AND_COLUMNS = [
|
|||||||
),
|
),
|
||||||
Column(
|
Column(
|
||||||
name=ColumnName(root="MANDT"),
|
name=ColumnName(root="MANDT"),
|
||||||
displayName="MANDT",
|
displayName="Client",
|
||||||
dataType="INT",
|
dataType="INT",
|
||||||
arrayDataType=None,
|
arrayDataType=None,
|
||||||
dataLength=None,
|
dataLength=1,
|
||||||
precision=None,
|
precision=None,
|
||||||
scale=None,
|
scale=None,
|
||||||
dataTypeDisplay="INT",
|
dataTypeDisplay="INT",
|
||||||
description=Markdown(root="**Client**"),
|
description=Markdown(root="Client"),
|
||||||
fullyQualifiedName=None,
|
fullyQualifiedName=None,
|
||||||
tags=None,
|
tags=None,
|
||||||
constraint=None,
|
constraint=None,
|
||||||
@ -172,14 +168,14 @@ EXPECTED_TABLES_AND_COLUMNS = [
|
|||||||
),
|
),
|
||||||
Column(
|
Column(
|
||||||
name=ColumnName(root="RRCTY"),
|
name=ColumnName(root="RRCTY"),
|
||||||
displayName="RRCTY",
|
displayName="Record Type",
|
||||||
dataType="CHAR",
|
dataType="CHAR",
|
||||||
arrayDataType=None,
|
arrayDataType=None,
|
||||||
dataLength=1,
|
dataLength=1,
|
||||||
precision=None,
|
precision=None,
|
||||||
scale=None,
|
scale=None,
|
||||||
dataTypeDisplay="CHAR",
|
dataTypeDisplay="CHAR(1)",
|
||||||
description=Markdown(root="**Record Type**"),
|
description=Markdown(root="Record Type"),
|
||||||
fullyQualifiedName=None,
|
fullyQualifiedName=None,
|
||||||
tags=None,
|
tags=None,
|
||||||
constraint=None,
|
constraint=None,
|
||||||
@ -225,16 +221,14 @@ EXPECTED_TABLES_AND_COLUMNS = [
|
|||||||
columns=[
|
columns=[
|
||||||
Column(
|
Column(
|
||||||
name=ColumnName(root="BKONT"),
|
name=ColumnName(root="BKONT"),
|
||||||
displayName="BKONT",
|
displayName="To Account Assmnt",
|
||||||
dataType="CHAR",
|
dataType="CHAR",
|
||||||
arrayDataType=None,
|
arrayDataType=None,
|
||||||
dataLength=1,
|
dataLength=30,
|
||||||
precision=None,
|
precision=None,
|
||||||
scale=None,
|
scale=None,
|
||||||
dataTypeDisplay="CHAR",
|
dataTypeDisplay="CHAR(30)",
|
||||||
description=Markdown(
|
description=Markdown(root="To Account Assignment"),
|
||||||
root="**To Account Assmnt**\nTo Account Assignment"
|
|
||||||
),
|
|
||||||
fullyQualifiedName=None,
|
fullyQualifiedName=None,
|
||||||
tags=None,
|
tags=None,
|
||||||
constraint=None,
|
constraint=None,
|
||||||
@ -246,14 +240,14 @@ EXPECTED_TABLES_AND_COLUMNS = [
|
|||||||
),
|
),
|
||||||
Column(
|
Column(
|
||||||
name=ColumnName(root="BRGRU"),
|
name=ColumnName(root="BRGRU"),
|
||||||
displayName="BRGRU",
|
displayName="Authorization Group",
|
||||||
dataType="CHAR",
|
dataType="CHAR",
|
||||||
arrayDataType=None,
|
arrayDataType=None,
|
||||||
dataLength=1,
|
dataLength=4,
|
||||||
precision=None,
|
precision=None,
|
||||||
scale=None,
|
scale=None,
|
||||||
dataTypeDisplay="CHAR",
|
dataTypeDisplay="CHAR(4)",
|
||||||
description=Markdown(root="**Authorization Group**"),
|
description=Markdown(root="Authorization Group"),
|
||||||
fullyQualifiedName=None,
|
fullyQualifiedName=None,
|
||||||
tags=None,
|
tags=None,
|
||||||
constraint="NOT_NULL",
|
constraint="NOT_NULL",
|
||||||
@ -265,16 +259,14 @@ EXPECTED_TABLES_AND_COLUMNS = [
|
|||||||
),
|
),
|
||||||
Column(
|
Column(
|
||||||
name=ColumnName(root="BUKRS"),
|
name=ColumnName(root="BUKRS"),
|
||||||
displayName="BUKRS",
|
displayName="Pstng period variant",
|
||||||
dataType="CHAR",
|
dataType="CHAR",
|
||||||
arrayDataType=None,
|
arrayDataType=None,
|
||||||
dataLength=1,
|
dataLength=4,
|
||||||
precision=None,
|
precision=None,
|
||||||
scale=None,
|
scale=None,
|
||||||
dataTypeDisplay="CHAR",
|
dataTypeDisplay="CHAR(4)",
|
||||||
description=Markdown(
|
description=Markdown(root="Posting Period Variant"),
|
||||||
root="**Pstng period variant**\nPosting Period Variant"
|
|
||||||
),
|
|
||||||
fullyQualifiedName=None,
|
fullyQualifiedName=None,
|
||||||
tags=None,
|
tags=None,
|
||||||
constraint=None,
|
constraint=None,
|
||||||
@ -370,7 +362,6 @@ class SapErpUnitTest(TestCase):
|
|||||||
returned_tables.extend(
|
returned_tables.extend(
|
||||||
[either.right for either in self.saperp.yield_table(table)]
|
[either.right for either in self.saperp.yield_table(table)]
|
||||||
)
|
)
|
||||||
print(returned_tables)
|
|
||||||
for _, (expected, original) in enumerate(
|
for _, (expected, original) in enumerate(
|
||||||
zip(EXPECTED_TABLES_AND_COLUMNS, returned_tables)
|
zip(EXPECTED_TABLES_AND_COLUMNS, returned_tables)
|
||||||
):
|
):
|
||||||
|
@ -49,6 +49,13 @@
|
|||||||
"type": "integer",
|
"type": "integer",
|
||||||
"default": 10
|
"default": 10
|
||||||
},
|
},
|
||||||
|
"verifySSL": {
|
||||||
|
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/verifySSL",
|
||||||
|
"default": "no-ssl"
|
||||||
|
},
|
||||||
|
"sslConfig": {
|
||||||
|
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig"
|
||||||
|
},
|
||||||
"connectionOptions": {
|
"connectionOptions": {
|
||||||
"title": "Connection Options",
|
"title": "Connection Options",
|
||||||
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
|
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user