mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-09 18:24:48 +00:00
119 lines
4.1 KiB
Python
Executable File
119 lines
4.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Mock DataHub server that handles both GET and POST requests and supports pagination
|
|
"""
|
|
|
|
import http.server
|
|
import json
|
|
import socketserver
|
|
from http import HTTPStatus
|
|
from urllib.parse import urlparse
|
|
|
|
# TCP port the mock server listens on.
PORT = 8010


def _read_mock_response(path):
    """Return the full text of the canned response file at *path*.

    The file is decoded explicitly as UTF-8 because the request handler sends
    these strings back with ``str.encode()``, whose default is UTF-8. Relying
    on the platform default encoding here could fail (or corrupt non-ASCII
    characters) in a container whose locale is not UTF-8.

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


# Load the mock response data once at start-up; the handler serves these
# strings verbatim.
ENTITIES_V3_PAGE1_RESPONSE = _read_mock_response(
    "/app/datahub_entities_v3_page1.json"
)
ENTITIES_V3_PAGE2_RESPONSE = _read_mock_response(
    "/app/datahub_entities_v3_page2.json"
)
URNS_BY_FILTER_PAGE1_RESPONSE = _read_mock_response(
    "/app/datahub_get_urns_by_filter_page1.json"
)
URNS_BY_FILTER_PAGE2_RESPONSE = _read_mock_response(
    "/app/datahub_get_urns_by_filter_page2.json"
)

# Global state flag to track if first page has been requested.
# NOTE(review): nothing in the visible code reads or writes this module-level
# flag; MockDataHubAPIHandler tracks the state on its own class attribute
# `first_entities_page_requested`. Kept so the module's names are unchanged.
FIRST_ENTITIES_PAGE_REQUESTED = False
|
|
|
|
|
|
class MockDataHubAPIHandler(http.server.SimpleHTTPRequestHandler):
    """Request handler that answers a fixed set of DataHub routes with canned data.

    GET routes:
        ``/health``   -> 200, plain-text body ``OK``
        ``/config*``  -> 200, JSON ``{"noCode": "true"}``

    POST routes:
        ``/openapi/v3/entity/query/batchGet`` -> entities page 1 on the first
            call, page 2 on every later call.
        ``/api/graphql`` -> URN page 2 when ``variables.scrollId`` equals
            ``"page_2_scroll_id"``, otherwise URN page 1.

    Any other path gets a 404 with a JSON error body. A POST whose body cannot
    be read as JSON gets a 400 instead of raising inside the handler.
    """

    # State flag tracking whether the first entities page has been requested.
    # It lives on the class (not the instance) because one handler instance is
    # created per request, so instance attributes would not survive between
    # requests.
    first_entities_page_requested = False

    def _start_response(self, status, content_type, *, cors=False):
        """Write the status line and headers; the caller then writes the body.

        Args:
            status: HTTP status to send.
            content_type: value for the ``Content-type`` header.
            cors: when true, also send ``Access-Control-Allow-Origin: *``.
        """
        self.send_response(status)
        self.send_header("Content-type", content_type)
        if cors:
            self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()

    def _read_json_body(self):
        """Read and decode the JSON request body.

        Returns:
            The decoded JSON value, or ``{}`` when the request has no
            ``Content-Length`` header or an empty/whitespace-only body.

        Raises:
            ValueError: if ``Content-Length`` is not a non-negative integer or
                the body is not valid JSON (``json.JSONDecodeError`` is a
                ``ValueError`` subclass).
        """
        raw_length = self.headers.get("Content-Length")
        if raw_length is None:
            return {}
        length = int(raw_length)
        if length < 0:
            # rfile.read() with a negative size would block until EOF.
            raise ValueError(f"negative Content-Length: {length}")
        payload = self.rfile.read(length)
        if not payload.strip():
            return {}
        return json.loads(payload)

    def do_GET(self):
        """Serve the health check, the ``/config`` stub, or a JSON 404."""
        path = urlparse(self.path).path

        # Health check endpoint
        if path == "/health":
            self._start_response(HTTPStatus.OK, "text/plain")
            self.wfile.write(b"OK")
            return

        # Mock DataHub API endpoints
        if path.startswith("/config"):
            self._start_response(HTTPStatus.OK, "application/json", cors=True)
            self.wfile.write(json.dumps({"noCode": "true"}).encode())
            return

        # Default 404 response
        self._start_response(HTTPStatus.NOT_FOUND, "application/json")
        self.wfile.write(
            json.dumps({"error": "Not found", "path": self.path}).encode()
        )

    def do_POST(self):
        """Serve the paginated entity and GraphQL mocks, or a JSON 404/400."""
        path = urlparse(self.path).path

        # Get request body; answer 400 rather than crashing on bad input.
        try:
            request_body = self._read_json_body()
        except ValueError as exc:
            self._start_response(HTTPStatus.BAD_REQUEST, "application/json")
            self.wfile.write(
                json.dumps({"error": "Bad request", "detail": str(exc)}).encode()
            )
            return

        if path == "/openapi/v3/entity/query/batchGet":
            self._start_response(HTTPStatus.OK, "application/json", cors=True)

            # Return the appropriate page of entity data in V3 format:
            # page 1 exactly once, page 2 for every request after that.
            if not MockDataHubAPIHandler.first_entities_page_requested:
                self.wfile.write(ENTITIES_V3_PAGE1_RESPONSE.encode())
                MockDataHubAPIHandler.first_entities_page_requested = True
            else:
                self.wfile.write(ENTITIES_V3_PAGE2_RESPONSE.encode())
            return

        if path == "/api/graphql":
            self._start_response(HTTPStatus.OK, "application/json", cors=True)

            # Check if this is a scroll query carrying a scroll id. The body
            # may be a non-object, and "variables" may be absent or null, so
            # only dict values are dereferenced.
            variables = (
                request_body.get("variables")
                if isinstance(request_body, dict)
                else None
            )
            scroll_id = (
                variables.get("scrollId") if isinstance(variables, dict) else None
            )

            if scroll_id == "page_2_scroll_id":
                self.wfile.write(URNS_BY_FILTER_PAGE2_RESPONSE.encode())
            else:
                self.wfile.write(URNS_BY_FILTER_PAGE1_RESPONSE.encode())
            return

        # Default 404 response
        self._start_response(HTTPStatus.NOT_FOUND, "application/json")
        self.wfile.write(
            json.dumps(
                {"error": "Not found", "path": self.path, "method": "POST"}
            ).encode()
        )
|
|
|
|
|
|
# Set up the server
handler = MockDataHubAPIHandler

# Run the server inside its context manager so the listening socket is closed
# when serve_forever() stops (for example on KeyboardInterrupt) instead of
# being left open until the interpreter exits.
with socketserver.TCPServer(("", PORT), handler) as httpd:
    print(f"Serving mock DataHub API at port {PORT}")
    httpd.serve_forever()
|