Skip to content

Commit c74d03f

Browse files
committed
stream: added is_sync flag for begin/commit
Closes #TNTP-3334
1 parent aeb3d88 commit c74d03f

File tree

5 files changed

+218
-2
lines changed

5 files changed

+218
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
- Implemented all box.schema.user operations requests and sugar interface (#426).
1414
- Implemented box.session.su request and sugar interface only for current session granting (#426).
15+
- Implemented support for `IPROTO_IS_SYNC` flag in stream transactions,
16+
added `IsSync(bool)` method for `BeginRequest`/`CommitRequest` (#447).
1517

1618
### Changed
1719

example_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,90 @@ func ExampleBeginRequest_TxnIsolation() {
998998
fmt.Printf("Select after Rollback: response is %#v\n", data)
999999
}
10001000

1001+
func ExampleBeginRequest_IsSync() {
1002+
conn := exampleConnect(dialer, opts)
1003+
defer conn.Close()
1004+
1005+
// Tarantool supports IS_SYNC flag for BeginRequest since version 3.1.0
1006+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0)
1007+
if err != nil || isLess {
1008+
return
1009+
}
1010+
1011+
stream, err := conn.NewStream()
1012+
if err != nil {
1013+
fmt.Printf("error getting the stream: %s\n", err)
1014+
return
1015+
}
1016+
1017+
// Begin transaction with synchronous mode
1018+
req := tarantool.NewBeginRequest().IsSync(true)
1019+
resp, err := stream.Do(req).GetResponse()
1020+
switch {
1021+
case err != nil:
1022+
fmt.Printf("error getting the response: %s\n", err)
1023+
case resp.Header().Error != tarantool.ErrorNo:
1024+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1025+
default:
1026+
fmt.Println("Success.")
1027+
}
1028+
}
1029+
1030+
func ExampleCommitRequest_IsSync() {
1031+
conn := exampleConnect(dialer, opts)
1032+
defer conn.Close()
1033+
1034+
// Tarantool supports IS_SYNC flag for CommitRequest since version 3.1.0q
1035+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0)
1036+
if err != nil || isLess {
1037+
return
1038+
}
1039+
1040+
var req tarantool.Request
1041+
1042+
stream, err := conn.NewStream()
1043+
if err != nil {
1044+
fmt.Printf("error getting the stream: %s\n", err)
1045+
return
1046+
}
1047+
1048+
// Begin transaction
1049+
req = tarantool.NewBeginRequest()
1050+
resp, err := stream.Do(req).GetResponse()
1051+
switch {
1052+
case err != nil:
1053+
fmt.Printf("error getting the response: %s\n", err)
1054+
return
1055+
case resp.Header().Error != tarantool.ErrorNo:
1056+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1057+
return
1058+
}
1059+
1060+
// Insert in stream
1061+
req = tarantool.NewReplaceRequest("test").Tuple([]interface{}{1, "test"})
1062+
resp, err = stream.Do(req).GetResponse()
1063+
switch {
1064+
case err != nil:
1065+
fmt.Printf("error getting the response: %s\n", err)
1066+
return
1067+
case resp.Header().Error != tarantool.ErrorNo:
1068+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1069+
return
1070+
}
1071+
1072+
// Commit transaction in sync mode
1073+
req = tarantool.NewCommitRequest().IsSync(true)
1074+
resp, err = stream.Do(req).GetResponse()
1075+
switch {
1076+
case err != nil:
1077+
fmt.Printf("error getting the response: %s\n", err)
1078+
case resp.Header().Error != tarantool.ErrorNo:
1079+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1080+
default:
1081+
fmt.Println("Success.")
1082+
}
1083+
}
1084+
10011085
func ExampleErrorNo() {
10021086
conn := exampleConnect(dialer, opts)
10031087
defer conn.Close()

stream.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type BeginRequest struct {
4242
baseRequest
4343
txnIsolation TxnIsolationLevel
4444
timeout time.Duration
45+
isSync *bool
4546
}
4647

4748
// NewBeginRequest returns a new BeginRequest.
@@ -59,12 +60,18 @@ func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequ
5960
return req
6061
}
6162

62-
// WithTimeout allows to set up a timeout for call BeginRequest.
63+
// Timeout allows to set up a timeout for call BeginRequest.
6364
func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest {
6465
req.timeout = timeout
6566
return req
6667
}
6768

69+
// IsSync allows to set up a IsSync flag for call BeginRequest.
70+
func (req *BeginRequest) IsSync(isSync bool) *BeginRequest {
71+
req.isSync = &isSync
72+
return req
73+
}
74+
6875
// Body fills an msgpack.Encoder with the begin request body.
6976
func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
7077
var (
@@ -81,6 +88,10 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
8188
mapLen++
8289
}
8390

91+
if req.isSync != nil {
92+
mapLen++
93+
}
94+
8495
err := enc.EncodeMapLen(mapLen)
8596
if err != nil {
8697
return err
@@ -110,6 +121,18 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
110121
}
111122
}
112123

