This repository has been archived by the owner on Nov 10, 2022. It is now read-only.

Artery Design Ideas (new remoting) #16

Open
patriknw opened this issue Apr 18, 2016 · 1 comment

Comments

@patriknw
Member

patriknw commented Apr 18, 2016

A place to capture ideas for Artery, the new remoting.

@patriknw
Member Author

patriknw commented Apr 28, 2016

Artery Design

Aeron

The following sections assume that we are using Aeron as underlying transport layer.

We compared Aeron with plain UDP; here are the pros and cons of using Aeron.

Pros:

  • reliable delivery
  • focus on low latency (and high throughput); latency is difficult to get right
  • hardened, optimized
  • good fit for Akka Streams
  • IPC and other transports
  • existing community, including Martin Thompson and Todd Montgomery
  • built-in metrics and counters
  • built-in test transport with a configurable drop rate
  • reliable multicast, which might be interesting for future use, e.g. gossip or CRDT dissemination
  • media driver can run as a separate (shared) process
  • C++ implementation, enabling possible Akka.NET interop
  • avoids the not-invented-here (NIH) smell

Cons:

  • less control
  • wire format stability? Roadmap?
    • Todd confirmed that 1.0 is close and that they don’t intend to break wire compatibility after 1.0; if the format is changed, it will be done in a backwards-compatible way.
  • security?

Quarantine

After quarantine we must not accept messages from the quarantined node. This is important because we must not deliver messages from an actor after it has been declared terminated, i.e. after the Terminated message. This is what we often refer to as the “no-zombies policy”.

It is also important not to send messages to a quarantined node; otherwise it would, for example, keep receiving gossip updates.

Quarantined nodes are explicitly notified of being quarantined.

ActorSelection messages should pass through the quarantined state, to allow probing for a restarted system.
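A minimal sketch of these accept/send rules, assuming quarantine is tracked per remote incarnation by a `long` UID and that the ActorSelection pass-through applies to outgoing selections used for probing. `QuarantineFilter` and its methods are hypothetical names, not Akka API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not Akka API): message filtering for quarantined remote incarnations.
final class QuarantineFilter {
    // UIDs of remote system incarnations that have been quarantined (assumed to be longs).
    private final Set<Long> quarantinedUids = new HashSet<>();

    void quarantine(long uid) {
        quarantinedUids.add(uid);
    }

    boolean isQuarantined(long uid) {
        return quarantinedUids.contains(uid);
    }

    // Inbound: drop everything from a quarantined incarnation ("no-zombies policy").
    boolean acceptInbound(long senderUid) {
        return !isQuarantined(senderUid);
    }

    // Outbound: block ordinary messages (gossip etc.), but let ActorSelection
    // through so that a restarted system at the same address can be probed.
    boolean allowOutbound(long destinationUid, boolean isActorSelection) {
        return isActorSelection || !isQuarantined(destinationUid);
    }
}
```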

Quarantine is performed when:

  • remote death watch is triggered by remote failure detector
  • member is removed from cluster
  • system message buffer is full
  • system message can’t be delivered after long period of failed retries
  • an invalid system message acknowledgment is received

We still need the following existing configuration properties:

  • prune-quarantine-marker-after
  • quarantine-after-silence

Handshake

When opening an association we need a handshake to exchange UIDs. We need the UID of the destination system to be able to quarantine it, and also to properly distinguish system message redelivery/ack information coming from different system incarnations.

When the destination system is restarted and we continue to send to that address, a new handshake should be initiated, i.e. the restarted system must detect the new inbound association.

Such a handshake must trigger quarantine of, and a DeathWatchNotification for, the old destination incarnation in the sending system.

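A minimal sketch of detecting a restarted destination through the handshake, assuming UIDs are `long` values and addresses are plain strings. `HandshakeRegistry` and the callback that stands in for quarantine plus DeathWatchNotification are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: track the UID learned in the handshake per remote address.
final class HandshakeRegistry {
    private final Map<String, Long> uidByAddress = new HashMap<>();
    // Invoked with (address, oldUid); stands in for quarantine + DeathWatchNotification.
    private final BiConsumer<String, Long> onOldIncarnation;

    HandshakeRegistry(BiConsumer<String, Long> onOldIncarnation) {
        this.onOldIncarnation = onOldIncarnation;
    }

    // Records the UID from a completed handshake. Returns true if this is a new
    // incarnation (first contact, or the destination was restarted).
    boolean handshakeCompleted(String address, long uid) {
        Long previous = uidByAddress.put(address, uid);
        if (previous != null && previous != uid) {
            // Same address, different UID: the old incarnation is gone.
            onOldIncarnation.accept(address, previous);
        }
        return previous == null || previous != uid;
    }

    Long currentUid(String address) {
        return uidByAddress.get(address);
    }
}
```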
Gating

We should avoid the gating feature, but it might be needed to protect against too-frequent handshake attempts.

If we can’t establish the handshake for an outgoing connection, and a handshake attempt is a costly process, it might make sense to gate that destination for a while.

Note that if the remote system initiates a handshake, any gating of that system on this side must be removed and the handshake should be accepted.
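The gating rules could look roughly like this for a single destination. `HandshakeGate` and its configurable gate duration are hypothetical, and time is passed in explicitly to keep the sketch deterministic.

```java
// Hypothetical sketch: gating of a single destination after a failed outbound handshake.
final class HandshakeGate {
    private final long gateDurationMillis; // assumed configurable
    private long gatedUntilMillis = Long.MIN_VALUE; // not gated

    HandshakeGate(long gateDurationMillis) {
        this.gateDurationMillis = gateDurationMillis;
    }

    // Our handshake attempt failed: hold off further (costly) attempts for a while.
    void outboundHandshakeFailed(long nowMillis) {
        gatedUntilMillis = nowMillis + gateDurationMillis;
    }

    boolean mayAttemptHandshake(long nowMillis) {
        return nowMillis >= gatedUntilMillis;
    }

    // The remote system initiated a handshake: remove any gating and accept it.
    boolean inboundHandshakeReceived() {
        gatedUntilMillis = Long.MIN_VALUE;
        return true;
    }
}
```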

System Messages

We must not drop system messages on the way, and if buffers get full we must quarantine.

Aeron delivery is reliable as long as the session is alive, very similar to a TCP connection. During long network partitions the session times out and is dropped, resulting in lost messages. This was confirmed and clarified by Martin Thompson.

Therefore we need acknowledged delivery for system messages. It can take advantage of the fact that the messages are ordered; a simple protocol would be:

  • sender has a buffer of sent, but unacknowledged messages
  • each system message has a contiguous sequence number
  • receiver keeps track of expected sequence number
  • receiver replies with ACK when receiving message with expected sequence number (ACKs can be aggregated if needed)
  • receiver replies with NACK including the expected number if it receives a higher sequence number than expected
  • receiver discards messages with a lower sequence number than expected (duplicates caused by resends)
  • sender resends when receiving NACK or after periodic timeout
  • sender removes from buffer when receiving ACK
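A minimal sketch of the ACK/NACK protocol above, with two assumptions beyond the text: an ACK is cumulative (it acknowledges everything up to its sequence number), and a receiver that discards a duplicate re-sends an ACK for its highest in-order number, so that a lost ACK does not cause endless resends. Class names are hypothetical, and messages are plain strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of acked system-message delivery (sender side).
final class SystemMessageSender {
    static final class Sent {
        final long seqNo;
        final String message;
        Sent(long seqNo, String message) { this.seqNo = seqNo; this.message = message; }
    }

    private final int bufferSize; // cf. system-message-buffer-size
    private final Deque<Sent> unacked = new ArrayDeque<>();
    private long nextSeqNo = 1;

    SystemMessageSender(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Assigns the next contiguous sequence number and buffers the message until acked.
    Sent send(String message) {
        if (unacked.size() >= bufferSize) {
            throw new IllegalStateException("system message buffer full: quarantine destination");
        }
        Sent sent = new Sent(nextSeqNo++, message);
        unacked.addLast(sent);
        return sent;
    }

    // Cumulative ACK (assumption): everything up to and including seqNo was received.
    void onAck(long seqNo) {
        while (!unacked.isEmpty() && unacked.peekFirst().seqNo <= seqNo) {
            unacked.removeFirst();
        }
    }

    // NACK carries the receiver's expected number: earlier messages were received,
    // everything from expectedSeqNo onwards is resent.
    List<Sent> onNack(long expectedSeqNo) {
        onAck(expectedSeqNo - 1);
        return new ArrayList<>(unacked);
    }

    // Periodic resend (cf. system-message-resend-timeout).
    List<Sent> onResendTimeout() {
        return new ArrayList<>(unacked);
    }

    int unackedCount() {
        return unacked.size();
    }
}

// Hypothetical sketch of the receiver side.
final class SystemMessageReceiver {
    static final class Reply {
        final boolean isAck; // true: ACK(seqNo), false: NACK(expected = seqNo)
        final long seqNo;
        Reply(boolean isAck, long seqNo) { this.isAck = isAck; this.seqNo = seqNo; }
    }

    private long expectedSeqNo = 1;
    private final List<String> delivered = new ArrayList<>();

    Reply onMessage(long seqNo, String message) {
        if (seqNo == expectedSeqNo) {
            delivered.add(message); // in order: deliver
            expectedSeqNo++;
            return new Reply(true, seqNo);
        } else if (seqNo > expectedSeqNo) {
            return new Reply(false, expectedSeqNo); // gap: ask for a resend
        } else {
            // Duplicate caused by a resend: discard it, but re-ACK (assumption).
            return new Reply(true, expectedSeqNo - 1);
        }
    }

    List<String> delivered() {
        return delivered;
    }
}
```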

We still need the following existing configuration properties:

  • system-message-buffer-size
  • system-message-resend-timeout

Security

There is an Aeron security discussion in ticket aeron-io/aeron#203.

One possible approach is to negotiate a key with the remote system as part of the handshake, over a side-channel using TCP+SSL and an elliptic-curve Diffie-Hellman key exchange (ECDH), and then use symmetric encryption for each sent message. The drawback is that Aeron gives us no access to the lowest level (just before a packet hits the UDP layer), so we can only encrypt whole messages; this leaks more information than encrypting after the fragmenting/multiplexing layer.

Another option is to extend the Aeron media driver to get access to that layer.
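The key-agreement step can be sketched with the JDK's standard `KeyPairGenerator` ("EC") and `KeyAgreement` ("ECDH") APIs. The exchange of public keys over the TLS side-channel is not shown, the curve `secp256r1` is our choice, and hashing the shared secret with SHA-256 is a simplification where a real design should use a proper KDF such as HKDF. `EcdhSessionKey` is a hypothetical helper.

```java
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.MessageDigest;
import java.security.PublicKey;
import java.security.spec.ECGenParameterSpec;
import javax.crypto.KeyAgreement;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper: ECDH key agreement yielding a 256-bit AES session key.
final class EcdhSessionKey {
    private EcdhSessionKey() {}

    // Each side generates a key pair and sends its public key over the side-channel.
    static KeyPair generateKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("EC");
            generator.initialize(new ECGenParameterSpec("secp256r1")); // curve choice is an assumption
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Both sides derive the same key from their own private key and the peer's public key.
    static SecretKeySpec deriveAesKey(KeyPair own, PublicKey peerPublic) {
        try {
            KeyAgreement agreement = KeyAgreement.getInstance("ECDH");
            agreement.init(own.getPrivate());
            agreement.doPhase(peerPublic, true);
            byte[] sharedSecret = agreement.generateSecret();
            // Simplification: a real design should use a proper KDF (e.g. HKDF) here.
            byte[] keyBytes = MessageDigest.getInstance("SHA-256").digest(sharedSecret);
            return new SecretKeySpec(keyBytes, "AES");
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```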

We would like to avoid any kind of algorithm negotiation: if the algorithms used turn out to be weak, the remoting protocol should be updated instead (explicitly breaking compatibility). Key renegotiation is also not planned.

As a first approach, every encrypted message should contain:

  • Initialization Vector, securely random for each message
  • UID (needed for lookup for the right session key)
  • Encrypted part
    • AES-256, CBC, PKCS #5 padding (PKCS #7 is not available in JDK 8)
    • Additional sequence number against replay attacks. This results in state that must be kept around for all systems ever connected.
    • Ordinary remote envelope
  • HMAC over all of the above
    • SHA-256
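The message layout above can be sketched with standard JCE classes (`Cipher` with "AES/CBC/PKCS5Padding", `Mac` with "HmacSHA256"). Assumptions: the byte layout is IV (16 bytes) | UID (8 bytes) | ciphertext | HMAC (32 bytes); separate 256-bit keys are used for encryption and for the HMAC; a single session stands in for the UID-keyed session lookup; and replay protection only accepts strictly increasing sequence numbers, which relies on ordered delivery. `SealedMessages` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical sketch: IV | UID | AES-256-CBC(seqNo | envelope) | HMAC-SHA-256(all before it).
final class SealedMessages {
    static final int IV_LEN = 16, UID_LEN = 8, MAC_LEN = 32;
    private static final SecureRandom RANDOM = new SecureRandom();

    private final long uid;             // identifies the session (and thus its keys)
    private final SecretKeySpec encKey; // 32-byte AES key
    private final SecretKeySpec macKey; // separate 32-byte HMAC key (assumption)
    private long nextSendSeqNo = 1;
    private long highestReceivedSeqNo = 0;

    SealedMessages(long uid, byte[] encKey, byte[] macKey) {
        this.uid = uid;
        this.encKey = new SecretKeySpec(encKey, "AES");
        this.macKey = new SecretKeySpec(macKey, "HmacSHA256");
    }

    byte[] seal(byte[] envelope) {
        try {
            byte[] iv = new byte[IV_LEN];
            RANDOM.nextBytes(iv); // fresh, securely random IV per message
            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.ENCRYPT_MODE, encKey, new IvParameterSpec(iv));
            byte[] plain = ByteBuffer.allocate(8 + envelope.length)
                .putLong(nextSendSeqNo++).put(envelope).array();
            byte[] encrypted = cipher.doFinal(plain);
            int macOffset = IV_LEN + UID_LEN + encrypted.length;
            ByteBuffer out = ByteBuffer.allocate(macOffset + MAC_LEN);
            out.put(iv).putLong(uid).put(encrypted);
            out.put(hmac(out.array(), macOffset)); // HMAC over IV, UID and ciphertext
            return out.array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the envelope, or null if the message is rejected.
    byte[] open(byte[] message) {
        try {
            if (message.length < IV_LEN + UID_LEN + 16 + MAC_LEN) return null;
            // The only step before HMAC verification: UID-based session (key) lookup.
            if (ByteBuffer.wrap(message, IV_LEN, UID_LEN).getLong() != uid) return null;
            int macOffset = message.length - MAC_LEN;
            byte[] received = Arrays.copyOfRange(message, macOffset, message.length);
            if (!MessageDigest.isEqual(hmac(message, macOffset), received)) return null;
            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.DECRYPT_MODE, encKey, new IvParameterSpec(message, 0, IV_LEN));
            byte[] plain = cipher.doFinal(message, IV_LEN + UID_LEN, macOffset - IV_LEN - UID_LEN);
            long seqNo = ByteBuffer.wrap(plain).getLong();
            if (seqNo <= highestReceivedSeqNo) return null; // replayed (or reordered) message
            highestReceivedSeqNo = seqNo;
            return Arrays.copyOfRange(plain, 8, plain.length);
        } catch (GeneralSecurityException e) {
            return null;
        }
    }

    private byte[] hmac(byte[] data, int length) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(macKey);
        mac.update(data, 0, length);
        return mac.doFinal();
    }
}
```

Note that `open` reads only the UID before verifying the HMAC, which is exactly the compromise discussed in the following paragraph.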

One drawback of the above setup is that the UID must be used to select the proper session key before the HMAC has been verified (somewhat violating the Cryptographic Doom Principle). No other operation is allowed to be performed on the message before the HMAC has been verified.

The above design corresponds almost 1-to-1 to IPsec Encapsulating Security Payload (ESP), the notable difference being that the sequence number is also encrypted.

Notes:

  • To reuse our existing TLS work, the channel needs to be bidirectional
  • We can use TLS as a side-channel though

Sub-channels

System messages / priority lanes

System messages should be delivered over a separate sub-channel, since they have different delivery semantics and should not be interfered with by ordinary messages.

Heartbeat messages for remote and cluster failure detection should also be delivered over a separate sub-channel. They are ordinary messages, but should not be interfered with by other ordinary messages. This sub-channel is selected by message type (PriorityMessage marker trait), but it is not available for users’ messages.

Large messages

It should be possible to send bulk data (large messages) over a separate sub-channel to avoid head-of-line blocking. This sub-channel is selected based on the configured destination path.

General

For ordinary messages not falling into the above categories we can use one or several sub-channels. If we use several, they would be selected by hashing on the destination ref, to preserve the send order per sender-receiver pair.

It should be possible to configure buffer sizes and max message size separately for each sub-channel type.
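One way to express the sub-channel selection rules. The index layout (0 for system messages, 1 for priority, 2 for large messages, then the ordinary sub-channels), the `SubChannelSelector` class and the use of `String.hashCode` on the recipient path are assumptions; `PriorityMessage` is modelled as a Java marker interface.

```java
import java.util.Set;

// Marker interface standing in for the PriorityMessage marker trait (internal, not for user messages).
interface PriorityMessage {}

// Hypothetical sketch of sub-channel selection; the index layout is an assumption.
final class SubChannelSelector {
    static final int SYSTEM = 0, PRIORITY = 1, LARGE = 2, FIRST_ORDINARY = 3;

    private final Set<String> largeMessageDestinations; // configured destination paths
    private final int ordinarySubChannels;              // one or several

    SubChannelSelector(Set<String> largeMessageDestinations, int ordinarySubChannels) {
        this.largeMessageDestinations = largeMessageDestinations;
        this.ordinarySubChannels = ordinarySubChannels;
    }

    int select(Object message, boolean isSystemMessage, String recipientPath) {
        if (isSystemMessage) return SYSTEM;
        if (message instanceof PriorityMessage) return PRIORITY; // e.g. heartbeats
        if (largeMessageDestinations.contains(recipientPath)) return LARGE;
        // Same recipient -> same sub-channel, so per sender-receiver order is preserved.
        return FIRST_ORDINARY + Math.floorMod(recipientPath.hashCode(), ordinarySubChannels);
    }
}
```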

Pruning of associations

Inactive associations should be pruned. This is important because each channel/stream uses a considerable amount of memory. The pruning could be done in two steps: in the first step a shallow placeholder for the association remains but other memory is released; in the second step the placeholder is removed after a longer idle timeout.
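A sketch of the two-step pruning, assuming two hypothetical idle timeouts and a byte array standing in for the per-association stream buffers. What the placeholder keeps (here only the last-use time) is an assumption, and `AssociationPruner` is not an existing API.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of two-step pruning of idle associations.
final class AssociationPruner {
    static final int BUFFER_BYTES = 64 * 1024; // stands in for per-stream buffers

    static final class Association {
        long lastUsedMillis;
        byte[] buffers; // null => shallow placeholder (step 1 done)
    }

    private final Map<String, Association> associations = new HashMap<>();
    private final long shrinkAfterMillis; // step 1: release buffers
    private final long removeAfterMillis; // step 2: drop placeholder (longer timeout)

    AssociationPruner(long shrinkAfterMillis, long removeAfterMillis) {
        this.shrinkAfterMillis = shrinkAfterMillis;
        this.removeAfterMillis = removeAfterMillis;
    }

    // Called when a message is sent to or received from the address.
    void touch(String address, long nowMillis) {
        Association a = associations.computeIfAbsent(address, k -> new Association());
        if (a.buffers == null) a.buffers = new byte[BUFFER_BYTES]; // (re)allocate on use
        a.lastUsedMillis = nowMillis;
    }

    void prune(long nowMillis) {
        Iterator<Association> it = associations.values().iterator();
        while (it.hasNext()) {
            Association a = it.next();
            long idle = nowMillis - a.lastUsedMillis;
            if (idle >= removeAfterMillis) it.remove();           // step 2
            else if (idle >= shrinkAfterMillis) a.buffers = null;  // step 1
        }
    }

    boolean contains(String address) {
        return associations.containsKey(address);
    }

    boolean isShallow(String address) {
        Association a = associations.get(address);
        return a != null && a.buffers == null;
    }
}
```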

Protocol Format

MessageEnvelope:

  • version: Byte
  • serializerId: Int
  • serializerManifest: String
  • payload: Bytes
  • recipient: ActorRef
  • sender: ActorRef
  • senderUID
  • senderSystemAddress: Address (or host:port), to be able to reply to the sending system, e.g. initiate a new handshake
  • optional extra meta data, e.g. tracing data

This will be encoded either by hand or with SBE (Simple Binary Encoding); this needs investigation.
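If the envelope is encoded by hand, it could look roughly like this sketch using `DataOutputStream`/`DataInputStream`, with the fields in the order listed above. Assumptions: ActorRefs are written as path strings, `sender` may be absent (a presence flag precedes it), strings use `writeUTF` (so each is limited to 65,535 encoded bytes), and byte arrays are length-prefixed. A production encoder would more likely write directly into a reusable buffer, or use SBE, to avoid allocations.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical hand-written encoding of the MessageEnvelope fields, in the listed order.
final class MessageEnvelope {
    final byte version;
    final int serializerId;
    final String serializerManifest;
    final byte[] payload;
    final String recipient;           // ActorRef as path string (assumption)
    final String sender;              // ActorRef as path string, or null if no sender
    final long senderUid;
    final String senderSystemAddress; // e.g. for initiating a new handshake
    final byte[] metadata;            // optional extra metadata (empty if none)

    MessageEnvelope(byte version, int serializerId, String serializerManifest, byte[] payload,
                    String recipient, String sender, long senderUid,
                    String senderSystemAddress, byte[] metadata) {
        this.version = version;
        this.serializerId = serializerId;
        this.serializerManifest = serializerManifest;
        this.payload = payload;
        this.recipient = recipient;
        this.sender = sender;
        this.senderUid = senderUid;
        this.senderSystemAddress = senderSystemAddress;
        this.metadata = metadata;
    }

    byte[] encode() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(version);
            out.writeInt(serializerId);
            out.writeUTF(serializerManifest);
            writeBytes(out, payload);
            out.writeUTF(recipient);
            out.writeBoolean(sender != null); // presence flag for the optional sender
            if (sender != null) out.writeUTF(sender);
            out.writeLong(senderUid);
            out.writeUTF(senderSystemAddress);
            writeBytes(out, metadata);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static MessageEnvelope decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            byte version = in.readByte();
            int serializerId = in.readInt();
            String manifest = in.readUTF();
            byte[] payload = readBytes(in);
            String recipient = in.readUTF();
            String sender = in.readBoolean() ? in.readUTF() : null;
            long senderUid = in.readLong();
            String senderSystemAddress = in.readUTF();
            byte[] metadata = readBytes(in);
            return new MessageEnvelope(version, serializerId, manifest, payload, recipient,
                sender, senderUid, senderSystemAddress, metadata);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeBytes(DataOutputStream out, byte[] b) throws IOException {
        out.writeInt(b.length);
        out.write(b);
    }

    private static byte[] readBytes(DataInputStream in) throws IOException {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        return b;
    }
}
```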

Handshake:

All information is in the MessageEnvelope, so we only need an empty message type for the handshake. The same type can be used for both request and response.

Another message type is needed for initiating a new handshake when receiving a message from an unknown UID.

ActorRef compression

We’ll find the most commonly communicated-with actors (likely using an approximate data structure such as a Count-Min Sketch), and the receiving end will advertise a mapping from each such actor path to an identifier that it creates and stores locally in a table.
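A minimal Count-Min Sketch for counting messages per recipient path. The row and column sizes, deriving one hash per row by mixing `String.hashCode` with a seed (murmur3 finalizer; keys with equal `hashCode` collide in every row), and comparing the returned estimate against a threshold to decide which paths to advertise are all assumptions for illustration. Estimates never undercount but may overcount when keys collide.

```java
import java.util.Random;

// Hypothetical Count-Min Sketch for approximate per-recipient message counts.
final class CountMinSketch {
    private final long[][] counts;
    private final int[] seeds;

    CountMinSketch(int depth, int width, long seed) {
        counts = new long[depth][width];
        seeds = new int[depth];
        Random random = new Random(seed);
        for (int row = 0; row < depth; row++) seeds[row] = random.nextInt();
    }

    // Murmur3 finalizer, used to derive one hash function per row from String.hashCode.
    private static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    private int column(int row, String key) {
        return Math.floorMod(mix(key.hashCode() ^ seeds[row]), counts[row].length);
    }

    // Records one message to recipientPath and returns its updated estimate.
    long add(String recipientPath) {
        long estimate = Long.MAX_VALUE;
        for (int row = 0; row < counts.length; row++) {
            long c = ++counts[row][column(row, recipientPath)];
            estimate = Math.min(estimate, c);
        }
        return estimate;
    }

    // Never below the true count; may be higher when keys collide.
    long estimate(String recipientPath) {
        long estimate = Long.MAX_VALUE;
        for (int row = 0; row < counts.length; row++) {
            estimate = Math.min(estimate, counts[row][column(row, recipientPath)]);
        }
        return estimate;
    }
}
```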

Wire format draft: allow encoding an ActorRef either as a number or as the path string. The field is prefixed with a marker indicating which mode is used, e.g. [0][full/path/to/actor] or [1][42].

If the receiving node crashes and then gets messages from someone, it must initiate a fresh handshake anyway, so we piggyback on the handshake to also mean “invalidate table”, since the new receiving end does not have a table prepared yet. We can of course also detect when a compressed ref comes in that does not match the table, and log a warning.
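The [0]/[1] marker scheme and the table invalidation can be sketched as follows. Assumptions: the full path is written as a length-prefixed UTF-8 string, identifiers are 4-byte ints, and an unknown identifier (for example after the receiver restarted) decodes to `null` so that the caller can log a warning. `ActorRefCompression` and its nested classes are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the ActorRef field: [0][full path] or [1][compressed id].
final class ActorRefCompression {
    static final byte FULL_PATH = 0, COMPRESSED = 1;

    // Sending side: uses ids advertised by the receiving system.
    static final class Encoder {
        private final Map<String, Integer> advertisedIds = new HashMap<>();

        void onAdvertisement(String path, int id) {
            advertisedIds.put(path, id);
        }

        // E.g. on a fresh handshake with a restarted receiver: fall back to full paths.
        void invalidate() {
            advertisedIds.clear();
        }

        byte[] encode(String path) {
            Integer id = advertisedIds.get(path);
            if (id != null) return ByteBuffer.allocate(5).put(COMPRESSED).putInt(id).array();
            byte[] utf8 = path.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(5 + utf8.length)
                .put(FULL_PATH).putInt(utf8.length).put(utf8).array();
        }
    }

    // Receiving side: owns the table it created and advertised.
    static final class Decoder {
        private final Map<Integer, String> pathById = new HashMap<>();
        private final Map<String, Integer> idByPath = new HashMap<>();
        private int nextId = 0;

        // Assigns an id to a frequently used path; the id is then advertised to the sender.
        int advertise(String path) {
            Integer existing = idByPath.get(path);
            if (existing != null) return existing;
            int id = nextId++;
            idByPath.put(path, id);
            pathById.put(id, path);
            return id;
        }

        // Returns null for an unknown id (e.g. after a restart), so the caller can log a warning.
        String decode(byte[] field) {
            ByteBuffer buf = ByteBuffer.wrap(field);
            if (buf.get() == FULL_PATH) {
                byte[] utf8 = new byte[buf.getInt()];
                buf.get(utf8);
                return new String(utf8, StandardCharsets.UTF_8);
            }
            return pathById.get(buf.getInt());
        }
    }
}
```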

Configuration

Sane default values that should work for most users without changes.

We should try to have a few high-level settings for choosing between different trade-offs, e.g. low latency vs. CPU usage. Low-level settings should be derived from these high-level settings.

Stable Protocol

We need a stable protocol to support rolling upgrades involving Akka version updates.

All Java serialization should be converted to protobuf serialization to make it easier to evolve the schema. We can include the new serializers in Akka 2.4.x, but not have them enabled by default. In Akka 3.0 they will be enabled by default.

Regression tests are needed.

Performance and Scalability

We aim for a solution that can handle high throughput, low latency, and good enough scalability.

Goals (given fast network, and good servers such as EC2 m4.4xlarge):

  • Maximum throughput between two nodes:
    • 1,000,000 msg/s of size 100 bytes, byte-array serialization
    • 10,000 msg/s of size 10,000 bytes, byte-array serialization
  • Round-trip latency between two nodes:
    • < 1 ms at the 95th percentile for a message rate of 1000 msg/s
  • One node should be able to handle 1000 active connections
  • Aggregated ingress/egress throughput from/to 100 nodes should not be less than for the one-to-one case
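As a quick sanity check (payload bytes only, ignoring envelope and UDP/IP overhead), both throughput goals amount to the same bandwidth:

```latex
\begin{aligned}
10^{6}\ \text{msg/s} \times 100\ \text{B} &= 10^{8}\ \text{B/s} = 100\ \text{MB/s} = 0.8\ \text{Gbit/s} \\
10^{4}\ \text{msg/s} \times 10^{4}\ \text{B} &= 10^{8}\ \text{B/s} = 100\ \text{MB/s} = 0.8\ \text{Gbit/s}
\end{aligned}
```

So the first goal mainly stresses per-message overhead and the second mainly per-byte cost, at roughly the same payload bandwidth.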

Overview of Stream Stages

[Figure: artery_stages3, overview of the stream stages]

Out of Scope

  • transport failure detection
  • pluggable transports apart from what is possible with Aeron (e.g. Aeron over shared memory)
  • multiple transports per ActorSystem
