
At its core, Amza is a collection of totally ordered, distributed, replicated logs. Replication of a log is achieved using an epidemic (gossip) protocol. Each log has an intrinsic elected leader which can be leveraged or ignored depending on the use case or desired data consistency. One of several in-memory or persistent index implementations can be overlaid on top of each log to support the required use cases.

Clusters and members

An Amza cluster consists of one or more logical members (or nodes), each with a unique identifier. A member is an instance of the Amza process running on a physical host. In general, we recommend running a single member per host, but this is not a strict requirement. Each member additionally has a host:port attribute for addressing by other members of the cluster. Discovery of members and their host:port addresses (as well as host:port changes) is a configurable mechanism but is most readily achieved through use of a Routing-bird implementation such as that offered by Upena.

Additionally, each member can specify datacenter and rack attributes to provide insight into the location of the host’s physical hardware. The combination of datacenter and rack identifiers is commonly referred to as a zone. These identifiers are used to ensure high availability in the event of power, switch, or other zone failures.
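As a rough illustration, the sketch below shows the identity and placement attributes a member carries; the names are hypothetical and not the actual Amza API.

```java
// Hypothetical sketch of a cluster member's identity and placement attributes;
// names are illustrative only, not the actual Amza classes.
public record MemberDescriptor(
        String memberId,    // unique logical identifier, stable across restarts
        String host,        // addressing used by other members of the cluster
        int port,
        String datacenter,  // optional placement hint
        String rack) {      // optional placement hint

    // A "zone" is simply the combination of datacenter and rack.
    public String zone() {
        return datacenter + "/" + rack;
    }
}
```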

Partitions and rings

All data in Amza is logically partitioned and assigned to a named ring. The size of the ring (usually 3) denotes the replication factor of its partitions. When using datacenter and rack attributes, replicas of a partition have guaranteed distribution across physical zones.

A partition is created with client-prescribed properties such as durability, consistency, index type, and compression. These properties can be modified at any time, but may require a compaction to take effect (see below).
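The sketch below groups the kinds of client-prescribed properties described in this page into one bundle; the class and enum names are assumptions for illustration, not Amza's actual API. The individual values are covered in the sections that follow.

```java
// Illustrative property bundle for a partition (assumed names, not Amza's classes).
public class PartitionProperties {
    enum Durability { FSYNC_ALWAYS, FSYNC_ASYNC, FSYNC_NEVER, EPHEMERAL }
    enum Consistency { LEADER, LEADER_PLUS_ONE, LEADER_QUORUM, LEADER_ALL,
                       QUORUM, WRITE_ALL_READ_ONE, WRITE_ONE_READ_ALL, NONE }
    enum IndexType { LAB, BERKELEY_DB, MEMORY_PERSISTENT, MEMORY_EPHEMERAL, NOOP_PERSISTENT }
    enum RowType { PRIMARY, SNAPPY_PRIMARY }  // compression is selected via the row type

    final Durability durability;
    final Consistency consistency;
    final IndexType indexType;
    final RowType rowType;

    public PartitionProperties(Durability durability, Consistency consistency,
                               IndexType indexType, RowType rowType) {
        this.durability = durability;
        this.consistency = consistency;
        this.indexType = indexType;
        this.rowType = rowType;
    }
}
```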

[Diagram: Partitions and rings]

System partitions

All information that defines a cluster is stored in a series of system partitions. These partitions are first-class partitions that belong to a maximal system ring, of which all nodes are members. They are backed by in-memory indexes (concurrent skip lists), but unlike data partitions they do not make use of the Delta WAL (see below).

The following is a comprehensive list of system partitions and their utility:

  • Node_index: Describes instances and their host:port attributes.
  • Ring_index: Describes rings and their members.
  • Region_index: Describes data partitions and whether they are disabled or deleted.
  • Region_properties: Describes the properties for each data partition.
  • Highwater_mark_index: Describes replication status. Non-replicated (local state).
  • Partition_version_index: Describes the logical version and stripe version of each partition for each replica member.
  • Aquarium_state_index: Describes the leadership state of partition members. Ephemeral (non-persistent).
  • Aquarium_liveliness_index: Describes the liveliness (interconnectivity and heartbeats) of nodes with respect to one another. Ephemeral (non-persistent).

Replication

Each partition is replicated using the epidemic (gossip) protocol among its ring members. The “factor” of gossip is generally dependent on the size of the ring, and is minimally the number of nodes required to achieve the configured write consistency. This number is generally referred to as the take-from-factor.

Replication is achieved via three steps:

  1. Long poll for changes.
  2. Take the changes.
  3. Acknowledge the changes.

Each node maintains an active connection to any neighboring nodes with which it shares one or more common rings. When a node receives new data, it immediately notifies a subset of its neighbors (based on take-from-factor) over the active connection, indicating the partition and highest available transaction ID. It continues to notify these neighbors at intervals until they ACK the change. If the change is not ACKed (unresponsive neighbors) enough times to achieve consistency within a configured timeout, the epidemic escalates to additional neighbors until consistency can be achieved. The epidemic diminishes to its normal state as designated neighbors once again become responsive.
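From the taker's point of view, the exchange can be pictured roughly as follows; the interfaces here are assumptions for the sake of the sketch, not Amza's actual classes.

```java
// Illustrative taker-side loop for the three replication steps.
interface Neighbor {
    // 1. Blocks until the neighbor reports rows for the partition beyond sinceTxId.
    long longPollForChanges(String partition, long sinceTxId) throws InterruptedException;
    // 2. Streams rows appended after sinceTxId; returns the highest transaction id taken.
    long take(String partition, long sinceTxId);
    // 3. Tells the neighbor how far this node has taken, so it can stop re-notifying.
    void ack(String partition, long takenUpToTxId);
}

class TakerLoop {
    void replicate(Neighbor neighbor, String partition, long lastTakenTxId) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            long available = neighbor.longPollForChanges(partition, lastTakenTxId); // long poll
            if (available > lastTakenTxId) {
                lastTakenTxId = neighbor.take(partition, lastTakenTxId);            // take
                neighbor.ack(partition, lastTakenTxId);                             // acknowledge
            }
        }
    }
}
```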

The diagram below depicts a three-node ring with a take-from-factor of 1 in normal and escalated epidemic modes.

[Diagram: Replication, normal and escalated epidemic modes]

To take changes from a neighbor, a node requests all changes as of the last transaction ID observed from that neighbor. The “taken” node effectively replays its WAL to the taker from the requested transaction ID.

Because transaction IDs are internally sequenced, they are informative only of the node from which they were taken. As an optimization, any time a node takes changes from a neighbor, it writes a highwater mark for that neighbor describing the highest taken transaction ID onto its own WAL for that partition. These highwater marks are then delivered alongside normal changes, hinting to downstream takers that they have effectively taken changes from an upstream node to the given transaction ID. This behavior allows nodes to inherit highwaters very near to those of their immediate neighbors, avoiding excessive replays in the event of epidemic escalation.
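A minimal sketch of the bookkeeping behind these hints might look like the following, assuming hypothetical names: each take records a per-neighbor highwater, and those highwaters are handed to downstream takers alongside normal rows.

```java
// Hedged sketch of per-neighbor highwater tracking (illustrative names only).
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HighwaterHints {
    // partition -> (upstream member id -> highest transaction id taken from it)
    private final Map<String, Map<String, Long>> highwaters = new ConcurrentHashMap<>();

    // Called whenever this node takes rows from an upstream neighbor.
    void recordTaken(String partition, String upstreamMemberId, long txId) {
        highwaters.computeIfAbsent(partition, p -> new ConcurrentHashMap<>())
                  .merge(upstreamMemberId, txId, Math::max);
    }

    // Delivered alongside normal rows so downstream takers can inherit them.
    Map<String, Long> hintsFor(String partition) {
        return highwaters.getOrDefault(partition, Map.of());
    }
}
```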

When persisting taken changes, a node operates with the same behavior as a normal write, save one important difference: the version of the original payload is preserved (see “Writing to a partition” below). Once applied, the change then continues to propagate to downstream neighbors. Any taken changes that are outdated (or same-dated) are ignored, ending the cycle of propagation.

Durability

Each partition is configured with one of the following durability guarantees:

  • fsync_always: Every transaction requires an fsync before a success code is returned to the client.
  • fsync_async: A success code is returned to the client without an fsync, but a background fsync is requested. A background thread performs the fsync on a configured interval (default 1 second).
  • fsync_never: A success code is returned to the client without an fsync, and no fsync is requested.
  • ephemeral: Behaves the same as fsync_never, but additionally qualifies that data need not survive a restart of the instance (e.g. non-persistent caches).

Note: All transactions are appended to a shared log (known as the Delta WAL). If a transaction is written for “Partition A” at fsync_async, but a subsequent transaction is written for “Partition B” to the shared log at fsync_always before the background thread has run, then the background fsync request is deemed satisfied and no additional fsync is performed.
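The shared-log bookkeeping implied by this note can be sketched as follows, with assumed names: the background flusher only fsyncs when something has been appended since the last fsync, so an inline fsync_always write satisfies any pending fsync_async request.

```java
// Minimal sketch of shared Delta WAL fsync bookkeeping (illustrative, not Amza's code).
class SharedDeltaWalFsync {
    private long appendedTxId;  // highest transaction id appended to the shared log
    private long fsyncedTxId;   // highest transaction id known to be on disk

    synchronized void appended(long txId) {
        appendedTxId = Math.max(appendedTxId, txId);
    }

    // Called inline for fsync_always writes.
    synchronized void fsyncNow() {
        // ... perform the actual fsync on the shared log file ...
        fsyncedTxId = appendedTxId;
    }

    // Called by the background thread (default every 1 second) for fsync_async.
    synchronized void backgroundFsyncIfNeeded() {
        if (appendedTxId > fsyncedTxId) {
            fsyncNow();
        }
    }
}
```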

Consistency

In general, consistency is achieved by routing a write to a node, which then blocks until the change is taken and acknowledged by enough additional nodes.

Configured consistency is treated as a minimum requirement to ensure consistent repairs (e.g. in the event of node loss and recovery). Reads and writes can always be performed at stronger compatible consistency levels, as outlined below.

The following consistency levels are supported:

  • Leader
    • Client routing
      • Writes are routed to the elected node. No synchronous replication is performed.
      • Reads are routed to the elected node.
    • Leader election can only be achieved by taking fully from the most recent leader.
    • Compatibility
      • Writes: leader, leader_plus_one, leader_quorum, leader_all, write_all_read_one
      • Reads: leader, write_one_read_all
  • Leader_plus_one
    • Client routing
      • Writes are routed to the elected node. Each write is synchronously replicated to at least one other node.
      • Reads are routed to the elected node. If the elected node is unavailable then reads are routed to all other nodes and merged by the client.
    • Leader election is achieved by taking fully either from the most recent leader or from all other nodes.
    • Compatibility
      • Writes: leader_plus_one, leader_quorum, leader_all, write_all_read_one
      • Reads: leader, leader_plus_one, write_one_read_all
  • Leader_quorum
    • Client routing
      • Writes are routed to the elected node. Each write is synchronously replicated to enough additional nodes to constitute a quorum.
      • Reads are routed to the elected node. If the elected node is unavailable then reads are routed to a quorum of available nodes and merged by the client.
    • Leader election is achieved by taking fully either from the most recent leader or from enough nodes to constitute a quorum.
    • Compatibility
      • Writes: leader_quorum, leader_all, write_all_read_one
      • Reads: leader, leader_quorum, quorum, write_one_read_all
  • Leader_all
    • Client routing
      • Writes are routed to the elected node. Each write is synchronously replicated to all other nodes.
      • Reads are routed to the elected node. If the elected node is unavailable then reads are routed to any available node.
    • Leader election is unrestricted.
    • Compatibility
      • Writes: leader_all, write_all_read_one
      • Reads: (any)
  • Quorum
    • Client routing
      • Writes are routed to any available node. Each write is synchronously replicated to enough additional nodes to constitute a quorum.
      • Reads are routed to a quorum of available nodes and merged by the client.
    • Compatibility
      • Writes: leader_quorum, leader_all, quorum, write_all_read_one
      • Reads: quorum, write_one_read_all
  • Write_all_read_one
    • Client routing
      • Writes are routed to any available node. Each write is synchronously replicated to all other nodes.
      • Reads are routed to any available node.
    • Compatibility
      • Writes: leader_all, write_all_read_one
      • Reads: (any)
  • Write_one_read_all
    • Client routing
      • Writes are routed to any available node. No synchronous replication is performed.
      • Reads are routed to all nodes and merged by the client.
    • Compatibility
      • Writes: (any)
      • Reads: write_one_read_all
  • None
    • Client routing
      • Writes are routed to any available node. No synchronous replication is performed.
      • Reads are routed to any available node.
    • Compatibility
      • Writes: (any)
      • Reads: (any)

Attempts to write to or read from a partition with an incompatible (weaker) consistency are turned away by the service. While neither safe nor encouraged, reads are explicitly permitted at consistency “none” should a client deliberately wish to ignore consistency requirements at read time.
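The synchronous replication counts implied by these levels can be summarized roughly as follows, assuming a quorum means a majority of the ring; this helper is illustrative only and not Amza's actual code.

```java
// Rough mapping from consistency level to the number of replicas that must
// acknowledge a write for a ring of `ringSize` members (illustrative only).
class ConsistencyMath {
    static int requiredAcks(String consistency, int ringSize) {
        int quorum = ringSize / 2 + 1;  // assumption: a quorum is a majority
        return switch (consistency) {
            case "leader", "write_one_read_all", "none" -> 1;        // writing node only
            case "leader_plus_one"                      -> 2;        // leader plus one other
            case "leader_quorum", "quorum"              -> quorum;   // majority of the ring
            case "leader_all", "write_all_read_one"     -> ringSize; // every ring member
            default -> throw new IllegalArgumentException(consistency);
        };
    }
}
```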

Index type

Each partition is configured with an index type. Index support is extensible; a custom implementation generally requires merge, commit, get, scan, and compact semantics. The following indexes are supported by default:

  • LAB: Persistent log-structured merge (LSM) index. Persistent WAL.
  • BerkeleyDB: Persistent B-tree index. Persistent WAL.
  • Memory_persistent: In-memory index backed by a concurrent skip list. Persistent WAL.
  • Memory_ephemeral: In-memory index backed by a concurrent skip list. Ephemeral WAL.
  • Noop_persistent: No indexing. Persistent WAL.

The LAB index type is highly recommended for most use cases. The implementation is backed by Leaps-and-bounds.

Compression

Each partition is configured with a row type. Currently, the only supported row types are primary and snappy_primary. The snappy_primary row type enables Snappy compression of values in the WAL.

Writing to a partition

Writes are directed to a replica member for a given partition. Based on the consistency property for the partition, the member may block while awaiting replication (see above). If the required consistency cannot be achieved within a specified timeout, a failure code is returned to the client, indicating that the write could not be guaranteed and the client should reattempt. There is no abort/rollback behavior in the event of a timeout.

When a partition’s consistency property requires leadership (e.g. leader or leader_quorum), writes must be sent to the elected leader. Proper routing is handled by the client library. Attempts to write to a non-leading member are turned away.

A write consists of a transaction containing one or more payloads. The transaction is given a globally unique, internally ordered transaction ID generated via Ordered-ID. A payload consists of an optional prefix, a key-value pair, an optional timestamp, and a tombstone marker to signify deletion. If no timestamp is provided, the service applies an 8-byte timestamp using Unix time in milliseconds. Each payload is additionally given a globally unique 8-byte version (Ordered-ID) to disambiguate between timestamp collisions.
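A payload's metadata can be pictured roughly as below; the record and field names are hypothetical, but the defaulting behavior follows the description above.

```java
// Sketch of per-payload metadata (hypothetical names, not the actual Amza types).
record Payload(byte[] prefix,         // optional; may be null
               byte[] key,
               byte[] value,
               long timestampMillis,  // client-supplied, or Unix millis if absent
               long version,          // Ordered-ID assigned to break timestamp ties
               boolean tombstone) {   // true marks a deletion

    static Payload create(byte[] prefix, byte[] key, byte[] value,
                          Long clientTimestamp, long orderedIdVersion, boolean tombstone) {
        long timestamp = (clientTimestamp != null) ? clientTimestamp : System.currentTimeMillis();
        return new Payload(prefix, key, value, timestamp, orderedIdVersion, tombstone);
    }
}
```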

[Diagram: Writing to a partition]

The write is initially appended to a shared Delta WAL, which is backed by an in-memory index (concurrent skip list). The timestamp and version of each payload are compared against the index (the Delta index backed by the partition’s configured index, if any), and those found to be outdated are discarded. Prefixed payloads are secondarily indexed by prefix, which can be used for prefixed log-order-takes (see “Reading from a partition” below).
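The staleness check applied at this point can be sketched as follows: a payload is kept only if it is newer than what the overlaid index already holds, comparing timestamps first and versions as the tie-breaker. This is illustrative only; the real comparison lives in Amza's WAL and index code.

```java
// Illustrative "is this payload newer than the indexed one" check.
class Staleness {
    static boolean isNewer(long timestamp, long version,
                           long existingTimestamp, long existingVersion) {
        if (timestamp != existingTimestamp) {
            return timestamp > existingTimestamp;  // newer timestamp wins
        }
        return version > existingVersion;          // version breaks timestamp ties
    }
}
```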

When the Delta WAL reaches capacity (configurable, but generally around 1 million entries), a new Delta is opened and the older Delta is merged into the individual storage for each partition with changes. (For more information, see “Delta merge process” below.)

The diagram below depicts one possible write scenario in a six-node cluster with four discrete partitions.

[Diagram: Writing to a partition, six-node cluster with four partitions]

Reading from a partition

Reads are directed to one or more replica members for a given partition based on its consistency property. This may require the client library to read from the elected leader or read from a quorum of nodes. Like the write path above, index-gets, index-scans, and log-order-takes overlay the Delta’s WAL and index for a partition on top of the partition’s individual backing storage.

The following operations are supported:

  • Get: Retrieve the payload for a specific key. Commonly referred to as a point get.
  • Scan: Retrieve a range of payloads ordered by the lexicographical sort of their keys. Optional lower and upper bounds are used to constrain the key range. Reverse scans are not currently supported.
  • Take: Retrieve a tail of log-ordered payloads, starting from the beginning of the log or an optionally specified cursor. The operation also returns a new cursor to allow the client to resume taking from where it left off. (Guaranteed-once queue semantics.)
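A typical consumer of the take operation might look like the loop below; the client interface and types are assumptions for illustration, not the actual Amza client API.

```java
import java.util.List;

// Hypothetical client-side view of "take": stream log-ordered payloads from a
// cursor, then persist the returned cursor to resume later.
interface PartitionReader {
    // Returns payloads appended after `cursor`, plus a new cursor to resume from.
    TakeResult take(byte[] cursor, int batchSize);
}

record TakeResult(List<byte[]> payloads, byte[] nextCursor) {}

class TakeConsumer {
    void drain(PartitionReader reader, byte[] cursor) {
        while (true) {
            TakeResult result = reader.take(cursor, 1_000);
            if (result.payloads().isEmpty()) {
                break;                     // caught up; retain `cursor` for the next call
            }
            // ... process payloads in log order ...
            cursor = result.nextCursor();  // resume point for the next take
        }
    }
}
```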

Disk striping and rebalancing

Each Amza node is configured with one or more Delta WALs. It is generally recommended that one Delta WAL be configured for each physical disk available on the host machine. Each data partition is assigned to a single Delta WAL, and its individual backing storage (during Delta merge; see below) is written to the same volume.

When a node encounters a partition for which it has no Delta assignment, the partition is randomly assigned to one of the Deltas whose free space is within a configurable threshold of the maximum. At regular intervals, disk usage is evaluated across all Deltas, and a rebalance is performed if data distribution is uneven beyond a threshold (also configurable). During the rebalance, data for one or more partitions is migrated to the stripe with the most available space. This work is performed as a normal compaction (see below). A sketch of both rules follows.
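The following sketch uses assumed types to illustrate those two rules: assignment among the emptiest stripes, and rebalancing when the spread between the fullest and emptiest stripes exceeds a threshold.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

record Stripe(String name, long freeBytes) {}

class StripeAssigner {
    // New partitions go to a randomly chosen stripe whose free space is within
    // `withinBytesOfMax` of the emptiest stripe.
    Stripe assign(List<Stripe> stripes, long withinBytesOfMax) {
        long max = stripes.stream().mapToLong(Stripe::freeBytes).max().orElseThrow();
        List<Stripe> candidates = stripes.stream()
                .filter(s -> max - s.freeBytes() <= withinBytesOfMax)
                .toList();
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }

    // Rebalance when the free-space spread across stripes exceeds a threshold.
    boolean shouldRebalance(List<Stripe> stripes, long imbalanceThresholdBytes) {
        long max = stripes.stream().mapToLong(Stripe::freeBytes).max().orElse(0L);
        long min = stripes.stream().mapToLong(Stripe::freeBytes).min().orElse(0L);
        return (max - min) > imbalanceThresholdBytes;
    }
}
```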

After a data partition has been rebalanced from one stripe to another, it initially remains assigned to its original Delta WAL. Once a Delta merge leaves no pending updates for the partition in that Delta, the partition is reassigned to the Delta of its rebalanced stripe.

Delta merge process

Each Delta is given a sequential identifier. When a Delta WAL reaches its configured capacity, a new Delta is opened with the next sequential identifier, and the original undergoes the merge process.

During the merge process, a pool of workers processes each partition that has updates in the Delta, appending its key-value transactions to that partition’s individual backing storage. After each partition is merged, an end-of-merge marker containing the Delta’s identifier is appended to its backing WAL. After all partitions for the Delta have been fully merged, an fsync is performed for each modified backing storage. Once the fsync phase is complete, the Delta WAL is removed.
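A high-level sketch of that pass is shown below; the interfaces are assumed for illustration (the real merge uses a pool of workers rather than this sequential loop).

```java
import java.util.List;

// Illustrative merge pass: append per-partition rows, mark end-of-merge, then
// fsync everything before removing the sealed Delta.
interface BackingStorage {
    void append(String partition, Iterable<byte[]> rows);
    void appendEndOfMergeMarker(String partition, long deltaId);
    void fsync(String partition);
}

interface SealedDelta {
    long id();
    List<String> partitionsWithUpdates();
    Iterable<byte[]> rowsFor(String partition);
    void delete();
}

class DeltaMerger {
    void merge(SealedDelta delta, BackingStorage storage) {
        for (String partition : delta.partitionsWithUpdates()) {
            storage.append(partition, delta.rowsFor(partition));
            storage.appendEndOfMergeMarker(partition, delta.id());
        }
        for (String partition : delta.partitionsWithUpdates()) {
            storage.fsync(partition);  // fsync phase runs after all merges complete
        }
        delta.delete();                // the Delta WAL is removed last
    }
}
```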

Reads and writes are permitted during the merge process. However, only two Deltas may exist for any given stripe. In other words, if a Delta merge is in progress and the newly opened Delta likewise reaches capacity, writes are turned away until the merge completes and the Delta is removed.

Validation and corruption detection

Whenever the Amza process is terminated, either during a routine stop or if killed unexpectedly, a validation procedure is performed on next startup. During this phase, if more than one Delta WAL exists for a given stripe, it is assumed that a merge was interrupted and the older Delta is immediately merged. Any partition with updates in the merging Delta must undergo a validation check, in which all transactions following its last valid end-of-merge marker are truncated from its backing storage. The transactions are then replayed and merged as usual from the Delta.

The most recent Delta for each stripe is loaded sequentially to reconstitute its in-memory index. If at any point a transaction is found to be corrupt, the Delta is truncated at the point of corruption. Given the fsync_always durability constraint, it can safely be assumed that any corruption necessarily followed the last fsync and therefore was never successfully acknowledged to the client. However, in the case of weaker durability requirements, one or more acknowledged writes may effectively be truncated.
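The truncation step can be sketched roughly as follows, using assumed interfaces: replay rows sequentially to rebuild the in-memory index, then truncate the file at the first corrupt row.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;

// Illustrative startup validation of the most recent Delta for a stripe.
interface DeltaRowReader {
    // Returns the file position after the last successfully parsed row,
    // stopping early if a corrupt row is encountered.
    long replayValidRows(FileChannel channel, RowSink sink) throws IOException;
}

interface RowSink {
    void apply(byte[] key, byte[] value, long timestamp, long version, boolean tombstone);
}

class DeltaValidator {
    void validate(FileChannel deltaWal, DeltaRowReader reader, RowSink index) throws IOException {
        long lastValidPosition = reader.replayValidRows(deltaWal, index);
        if (lastValidPosition < deltaWal.size()) {
            deltaWal.truncate(lastValidPosition);  // drop the corrupt tail
            deltaWal.force(true);                  // fsync the truncation
        }
    }
}
```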

In the case of fsync_async durability, the likelihood of data loss is diminished because the transactional highwaters for neighboring nodes are lagging indicators that are themselves only flushed after the fsync_async background flush. In this way, nodes are guaranteed to retake transactions from their neighbors within fsync boundaries. Only in the event that a transaction is truncated due to corruption on every replica will data permanently be lost.

Compactions, deletions and TTLs

When deemed necessary, such as in response to deletions or TTLs, both system and data partitions undergo compaction in order to reduce their size on disk. During this process, the partition’s backing WAL is read sequentially and compared with its index (if any) before being rewritten to a new WAL. Any payload that is found to be outdated is omitted during the rewrite process. Additionally, a new index is composed if needed.

The current WAL and index exist in the active directory. During compaction, the new WAL and index are written to the compacting directory. During the compaction, the WAL remains open for reads and writes (in the form of Delta merges). Upon finalization, the backing storage is momentarily locked. The active directory is renamed to backup, the compacting directory is renamed to active, and an fsync is performed. Finally, the backup directory is deleted. The backing storage is then reopened for reads and writes.
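The finalization swap can be sketched with standard file moves, as below; the directory names follow the text, but everything else is illustrative and not Amza's actual code.

```java
import java.io.IOException;
import java.nio.file.*;

// Illustrative directory swap at the end of a compaction. The caller locks the
// backing storage around finalizeCompaction and reopens it afterwards.
class CompactionSwap {
    void finalizeCompaction(Path partitionDir) throws IOException {
        Path active = partitionDir.resolve("active");
        Path compacting = partitionDir.resolve("compacting");
        Path backup = partitionDir.resolve("backup");

        Files.move(active, backup, StandardCopyOption.ATOMIC_MOVE);     // active -> backup
        Files.move(compacting, active, StandardCopyOption.ATOMIC_MOVE); // compacting -> active
        // ... fsync so the renames are durable ...
        deleteRecursively(backup);                                      // drop the old data
    }

    private void deleteRecursively(Path dir) throws IOException {
        try (var paths = Files.walk(dir)) {
            paths.sorted(java.util.Comparator.reverseOrder())  // children before parents
                 .forEach(p -> p.toFile().delete());
        }
    }
}
```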

In order to delete payloads from Amza, the key for the payload must be updated with the tombstone flag set to true. The timestamp and version of the tombstone undergo normal comparison and effectively mask older instances of the key. If a non-tombstone payload is subsequently written with a higher timestamp and version, the payload reappears as if newly created. Otherwise, tombstones remain in the WAL and index until they reach a configurable age, at which time they are eligible for omission during the partition’s next compaction.

Similar to tombstones, a data partition can be configured with a timestamp and/or version TTL, indicating that payloads whose age exceeds the desired limit are eligible for removal. Like tombstones, payloads that have exceeded the TTL are hidden at query time, but they are not physically removed until the next compaction.
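Taken together, the compaction rules above can be expressed as a filter roughly like the following; field names and the time-based TTL comparison are assumptions for illustration.

```java
// Illustrative compaction filter: drop shadowed payloads, aged-out tombstones,
// and payloads that have exceeded the partition's TTL.
class CompactionFilter {
    boolean keep(long timestamp, long version, boolean tombstone,
                 long newestTimestampForKey, long newestVersionForKey,
                 long nowMillis, long tombstoneAgeMillis, long ttlMillis) {
        boolean outdated = timestamp < newestTimestampForKey
                || (timestamp == newestTimestampForKey && version < newestVersionForKey);
        if (outdated) {
            return false;                                          // shadowed by a newer write
        }
        if (tombstone && nowMillis - timestamp > tombstoneAgeMillis) {
            return false;                                          // tombstone old enough to drop
        }
        if (ttlMillis > 0 && nowMillis - timestamp > ttlMillis) {
            return false;                                          // payload exceeded its TTL
        }
        return true;
    }
}
```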

Stripe and partition versioning

To handle the complete loss of a drive or node, all storage is versioned. Each Delta stripe is written with a globally unique Ordered-ID that identifies it in the event that mount points are reordered or renamed. Additionally, nodes assign each member partition a globally unique Ordered-ID that is used to locate its backing storage on disk, as well as to version its replication highwaters.

If a data partition cannot be found on its indicated stripe with the highest known version, the data is assumed lost. In this event, the node assigns the partition a new versioned Ordered-ID, and the node must retake all data for the partition either from the leader or from a quorum (depending on the partition’s consistency property) before it is opened for reads and writes.