-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathserver.go
122 lines (100 loc) · 2.85 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package faye
import (
"encoding/json"
"github.com/roncohen/faye-go/protocol"
"github.com/roncohen/faye-go/utils"
"io"
)
// Server routes incoming protocol messages to an Engine and reports
// problems through a utils.Logger.
type Server struct {
// engine receives handshake, connect, disconnect, subscribe and publish calls.
engine Engine
// logger is used for warnings and panics raised while handling messages.
logger utils.Logger
}
// NewServer builds a Server that logs through logger and delegates message
// handling to engine.
func NewServer(logger utils.Logger, engine Engine) Server {
	return Server{
		engine: engine,
		logger: logger,
	}
}
// Logger returns the logger held by the Server.
func (s Server) Logger() utils.Logger {
return s.logger
}
// Engine returns the engine held by the Server.
func (s Server) Engine() Engine {
return s.engine
}
// GetClient looks up the client named by the request's client ID.
//
// When the engine knows the client, it is returned and conn is left alone.
// Otherwise a warning is logged, the request itself is marked unsuccessful
// with re-handshake advice and sent back over conn, conn is closed, and nil
// is returned.
func (s Server) GetClient(request protocol.Message, conn protocol.Connection) *protocol.Client {
	id := request.ClientId()
	if known := s.engine.GetClient(id); known != nil {
		return known
	}

	s.logger.Warnf("Message %v from unknown client %v", request.Channel(), id)

	// The reply reuses the request map, so the caller's message is modified.
	// NOTE(review): "interval" is sent as the string "1000"; the Bayeux spec
	// describes advice.interval as an integer of milliseconds — confirm that
	// clients tolerate the string form before changing it.
	advice := map[string]interface{}{"reconnect": "handshake", "interval": "1000"}
	reply := request
	reply["successful"] = false
	reply["advice"] = advice

	conn.Send([]protocol.Message{reply})
	conn.Close()
	return nil
}
// HandleRequest dispatches a decoded request payload to HandleMessage.
//
// msges may be a single message object (map[string]interface{}) or a batch
// of them ([]interface{}). Batch elements that are not objects are logged
// and skipped instead of panicking on a failed type assertion, so one
// malformed entry does not abort the rest of the batch. Any other payload
// type is logged and ignored.
func (s Server) HandleRequest(msges interface{}, conn protocol.Connection) {
	switch payload := msges.(type) {
	case []interface{}:
		for _, item := range payload {
			msg, ok := item.(map[string]interface{})
			if !ok {
				s.Logger().Warnf("Ignoring malformed message of type %T in request batch", item)
				continue
			}
			s.HandleMessage(msg, conn)
		}
	case map[string]interface{}:
		s.HandleMessage(payload, conn)
	default:
		s.Logger().Warnf("Ignoring request with unsupported payload type %T", msges)
	}
}
// HandleMessage routes one message by channel kind: meta-channel messages go
// to HandleMeta, and everything else is published via HandlePublish.
func (s Server) HandleMessage(msg protocol.Message, conn protocol.Connection) {
	if msg.Channel().IsMeta() {
		s.HandleMeta(msg, conn)
		return
	}

	// TODO: service channels have no dedicated handling yet; they are
	// published like any other non-meta channel.
	s.HandlePublish(msg)
}
// Client publishing to a service channel
// func (s Server) HandleService(msg protocol.Message) protocol.Message {
// return nil
// }
// HandleMeta processes a message addressed to a meta channel.
//
// Handshake messages are passed to the engine without looking up a client.
// Every other meta message requires a client known to the engine: GetClient
// replies on conn and closes it when the client is unknown, and nothing more
// is done here. For a known client, conn is attached to it and the message is
// dispatched by meta type. Meta types without a case below are ignored.
//
// The returned message is always nil.
func (s Server) HandleMeta(msg protocol.Message, conn protocol.Connection) protocol.Message {
	metaType := msg.Channel().MetaType()

	if metaType == protocol.META_HANDSHAKE_CHANNEL {
		s.engine.Handshake(msg, conn)
		return nil
	}

	client := s.GetClient(msg, conn)
	if client == nil {
		return nil
	}
	client.SetConnection(conn)

	// Handshake is handled above, so it needs no case here.
	switch metaType {
	case protocol.META_CONNECT_CHANNEL:
		s.engine.Connect(msg, client, conn)
	case protocol.META_DISCONNECT_CHANNEL:
		s.engine.Disconnect(msg, client, conn)
	case protocol.META_SUBSCRIBE_CHANNEL:
		s.engine.SubscribeClient(msg, client)
	case protocol.META_UNKNOWN_CHANNEL:
		// NOTE(review): this is reachable from client-supplied input;
		// presumably Panicf panics — confirm that is the intended response.
		s.Logger().Panicf("Message with unknown meta channel received")
	}
	return nil
}
// HandlePublish forwards msg to the engine's Publish method. No check is made
// here that the sending client is known or connected.
func (s Server) HandlePublish(msg protocol.Message) {
// Disabled draft of a connected-client check, kept for reference:
// clientId := msg.ClientId()
// if _client, isConnected := s.engine.GetClient(clientId); !isConnected {
// // TODO: decide how to answer when the client is not connected.
// return nil
// }
// log.Printf("Client %s publishing to %s", clientId, msg.Channel())
s.engine.Publish(msg)
}
// JSONWrite encodes obj as JSON and writes the encoded bytes to w in a single
// Write call.
//
// It returns the json.Marshal error when obj cannot be encoded, in which case
// nothing is written, or the error reported by w.Write when the write fails.
// It returns nil only when both steps succeed.
func JSONWrite(w io.Writer, obj interface{}) error {
	payload, err := json.Marshal(obj)
	if err != nil {
		return err
	}
	// Propagate write failures instead of reporting success.
	_, err = w.Write(payload)
	return err
}