✨ #172 Add Anthropic integration for chat streaming #182
base: develop
@@ -1,16 +1,85 @@
 package anthropic

 import (
     "context"
+    "encoding/json"
+    "fmt"
+    "io"
+    "net/http"

     "glide/pkg/api/schemas"
     "glide/pkg/providers/clients"
+    "glide/pkg/telemetry"
+    "go.uber.org/zap"
 )

 func (c *Client) SupportChatStream() bool {
-    return false
+    return true
 }

-func (c *Client) ChatStream(_ context.Context, _ *schemas.ChatRequest) (clients.ChatStream, error) {
-    return nil, clients.ErrChatStreamNotImplemented
+type AnthropicChatStream struct {
+    tel       *telemetry.Telemetry
+    client    *http.Client
+    request   *http.Request
+    response  *http.Response
+    errMapper *ErrorMapper
+}
+
+func NewAnthropicChatStream(tel *telemetry.Telemetry, client *http.Client, request *http.Request, errMapper *ErrorMapper) *AnthropicChatStream {
+    return &AnthropicChatStream{
+        tel:       tel,
+        client:    client,
+        request:   request,
+        errMapper: errMapper,
+    }
+}
+
+// Open makes the HTTP request using the provided http.Client to initiate the chat stream.
+func (s *AnthropicChatStream) Open(ctx context.Context) error {
+    resp, err := s.client.Do(s.request)
+    if err != nil {
+        s.tel.L().Error("Failed to open chat stream", zap.Error(err))
+        // Map and return the error using errMapper, if errMapper is defined.
+        return s.errMapper.Map(err)
+    }
+
+    if resp.StatusCode != http.StatusOK {
+        resp.Body.Close()
+        s.tel.L().Warn("Unexpected status code", zap.Int("status", resp.StatusCode))
+        return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+    }
+
+    s.response = resp
+    s.tel.L().Info("Chat stream opened successfully")
+    return nil
+}
+
+// Recv listens for and decodes incoming messages from the chat stream into ChatStreamChunk objects.
+func (s *AnthropicChatStream) Recv() (*schemas.ChatStreamChunk, error) {
+    if s.response == nil {
+        s.tel.L().Error("Attempted to receive from an unopened stream")
+        return nil, fmt.Errorf("stream not opened")
+    }
+
+    decoder := json.NewDecoder(s.response.Body)
+
+    var chunk schemas.ChatStreamChunk
+
+    if err := decoder.Decode(&chunk); err != nil {
+        if err == io.EOF {
+            s.tel.L().Info("Chat stream ended")
+            return nil, nil // Stream ended normally.
+        }
+        s.tel.L().Error("Error during stream processing", zap.Error(err))
+        return nil, err // An error occurred during stream processing.
+    }
+
+    return &chunk, nil
+}

Review comment on Recv(): Does Anthropic use server-sent events (SSE) for chat streaming? If so, you need a special parser to read that stream, just like OpenAI: https://github.com/EinStack/glide/blob/develop/pkg/providers/openai/chat_stream.go#L75-L95. SSE has a special format that has to be parsed before you can even unmarshal the real chunk from JSON into an Anthropic chat stream struct.
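Assuming the answer is yes (Anthropic's Messages API does stream over SSE), Recv() would first need to pull individual data: payloads out of the response body before any JSON decoding happens. Below is a minimal sketch of such a reader; the readSSEEvent name and the bufio/strings-based parsing are illustrative assumptions, not code from this PR or from Glide's OpenAI implementation.

// Sketch only: extract the next SSE "data:" payload from the stream.
// Assumes "bufio" and "strings" are imported alongside the existing imports.
// Anthropic emits events such as:
//   event: content_block_delta
//   data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hi"}}
// separated by blank lines.
func readSSEEvent(scanner *bufio.Scanner) ([]byte, error) {
    var data []byte

    for scanner.Scan() {
        line := scanner.Text()

        if line == "" {
            // A blank line terminates the current event.
            if len(data) > 0 {
                return data, nil
            }
            continue
        }

        if strings.HasPrefix(line, "data:") {
            data = append(data, strings.TrimSpace(strings.TrimPrefix(line, "data:"))...)
        }
        // "event:" and ":" comment lines are skipped in this sketch.
    }

    if len(data) > 0 {
        return data, nil
    }

    if err := scanner.Err(); err != nil {
        return nil, err
    }

    return nil, io.EOF
}

Recv() could then create the bufio.Scanner once over s.response.Body (for example in Open()) and json.Unmarshal each returned payload, instead of decoding the raw body directly.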
Review comment on the decoder := json.NewDecoder(s.response.Body) line: We are using just …

Review comment on the var chunk schemas.ChatStreamChunk line: This is how it's done in the OpenAI case: …
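On the chunk type specifically: whatever comes out of an SSE payload is an Anthropic-specific event, not a schemas.ChatStreamChunk, so an intermediate struct plus a mapping step is likely needed. A rough sketch follows, with the field layout based on Anthropic's documented content_block_delta events; the struct and function names are assumptions, and the final mapping into schemas.ChatStreamChunk is omitted because it depends on Glide's schema.

// Sketch only: the parts of Anthropic's streaming event JSON needed for text deltas.
type anthropicStreamEvent struct {
    Type  string `json:"type"` // e.g. "content_block_delta", "message_stop"
    Index int    `json:"index"`
    Delta struct {
        Type string `json:"type"` // e.g. "text_delta"
        Text string `json:"text"`
    } `json:"delta"`
}

// parseStreamEvent turns one SSE data payload into the event struct above.
func parseStreamEvent(payload []byte) (*anthropicStreamEvent, error) {
    var event anthropicStreamEvent

    if err := json.Unmarshal(payload, &event); err != nil {
        return nil, err
    }

    return &event, nil
}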
+// Close ensures the chat stream is properly terminated by closing the response body.
+func (s *AnthropicChatStream) Close() error {
+    if s.response != nil {
+        s.tel.L().Info("Closing chat stream")
+        return s.response.Body.Close()
+    }
+    return nil
+}
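For reference, the lifecycle these three methods imply for a caller; this is an illustrative sketch, not router code from Glide or from this PR.

// Sketch only: how a consumer would be expected to drive the stream.
func consumeStream(ctx context.Context, s *AnthropicChatStream) error {
    if err := s.Open(ctx); err != nil {
        return err
    }
    defer s.Close()

    for {
        chunk, err := s.Recv()
        if err != nil {
            return err
        }

        if chunk == nil {
            // Recv returns (nil, nil) once the stream ends normally.
            return nil
        }

        _ = chunk // forward or aggregate the chunk here
    }
}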
Review comment on the file: There has to be a ChatStream() method that creates an instance of AnthropicChatStream in this case. For example, in the OpenAI case it looks like this: https://github.com/EinStack/glide/blob/develop/pkg/providers/openai/chat_stream.go#L162-L177. Without that, there is nothing that would use the AnthropicChatStream struct.
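A rough sketch of what that could look like, reusing the signature of the stub this PR removes; the Client field names (telemetry, httpClient, errMapper) and the makeStreamReq helper are hypothetical placeholders, not Glide's actual API.

// Sketch only: a factory method so the router can obtain an Anthropic stream.
// Client field names and makeStreamReq are hypothetical.
func (c *Client) ChatStream(ctx context.Context, req *schemas.ChatRequest) (clients.ChatStream, error) {
    // Build the provider-specific HTTP request (streaming flag, headers, body).
    httpReq, err := c.makeStreamReq(ctx, req)
    if err != nil {
        return nil, err
    }

    return NewAnthropicChatStream(c.telemetry, c.httpClient, httpReq, c.errMapper), nil
}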