From 15450d50e110b5b5fcb5147917bf795c75eba7ae Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Wed, 3 Jan 2024 17:30:31 -0500 Subject: [PATCH] CLIENT-2720 Include all partition unavailable errors in the scan/query sub-error list. --- src/include/aerospike/as_partition_tracker.h | 13 +-- src/main/aerospike/as_partition_tracker.c | 105 +++++++++++++------ 2 files changed, 76 insertions(+), 42 deletions(-) diff --git a/src/include/aerospike/as_partition_tracker.h b/src/include/aerospike/as_partition_tracker.h index d7edddf5c..76a5372b0 100644 --- a/src/include/aerospike/as_partition_tracker.h +++ b/src/include/aerospike/as_partition_tracker.h @@ -1,5 +1,5 @@ /* - * Copyright 2008-2023 Aerospike, Inc. + * Copyright 2008-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. @@ -101,17 +101,10 @@ as_partition_tracker_assign( as_partition_tracker* pt, struct as_cluster_s* cluster, const char* ns, struct as_error_s* err ); -static inline void +void as_partition_tracker_part_unavailable( as_partition_tracker* pt, as_node_partitions* np, uint32_t part_id - ) -{ - as_partitions_status* ps = pt->parts_all; - as_partition_status* p = &ps->parts[part_id - ps->part_begin]; - p->retry = true; - p->replica_index++; - np->parts_unavailable++; -} + ); static inline void as_partition_tracker_set_digest( diff --git a/src/main/aerospike/as_partition_tracker.c b/src/main/aerospike/as_partition_tracker.c index fa414e4ea..8e4c31dd5 100644 --- a/src/main/aerospike/as_partition_tracker.c +++ b/src/main/aerospike/as_partition_tracker.c @@ -1,5 +1,5 @@ /* - * Copyright 2008-2023 Aerospike, Inc. + * Copyright 2008-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements. @@ -19,9 +19,20 @@ #include #include -/****************************************************************************** - * Static Functions - *****************************************************************************/ +//--------------------------------- +// Types +//--------------------------------- + +typedef struct { + char node_address[AS_IP_ADDRESS_SIZE]; + as_status status; + uint32_t iteration; + uint32_t part_id; +} query_error; + +//--------------------------------- +// Static Functions +//--------------------------------- static as_partitions_status* parts_create(uint16_t part_begin, uint16_t part_count, const as_digest* digest) @@ -157,6 +168,25 @@ assign_partition(as_partition_tracker* pt, as_partition_status* ps, as_node* nod } } +static void +add_error(as_partition_tracker* pt, as_node* node, as_status status, uint32_t part_id) +{ + query_error e; + as_strncpy(e.node_address, as_node_get_address_string(node), sizeof(e.node_address)); + e.status = status; + e.iteration = pt->iteration; + e.part_id = part_id; + + // Multiple scan/query threads may call this function, so error + // list must be modified under lock. + pthread_mutex_lock(&pt->lock); + if (!pt->errors) { + pt->errors = as_vector_create(sizeof(query_error), 32); + } + as_vector_append(pt->errors, &e); + pthread_mutex_unlock(&pt->lock); +} + static void mark_retry_on_error(as_partition_tracker* pt, as_node_partitions* np) { @@ -212,9 +242,9 @@ release_node_partitions(as_vector* list) } } -/****************************************************************************** - * Functions - *****************************************************************************/ +//--------------------------------- +// Functions +//--------------------------------- void as_partition_tracker_init_nodes( @@ -403,6 +433,19 @@ as_partition_tracker_assign( return AEROSPIKE_OK; } +void +as_partition_tracker_part_unavailable( + as_partition_tracker* pt, as_node_partitions* np, uint32_t part_id + ) +{ + as_partitions_status* ps = pt->parts_all; + as_partition_status* p = &ps->parts[part_id - ps->part_begin]; + p->retry = true; + p->replica_index++; + np->parts_unavailable++; + add_error(pt, np->node, AEROSPIKE_ERR_CLUSTER, part_id); +} + as_status as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster, as_error* err) { @@ -476,16 +519,10 @@ as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster, // Check if limits have been reached. if (pt->iteration > pt->max_retries) { - if (!pt->errors || pt->errors->size <= 0) { - // The only retryable errors that are not added to the errors list is - // AEROSPIKE_ERR_CLUSTER (ie some partition(s) are unavailable). - return as_error_set_message(err, AEROSPIKE_ERR_CLUSTER, "Partition(s) unavailable"); - } - // Return last sub-error code received. uint32_t max = pt->errors->size; - as_status last_code = *(as_status*)as_vector_get(pt->errors, max - 1); - as_error_set_message(err, last_code, ""); + query_error* last_error = as_vector_get(pt->errors, max - 1); + as_error_set_message(err, last_error->status, ""); // Include all sub-errors in error message. as_string_builder sb; @@ -495,11 +532,23 @@ as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster, as_string_builder_append(&sb, "sub-errors:"); for (uint32_t i = 0; i < max; i++) { - as_status st = *(as_status*)as_vector_get(pt->errors, i); + query_error* qe = as_vector_get(pt->errors, i); as_string_builder_append_newline(&sb); - as_string_builder_append_int(&sb, st); - as_string_builder_append_char(&sb, ' '); - as_string_builder_append(&sb, as_error_string(st)); + as_string_builder_append_int(&sb, qe->status); + as_string_builder_append_char(&sb, ','); + as_string_builder_append_uint(&sb, qe->iteration); + as_string_builder_append_char(&sb, ','); + as_string_builder_append(&sb, qe->node_address); + as_string_builder_append_char(&sb, ','); + + if (qe->status == AEROSPIKE_ERR_CLUSTER) { + as_string_builder_append(&sb, "Partition "); + as_string_builder_append_uint(&sb, qe->part_id); + as_string_builder_append(&sb, " unavailable"); + } + else { + as_string_builder_append(&sb, as_error_string(qe->status)); + } } return err->code; } @@ -545,15 +594,7 @@ as_partition_tracker_should_retry( case AEROSPIKE_ERR_TIMEOUT: case AEROSPIKE_ERR_INDEX_NOT_FOUND: case AEROSPIKE_ERR_INDEX_NOT_READABLE: - // Multiple scan/query threads may call this function, so error - // list must be modified under lock. - pthread_mutex_lock(&pt->lock); - if (!pt->errors) { - pt->errors = as_vector_create(sizeof(as_status), 10); - } - as_vector_append(pt->errors, &status); - pthread_mutex_unlock(&pt->lock); - + add_error(pt, np->node, status, 0); mark_retry_on_error(pt, np); np->parts_unavailable = np->parts_full.size + np->parts_partial.size; return true; @@ -566,13 +607,13 @@ as_partition_tracker_should_retry( void as_partition_tracker_destroy(as_partition_tracker* pt) { - release_node_partitions(&pt->node_parts); - as_vector_destroy(&pt->node_parts); - as_partitions_status_release(pt->parts_all); - if (pt->errors) { as_vector_destroy(pt->errors); pt->errors = NULL; } + + release_node_partitions(&pt->node_parts); + as_vector_destroy(&pt->node_parts); + as_partitions_status_release(pt->parts_all); pthread_mutex_destroy(&pt->lock); }