Skip to content

Feat/add streamable http #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

jesse-engineer
Copy link
Collaborator

Description

Please provide a brief description of the changes in this pull request.

Related Issue

Fixes #(issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Performance improvement
  • Code refactoring (no functional changes)

How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce.

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

@codecov-commenter
Copy link

codecov-commenter commented Apr 22, 2025

@jesse-engineer jesse-engineer force-pushed the feat/add_streamable_http branch from 684fb67 to 4c485fb Compare April 22, 2025 14:29
@jesse-engineer jesse-engineer force-pushed the feat/add_streamable_http branch from 4c485fb to 3d00d39 Compare April 22, 2025 14:35
client/call.go Outdated
@@ -220,7 +220,7 @@ func (client *Client) sendNotification4Initialized(ctx context.Context) error {
// Responsible for request and response assembly
func (client *Client) callServer(ctx context.Context, method protocol.Method, params protocol.ClientRequest) (json.RawMessage, error) {
if !client.ready.Load() && (method != protocol.Initialize && method != protocol.Ping) {
return nil, fmt.Errorf("client not ready")
return nil, fmt.Errorf("callServer: client not ready")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接 errors.New 好一些

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


resp, err := t.client.Do(req)
if err != nil {
t.logger.Errorf("failed to connect to SSE stream: %v", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方是不是少了 resp.Body.Close()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方不调用close,在下面会close,这里主要是在for循环里,不好使用defer

t.sseInFlyConnect.Add(1)
defer t.sseInFlyConnect.Done()

t.handleSSEStream(resp.Body)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这边是协程,resp.Body是不是在外面关掉了

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if !server.sessionManager.IsExistSession(sessionID) {
return pkg.ErrLackSession
func (server *Server) receive(ctx context.Context, sessionID string, msg []byte) (<-chan []byte, error) {
if sessionID != "" && !server.sessionManager.IsActiveSession(sessionID) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsActiveSession 和 IsClosedSession 可以合成一个 IsSessionAvailable,然后在里面返回错误

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里合并后IsSessionAvailable返回err,语义上有点奇怪,这里留下IsActiveSession函数,后面也可能会有用吧。

"github.com/ThinkInAIXYZ/go-mcp/pkg"
)

type Manager struct {
sessions pkg.SyncMap[*State]
activeSessions pkg.SyncMap[*State]
closedSessions pkg.SyncMap[*State]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个closedSessions里面的数据什么时候清空?还是说直接改成简单value为struct,记录就行。如果要保存这个session的话,是不是取出来之后看State的closed状态也行。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯这里可以把value改为struct

return has
}

func (m *Manager) GetSession(sessionID string) (*State, bool) {
state, has := m.sessions.Load(sessionID)
state, has := m.activeSessions.Load(sessionID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个前面可以直接加一个 sessionID 为空直接返回 false,不用Load

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@jesse-engineer jesse-engineer merged commit 098a082 into ThinkInAIXYZ:main Apr 23, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants