Skip to content

Commit

Permalink
CLIENT-3252 Support on_locking_only policy field.
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianNichols committed Jan 14, 2025
1 parent ae67461 commit 3138135
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 29 deletions.
13 changes: 7 additions & 6 deletions src/include/aerospike/as_command.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2024 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -112,9 +112,10 @@ extern "C" {
// 1 1 allow unavailable

// MRT
#define AS_MSG_INFO4_MRT_VERIFY_READ (1 << 0) // Send MRT version to the server to be verified.
#define AS_MSG_INFO4_MRT_ROLL_FORWARD (1 << 1) // Roll forward MRT.
#define AS_MSG_INFO4_MRT_ROLL_BACK (1 << 2) // Roll back MRT.
#define AS_MSG_INFO4_MRT_VERIFY_READ (1 << 0) // Send MRT version to the server to be verified.
#define AS_MSG_INFO4_MRT_ROLL_FORWARD (1 << 1) // Roll forward MRT.
#define AS_MSG_INFO4_MRT_ROLL_BACK (1 << 2) // Roll back MRT.
#define AS_MSG_INFO4_MRT_ON_LOCKING_ONLY (1 << 4) // Must be able to lock record in transaction.

// Misc
#define AS_HEADER_SIZE 30
Expand Down Expand Up @@ -378,8 +379,8 @@ uint8_t*
as_command_write_header_write(
uint8_t* cmd, const as_policy_base* policy, as_policy_commit_level commit_level,
as_policy_exists exists, as_policy_gen gen_policy, uint32_t gen, uint32_t ttl,
uint16_t n_fields, uint16_t n_bins, bool durable_delete, uint8_t read_attr, uint8_t write_attr,
uint8_t info_attr
uint16_t n_fields, uint16_t n_bins, bool durable_delete, bool on_locking_only,
uint8_t read_attr, uint8_t write_attr, uint8_t info_attr
);

/**
Expand Down
65 changes: 65 additions & 0 deletions src/include/aerospike/as_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,18 @@ typedef struct as_policy_write_s {
*/
bool durable_delete;

/**
* Execute the write command only if the record is not already locked by this transaction.
* If this field is true and the record is already locked by this transaction, the command will
* return AEROSPIKE_MRT_ALREADY_LOCKED.
*
* This field is useful for safely retrying non-idempotent writes as an alternative to simply
* aborting the transaction.
*
* Default: false.
*/
bool on_locking_only;

} as_policy_write;

/**
Expand Down Expand Up @@ -746,6 +758,18 @@ typedef struct as_policy_apply_s {
*/
bool durable_delete;

/**
* Execute the write command only if the record is not already locked by this transaction.
* If this field is true and the record is already locked by this transaction, the command will
* return AEROSPIKE_MRT_ALREADY_LOCKED.
*
* This field is useful for safely retrying non-idempotent writes as an alternative to simply
* aborting the transaction.
*
* Default: false.
*/
bool on_locking_only;

} as_policy_apply;

/**
Expand Down Expand Up @@ -851,6 +875,18 @@ typedef struct as_policy_operate_s {
*/
bool durable_delete;

/**
* Execute the write command only if the record is not already locked by this transaction.
* If this field is true and the record is already locked by this transaction, the command will
* return AEROSPIKE_MRT_ALREADY_LOCKED.
*
* This field is useful for safely retrying non-idempotent writes as an alternative to simply
* aborting the transaction.
*
* Default: false.
*/
bool on_locking_only;

/**
* Should as_record instance be allocated on the heap before user listener is called in
* async commands. If true, the user is responsible for calling as_record_destroy() when done
Expand Down Expand Up @@ -1180,6 +1216,18 @@ typedef struct as_policy_batch_write_s {
*/
bool durable_delete;

/**
* Execute the write command only if the record is not already locked by this transaction.
* If this field is true and the record is already locked by this transaction, the command will
* return AEROSPIKE_MRT_ALREADY_LOCKED.
*
* This field is useful for safely retrying non-idempotent writes as an alternative to simply
* aborting the transaction.
*
* Default: false.
*/
bool on_locking_only;

} as_policy_batch_write;

