Skip to content
This repository has been archived by the owner on Feb 19, 2019. It is now read-only.

Initial support for zmq_msg_t #82

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions zmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,33 @@ func Device(t DeviceType, in, out *Socket) error {
return errors.New("zmq_device() returned unexpectedly.")
}

/**
* Message does not yet implement the full zmq_msg_t API,
* it merely enables us to postpone releasing zmq_msg_t.
*/
type Message struct {
m *C.zmq_msg_t
data []byte
}

func (m *Message) GetData() (data []byte) {
return m.data
}

func (m *Message) Close() error {
if m.m == nil {
return errors.New("The message is already closed.")
}

_, err := C.zmq_msg_close(m.m)
if err != nil {
return casterr(err)
}
m.m = nil

return nil
}

// XXX For now, this library abstracts zmq_msg_t out of the API.
// int zmq_msg_init (zmq_msg_t *msg);
// int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
Expand Down
28 changes: 28 additions & 0 deletions zmq_2_x.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,34 @@ func (s *Socket) Recv(flags SendRecvOption) (data []byte, err error) {
return
}

// Receive a single zmq_msg_t and wrap it in Message.
func (s *Socket) RecvMessage(flags SendRecvOption) (msg *Message, err error) {
// Allocate and initialise a new zmq_msg_t.
var m C.zmq_msg_t
var rc C.int
if rc, err = C.zmq_msg_init(&m); rc != 0 {
err = casterr(err)
return
}

// Receive into the message.
if rc, err = C.zmq_recv(s.s, &m, C.int(flags)); rc != 0 {
C.zmq_msg_close(&m)
err = casterr(err)
return
}

// Copy message data into a byte slice.
// FIXME Ideally this wouldn't require a copy.
size := C.zmq_msg_size(&m)
data := C.GoBytes(C.zmq_msg_data(&m), C.int(size))

return &Message{
m: &m,
data: data,
}, nil
}

// Portability helper
func (s *Socket) getRcvmore() (more bool, err error) {
value, err := s.GetSockOptUInt64(RCVMORE)
Expand Down
28 changes: 28 additions & 0 deletions zmq_3_x.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,34 @@ func (s *Socket) Recv(flags SendRecvOption) (data []byte, err error) {
return
}

// Receive a single zmq_msg_t and wrap it in Message.
func (s *Socket) RecvMessage(flags SendRecvOption) (msg *Message, err error) {
// Allocate and initialise a new zmq_msg_t.
var m C.zmq_msg_t
var rc C.int
if rc, err = C.zmq_msg_init(&m); rc != 0 {
err = casterr(err)
return
}

// Receive into the message.
if rc, err = C.zmq_recvmsg(s.s, &m, C.int(flags)); rc != 0 {
C.zmq_msg_close(&m)
err = casterr(err)
return
}

// Copy message data into a byte slice.
// FIXME Ideally this wouldn't require a copy.
size := C.zmq_msg_size(&m)
data := C.GoBytes(C.zmq_msg_data(&m), C.int(size))

return &Message{
m: &m,
data: data,
}, nil
}

// Register a monitoring callback endpoint.
// int zmq_socket_monitor (void *s, const char *addr, int events);
func (s *Socket) Monitor(address string, events Event) error {
Expand Down