Fix #15163: Added SAP ERP Connector

This commit is contained in:
Onkar Ravgan 2024-07-04 10:57:46 +05:30 committed by GitHub
parent a3e6ba2eef
commit 80efc7075f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1437 additions and 3 deletions

View File

@ -0,0 +1,24 @@
source:
type: SapErp
serviceName: local_saperp
serviceConnection:
config:
type: SapErp
hostPort: https://localhost.com
apiKey: api_key
databaseName: databaseName
databaseSchema: databaseSchema
paginationLimit: 10
sourceConfig:
config:
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
# loggerLevel: INFO # DEBUG, INFO, WARN or ERROR
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -276,6 +276,22 @@ class ColumnTypeParser:
"WDC_BOOL": "BOOLEAN",
"WDC_DATE": "DATE",
"WDC_GEOMETRY": "GEOMETRY",
# SAP ERP
"CLNT": "INT",
"INT1": "INT",
"LRAW": "BLOB",
"UNIT": "CHAR",
"NUMC": "CHAR",
"LANG": "CHAR",
"CUKY": "CHAR",
"DATS": "DATE",
"TIMS": "TIME",
"FLTP": "FLOAT",
"QUAN": "DECIMAL",
"DEC": "DECIMAL",
"CURR": "DECIMAL",
"STRG": "STRING",
"RSTR": "STRING",
}
_COMPLEX_TYPE = re.compile("^(struct|map|array|uniontype)")

View File

