mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			229 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			229 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#  Copyright 2025 Collate
 | 
						|
#  Licensed under the Collate Community License, Version 1.0 (the "License");
 | 
						|
#  you may not use this file except in compliance with the License.
 | 
						|
#  You may obtain a copy of the License at
 | 
						|
#  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
 | 
						|
#  Unless required by applicable law or agreed to in writing, software
 | 
						|
#  distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
#  See the License for the specific language governing permissions and
 | 
						|
#  limitations under the License.
 | 
						|
 | 
						|
"""
 | 
						|
OpenMetadata utils tests
 | 
						|
"""
 | 
						|
import base64
 | 
						|
import json
 | 
						|
from unittest import TestCase
 | 
						|
 | 
						|
from metadata.generated.schema.entity.data.mlmodel import MlModel
 | 
						|
from metadata.generated.schema.entity.data.table import Column, Table
 | 
						|
from metadata.generated.schema.type import basic
 | 
						|
from metadata.generated.schema.type.entityReference import EntityReference
 | 
						|
from metadata.ingestion.connections.headers import render_query_header
 | 
						|
from metadata.ingestion.ometa.utils import (
 | 
						|
    build_entity_reference,
 | 
						|
    decode_jwt_token,
 | 
						|
    format_name,
 | 
						|
    get_entity_type,
 | 
						|
    model_str,
 | 
						|
)
 | 
						|
 | 
						|
