Skip to content

Commit bbc2f6a

Browse files
committed
refactor: rename tx/rx to reply/updates and WithChannels to Request
1 parent 6eea8ed commit bbc2f6a

File tree

13 files changed

+433
-390
lines changed

13 files changed

+433
-390
lines changed

examples/compute.rs

Lines changed: 72 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use irpc::{
1111
rpc::{listen, Handler},
1212
rpc_requests,
1313
util::{make_client_endpoint, make_server_endpoint},
14-
Client, LocalSender, Request, Service, WithChannels,
14+
Client, LocalSender, Request, RequestSender, Service,
1515
};
1616
use n0_future::{
1717
stream::StreamExt,
@@ -59,13 +59,13 @@ enum ComputeRequest {
5959
#[rpc_requests(ComputeService, message = ComputeMessage)]
6060
#[derive(Serialize, Deserialize)]
6161
enum ComputeProtocol {
62-
#[rpc(tx=oneshot::Sender<u128>)]
62+
#[rpc(reply=oneshot::Sender<u128>)]
6363
Sqr(Sqr),
64-
#[rpc(rx=mpsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
64+
#[rpc(updates=mpsc::Receiver<i64>, reply=oneshot::Sender<i64>)]
6565
Sum(Sum),
66-
#[rpc(tx=mpsc::Sender<u64>)]
66+
#[rpc(reply=mpsc::Sender<u64>)]
6767
Fibonacci(Fibonacci),
68-
#[rpc(rx=mpsc::Receiver<u64>, tx=mpsc::Sender<u64>)]
68+
#[rpc(updates=mpsc::Receiver<u64>, reply=mpsc::Sender<u64>)]
6969
Multiply(Multiply),
7070
}
7171

@@ -76,10 +76,10 @@ struct ComputeActor {
7676

7777
impl ComputeActor {
7878
pub fn local() -> ComputeApi {
79-
let (tx, rx) = tokio::sync::mpsc::channel(128);
80-
let actor = Self { recv: rx };
79+
let (reply, request) = tokio::sync::mpsc::channel(128);
80+
let actor = Self { recv: request };
8181
n0_future::task::spawn(actor.run());
82-
let local = LocalSender::<ComputeMessage, ComputeService>::from(tx);
82+
let local = LocalSender::<ComputeMessage, ComputeService>::from(reply);
8383
ComputeApi {
8484
inner: local.into(),
8585
}
@@ -99,34 +99,45 @@ impl ComputeActor {
9999
match msg {
100100
ComputeMessage::Sqr(sqr) => {
101101
trace!("sqr {:?}", sqr);
102-
let WithChannels {
103-
tx, inner, span, ..
102+
let Request {
103+
reply,
104+
message,
105+
span,
106+
..
104107
} = sqr;
105108
let _entered = span.enter();
106-
let result = (inner.num as u128) * (inner.num as u128);
107-
tx.send(result).await?;
109+
let result = (message.num as u128) * (message.num as u128);
110+
reply.send(result).await?;
108111
}
109112
ComputeMessage::Sum(sum) => {
110113
trace!("sum {:?}", sum);
111-
let WithChannels { rx, tx, span, .. } = sum;
114+
let Request {
115+
updates,
116+
reply,
117+
span,
118+
..
119+
} = sum;
112120
let _entered = span.enter();
113-
let mut receiver = rx;
121+
let mut receiver = updates;
114122
let mut total = 0;
115123
while let Some(num) = receiver.recv().await? {
116124
total += num;
117125
}
118-
tx.send(total).await?;
126+
reply.send(total).await?;
119127
}
120128
ComputeMessage::Fibonacci(fib) => {
121129
trace!("fibonacci {:?}", fib);
122-
let WithChannels {
123-
tx, inner, span, ..
130+
let Request {
131+
reply,
132+
message,
133+
span,
134+
..
124135
} = fib;
125136
let _entered = span.enter();
126-
let sender = tx;
137+
let sender = reply;
127138
let mut a = 0u64;
128139
let mut b = 1u64;
129-
while a <= inner.max {
140+
while a <= message.max {
130141
sender.send(a).await?;
131142
let next = a + b;
132143
a = b;
@@ -135,17 +146,17 @@ impl ComputeActor {
135146
}
136147
ComputeMessage::Multiply(mult) => {
137148
trace!("multiply {:?}", mult);
138-
let WithChannels {
139-
rx,
140-
tx,
141-
inner,
149+
let Request {
150+
updates,
151+
reply,
152+
message,
142153
span,
143154
..
144155
} = mult;
145156
let _entered = span.enter();
146-
let mut receiver = rx;
147-
let sender = tx;
148-
let multiplier = inner.initial;
157+
let mut receiver = updates;
158+
let sender = reply;
159+
let multiplier = message.initial;
149160
while let Some(num) = receiver.recv().await? {
150161
sender.send(multiplier * num).await?;
151162
}
@@ -171,13 +182,13 @@ impl ComputeApi {
171182
let Some(local) = self.inner.local() else {
172183
bail!("cannot listen on a remote service");
173184
};
174-
let handler: Handler<ComputeProtocol> = Arc::new(move |msg, rx, tx| {
185+
let handler: Handler<ComputeProtocol> = Arc::new(move |msg, request, reply| {
175186
let local = local.clone();
176187
Box::pin(match msg {
177-
ComputeProtocol::Sqr(msg) => local.send((msg, tx)),
178-
ComputeProtocol::Sum(msg) => local.send((msg, tx, rx)),
179-
ComputeProtocol::Fibonacci(msg) => local.send((msg, tx)),
180-
ComputeProtocol::Multiply(msg) => local.send((msg, tx, rx)),
188+
ComputeProtocol::Sqr(msg) => local.send((msg, reply)),
189+
ComputeProtocol::Sum(msg) => local.send((msg, reply, request)),
190+
ComputeProtocol::Fibonacci(msg) => local.send((msg, reply)),
191+
ComputeProtocol::Multiply(msg) => local.send((msg, reply, request)),
181192
})
182193
});
183194
Ok(AbortOnDropHandle::new(task::spawn(listen(
@@ -188,45 +199,45 @@ impl ComputeApi {
188199
pub async fn sqr(&self, num: u64) -> anyhow::Result<oneshot::Receiver<u128>> {
189200
let msg = Sqr { num };
190201
match self.inner.request().await? {
191-
Request::Local(request) => {
192-
let (tx, rx) = oneshot::channel();
193-
request.send((msg, tx)).await?;
194-
Ok(rx)
202+
RequestSender::Local(sender) => {
203+
let (reply, request) = oneshot::channel();
204+
sender.send((msg, reply)).await?;
205+
Ok(request)
195206
}
196-
Request::Remote(request) => {
197-
let (_tx, rx) = request.write(msg).await?;
198-
Ok(rx.into())
207+
RequestSender::Remote(sender) => {
208+
let (_reply, request) = sender.write(msg).await?;
209+
Ok(request.into())
199210
}
200211
}
201212
}
202213

203214
pub async fn sum(&self) -> anyhow::Result<(mpsc::Sender<i64>, oneshot::Receiver<i64>)> {
204215
let msg = Sum;
205216
match self.inner.request().await? {
206-
Request::Local(request) => {
207-
let (num_tx, num_rx) = mpsc::channel(10);
208-
let (sum_tx, sum_rx) = oneshot::channel();
209-
request.send((msg, sum_tx, num_rx)).await?;
210-
Ok((num_tx, sum_rx))
217+
RequestSender::Local(sender) => {
218+
let (num_reply, num_request) = mpsc::channel(10);
219+
let (sum_reply, sum_request) = oneshot::channel();
220+
sender.send((msg, sum_reply, num_request)).await?;
221+
Ok((num_reply, sum_request))
211222
}
212-
Request::Remote(request) => {
213-
let (tx, rx) = request.write(msg).await?;
214-
Ok((tx.into(), rx.into()))
223+
RequestSender::Remote(sender) => {
224+
let (reply, request) = sender.write(msg).await?;
225+
Ok((reply.into(), request.into()))
215226
}
216227
}
217228
}
218229

219230
pub async fn fibonacci(&self, max: u64) -> anyhow::Result<mpsc::Receiver<u64>> {
220231
let msg = Fibonacci { max };
221232
match self.inner.request().await? {
222-
Request::Local(request) => {
223-
let (tx, rx) = mpsc::channel(128);
224-
request.send((msg, tx)).await?;
225-
Ok(rx)
233+
RequestSender::Local(sender) => {
234+
let (reply, request) = mpsc::channel(128);
235+
sender.send((msg, reply)).await?;
236+
Ok(request)
226237
}
227-
Request::Remote(request) => {
228-
let (_tx, rx) = request.write(msg).await?;
229-
Ok(rx.into())
238+
RequestSender::Remote(sender) => {
239+
let (_reply, request) = sender.write(msg).await?;
240+
Ok(request.into())
230241
}
231242
}
232243
}
@@ -237,15 +248,15 @@ impl ComputeApi {
237248
) -> anyhow::Result<(mpsc::Sender<u64>, mpsc::Receiver<u64>)> {
238249
let msg = Multiply { initial };
239250
match self.inner.request().await? {
240-
Request::Local(request) => {
241-
let (in_tx, in_rx) = mpsc::channel(128);
242-
let (out_tx, out_rx) = mpsc::channel(128);
243-
request.send((msg, out_tx, in_rx)).await?;
244-
Ok((in_tx, out_rx))
251+
RequestSender::Local(sender) => {
252+
let (in_reply, in_request) = mpsc::channel(128);
253+
let (out_reply, out_request) = mpsc::channel(128);
254+
sender.send((msg, out_reply, in_request)).await?;
255+
Ok((in_reply, out_request))
245256
}
246-
Request::Remote(request) => {
247-
let (tx, rx) = request.write(msg).await?;
248-
Ok((tx.into(), rx.into()))
257+
RequestSender::Remote(sender) => {
258+
let (reply, request) = sender.write(msg).await?;
259+
Ok((reply.into(), request.into()))
249260
}
250261
}
251262
}

examples/derive.rs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use irpc::{
1010
rpc::Handler,
1111
rpc_requests,
1212
util::{make_client_endpoint, make_server_endpoint},
13-
Client, LocalSender, Service, WithChannels,
13+
Client, LocalSender, Request, Service,
1414
};
1515
// Import the macro
1616
use n0_future::task::{self, AbortOnDropHandle};
@@ -51,13 +51,13 @@ struct SetMany;
5151
#[rpc_requests(StorageService, message = StorageMessage)]
5252
#[derive(Serialize, Deserialize)]
5353
enum StorageProtocol {
54-
#[rpc(tx=oneshot::Sender<Option<String>>)]
54+
#[rpc(reply=oneshot::Sender<Option<String>>)]
5555
Get(Get),
56-
#[rpc(tx=oneshot::Sender<()>)]
56+
#[rpc(reply=oneshot::Sender<()>)]
5757
Set(Set),
58-
#[rpc(tx=oneshot::Sender<u64>, rx=mpsc::Receiver<(String, String)>)]
58+
#[rpc(reply=oneshot::Sender<u64>, updates=mpsc::Receiver<(String, String)>)]
5959
SetMany(SetMany),
60-
#[rpc(tx=mpsc::Sender<String>)]
60+
#[rpc(reply=mpsc::Sender<String>)]
6161
List(List),
6262
}
6363

@@ -68,13 +68,13 @@ struct StorageActor {
6868

6969
impl StorageActor {
7070
pub fn spawn() -> StorageApi {
71-
let (tx, rx) = tokio::sync::mpsc::channel(1);
71+
let (reply, request) = tokio::sync::mpsc::channel(1);
7272
let actor = Self {
73-
recv: rx,
73+
recv: request,
7474
state: BTreeMap::new(),
7575
};
7676
n0_future::task::spawn(actor.run());
77-
let local = LocalSender::<StorageMessage, StorageService>::from(tx);
77+
let local = LocalSender::<StorageMessage, StorageService>::from(reply);
7878
StorageApi {
7979
inner: local.into(),
8080
}
@@ -90,30 +90,32 @@ impl StorageActor {
9090
match msg {
9191
StorageMessage::Get(get) => {
9292
info!("get {:?}", get);
93-
let WithChannels { tx, inner, .. } = get;
94-
tx.send(self.state.get(&inner.key).cloned()).await.ok();
93+
let Request { reply, message, .. } = get;
94+
reply.send(self.state.get(&message.key).cloned()).await.ok();
9595
}
9696
StorageMessage::Set(set) => {
9797
info!("set {:?}", set);
98-
let WithChannels { tx, inner, .. } = set;
99-
self.state.insert(inner.key, inner.value);
100-
tx.send(()).await.ok();
98+
let Request { reply, message, .. } = set;
99+
self.state.insert(message.key, message.value);
100+
reply.send(()).await.ok();
101101
}
102102
StorageMessage::SetMany(set) => {
103103
info!("set-many {:?}", set);
104-
let WithChannels { mut rx, tx, .. } = set;
104+
let Request {
105+
mut updates, reply, ..
106+
} = set;
105107
let mut count = 0;
106-
while let Ok(Some((key, value))) = rx.recv().await {
108+
while let Ok(Some((key, value))) = updates.recv().await {
107109
self.state.insert(key, value);
108110
count += 1;
109111
}
110-
tx.send(count).await.ok();
112+
reply.send(count).await.ok();
111113
}
112114
StorageMessage::List(list) => {
113115
info!("list {:?}", list);
114-
let WithChannels { tx, .. } = list;
116+
let Request { reply, .. } = list;
115117
for (key, value) in &self.state {
116-
if tx.send(format!("{key}={value}")).await.is_err() {
118+
if reply.send(format!("{key}={value}")).await.is_err() {
117119
break;
118120
}
119121
}
@@ -135,13 +137,13 @@ impl StorageApi {
135137

136138
pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
137139
let local = self.inner.local().context("cannot listen on remote API")?;
138-
let handler: Handler<StorageProtocol> = Arc::new(move |msg, rx, tx| {
140+
let handler: Handler<StorageProtocol> = Arc::new(move |msg, request, reply| {
139141
let local = local.clone();
140142
Box::pin(match msg {
141-
StorageProtocol::Get(msg) => local.send((msg, tx)),
142-
StorageProtocol::Set(msg) => local.send((msg, tx)),
143-
StorageProtocol::SetMany(msg) => local.send((msg, tx, rx)),
144-
StorageProtocol::List(msg) => local.send((msg, tx)),
143+
StorageProtocol::Get(msg) => local.send((msg, reply)),
144+
StorageProtocol::Set(msg) => local.send((msg, reply)),
145+
StorageProtocol::SetMany(msg) => local.send((msg, reply, request)),
146+
StorageProtocol::List(msg) => local.send((msg, reply)),
145147
})
146148
});
147149
let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
@@ -172,12 +174,12 @@ async fn client_demo(api: StorageApi) -> Result<()> {
172174
let value = api.get("hello".to_string()).await?;
173175
println!("get: hello = {:?}", value);
174176

175-
let (tx, rx) = api.set_many().await?;
177+
let (reply, request) = api.set_many().await?;
176178
for i in 0..3 {
177-
tx.send((format!("key{i}"), format!("value{i}"))).await?;
179+
reply.send((format!("key{i}"), format!("value{i}"))).await?;
178180
}
179-
drop(tx);
180-
let count = rx.await?;
181+
drop(reply);
182+
let count = request.await?;
181183
println!("set-many: {count} values set");
182184

183185
let mut list = api.list().await?;

0 commit comments

Comments
 (0)