mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-26 06:53:37 +00:00
Embedded MCP Server (#21206)
* Mcp Server * Update Server * Refactored into multiple files * Add Tool Dynamic loading * Updated to use toolName * add description for tools * initial create glossary term action * initial patch entity tool * Fix Glossary Tool * Use prepare * Changed const to default * Prepare for Collate Tools * Update HttpServletSseServerTransportProvider.java * Checkstyle fix * endpoint changed to messages in new versions * Add Auth Filter to MCP Request * description * clean response --------- Co-authored-by: Pablo Takara <pjt1991@gmail.com> Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
parent
292c334b0b
commit
bbc450b2d1
@ -31,6 +31,8 @@ server:
|
||||
- type: http
|
||||
bindHost: ${SERVER_HOST:-0.0.0.0}
|
||||
port: ${SERVER_ADMIN_PORT:-8586}
|
||||
gzip:
|
||||
syncFlush: true
|
||||
|
||||
maxThreads: ${SERVER_MAX_THREADS:-50}
|
||||
minThreads: ${SERVER_MIN_THREADS:-10}
|
||||
@ -433,9 +435,14 @@ web:
|
||||
option: ${WEB_CONF_PERMISSION_POLICY_OPTION:-""}
|
||||
cache-control: ${WEB_CONF_CACHE_CONTROL:-""}
|
||||
pragma: ${WEB_CONF_PRAGMA:-""}
|
||||
|
||||
|
||||
operationalConfig:
|
||||
enable: ${OPERATIONAL_CONFIG_ENABLED:-true}
|
||||
operationsConfigFile: ${OPERATIONAL_CONFIG_FILE:-"./conf/operations.yaml"}
|
||||
|
||||
mcpConfiguration:
|
||||
enabled: ${MCP_ENABLED:-true}
|
||||
path: ${MCP_PATH:-"/mcp"}
|
||||
mcpServerVersion: ${MCP_SERVER_VERSION:-"1.0.0"}
|
||||
|
||||
|
||||
|
||||
@ -606,6 +606,15 @@
|
||||
<artifactId>socket.io-server</artifactId>
|
||||
<version>4.0.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.modelcontextprotocol.sdk</groupId>
|
||||
<artifactId>mcp-bom</artifactId>
|
||||
<type>pom</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.modelcontextprotocol.sdk</groupId>
|
||||
<artifactId>mcp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.socket</groupId>
|
||||
<artifactId>engine.io-server-jetty</artifactId>
|
||||
|
||||
@ -91,6 +91,7 @@ import org.openmetadata.service.jobs.JobDAO;
|
||||
import org.openmetadata.service.jobs.JobHandlerRegistry;
|
||||
import org.openmetadata.service.limits.DefaultLimits;
|
||||
import org.openmetadata.service.limits.Limits;
|
||||
import org.openmetadata.service.mcp.McpServer;
|
||||
import org.openmetadata.service.migration.Migration;
|
||||
import org.openmetadata.service.migration.MigrationValidationClient;
|
||||
import org.openmetadata.service.migration.api.MigrationWorkflow;
|
||||
@ -264,6 +265,9 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
|
||||
// Asset Servlet Registration
|
||||
registerAssetServlet(catalogConfig, catalogConfig.getWebConfiguration(), environment);
|
||||
|
||||
// Register MCP
|
||||
registerMCPServer(catalogConfig, environment);
|
||||
|
||||
// Handle Services Jobs
|
||||
registerHealthCheckJobs(catalogConfig);
|
||||
|
||||
@ -271,6 +275,15 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
|
||||
registerAuthServlets(catalogConfig, environment);
|
||||
}
|
||||
|
||||
protected void registerMCPServer(
|
||||
OpenMetadataApplicationConfig catalogConfig, Environment environment) {
|
||||
if (catalogConfig.getMcpConfiguration() != null
|
||||
&& catalogConfig.getMcpConfiguration().isEnabled()) {
|
||||
McpServer mcpServer = new McpServer();
|
||||
mcpServer.initializeMcpServer(environment, catalogConfig);
|
||||
}
|
||||
}
|
||||
|
||||
private void registerHealthCheckJobs(OpenMetadataApplicationConfig catalogConfig) {
|
||||
ServicesStatusJobHandler healthCheckStatusHandler =
|
||||
ServicesStatusJobHandler.create(
|
||||
|
||||
@ -35,6 +35,7 @@ import org.openmetadata.schema.api.security.jwt.JWTTokenConfiguration;
|
||||
import org.openmetadata.schema.configuration.LimitsConfiguration;
|
||||
import org.openmetadata.schema.security.secrets.SecretsManagerConfiguration;
|
||||
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
|
||||
import org.openmetadata.service.config.MCPConfiguration;
|
||||
import org.openmetadata.service.config.OMWebConfiguration;
|
||||
import org.openmetadata.service.config.ObjectStorageConfiguration;
|
||||
import org.openmetadata.service.migration.MigrationConfiguration;
|
||||
@ -84,6 +85,9 @@ public class OpenMetadataApplicationConfig extends Configuration {
|
||||
|
||||
private static final String CERTIFICATE_PATH = "certificatePath";
|
||||
|
||||
@JsonProperty("mcpConfiguration")
|
||||
private MCPConfiguration mcpConfiguration = new MCPConfiguration();
|
||||
|
||||
public PipelineServiceClientConfiguration getPipelineServiceClientConfiguration() {
|
||||
if (pipelineServiceClientConfiguration != null) {
|
||||
Map<String, String> temporarySSLConfig =
|
||||
|
||||
@ -0,0 +1,21 @@
|
||||
package org.openmetadata.service.config;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
public class MCPConfiguration {
|
||||
@JsonProperty("mcpServerName")
|
||||
private String mcpServerName = "openmetadata-mcp-server";
|
||||
|
||||
@JsonProperty("mcpServerVersion")
|
||||
private String mcpServerVersion = "1.0.0";
|
||||
|
||||
@JsonProperty("enabled")
|
||||
private boolean enabled = true;
|
||||
|
||||
@JsonProperty("path")
|
||||
private String path = "/api/v1/mcp";
|
||||
}
|
||||
@ -0,0 +1,282 @@
|
||||
/*
|
||||
This class should be removed once we migrate to Jakarta.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.mcp;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.modelcontextprotocol.spec.McpError;
|
||||
import io.modelcontextprotocol.spec.McpSchema;
|
||||
import io.modelcontextprotocol.spec.McpServerSession;
|
||||
import io.modelcontextprotocol.spec.McpServerTransport;
|
||||
import io.modelcontextprotocol.spec.McpServerTransportProvider;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.annotation.WebServlet;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@WebServlet(asyncSupported = true)
|
||||
public class HttpServletSseServerTransportProvider extends HttpServlet
|
||||
implements McpServerTransportProvider {
|
||||
private static final Logger logger =
|
||||
LoggerFactory.getLogger(HttpServletSseServerTransportProvider.class);
|
||||
public static final String UTF_8 = "UTF-8";
|
||||
public static final String APPLICATION_JSON = "application/json";
|
||||
public static final String FAILED_TO_SEND_ERROR_RESPONSE = "Failed to send error response: {}";
|
||||
public static final String DEFAULT_SSE_ENDPOINT = "/sse";
|
||||
public static final String MESSAGE_EVENT_TYPE = "message";
|
||||
public static final String ENDPOINT_EVENT_TYPE = "endpoint";
|
||||
private final ObjectMapper objectMapper;
|
||||
private final String messageEndpoint;
|
||||
private final String sseEndpoint;
|
||||
private final Map<String, McpServerSession> sessions;
|
||||
private final AtomicBoolean isClosing;
|
||||
private McpServerSession.Factory sessionFactory;
|
||||
|
||||
public HttpServletSseServerTransportProvider(String messageEndpoint, String sseEndpoint) {
|
||||
this.sessions = new ConcurrentHashMap<>();
|
||||
this.isClosing = new AtomicBoolean(false);
|
||||
this.objectMapper = JsonUtils.getObjectMapper();
|
||||
this.messageEndpoint = messageEndpoint;
|
||||
this.sseEndpoint = sseEndpoint;
|
||||
}
|
||||
|
||||
public HttpServletSseServerTransportProvider(String messageEndpoint) {
|
||||
this(messageEndpoint, "/sse");
|
||||
}
|
||||
|
||||
public void setSessionFactory(McpServerSession.Factory sessionFactory) {
|
||||
this.sessionFactory = sessionFactory;
|
||||
}
|
||||
|
||||
public Mono<Void> notifyClients(String method, Map<String, Object> params) {
|
||||
if (this.sessions.isEmpty()) {
|
||||
logger.debug("No active sessions to broadcast message to");
|
||||
return Mono.empty();
|
||||
} else {
|
||||
logger.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
|
||||
return Flux.fromIterable(this.sessions.values())
|
||||
.flatMap(
|
||||
(session) ->
|
||||
session
|
||||
.sendNotification(method, params)
|
||||
.doOnError(
|
||||
(e) ->
|
||||
logger.error(
|
||||
"Failed to send message to session {}: {}",
|
||||
session.getId(),
|
||||
e.getMessage()))
|
||||
.onErrorComplete())
|
||||
.then();
|
||||
}
|
||||
}
|
||||
|
||||
protected void doGet(HttpServletRequest request, HttpServletResponse response)
|
||||
throws ServletException, IOException {
|
||||
handleSseEvent(request, response);
|
||||
}
|
||||
|
||||
private void handleSseEvent(HttpServletRequest request, HttpServletResponse response)
|
||||
throws ServletException, IOException {
|
||||
String pathInfo = request.getPathInfo();
|
||||
if (!this.sseEndpoint.contains(pathInfo)) {
|
||||
response.sendError(404);
|
||||
} else if (this.isClosing.get()) {
|
||||
response.sendError(503, "Server is shutting down");
|
||||
} else {
|
||||
response.setContentType("text/event-stream");
|
||||
response.setCharacterEncoding("UTF-8");
|
||||
response.setHeader("Cache-Control", "no-cache");
|
||||
response.setHeader("Connection", "keep-alive");
|
||||
response.setHeader("Access-Control-Allow-Origin", "*");
|
||||
String sessionId = UUID.randomUUID().toString();
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(0L);
|
||||
PrintWriter writer = response.getWriter();
|
||||
HttpServletMcpSessionTransport sessionTransport =
|
||||
new HttpServletMcpSessionTransport(sessionId, asyncContext, writer);
|
||||
McpServerSession session = this.sessionFactory.create(sessionTransport);
|
||||
this.sessions.put(sessionId, session);
|
||||
this.sendEvent(writer, "endpoint", this.messageEndpoint + "?sessionId=" + sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
protected void doPost(HttpServletRequest request, HttpServletResponse response)
|
||||
throws ServletException, IOException {
|
||||
if (this.isClosing.get()) {
|
||||
response.sendError(503, "Server is shutting down");
|
||||
} else {
|
||||
String requestURI = request.getRequestURI();
|
||||
if (!requestURI.endsWith(this.messageEndpoint)) {
|
||||
response.sendError(404);
|
||||
} else {
|
||||
String sessionId = request.getParameter("sessionId");
|
||||
if (sessionId == null) {
|
||||
response.setContentType("application/json");
|
||||
response.setCharacterEncoding("UTF-8");
|
||||
response.setStatus(400);
|
||||
String jsonError =
|
||||
this.objectMapper.writeValueAsString(
|
||||
new McpError("Session ID missing in message endpoint"));
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.write(jsonError);
|
||||
writer.flush();
|
||||
} else {
|
||||
McpServerSession session = (McpServerSession) this.sessions.get(sessionId);
|
||||
if (session == null) {
|
||||
response.setContentType("application/json");
|
||||
response.setCharacterEncoding("UTF-8");
|
||||
response.setStatus(404);
|
||||
String jsonError =
|
||||
this.objectMapper.writeValueAsString(
|
||||
new McpError("Session not found: " + sessionId));
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.write(jsonError);
|
||||
writer.flush();
|
||||
} else {
|
||||
try {
|
||||
BufferedReader reader = request.getReader();
|
||||
StringBuilder body = new StringBuilder();
|
||||
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
body.append(line);
|
||||
}
|
||||
|
||||
McpSchema.JSONRPCMessage message =
|
||||
McpSchema.deserializeJsonRpcMessage(this.objectMapper, body.toString());
|
||||
session.handle(message).block();
|
||||
response.setStatus(200);
|
||||
} catch (Exception var11) {
|
||||
Exception e = var11;
|
||||
logger.error("Error processing message: {}", var11.getMessage());
|
||||
|
||||
try {
|
||||
McpError mcpError = new McpError(e.getMessage());
|
||||
response.setContentType("application/json");
|
||||
response.setCharacterEncoding("UTF-8");
|
||||
response.setStatus(500);
|
||||
String jsonError = this.objectMapper.writeValueAsString(mcpError);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.write(jsonError);
|
||||
writer.flush();
|
||||
} catch (IOException ex) {
|
||||
logger.error("Failed to send error response: {}", ex.getMessage());
|
||||
response.sendError(500, "Error processing message");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Mono<Void> closeGracefully() {
|
||||
this.isClosing.set(true);
|
||||
logger.debug("Initiating graceful shutdown with {} active sessions", this.sessions.size());
|
||||
return Flux.fromIterable(this.sessions.values())
|
||||
.flatMap(McpServerSession::closeGracefully)
|
||||
.then();
|
||||
}
|
||||
|
||||
private void sendEvent(PrintWriter writer, String eventType, String data) throws IOException {
|
||||
writer.write("event: " + eventType + "\n");
|
||||
writer.write("data: " + data + "\n\n");
|
||||
writer.flush();
|
||||
if (writer.checkError()) {
|
||||
throw new IOException("Client disconnected");
|
||||
}
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
this.closeGracefully().block();
|
||||
super.destroy();
|
||||
}
|
||||
|
||||
private class HttpServletMcpSessionTransport implements McpServerTransport {
|
||||
private final String sessionId;
|
||||
private final AsyncContext asyncContext;
|
||||
private final PrintWriter writer;
|
||||
|
||||
HttpServletMcpSessionTransport(
|
||||
String sessionId, AsyncContext asyncContext, PrintWriter writer) {
|
||||
this.sessionId = sessionId;
|
||||
this.asyncContext = asyncContext;
|
||||
this.writer = writer;
|
||||
HttpServletSseServerTransportProvider.logger.debug(
|
||||
"Session transport {} initialized with SSE writer", sessionId);
|
||||
}
|
||||
|
||||
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
|
||||
return Mono.fromRunnable(
|
||||
() -> {
|
||||
try {
|
||||
String jsonText =
|
||||
HttpServletSseServerTransportProvider.this.objectMapper.writeValueAsString(
|
||||
message);
|
||||
HttpServletSseServerTransportProvider.this.sendEvent(
|
||||
this.writer, "message", jsonText);
|
||||
HttpServletSseServerTransportProvider.logger.debug(
|
||||
"Message sent to session {}", this.sessionId);
|
||||
} catch (Exception e) {
|
||||
HttpServletSseServerTransportProvider.logger.error(
|
||||
"Failed to send message to session {}: {}", this.sessionId, e.getMessage());
|
||||
HttpServletSseServerTransportProvider.this.sessions.remove(this.sessionId);
|
||||
this.asyncContext.complete();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
|
||||
return (T)
|
||||
HttpServletSseServerTransportProvider.this.objectMapper.convertValue(data, typeRef);
|
||||
}
|
||||
|
||||
public Mono<Void> closeGracefully() {
|
||||
return Mono.fromRunnable(
|
||||
() -> {
|
||||
HttpServletSseServerTransportProvider.logger.debug(
|
||||
"Closing session transport: {}", this.sessionId);
|
||||
|
||||
try {
|
||||
HttpServletSseServerTransportProvider.this.sessions.remove(this.sessionId);
|
||||
this.asyncContext.complete();
|
||||
HttpServletSseServerTransportProvider.logger.debug(
|
||||
"Successfully completed async context for session {}", this.sessionId);
|
||||
} catch (Exception e) {
|
||||
HttpServletSseServerTransportProvider.logger.warn(
|
||||
"Failed to complete async context for session {}: {}",
|
||||
this.sessionId,
|
||||
e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void close() {
|
||||
try {
|
||||
HttpServletSseServerTransportProvider.this.sessions.remove(this.sessionId);
|
||||
this.asyncContext.complete();
|
||||
HttpServletSseServerTransportProvider.logger.debug(
|
||||
"Successfully completed async context for session {}", this.sessionId);
|
||||
} catch (Exception e) {
|
||||
HttpServletSseServerTransportProvider.logger.warn(
|
||||
"Failed to complete async context for session {}: {}", this.sessionId, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
package org.openmetadata.service.mcp;
|
||||
|
||||
import static org.openmetadata.service.socket.SocketAddressFilter.validatePrefixedTokenRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
import javax.servlet.Filter;
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import org.openmetadata.service.security.JwtFilter;
|
||||
|
||||
public class McpAuthFilter implements Filter {
|
||||
private final JwtFilter jwtFilter;
|
||||
|
||||
public McpAuthFilter(JwtFilter filter) {
|
||||
this.jwtFilter = filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doFilter(
|
||||
ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
|
||||
throws IOException, ServletException {
|
||||
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
|
||||
String tokenWithType = httpServletRequest.getHeader("Authorization");
|
||||
validatePrefixedTokenRequest(jwtFilter, tokenWithType);
|
||||
|
||||
// Continue with the filter chain
|
||||
filterChain.doFilter(servletRequest, servletResponse);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,168 @@
|
||||
package org.openmetadata.service.mcp;
|
||||
|
||||
import static org.openmetadata.service.search.SearchUtil.searchMetadata;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import io.dropwizard.jetty.MutableServletContextHandler;
|
||||
import io.dropwizard.setup.Environment;
|
||||
import io.modelcontextprotocol.server.McpServerFeatures;
|
||||
import io.modelcontextprotocol.server.McpSyncServer;
|
||||
import io.modelcontextprotocol.spec.McpSchema;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.servlet.DispatcherType;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.jetty.servlet.FilterHolder;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.mcp.tools.CreateGlossaryTerm;
|
||||
import org.openmetadata.service.mcp.tools.PatchEntity;
|
||||
import org.openmetadata.service.security.JwtFilter;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class McpServer {
|
||||
public McpServer() {}
|
||||
|
||||
public void initializeMcpServer(Environment environment, OpenMetadataApplicationConfig config) {
|
||||
McpSchema.ServerCapabilities serverCapabilities =
|
||||
McpSchema.ServerCapabilities.builder()
|
||||
.tools(true)
|
||||
.prompts(true)
|
||||
.resources(true, true)
|
||||
.build();
|
||||
|
||||
HttpServletSseServerTransportProvider transport =
|
||||
new HttpServletSseServerTransportProvider("/mcp/messages", "/mcp/sse");
|
||||
McpSyncServer server =
|
||||
io.modelcontextprotocol.server.McpServer.sync(transport)
|
||||
.serverInfo("openmetadata-mcp", "0.1.0")
|
||||
.capabilities(serverCapabilities)
|
||||
.build();
|
||||
|
||||
// Add resources, prompts, and tools to the MCP server
|
||||
addTools(server);
|
||||
|
||||
MutableServletContextHandler contextHandler = environment.getApplicationContext();
|
||||
ServletHolder servletHolder = new ServletHolder(transport);
|
||||
contextHandler.addServlet(servletHolder, "/mcp/*");
|
||||
|
||||
McpAuthFilter authFilter =
|
||||
new McpAuthFilter(
|
||||
new JwtFilter(
|
||||
config.getAuthenticationConfiguration(), config.getAuthorizerConfiguration()));
|
||||
contextHandler.addFilter(
|
||||
new FilterHolder(authFilter), "/mcp/*", EnumSet.of(DispatcherType.REQUEST));
|
||||
}
|
||||
|
||||
public void addTools(McpSyncServer server) {
|
||||
try {
|
||||
LOG.info("Loading tool definitions...");
|
||||
List<Map<String, Object>> cachedTools = loadToolsDefinitionsFromJson();
|
||||
if (cachedTools == null || cachedTools.isEmpty()) {
|
||||
LOG.error("No tool definitions were loaded!");
|
||||
throw new RuntimeException("Failed to load tool definitions");
|
||||
}
|
||||
LOG.info("Successfully loaded {} tool definitions", cachedTools.size());
|
||||
|
||||
for (Map<String, Object> toolDef : cachedTools) {
|
||||
try {
|
||||
String name = (String) toolDef.get("name");
|
||||
String description = (String) toolDef.get("description");
|
||||
Map<String, Object> schema = JsonUtils.getMap(toolDef.get("parameters"));
|
||||
server.addTool(getTool(JsonUtils.pojoToJson(schema), name, description));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error processing tool definition: {}", toolDef, e);
|
||||
}
|
||||
}
|
||||
LOG.info("Initializing request handlers...");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error during server startup", e);
|
||||
throw new RuntimeException("Failed to start MCP server", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected List<Map<String, Object>> loadToolsDefinitionsFromJson() {
|
||||
String json = getJsonFromFile("json/data/mcp/tools.json");
|
||||
return loadToolDefinitionsFromJson(json);
|
||||
}
|
||||
|
||||
protected static String getJsonFromFile(String path) {
|
||||
try {
|
||||
return CommonUtil.getResourceAsStream(McpServer.class.getClassLoader(), path);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Error loading JSON file: {}", path, ex);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<Map<String, Object>> loadToolDefinitionsFromJson(String json) {
|
||||
try {
|
||||
LOG.info("Loaded tool definitions, content length: {}", json.length());
|
||||
LOG.info("Raw tools.json content: {}", json);
|
||||
|
||||
JsonNode toolsJson = JsonUtils.readTree(json);
|
||||
JsonNode toolsArray = toolsJson.get("tools");
|
||||
|
||||
if (toolsArray == null || !toolsArray.isArray()) {
|
||||
LOG.error("Invalid MCP tools file format. Expected 'tools' array.");
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
List<Map<String, Object>> tools = new ArrayList<>();
|
||||
for (JsonNode toolNode : toolsArray) {
|
||||
String name = toolNode.get("name").asText();
|
||||
Map<String, Object> toolDef = JsonUtils.convertValue(toolNode, Map.class);
|
||||
tools.add(toolDef);
|
||||
LOG.info("Tool found: {} with definition: {}", name, toolDef);
|
||||
}
|
||||
|
||||
LOG.info("Found {} tool definitions", tools.size());
|
||||
return tools;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error loading tool definitions: {}", e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private McpServerFeatures.SyncToolSpecification getTool(
|
||||
String schema, String toolName, String description) {
|
||||
McpSchema.Tool tool = new McpSchema.Tool(toolName, description, schema);
|
||||
|
||||
return new McpServerFeatures.SyncToolSpecification(
|
||||
tool,
|
||||
(exchange, arguments) -> {
|
||||
McpSchema.Content content =
|
||||
new McpSchema.TextContent(JsonUtils.pojoToJson(runMethod(toolName, arguments)));
|
||||
return new McpSchema.CallToolResult(List.of(content), false);
|
||||
});
|
||||
}
|
||||
|
||||
protected Object runMethod(String toolName, Map<String, Object> params) {
|
||||
Object result;
|
||||
switch (toolName) {
|
||||
case "search_metadata":
|
||||
result = searchMetadata(params);
|
||||
break;
|
||||
case "get_entity_details":
|
||||
result = EntityUtil.getEntityDetails(params);
|
||||
break;
|
||||
case "create_glossary_term":
|
||||
result = CreateGlossaryTerm.execute(params);
|
||||
break;
|
||||
case "patch_entity":
|
||||
result = PatchEntity.execute(params);
|
||||
break;
|
||||
default:
|
||||
result = Map.of("error", "Unknown function: " + toolName);
|
||||
break;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,99 @@
|
||||
package org.openmetadata.service.mcp.tools;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.entity.data.Glossary;
|
||||
import org.openmetadata.schema.entity.data.GlossaryTerm;
|
||||
import org.openmetadata.schema.entity.teams.User;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.jdbi3.GlossaryRepository;
|
||||
import org.openmetadata.service.jdbi3.GlossaryTermRepository;
|
||||
import org.openmetadata.service.jdbi3.UserRepository;
|
||||
import org.openmetadata.service.resources.glossary.GlossaryTermMapper;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.RestUtil;
|
||||
|
||||
@Slf4j
|
||||
public class CreateGlossaryTerm {
|
||||
private static GlossaryTermMapper glossaryTermMapper = new GlossaryTermMapper();
|
||||
|
||||
public static Map<String, Object> execute(Map<String, Object> params) {
|
||||
org.openmetadata.schema.api.data.CreateGlossaryTerm createGlossaryTerm =
|
||||
new org.openmetadata.schema.api.data.CreateGlossaryTerm();
|
||||
createGlossaryTerm.setGlossary((String) params.get("glossary"));
|
||||
createGlossaryTerm.setName((String) params.get("name"));
|
||||
createGlossaryTerm.setDescription((String) params.get("description"));
|
||||
// GlossaryTerm glossaryTerm = new GlossaryTerm();
|
||||
// glossaryTerm.setName((String) params.get("name"));
|
||||
// glossaryTerm.setDescription((String) params.get("description"));
|
||||
//
|
||||
// String glossaryName = (String) params.get("glossary");
|
||||
//
|
||||
// // Find Glossary
|
||||
// GlossaryRepository glossaryRepository = (GlossaryRepository)
|
||||
// Entity.getEntityRepository(Entity.GLOSSARY);
|
||||
// try {
|
||||
// Optional.ofNullable(glossaryRepository.getByName(null, glossaryName,
|
||||
// EntityUtil.Fields.EMPTY_FIELDS)).ifPresent(glossary ->
|
||||
// glossaryTerm.setGlossary(glossary.getEntityReference()));
|
||||
// } catch (EntityNotFoundException e) {
|
||||
// LOG.error(String.format("Glossary '%s' not found", glossaryName));
|
||||
// Map<String, Object> error = new HashMap<>();
|
||||
// error.put("error", e.getMessage());
|
||||
// return error;
|
||||
// }
|
||||
//
|
||||
// Find Owners
|
||||
UserRepository userRepository = Entity.getUserRepository();
|
||||
|
||||
List<EntityReference> owners = new java.util.ArrayList<>();
|
||||
|
||||
// TODO: Deal with Teams vs Users
|
||||
if (params.containsKey("owners")) {
|
||||
for (String owner : JsonUtils.readOrConvertValues(params.get("owners"), String.class)) {
|
||||
try {
|
||||
User user = userRepository.findByName(owner, Include.NON_DELETED);
|
||||
owners.add(user.getEntityReference());
|
||||
} catch (EntityNotFoundException e) {
|
||||
LOG.error(String.format("User '%s' not found", owner));
|
||||
Map<String, Object> error = new HashMap<>();
|
||||
error.put("error", e.getMessage());
|
||||
return error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!owners.isEmpty()) {
|
||||
createGlossaryTerm.setOwners(owners);
|
||||
}
|
||||
|
||||
try {
|
||||
GlossaryRepository glossaryRepository =
|
||||
(GlossaryRepository) Entity.getEntityRepository(Entity.GLOSSARY);
|
||||
Glossary glossary =
|
||||
glossaryRepository.findByNameOrNull(createGlossaryTerm.getGlossary(), Include.ALL);
|
||||
GlossaryTerm glossaryTerm =
|
||||
glossaryTermMapper.createToEntity(createGlossaryTerm, ADMIN_USER_NAME);
|
||||
GlossaryTermRepository glossaryTermRepository =
|
||||
(GlossaryTermRepository) Entity.getEntityRepository(Entity.GLOSSARY_TERM);
|
||||
// TODO: Get the updatedBy from the tool request.
|
||||
glossaryTermRepository.prepare(glossaryTerm, nullOrEmpty(glossary));
|
||||
glossaryTermRepository.setFullyQualifiedName(glossaryTerm);
|
||||
RestUtil.PutResponse<GlossaryTerm> response =
|
||||
glossaryTermRepository.createOrUpdate(null, glossaryTerm, "admin");
|
||||
return JsonUtils.convertValue(response.getEntity(), Map.class);
|
||||
} catch (Exception e) {
|
||||
Map<String, Object> error = new HashMap<>();
|
||||
error.put("error", e.getMessage());
|
||||
return error;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
package org.openmetadata.service.mcp.tools;
|
||||
|
||||
import java.util.Map;
|
||||
import javax.json.JsonPatch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.type.change.ChangeSource;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.RestUtil;
|
||||
|
||||
@Slf4j
|
||||
public class PatchEntity {
|
||||
public static Map<String, Object> execute(Map<String, Object> params) {
|
||||
String entityType = (String) params.get("entityType");
|
||||
String entityFqn = (String) params.get("entityFqn");
|
||||
JsonPatch patch = JsonUtils.readOrConvertValue(params.get("patch"), JsonPatch.class);
|
||||
|
||||
EntityRepository<? extends EntityInterface> repository = Entity.getEntityRepository(entityType);
|
||||
RestUtil.PatchResponse<? extends EntityInterface> response =
|
||||
repository.patch(null, entityFqn, "admin", patch, ChangeSource.MANUAL);
|
||||
return JsonUtils.convertValue(response, Map.class);
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
package org.openmetadata.service.resources.glossary;
|
||||
|
||||
import static org.openmetadata.service.util.EntityUtil.getEntityReference;
|
||||
import static org.openmetadata.service.util.EntityUtil.getEntityReferenceByName;
|
||||
import static org.openmetadata.service.util.EntityUtil.getEntityReferences;
|
||||
|
||||
import org.openmetadata.schema.api.data.CreateGlossaryTerm;
|
||||
@ -14,7 +15,7 @@ public class GlossaryTermMapper implements EntityMapper<GlossaryTerm, CreateGlos
|
||||
return copy(new GlossaryTerm(), create, user)
|
||||
.withSynonyms(create.getSynonyms())
|
||||
.withStyle(create.getStyle())
|
||||
.withGlossary(getEntityReference(Entity.GLOSSARY, create.getGlossary()))
|
||||
.withGlossary(getEntityReferenceByName(Entity.GLOSSARY, create.getGlossary()))
|
||||
.withParent(getEntityReference(Entity.GLOSSARY_TERM, create.getParent()))
|
||||
.withRelatedTerms(getEntityReferences(Entity.GLOSSARY_TERM, create.getRelatedTerms()))
|
||||
.withReferences(create.getReferences())
|
||||
|
||||
@ -4,6 +4,7 @@ import static org.openmetadata.service.search.SearchUtil.isDataAssetIndex;
|
||||
import static org.openmetadata.service.search.SearchUtil.isDataQualityIndex;
|
||||
import static org.openmetadata.service.search.SearchUtil.isServiceIndex;
|
||||
import static org.openmetadata.service.search.SearchUtil.isTimeSeriesIndex;
|
||||
import static org.openmetadata.service.search.SearchUtil.mapEntityTypesToIndexNames;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -122,29 +123,7 @@ public interface SearchSourceBuilderFactory<S, Q, H, F> {
|
||||
|
||||
default AssetTypeConfiguration findAssetTypeConfig(
|
||||
String indexName, SearchSettings searchSettings) {
|
||||
String assetType =
|
||||
switch (indexName) {
|
||||
case "topic_search_index", Entity.TOPIC -> Entity.TOPIC;
|
||||
case "dashboard_search_index", Entity.DASHBOARD -> Entity.DASHBOARD;
|
||||
case "pipeline_search_index", Entity.PIPELINE -> Entity.PIPELINE;
|
||||
case "mlmodel_search_index", Entity.MLMODEL -> Entity.MLMODEL;
|
||||
case "table_search_index", Entity.TABLE -> Entity.TABLE;
|
||||
case "database_search_index", Entity.DATABASE -> Entity.DATABASE;
|
||||
case "database_schema_search_index", Entity.DATABASE_SCHEMA -> Entity.DATABASE_SCHEMA;
|
||||
case "container_search_index", Entity.CONTAINER -> Entity.CONTAINER;
|
||||
case "query_search_index", Entity.QUERY -> Entity.QUERY;
|
||||
case "stored_procedure_search_index", Entity.STORED_PROCEDURE -> Entity.STORED_PROCEDURE;
|
||||
case "dashboard_data_model_search_index", Entity.DASHBOARD_DATA_MODEL -> Entity
|
||||
.DASHBOARD_DATA_MODEL;
|
||||
case "api_endpoint_search_index", Entity.API_ENDPOINT -> Entity.API_ENDPOINT;
|
||||
case "search_entity_search_index", Entity.SEARCH_INDEX -> Entity.SEARCH_INDEX;
|
||||
case "tag_search_index", Entity.TAG -> Entity.TAG;
|
||||
case "glossary_term_search_index", Entity.GLOSSARY_TERM -> Entity.GLOSSARY_TERM;
|
||||
case "domain_search_index", Entity.DOMAIN -> Entity.DOMAIN;
|
||||
case "data_product_search_index", Entity.DATA_PRODUCT -> Entity.DATA_PRODUCT;
|
||||
default -> "default";
|
||||
};
|
||||
|
||||
String assetType = mapEntityTypesToIndexNames(indexName);
|
||||
return searchSettings.getAssetTypeConfigurations().stream()
|
||||
.filter(config -> config.getAssetType().equals(assetType))
|
||||
.findFirst()
|
||||
|
||||
@ -1,9 +1,43 @@
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.search.SearchRequest;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class SearchUtil {
|
||||
|
||||
private static final List<String> IGNORE_SEARCH_KEYS =
|
||||
List.of(
|
||||
"id",
|
||||
"version",
|
||||
"updatedAt",
|
||||
"updatedBy",
|
||||
"usageSummary",
|
||||
"followers",
|
||||
"deleted",
|
||||
"votes",
|
||||
"lifeCycle",
|
||||
"sourceHash",
|
||||
"processedLineage",
|
||||
"totalVotes",
|
||||
"fqnParts",
|
||||
"service_suggest",
|
||||
"column_suggest",
|
||||
"schema_suggest",
|
||||
"database_suggest",
|
||||
"upstreamLineage",
|
||||
"entityRelationship",
|
||||
"changeSummary",
|
||||
"fqnHash");
|
||||
|
||||
/**
|
||||
* Check if the index is a data asset index
|
||||
* @param indexName name of the index to check
|
||||
@ -84,4 +118,126 @@ public class SearchUtil {
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
public static String mapEntityTypesToIndexNames(String indexName) {
|
||||
return switch (indexName) {
|
||||
case "topic_search_index", Entity.TOPIC -> Entity.TOPIC;
|
||||
case "dashboard_search_index", Entity.DASHBOARD -> Entity.DASHBOARD;
|
||||
case "pipeline_search_index", Entity.PIPELINE -> Entity.PIPELINE;
|
||||
case "mlmodel_search_index", Entity.MLMODEL -> Entity.MLMODEL;
|
||||
case "table_search_index", Entity.TABLE -> Entity.TABLE;
|
||||
case "database_search_index", Entity.DATABASE -> Entity.DATABASE;
|
||||
case "database_schema_search_index", Entity.DATABASE_SCHEMA -> Entity.DATABASE_SCHEMA;
|
||||
case "container_search_index", Entity.CONTAINER -> Entity.CONTAINER;
|
||||
case "query_search_index", Entity.QUERY -> Entity.QUERY;
|
||||
case "stored_procedure_search_index", Entity.STORED_PROCEDURE -> Entity.STORED_PROCEDURE;
|
||||
case "dashboard_data_model_search_index", Entity.DASHBOARD_DATA_MODEL -> Entity
|
||||
.DASHBOARD_DATA_MODEL;
|
||||
case "api_endpoint_search_index", Entity.API_ENDPOINT -> Entity.API_ENDPOINT;
|
||||
case "search_entity_search_index", Entity.SEARCH_INDEX -> Entity.SEARCH_INDEX;
|
||||
case "tag_search_index", Entity.TAG -> Entity.TAG;
|
||||
case "glossary_term_search_index", Entity.GLOSSARY_TERM -> Entity.GLOSSARY_TERM;
|
||||
case "domain_search_index", Entity.DOMAIN -> Entity.DOMAIN;
|
||||
case "data_product_search_index", Entity.DATA_PRODUCT -> Entity.DATA_PRODUCT;
|
||||
default -> "default";
|
||||
};
|
||||
}
|
||||
|
||||
public static List<Object> searchMetadata(Map<String, Object> params) {
|
||||
try {
|
||||
LOG.info("Executing searchMetadata with params: {}", params);
|
||||
String query = params.containsKey("query") ? (String) params.get("query") : "*";
|
||||
int limit = 10;
|
||||
if (params.containsKey("limit")) {
|
||||
Object limitObj = params.get("limit");
|
||||
if (limitObj instanceof Number) {
|
||||
limit = ((Number) limitObj).intValue();
|
||||
} else if (limitObj instanceof String) {
|
||||
limit = Integer.parseInt((String) limitObj);
|
||||
}
|
||||
}
|
||||
|
||||
boolean includeDeleted = false;
|
||||
if (params.containsKey("include_deleted")) {
|
||||
Object deletedObj = params.get("include_deleted");
|
||||
if (deletedObj instanceof Boolean) {
|
||||
includeDeleted = (Boolean) deletedObj;
|
||||
} else if (deletedObj instanceof String) {
|
||||
includeDeleted = "true".equals(deletedObj);
|
||||
}
|
||||
}
|
||||
|
||||
String entityType =
|
||||
params.containsKey("entity_type") ? (String) params.get("entity_type") : null;
|
||||
String index =
|
||||
(entityType != null && !entityType.isEmpty())
|
||||
? mapEntityTypesToIndexNames(entityType)
|
||||
: Entity.TABLE;
|
||||
|
||||
LOG.info(
|
||||
"Search query: {}, index: {}, limit: {}, includeDeleted: {}",
|
||||
query,
|
||||
index,
|
||||
limit,
|
||||
includeDeleted);
|
||||
|
||||
SearchRequest searchRequest =
|
||||
new SearchRequest()
|
||||
.withQuery(query)
|
||||
.withIndex(index)
|
||||
.withSize(limit)
|
||||
.withFrom(0)
|
||||
.withFetchSource(true)
|
||||
.withDeleted(includeDeleted);
|
||||
|
||||
javax.ws.rs.core.Response response = Entity.getSearchRepository().search(searchRequest, null);
|
||||
|
||||
Map<String, Object> searchResponse;
|
||||
if (response.getEntity() instanceof String responseStr) {
|
||||
LOG.info("Search returned string response");
|
||||
JsonNode jsonNode = JsonUtils.readTree(responseStr);
|
||||
searchResponse = JsonUtils.convertValue(jsonNode, Map.class);
|
||||
} else {
|
||||
LOG.info("Search returned object response: {}", response.getEntity().getClass().getName());
|
||||
searchResponse = JsonUtils.convertValue(response.getEntity(), Map.class);
|
||||
}
|
||||
return cleanSearchResponse(searchResponse);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in searchMetadata", e);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Object> cleanSearchResponse(Map<String, Object> searchResponse) {
|
||||
if (searchResponse == null) return Collections.emptyList();
|
||||
|
||||
Map<String, Object> topHits = safeGetMap(searchResponse.get("hits"));
|
||||
if (topHits == null) return Collections.emptyList();
|
||||
|
||||
List<Object> hits = safeGetList(topHits.get("hits"));
|
||||
if (hits == null) return Collections.emptyList();
|
||||
|
||||
return hits.stream()
|
||||
.map(SearchUtil::safeGetMap)
|
||||
.filter(Objects::nonNull)
|
||||
.map(
|
||||
hit -> {
|
||||
Map<String, Object> source = safeGetMap(hit.get("_source"));
|
||||
if (source == null) return null;
|
||||
IGNORE_SEARCH_KEYS.forEach(source::remove);
|
||||
return source;
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map<String, Object> safeGetMap(Object obj) {
|
||||
return (obj instanceof Map) ? (Map<String, Object>) obj : null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static List<Object> safeGetList(Object obj) {
|
||||
return (obj instanceof List) ? (List<Object>) obj : null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,7 +19,6 @@ import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import javax.servlet.Filter;
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.FilterConfig;
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
@ -47,9 +46,6 @@ public class SocketAddressFilter implements Filter {
|
||||
enableSecureSocketConnection = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {}
|
||||
|
||||
@Override
|
||||
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
|
||||
throws IOException {
|
||||
@ -76,9 +72,6 @@ public class SocketAddressFilter implements Filter {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(FilterConfig filterConfig) {}
|
||||
|
||||
public static void validatePrefixedTokenRequest(JwtFilter jwtFilter, String prefixedToken) {
|
||||
String token = JwtFilter.extractToken(prefixedToken);
|
||||
Map<String, Claim> claims = jwtFilter.validateJwtAndGetClaims(token);
|
||||
|
||||
@ -34,6 +34,7 @@ import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
@ -576,6 +577,10 @@ public final class EntityUtil {
|
||||
: new EntityReference().withType(entityType).withFullyQualifiedName(fqn);
|
||||
}
|
||||
|
||||
public static EntityReference getEntityReferenceByName(String entityType, String fqn) {
|
||||
return fqn == null ? null : Entity.getEntityReferenceByName(entityType, fqn, ALL);
|
||||
}
|
||||
|
||||
public static List<EntityReference> getEntityReferences(String entityType, List<String> fqns) {
|
||||
if (nullOrEmpty(fqns)) {
|
||||
return null;
|
||||
@ -794,4 +799,19 @@ public final class EntityUtil {
|
||||
&& changeDescription.getFieldsUpdated().isEmpty()
|
||||
&& changeDescription.getFieldsDeleted().isEmpty();
|
||||
}
|
||||
|
||||
public static Object getEntityDetails(Map<String, Object> params) {
|
||||
try {
|
||||
String entityType = (String) params.get("entity_type");
|
||||
String fqn = (String) params.get("fqn");
|
||||
|
||||
LOG.info("Getting details for entity type: {}, FQN: {}", entityType, fqn);
|
||||
String fields = "*";
|
||||
Object entity = Entity.getEntityByName(entityType, fqn, fields, null);
|
||||
return entity;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error getting entity details", e);
|
||||
return Map.of("error", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
110
openmetadata-service/src/main/resources/json/data/mcp/tools.json
Normal file
110
openmetadata-service/src/main/resources/json/data/mcp/tools.json
Normal file
@ -0,0 +1,110 @@
|
||||
{
|
||||
"tools": [
|
||||
{
|
||||
"name": "search_metadata",
|
||||
"description": "Find your data and business terms in OpenMetadata. For example if the user asks to 'find tables that contain customers information', then 'customers' should be the query, and the entity_type should be 'table'.",
|
||||
"parameters": {
|
||||
"description": "The search query to find metadata in the OpenMetadata catalog, entity type could be table, topic etc. Limit can be used to paginate on the data.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {
|
||||
"type": "string",
|
||||
"description": "Keywords to use for searching."
|
||||
},
|
||||
"entity_type": {
|
||||
"type": "string",
|
||||
"description": "Optional entity type to filter results. The OpenMetadata entities are categorized as follows: Service Entities include databaseService, messagingService, apiService, dashboardService, pipelineService, storageService, mlmodelService, metadataService, and searchService; Data Asset Entities include apiCollection, apiEndpoint, table, storedProcedure, database, databaseSchema, dashboard, dashboardDataModel, pipeline, chart, topic, searchIndex, mlmodel, and container; User Entities include user and team; Domain entities include domain and dataProduct; and Governance entities include metric, glossary, and glossaryTerm."
|
||||
},
|
||||
"limit": {
|
||||
"type": "integer",
|
||||
"description": "Maximum number of results to return. Default is 10. Try to keep this number low unless the user asks for more."
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"query"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "get_entity_details",
|
||||
"description": "Get detailed information about a specific entity",
|
||||
"parameters": {
|
||||
"description": "Fqn is the fully qualified name of the entity. Entity type could be table, topic etc.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"entity_type": {
|
||||
"type": "string",
|
||||
"description": "Type of entity"
|
||||
},
|
||||
"fqn": {
|
||||
"type": "string",
|
||||
"description": "Fully qualified name of the entity"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"entity_type",
|
||||
"fqn"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "create_glossary_term",
|
||||
"description": "Creates a new Glossary Term",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"glossary": {
|
||||
"type": "string",
|
||||
"description": "Glossary in which the term belongs. This should be its fully qualified name."
|
||||
},
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": "Glossary Term name."
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"description": "Glossary Term description."
|
||||
},
|
||||
"owners": {
|
||||
"type": "array",
|
||||
"description": "Glossary Term owner. This should be an OpenMetadata User",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"glossary",
|
||||
"name",
|
||||
"description"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "patch_entity",
|
||||
"description": "Patches an Entity based on a JSONPatch. Beforehand the Entity should be validated by finding it and creating a proper patch.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"entityType": {
|
||||
"type": "string",
|
||||
"description": "Entity Type to patch."
|
||||
},
|
||||
"entityFqn": {
|
||||
"type": "string",
|
||||
"description": "Fully Qualified Name of the Entity to be patched."
|
||||
},
|
||||
"patch": {
|
||||
"type": "string",
|
||||
"description": "JSONPatch as String format."
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"entityType",
|
||||
"entityFqn",
|
||||
"patch"
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -0,0 +1,77 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/api/mcp/mcpToolDefinition.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "MCP Tool Definition",
|
||||
"description": "Definition of a tool available in the Model Context Protocol",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.api.mcp.MCPToolDefinition",
|
||||
"properties": {
|
||||
"name": {
|
||||
"description": "Name of the tool",
|
||||
"type": "string"
|
||||
},
|
||||
"description": {
|
||||
"description": "Description of what the tool does",
|
||||
"type": "string"
|
||||
},
|
||||
"parameters": {
|
||||
"description": "Definition of tool parameters",
|
||||
"$ref": "#/definitions/toolParameters"
|
||||
}
|
||||
},
|
||||
"required": ["name", "description", "parameters"],
|
||||
"definitions": {
|
||||
"toolParameters": {
|
||||
"description": "Tool parameter definitions",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.api.mcp.MCPToolParameters",
|
||||
"properties": {
|
||||
"type": {
|
||||
"description": "Type of parameter schema",
|
||||
"type": "string",
|
||||
"default": "object"
|
||||
},
|
||||
"properties": {
|
||||
"description": "Parameter properties",
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"$ref": "#/definitions/toolParameter"
|
||||
}
|
||||
},
|
||||
"required": {
|
||||
"description": "List of required parameters",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["properties"]
|
||||
},
|
||||
"toolParameter": {
|
||||
"description": "Individual tool parameter definition",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.api.mcp.MCPToolParameter",
|
||||
"properties": {
|
||||
"type": {
|
||||
"description": "Type of parameter",
|
||||
"type": "string",
|
||||
"enum": ["string", "number", "integer", "boolean", "array", "object"]
|
||||
},
|
||||
"description": {
|
||||
"description": "Description of the parameter",
|
||||
"type": "string"
|
||||
},
|
||||
"enum": {
|
||||
"description": "Possible enum values for this parameter",
|
||||
"type": "array",
|
||||
"items": {}
|
||||
},
|
||||
"default": {
|
||||
"description": "Default value for this parameter"
|
||||
}
|
||||
},
|
||||
"required": ["type", "description"]
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,352 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/api/mcp/mcpTools.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "MCP Tools",
|
||||
"description": "Central definition of all tools available in the Model Context Protocol",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.api.mcp.MCPTools",
|
||||
"definitions": {
|
||||
"searchMetadata": {
|
||||
"description": "Search for metadata entities in OpenMetadata",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"tool": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"default": "search_metadata"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Search for metadata entities in OpenMetadata based on keywords or phrases"
|
||||
},
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "object"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "The search query or keywords to find relevant metadata"
|
||||
}
|
||||
}
|
||||
},
|
||||
"entity_type": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Optional entity type to filter results (e.g., 'table', 'dashboard', 'topic')"
|
||||
}
|
||||
}
|
||||
},
|
||||
"limit": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "integer"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Maximum number of results to return (default: 10)"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"default": ["query"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"getEntityDetails": {
|
||||
"description": "Get details about a specific entity",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"tool": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"default": "get_entity_details"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Get detailed information about a specific entity when you know its fully qualified name (FQN)"
|
||||
},
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "object"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"entity_type": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "The type of entity (e.g., 'table', 'dashboard', 'topic')"
|
||||
}
|
||||
}
|
||||
},
|
||||
"fqn": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "The fully qualified name of the entity"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"default": ["entity_type", "fqn"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"nlqSearch": {
|
||||
"description": "Search with natural language",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"tool": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"default": "nlq_search"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Search OpenMetadata using natural language queries"
|
||||
},
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "object"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Natural language query"
|
||||
}
|
||||
}
|
||||
},
|
||||
"entity_type": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Entity type to search in (default: table)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"limit": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "integer"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Maximum number of results to return (default: 10)"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"default": ["query"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"advancedSearch": {
|
||||
"description": "Advanced search with filters",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"tool": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"default": "advanced_search"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Perform advanced search with multiple filters and conditions"
|
||||
},
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "object"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Base search query"
|
||||
}
|
||||
}
|
||||
},
|
||||
"entity_type": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Entity type to search in"
|
||||
}
|
||||
}
|
||||
},
|
||||
"filters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "object"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Additional filters to apply (key-value pairs)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"sort_field": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Field to sort results by"
|
||||
}
|
||||
}
|
||||
},
|
||||
"sort_order": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "string"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Sort order (asc or desc)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"limit": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"default": "integer"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"default": "Maximum number of results to return (default: 10)"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"default": ["query"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Copyright 2025 Collate.
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Definition of a tool available in the Model Context Protocol
|
||||
*/
|
||||
export interface MCPToolDefinition {
|
||||
/**
|
||||
* Description of what the tool does
|
||||
*/
|
||||
description: string;
|
||||
/**
|
||||
* Name of the tool
|
||||
*/
|
||||
name: string;
|
||||
/**
|
||||
* Definition of tool parameters
|
||||
*/
|
||||
parameters: ToolParameters;
|
||||
[property: string]: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Definition of tool parameters
|
||||
*
|
||||
* Tool parameter definitions
|
||||
*/
|
||||
export interface ToolParameters {
|
||||
/**
|
||||
* Parameter properties
|
||||
*/
|
||||
properties: { [key: string]: ToolParameter };
|
||||
/**
|
||||
* List of required parameters
|
||||
*/
|
||||
required?: string[];
|
||||
/**
|
||||
* Type of parameter schema
|
||||
*/
|
||||
type?: string;
|
||||
[property: string]: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Individual tool parameter definition
|
||||
*/
|
||||
export interface ToolParameter {
|
||||
/**
|
||||
* Default value for this parameter
|
||||
*/
|
||||
default?: any;
|
||||
/**
|
||||
* Description of the parameter
|
||||
*/
|
||||
description: string;
|
||||
/**
|
||||
* Possible enum values for this parameter
|
||||
*/
|
||||
enum?: any[];
|
||||
/**
|
||||
* Type of parameter
|
||||
*/
|
||||
type: Type;
|
||||
[property: string]: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Type of parameter
|
||||
*/
|
||||
export enum Type {
|
||||
Array = "array",
|
||||
Boolean = "boolean",
|
||||
Integer = "integer",
|
||||
Number = "number",
|
||||
Object = "object",
|
||||
String = "string",
|
||||
}
|
||||
14
pom.xml
14
pom.xml
@ -111,7 +111,6 @@
|
||||
<opensearch.version>2.6.0</opensearch.version>
|
||||
<httpasyncclient.version>4.1.5</httpasyncclient.version>
|
||||
<openapiswagger.version>2.2.25</openapiswagger.version>
|
||||
|
||||
<httpclient.version>4.5.14</httpclient.version>
|
||||
<spring.version>6.1.14</spring.version>
|
||||
<log4j.version>2.21.0</log4j.version>
|
||||
@ -155,6 +154,7 @@
|
||||
<everit.version>1.14.4</everit.version>
|
||||
<json-patch.version>1.13</json-patch.version>
|
||||
<jetty-server.version>9.4.57.v20241219</jetty-server.version>
|
||||
<mcp-sdk.version>0.8.1</mcp-sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
@ -214,6 +214,18 @@
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<!-- https://mvnrepository.com/artifact/io.modelcontextprotocol.sdk/mcp-bom -->
|
||||
<dependency>
|
||||
<groupId>io.modelcontextprotocol.sdk</groupId>
|
||||
<artifactId>mcp-bom</artifactId>
|
||||
<version>${mcp-sdk.version}</version>
|
||||
<type>pom</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.modelcontextprotocol.sdk</groupId>
|
||||
<artifactId>mcp</artifactId>
|
||||
<version>${mcp-sdk.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-server</artifactId>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user