MOCK_TABLE = Table(
 | 
						|
    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
 | 
						|
    name="customers",
 | 
						|
    description="description\nwith new line",
 | 
						|
    tableType="Regular",
 | 
						|
    columns=[
 | 
						|
        Column(
 | 
						|
            name="customer_id",
 | 
						|
            dataType="INT",
 | 
						|
        ),
 | 
						|
        Column(
 | 
						|
            name="first_name",
 | 
						|
            dataType="STRING",
 | 
						|
        ),
 | 
						|
        Column(
 | 
						|
            name="last_name",
 | 
						|
            dataType="STRING",
 | 
						|
        ),
 | 
						|
    ],
 | 
						|
    databaseSchema=EntityReference(
 | 
						|
        id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="databaseSchema"
 | 
						|
    ),
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
class OMetaUtilsTest(TestCase):
 | 
						|
    def test_format_name(self):
 | 
						|
        """
 | 
						|
        Check we are properly formatting names
 | 
						|
        """
 | 
						|
 | 
						|
        self.assertEqual(format_name("random"), "random")
 | 
						|
        self.assertEqual(format_name("ran dom"), "ran_dom")
 | 
						|
        self.assertEqual(format_name("ran_(dom"), "ran__dom")
 | 
						|
 | 
						|
    def test_get_entity_type(self):
 | 
						|
        """
 | 
						|
        Check that we return a string or the class name
 | 
						|
        """
 | 
						|
 | 
						|
        self.assertEqual(get_entity_type("hello"), "hello")
 | 
						|
        self.assertEqual(get_entity_type(MlModel), "mlmodel")
 | 
						|
 | 
						|
    def test_model_str(self):
 | 
						|
        """
 | 
						|
        Return Uuid as str
 | 
						|
        """
 | 
						|
 | 
						|
        self.assertEqual(model_str("random"), "random")
 | 
						|
        self.assertEqual(
 | 
						|
            model_str(basic.Uuid("9fc58e81-7412-4023-a298-59f2494aab9d")),
 | 
						|
            "9fc58e81-7412-4023-a298-59f2494aab9d",
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(model_str(basic.EntityName("EntityName")), "EntityName")
 | 
						|
        self.assertEqual(model_str(basic.FullyQualifiedEntityName("FQDN")), "FQDN")
 | 
						|
 | 
						|
    def test_render_query_headers_builds_the_right_string(self) -> None:
 | 
						|
        assert (
 | 
						|
            render_query_header("0.0.1")
 | 
						|
            == '/* {"app": "OpenMetadata", "version": "0.0.1"} */'
 | 
						|
        )
 | 
						|
 | 
						|
    def test_build_entity_reference(self) -> None:
 | 
						|
        """Check we're building the right class"""
 | 
						|
        res = build_entity_reference(MOCK_TABLE)
 | 
						|
        self.assertEqual(res.type, "table")
 | 
						|
        self.assertEqual(res.id, MOCK_TABLE.id)
 | 
						|
 | 
						|
    def test_decode_jwt_token_valid(self):
 | 
						|
        """Test decoding a valid JWT token"""
 | 
						|
        # Create a mock JWT payload
 | 
						|
        payload = {
 | 
						|
            "sub": "testuser",
 | 
						|
            "email": "testuser@example.com",
 | 
						|
            "name": "Test User",
 | 
						|
            "iat": 1640995200,
 | 
						|
            "exp": 1641081600,
 | 
						|
        }
 | 
						|
 | 
						|
        # Encode the payload
 | 
						|
        payload_encoded = (
 | 
						|
            base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
 | 
						|
            .decode("utf-8")
 | 
						|
            .rstrip("=")
 | 
						|
        )
 | 
						|
 | 
						|
        # Create a mock JWT token (header.payload.signature)
 | 
						|
        jwt_token = f"eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.{payload_encoded}.signature"
 | 
						|
 | 
						|
        result = decode_jwt_token(jwt_token)
 | 
						|
 | 
						|
        self.assertIsNotNone(result)
 | 
						|
        self.assertEqual(result["sub"], "testuser")
 | 
						|
        self.assertEqual(result["email"], "testuser@example.com")
 | 
						|
        self.assertEqual(result["name"], "Test User")
 | 
						|
 | 
						|
    def test_decode_jwt_token_with_padding(self):
 | 
						|
        """Test decoding a JWT token that needs padding"""
 | 
						|
        # Create a payload that will need padding
 | 
						|
        payload = {"sub": "admin", "email": "admin@openmetadata.org"}
 | 
						|
 | 
						|
        # Encode without padding
 | 
						|
        payload_encoded = (
 | 
						|
            base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
 | 
						|
            .decode("utf-8")
 | 
						|
            .rstrip("=")
 | 
						|
        )
 | 
						|
 | 
						|
        jwt_token = f"header.{payload_encoded}.signature"
 | 
						|
 | 
						|
        result = decode_jwt_token(jwt_token)
 | 
						|
 | 
						|
        self.assertIsNotNone(result)
 | 
						|
        self.assertEqual(result["sub"], "admin")
 | 
						|
        self.assertEqual(result["email"], "admin@openmetadata.org")
 | 
						|
 | 
						|
    def test_decode_jwt_token_invalid_format(self):
 | 
						|
        """Test decoding an invalid JWT token format"""
 | 
						|
        # Test with wrong number of parts
 | 
						|
        invalid_token = "header.payload"  # Missing signature
 | 
						|
        result = decode_jwt_token(invalid_token)
 | 
						|
        self.assertIsNone(result)
 | 
						|
 | 
						|
        # Test with too many parts
 | 
						|
        invalid_token = "header.payload.signature.extra"
 | 
						|
        result = decode_jwt_token(invalid_token)
 | 
						|
        self.assertIsNone(result)
 | 
						|
 | 
						|
    def test_decode_jwt_token_invalid_base64(self):
 | 
						|
        """Test decoding a JWT token with invalid base64 in payload"""
 | 
						|
        invalid_token = "header.invalid-base64.signature"
 | 
						|
        result = decode_jwt_token(invalid_token)
 | 
						|
        self.assertIsNone(result)
 | 
						|
 | 
						|
    def test_decode_jwt_token_invalid_json(self):
 | 
						|
        """Test decoding a JWT token with invalid JSON in payload"""
 | 
						|
        # Create invalid JSON payload
 | 
						|
        invalid_json = "invalid json content"
 | 
						|
        payload_encoded = base64.urlsafe_b64encode(invalid_json.encode("utf-8")).decode(
 | 
						|
            "utf-8"
 | 
						|
        )
 | 
						|
 | 
						|
        jwt_token = f"header.{payload_encoded}.signature"
 | 
						|
 | 
						|
        result = decode_jwt_token(jwt_token)
 | 
						|
        self.assertIsNone(result)
 | 
						|
 | 
						|
    def test_decode_jwt_token_empty_payload(self):
 | 
						|
        """Test decoding a JWT token with empty payload"""
 | 
						|
        # Create empty payload
 | 
						|
        payload_encoded = base64.urlsafe_b64encode(
 | 
						|
            json.dumps({}).encode("utf-8")
 | 
						|
        ).decode("utf-8")
 | 
						|
 | 
						|
        jwt_token = f"header.{payload_encoded}.signature"
 | 
						|
 | 
						|
        result = decode_jwt_token(jwt_token)
 | 
						|
        self.assertIsNotNone(result)
 | 
						|
        self.assertEqual(result, {})
 | 
						|
 | 
						|
    def test_decode_jwt_token_none_input(self):
 | 
						|
        """Test decoding with None input"""
 | 
						|
        result = decode_jwt_token(None)
 | 
						|
        self.assertIsNone(result)
 | 
						|
 | 
						|
    def test_decode_jwt_token_empty_string(self):
 | 
						|
        """Test decoding with empty string input"""
 | 
						|
        result = decode_jwt_token("")
 | 
						|
        self.assertIsNone(result)
 | 
						|
 | 
						|
    def test_decode_jwt_token_real_world_example(self):
 | 
						|
        """Test with a realistic JWT token structure"""
 | 
						|
        # Simulate a real OpenMetadata JWT token payload
 | 
						|
        payload = {
 | 
						|
            "sub": "ingestion-bot",
 | 
						|
            "iss": "open-metadata.org",
 | 
						|
            "iat": 1663938462,
 | 
						|
            "email": "ingestion-bot@open-metadata.org",
 | 
						|
            "isBot": False,
 | 
						|
        }
 | 
						|
 | 
						|
        payload_encoded = (
 | 
						|
            base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
 | 
						|
            .decode("utf-8")
 | 
						|
            .rstrip("=")
 | 
						|
        )
 | 
						|
 | 
						|
        jwt_token = f"eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.{payload_encoded}.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
 | 
						|
 | 
						|
        result = decode_jwt_token(jwt_token)
 | 
						|
 | 
						|
        self.assertIsNotNone(result)
 | 
						|
        self.assertEqual(result["sub"], "ingestion-bot")
 | 
						|
        self.assertEqual(result["iss"], "open-metadata.org")
 | 
						|
        self.assertEqual(result["email"], "ingestion-bot@open-metadata.org")
 | 
						|
        self.assertEqual(result["isBot"], False)
 |