Skip to content

Commit

Permalink
Cleaned up nio ssl to improve performance at high rates.
Browse files Browse the repository at this point in the history
  • Loading branch information
v-arun committed Jun 18, 2016
1 parent 84f7e29 commit 1dfff4d
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 153 deletions.
8 changes: 8 additions & 0 deletions javadoc.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project default="javadoc">
<target name="javadoc">
<javadoc access="public" author="true" classpath="/Users/arun/gigapaxos/lib/zookeeper-3.3.6.jar:/Users/arun/gigapaxos/lib/mchange-commons-java-0.2.9.jar:/Users/arun/gigapaxos/lib/derby.jar:/Users/arun/gigapaxos/lib/junit-4.12.jar:/Users/arun/gigapaxos/lib/c3p0-0.9.5.jar:/Users/arun/gigapaxos/lib/hamcrest-all-1.3.jar:/Users/arun/gigapaxos/lib/json-smart-1.2.jar:/Users/arun/gigapaxos/lib/mapdb-2.0.0-20151103.120435-149.jar:/Users/arun/gigapaxos/lib/log4j-1.2.16.jar:/Users/arun/gigapaxos/build/classes" destdir="/Users/arun/gigapaxos/doc" nodeprecated="false" nodeprecatedlist="false" noindex="false" nonavbar="false" notree="false" packagenames="edu.umass.cs.reconfiguration.examples.noopsimple,edu.umass.cs.gigapaxos.interfaces,edu.umass.cs.gigapaxos.examples.noop" source="1.8" sourcefiles="/Users/arun/gigapaxos/src/edu/umass/cs/gigapaxos/PaxosManager.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationpackets/DefaultAppRequest.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/examples/NoopAppClient.java,/Users/arun/gigapaxos/src/edu/umass/cs/nio/AbstractPacketDemultiplexer.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationpackets/CreateServiceName.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/interfaces/ReplicableRequest.java,/Users/arun/gigapaxos/src/edu/umass/cs/nio/nioutils/RTTEstimator.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/PaxosReplicaCoordinator.java,/Users/arun/gigapaxos/src/edu/umass/cs/nio/interfaces/PacketDemultiplexer.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationpackets/RequestActiveReplicas.java,/Users/arun/gigapaxos/src/edu/umass/cs/gigapaxos/paxospackets/RequestPacket.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationpackets/ClientReconfigurationPacket.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationutils/AbstractDemandProfile.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/ReconfigurableAppClientAsync.java,/Users/arun/gigapaxos/src/edu/umass/cs/gigapaxos/PaxosConfig.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationutils/DemandProfile.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/interfaces/ReconfigurableRequest.java,/Users/arun/gigapaxos/src/edu/umass/cs/gigapaxos/PaxosServer.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/ReconfigurableNode.java,/Users/arun/gigapaxos/src/edu/umass/cs/nio/interfaces/Byteable.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationpackets/ActiveReplicaError.java,/Users/arun/gigapaxos/src/edu/umass/cs/gigapaxos/PaxosClientAsync.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/interfaces/Reconfigurable.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/interfaces/ReplicaCoordinator.java,/Users/arun/gigapaxos/src/edu/umass/cs/nio/interfaces/IntegerPacketType.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationpackets/DeleteServiceName.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/interfaces/Repliconfigurable.java,/Users/arun/gigapaxos/src/edu/umass/cs/gigapaxos/paxosutil/LargeCheckpointer.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/examples/AppRequest.java,/Users/arun/gigapaxos/src/edu/umass/cs/gigapaxos/paxosutil/E2ELatencyAwareRedirector.java,/Users/arun/gigapaxos/src/edu/umass/cs/reconfiguration/reconfigurationpackets/ReconfigurationPacket.java" sourcepath="/Users/arun/gigapaxos/src" splitindex="true" use="true" version="true">
<link href="https://docs.oracle.com/javase/8/docs/api/"/>
</javadoc>
</target>
</project>
204 changes: 140 additions & 64 deletions src/edu/umass/cs/nio/AbstractNIOSSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,27 @@

import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;
import edu.umass.cs.utils.MyLogger;
import edu.umass.cs.nio.nioutils.NIOInstrumenter;
import edu.umass.cs.utils.Util;

