Skip to content

Commit

Permalink
CLIENT-3250 Support config policy force_single_node for testing purpo…
Browse files Browse the repository at this point in the history
…ses.
  • Loading branch information
BrianNichols committed Jan 21, 2025
1 parent ac37be3 commit f17dfac
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 15 deletions.
13 changes: 12 additions & 1 deletion src/include/aerospike/as_config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2024 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -701,6 +701,17 @@ typedef struct as_config_s {
*/
bool use_services_alternate;

/**
* For testing purposes only. Do not modify.
*
* Should the aerospike instance communicate with the first seed node only
* instead of using the data partition map to determine which node to send the
* database command.
*
* Default: false
*/
bool force_single_node;

/**
* Track server rack data. This field is useful when directing read commands to
* the server node that contains the key and exists on the same rack as the client.
Expand Down
111 changes: 100 additions & 11 deletions src/main/aerospike/as_cluster.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2024 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -56,7 +56,7 @@ as_status
as_node_refresh_peers(as_cluster* cluster, as_error* err, as_node* node, as_peers* peers);

as_status
as_node_refresh_partitions(as_cluster* cluster, as_error* err, as_node* node, as_peers* peers);
as_node_refresh_partitions(as_cluster* cluster, as_error* err, as_node* node);

as_status
as_node_refresh_racks(as_cluster* cluster, as_error* err, as_node* node);
Expand Down Expand Up @@ -985,7 +985,7 @@ as_cluster_tend(as_cluster* cluster, as_error* err, bool is_init)
// nodes to be dropped.
if (node->partition_changed && node->failures == 0 && node->active &&
(node->peers_count > 0 || peers.refresh_count == 1)) {
as_status status = as_node_refresh_partitions(cluster, &error_local, node, &peers);
as_status status = as_node_refresh_partitions(cluster, &error_local, node);

if (status != AEROSPIKE_OK) {
as_log_warn("Node %s partition refresh failed: %s %s",
Expand Down Expand Up @@ -1326,6 +1326,75 @@ as_cluster_set_max_socket_idle(as_cluster* cluster, uint32_t max_socket_idle_sec
}
}

static as_status
as_cluster_force_single_node(as_cluster* cluster, as_error* err)
{
// Validate first seed.
as_host* host = as_vector_get(cluster->seeds, 0);

as_address_iterator iter;
as_status status = as_lookup_host(&iter, err, host->name, host->port);

if (status != AEROSPIKE_OK) {
return status;
}

as_node_info node_info;
struct sockaddr* addr;
as_node* node = NULL;

while (as_lookup_next(&iter, &addr)) {
as_error_reset(err);
status = as_lookup_node(cluster, err, host, addr, true, &node_info);

if (status == AEROSPIKE_OK) {
node = as_node_create(cluster, &node_info);
break;
}
}
as_lookup_end(&iter);

if (! node) {
// Return last error.
return status;
}

cluster->n_partitions = 4096;

as_node_create_min_connections(node);

// Add seed and peer nodes to cluster.
as_vector nodes_to_add;
as_vector_inita(&nodes_to_add, sizeof(as_node*), 1);
as_vector_append(&nodes_to_add, &node);

as_cluster_add_nodes(cluster, &nodes_to_add);
as_vector_destroy(&nodes_to_add);

as_error_reset(err);
status = as_node_refresh_partitions(cluster, err, node);

if (status != AEROSPIKE_OK) {
return status;
}

// Set partition maps for all namespaces to point to same node.
for (uint32_t i = 0; i < cluster->partition_tables.size; i++) {
as_partition_table* table = cluster->partition_tables.tables[i];

for (uint32_t j = 0; j < table->size; j++) {
as_partition* part = &table->partitions[j];

for (uint32_t k = 0; k < AS_MAX_REPLICATION_FACTOR; k++) {
part->nodes[k] = node;
}
}
}

cluster->valid = true;
return AEROSPIKE_OK;
}

as_status
as_cluster_create(as_config* config, as_error* err, as_cluster** cluster_out)
{
Expand Down Expand Up @@ -1493,6 +1562,34 @@ as_cluster_create(as_config* config, as_error* err, as_cluster** cluster_out)
}
}

// Initialize metrics fields
cluster->metrics_enabled = false;
cluster->metrics_interval = 0;
cluster->metrics_latency_columns = 0;
cluster->metrics_latency_shift = 0;
cluster->command_count = 0;
cluster->retry_count = 0;
cluster->delay_queue_timeout_count = 0;

if (config->force_single_node) {
if (config->use_shm) {
as_cluster_destroy(cluster);
*cluster_out = 0;
return as_error_set_message(err, AEROSPIKE_ERR_CLIENT, "force_single_node does not support shared memory tending");
}

as_status status = as_cluster_force_single_node(cluster, err);

if (status != AEROSPIKE_OK) {
as_cluster_destroy(cluster);
*cluster_out = 0;
return status;
}

*cluster_out = cluster;
return status;
}

if (config->use_shm) {
// Create shared memory cluster.
as_status status = as_shm_create(cluster, err, config);
Expand Down Expand Up @@ -1530,14 +1627,6 @@ as_cluster_create(as_config* config, as_error* err, as_cluster** cluster_out)
pthread_attr_destroy(&attr);
}

// Initialize metrics fields
cluster->metrics_enabled = false;
cluster->metrics_interval = 0;
cluster->metrics_latency_columns = 0;
cluster->metrics_latency_shift = 0;
cluster->command_count = 0;
cluster->retry_count = 0;
cluster->delay_queue_timeout_count = 0;
*cluster_out = cluster;
return AEROSPIKE_OK;
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/aerospike/as_config.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2023 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -57,6 +57,7 @@ as_config_init(as_config* c)
c->auth_mode = AS_AUTH_INTERNAL;
c->fail_if_not_connected = true;
c->use_services_alternate = false;
c->force_single_node = false;
c->rack_aware = false;
c->rack_id = 0;
c->rack_ids = NULL;
Expand Down
4 changes: 2 additions & 2 deletions src/main/aerospike/as_node.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2024 Aerospike, Inc.
* Copyright 2008-2025 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -1212,7 +1212,7 @@ as_node_process_partitions(as_cluster* cluster, as_error* err, as_node* node, as
}

as_status
as_node_refresh_partitions(as_cluster* cluster, as_error* err, as_node* node, as_peers* peers)
as_node_refresh_partitions(as_cluster* cluster, as_error* err, as_node* node)
{
as_log_debug("Update partition map for node %s", as_node_get_address_string(node));

Expand Down

0 comments on commit f17dfac

Please sign in to comment.