1
1
use alloc:: borrow:: Cow ;
2
2
use alloc:: rc:: Rc ;
3
- use alloc:: string:: { String , ToString } ;
3
+ use alloc:: string:: String ;
4
4
use alloc:: vec:: Vec ;
5
5
use serde:: Deserialize ;
6
6
use serde:: de:: { Error , IgnoredAny , VariantAccess , Visitor } ;
@@ -85,13 +85,76 @@ pub struct Checkpoint<'a> {
85
85
#[ serde( borrow) ]
86
86
pub buckets : Vec < BucketChecksum < ' a > > ,
87
87
#[ serde( default , borrow) ]
88
- pub streams : Vec < StreamDefinition < ' a > > ,
88
+ pub streams : Vec < StreamDescription < ' a > > ,
89
89
}
90
90
91
91
#[ derive( Deserialize , Debug ) ]
92
- pub struct StreamDefinition < ' a > {
92
+ pub struct StreamDescription < ' a > {
93
93
pub name : SyncLineStr < ' a > ,
94
94
pub is_default : bool ,
95
+ pub errors : Rc < Vec < StreamSubscriptionError > > ,
96
+ }
97
+
98
+ #[ derive( Deserialize , Debug ) ]
99
+ pub struct StreamSubscriptionError {
100
+ pub subscription : StreamSubscriptionErrorCause ,
101
+ pub message : String ,
102
+ }
103
+
104
+ /// The concrete stream subscription that has caused an error.
105
+ #[ derive( Debug ) ]
106
+ pub enum StreamSubscriptionErrorCause {
107
+ /// The error is caused by the stream being subscribed to by default (i.e., no parameters).
108
+ Default ,
109
+ /// The error is caused by an explicit subscription (e.g. due to invalid parameters).
110
+ ///
111
+ /// The inner value is the index into [StreamSubscriptionRequest::subscriptions] of the
112
+ /// faulty subscription.
113
+ ExplicitSubscription ( usize ) ,
114
+ }
115
+
116
+ impl < ' de > Deserialize < ' de > for StreamSubscriptionErrorCause {
117
+ fn deserialize < D > ( deserializer : D ) -> Result < Self , D :: Error >
118
+ where
119
+ D : serde:: Deserializer < ' de > ,
120
+ {
121
+ struct CauseVisitor ;
122
+
123
+ impl < ' de > Visitor < ' de > for CauseVisitor {
124
+ type Value = StreamSubscriptionErrorCause ;
125
+
126
+ fn expecting ( & self , formatter : & mut core:: fmt:: Formatter ) -> core:: fmt:: Result {
127
+ write ! ( formatter, "default or index" )
128
+ }
129
+
130
+ fn visit_str < E > ( self , _v : & str ) -> Result < Self :: Value , E >
131
+ where
132
+ E : Error ,
133
+ {
134
+ return Ok ( StreamSubscriptionErrorCause :: Default ) ;
135
+ }
136
+
137
+ fn visit_u64 < E > ( self , v : u64 ) -> Result < Self :: Value , E >
138
+ where
139
+ E : Error ,
140
+ {
141
+ return Ok ( StreamSubscriptionErrorCause :: ExplicitSubscription (
142
+ v as usize ,
143
+ ) ) ;
144
+ }
145
+
146
+ fn visit_i32 < E > ( self , v : i32 ) -> Result < Self :: Value , E >
147
+ where
148
+ E : Error ,
149
+ {
150
+ return Ok ( StreamSubscriptionErrorCause :: ExplicitSubscription (
151
+ v as usize ,
152
+ ) ) ;
153
+ }
154
+ }
155
+
156
+ deserializer. deserialize_any ( CauseVisitor )
157
+ }
95
158
}
96
159
97
160
#[ serde_as]
@@ -141,9 +204,13 @@ pub struct BucketChecksum<'a> {
141
204
#[ derive( Debug ) ]
142
205
pub enum BucketSubscriptionReason {
143
206
/// A bucket was created from a default stream.
144
- DerivedFromDefaultStream ( String ) ,
207
+ ///
208
+ /// The inner value is the index of the stream in [Checkpoint::streams].
209
+ DerivedFromDefaultStream ( usize ) ,
145
210
/// A bucket was created for a subscription id we've explicitly requested in the sync request.
146
- DerivedFromExplicitSubscription ( i64 ) ,
211
+ ///
212
+ /// The inner value is the index of the stream in [StreamSubscriptionRequest::subscriptions].
213
+ DerivedFromExplicitSubscription ( usize ) ,
147
214
}
148
215
149
216
impl < ' de > Deserialize < ' de > for BucketSubscriptionReason {
@@ -153,7 +220,7 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason {
153
220
{
154
221
struct MyVisitor ;
155
222
156
- const VARIANTS : & ' static [ & ' static str ] = & [ "def " , "sub" ] ;
223
+ const VARIANTS : & ' static [ & ' static str ] = & [ "default " , "sub" ] ;
157
224
158
225
impl < ' de > Visitor < ' de > for MyVisitor {
159
226
type Value = BucketSubscriptionReason ;
@@ -168,17 +235,12 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason {
168
235
{
169
236
let ( key, variant) = data. variant :: < & ' de str > ( ) ?;
170
237
Ok ( match key {
171
- "def" => BucketSubscriptionReason :: DerivedFromDefaultStream (
238
+ "default" => BucketSubscriptionReason :: DerivedFromDefaultStream (
239
+ variant. newtype_variant ( ) ?,
240
+ ) ,
241
+ "sub" => BucketSubscriptionReason :: DerivedFromExplicitSubscription (
172
242
variant. newtype_variant ( ) ?,
173
243
) ,
174
- "sub" => {
175
- let textual_id = variant. newtype_variant :: < & ' de str > ( ) ?;
176
- let id = textual_id
177
- . parse ( )
178
- . map_err ( |_| A :: Error :: custom ( "not an int" ) ) ?;
179
-
180
- BucketSubscriptionReason :: DerivedFromExplicitSubscription ( id)
181
- }
182
244
other => return Err ( A :: Error :: unknown_variant ( other, VARIANTS ) ) ,
183
245
} )
184
246
}
0 commit comments