Skip to content

Commit

Permalink
添加消息队列和命令模式处理消息
Browse files Browse the repository at this point in the history
  • Loading branch information
carlo committed Jun 27, 2020
1 parent fba9bb7 commit 00f00c3
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 16 deletions.
4 changes: 3 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spf13/cobra"
"github.com/wudaoluo/sonic/common"
"github.com/wudaoluo/sonic/dao"
"github.com/wudaoluo/sonic/queue"
"os"

"github.com/spf13/viper"
Expand Down Expand Up @@ -55,7 +56,8 @@ func Execute() {

func init() {
cobra.OnInitialize(initConfig) //永远在顶层
cobra.OnInitialize(dao.InitDB)
cobra.OnInitialize(dao.Init)
cobra.OnInitialize(queue.Init)

// Here you will define your flags and configuration settings.
// Cobra supports persistent flags, which, if defined here,
Expand Down
65 changes: 65 additions & 0 deletions common/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package common


type CmdType int

func (c CmdType) String() string {
var value string
switch c {
case MsgReceive:
value = "msg_receive"
case MsgSend:
value = "msg_send"
default:
value = "not found"
}
return value
}

const (
MsgReceive CmdType = iota + 1 //消息接受
MsgSend //消息发送
ContactPersonList //联系人列表
MsgTotalUnRead //总未读书
MsgUnRead //消息未读数

)


var Cmd = &CommandHandler{CmdMap: make(map[CmdType]Command)}

// 命令接口
type Command interface {
Do(args interface{}) (interface{}, error)
}

// 上下文
type CmdContext struct {
CmdType CmdType
Args interface{}
}

// 命令管理者
type CommandHandler struct {
CmdMap map[CmdType]Command
}

// 处理命令
func (ch *CommandHandler) Handle(ctx *CmdContext) (interface{}, error) {
if ctx == nil {
return nil, COMMAND_NIL
}
cmd, ok := ch.CmdMap[ctx.CmdType]
if ok {
return cmd.Do(ctx.Args)
}
return nil, COMMAND_INVALID
}

// 注册命令
func (ch *CommandHandler) Register(cmdType CmdType, cmd Command) {
ch.CmdMap[cmdType] = cmd
}



2 changes: 2 additions & 0 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var (
SERVICE_ERROR = errors.New("服务器错误")
AUTH_ERROR = errors.New("认证失败")
TOKEN_INVALID = errors.New("token is invalid")
COMMAND_INVALID = errors.New("invalid Command")
COMMAND_NIL = errors.New("command is nil")
//PARAM_ERROR = engine.NewResponseError(ERROR_PARAM_ERROR, "param is error")
//MYSQL_PARAM_ERROR = engine.NewResponseError(ERROR_PARAM_ERROR, "sql param is error")
)
Expand Down
12 changes: 3 additions & 9 deletions dao/im_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package dao

import "github.com/wudaoluo/sonic/model"

//




type ImUserService struct {}

Expand All @@ -17,7 +13,7 @@ func init() {


func (t *ImUserService) SelectByUserName(username string) (*model.ImUser,error) {
sqlText := "SELECT avatar,email,password,uid,username FROM IM_USER WHERE username = ? limit 1"
sqlText := "SELECT avatar,email,password,uid,username FROM im_user WHERE username = ? limit 1"
row := db.QueryRow(sqlText,username)

msg := new(model.ImUser)
Expand All @@ -41,12 +37,10 @@ func (t *ImUserService) Select() ([]*model.ImUser,error) {
}

func (t *ImUserService) Insert(msg *model.ImUser) (int64,error) {
sqlText := "INSERT INTO IM_USER ( email, password, uid, username) " +
sqlText := "INSERT INTO im_user ( email, password, uid, username) " +
"VALUE ( ?, ?, ?, ?)"

ret, err := db.Exec(sqlText,


&msg.Email,
&msg.Password,
&msg.Uid,
Expand All @@ -59,7 +53,7 @@ func (t *ImUserService) Insert(msg *model.ImUser) (int64,error) {
}

func (t *ImUserService) DeleteById (id int64) error {
sqlText := "DELETE FROM IM_USER where id = ?"
sqlText := "DELETE FROM im_user where id = ?"
_, err := db.Exec(sqlText,id)
return err
}
2 changes: 1 addition & 1 deletion dao/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
var db *sql.DB


func InitDB() {
func Init() {
var buf bytes.Buffer
var err error

Expand Down
43 changes: 43 additions & 0 deletions endpoint/v1/logic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package v1

import (
"github.com/gin-gonic/gin"
"github.com/wudaoluo/sonic/common"
"github.com/wudaoluo/sonic/middleware"
"github.com/wudaoluo/sonic/model"
"github.com/wudaoluo/sonic/service"
)

func LogicV1Router(parentRoute gin.IRouter) {
router := parentRoute.Group("/auth")
end := NewAuth()
router.POST("/login",middleware.Jwt(),end.Login)
router.POST("/logout",end.Logout)
router.POST("/token/refresh",end.TokenRefresh)
}

type logic struct {
service *service.AuthService
}

func NewLogic() *auth {
return &auth{
service.NewAuthService(),
}
}

func (l logic) Command(c *gin.Context) {
var req model.AuthLogin
if err := c.Bind(&req); err != nil {
common.GinJsonRespErr(c,common.PARAM_ERROR)
return
}

ret,err := common.Cmd.Handle(&common.CmdContext{CmdType: common.MsgReceive, Args: " Post"})
if err != nil {
common.GinJsonRespErr(c,err)
return
}
common.GinJsonResp(c,ret)
}

7 changes: 4 additions & 3 deletions model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type Config struct {
Default ConfigDefault
Gateway ConfigGateway
Auth ConfigAuth
MQ ConfigMQ
Queue ConfigQueue
Cache ConfigCache
Storage ConfigStorage
}
Expand All @@ -29,8 +29,9 @@ type ConfigAuthJwt struct {
Key string
}

type ConfigMQ struct {

type ConfigQueue struct {
Type string `json:"type"`
Addrs []string `json:"addrs"`
}

type ConfigCache struct {
Expand Down
58 changes: 58 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package queue

import(
"github.com/wudaoluo/golog"
"github.com/wudaoluo/sonic/common"
"context"
"github.com/wudaoluo/sonic/queue/queue_default"
)

type Queue interface {
//Start()
Processor
Consumeor
}

type Processor interface {
Producer(ctx context.Context,buf []byte) error
ProducerClose() error
}

type Consumeor interface {
Consumer(ctx context.Context,fn func(buf []byte))
ConsumerClose() error
}


var queue Queue

func Producer(ctx context.Context,buf []byte) error {
return queue.Producer(ctx,buf)
}


func ProducerClose() error {
return queue.ProducerClose()
}

func Consumer(ctx context.Context,fn func(buf []byte)) {
queue.Consumer(ctx,fn)
}

func ConsumerClose() error {
return queue.ConsumerClose()
}

func Init() {
conf := &common.GetConf().Queue
golog.Info("queue.init","type",conf.Type)
switch conf.Type {
case "kafka":
//queue = queue_kafka.New(conf)
case "default":
queue = queue_default.New(conf)
default:
golog.Error("queue.init","err",common.NOT_FOUND_ERROR)
panic(common.NOT_FOUND_ERROR)
}
}
55 changes: 55 additions & 0 deletions queue/queue_default/queue_default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package queue_default

import (
"github.com/wudaoluo/sonic/model"
"sync/atomic"
"context"
)

type QueueDefault struct {
queue chan []byte
status int32
}

func New(conf *model.ConfigQueue) *QueueDefault {
return &QueueDefault{
queue:make(chan []byte, 100),
status: 1,
}
}

func (q *QueueDefault) Consumer(ctx context.Context,fn func(buf []byte)) {
go func() {
for msg := range q.queue {
fn(msg)
}
}()

}

func (q *QueueDefault) ProducerClose() error {
if atomic.LoadInt32(&q.status) == 1 {
atomic.StoreInt32(&q.status,0)
close(q.queue)
}

return nil
}

func (q *QueueDefault) ConsumerClose() error {
if atomic.LoadInt32(&q.status) == 1 {
atomic.StoreInt32(&q.status,0)
close(q.queue)
}


return nil
}

func (q *QueueDefault) Producer(ctx context.Context,buf []byte) error {
if atomic.LoadInt32(&q.status) == 1 {
q.queue <- buf
}

return nil
}
24 changes: 24 additions & 0 deletions service/logic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package service

import (
"fmt"
"github.com/wudaoluo/sonic/common"
)



func init() {
common.Cmd.Register(common.MsgReceive, &MsgReceive{})
}


type MsgReceive struct {

}

func (m *MsgReceive) Do(args interface{}) (interface{}, error) {
fmt.Println("PostCommand")
return args, nil
}


5 changes: 3 additions & 2 deletions sonic.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ key = "eewjdjfdfijdfldfdlfdlfmdlmferieireorealdkfdkdfjfj"
[logic]
addr = ":7779"

[mq]
addr = ["127.0.0.1:1010"]
[queue]
type = "default" #kafka default
addrs = ["127.0.0.1:1010"]

[cache]
addr = ""
Expand Down

0 comments on commit 00f00c3

Please sign in to comment.