/**
Expand Down Expand Up @@ -1233,6 +1281,18 @@ typedef struct as_policy_batch_apply_s {
*/
bool durable_delete;

/**
* Execute the write command only if the record is not already locked by this transaction.
* If this field is true and the record is already locked by this transaction, the command will
* return AEROSPIKE_MRT_ALREADY_LOCKED.
*
* This field is useful for safely retrying non-idempotent writes as an alternative to simply
* aborting the transaction.
*
* Default: false.
*/
bool on_locking_only;

} as_policy_batch_apply;

/**
Expand Down Expand Up @@ -1673,6 +1733,7 @@ as_policy_write_init(as_policy_write* p)
p->ttl = 0; // AS_RECORD_DEFAULT_TTL
p->compression_threshold = AS_POLICY_COMPRESSION_THRESHOLD_DEFAULT;
p->durable_delete = false;
p->on_locking_only = false;
return p;
}

Expand Down Expand Up @@ -1713,6 +1774,7 @@ as_policy_operate_init(as_policy_operate* p)
p->read_touch_ttl_percent = 0;
p->deserialize = true;
p->durable_delete = false;
p->on_locking_only = false;
p->async_heap_rec = false;
p->respond_all_ops = false;
return p;
Expand Down Expand Up @@ -1784,6 +1846,7 @@ as_policy_apply_init(as_policy_apply* p)
p->commit_level = AS_POLICY_COMMIT_LEVEL_DEFAULT;
p->ttl = 0; // AS_RECORD_DEFAULT_TTL
p->durable_delete = false;
p->on_locking_only = false;
return p;
}

Expand Down Expand Up @@ -1884,6 +1947,7 @@ as_policy_batch_write_init(as_policy_batch_write* p)
p->exists = AS_POLICY_EXISTS_DEFAULT;
p->ttl = 0; // AS_RECORD_DEFAULT_TTL
p->durable_delete = false;
p->on_locking_only = false;
return p;
}

Expand All @@ -1899,6 +1963,7 @@ as_policy_batch_apply_init(as_policy_batch_apply* p)
p->commit_level = AS_POLICY_COMMIT_LEVEL_DEFAULT;
p->ttl = 0; // AS_RECORD_DEFAULT_TTL
p->durable_delete = false;
p->on_locking_only = false;
return p;
}

Expand Down
21 changes: 17 additions & 4 deletions src/main/aerospike/aerospike_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ typedef struct {
uint8_t read_attr;
uint8_t write_attr;
uint8_t info_attr;
uint8_t txn_attr;
bool has_write;
bool send_key;
} as_batch_attr;
Expand Down Expand Up @@ -996,7 +997,7 @@ as_batch_read_record_size(as_batch_read_record* rec, as_batch_builder* bb, as_er
static as_status
as_batch_write_record_size(as_batch_write_record* rec, as_batch_builder* bb, as_error* err)
{
bb->size += 6; // gen(2) + ttl(4)
bb->size += 7; // gen(2) + ttl(4) + info4(1)

bool has_write = false;

Expand Down Expand Up @@ -1027,7 +1028,7 @@ as_batch_write_record_size(as_batch_write_record* rec, as_batch_builder* bb, as_
static void
as_batch_apply_record_size(as_batch_apply_record* rec, as_batch_builder* bb)
{
bb->size += 6; // gen(2) + ttl(4)
bb->size += 7; // gen(2) + ttl(4) + info4(1)
bb->size += as_command_string_field_size(rec->module);
bb->size += as_command_string_field_size(rec->function);

Expand All @@ -1043,7 +1044,7 @@ as_batch_apply_record_size(as_batch_apply_record* rec, as_batch_builder* bb)
static inline void
as_batch_remove_record_size(as_batch_builder* bb)
{
bb->size += 6; // gen(2) + ttl(4)
bb->size += 7; // gen(2) + ttl(4) + info4(1)
}

static inline void
Expand Down Expand Up @@ -1228,6 +1229,7 @@ as_batch_attr_read_header(as_batch_attr* attr, const as_policy_batch* p)
attr->info_attr = AS_MSG_INFO3_SC_READ_TYPE | AS_MSG_INFO3_SC_READ_RELAX;
break;
}
attr->txn_attr = 0;
attr->ttl = p->read_touch_ttl_percent;
attr->gen = 0;
attr->has_write = false;
Expand Down Expand Up @@ -1261,6 +1263,7 @@ as_batch_attr_read_row(as_batch_attr* attr, const as_policy_batch_read* p)
attr->info_attr = AS_MSG_INFO3_SC_READ_TYPE | AS_MSG_INFO3_SC_READ_RELAX;
break;
}
attr->txn_attr = 0;
attr->ttl = p->read_touch_ttl_percent;
attr->gen = 0;
attr->has_write = false;
Expand Down Expand Up @@ -1356,6 +1359,8 @@ as_batch_attr_write(as_batch_attr* attr, const as_policy_batch_write* p, as_oper
if (p->commit_level == AS_POLICY_COMMIT_LEVEL_MASTER) {
attr->info_attr |= AS_MSG_INFO3_COMMIT_MASTER;
}

attr->txn_attr = p->on_locking_only ? AS_MSG_INFO4_MRT_ON_LOCKING_ONLY : 0;
}

static void
Expand All @@ -1377,6 +1382,8 @@ as_batch_attr_apply(as_batch_attr* attr, const as_policy_batch_apply* p)
if (p->commit_level == AS_POLICY_COMMIT_LEVEL_MASTER) {
attr->info_attr |= AS_MSG_INFO3_COMMIT_MASTER;
}

attr->txn_attr = p->on_locking_only ? AS_MSG_INFO4_MRT_ON_LOCKING_ONLY : 0;
}

static void
Expand Down Expand Up @@ -1412,6 +1419,8 @@ as_batch_attr_remove(as_batch_attr* attr, const as_policy_batch_remove* p)
if (p->commit_level == AS_POLICY_COMMIT_LEVEL_MASTER) {
attr->info_attr |= AS_MSG_INFO3_COMMIT_MASTER;
}

attr->txn_attr = 0;
}

static uint8_t*
Expand Down Expand Up @@ -1486,10 +1495,11 @@ as_batch_write_write(
uint16_t n_fields, uint16_t n_ops
)
{
*p++ = (BATCH_MSG_INFO | BATCH_MSG_GEN | BATCH_MSG_TTL);
*p++ = (BATCH_MSG_INFO | BATCH_MSG_GEN | BATCH_MSG_TTL | BATCH_MSG_INFO4);
*p++ = attr->read_attr;
*p++ = attr->write_attr;
*p++ = attr->info_attr;
*p++ = attr->txn_attr;
*(uint16_t*)p = cf_swap_to_be16(attr->gen);
p += sizeof(uint16_t);
*(uint32_t*)p = cf_swap_to_be32(attr->ttl);
Expand Down Expand Up @@ -2282,6 +2292,7 @@ as_operate_policy_copy_read(
trg->ttl = 0;
trg->deserialize = pb->deserialize;
trg->durable_delete = false;
trg->on_locking_only = false;
trg->async_heap_rec = true; // Ignored in sync commands.
trg->respond_all_ops = false; // Not relevant for reads, since all reads return a result.

Expand Down Expand Up @@ -2323,6 +2334,7 @@ as_operate_policy_copy_write(
trg->read_touch_ttl_percent = pb->read_touch_ttl_percent;
trg->deserialize = pb->deserialize;
trg->durable_delete = src->durable_delete;
trg->on_locking_only = src->on_locking_only;
trg->async_heap_rec = true; // Ignored in sync commands.
trg->respond_all_ops = true;
}
Expand All @@ -2343,6 +2355,7 @@ as_apply_policy_copy(
trg->commit_level = src->commit_level;
trg->ttl = src->ttl;
trg->durable_delete = src->durable_delete;
trg->on_locking_only = src->on_locking_only;
}

static void
Expand Down
18 changes: 9 additions & 9 deletions src/main/aerospike/aerospike_key.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2024 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -854,7 +854,7 @@ as_put_write(void* udata, uint8_t* buf)

uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level,
policy->exists, policy->gen, rec->gen, ttl, put->tdata.n_fields, put->n_bins,
policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0);
policy->durable_delete, policy->on_locking_only, 0, AS_MSG_INFO2_WRITE, 0);

p = as_command_write_key(p, &policy->base, policy->key, put->key, &put->tdata);
p = as_command_write_filter(&policy->base, put->filter_size, p);
Expand Down Expand Up @@ -1020,8 +1020,8 @@ aerospike_key_remove(

uint8_t* buf = as_command_buffer_init(size);
uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level,
AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0,
policy->durable_delete, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0);
AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0,
policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0);

p = as_command_write_key(p, &policy->base, policy->key, key, &tdata);
p = as_command_write_filter(&policy->base, filter_size, p);
Expand Down Expand Up @@ -1067,8 +1067,8 @@ aerospike_key_remove_async_ex(
pipe_listener, size, as_event_command_parse_header, NULL, 0);

uint8_t* p = as_command_write_header_write(cmd->buf, &policy->base, policy->commit_level,
AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0,
policy->durable_delete, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0);
AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0,
policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0);

p = as_command_write_key(p, &policy->base, policy->key, key, &tdata);
p = as_command_write_filter(&policy->base, filter_size, p);
Expand Down Expand Up @@ -1222,8 +1222,8 @@ as_operate_write(void* udata, uint8_t* buf)

uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level,
policy->exists, policy->gen, ops->gen, ttl, oper->tdata.n_fields,
oper->n_operations, policy->durable_delete, oper->read_attr, oper->write_attr,
oper->info_attr);
oper->n_operations, policy->durable_delete, policy->on_locking_only, oper->read_attr,
oper->write_attr, oper->info_attr);

p = as_command_write_key(p, &policy->base, policy->key, oper->key, &oper->tdata);
p = as_command_write_filter(&policy->base, oper->filter_size, p);
Expand Down Expand Up @@ -1481,7 +1481,7 @@ as_apply_write(void* udata, uint8_t* buf)

uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level, 0,
AS_POLICY_GEN_IGNORE, 0, policy->ttl, ap->tdata.n_fields, 0, policy->durable_delete,
ap->read_attr, AS_MSG_INFO2_WRITE, 0);
policy->on_locking_only, ap->read_attr, AS_MSG_INFO2_WRITE, 0);

p = as_command_write_key(p, &policy->base, policy->key, ap->key, &ap->tdata);
p = as_command_write_filter(&policy->base, ap->filter_size, p);
Expand Down
6 changes: 3 additions & 3 deletions src/main/aerospike/aerospike_query.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2024 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -891,15 +891,15 @@ as_query_command_init(

p = as_command_write_header_write(cmd, base_policy, write_policy->commit_level,
write_policy->exists, AS_POLICY_GEN_IGNORE, 0, ttl, qb->n_fields, qb->n_ops,
write_policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0);
write_policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0);
}
else {
// Background query with UDF.
uint32_t ttl = (query->ttl == AS_RECORD_CLIENT_DEFAULT_TTL)? write_policy->ttl : query->ttl;

p = as_command_write_header_write(cmd, base_policy, write_policy->commit_level,
write_policy->exists, AS_POLICY_GEN_IGNORE, 0, ttl, qb->n_fields, qb->n_ops,
write_policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0);
write_policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0);
}

// Write namespace.
Expand Down
6 changes: 3 additions & 3 deletions src/main/aerospike/aerospike_scan.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2024 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -507,15 +507,15 @@ as_scan_command_init(

p = as_command_write_header_write(cmd, &policy->base, AS_POLICY_COMMIT_LEVEL_ALL,
AS_POLICY_EXISTS_IGNORE, AS_POLICY_GEN_IGNORE, 0, ttl, sb->n_fields, n_ops,
policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0);
policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0);
}
else if (scan->apply_each.function[0]) {
// Background scan with UDF.
uint32_t ttl = (scan->ttl == AS_RECORD_CLIENT_DEFAULT_TTL)? policy->ttl : scan->ttl;

p = as_command_write_header_write(cmd, &policy->base, AS_POLICY_COMMIT_LEVEL_ALL,
AS_POLICY_EXISTS_IGNORE, AS_POLICY_GEN_IGNORE, 0, ttl, sb->n_fields, n_ops,
policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0);
policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0);
}
else {
// Foreground scan.
Expand Down
Loading

0 comments on commit 3138135

Please sign in to comment.