From 625c8ee51bc3e065da0e2e8d3a53d3634589f548 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 8 Jan 2025 20:52:44 +0100 Subject: [PATCH] chore: libwaku tweaks (#3233) * make lightpush return msg hash after successful publish * libwaku avoid the use of string * library alloc.nim allocate memory when nil cstring is passed * libwaku store_request remove extra destroyShared(self) --- library/alloc.nim | 5 +++ library/libwaku.nim | 32 ++++--------------- .../requests/protocols/lightpush_request.nim | 8 ++--- .../requests/protocols/relay_request.nim | 4 +-- .../requests/protocols/store_request.nim | 3 -- waku/node/waku_node.nim | 14 +++----- waku/waku_lightpush/client.nim | 23 +++++++------ waku/waku_lightpush/self_req_handler.nim | 15 +++++++-- 8 files changed, 45 insertions(+), 59 deletions(-) diff --git a/library/alloc.nim b/library/alloc.nim index 251dffde06..1a6f118b5e 100644 --- a/library/alloc.nim +++ b/library/alloc.nim @@ -4,6 +4,11 @@ type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] proc alloc*(str: cstring): cstring = # Byte allocation from the given address. # There should be the corresponding manual deallocation with deallocShared ! + if str.isNil(): + var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator + ret[0] = '\0' # Set the null terminator + return ret + let ret = cast[cstring](allocShared(len(str) + 1)) copyMem(ret, str, len(str) + 1) return ret diff --git a/library/libwaku.nim b/library/libwaku.nim index 208f20257e..204b73d0dd 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -313,16 +313,10 @@ proc waku_relay_publish( defer: deallocShared(pst) - let targetPubSubTopic = - if len(pst) == 0: - DefaultPubsubTopic - else: - $pst - handleRequest( ctx, RequestType.RELAY, - RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), nil, wakuMessage), + RelayRequest.createShared(RelayMsgType.PUBLISH, pst, nil, wakuMessage), callback, userData, ) @@ -370,9 +364,7 @@ proc waku_relay_subscribe( handleRequest( ctx, RequestType.RELAY, - RelayRequest.createShared( - RelayMsgType.SUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(cb) - ), + RelayRequest.createShared(RelayMsgType.SUBSCRIBE, pst, WakuRelayHandler(cb)), callback, userData, ) @@ -398,7 +390,7 @@ proc waku_relay_add_protected_shard( RelayMsgType.ADD_PROTECTED_SHARD, clusterId = clusterId, shardId = shardId, - publicKey = $pubk, + publicKey = pubk, ), callback, userData, @@ -421,9 +413,7 @@ proc waku_relay_unsubscribe( ctx, RequestType.RELAY, RelayRequest.createShared( - RelayMsgType.UNSUBSCRIBE, - PubsubTopic($pst), - WakuRelayHandler(onReceivedMessage(ctx)), + RelayMsgType.UNSUBSCRIBE, pst, WakuRelayHandler(onReceivedMessage(ctx)) ), callback, userData, @@ -445,7 +435,7 @@ proc waku_relay_get_num_connected_peers( handleRequest( ctx, RequestType.RELAY, - RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)), + RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, pst), callback, userData, ) @@ -466,7 +456,7 @@ proc waku_relay_get_num_peers_in_mesh( handleRequest( ctx, RequestType.RELAY, - RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)), + RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, pst), callback, userData, ) @@ -557,18 +547,10 @@ proc waku_lightpush_publish( callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR - let targetPubSubTopic = - if len(pst) == 0: - DefaultPubsubTopic - else: - $pst - handleRequest( ctx, RequestType.LIGHTPUSH, - LightpushRequest.createShared( - LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage - ), + LightpushRequest.createShared(LightpushMsgType.PUBLISH, pst, wakuMessage), callback, userData, ) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim index fed28f1127..70a6b61162 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim @@ -32,7 +32,7 @@ type LightpushRequest* = object proc createShared*( T: type LightpushRequest, op: LightpushMsgType, - pubsubTopic: PubsubTopic, + pubsubTopic: cstring, m = WakuMessage(), ): ptr type T = var ret = createShared(T) @@ -97,12 +97,12 @@ proc process*( error "PUBLISH failed", error = errorMsg return err(errorMsg) - ( + let msgHashHex = ( await waku.node.wakuLightpushClient.publish( pubsubTopic, msg, peer = peerOpt.get() ) - ).isOkOr: + ).valueOr: error "PUBLISH failed", error = error return err("LightpushRequest error publishing: " & $error) - return ok("") + return ok(msgHashHex) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index 126dc65d22..6a940e27ed 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -41,12 +41,12 @@ type RelayRequest* = object proc createShared*( T: type RelayRequest, op: RelayMsgType, - pubsubTopic: PubsubTopic = "", + pubsubTopic: cstring = nil, relayEventCallback: WakuRelayHandler = nil, m = WakuMessage(), clusterId: cint = 0, shardId: cint = 0, - publicKey: string = "", + publicKey: cstring = nil, ): ptr type T = var ret = createShared(T) ret[].operation = op diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index 9ccf8bcedc..91006e9222 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -106,9 +106,6 @@ proc destroyShared(self: ptr StoreRequest) = proc process_remote_query( self: ptr StoreRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = - defer: - destroyShared(self) - let jsonContentRes = catch: parseJson($self[].jsonQuery) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 591962472b..2158cb62ac 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -972,7 +972,7 @@ proc lightpushPublish*( pubsubTopic: Option[PubsubTopic], message: WakuMessage, peer: RemotePeerInfo, -): Future[WakuLightPushResult[void]] {.async, gcsafe.} = +): Future[WakuLightPushResult[string]] {.async, gcsafe.} = ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. ## Returns whether relaying was successful or not. ## `WakuMessage` should contain a `contentTopic` field for light node @@ -986,7 +986,7 @@ proc lightpushPublish*( pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo, - ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + ): Future[WakuLightPushResult[string]] {.async, gcsafe.} = let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() if not node.wakuLightpushClient.isNil(): notice "publishing message with lightpush", @@ -1023,7 +1023,7 @@ proc lightpushPublish*( # TODO: Move to application module (e.g., wakunode2.nim) proc lightpushPublish*( node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage -): Future[WakuLightPushResult[void]] {. +): Future[WakuLightPushResult[string]] {. async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead" .} = if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): @@ -1040,13 +1040,7 @@ proc lightpushPublish*( elif not node.wakuLightPush.isNil(): peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId)) - let publishRes = - await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get()) - - if publishRes.isErr(): - error "failed to publish message", error = publishRes.error - - return publishRes + return await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get()) ## Waku RLN Relay proc mountRlnRelay*( diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 9f5819ecc7..3e20bf9e35 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -71,24 +71,23 @@ proc publish*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, - peer: PeerId | RemotePeerInfo, -): Future[WakuLightPushResult[void]] {.async, gcsafe.} = - when peer is PeerId: - info "publish", - peerId = shortLog(peer), - msg_hash = computeMessageHash(pubsubTopic, message).to0xHex - else: - info "publish", - peerId = shortLog(peer.peerId), - msg_hash = computeMessageHash(pubsubTopic, message).to0xHex - + peer: RemotePeerInfo, +): Future[WakuLightPushResult[string]] {.async, gcsafe.} = + ## On success, returns the msg_hash of the published message + let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex() let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer) for obs in wl.publishObservers: obs.onMessagePublished(pubSubTopic, message) - return ok() + notice "publishing message with lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msg_hash_hex_str + + return ok(msg_hash_hex_str) proc publishToAny*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage diff --git a/waku/waku_lightpush/self_req_handler.nim b/waku/waku_lightpush/self_req_handler.nim index 224a64e9d6..410d5808a9 100644 --- a/waku/waku_lightpush/self_req_handler.nim +++ b/waku/waku_lightpush/self_req_handler.nim @@ -9,7 +9,7 @@ ## which spawn a full service Waku node ## that could be used also as a lightpush client, helping testing and development. -import results, chronos, std/options, metrics +import results, chronos, chronicles, std/options, metrics, stew/byteutils import ../waku_core, ./protocol, @@ -21,9 +21,10 @@ import proc handleSelfLightPushRequest*( self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage -): Future[WakuLightPushResult[void]] {.async.} = +): Future[WakuLightPushResult[string]] {.async.} = ## Handles the lightpush requests made by the node to itself. ## Normally used in REST-lightpush requests + ## On success, returns the msg_hash of the published message. try: # provide self peerId as now this node is used directly, thus there is no light client sender peer. @@ -45,6 +46,14 @@ proc handleSelfLightPushRequest*( else: return err("unknown failure") - return ok() + let msg_hash_hex_str = computeMessageHash(pubSubTopic, message).to0xHex() + + notice "publishing message with self hosted lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + self_peer_id = selfPeerId, + msg_hash = msg_hash_hex_str + + return ok(msg_hash_hex_str) except Exception: return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())