-
Notifications
You must be signed in to change notification settings - Fork 99
Add MCP Streamable HTTP specification support for the client #210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
09637b6
4765528
bd55244
03a76cd
3b99748
137929c
b91a9db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,49 +1,29 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| require "mcp" | ||
| require "mcp/client" | ||
| require "mcp/client/http" | ||
| require "mcp/client/tool" | ||
| require "net/http" | ||
| require "uri" | ||
| require "json" | ||
| require "logger" | ||
| require "event_stream_parser" | ||
|
|
||
| # Logger for client operations | ||
| logger = Logger.new($stdout) | ||
| logger.formatter = proc do |severity, datetime, _progname, msg| | ||
| "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" | ||
| end | ||
| SERVER_URL = "http://localhost:9393" | ||
|
|
||
| # Server configuration | ||
| SERVER_URL = "http://localhost:9393/mcp" | ||
| PROTOCOL_VERSION = "2024-11-05" | ||
|
|
||
| # Helper method to make JSON-RPC requests | ||
| def make_request(session_id, method, params = {}, id = nil) | ||
| uri = URI(SERVER_URL) | ||
| http = Net::HTTP.new(uri.host, uri.port) | ||
|
|
||
| request = Net::HTTP::Post.new(uri) | ||
| request["Content-Type"] = "application/json" | ||
| request["Mcp-Session-Id"] = session_id if session_id | ||
|
|
||
| body = { | ||
| jsonrpc: "2.0", | ||
| method: method, | ||
| params: params, | ||
| id: id || SecureRandom.uuid, | ||
| } | ||
|
|
||
| request.body = body.to_json | ||
| response = http.request(request) | ||
|
|
||
| { | ||
| status: response.code, | ||
| headers: response.to_hash, | ||
| body: JSON.parse(response.body), | ||
| } | ||
| rescue => e | ||
| { error: e.message } | ||
| # Logger for client operations | ||
| def create_logger | ||
| logger = Logger.new($stdout) | ||
| logger.formatter = proc do |severity, datetime, _progname, msg| | ||
| "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" | ||
| end | ||
| logger | ||
| end | ||
|
|
||
| # Connect to SSE stream | ||
| # Connect to SSE stream for real-time notifications | ||
| # The SDK doesn't support HTTP GET for SSE streaming yet, so we use raw Net::HTTP | ||
| # See: https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#listening-for-messages-from-the-server | ||
| def connect_sse(session_id, logger) | ||
| uri = URI(SERVER_URL) | ||
|
|
||
|
|
@@ -59,17 +39,13 @@ def connect_sse(session_id, logger) | |
| if response.code == "200" | ||
| logger.info("SSE stream connected successfully") | ||
|
|
||
| parser = EventStreamParser::Parser.new | ||
| response.read_body do |chunk| | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use https://rubygems.org/gems/event_stream_parser for parsing. SSE appears easy to parse but it has weird edge cases. Thanks for kicking off the effort! I've been planning on porting this standalone client I had written for a project, but never got around to doing it: https://gist.github.com/atesgoral/75172912b5951d9be33497b80aba4397 You can see how |
||
| chunk.split("\n").each do |line| | ||
| if line.start_with?("data: ") | ||
| data = line[6..-1] | ||
| begin | ||
| logger.info("SSE data: #{data}") | ||
| rescue JSON::ParserError | ||
| logger.debug("Non-JSON SSE data: #{data}") | ||
| end | ||
| elsif line.start_with?(": ") | ||
| logger.debug("SSE keepalive received: #{line}") | ||
| parser.feed(chunk) do |type, data, _id| | ||
| if type.empty? | ||
| logger.info("SSE event: #{data}") | ||
| else | ||
| logger.info("SSE event (#{type}): #{data}") | ||
| end | ||
| end | ||
| end | ||
|
|
@@ -79,125 +55,128 @@ def connect_sse(session_id, logger) | |
| end | ||
| end | ||
| rescue Interrupt | ||
| logger.info("SSE connection interrupted by user") | ||
| logger.info("SSE connection interrupted") | ||
| rescue => e | ||
| logger.error("SSE connection error: #{e.message}") | ||
| end | ||
|
|
||
| # Main client flow | ||
| def main | ||
| logger = Logger.new($stdout) | ||
| logger.formatter = proc do |severity, datetime, _progname, msg| | ||
| "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" | ||
| end | ||
|
|
||
| puts "=== MCP SSE Test Client ===" | ||
|
|
||
| # Step 1: Initialize session | ||
| logger.info("Initializing session...") | ||
|
|
||
| init_response = make_request( | ||
| nil, | ||
| "initialize", | ||
| { | ||
| protocolVersion: PROTOCOL_VERSION, | ||
| capabilities: {}, | ||
| clientInfo: { | ||
| name: "sse-test-client", | ||
| version: "1.0", | ||
| }, | ||
| }, | ||
| "init-1", | ||
| ) | ||
|
|
||
| if init_response[:error] | ||
| logger.error("Failed to initialize: #{init_response[:error]}") | ||
| exit(1) | ||
| end | ||
|
|
||
| session_id = init_response[:headers]["mcp-session-id"]&.first | ||
|
|
||
| if session_id.nil? | ||
| logger.error("No session ID received") | ||
| exit(1) | ||
| end | ||
|
|
||
| logger.info("Session initialized: #{session_id}") | ||
| logger.info("Server info: #{init_response[:body]["result"]["serverInfo"]}") | ||
|
|
||
| # Step 2: Start SSE connection in a separate thread | ||
| sse_thread = Thread.new { connect_sse(session_id, logger) } | ||
|
|
||
| # Give SSE time to connect | ||
| sleep(1) | ||
|
|
||
| # Step 3: Interactive menu | ||
| loop do | ||
| puts <<~MESSAGE.chomp | ||
|
|
||
| === Available Actions === | ||
| 1. Send custom notification | ||
| 2. Test echo | ||
| 3. List tools | ||
| 0. Exit | ||
|
|
||
| Choose an action:#{" "} | ||
| logger = create_logger | ||
|
|
||
| puts <<~MESSAGE | ||
| MCP Streamable HTTP Client | ||
| Make sure the server is running (ruby examples/streamable_http_server.rb) | ||
| #{"=" * 60} | ||
| MESSAGE | ||
|
|
||
| # Initialize SDK client | ||
| transport = MCP::Client::HTTP.new(url: SERVER_URL) | ||
| client = MCP::Client.new(transport: transport) | ||
|
|
||
| begin | ||
| # Initialize session using SDK | ||
| puts "=== Initializing session ===" | ||
| init_response = client.connect( | ||
| client_info: { name: "streamable-http-client", version: "1.0" }, | ||
| ) | ||
| puts <<~MESSAGE | ||
| ID: #{client.session_id} | ||
| Version: #{client.protocol_version} | ||
| Server: #{init_response.dig("result", "serverInfo")} | ||
| MESSAGE | ||
|
|
||
| choice = gets.chomp | ||
|
|
||
| case choice | ||
| when "1" | ||
| print("Enter notification message: ") | ||
| message = gets.chomp | ||
| print("Enter delay in seconds (0 for immediate): ") | ||
| delay = gets.chomp.to_f | ||
|
|
||
| response = make_request( | ||
| session_id, | ||
| "tools/call", | ||
| { | ||
| name: "notification_tool", | ||
| arguments: { | ||
| message: message, | ||
| delay: delay, | ||
| }, | ||
| }, | ||
| ) | ||
| if response[:body]["accepted"] | ||
| logger.info("Notification sent successfully") | ||
| # Get available tools BEFORE establishing SSE connection | ||
| # (Once SSE is active, server sends responses via SSE stream, not POST response) | ||
| puts "=== Listing tools ===" | ||
| tools = client.tools | ||
| tools.each { |t| puts " - #{t.name}: #{t.description}" } | ||
|
|
||
| echo_tool = tools.find { |t| t.name == "echo" } | ||
| notification_tool = tools.find { |t| t.name == "notification_tool" } | ||
|
|
||
| # Start SSE connection in a separate thread (uses raw HTTP) | ||
| # Note: After this, server responses will be sent via SSE, not POST | ||
| sse_thread = Thread.new { connect_sse(client.session_id, logger) } | ||
|
|
||
| # Give SSE time to connect | ||
| sleep(1) | ||
|
|
||
| # Interactive menu | ||
| loop do | ||
| puts <<~MENU.chomp | ||
|
|
||
| === Available Actions === | ||
| 1. Send notification (triggers SSE event) | ||
| 2. Echo message | ||
| 3. List tools | ||
| 0. Exit | ||
|
|
||
| Choose an action:#{" "} | ||
| MENU | ||
|
|
||
| choice = gets.chomp | ||
|
|
||
| case choice | ||
| when "1" | ||
| if notification_tool | ||
| print("Enter notification message: ") | ||
| message = gets.chomp | ||
| print("Enter delay in seconds (0 for immediate): ") | ||
| delay = gets.chomp.to_f | ||
|
|
||
| puts "=== Calling tool: notification_tool ===" | ||
| response = client.call_tool( | ||
| tool: notification_tool, | ||
| arguments: { message: message, delay: delay }, | ||
| ) | ||
| puts "Response: #{JSON.pretty_generate(response)}" | ||
| else | ||
| puts "notification_tool not available" | ||
| end | ||
| when "2" | ||
| if echo_tool | ||
| print("Enter message to echo: ") | ||
| message = gets.chomp | ||
|
|
||
| puts "=== Calling tool: echo ===" | ||
| response = client.call_tool(tool: echo_tool, arguments: { message: message }) | ||
| puts "Response: #{JSON.pretty_generate(response)}" | ||
| else | ||
| puts "echo tool not available" | ||
| end | ||
| when "3" | ||
| puts "=== Listing tools ===" | ||
| puts "(Note: Response will appear in SSE stream when active)" | ||
| client.tools.each do |tool| | ||
| puts " - #{tool.name}: #{tool.description}" | ||
| end | ||
| when "0" | ||
| logger.info("Exiting...") | ||
| break | ||
| else | ||
| logger.error("Error: #{response[:body]["error"]}") | ||
| puts "Invalid choice" | ||
| end | ||
| when "2" | ||
| print("Enter message to echo: ") | ||
| message = gets.chomp | ||
| make_request(session_id, "tools/call", { name: "echo", arguments: { message: message } }) | ||
| when "3" | ||
| make_request(session_id, "tools/list") | ||
| when "0" | ||
| logger.info("Exiting...") | ||
| break | ||
| else | ||
| puts "Invalid choice" | ||
| end | ||
| rescue MCP::Client::SessionExpiredError => e | ||
| logger.error("Session expired: #{e.message}") | ||
| rescue MCP::Client::RequestHandlerError => e | ||
| logger.error("Request error: #{e.message}") | ||
| rescue Interrupt | ||
| logger.info("Client interrupted") | ||
| rescue => e | ||
| logger.error("Error: #{e.message}") | ||
| logger.error(e.backtrace.first(5).join("\n")) | ||
| ensure | ||
| # Clean up SSE thread | ||
| sse_thread.kill if sse_thread&.alive? | ||
|
|
||
| # Close session using SDK | ||
| puts "=== Closing session ===" | ||
| client.close | ||
| puts "Session closed" | ||
| end | ||
|
|
||
| # Clean up | ||
| sse_thread.kill if sse_thread.alive? | ||
|
|
||
| # Close session | ||
| logger.info("Closing session...") | ||
| make_request(session_id, "close") | ||
| logger.info("Session closed") | ||
| rescue Interrupt | ||
| logger.info("Client interrupted by user") | ||
| rescue => e | ||
| logger.error("Client error: #{e.message}") | ||
| logger.error(e.backtrace.join("\n")) | ||
| end | ||
|
|
||
| # Run the client | ||
| if __FILE__ == $PROGRAM_NAME | ||
| main | ||
| end | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since
EventStreamParser::Parseris referenced fromlib/mcp/client/http.rb, it appears that the dependency needs to be added withspec.add_dependencyin mcp.gemspec rather than in the Gemfile.