Skip to content

Commit

Permalink
CLIENT-2720 Include all partition unavailable errors in the scan/quer…
Browse files Browse the repository at this point in the history
…y sub-error list.
  • Loading branch information
BrianNichols committed Jan 3, 2024
1 parent 87ebf03 commit 15450d5
Showing 2 changed files with 76 additions and 42 deletions.
13 changes: 3 additions & 10 deletions src/include/aerospike/as_partition_tracker.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2023 Aerospike, Inc.
* Copyright 2008-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
@@ -101,17 +101,10 @@ as_partition_tracker_assign(
as_partition_tracker* pt, struct as_cluster_s* cluster, const char* ns, struct as_error_s* err
);

static inline void
void
as_partition_tracker_part_unavailable(
as_partition_tracker* pt, as_node_partitions* np, uint32_t part_id
)
{
as_partitions_status* ps = pt->parts_all;
as_partition_status* p = &ps->parts[part_id - ps->part_begin];
p->retry = true;
p->replica_index++;
np->parts_unavailable++;
}
);

static inline void
as_partition_tracker_set_digest(
105 changes: 73 additions & 32 deletions src/main/aerospike/as_partition_tracker.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2023 Aerospike, Inc.
* Copyright 2008-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
@@ -19,9 +19,20 @@
#include <aerospike/as_shm_cluster.h>
#include <aerospike/as_string_builder.h>

/******************************************************************************
* Static Functions
*****************************************************************************/
//---------------------------------
// Types
//---------------------------------

typedef struct {
char node_address[AS_IP_ADDRESS_SIZE];
as_status status;
uint32_t iteration;
uint32_t part_id;
} query_error;

//---------------------------------
// Static Functions
//---------------------------------

static as_partitions_status*
parts_create(uint16_t part_begin, uint16_t part_count, const as_digest* digest)
@@ -157,6 +168,25 @@ assign_partition(as_partition_tracker* pt, as_partition_status* ps, as_node* nod
}
}

static void
add_error(as_partition_tracker* pt, as_node* node, as_status status, uint32_t part_id)
{
query_error e;
as_strncpy(e.node_address, as_node_get_address_string(node), sizeof(e.node_address));
e.status = status;
e.iteration = pt->iteration;
e.part_id = part_id;

// Multiple scan/query threads may call this function, so error
// list must be modified under lock.
pthread_mutex_lock(&pt->lock);
if (!pt->errors) {
pt->errors = as_vector_create(sizeof(query_error), 32);
}
as_vector_append(pt->errors, &e);
pthread_mutex_unlock(&pt->lock);
}

static void
mark_retry_on_error(as_partition_tracker* pt, as_node_partitions* np)
{
@@ -212,9 +242,9 @@ release_node_partitions(as_vector* list)
}
}

/******************************************************************************
* Functions
*****************************************************************************/
//---------------------------------
// Functions
//---------------------------------

void
as_partition_tracker_init_nodes(
@@ -403,6 +433,19 @@ as_partition_tracker_assign(
return AEROSPIKE_OK;
}

void
as_partition_tracker_part_unavailable(
as_partition_tracker* pt, as_node_partitions* np, uint32_t part_id
)
{
as_partitions_status* ps = pt->parts_all;
as_partition_status* p = &ps->parts[part_id - ps->part_begin];
p->retry = true;
p->replica_index++;
np->parts_unavailable++;
add_error(pt, np->node, AEROSPIKE_ERR_CLUSTER, part_id);
}

as_status
as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster, as_error* err)
{
@@ -476,16 +519,10 @@ as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster,

// Check if limits have been reached.
if (pt->iteration > pt->max_retries) {
if (!pt->errors || pt->errors->size <= 0) {
// The only retryable errors that are not added to the errors list is
// AEROSPIKE_ERR_CLUSTER (ie some partition(s) are unavailable).
return as_error_set_message(err, AEROSPIKE_ERR_CLUSTER, "Partition(s) unavailable");
}

// Return last sub-error code received.
uint32_t max = pt->errors->size;
as_status last_code = *(as_status*)as_vector_get(pt->errors, max - 1);
as_error_set_message(err, last_code, "");
query_error* last_error = as_vector_get(pt->errors, max - 1);
as_error_set_message(err, last_error->status, "");

// Include all sub-errors in error message.
as_string_builder sb;
@@ -495,11 +532,23 @@ as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster,
as_string_builder_append(&sb, "sub-errors:");

for (uint32_t i = 0; i < max; i++) {
as_status st = *(as_status*)as_vector_get(pt->errors, i);
query_error* qe = as_vector_get(pt->errors, i);
as_string_builder_append_newline(&sb);
as_string_builder_append_int(&sb, st);
as_string_builder_append_char(&sb, ' ');
as_string_builder_append(&sb, as_error_string(st));
as_string_builder_append_int(&sb, qe->status);
as_string_builder_append_char(&sb, ',');
as_string_builder_append_uint(&sb, qe->iteration);
as_string_builder_append_char(&sb, ',');
as_string_builder_append(&sb, qe->node_address);
as_string_builder_append_char(&sb, ',');

if (qe->status == AEROSPIKE_ERR_CLUSTER) {
as_string_builder_append(&sb, "Partition ");
as_string_builder_append_uint(&sb, qe->part_id);
as_string_builder_append(&sb, " unavailable");
}
else {
as_string_builder_append(&sb, as_error_string(qe->status));
}
}
return err->code;
}
@@ -545,15 +594,7 @@ as_partition_tracker_should_retry(
case AEROSPIKE_ERR_TIMEOUT:
case AEROSPIKE_ERR_INDEX_NOT_FOUND:
case AEROSPIKE_ERR_INDEX_NOT_READABLE:
// Multiple scan/query threads may call this function, so error
// list must be modified under lock.
pthread_mutex_lock(&pt->lock);
if (!pt->errors) {
pt->errors = as_vector_create(sizeof(as_status), 10);
}
as_vector_append(pt->errors, &status);
pthread_mutex_unlock(&pt->lock);

add_error(pt, np->node, status, 0);
mark_retry_on_error(pt, np);
np->parts_unavailable = np->parts_full.size + np->parts_partial.size;
return true;
@@ -566,13 +607,13 @@ as_partition_tracker_should_retry(
void
as_partition_tracker_destroy(as_partition_tracker* pt)
{
release_node_partitions(&pt->node_parts);
as_vector_destroy(&pt->node_parts);
as_partitions_status_release(pt->parts_all);

if (pt->errors) {
as_vector_destroy(pt->errors);
pt->errors = NULL;
}

release_node_partitions(&pt->node_parts);
as_vector_destroy(&pt->node_parts);
as_partitions_status_release(pt->parts_all);
pthread_mutex_destroy(&pt->lock);
}

0 comments on commit 15450d5

Please sign in to comment.