Skip to content

Commit

Permalink
fix(http): server dynamic update route (#3336)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Feb 7, 2025
1 parent b77de3d commit 94f8d70
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
14 changes: 12 additions & 2 deletions internal/io/http/httpserver/data_server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@ type GlobalServerManager struct {
endpoint map[string]string
server *http.Server
router *mux.Router
routes map[string]http.HandlerFunc
upgrader websocket.Upgrader
websocketEndpoint map[string]*websocketEndpointContext
}
Expand Down Expand Up @@ -68,6 +69,7 @@ func InitGlobalServerManager(ip string, port int, tlsConf *conf.TlsConf) {
endpoint: map[string]string{},
server: s,
router: r,
routes: map[string]http.HandlerFunc{},
upgrader: upgrader,
}
go func(m *GlobalServerManager) {
Expand Down Expand Up @@ -111,7 +113,7 @@ func (m *GlobalServerManager) RegisterEndpoint(endpoint string, method string) (
m.endpoint[key] = topic
}
pubsub.CreatePub(topic)
m.router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
m.routes[endpoint] = func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -121,6 +123,13 @@ func (m *GlobalServerManager) RegisterEndpoint(endpoint string, method string) (
pubsub.ProduceAny(topoContext.Background(), topic, data)
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
}
m.router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
if h, ok := m.routes[endpoint]; ok {
h(w, r)
} else {
w.WriteHeader(http.StatusNotFound)
}
}).Methods(method)
return topic, nil
}
Expand All @@ -135,6 +144,7 @@ func (m *GlobalServerManager) UnregisterEndpoint(endpoint, method string) {
return
}
delete(m.endpoint, key)
delete(m.routes, endpoint)
pubsub.RemovePub(TopicPrefix + key)
}

Expand Down
12 changes: 11 additions & 1 deletion internal/io/http/httpserver/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,26 @@ func (m *GlobalServerManager) RegisterWebSocketEndpoint(ctx api.StreamContext, e
rTopic := recvTopic(endpoint, true)
sTopic := sendTopic(endpoint, true)
pubsub.CreatePub(rTopic)
m.router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
m.routes[endpoint] = func(w http.ResponseWriter, r *http.Request) {
c, err := m.upgrader.Upgrade(w, r, nil)
if err != nil {
conf.Log.Errorf("websocket upgrade error: %v", err)
return
}
fmt.Printf("is context updated?: %p\n", ctx)
subCtx, cancel := ctx.WithCancel()
wg := m.AddEndpointConnection(endpoint, c, cancel)
go m.handleProcess(subCtx, endpoint, m.FetchInstanceID(), c, cancel, wg)
conf.Log.Infof("websocket endpint %v create connection", endpoint)
}
m.router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
if h, ok := m.routes[endpoint]; ok {
h(w, r)
} else {
w.WriteHeader(http.StatusNotFound)
}
})

conf.Log.Infof("websocker endpoint %v registered success", endpoint)
return rTopic, sTopic, nil
}
Expand All @@ -163,6 +172,7 @@ func (m *GlobalServerManager) UnRegisterWebSocketEndpoint(endpoint string) *webs
cancel()
}
delete(m.websocketEndpoint, endpoint)
delete(m.routes, endpoint)
return wctx
}

Expand Down

0 comments on commit 94f8d70

Please sign in to comment.