diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml
index d3275a89266..dff1f326091 100644
--- a/conf/openmetadata.yaml
+++ b/conf/openmetadata.yaml
@@ -31,6 +31,8 @@ server:
- type: http
bindHost: ${SERVER_HOST:-0.0.0.0}
port: ${SERVER_ADMIN_PORT:-8586}
+ gzip:
+ syncFlush: true
maxThreads: ${SERVER_MAX_THREADS:-50}
minThreads: ${SERVER_MIN_THREADS:-10}
@@ -433,9 +435,14 @@ web:
option: ${WEB_CONF_PERMISSION_POLICY_OPTION:-""}
cache-control: ${WEB_CONF_CACHE_CONTROL:-""}
pragma: ${WEB_CONF_PRAGMA:-""}
-
+
operationalConfig:
enable: ${OPERATIONAL_CONFIG_ENABLED:-true}
operationsConfigFile: ${OPERATIONAL_CONFIG_FILE:-"./conf/operations.yaml"}
+
+mcpConfiguration:
+ enabled: ${MCP_ENABLED:-true}
+ path: ${MCP_PATH:-"/mcp"}
+ mcpServerVersion: ${MCP_SERVER_VERSION:-"1.0.0"}
diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml
index 5dfe1b4825f..906d4391a4a 100644
--- a/openmetadata-service/pom.xml
+++ b/openmetadata-service/pom.xml
@@ -606,6 +606,15 @@
socket.io-server
4.0.1
+
+ io.modelcontextprotocol.sdk
+ mcp-bom
+ pom
+
+
+ io.modelcontextprotocol.sdk
+ mcp
+
io.socket
engine.io-server-jetty
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
index 3f9e7bbc0bc..95ab7b76654 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
@@ -91,6 +91,7 @@ import org.openmetadata.service.jobs.JobDAO;
import org.openmetadata.service.jobs.JobHandlerRegistry;
import org.openmetadata.service.limits.DefaultLimits;
import org.openmetadata.service.limits.Limits;
+import org.openmetadata.service.mcp.McpServer;
import org.openmetadata.service.migration.Migration;
import org.openmetadata.service.migration.MigrationValidationClient;
import org.openmetadata.service.migration.api.MigrationWorkflow;
@@ -264,6 +265,9 @@ public class OpenMetadataApplication extends Application temporarySSLConfig =
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/config/MCPConfiguration.java b/openmetadata-service/src/main/java/org/openmetadata/service/config/MCPConfiguration.java
new file mode 100644
index 00000000000..d0a0b499963
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/config/MCPConfiguration.java
@@ -0,0 +1,21 @@
+package org.openmetadata.service.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class MCPConfiguration {
+ @JsonProperty("mcpServerName")
+ private String mcpServerName = "openmetadata-mcp-server";
+
+ @JsonProperty("mcpServerVersion")
+ private String mcpServerVersion = "1.0.0";
+
+ @JsonProperty("enabled")
+ private boolean enabled = true;
+
+ @JsonProperty("path")
+ private String path = "/api/v1/mcp";
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/mcp/HttpServletSseServerTransportProvider.java b/openmetadata-service/src/main/java/org/openmetadata/service/mcp/HttpServletSseServerTransportProvider.java
new file mode 100644
index 00000000000..5c83c1a8720
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/mcp/HttpServletSseServerTransportProvider.java
@@ -0,0 +1,282 @@
+/*
+This class should be removed once we migrate to Jakarta.
+*/
+
+package org.openmetadata.service.mcp;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.spec.McpError;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpServerSession;
+import io.modelcontextprotocol.spec.McpServerTransport;
+import io.modelcontextprotocol.spec.McpServerTransportProvider;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.openmetadata.service.util.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@WebServlet(asyncSupported = true)
+public class HttpServletSseServerTransportProvider extends HttpServlet
+ implements McpServerTransportProvider {
+ private static final Logger logger =
+ LoggerFactory.getLogger(HttpServletSseServerTransportProvider.class);
+ public static final String UTF_8 = "UTF-8";
+ public static final String APPLICATION_JSON = "application/json";
+ public static final String FAILED_TO_SEND_ERROR_RESPONSE = "Failed to send error response: {}";
+ public static final String DEFAULT_SSE_ENDPOINT = "/sse";
+ public static final String MESSAGE_EVENT_TYPE = "message";
+ public static final String ENDPOINT_EVENT_TYPE = "endpoint";
+ private final ObjectMapper objectMapper;
+ private final String messageEndpoint;
+ private final String sseEndpoint;
+ private final Map sessions;
+ private final AtomicBoolean isClosing;
+ private McpServerSession.Factory sessionFactory;
+
+ public HttpServletSseServerTransportProvider(String messageEndpoint, String sseEndpoint) {
+ this.sessions = new ConcurrentHashMap<>();
+ this.isClosing = new AtomicBoolean(false);
+ this.objectMapper = JsonUtils.getObjectMapper();
+ this.messageEndpoint = messageEndpoint;
+ this.sseEndpoint = sseEndpoint;
+ }
+
+ public HttpServletSseServerTransportProvider(String messageEndpoint) {
+ this(messageEndpoint, "/sse");
+ }
+
+ public void setSessionFactory(McpServerSession.Factory sessionFactory) {
+ this.sessionFactory = sessionFactory;
+ }
+
+ public Mono notifyClients(String method, Map params) {
+ if (this.sessions.isEmpty()) {
+ logger.debug("No active sessions to broadcast message to");
+ return Mono.empty();
+ } else {
+ logger.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
+ return Flux.fromIterable(this.sessions.values())
+ .flatMap(
+ (session) ->
+ session
+ .sendNotification(method, params)
+ .doOnError(
+ (e) ->
+ logger.error(
+ "Failed to send message to session {}: {}",
+ session.getId(),
+ e.getMessage()))
+ .onErrorComplete())
+ .then();
+ }
+ }
+
+ protected void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ handleSseEvent(request, response);
+ }
+
+ private void handleSseEvent(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ String pathInfo = request.getPathInfo();
+ if (!this.sseEndpoint.contains(pathInfo)) {
+ response.sendError(404);
+ } else if (this.isClosing.get()) {
+ response.sendError(503, "Server is shutting down");
+ } else {
+ response.setContentType("text/event-stream");
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Cache-Control", "no-cache");
+ response.setHeader("Connection", "keep-alive");
+ response.setHeader("Access-Control-Allow-Origin", "*");
+ String sessionId = UUID.randomUUID().toString();
+ AsyncContext asyncContext = request.startAsync();
+ asyncContext.setTimeout(0L);
+ PrintWriter writer = response.getWriter();
+ HttpServletMcpSessionTransport sessionTransport =
+ new HttpServletMcpSessionTransport(sessionId, asyncContext, writer);
+ McpServerSession session = this.sessionFactory.create(sessionTransport);
+ this.sessions.put(sessionId, session);
+ this.sendEvent(writer, "endpoint", this.messageEndpoint + "?sessionId=" + sessionId);
+ }
+ }
+
+ protected void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ if (this.isClosing.get()) {
+ response.sendError(503, "Server is shutting down");
+ } else {
+ String requestURI = request.getRequestURI();
+ if (!requestURI.endsWith(this.messageEndpoint)) {
+ response.sendError(404);
+ } else {
+ String sessionId = request.getParameter("sessionId");
+ if (sessionId == null) {
+ response.setContentType("application/json");
+ response.setCharacterEncoding("UTF-8");
+ response.setStatus(400);
+ String jsonError =
+ this.objectMapper.writeValueAsString(
+ new McpError("Session ID missing in message endpoint"));
+ PrintWriter writer = response.getWriter();
+ writer.write(jsonError);
+ writer.flush();
+ } else {
+ McpServerSession session = (McpServerSession) this.sessions.get(sessionId);
+ if (session == null) {
+ response.setContentType("application/json");
+ response.setCharacterEncoding("UTF-8");
+ response.setStatus(404);
+ String jsonError =
+ this.objectMapper.writeValueAsString(
+ new McpError("Session not found: " + sessionId));
+ PrintWriter writer = response.getWriter();
+ writer.write(jsonError);
+ writer.flush();
+ } else {
+ try {
+ BufferedReader reader = request.getReader();
+ StringBuilder body = new StringBuilder();
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ body.append(line);
+ }
+
+ McpSchema.JSONRPCMessage message =
+ McpSchema.deserializeJsonRpcMessage(this.objectMapper, body.toString());
+ session.handle(message).block();
+ response.setStatus(200);
+ } catch (Exception var11) {
+ Exception e = var11;
+ logger.error("Error processing message: {}", var11.getMessage());
+
+ try {
+ McpError mcpError = new McpError(e.getMessage());
+ response.setContentType("application/json");
+ response.setCharacterEncoding("UTF-8");
+ response.setStatus(500);
+ String jsonError = this.objectMapper.writeValueAsString(mcpError);
+ PrintWriter writer = response.getWriter();
+ writer.write(jsonError);
+ writer.flush();
+ } catch (IOException ex) {
+ logger.error("Failed to send error response: {}", ex.getMessage());
+ response.sendError(500, "Error processing message");
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public Mono closeGracefully() {
+ this.isClosing.set(true);
+ logger.debug("Initiating graceful shutdown with {} active sessions", this.sessions.size());
+ return Flux.fromIterable(this.sessions.values())
+ .flatMap(McpServerSession::closeGracefully)
+ .then();
+ }
+
+ private void sendEvent(PrintWriter writer, String eventType, String data) throws IOException {
+ writer.write("event: " + eventType + "\n");
+ writer.write("data: " + data + "\n\n");
+ writer.flush();
+ if (writer.checkError()) {
+ throw new IOException("Client disconnected");
+ }
+ }
+
+ public void destroy() {
+ this.closeGracefully().block();
+ super.destroy();
+ }
+
+ private class HttpServletMcpSessionTransport implements McpServerTransport {
+ private final String sessionId;
+ private final AsyncContext asyncContext;
+ private final PrintWriter writer;
+
+ HttpServletMcpSessionTransport(
+ String sessionId, AsyncContext asyncContext, PrintWriter writer) {
+ this.sessionId = sessionId;
+ this.asyncContext = asyncContext;
+ this.writer = writer;
+ HttpServletSseServerTransportProvider.logger.debug(
+ "Session transport {} initialized with SSE writer", sessionId);
+ }
+
+ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
+ return Mono.fromRunnable(
+ () -> {
+ try {
+ String jsonText =
+ HttpServletSseServerTransportProvider.this.objectMapper.writeValueAsString(
+ message);
+ HttpServletSseServerTransportProvider.this.sendEvent(
+ this.writer, "message", jsonText);
+ HttpServletSseServerTransportProvider.logger.debug(
+ "Message sent to session {}", this.sessionId);
+ } catch (Exception e) {
+ HttpServletSseServerTransportProvider.logger.error(
+ "Failed to send message to session {}: {}", this.sessionId, e.getMessage());
+ HttpServletSseServerTransportProvider.this.sessions.remove(this.sessionId);
+ this.asyncContext.complete();
+ }
+ });
+ }
+
+ public T unmarshalFrom(Object data, TypeReference typeRef) {
+ return (T)
+ HttpServletSseServerTransportProvider.this.objectMapper.convertValue(data, typeRef);
+ }
+
+ public Mono closeGracefully() {
+ return Mono.fromRunnable(
+ () -> {
+ HttpServletSseServerTransportProvider.logger.debug(
+ "Closing session transport: {}", this.sessionId);
+
+ try {
+ HttpServletSseServerTransportProvider.this.sessions.remove(this.sessionId);
+ this.asyncContext.complete();
+ HttpServletSseServerTransportProvider.logger.debug(
+ "Successfully completed async context for session {}", this.sessionId);
+ } catch (Exception e) {
+ HttpServletSseServerTransportProvider.logger.warn(
+ "Failed to complete async context for session {}: {}",
+ this.sessionId,
+ e.getMessage());
+ }
+ });
+ }
+
+ public void close() {
+ try {
+ HttpServletSseServerTransportProvider.this.sessions.remove(this.sessionId);
+ this.asyncContext.complete();
+ HttpServletSseServerTransportProvider.logger.debug(
+ "Successfully completed async context for session {}", this.sessionId);
+ } catch (Exception e) {
+ HttpServletSseServerTransportProvider.logger.warn(
+ "Failed to complete async context for session {}: {}", this.sessionId, e.getMessage());
+ }
+ }
+ }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/mcp/McpAuthFilter.java b/openmetadata-service/src/main/java/org/openmetadata/service/mcp/McpAuthFilter.java
new file mode 100644
index 00000000000..dca33d830e7
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/mcp/McpAuthFilter.java
@@ -0,0 +1,32 @@
+package org.openmetadata.service.mcp;
+
+import static org.openmetadata.service.socket.SocketAddressFilter.validatePrefixedTokenRequest;
+
+import java.io.IOException;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import org.openmetadata.service.security.JwtFilter;
+
+public class McpAuthFilter implements Filter {
+ private final JwtFilter jwtFilter;
+
+ public McpAuthFilter(JwtFilter filter) {
+ this.jwtFilter = filter;
+ }
+
+ @Override
+ public void doFilter(
+ ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
+ throws IOException, ServletException {
+ HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
+ String tokenWithType = httpServletRequest.getHeader("Authorization");
+ validatePrefixedTokenRequest(jwtFilter, tokenWithType);
+
+ // Continue with the filter chain
+ filterChain.doFilter(servletRequest, servletResponse);
+ }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/mcp/McpServer.java b/openmetadata-service/src/main/java/org/openmetadata/service/mcp/McpServer.java
new file mode 100644
index 00000000000..40a588314b0
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/mcp/McpServer.java
@@ -0,0 +1,168 @@
+package org.openmetadata.service.mcp;
+
+import static org.openmetadata.service.search.SearchUtil.searchMetadata;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.dropwizard.jetty.MutableServletContextHandler;
+import io.dropwizard.setup.Environment;
+import io.modelcontextprotocol.server.McpServerFeatures;
+import io.modelcontextprotocol.server.McpSyncServer;
+import io.modelcontextprotocol.spec.McpSchema;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import javax.servlet.DispatcherType;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.openmetadata.common.utils.CommonUtil;
+import org.openmetadata.service.OpenMetadataApplicationConfig;
+import org.openmetadata.service.mcp.tools.CreateGlossaryTerm;
+import org.openmetadata.service.mcp.tools.PatchEntity;
+import org.openmetadata.service.security.JwtFilter;
+import org.openmetadata.service.util.EntityUtil;
+import org.openmetadata.service.util.JsonUtils;
+
+@Slf4j
+public class McpServer {
+ public McpServer() {}
+
+ public void initializeMcpServer(Environment environment, OpenMetadataApplicationConfig config) {
+ McpSchema.ServerCapabilities serverCapabilities =
+ McpSchema.ServerCapabilities.builder()
+ .tools(true)
+ .prompts(true)
+ .resources(true, true)
+ .build();
+
+ HttpServletSseServerTransportProvider transport =
+ new HttpServletSseServerTransportProvider("/mcp/messages", "/mcp/sse");
+ McpSyncServer server =
+ io.modelcontextprotocol.server.McpServer.sync(transport)
+ .serverInfo("openmetadata-mcp", "0.1.0")
+ .capabilities(serverCapabilities)
+ .build();
+
+ // Add resources, prompts, and tools to the MCP server
+ addTools(server);
+
+ MutableServletContextHandler contextHandler = environment.getApplicationContext();
+ ServletHolder servletHolder = new ServletHolder(transport);
+ contextHandler.addServlet(servletHolder, "/mcp/*");
+
+ McpAuthFilter authFilter =
+ new McpAuthFilter(
+ new JwtFilter(
+ config.getAuthenticationConfiguration(), config.getAuthorizerConfiguration()));
+ contextHandler.addFilter(
+ new FilterHolder(authFilter), "/mcp/*", EnumSet.of(DispatcherType.REQUEST));
+ }
+
+ public void addTools(McpSyncServer server) {
+ try {
+ LOG.info("Loading tool definitions...");
+ List
+
+
+ io.modelcontextprotocol.sdk
+ mcp-bom
+ ${mcp-sdk.version}
+ pom
+
+
+ io.modelcontextprotocol.sdk
+ mcp
+ ${mcp-sdk.version}
+
org.eclipse.jetty
jetty-server