/**
* @author arun
*
*/
public abstract class AbstractNIOSSL implements Runnable {
private final static int MAX_BUFFER_SIZE = 2048 * 1024;
private final static int MAX_DST_BUFFER_SIZE = 2 * MAX_BUFFER_SIZE;
private final static int DEFAULT_BUFFER_SIZE = NIOTransport.WRITE_BUFFER_SIZE;
/**
* There isn't much benefit to increasing the buffer size and it does
* introduce a speed bump, so best to leave MAX_FACTOR at 1, effectively
* disabling dynamic buffer expansion.
*/
private final static int MAX_FACTOR = 1;
private final static int MAX_BUFFER_SIZE = MAX_FACTOR
* NIOTransport.WRITE_BUFFER_SIZE;
private final static int MAX_DST_BUFFER_SIZE = MAX_FACTOR
* NIOTransport.WRITE_BUFFER_SIZE;

// final
ByteBuffer wrapSrc, unwrapSrc;
// final
ByteBuffer wrapDst, unwrapDst;

final SSLEngine engine;
Expand All @@ -54,16 +62,14 @@ public abstract class AbstractNIOSSL implements Runnable {
/**
* @param key
* @param engine
* @param bufferSize
* @param taskWorkers
*/
public AbstractNIOSSL(SelectionKey key, SSLEngine engine, int bufferSize,
public AbstractNIOSSL(SelectionKey key, SSLEngine engine,
Executor taskWorkers) {
this.wrapSrc = ByteBuffer.allocateDirect(bufferSize);
this.wrapDst = ByteBuffer.allocateDirect(bufferSize);
this.unwrapSrc = ByteBuffer.allocateDirect(bufferSize);
this.unwrapDst = ByteBuffer.allocateDirect(bufferSize);
// this.unwrapSrc.limit(0);
this.wrapSrc = ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
this.wrapDst = ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
this.unwrapSrc = ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
this.unwrapDst = ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
this.engine = engine;
this.taskWorkers = taskWorkers;
this.key = key;
Expand Down Expand Up @@ -103,11 +109,13 @@ public AbstractNIOSSL(SelectionKey key, SSLEngine engine, int bufferSize,
*/
public synchronized void nioSend(final ByteBuffer unencrypted) {
try {
wrapSrc.put(unencrypted);
Util.put(wrapSrc, unencrypted);
// wrapSrc.put(unencrypted);
} catch (BufferOverflowException boe) {
// will never come here
wrapSrc = getBiggerBuffer(wrapSrc, unencrypted);
log.log(Level.INFO, MyLogger.FORMAT[1], new Object[] {
"Increased wrapSrc buffer size to ", wrapSrc.capacity() });
log.log(Level.INFO, "{0} increased wrapSrc buffer size to {1}",
new Object[] { this, wrapSrc.capacity() });
}
run();
}
Expand All @@ -118,24 +126,37 @@ public synchronized void nioSend(final ByteBuffer unencrypted) {
* @param encrypted
*/
public synchronized void notifyReceived(ByteBuffer encrypted) {
int original = encrypted.remaining();
try {
unwrapSrc.put(encrypted);
Util.put(unwrapSrc, encrypted);
// unwrapSrc.put(encrypted);
} catch (BufferOverflowException boe) {
// try increasing buffer
// will never come here
unwrapSrc = getBiggerBuffer(unwrapSrc, encrypted);
log.log(Level.FINE, MyLogger.FORMAT[1],
new Object[] { "Increased unwrapSrc buffer size to ",
unwrapSrc.capacity() });
log.log(Level.FINE, "{0} increased unwrapSrc buffer size to {1}",
new Object[] { this, unwrapSrc.capacity() });

}
NIOInstrumenter.incrEncrBytesRcvd(original - encrypted.remaining());
run();
}

// trying to put buf2 into buf1
/**
* Trying to put buf2 into buf1 by creating a new buffer with enough space.
* Using this method seems to cause nio to get stuck at high sending rates.
* Unclear why. This method currently does not get invoked during runtime.
*
* @param buf1
* @param buf2
* @return
*/
@Deprecated
private ByteBuffer getBiggerBuffer(ByteBuffer buf1, ByteBuffer buf2) {
int biggerSize = buf1.position() + buf2.remaining();
if (biggerSize > MAX_BUFFER_SIZE) {
log.warning("Maximum allowed buffer size limit reached");
log.log(Level.WARNING,
"{0} reached maximum allowed buffer size limit",
new Object[] { this });
throw new BufferOverflowException();
}
ByteBuffer biggerBuf = ByteBuffer.allocate(biggerSize);
Expand All @@ -147,6 +168,13 @@ private ByteBuffer getBiggerBuffer(ByteBuffer buf1, ByteBuffer buf2) {

}

private static ByteBuffer getBiggerBuffer(ByteBuffer buf, int size) {
ByteBuffer b = ByteBuffer.allocate(size);
buf.flip();
b.put(buf);
return b;
}

public synchronized void run() {
// executes non-blocking tasks on the IO-Worker
while (this.step())
Expand Down Expand Up @@ -184,12 +212,11 @@ private boolean step() {
@Override
public void run() {
try {
log.log(Level.FINEST, MyLogger.FORMAT[1], new Object[] {
"async SSL task: ", sslTask });
long t0 = System.nanoTime();
sslTask.run();
log.log(Level.FINE, "async SSL task {0} took {1}ms",
new Object[] { sslTask,
log.log(Level.FINE,
"{0} async SSL task {1} took {2}ms",
new Object[] { this, sslTask,
(System.nanoTime() - t0) / 1000000 });

// continue handling I/O
Expand Down Expand Up @@ -226,34 +253,34 @@ private synchronized boolean wrap() {

switch (wrapResult.getStatus()) {
case OK:
if (wrapDst.position() > 0) {
wrapDst.flip();
this.onOutboundData(wrapDst);
wrapDst.compact();
}
if (wrapDst.position() > 0)
this.drainOutbound();
break;

case BUFFER_UNDERFLOW:
log.fine("wrap BUFFER_UNDERFLOW");
log.log(Level.FINE, "{0} wrap BUFFER_UNDERFLOW",
new Object[] { this });
// try again later
break;

case BUFFER_OVERFLOW:
log.warning("wrap BUFFER_OVERFLOW: Wrapped data is coming faster than the network can send it out.");
// Could attempt to drain the dst buffer of any already obtained
// data, but we'll just increase it to the size needed.
log.log(Level.INFO,
"{0} wrap BUFFER_OVERFLOW: Wrapped data is coming faster than the network can send it out.",
new Object[] { this });
int biggerSize = engine.getSession().getApplicationBufferSize()
+ wrapDst.capacity();
if (biggerSize > MAX_DST_BUFFER_SIZE)
throw new IllegalStateException("failed to wrap");
ByteBuffer b = ByteBuffer.allocate(biggerSize);
wrapDst.flip();
b.put(wrapDst);
wrapDst = b;
log.log(Level.FINE, MyLogger.FORMAT[0],
new Object[] { "Increased wrapDst buffer size to "
+ wrapDst.capacity() });
// retry the operation.
// throw new IllegalStateException("failed to wrap")
;
// try increasing buffer size up to maximum limit
if (biggerSize < MAX_DST_BUFFER_SIZE) {
wrapDst = getBiggerBuffer(wrapDst, biggerSize);
log.log(Level.INFO, "{0} increased wrapDst buffer size to {1}",
new Object[] { this, wrapDst.capacity() });
// retry the operation.
}
// drain outbound buffer
this.drainOutbound();
break;

case CLOSED:
Expand All @@ -273,6 +300,48 @@ private synchronized boolean wrap() {
return true;
}

/**
* Push once and spawn task if all outbound data is not fully pushed out.
*
* @param sync
*/
private void drainOutbound() {
if (wrapDst.position() > 0) {
pushOutbound();
if (wrapDst.position() == 0)
return;
// else spawn task
Runnable pushTask = new Runnable() {
@Override
public void run() {
while (wrapDst.position() > 0) {
int prev = wrapDst.position();
synchronized (AbstractNIOSSL.this) {
wrapDst.flip();
AbstractNIOSSL.this.onOutboundData(wrapDst);
wrapDst.compact();
}
if (wrapDst.position() == prev)
Thread.yield();
}
}
};
this.taskWorkers.execute(pushTask);
}
}

private void pushOutbound() {
wrapDst.flip();
AbstractNIOSSL.this.onOutboundData(wrapDst);
wrapDst.compact();
}

private void pullInbound() {
unwrapDst.flip();
this.onInboundData(unwrapDst);
unwrapDst.compact();
}

private synchronized boolean unwrap() {
SSLEngineResult unwrapResult;

Expand All @@ -287,37 +356,38 @@ private synchronized boolean unwrap() {

switch (unwrapResult.getStatus()) {
case OK:
if (unwrapDst.position() > 0) {
unwrapDst.flip();
this.onInboundData(unwrapDst);
unwrapDst.compact();
}
if (unwrapDst.position() > 0)
this.pullInbound();
break;

case CLOSED:
this.onClosed();
return false;

case BUFFER_OVERFLOW:
log.info("unwrap BUFFER_OVERFLOW: Network data is coming in faster than can be unwrapped");
// Could attempt to drain the dst buffer of any already obtained
// data, but we'll just increase it to the size needed.
log.log(Level.INFO,
"{0} unwrap BUFFER_OVERFLOW: Network data is coming in faster than can be unwrapped",
new Object[] { this });
int biggerSize = engine.getSession().getApplicationBufferSize()
+ unwrapDst.capacity();
if (biggerSize > MAX_BUFFER_SIZE)
throw new IllegalStateException("failed to unwrap");
ByteBuffer b = ByteBuffer.allocate(biggerSize);
unwrapDst.flip();
b.put(unwrapDst);
unwrapDst = b;
log.log(Level.FINE, MyLogger.FORMAT[1],
new Object[] { "Increased unwrapDst buffer size to ",
unwrapDst.capacity() });
// retry the operation.
// throw new IllegalStateException("failed to unwrap")
;
// try increasing size first
if (biggerSize < MAX_BUFFER_SIZE) {
unwrapDst = getBiggerBuffer(unwrapDst, biggerSize);
log.log(Level.FINE,
"{0} increased unwrapDst buffer size to {1}",
new Object[] { this, unwrapDst.capacity() });
// retry the operation.
}
// try draining inbound buffer
this.pullInbound();
break;

case BUFFER_UNDERFLOW:
log.fine("unwrap BUFFER_UNDERFLOW");
log.log(Level.FINE, "{0} unwrap BUFFER_UNDERFLOW",
new Object[] { this });
return false;
}

Expand All @@ -333,6 +403,11 @@ private synchronized boolean unwrap() {
return true;
}

public String toString() {
return AbstractNIOSSL.class.getSimpleName()
+ (this.key != null ? this.key.channel() : "");
}

/**
* To flush stuck data if any.
*/
Expand All @@ -356,9 +431,10 @@ protected void clean() {
private static void clean(ByteBuffer bbuf) {
Cleaner cleaner = null;
/* java 9 apparently may not support sun.nio.ch.DirectBuffer; if so,
* just comment the line below. The code will default to using the
* just comment the line below. The code will default to using the
* reflective approach below. */
cleaner = ((DirectBuffer) bbuf).cleaner();
if (bbuf instanceof DirectBuffer)
cleaner = ((DirectBuffer) bbuf).cleaner();
if (cleaner == null)
try {
Field cleanerField = bbuf.getClass()
Expand Down
2 changes: 2 additions & 0 deletions src/edu/umass/cs/nio/AbstractPacketDemultiplexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import edu.umass.cs.nio.interfaces.IntegerPacketType;
import edu.umass.cs.nio.interfaces.PacketDemultiplexer;
import edu.umass.cs.nio.nioutils.NIOHeader;
import edu.umass.cs.nio.nioutils.NIOInstrumenter;
import edu.umass.cs.utils.Stringer;

import java.util.logging.Level;
Expand Down Expand Up @@ -141,6 +142,7 @@ public String toString() {
// This method will be invoked by NIO
protected boolean handleMessageSuper(byte[] msg, NIOHeader header)
throws JSONException {
NIOInstrumenter.incrRcvd();
MessageType message = null;
Level level = Level.FINEST;
try {
Expand Down
2 changes: 2 additions & 0 deletions src/edu/umass/cs/nio/MessageExtractor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import edu.umass.cs.nio.interfaces.InterfaceMessageExtractor;
import edu.umass.cs.nio.nioutils.NIOHeader;
import edu.umass.cs.nio.nioutils.NIOInstrumenter;
import edu.umass.cs.nio.nioutils.PacketDemultiplexerDefault;
import edu.umass.cs.reconfiguration.reconfigurationutils.AppInstrumenter;
import edu.umass.cs.utils.Stringer;
Expand Down Expand Up @@ -249,6 +250,7 @@ private void demultiplexMessage(NIOHeader header, ByteBuffer incoming)
msg = new byte[incoming.remaining()];
incoming.get(msg);
extracted = true;
NIOInstrumenter.incrBytesRcvd(msg.length + 8);
}

// String message = (new String(msg,
Expand Down
Loading

0 comments on commit 1dfff4d

Please sign in to comment.