From d72e46724cdc8d043f1eb8eb165a43343ca5ba2d Mon Sep 17 00:00:00 2001 From: Ondrej Kupka Date: Tue, 28 May 2013 22:24:54 +0200 Subject: [PATCH] Initial support for zmq_msg_t As mentioned in #78, current semantics of Recv are not sufficient for centain scenarios, e.g. you want to receive a pointer over PAIR and then copy the data, exactly as zmq_socket_monitor is supposed to be used. The issues is that every call to Recv automatically closes the zmq_msg_t it uses, which is not desirable here. This tiny initial implementation decouples Recv from zmq_msg_close call, making it possible to actually use zmq_socket_monitor and such. Fixes #78. Signed-off-by: Ondrej Kupka --- zmq.go | 27 +++++++++++++++++++++++++++ zmq_2_x.go | 28 ++++++++++++++++++++++++++++ zmq_3_x.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+) diff --git a/zmq.go b/zmq.go index 256ec53..8172094 100644 --- a/zmq.go +++ b/zmq.go @@ -445,6 +445,33 @@ func Device(t DeviceType, in, out *Socket) error { return errors.New("zmq_device() returned unexpectedly.") } +/** + * Message does not yet implement the full zmq_msg_t API, + * it merely enables us to postpone releasing zmq_msg_t. + */ +type Message struct { + m *C.zmq_msg_t + data []byte +} + +func (m *Message) GetData() (data []byte) { + return m.data +} + +func (m *Message) Close() error { + if m.m == nil { + return errors.New("The message is already closed.") + } + + _, err := C.zmq_msg_close(m.m) + if err != nil { + return casterr(err) + } + m.m = nil + + return nil +} + // XXX For now, this library abstracts zmq_msg_t out of the API. // int zmq_msg_init (zmq_msg_t *msg); // int zmq_msg_init_size (zmq_msg_t *msg, size_t size); diff --git a/zmq_2_x.go b/zmq_2_x.go index ff96d32..e71fb6c 100644 --- a/zmq_2_x.go +++ b/zmq_2_x.go @@ -102,6 +102,34 @@ func (s *Socket) Recv(flags SendRecvOption) (data []byte, err error) { return } +// Receive a single zmq_msg_t and wrap it in Message. +func (s *Socket) RecvMessage(flags SendRecvOption) (msg *Message, err error) { + // Allocate and initialise a new zmq_msg_t. + var m C.zmq_msg_t + var rc C.int + if rc, err = C.zmq_msg_init(&m); rc != 0 { + err = casterr(err) + return + } + + // Receive into the message. + if rc, err = C.zmq_recv(s.s, &m, C.int(flags)); rc != 0 { + C.zmq_msg_close(&m) + err = casterr(err) + return + } + + // Copy message data into a byte slice. + // FIXME Ideally this wouldn't require a copy. + size := C.zmq_msg_size(&m) + data := C.GoBytes(C.zmq_msg_data(&m), C.int(size)) + + return &Message{ + m: &m, + data: data, + }, nil +} + // Portability helper func (s *Socket) getRcvmore() (more bool, err error) { value, err := s.GetSockOptUInt64(RCVMORE) diff --git a/zmq_3_x.go b/zmq_3_x.go index 0a65db8..191ab6b 100644 --- a/zmq_3_x.go +++ b/zmq_3_x.go @@ -181,6 +181,34 @@ func (s *Socket) Recv(flags SendRecvOption) (data []byte, err error) { return } +// Receive a single zmq_msg_t and wrap it in Message. +func (s *Socket) RecvMessage(flags SendRecvOption) (msg *Message, err error) { + // Allocate and initialise a new zmq_msg_t. + var m C.zmq_msg_t + var rc C.int + if rc, err = C.zmq_msg_init(&m); rc != 0 { + err = casterr(err) + return + } + + // Receive into the message. + if rc, err = C.zmq_recvmsg(s.s, &m, C.int(flags)); rc != 0 { + C.zmq_msg_close(&m) + err = casterr(err) + return + } + + // Copy message data into a byte slice. + // FIXME Ideally this wouldn't require a copy. + size := C.zmq_msg_size(&m) + data := C.GoBytes(C.zmq_msg_data(&m), C.int(size)) + + return &Message{ + m: &m, + data: data, + }, nil +} + // Register a monitoring callback endpoint. // int zmq_socket_monitor (void *s, const char *addr, int events); func (s *Socket) Monitor(address string, events Event) error {