-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathserver.go
148 lines (122 loc) · 3.76 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package stratum
import (
"bytes"
"crypto/tls"
"encoding/binary"
"net"
"strconv"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/minernl/go-miningcore-pool/bans"
"github.com/minernl/go-miningcore-pool/config"
"github.com/minernl/go-miningcore-pool/daemons"
"github.com/minernl/go-miningcore-pool/jobs"
"github.com/minernl/go-miningcore-pool/vardiff"
)
var log = logging.Logger("stratum")
type Server struct {
Options *config.Options
Listener net.Listener
DaemonManager *daemons.DaemonManager
VarDiff *vardiff.VarDiff
JobManager *jobs.JobManager
StratumClients map[uint64]*Client
SubscriptionCounter *SubscriptionCounter
BanningManager *bans.BanningManager
rebroadcastTicker *time.Ticker
}
func NewStratumServer(options *config.Options, jm *jobs.JobManager, bm *bans.BanningManager) *Server {
return &Server{
Options: options,
BanningManager: bm,
SubscriptionCounter: NewSubscriptionCounter(),
JobManager: jm,
StratumClients: make(map[uint64]*Client),
}
}
func (ss *Server) Init() (portStarted []int) {
if ss.Options.Banning != nil {
ss.BanningManager.Init()
}
for port, options := range ss.Options.Ports {
var err error
if options.TLS != nil {
ss.Listener, err = tls.Listen("tcp", ":"+strconv.Itoa(port), options.TLS.ToTLSConfig())
} else {
ss.Listener, err = net.Listen("tcp", ":"+strconv.Itoa(port))
}
if err != nil {
log.Error(err)
continue
}
portStarted = append(portStarted, port)
//if len(portStarted) == len(ss.Options.Ports) {
// // emit started
//}
}
if len(portStarted) == 0 {
log.Panic("No port listened")
}
go func() {
var id string
var txs []byte
ss.rebroadcastTicker = time.NewTicker(time.Duration(ss.Options.JobRebroadcastTimeout) * time.Second)
defer log.Warn("broadcaster stopped")
defer ss.rebroadcastTicker.Stop()
for {
<-ss.rebroadcastTicker.C
go ss.BroadcastCurrentMiningJob(ss.JobManager.CurrentJob.GetJobParams(
id != ss.JobManager.CurrentJob.JobId || !bytes.Equal(txs, ss.JobManager.CurrentJob.TransactionData),
))
id = ss.JobManager.CurrentJob.JobId
txs = ss.JobManager.CurrentJob.TransactionData
}
}()
go func() {
for {
conn, err := ss.Listener.Accept()
if err != nil {
log.Error(err)
continue
}
if conn != nil {
log.Info("new conn from ", conn.RemoteAddr().String())
go ss.HandleNewClient(conn)
}
}
}()
return portStarted
}
// HandleNewClient converts the conn to an underlying client instance and finally return its unique subscriptionID
func (ss *Server) HandleNewClient(socket net.Conn) []byte {
subscriptionID := ss.SubscriptionCounter.Next()
client := NewStratumClient(subscriptionID, socket, ss.Options, ss.JobManager, ss.BanningManager)
ss.StratumClients[binary.LittleEndian.Uint64(subscriptionID)] = client
// client.connected
go func() {
for {
<-client.SocketClosedEvent
log.Warn("a client socket closed")
ss.RemoveStratumClientBySubscriptionId(subscriptionID)
// client.disconnected
}
}()
client.Init()
return subscriptionID
}
func (ss *Server) BroadcastCurrentMiningJob(jobParams []interface{}) {
log.Info("broadcasting job params")
for clientId := range ss.StratumClients {
ss.StratumClients[clientId].SendMiningJob(jobParams)
}
}
func (ss *Server) RemoveStratumClientBySubscriptionId(subscriptionId []byte) {
delete(ss.StratumClients, binary.LittleEndian.Uint64(subscriptionId))
}
func (ss *Server) ManuallyAddStratumClient(client *Client) {
subscriptionId := ss.HandleNewClient(client.Socket)
if subscriptionId != nil {
ss.StratumClients[binary.LittleEndian.Uint64(subscriptionId)].ManuallyAuthClient(client.WorkerName, client.WorkerPass)
ss.StratumClients[binary.LittleEndian.Uint64(subscriptionId)].ManuallySetValues(client)
}
}