@ -0,0 +1,166 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client to interact with SAP ERP APIs
"""
import math
import time
import traceback
from typing import Any, List, Optional, Union
from metadata.generated.schema.entity.services.connections.database.sapErpConnection import (
SapErpConnection,
)
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.source.database.saperp.models import (
SapErpColumn,
SapErpColumnResponse,
SapErpTable,
SapErpTableResponse,
)
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
HEADERS = {"Accept": "*/*"}
class SapErpApiException(Exception):
"""
Raise when API returns an error
"""
class SapErpClient:
"""
Client to interact with SAP ERP APIs
"""
def __init__(self, config: SapErpConnection):
self.config: SapErpConnection = config
self.auth_token = self.config.apiKey.get_secret_value()
client_config: ClientConfig = ClientConfig(
base_url=clean_uri(config.hostPort),
auth_header="APIKey",
auth_token_mode="",
auth_token=lambda: (self.auth_token, 0),
api_version="v1",
verify=False,
allow_redirects=True,
)
self.client = REST(client_config)
def test_table_api(self):
"""
Check metadata connection to SAS ERP tables API
"""
params_data = {"$top": "1", "$format": "json", "$inlinecount": "allpages"}
response_data = self.client._request( # pylint: disable=protected-access
method="GET",
path="/ECC/DDIC/ZZ_I_DDIC_TAB_CDS/",
headers=HEADERS,
data=params_data,
)
if response_data:
return response_data
raise SapErpApiException(
"Unable to fetch data from SAP ERP tables API check your connection."
)
def test_column_api(self):
"""
Check metadata connection to SAP ERP columns API
"""
params_data = {"$top": "1", "$format": "json", "$inlinecount": "allpages"}
response_data = self.client._request( # pylint: disable=protected-access
method="GET",
path="/ECC/DDIC/ZZ_I_DDIC_COL_CDS/",
headers=HEADERS,
data=params_data,
)
if response_data:
return response_data
raise SapErpApiException(
"Unable to fetch data from SAP ERP columns API check your connection."
)
def paginate(
self, api_url: str, params_data: dict, entities_per_page: int, model_class: Any
) -> List[Union[SapErpTable, SapErpColumn]]:
"""
Method to paginate the APIs
"""
entities_list = []
params_data.update({"$top": "1", "$format": "json", "$inlinecount": "allpages"})
response_data = self.client._request( # pylint: disable=protected-access
method="GET", path=api_url, headers=HEADERS, data=params_data
)
response = model_class(**response_data)
count = response.d.count
indexes = math.ceil(count / entities_per_page)
for index in range(indexes):
try:
params_data.update(
{
"$top": str(entities_per_page),
"$skip": str(index * entities_per_page),
}
)
response_data = (
self.client._request( # pylint: disable=protected-access
method="GET", path=api_url, headers=HEADERS, data=params_data
)
)
response = model_class(**response_data)
entities_list.extend(response.d.results)
# Adding a delay before sending the requests to server
# due to server throwing error when too many requests are sent
time.sleep(0.5)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching entities for pagination: {exc}")
return entities_list
def list_tables(self) -> Optional[List[SapErpTable]]:
"""
List all tables on the SAP ERP instance
"""
table_list = []
params_data = {
"$select": "tabname,tabclass,ddtext",
}
table_list = self.paginate(
api_url="/ECC/DDIC/ZZ_I_DDIC_TAB_CDS/",
params_data=params_data,
entities_per_page=self.config.paginationLimit,
model_class=SapErpTableResponse,
)
return table_list or None
def list_columns(self, table_name: str) -> Optional[List[SapErpColumn]]:
"""
List all the columns on the SAP ERP instance
"""
try:
params_data = {"$filter": f"tabname eq '{table_name}'"}
table_columns = self.paginate(
api_url="/ECC/DDIC/ZZ_I_DDIC_COL_CDS/",
params_data=params_data,
entities_per_page=self.config.paginationLimit,
model_class=SapErpColumnResponse,
)
return table_columns or None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching columns for table {table_name}: {exc}")
return None

View File

@ -0,0 +1,50 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source connection handler
"""
from typing import Optional
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.sapErpConnection import (
SapErpConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.saperp.client import SapErpClient
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def get_connection(connection: SapErpConnection) -> SapErpClient:
return SapErpClient(connection)
def test_connection(
metadata: OpenMetadata,
client: SapErpClient,
service_connection: SapErpConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
test_fn = {
"GetTables": client.test_table_api,
"GetColumns": client.test_column_api,
}
test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
)

View File

@ -0,0 +1,25 @@
# Copyright 2023 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Constants Module for SAP ERP
"""
from metadata.generated.schema.entity.data.table import TableType
TABLE_TYPE_MAP = {
"TRANSP": TableType.Regular,
"INTTAB": TableType.View,
"CLUSTER": TableType.View,
"POOL": TableType.View,
"VIEW": TableType.View,
"APPEND": TableType.View,
}

View File

@ -0,0 +1,297 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SAP ERP source module
"""
import traceback
from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
ColumnName,
Constraint,
ConstraintType,
DataType,
Table,
TableConstraint,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.sapErpConnection import (
SapErpConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
Markdown,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.saperp.constants import TABLE_TYPE_MAP
from metadata.ingestion.source.database.saperp.models import (
ColumnsAndConstraints,
SapErpColumn,
SapErpTable,
TableConstraintsModel,
)
from metadata.utils import fqn
from metadata.utils.execution_time_tracker import calculate_execution_time_generator
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import clean_up_starting_ending_double_quotes_in_string
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class SaperpSource(CommonDbSourceService):
"""
Implements the necessary methods to extract
Database metadata from Sap ERP Source
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: SapErpConnection = config.serviceConnection.root.config
if not isinstance(connection, SapErpConnection):
raise InvalidSourceException(
f"Expected SapErpConnection, but got {connection}"
)
return cls(config, metadata)
def get_raw_database_schema_names(self) -> Iterable[str]:
if self.service_connection.__dict__.get("databaseSchema"):
yield self.service_connection.databaseSchema
else:
yield "default"
def get_tables_name_and_type(self) -> Optional[Iterable[SapErpTable]]:
"""
Ingest the tables from SAP ERP
"""
for table in self.connection_obj.list_tables() or []:
try:
table_name = table.tabname
table_type = TABLE_TYPE_MAP.get(table.tabclass, TableType.Regular)
if (
table_type == TableType.Regular and self.source_config.includeTables
) or (table_type == TableType.View and self.source_config.includeViews):
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
table_name=table_name,
skip_es_search=True,
)
if filter_by_table(
self.source_config.tableFilterPattern,
(
table_fqn
if self.source_config.useFqnForFiltering
else table_name
),
):
self.status.filter(
table_fqn,
"Table Filtered Out",
)
continue
yield table
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Unable to process table information for table: {str(table_name)} - {err}"
)
def _check_col_length( # pylint: disable=arguments-renamed
self, datatype: str
) -> Optional[int]:
"""
return the column length for the dataLength attribute
"""
if datatype and datatype.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}:
return 1
return None
def _merge_col_descriptions(self, column: SapErpColumn) -> Optional[str]:
"""
Method to merge the column descriptions from different fields
"""
description = None
try:
if column.scrtext_l:
description = f"**{column.scrtext_l}**"
if column.i_ddtext and column.scrtext_l != column.i_ddtext:
description = f"{description}\n{column.i_ddtext}"
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unable to get column descriptions for {column.fieldname}: {exc}"
)
return description
def _get_table_constraints(
self, columns: Optional[List[Column]]
) -> TableConstraintsModel:
"""
Method to get the table constraints
"""
try:
table_constraints = []
pk_columns = []
# check if we have multiple primary keys and add them to the TableConstraints
for column in columns or []:
if column.keyflag:
pk_columns.append(
clean_up_starting_ending_double_quotes_in_string(
column.fieldname
)
)
if len(pk_columns) > 1:
table_constraints.append(
TableConstraint(
constraintType=ConstraintType.PRIMARY_KEY,
columns=pk_columns,
)
)
return TableConstraintsModel(
table_constraints=table_constraints or None, pk_columns=pk_columns
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch table constraints: {exc}")
return TableConstraintsModel()
def _get_column_constraint(self, column: SapErpColumn, pk_columns: List[str]):
"""
Method to get the column constraint
"""
if column.keyflag:
# In case of multiple primary keys return None for column constraints
# Multiple primary keys will be handled in table constraints
if len(pk_columns) > 1:
return None
return Constraint.PRIMARY_KEY
return Constraint.NOT_NULL if column.notnull == "X" else Constraint.NULL
def get_columns_and_constraints( # pylint: disable=arguments-differ
self, table_name: str
) -> ColumnsAndConstraints:
"""
Method to get the column metadata
"""
sap_columns = self.connection_obj.list_columns(table_name)
table_constraints_model = self._get_table_constraints(columns=sap_columns)
om_columns = []
for sap_column in sap_columns or []:
try:
column_type = ColumnTypeParser.get_column_type(sap_column.datatype)
data_type_display = column_type
column_name = (
f"{sap_column.fieldname}({sap_column.precfield})"
if sap_column.precfield
else sap_column.fieldname
)
if sap_column.datatype is None:
column_type = DataType.UNKNOWN.name
data_type_display = column_type.lower()
logger.warning(
f"Unknown type {repr(sap_column.datatype)}: {sap_column.fieldname}"
)
om_column = Column(
name=ColumnName(
root=column_name
# Passing whitespace if column name is an empty string
# since pydantic doesn't accept empty string
if column_name
else " "
),
displayName=sap_column.fieldname,
description=self._merge_col_descriptions(column=sap_column),
dataType=column_type,
dataTypeDisplay=data_type_display,
ordinalPosition=int(sap_column.POS),
constraint=self._get_column_constraint(
column=sap_column, pk_columns=table_constraints_model.pk_columns
),
dataLength=self._check_col_length(datatype=column_type),
)
if column_type == DataType.ARRAY.value:
om_column.arrayDataType = DataType.UNKNOWN
om_columns.append(om_column)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unable to get column details for {sap_column.fieldname}: {exc}"
)
return ColumnsAndConstraints(
columns=om_columns,
table_constraints=table_constraints_model.table_constraints,
)
# pylint: disable=arguments-renamed
@calculate_execution_time_generator()
def yield_table(self, table: SapErpTable) -> Iterable[Either[CreateTableRequest]]:
"""
From topology.
Prepare a table request and pass it to the sink
"""
schema_name = self.context.get().database_schema
try:
columns_and_constraints = self.get_columns_and_constraints(
table_name=table.tabname
)
table_request = CreateTableRequest(
name=EntityName(table.tabname),
tableType=TABLE_TYPE_MAP.get(table.tabclass, TableType.Regular),
description=Markdown(table.ddtext),
columns=columns_and_constraints.columns,
tableConstraints=columns_and_constraints.table_constraints,
databaseSchema=FullyQualifiedEntityName(
fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
)
),
)
yield Either(right=table_request)
# Register the request that we'll handle during the deletion checks
self.register_record(table_request=table_request)
except Exception as exc:
error = f"Unexpected exception to yield table [{table.tabname}]: {exc}"
yield Either(
left=StackTraceError(
name=table.tabname, error=error, stackTrace=traceback.format_exc()
)
)

View File

@ -0,0 +1,98 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SAP ERP API models
"""
from typing import List, Optional
from pydantic import BaseModel, Field
from metadata.generated.schema.entity.data.table import Column, TableConstraint
class SapErpTable(BaseModel):
"""
SAP ERP Table model
"""
tabname: str
tabclass: Optional[str] = None
ddtext: Optional[str] = None
class SapErpColumn(BaseModel):
"""
SAP ERP Column model
"""
tabname: str
fieldname: Optional[str] = None
precfield: Optional[str] = None
datatype: Optional[str] = None
POS: Optional[int] = None
notnull: Optional[str] = None
keyflag: Optional[bool] = None
scrtext_l: Optional[str] = None
i_ddtext: Optional[str] = None
dd_text: Optional[str] = None
class SapErpTableList(BaseModel):
"""
SAP ERP Table List model
"""
count: Optional[int] = Field(alias="__count")
results: Optional[List[SapErpTable]] = None
class SapErpTableResponse(BaseModel):
"""
SAP ERP Tables Response model
"""
d: Optional[SapErpTableList] = None
class SapErpColumnList(BaseModel):
"""
SAP ERP Column List model
"""
count: Optional[int] = Field(alias="__count")
results: Optional[List[SapErpColumn]] = None
class SapErpColumnResponse(BaseModel):
"""
SAP ERP Columns Response model
"""
d: Optional[SapErpColumnList] = None
class ColumnsAndConstraints(BaseModel):
"""
Wrapper Model for columns and constraints
"""
columns: Optional[List[Column]]
table_constraints: Optional[List[TableConstraint]]
class TableConstraintsModel(BaseModel):
"""
Wrapper Model for table constraints and primary key columns list
"""
table_constraints: Optional[List[TableConstraint]] = None
pk_columns: List[str] = []

View File

@ -0,0 +1,191 @@
[
{
"tabname": "T001B_PS",
"inttype": "C",
"intlen": "000004",
"reftable": "",
"precfield": "",
"reffield": "",
"notnull": "X",
"datatype": "CHAR",
"leng": "000004",
"decimals": "000000",
"domname": "OPVAR",
"fieldname": "BUKRS",
"comptype": "E",
"reftype": "",
"ddtext": "",
"rollname": "OPVAR",
"scrtext_l": "Pstng period variant",
"i_ddtext": "Posting Period Variant",
"as4local": "A",
"as4vers": "0000",
"POS": "0003",
"LANG": "E",
"keyflag": true,
"mandatory": false,
"checktable": "T010O"
},
{
"tabname": "T001B_PS",
"inttype": "C",
"intlen": "000030",
"reftable": "",
"precfield": "",
"reffield": "",
"notnull": "X",
"datatype": "CHAR",
"leng": "000030",
"decimals": "000000",
"domname": "FDNAME",
"fieldname": "FIELD",
"comptype": "E",
"reftype": "",
"ddtext": "",
"rollname": "FAGL_GLFLEX_FIELDNAME",
"scrtext_l": "GL Field Name",
"i_ddtext": "General Ledger Field Name",
"as4local": "A",
"as4vers": "0000",
"POS": "0004",
"LANG": "E",
"keyflag": true,
"mandatory": false,
"checktable": "*"
},
{
"tabname": "T001B_PS",
"inttype": "C",
"intlen": "000003",
"reftable": "",
"precfield": "",
"reffield": "",
"notnull": "X",
"datatype": "CLNT",
"leng": "000003",
"decimals": "000000",
"domname": "MANDT",
"fieldname": "MANDT",
"comptype": "E",
"reftype": "",
"ddtext": "",
"rollname": "MANDT",
"scrtext_l": "Client",
"i_ddtext": "Client",
"as4local": "A",
"as4vers": "0000",
"POS": "0001",
"LANG": "E",
"keyflag": true,
"mandatory": false,
"checktable": "T000"
},
{
"tabname": "T001B_PS",
"inttype": "C",
"intlen": "000001",
"reftable": "",
"precfield": "",
"reffield": "",
"notnull": "X",
"datatype": "CHAR",
"leng": "000001",
"decimals": "000000",
"domname": "RRCTY",
"fieldname": "RRCTY",
"comptype": "E",
"reftype": "",
"ddtext": "",
"rollname": "RRCTY",
"scrtext_l": "Record Type",
"i_ddtext": "Record Type",
"as4local": "A",
"as4vers": "0000",
"POS": "0002",
"LANG": "E",
"keyflag": true,
"mandatory": false,
"checktable": ""
},
{
"tabname": "T001B_PS_PER",
"inttype": "C",
"intlen": "000030",
"reftable": "",
"precfield": "",
"reffield": "",
"notnull": "X",
"datatype": "CHAR",
"leng": "000030",
"decimals": "000000",
"domname": "MAX_LNG_PSELEMENT",
"fieldname": "BKONT",
"comptype": "E",
"reftype": "",
"ddtext": "",
"rollname": "BFM_ETNTITY",
"scrtext_l": "To Account Assmnt",
"i_ddtext": "To Account Assignment",
"as4local": "A",
"as4vers": "0000",
"POS": "0005",
"LANG": "E",
"keyflag": true,
"mandatory": false,
"checktable": ""
},
{
"tabname": "T001B_PS_PER",
"inttype": "C",
"intlen": "000004",
"reftable": "",
"precfield": "",
"reffield": "",
"notnull": "X",
"datatype": "CHAR",
"leng": "000004",
"decimals": "000000",
"domname": "BRGRU",
"fieldname": "BRGRU",
"comptype": "E",
"reftype": "",
"ddtext": "",
"rollname": "BRGRU",
"scrtext_l": "Authorization Group",
"i_ddtext": "Authorization Group",
"as4local": "A",
"as4vers": "0000",
"POS": "0015",
"LANG": "E",
"keyflag": false,
"mandatory": false,
"checktable": "*"
},
{
"tabname": "T001B_PS_PER",
"inttype": "C",
"intlen": "000004",
"reftable": "",
"precfield": "",
"reffield": "",
"notnull": "X",
"datatype": "CHAR",
"leng": "000004",
"decimals": "000000",
"domname": "OPVAR",
"fieldname": "BUKRS",
"comptype": "E",
"reftype": "",
"ddtext": "",
"rollname": "OPVAR",
"scrtext_l": "Pstng period variant",
"i_ddtext": "Posting Period Variant",
"as4local": "A",
"as4vers": "0000",
"POS": "0003",
"LANG": "E",
"keyflag": true,
"mandatory": false,
"checktable": "T010O"
}
]

View File

@ -0,0 +1,26 @@
[
{
"tabname": "T001B_PS",
"as4local": "A",
"as4vers": "0000",
"LANG": "E",
"tabclass": "TRANSP",
"sqltab": "",
"applclass": "FB",
"authclass": "00",
"masterlang": "",
"ddtext": "Account Assignment Objects in General Ledger"
},
{
"tabname": "T001B_PS_PER",
"as4local": "A",
"as4vers": "0000",
"LANG": "E",
"tabclass": "TRANSP",
"sqltab": "",
"applclass": "FB",
"authclass": "00",
"masterlang": "",
"ddtext": "Permitted Posting Periods for Account Assignment Objects"
}
]

View File

@ -0,0 +1,377 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCase for SAP ERP using the topology
"""
import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
ColumnName,
TableConstraint,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
Markdown,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.saperp.client import SapErpClient
from metadata.ingestion.source.database.saperp.metadata import SaperpSource
from metadata.ingestion.source.database.saperp.models import SapErpColumn, SapErpTable
mock_saperp_config = {
"source": {
"type": "SapErp",
"serviceName": "local_saperp",
"serviceConnection": {
"config": {
"type": "SapErp",
"hostPort": "https://test.com",
"apiKey": "test_api_key",
"databaseName": "saperp_database",
"databaseSchema": "saperp_database_schema",
"paginationLimit": 100,
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
},
},
}
MOCK_DATABASE_SERVICE = DatabaseService(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
name="saperp_source_test",
connection=DatabaseConnection(),
serviceType=DatabaseServiceType.SapErp,
)
MOCK_DATABASE = Database(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
name="saperp_database",
fullyQualifiedName="saperp_source_test.saperp_database",
displayName="saperp_database",
description="",
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="databaseService"
),
)
MOCK_DATABASE_SCHEMA = DatabaseSchema(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
name="saperp_database_schema",
fullyQualifiedName="saperp_source_test.saperp_database.saperp_database_schema",
service=EntityReference(id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="database"),
database=EntityReference(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
type="database",
),
)
EXPECTED_TABLES_AND_COLUMNS = [
CreateTableRequest(
name=EntityName(root="T001B_PS"),
displayName=None,
description=Markdown(root="Account Assignment Objects in General Ledger"),
tableType="Regular",
columns=[
Column(
name=ColumnName(root="BUKRS"),
displayName="BUKRS",
dataType="CHAR",
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="CHAR",
description=Markdown(
root="**Pstng period variant**\nPosting Period Variant"
),
fullyQualifiedName=None,
tags=None,
constraint=None,
ordinalPosition=3,
jsonSchema=None,
children=None,
profile=None,
customMetrics=None,
),
Column(
name=ColumnName(root="FIELD"),
displayName="FIELD",
dataType="CHAR",
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="CHAR",
description=Markdown(
root="**GL Field Name**\nGeneral Ledger Field Name"
),
fullyQualifiedName=None,
tags=None,
constraint=None,
ordinalPosition=4,
jsonSchema=None,
children=None,
profile=None,
customMetrics=None,
),
Column(
name=ColumnName(root="MANDT"),
displayName="MANDT",
dataType="INT",
arrayDataType=None,
dataLength=None,
precision=None,
scale=None,
dataTypeDisplay="INT",
description=Markdown(root="**Client**"),
fullyQualifiedName=None,
tags=None,
constraint=None,
ordinalPosition=1,
jsonSchema=None,
children=None,
profile=None,
customMetrics=None,
),
Column(
name=ColumnName(root="RRCTY"),
displayName="RRCTY",
dataType="CHAR",
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="CHAR",
description=Markdown(root="**Record Type**"),
fullyQualifiedName=None,
tags=None,
constraint=None,
ordinalPosition=2,
jsonSchema=None,
children=None,
profile=None,
customMetrics=None,
),
],
dataModel=None,
tableConstraints=[
TableConstraint(
constraintType="PRIMARY_KEY",
columns=["BUKRS", "FIELD", "MANDT", "RRCTY"],
referredColumns=None,
)
],
tablePartition=None,
tableProfilerConfig=None,
owner=None,
databaseSchema=FullyQualifiedEntityName(
root="saperp_source_test.saperp_database.saperp_database_schema"
),
tags=None,
schemaDefinition=None,
retentionPeriod=None,
extension=None,
sourceUrl=None,
domain=None,
dataProducts=None,
fileFormat=None,
lifeCycle=None,
sourceHash=None,
),
CreateTableRequest(
name=EntityName(root="T001B_PS_PER"),
displayName=None,
description=Markdown(
root="Permitted Posting Periods for Account Assignment Objects"
),
tableType="Regular",
columns=[
Column(
name=ColumnName(root="BKONT"),
displayName="BKONT",
dataType="CHAR",
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="CHAR",
description=Markdown(
root="**To Account Assmnt**\nTo Account Assignment"
),
fullyQualifiedName=None,
tags=None,
constraint=None,
ordinalPosition=5,
jsonSchema=None,
children=None,
profile=None,
customMetrics=None,
),
Column(
name=ColumnName(root="BRGRU"),
displayName="BRGRU",
dataType="CHAR",
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="CHAR",
description=Markdown(root="**Authorization Group**"),
fullyQualifiedName=None,
tags=None,
constraint="NOT_NULL",
ordinalPosition=15,
jsonSchema=None,
children=None,
profile=None,
customMetrics=None,
),
Column(
name=ColumnName(root="BUKRS"),
displayName="BUKRS",
dataType="CHAR",
arrayDataType=None,
dataLength=1,
precision=None,
scale=None,
dataTypeDisplay="CHAR",
description=Markdown(
root="**Pstng period variant**\nPosting Period Variant"
),
fullyQualifiedName=None,
tags=None,
constraint=None,
ordinalPosition=3,
jsonSchema=None,
children=None,
profile=None,
customMetrics=None,
),
],
dataModel=None,
tableConstraints=[
TableConstraint(
constraintType="PRIMARY_KEY",
columns=["BKONT", "BUKRS"],
referredColumns=None,
)
],
tablePartition=None,
tableProfilerConfig=None,
owner=None,
databaseSchema=FullyQualifiedEntityName(
root="saperp_source_test.saperp_database.saperp_database_schema"
),
tags=None,
schemaDefinition=None,
retentionPeriod=None,
extension=None,
sourceUrl=None,
domain=None,
dataProducts=None,
fileFormat=None,
lifeCycle=None,
sourceHash=None,
),
]
def read_datasets(file_name: str) -> dict:
mock_file_path = (
Path(__file__).parent.parent.parent / f"resources/datasets/saperp/{file_name}"
)
with open(mock_file_path, encoding="UTF-8") as file:
return json.load(file)
def mock_list_tables(self): # pylint: disable=unused-argument
tables = read_datasets("tables.json")
return [SapErpTable(**table) for table in tables]
def mock_list_columns(self, table_name: str): # pylint: disable=unused-argument
columns = read_datasets("columns.json")
return [
SapErpColumn(**column) for column in columns if column["tabname"] == table_name
]
class SapErpUnitTest(TestCase):
"""
Implements the necessary methods to extract
Alation Unit Test
"""
@patch(
"metadata.ingestion.source.database.saperp.metadata.SaperpSource.test_connection"
)
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.model_validate(mock_saperp_config)
self.saperp = SaperpSource.create(
mock_saperp_config["source"],
OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
)
self.saperp.context.get().__dict__["database"] = MOCK_DATABASE.name.root
self.saperp.context.get().__dict__[
"database_service"
] = MOCK_DATABASE_SERVICE.name.root
self.saperp.context.get().__dict__[
"database_schema"
] = MOCK_DATABASE_SCHEMA.name.root
@patch.object(SapErpClient, "list_tables", mock_list_tables)
@patch.object(SapErpClient, "list_columns", mock_list_columns)
def test_yield_table(self):
"""
Test the yield table
"""
tables = self.saperp.get_tables_name_and_type()
returned_tables = []
for table in tables:
returned_tables.extend(
[either.right for either in self.saperp.yield_table(table)]
)
print(returned_tables)
for _, (expected, original) in enumerate(
zip(EXPECTED_TABLES_AND_COLUMNS, returned_tables)
):
self.assertEqual(expected, original)

View File

@ -0,0 +1,20 @@
{
"name": "SapErp",
"displayName": "SAP ERP Test Connection",
"description": "This Test Connection validates the access against the database and basic metadata extraction tables and columns.",
"steps": [
{
"name": "GetTables",
"description": "Validate that we can fetch the list of tables from the API",
"errorMessage": "Failed to fetch tables, please validate if the access token is correct and has enough privilege to fetch tables.",
"mandatory": true
},
{
"name": "GetColumns",
"description": "Validate that we can fetch the list of tables from the API",
"errorMessage": "Failed to fetch columns, please validate if the access token is correct and has enough privilege to fetch columns.",
"mandatory": false
}
]
}

View File

@ -0,0 +1,66 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/database/sapErpConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SapErpConnection",
"description": "Sap ERP Database Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.database.SapErpConnection",
"definitions": {
"sapErpType": {
"description": "Service type.",
"type": "string",
"enum": ["SapErp"],
"default": "SapErp"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/sapErpType",
"default": "SapErp"
},
"hostPort": {
"expose": true,
"title": "Host and Port",
"description": "Host and Port of the SAP ERP instance.",
"type": "string",
"format": "uri"
},
"apiKey": {
"title": "API Key",
"description": "API key to authenticate with the SAP ERP APIs.",
"type": "string",
"format": "password"
},
"databaseName": {
"title": "Database Name",
"description": "Optional name to give to the database in OpenMetadata. If left blank, we will use default as the database name.",
"type": "string"
},
"databaseSchema": {
"title": "Database Schema",
"description": "Optional name to give to the schema in OpenMetadata. If left blank, we will use default as the schema name",
"type": "string"
},
"paginationLimit": {
"title": "Pagination Limit",
"description": "Pagination limit used while querying the SAP ERP API for fetching the entities",
"type": "integer",
"default": 10
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
},
"connectionArguments": {
"title": "Connection Arguments",
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false
}

View File

@ -55,7 +55,8 @@
"UnityCatalog",
"SAS",
"Iceberg",
"Teradata"
"Teradata",
"SapErp"
],
"javaEnums": [
{
@ -180,6 +181,9 @@
},
{
"name": "Teradata"
},
{
"name": "SapErp"
}
]
},
@ -309,6 +313,9 @@
},
{
"$ref": "./connections/database/teradataConnection.json"
},
{
"$ref": "./connections/database/sapErpConnection.json"
}
]
}

View File

@ -0,0 +1,59 @@
# SAP ERP
In this section, we provide guides and references to use the SAP ERP connector.
## Requirements
You will need the following permissions to extract SAP ERP metadata:
- **API Access**: You must have the API Enabled permission in your SAP ERP instance.
## Connection Details
$$section
### Host Port $(id="hostPort")
This parameter specifies the host and port of the SAP ERP instance. This should be specified as a string in the format `https://hostname.com`.
$$
$$section
### Api Key $(id="apiKey")
Api Key to authenticate the SAP ERP Apis
$$
$$section
### Database Name $(id="databaseName")
In OpenMetadata, the Database Service hierarchy works as follows:
```
Database Service > Database > Schema > Table
```
In the case of SAP ERP, we won't have a Database as such. If you'd like to see your data in a database named something other than `default`, you can specify the name in this field.
$$
$$section
### Database Schema $(id="databaseSchema")
In OpenMetadata, the Database Service hierarchy works as follows:
```
Database Service > Database > Schema > Table
```
In the case of SAP ERP, we won't have a Database Schema as such. If you'd like to see your data in a database schema named something other than `default`, you can specify the name in this field.
$$
$$section
### Pagination Limit $(id="paginationLimit")
Pagination limit used while querying the SAP ERP API for fetching the entities.
$$
$$section
### Connection Options $(id="connectionOptions")
Additional connection options to build the URL that can be sent to service during the connection.
$$
$$section
### Connection Arguments $(id="connectionArguments")
Additional connection arguments such as security or protocol configs that can be sent to service during connection.
$$

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

View File

@ -20,7 +20,7 @@
display: inline-flex;
a {
font-size: 12px;
font-size: 12px;
color: @text-color;
}

View File

@ -72,6 +72,7 @@ import redpanda from '../assets/img/service-icon-redpanda.png';
import redshift from '../assets/img/service-icon-redshift.png';
import sagemaker from '../assets/img/service-icon-sagemaker.png';
import salesforce from '../assets/img/service-icon-salesforce.png';
import sapErp from '../assets/img/service-icon-sap-erp.png';
import sapHana from '../assets/img/service-icon-sap-hana.png';
import sas from '../assets/img/service-icon-sas.svg';
import scikit from '../assets/img/service-icon-scikit.png';
@ -160,6 +161,7 @@ export const SINGLESTORE = singlestore;
export const SALESFORCE = salesforce;
export const MLFLOW = mlflow;
export const SAP_HANA = sapHana;
export const SAP_ERP = sapErp;
export const SCIKIT = scikit;
export const DELTALAKE = deltalake;
export const DEFAULT_SERVICE = iconDefaultService;

View File

@ -22,9 +22,9 @@ import Loader from '../../components/common/Loader/Loader';
import ResizablePanels from '../../components/common/ResizablePanels/ResizablePanels';
import RichTextEditor from '../../components/common/RichTextEditor/RichTextEditor';
import TitleBreadcrumb from '../../components/common/TitleBreadcrumb/TitleBreadcrumb.component';
import { useLimitStore } from '../../context/LimitsProvider/useLimitsStore';
import { ROUTES, VALIDATION_MESSAGES } from '../../constants/constants';
import { NAME_FIELD_RULES } from '../../constants/Form.constants';
import { useLimitStore } from '../../context/LimitsProvider/useLimitsStore';
import { CreateEventSubscription } from '../../generated/events/api/createEventSubscription';
import {
AlertType,

View File

@ -44,6 +44,7 @@ import postgresConnection from '../jsons/connectionSchemas/connections/database/
import prestoConnection from '../jsons/connectionSchemas/connections/database/prestoConnection.json';
import redshiftConnection from '../jsons/connectionSchemas/connections/database/redshiftConnection.json';
import salesforceConnection from '../jsons/connectionSchemas/connections/database/salesforceConnection.json';
import sapErpConnection from '../jsons/connectionSchemas/connections/database/sapErpConnection.json';
import sapHanaConnection from '../jsons/connectionSchemas/connections/database/sapHanaConnection.json';
import sasConnection from '../jsons/connectionSchemas/connections/database/sasConnection.json';
import singleStoreConnection from '../jsons/connectionSchemas/connections/database/singleStoreConnection.json';
@ -214,6 +215,11 @@ export const getDatabaseConfig = (type: DatabaseServiceType) => {
break;
}
case DatabaseServiceType.SapERP: {
schema = sapErpConnection;
break;
}
case DatabaseServiceType.MongoDB: {
schema = mongoDBConnection;

View File

@ -76,6 +76,7 @@ import {
REDSHIFT,
SAGEMAKER,
SALESFORCE,
SAP_ERP,
SAP_HANA,
SAS,
SCIKIT,
@ -303,6 +304,9 @@ class ServiceUtilClassBase {
case this.DatabaseServiceTypeSmallCase.SapHana:
return SAP_HANA;
case this.DatabaseServiceTypeSmallCase.SapERP:
return SAP_ERP;
case this.DatabaseServiceTypeSmallCase.DeltaLake:
return DELTALAKE;