124+
if req.isSync != nil {
125+
err = enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC))
126+
if err != nil {
127+
return err
128+
}
129+
130+
err = enc.EncodeBool(*req.isSync)
131+
if err != nil {
132+
return err
133+
}
134+
}
135+
113136
return nil
114137
}
115138

@@ -129,6 +152,8 @@ func (req *BeginRequest) Context(ctx context.Context) *BeginRequest {
129152
// Commit request can not be processed out of stream.
130153
type CommitRequest struct {
131154
baseRequest
155+
156+
isSync *bool
132157
}
133158

134159
// NewCommitRequest returns a new CommitRequest.
@@ -138,9 +163,37 @@ func NewCommitRequest() *CommitRequest {
138163
return req
139164
}
140165

166+
// IsSync allows to set up a IsSync flag for call BeginRequest.
167+
func (req *CommitRequest) IsSync(isSync bool) *CommitRequest {
168+
req.isSync = &isSync
169+
return req
170+
}
171+
141172
// Body fills an msgpack.Encoder with the commit request body.
142173
func (req *CommitRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
143-
return enc.EncodeMapLen(0)
174+
var (
175+
mapLen = 0
176+
)
177+
178+
if req.isSync != nil {
179+
mapLen++
180+
}
181+
182+
if err := enc.EncodeMapLen(mapLen); err != nil {
183+
return err
184+
}
185+
186+
if req.isSync != nil {
187+
if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil {
188+
return err
189+
}
190+
191+
if err := enc.EncodeBool(*req.isSync); err != nil {
192+
return err
193+
}
194+
}
195+
196+
return nil
144197
}
145198

146199
// Context sets a passed context to the request.

tarantool_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4200,6 +4200,75 @@ func TestFdDialer(t *testing.T) {
42004200
require.Equal(t, int8(0), resp[0])
42014201
}
42024202

4203+
const (
4204+
errNoSyncTransactionQueue = "The synchronous transaction queue doesn't belong to any instance"
4205+
)
4206+
4207+
func TestDoBeginRequest_IsSync(t *testing.T) {
4208+
test_helpers.SkipIfIsSyncUnsupported(t)
4209+
4210+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4211+
defer conn.Close()
4212+
4213+
stream, err := conn.NewStream()
4214+
require.NoError(t, err)
4215+
4216+
_, err = stream.Do(NewBeginRequest().IsSync(true)).Get()
4217+
assert.Nil(t, err)
4218+
4219+
_, err = stream.Do(
4220+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4221+
).Get()
4222+
require.Nil(t, err)
4223+
4224+
_, err = stream.Do(NewCommitRequest()).Get()
4225+
require.NotNil(t, err)
4226+
assert.Contains(t, err.Error(), errNoSyncTransactionQueue)
4227+
}
4228+
4229+
func TestDoCommitRequest_IsSync(t *testing.T) {
4230+
test_helpers.SkipIfIsSyncUnsupported(t)
4231+
4232+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4233+
defer conn.Close()
4234+
4235+
stream, err := conn.NewStream()
4236+
require.NoError(t, err)
4237+
4238+
_, err = stream.Do(NewBeginRequest()).Get()
4239+
require.Nil(t, err)
4240+
4241+
_, err = stream.Do(
4242+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4243+
).Get()
4244+
require.Nil(t, err)
4245+
4246+
_, err = stream.Do(NewCommitRequest().IsSync(true)).Get()
4247+
require.NotNil(t, err)
4248+
assert.Contains(t, err.Error(), errNoSyncTransactionQueue)
4249+
}
4250+
4251+
func TestDoCommitRequest_NoSync(t *testing.T) {
4252+
test_helpers.SkipIfIsSyncUnsupported(t)
4253+
4254+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4255+
defer conn.Close()
4256+
4257+
stream, err := conn.NewStream()
4258+
require.NoError(t, err)
4259+
4260+
_, err = stream.Do(NewBeginRequest()).Get()
4261+
require.Nil(t, err)
4262+
4263+
_, err = stream.Do(
4264+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4265+
).Get()
4266+
require.Nil(t, err)
4267+
4268+
_, err = stream.Do(NewCommitRequest()).Get()
4269+
assert.Nil(t, err)
4270+
}
4271+
42034272
// runTestMain is a body of TestMain function
42044273
// (see https://pkg.go.dev/testing#hdr-Main).
42054274
// Using defer + os.Exit is not works so TestMain body

test_helpers/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,14 @@ func SkipIfCrudSpliceBroken(t *testing.T) {
217217
SkipIfFeatureUnsupported(t, "crud update splice", 2, 0, 0)
218218
}
219219

220+
// SkipIfIsSyncUnsupported skips test run if Tarantool without
221+
// IS_SYNC support is used.
222+
func SkipIfIsSyncUnsupported(t *testing.T) {
223+
t.Helper()
224+
225+
SkipIfFeatureUnsupported(t, "is sync", 3, 1, 0)
226+
}
227+
220228
// IsTcsSupported checks if Tarantool supports centralized storage.
221229
// Tarantool supports centralized storage with Enterprise since 3.3.0 version.
222230
func IsTcsSupported() (bool, error) {

0 commit comments

Comments
 (0)