From 0cac3a3ff75c2a7e88a3b072045f06bb33e114d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 28 Dec 2023 23:07:10 +0800 Subject: [PATCH 1/6] feat(udp): Added udp server --- .gitignore | 2 +- go.mod | 1 + go.sum | 4 ++ udp/README.md | 33 ++++++++++++++++ udp/server.go | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 udp/README.md create mode 100644 udp/server.go diff --git a/.gitignore b/.gitignore index fadb7396..89646676 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,4 @@ vendor/ .idea _backup -_example \ No newline at end of file +example \ No newline at end of file diff --git a/go.mod b/go.mod index 4098ea72..ab4ec711 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/gorilla/mux v1.8.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sync v0.3.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/grpc v1.58.3 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/go.sum b/go.sum index 1326e39a..b693246c 100644 --- a/go.sum +++ b/go.sum @@ -38,9 +38,13 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= diff --git a/udp/README.md b/udp/README.md new file mode 100644 index 00000000..eae70b78 --- /dev/null +++ b/udp/README.md @@ -0,0 +1,33 @@ +# UDP + +## Server + +```go +package main + +import ( + "log" + "net" + + "github.com/go-kratos/kratos/v2" + + "github.com/go-packagist/go-kratos-components/udp" +) + +func main() { + err := kratos.New( + kratos.Server( + udp.NewServer(":12190", udp.WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) { + log.Println(string(buf)) + }), udp.WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) { + log.Println(err) + })), + ), + ).Run() + + if err != nil { + log.Fatal(err) + } +} + +``` \ No newline at end of file diff --git a/udp/server.go b/udp/server.go new file mode 100644 index 00000000..c5467fcd --- /dev/null +++ b/udp/server.go @@ -0,0 +1,102 @@ +package udp + +import ( + "context" + "log" + "net" +) + +type Server struct { + address string + + bufSize int + + conn net.PacketConn + + handler func(conn net.PacketConn, buf []byte, addr net.Addr) + + recoveryHandler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) +} + +type Option func(*Server) + +func WithBufSize(bufSize int) Option { + return func(s *Server) { + if bufSize > 0 { + s.bufSize = bufSize + } + } +} + +func WithHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr)) Option { + return func(s *Server) { + if handler != nil { + s.handler = handler + } + } +} + +func WithRecoveryHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{})) Option { + return func(s *Server) { + if handler != nil { + s.recoveryHandler = handler + } + } +} + +func NewServer(address string, opts ...Option) *Server { + s := &Server{ + address: address, + bufSize: 1024, + } + + for _, opt := range opts { + opt(s) + } + + return s +} + +func (s *Server) Start(ctx context.Context) (err error) { + s.conn, err = net.ListenPacket("udp", s.address) + if err != nil { + return + } + + log.Printf("udp server: listening on %s\n", s.address) + + buf := make([]byte, s.bufSize) + + for { + n, addr, err := s.conn.ReadFrom(buf) + if err != nil { + return err + } + + if s.handler == nil { + log.Printf("udp server: receive from %s: %s\n", addr.String(), string(buf)) + continue + } + + go s.handle(buf[:n], addr) + } + +} + +func (s *Server) handle(buf []byte, addr net.Addr) { + if s.recoveryHandler != nil { + defer func() { + if err := recover(); err != nil { + s.recoveryHandler(s.conn, buf, addr, err) + } + }() + } + + s.handler(s.conn, buf, addr) +} + +func (s *Server) Stop(ctx context.Context) error { + log.Println("udp server: stopping") + + return s.conn.Close() +} From 74eb42ba73b72e12b960312b33625de0b021473a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 28 Dec 2023 23:21:52 +0800 Subject: [PATCH 2/6] feat(udp): Added udp server --- udp/server_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 udp/server_test.go diff --git a/udp/server_test.go b/udp/server_test.go new file mode 100644 index 00000000..98167022 --- /dev/null +++ b/udp/server_test.go @@ -0,0 +1,62 @@ +package udp + +import ( + "context" + "net" + "sync" + "testing" + "time" +) + +func TestServer(t *testing.T) { + var ( + server *Server + wg sync.WaitGroup + done = make(chan []byte, 1) + ) + + wg.Add(3) + + go func() { + defer wg.Done() + + server = NewServer(":12190", WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) { + done <- buf + }), WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) { + t.Log(err) + }), WithBufSize(1024)) + + server.Start(context.Background()) + }() + + go func() { + defer wg.Done() + + c, err := net.Dial("udp", ":12190") + if err != nil { + t.Error(err) + return + } + defer c.Close() + + _, err = c.Write([]byte("test")) + if err != nil { + t.Error(err) + return + } + }() + + go func() { + defer wg.Done() + + time.Sleep(time.Second * 1) + server.Stop(context.Background()) + }() + + wg.Wait() + + buf := <-done + if string(buf) != "test" { + t.Fatal("buf not equal test") + } +} From 83c6201f31160fbbe2d1cef03e40ddb9b82f6165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 28 Dec 2023 23:31:20 +0800 Subject: [PATCH 3/6] feat(udp): Added udp server --- udp/server.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/udp/server.go b/udp/server.go index c5467fcd..1ca30eda 100644 --- a/udp/server.go +++ b/udp/server.go @@ -4,6 +4,7 @@ import ( "context" "log" "net" + "sync" ) type Server struct { @@ -12,6 +13,7 @@ type Server struct { bufSize int conn net.PacketConn + mu sync.Mutex // guards conn handler func(conn net.PacketConn, buf []byte, addr net.Addr) @@ -84,6 +86,9 @@ func (s *Server) Start(ctx context.Context) (err error) { } func (s *Server) handle(buf []byte, addr net.Addr) { + s.mu.Lock() + defer s.mu.Unlock() + if s.recoveryHandler != nil { defer func() { if err := recover(); err != nil { @@ -96,6 +101,9 @@ func (s *Server) handle(buf []byte, addr net.Addr) { } func (s *Server) Stop(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + log.Println("udp server: stopping") return s.conn.Close() From 290d1cdca3d1caada34b0cf3227bbe4a401cf070 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 28 Dec 2023 23:35:45 +0800 Subject: [PATCH 4/6] feat(udp): Added udp server --- udp/server.go | 8 -------- udp/server_test.go | 14 +++++--------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/udp/server.go b/udp/server.go index 1ca30eda..c5467fcd 100644 --- a/udp/server.go +++ b/udp/server.go @@ -4,7 +4,6 @@ import ( "context" "log" "net" - "sync" ) type Server struct { @@ -13,7 +12,6 @@ type Server struct { bufSize int conn net.PacketConn - mu sync.Mutex // guards conn handler func(conn net.PacketConn, buf []byte, addr net.Addr) @@ -86,9 +84,6 @@ func (s *Server) Start(ctx context.Context) (err error) { } func (s *Server) handle(buf []byte, addr net.Addr) { - s.mu.Lock() - defer s.mu.Unlock() - if s.recoveryHandler != nil { defer func() { if err := recover(); err != nil { @@ -101,9 +96,6 @@ func (s *Server) handle(buf []byte, addr net.Addr) { } func (s *Server) Stop(ctx context.Context) error { - s.mu.Lock() - defer s.mu.Unlock() - log.Println("udp server: stopping") return s.conn.Close() diff --git a/udp/server_test.go b/udp/server_test.go index 98167022..fa0d6a94 100644 --- a/udp/server_test.go +++ b/udp/server_test.go @@ -15,7 +15,7 @@ func TestServer(t *testing.T) { done = make(chan []byte, 1) ) - wg.Add(3) + wg.Add(2) go func() { defer wg.Done() @@ -26,7 +26,10 @@ func TestServer(t *testing.T) { t.Log(err) }), WithBufSize(1024)) - server.Start(context.Background()) + go server.Start(context.Background()) + + time.Sleep(time.Second * 1) + server.Stop(context.Background()) }() go func() { @@ -46,13 +49,6 @@ func TestServer(t *testing.T) { } }() - go func() { - defer wg.Done() - - time.Sleep(time.Second * 1) - server.Stop(context.Background()) - }() - wg.Wait() buf := <-done From 15feab2d99c3257bef3530615209c4b1ff2a1703 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 28 Dec 2023 23:40:02 +0800 Subject: [PATCH 5/6] feat(udp): Added udp server --- udp/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/udp/server_test.go b/udp/server_test.go index fa0d6a94..2c82fc2e 100644 --- a/udp/server_test.go +++ b/udp/server_test.go @@ -28,7 +28,7 @@ func TestServer(t *testing.T) { go server.Start(context.Background()) - time.Sleep(time.Second * 1) + time.Sleep(time.Second * 5) server.Stop(context.Background()) }() From a71201f18b7f838699454ecab19e01c02170a59d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 28 Dec 2023 23:53:02 +0800 Subject: [PATCH 6/6] feat(udp): Added udp server --- udp/server_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/udp/server_test.go b/udp/server_test.go index 2c82fc2e..ee2ac75e 100644 --- a/udp/server_test.go +++ b/udp/server_test.go @@ -35,6 +35,8 @@ func TestServer(t *testing.T) { go func() { defer wg.Done() + time.Sleep(time.Second * 3) + c, err := net.Dial("udp", ":12190") if err != nil { t.Error(err)