diff --git a/src/include/aerospike/as_command.h b/src/include/aerospike/as_command.h index cfd87ce64..5590438b4 100644 --- a/src/include/aerospike/as_command.h +++ b/src/include/aerospike/as_command.h @@ -1,5 +1,5 @@ /* - * Copyright 2008-2024 Aerospike, Inc. + * Copyright 2008-2025 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. @@ -112,9 +112,10 @@ extern "C" { // 1 1 allow unavailable // MRT -#define AS_MSG_INFO4_MRT_VERIFY_READ (1 << 0) // Send MRT version to the server to be verified. -#define AS_MSG_INFO4_MRT_ROLL_FORWARD (1 << 1) // Roll forward MRT. -#define AS_MSG_INFO4_MRT_ROLL_BACK (1 << 2) // Roll back MRT. +#define AS_MSG_INFO4_MRT_VERIFY_READ (1 << 0) // Send MRT version to the server to be verified. +#define AS_MSG_INFO4_MRT_ROLL_FORWARD (1 << 1) // Roll forward MRT. +#define AS_MSG_INFO4_MRT_ROLL_BACK (1 << 2) // Roll back MRT. +#define AS_MSG_INFO4_MRT_ON_LOCKING_ONLY (1 << 4) // Must be able to lock record in transaction. // Misc #define AS_HEADER_SIZE 30 @@ -378,8 +379,8 @@ uint8_t* as_command_write_header_write( uint8_t* cmd, const as_policy_base* policy, as_policy_commit_level commit_level, as_policy_exists exists, as_policy_gen gen_policy, uint32_t gen, uint32_t ttl, - uint16_t n_fields, uint16_t n_bins, bool durable_delete, uint8_t read_attr, uint8_t write_attr, - uint8_t info_attr + uint16_t n_fields, uint16_t n_bins, bool durable_delete, bool on_locking_only, + uint8_t read_attr, uint8_t write_attr, uint8_t info_attr ); /** diff --git a/src/include/aerospike/as_policy.h b/src/include/aerospike/as_policy.h index 487a908d1..4394e544d 100644 --- a/src/include/aerospike/as_policy.h +++ b/src/include/aerospike/as_policy.h @@ -694,6 +694,18 @@ typedef struct as_policy_write_s { */ bool durable_delete; + /** + * Execute the write command only if the record is not already locked by this transaction. + * If this field is true and the record is already locked by this transaction, the command will + * return AEROSPIKE_MRT_ALREADY_LOCKED. + * + * This field is useful for safely retrying non-idempotent writes as an alternative to simply + * aborting the transaction. + * + * Default: false. + */ + bool on_locking_only; + } as_policy_write; /** @@ -746,6 +758,18 @@ typedef struct as_policy_apply_s { */ bool durable_delete; + /** + * Execute the write command only if the record is not already locked by this transaction. + * If this field is true and the record is already locked by this transaction, the command will + * return AEROSPIKE_MRT_ALREADY_LOCKED. + * + * This field is useful for safely retrying non-idempotent writes as an alternative to simply + * aborting the transaction. + * + * Default: false. + */ + bool on_locking_only; + } as_policy_apply; /** @@ -851,6 +875,18 @@ typedef struct as_policy_operate_s { */ bool durable_delete; + /** + * Execute the write command only if the record is not already locked by this transaction. + * If this field is true and the record is already locked by this transaction, the command will + * return AEROSPIKE_MRT_ALREADY_LOCKED. + * + * This field is useful for safely retrying non-idempotent writes as an alternative to simply + * aborting the transaction. + * + * Default: false. + */ + bool on_locking_only; + /** * Should as_record instance be allocated on the heap before user listener is called in * async commands. If true, the user is responsible for calling as_record_destroy() when done @@ -1180,6 +1216,18 @@ typedef struct as_policy_batch_write_s { */ bool durable_delete; + /** + * Execute the write command only if the record is not already locked by this transaction. + * If this field is true and the record is already locked by this transaction, the command will + * return AEROSPIKE_MRT_ALREADY_LOCKED. + * + * This field is useful for safely retrying non-idempotent writes as an alternative to simply + * aborting the transaction. + * + * Default: false. + */ + bool on_locking_only; + } as_policy_batch_write; /** @@ -1233,6 +1281,18 @@ typedef struct as_policy_batch_apply_s { */ bool durable_delete; + /** + * Execute the write command only if the record is not already locked by this transaction. + * If this field is true and the record is already locked by this transaction, the command will + * return AEROSPIKE_MRT_ALREADY_LOCKED. + * + * This field is useful for safely retrying non-idempotent writes as an alternative to simply + * aborting the transaction. + * + * Default: false. + */ + bool on_locking_only; + } as_policy_batch_apply; /** @@ -1673,6 +1733,7 @@ as_policy_write_init(as_policy_write* p) p->ttl = 0; // AS_RECORD_DEFAULT_TTL p->compression_threshold = AS_POLICY_COMPRESSION_THRESHOLD_DEFAULT; p->durable_delete = false; + p->on_locking_only = false; return p; } @@ -1713,6 +1774,7 @@ as_policy_operate_init(as_policy_operate* p) p->read_touch_ttl_percent = 0; p->deserialize = true; p->durable_delete = false; + p->on_locking_only = false; p->async_heap_rec = false; p->respond_all_ops = false; return p; @@ -1784,6 +1846,7 @@ as_policy_apply_init(as_policy_apply* p) p->commit_level = AS_POLICY_COMMIT_LEVEL_DEFAULT; p->ttl = 0; // AS_RECORD_DEFAULT_TTL p->durable_delete = false; + p->on_locking_only = false; return p; } @@ -1884,6 +1947,7 @@ as_policy_batch_write_init(as_policy_batch_write* p) p->exists = AS_POLICY_EXISTS_DEFAULT; p->ttl = 0; // AS_RECORD_DEFAULT_TTL p->durable_delete = false; + p->on_locking_only = false; return p; } @@ -1899,6 +1963,7 @@ as_policy_batch_apply_init(as_policy_batch_apply* p) p->commit_level = AS_POLICY_COMMIT_LEVEL_DEFAULT; p->ttl = 0; // AS_RECORD_DEFAULT_TTL p->durable_delete = false; + p->on_locking_only = false; return p; } diff --git a/src/main/aerospike/aerospike_batch.c b/src/main/aerospike/aerospike_batch.c index e887cdcc6..d1ea83f83 100644 --- a/src/main/aerospike/aerospike_batch.c +++ b/src/main/aerospike/aerospike_batch.c @@ -75,6 +75,7 @@ typedef struct { uint8_t read_attr; uint8_t write_attr; uint8_t info_attr; + uint8_t txn_attr; bool has_write; bool send_key; } as_batch_attr; @@ -996,7 +997,7 @@ as_batch_read_record_size(as_batch_read_record* rec, as_batch_builder* bb, as_er static as_status as_batch_write_record_size(as_batch_write_record* rec, as_batch_builder* bb, as_error* err) { - bb->size += 6; // gen(2) + ttl(4) + bb->size += 7; // gen(2) + ttl(4) + info4(1) bool has_write = false; @@ -1027,7 +1028,7 @@ as_batch_write_record_size(as_batch_write_record* rec, as_batch_builder* bb, as_ static void as_batch_apply_record_size(as_batch_apply_record* rec, as_batch_builder* bb) { - bb->size += 6; // gen(2) + ttl(4) + bb->size += 7; // gen(2) + ttl(4) + info4(1) bb->size += as_command_string_field_size(rec->module); bb->size += as_command_string_field_size(rec->function); @@ -1043,7 +1044,7 @@ as_batch_apply_record_size(as_batch_apply_record* rec, as_batch_builder* bb) static inline void as_batch_remove_record_size(as_batch_builder* bb) { - bb->size += 6; // gen(2) + ttl(4) + bb->size += 7; // gen(2) + ttl(4) + info4(1) } static inline void @@ -1228,6 +1229,7 @@ as_batch_attr_read_header(as_batch_attr* attr, const as_policy_batch* p) attr->info_attr = AS_MSG_INFO3_SC_READ_TYPE | AS_MSG_INFO3_SC_READ_RELAX; break; } + attr->txn_attr = 0; attr->ttl = p->read_touch_ttl_percent; attr->gen = 0; attr->has_write = false; @@ -1261,6 +1263,7 @@ as_batch_attr_read_row(as_batch_attr* attr, const as_policy_batch_read* p) attr->info_attr = AS_MSG_INFO3_SC_READ_TYPE | AS_MSG_INFO3_SC_READ_RELAX; break; } + attr->txn_attr = 0; attr->ttl = p->read_touch_ttl_percent; attr->gen = 0; attr->has_write = false; @@ -1356,6 +1359,8 @@ as_batch_attr_write(as_batch_attr* attr, const as_policy_batch_write* p, as_oper if (p->commit_level == AS_POLICY_COMMIT_LEVEL_MASTER) { attr->info_attr |= AS_MSG_INFO3_COMMIT_MASTER; } + + attr->txn_attr = p->on_locking_only ? AS_MSG_INFO4_MRT_ON_LOCKING_ONLY : 0; } static void @@ -1377,6 +1382,8 @@ as_batch_attr_apply(as_batch_attr* attr, const as_policy_batch_apply* p) if (p->commit_level == AS_POLICY_COMMIT_LEVEL_MASTER) { attr->info_attr |= AS_MSG_INFO3_COMMIT_MASTER; } + + attr->txn_attr = p->on_locking_only ? AS_MSG_INFO4_MRT_ON_LOCKING_ONLY : 0; } static void @@ -1412,6 +1419,8 @@ as_batch_attr_remove(as_batch_attr* attr, const as_policy_batch_remove* p) if (p->commit_level == AS_POLICY_COMMIT_LEVEL_MASTER) { attr->info_attr |= AS_MSG_INFO3_COMMIT_MASTER; } + + attr->txn_attr = 0; } static uint8_t* @@ -1486,10 +1495,11 @@ as_batch_write_write( uint16_t n_fields, uint16_t n_ops ) { - *p++ = (BATCH_MSG_INFO | BATCH_MSG_GEN | BATCH_MSG_TTL); + *p++ = (BATCH_MSG_INFO | BATCH_MSG_GEN | BATCH_MSG_TTL | BATCH_MSG_INFO4); *p++ = attr->read_attr; *p++ = attr->write_attr; *p++ = attr->info_attr; + *p++ = attr->txn_attr; *(uint16_t*)p = cf_swap_to_be16(attr->gen); p += sizeof(uint16_t); *(uint32_t*)p = cf_swap_to_be32(attr->ttl); @@ -2282,6 +2292,7 @@ as_operate_policy_copy_read( trg->ttl = 0; trg->deserialize = pb->deserialize; trg->durable_delete = false; + trg->on_locking_only = false; trg->async_heap_rec = true; // Ignored in sync commands. trg->respond_all_ops = false; // Not relevant for reads, since all reads return a result. @@ -2323,6 +2334,7 @@ as_operate_policy_copy_write( trg->read_touch_ttl_percent = pb->read_touch_ttl_percent; trg->deserialize = pb->deserialize; trg->durable_delete = src->durable_delete; + trg->on_locking_only = src->on_locking_only; trg->async_heap_rec = true; // Ignored in sync commands. trg->respond_all_ops = true; } @@ -2343,6 +2355,7 @@ as_apply_policy_copy( trg->commit_level = src->commit_level; trg->ttl = src->ttl; trg->durable_delete = src->durable_delete; + trg->on_locking_only = src->on_locking_only; } static void diff --git a/src/main/aerospike/aerospike_key.c b/src/main/aerospike/aerospike_key.c index 1854e912c..176b0810e 100644 --- a/src/main/aerospike/aerospike_key.c +++ b/src/main/aerospike/aerospike_key.c @@ -1,5 +1,5 @@ /* - * Copyright 2008-2024 Aerospike, Inc. + * Copyright 2008-2025 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. @@ -854,7 +854,7 @@ as_put_write(void* udata, uint8_t* buf) uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level, policy->exists, policy->gen, rec->gen, ttl, put->tdata.n_fields, put->n_bins, - policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0); + policy->durable_delete, policy->on_locking_only, 0, AS_MSG_INFO2_WRITE, 0); p = as_command_write_key(p, &policy->base, policy->key, put->key, &put->tdata); p = as_command_write_filter(&policy->base, put->filter_size, p); @@ -1020,8 +1020,8 @@ aerospike_key_remove( uint8_t* buf = as_command_buffer_init(size); uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level, - AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0, - policy->durable_delete, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0); + AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0, + policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0); p = as_command_write_key(p, &policy->base, policy->key, key, &tdata); p = as_command_write_filter(&policy->base, filter_size, p); @@ -1067,8 +1067,8 @@ aerospike_key_remove_async_ex( pipe_listener, size, as_event_command_parse_header, NULL, 0); uint8_t* p = as_command_write_header_write(cmd->buf, &policy->base, policy->commit_level, - AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0, - policy->durable_delete, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0); + AS_POLICY_EXISTS_IGNORE, policy->gen, policy->generation, 0, tdata.n_fields, 0, + policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE | AS_MSG_INFO2_DELETE, 0); p = as_command_write_key(p, &policy->base, policy->key, key, &tdata); p = as_command_write_filter(&policy->base, filter_size, p); @@ -1222,8 +1222,8 @@ as_operate_write(void* udata, uint8_t* buf) uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level, policy->exists, policy->gen, ops->gen, ttl, oper->tdata.n_fields, - oper->n_operations, policy->durable_delete, oper->read_attr, oper->write_attr, - oper->info_attr); + oper->n_operations, policy->durable_delete, policy->on_locking_only, oper->read_attr, + oper->write_attr, oper->info_attr); p = as_command_write_key(p, &policy->base, policy->key, oper->key, &oper->tdata); p = as_command_write_filter(&policy->base, oper->filter_size, p); @@ -1481,7 +1481,7 @@ as_apply_write(void* udata, uint8_t* buf) uint8_t* p = as_command_write_header_write(buf, &policy->base, policy->commit_level, 0, AS_POLICY_GEN_IGNORE, 0, policy->ttl, ap->tdata.n_fields, 0, policy->durable_delete, - ap->read_attr, AS_MSG_INFO2_WRITE, 0); + policy->on_locking_only, ap->read_attr, AS_MSG_INFO2_WRITE, 0); p = as_command_write_key(p, &policy->base, policy->key, ap->key, &ap->tdata); p = as_command_write_filter(&policy->base, ap->filter_size, p); diff --git a/src/main/aerospike/aerospike_query.c b/src/main/aerospike/aerospike_query.c index 548c79529..a0f226fe9 100644 --- a/src/main/aerospike/aerospike_query.c +++ b/src/main/aerospike/aerospike_query.c @@ -1,5 +1,5 @@ /* - * Copyright 2008-2024 Aerospike, Inc. + * Copyright 2008-2025 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. @@ -891,7 +891,7 @@ as_query_command_init( p = as_command_write_header_write(cmd, base_policy, write_policy->commit_level, write_policy->exists, AS_POLICY_GEN_IGNORE, 0, ttl, qb->n_fields, qb->n_ops, - write_policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0); + write_policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0); } else { // Background query with UDF. @@ -899,7 +899,7 @@ as_query_command_init( p = as_command_write_header_write(cmd, base_policy, write_policy->commit_level, write_policy->exists, AS_POLICY_GEN_IGNORE, 0, ttl, qb->n_fields, qb->n_ops, - write_policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0); + write_policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0); } // Write namespace. diff --git a/src/main/aerospike/aerospike_scan.c b/src/main/aerospike/aerospike_scan.c index a8adc6378..68b4e07fb 100644 --- a/src/main/aerospike/aerospike_scan.c +++ b/src/main/aerospike/aerospike_scan.c @@ -1,5 +1,5 @@ /* - * Copyright 2008-2024 Aerospike, Inc. + * Copyright 2008-2025 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. @@ -507,7 +507,7 @@ as_scan_command_init( p = as_command_write_header_write(cmd, &policy->base, AS_POLICY_COMMIT_LEVEL_ALL, AS_POLICY_EXISTS_IGNORE, AS_POLICY_GEN_IGNORE, 0, ttl, sb->n_fields, n_ops, - policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0); + policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0); } else if (scan->apply_each.function[0]) { // Background scan with UDF. @@ -515,7 +515,7 @@ as_scan_command_init( p = as_command_write_header_write(cmd, &policy->base, AS_POLICY_COMMIT_LEVEL_ALL, AS_POLICY_EXISTS_IGNORE, AS_POLICY_GEN_IGNORE, 0, ttl, sb->n_fields, n_ops, - policy->durable_delete, 0, AS_MSG_INFO2_WRITE, 0); + policy->durable_delete, false, 0, AS_MSG_INFO2_WRITE, 0); } else { // Foreground scan. diff --git a/src/main/aerospike/as_command.c b/src/main/aerospike/as_command.c index 4e7a3a265..bfff614d9 100644 --- a/src/main/aerospike/as_command.c +++ b/src/main/aerospike/as_command.c @@ -1,5 +1,5 @@ /* - * Copyright 2008-2024 Aerospike, Inc. + * Copyright 2008-2025 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. @@ -213,8 +213,8 @@ uint8_t* as_command_write_header_write( uint8_t* cmd, const as_policy_base* policy, as_policy_commit_level commit_level, as_policy_exists exists, as_policy_gen gen_policy, uint32_t gen, uint32_t ttl, - uint16_t n_fields, uint16_t n_bins, bool durable_delete, uint8_t read_attr, uint8_t write_attr, - uint8_t info_attr + uint16_t n_fields, uint16_t n_bins, bool durable_delete, bool on_locking_only, + uint8_t read_attr, uint8_t write_attr, uint8_t info_attr ) { switch (exists) { @@ -266,6 +266,8 @@ as_command_write_header_write( write_attr |= AS_MSG_INFO2_DURABLE_DELETE; } + uint8_t txn_attr = on_locking_only ? AS_MSG_INFO4_MRT_ON_LOCKING_ONLY : 0; + #if defined USE_XDR read_attr |= AS_MSG_INFO1_XDR; #endif @@ -274,7 +276,8 @@ as_command_write_header_write( cmd[9] = read_attr; cmd[10] = write_attr; cmd[11] = info_attr; - *(uint16_t*)&cmd[12] = 0; + cmd[12] = txn_attr; + cmd[13] = 0; *(uint32_t*)&cmd[14] = cf_swap_to_be32(generation); *(uint32_t*)&cmd[18] = cf_swap_to_be32(ttl); uint32_t timeout = as_command_server_timeout(policy);