diff --git a/core/src/main/java/io/undertow/UndertowLogger.java b/core/src/main/java/io/undertow/UndertowLogger.java index 3bdfcc23ad..f747e0144a 100644 --- a/core/src/main/java/io/undertow/UndertowLogger.java +++ b/core/src/main/java/io/undertow/UndertowLogger.java @@ -488,4 +488,8 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String @LogMessage(level = WARN) @Message(id = 5107, value = "Failed to set web socket timeout.") void failedToSetWSTimeout(@Cause Exception e); + + @LogMessage(level = WARN) + @Message(id = 5108, value = "Failed to transition to '%s' state in '%s'.") + void failedToTransitionToState(String state, Object src); } \ No newline at end of file diff --git a/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java b/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java index 899d25a450..83433b41d1 100644 --- a/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java +++ b/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import io.undertow.client.ClientStatistics; import org.jboss.logging.Logger; @@ -103,23 +104,30 @@ public void handleEvent(AjpClientResponseStreamSourceChannel channel) { private static final int CLOSE_REQ = 1 << 30; private static final int CLOSED = 1 << 31; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(AjpClientConnection.class, "state"); private final ChannelListener.SimpleSetter closeSetter = new ChannelListener.SimpleSetter<>(); private final ClientStatistics clientStatistics; private final List> closeListeners = new CopyOnWriteArrayList<>(); - AjpClientConnection(final AjpClientChannel connection, final OptionMap options, final ByteBufferPool bufferPool, ClientStatistics clientStatistics) { + AjpClientConnection(final AjpClientChannel channel, final OptionMap options, final ByteBufferPool bufferPool, ClientStatistics clientStatistics) { this.clientStatistics = clientStatistics; this.options = options; - this.connection = connection; + this.connection = channel; this.bufferPool = bufferPool; - connection.addCloseTask(new ChannelListener() { + channel.addCloseTask(new ChannelListener() { @Override public void handleEvent(AjpClientChannel channel) { log.debugf("connection to %s closed", getPeerAddress()); - AjpClientConnection.this.state |= CLOSED; + final int oldVal = stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSED, (currentState, flag)-> currentState | flag); + if(anyAreSet(oldVal, CLOSED)) { + //this was closed already? + UndertowLogger.ROOT_LOGGER.failedToTransitionToState("CLOSED", AjpClientConnection.this); + return; + } ChannelListeners.invokeChannelListener(AjpClientConnection.this, closeSetter.get()); for(ChannelListener listener : closeListeners) { listener.handleEvent(AjpClientConnection.this); @@ -135,8 +143,8 @@ public void handleEvent(AjpClientChannel channel) { } } }); - connection.getReceiveSetter().set(new ClientReceiveListener()); - connection.resumeReceives(); + channel.getReceiveSetter().set(new ClientReceiveListener()); + channel.resumeReceives(); } @Override @@ -236,7 +244,7 @@ public void sendRequest(final ClientRequest request, final ClientCallback currentState | flag); } } else if (request.getProtocol() != Protocols.HTTP_1_1) { - state |= CLOSE_REQ; + stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } if (request.getRequestHeaders().contains(UPGRADE)) { - state |= UPGRADE_REQUESTED; + stateUpdater.getAndAccumulate(AjpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } long length = 0; @@ -327,7 +335,7 @@ public void close() throws IOException { if (anyAreSet(state, CLOSED)) { return; } - state |= CLOSED | CLOSE_REQ; + stateUpdater.accumulateAndGet(this, CLOSED | CLOSE_REQ, (currentState, flag)-> currentState | flag); connection.close(); } @@ -336,7 +344,6 @@ public void close() throws IOException { */ public void requestDone() { currentRequest = null; - if (anyAreSet(state, CLOSE_REQ)) { safeClose(connection); } else if (anyAreSet(state, UPGRADE_REQUESTED)) { @@ -352,7 +359,7 @@ public void requestDone() { } public void requestClose() { - state |= CLOSE_REQ; + stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } diff --git a/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java b/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java index 53536882b6..f6958deeab 100644 --- a/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java +++ b/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java @@ -35,6 +35,7 @@ import org.xnio.channels.StreamSourceChannel; import java.io.IOException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.anyAreSet; @@ -58,7 +59,9 @@ class AjpClientExchange extends AbstractAttachable implements ClientExchange { private AjpClientResponseStreamSourceChannel responseChannel; private AjpClientRequestClientStreamSinkChannel requestChannel; - private int state = 0; + @SuppressWarnings("unused") + private volatile int state = 0; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(AjpClientExchange.class, "state"); private static final int REQUEST_TERMINATED = 1; private static final int RESPONSE_TERMINATED = 1 << 1; @@ -78,9 +81,9 @@ class AjpClientExchange extends AbstractAttachable implements ClientExchange { } void terminateRequest() { - state |= REQUEST_TERMINATED; + stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag); if(!clientConnection.isOpen()) { - state |= RESPONSE_TERMINATED; + stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag); } if (anyAreSet(state, RESPONSE_TERMINATED)) { clientConnection.requestDone(); @@ -88,9 +91,9 @@ void terminateRequest() { } void terminateResponse() { - state |= RESPONSE_TERMINATED; + stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag); if(!clientConnection.isOpen()) { - state |= REQUEST_TERMINATED; + stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag); } if (anyAreSet(state, REQUEST_TERMINATED)) { clientConnection.requestDone(); @@ -155,7 +158,7 @@ public StreamSinkChannel getRequestChannel() { return new DetachableStreamSinkChannel(requestChannel) { @Override protected boolean isFinished() { - return anyAreSet(state, REQUEST_TERMINATED); + return anyAreSet(AjpClientExchange.this.state, REQUEST_TERMINATED); } }; } @@ -165,7 +168,7 @@ public StreamSourceChannel getResponseChannel() { return new DetachableStreamSourceChannel(responseChannel) { @Override protected boolean isFinished() { - return anyAreSet(state, RESPONSE_TERMINATED); + return anyAreSet(AjpClientExchange.this.state, RESPONSE_TERMINATED); } }; } diff --git a/core/src/main/java/io/undertow/client/http/HttpClientConnection.java b/core/src/main/java/io/undertow/client/http/HttpClientConnection.java index 7d66368cf2..5824b6ffe4 100644 --- a/core/src/main/java/io/undertow/client/http/HttpClientConnection.java +++ b/core/src/main/java/io/undertow/client/http/HttpClientConnection.java @@ -83,6 +83,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static io.undertow.client.UndertowClientMessages.MESSAGES; import static org.xnio.Bits.allAreClear; @@ -132,7 +133,9 @@ public void handleEvent(StreamSourceConduit channel) { private static final int CLOSE_REQ = 1 << 30; private static final int CLOSED = 1 << 31; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpClientConnection.class, "state"); private final ChannelListener.SimpleSetter closeSetter = new ChannelListener.SimpleSetter<>(); private final ClientStatistics clientStatistics; @@ -178,7 +181,13 @@ public void activity(long bytes) { public void handleEvent(StreamConnection channel) { log.debugf("connection to %s closed", getPeerAddress()); - HttpClientConnection.this.state |= CLOSED; + final int oldVal = stateUpdater.getAndAccumulate(HttpClientConnection.this, CLOSED, (currentState, flag)-> currentState | flag); + if(anyAreSet(oldVal, CLOSED)) { + //this was closed already? + UndertowLogger.ROOT_LOGGER.failedToTransitionToState("CLOSED", HttpClientConnection.this); + //NOTE: cant do that.... + //return; + } ChannelListeners.invokeChannelListener(HttpClientConnection.this, closeSetter.get()); try { if (pooledBuffer != null) { @@ -256,7 +265,7 @@ public boolean isOpen() { if(http2Delegate != null) { return http2Delegate.isOpen(); } - return connection.isOpen() && allAreClear(state, CLOSE_REQ | CLOSED); + return connection.isOpen() && allAreClear(state, CLOSED | CLOSE_REQ); } @Override @@ -348,10 +357,10 @@ public void sendRequest(final ClientRequest request, final ClientCallback currentState | flag); } else if (Headers.UPGRADE.equalToString(connectionString)) { - state |= UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } } else if (request.getProtocol() != Protocols.HTTP_1_1) { - state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } if (request.getRequestHeaders().contains(Headers.UPGRADE)) { - state |= UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } if(request.getMethod().equals(Methods.CONNECT)) { //we treat CONNECT like upgrade requests - state |= UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } //setup the client request conduits @@ -476,7 +485,7 @@ public StreamConnection performUpgrade() throws IOException { if (allAreSet(state, UPGRADED | CLOSE_REQ | CLOSED)) { throw new IOException(UndertowClientMessages.MESSAGES.connectionClosed()); } - state |= UPGRADED; + stateUpdater.accumulateAndGet(this, UPGRADED, (currentState, flag)-> currentState | flag); connection.getSinkChannel().setConduit(originalSinkConduit); connection.getSourceChannel().setConduit(pushBackStreamSourceConduit); return connection; @@ -490,7 +499,7 @@ public void close() throws IOException { if (anyAreSet(state, CLOSED)) { return; } - state |= CLOSED | CLOSE_REQ; + stateUpdater.accumulateAndGet(this, CLOSED | CLOSE_REQ, (currentState, flag)-> currentState | flag); ConnectionUtils.cleanClose(connection); } @@ -508,7 +517,7 @@ public void exchangeDone() { if (anyAreSet(state, CLOSE_REQ)) { currentRequest = null; pendingResponse = null; - this.state |= CLOSED; + stateUpdater.accumulateAndGet(this, CLOSED, (currentState, flag)-> currentState | flag); safeClose(connection); } else if (anyAreSet(state, UPGRADE_REQUESTED)) { connection.getSourceChannel().suspendReads(); @@ -630,7 +639,7 @@ public void handleEvent(StreamSourceChannel channel) { if ((connectionString == null || !Headers.UPGRADE.equalToString(connectionString)) && !response.getResponseHeaders().contains(Headers.UPGRADE)) { if(!currentRequest.getRequest().getMethod().equals(Methods.CONNECT) || response.getResponseCode() != 200) { //make sure it was not actually a connect request //just unset the upgrade requested flag - HttpClientConnection.this.state &= ~UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, ~UPGRADE_REQUESTED, (currentState, flag)-> currentState & flag); } } } @@ -647,7 +656,7 @@ public void handleEvent(StreamSourceChannel channel) { close = true; } if(close) { - HttpClientConnection.this.state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); //we are going to close, kill any queued connections HttpClientExchange ex = pendingQueue.poll(); while (ex != null) { @@ -673,7 +682,7 @@ public void handleEvent(StreamSourceChannel channel) { currentRequest.setResponse(response); if(response.getResponseCode() == StatusCodes.EXPECTATION_FAILED) { if(HttpContinue.requiresContinueResponse(currentRequest.getRequest().getRequestHeaders())) { - HttpClientConnection.this.state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); ConduitStreamSinkChannel sinkChannel = HttpClientConnection.this.connection.getSinkChannel(); sinkChannel.shutdownWrites(); if(!sinkChannel.flush()) { @@ -749,7 +758,7 @@ private void prepareResponseChannel(ClientResponse response, ClientExchange exch connection.getSourceChannel().setConduit(new FixedLengthStreamSourceConduit(connection.getSourceChannel().getConduit(), 0, responseFinishedListener)); } else { connection.getSourceChannel().setConduit(new FinishableStreamSourceConduit(connection.getSourceChannel().getConduit(), responseFinishedListener)); - state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } } diff --git a/core/src/main/java/io/undertow/client/http/HttpClientExchange.java b/core/src/main/java/io/undertow/client/http/HttpClientExchange.java index 77834ad2ac..b8ad08598c 100644 --- a/core/src/main/java/io/undertow/client/http/HttpClientExchange.java +++ b/core/src/main/java/io/undertow/client/http/HttpClientExchange.java @@ -34,6 +34,7 @@ import org.xnio.channels.StreamSourceChannel; import java.io.IOException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.anyAreSet; @@ -57,7 +58,9 @@ class HttpClientExchange extends AbstractAttachable implements ClientExchange { private IOException failedReason; private HttpRequestConduit requestConduit; - private int state = 0; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpClientExchange.class, "state"); private static final int REQUEST_TERMINATED = 1; private static final int RESPONSE_TERMINATED = 1 << 1; @@ -81,28 +84,28 @@ public void setRequestConduit(HttpRequestConduit requestConduit) { } void terminateRequest() { - if(anyAreSet(state, REQUEST_TERMINATED)) { + if(anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED)) { return; } log.debugf("request terminated for request to %s %s", clientConnection.getPeerAddress(), getRequest().getPath()); - state |= REQUEST_TERMINATED; + stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag); clientConnection.requestDataSent(); - if (anyAreSet(state, RESPONSE_TERMINATED)) { + if (anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED)) { clientConnection.exchangeDone(); } } boolean isRequestDataSent() { - return anyAreSet(state, REQUEST_TERMINATED); + return anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED); } void terminateResponse() { - if(anyAreSet(state, RESPONSE_TERMINATED)) { + if(anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED)) { return; } log.debugf("response terminated for request to %s %s", clientConnection.getPeerAddress(), getRequest().getPath()); - state |= RESPONSE_TERMINATED; - if (anyAreSet(state, REQUEST_TERMINATED)) { + stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag); + if (anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED)) { clientConnection.exchangeDone(); } } @@ -168,7 +171,7 @@ public StreamSinkChannel getRequestChannel() { return new DetachableStreamSinkChannel(clientConnection.getConnection().getSinkChannel()) { @Override protected boolean isFinished() { - return anyAreSet(state, REQUEST_TERMINATED); + return anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED); } }; } @@ -178,7 +181,7 @@ public StreamSourceChannel getResponseChannel() { return new DetachableStreamSourceChannel(clientConnection.getConnection().getSourceChannel()) { @Override protected boolean isFinished() { - return anyAreSet(state, RESPONSE_TERMINATED); + return anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED); } }; } diff --git a/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java b/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java index f25ad40c88..22ccbfd1d4 100644 --- a/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java +++ b/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java @@ -52,8 +52,6 @@ final class HttpRequestConduit extends AbstractStreamSinkConduit nameIterator; private String string; private HttpString headerName; @@ -79,6 +77,7 @@ final class HttpRequestConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( HttpRequestConduit.class, "state"); @@ -465,7 +464,7 @@ private int doProcessWrite(int state, final ByteBuffer userData) throws IOExcept if (res == 0) { log.trace("Continuation"); this.charIndex = i; - this.state = STATE_URL; + stateUpdater.set(this, STATE_URL) ; return STATE_URL; } } while (buffer.hasRemaining()); @@ -534,14 +533,14 @@ public int write(final ByteBuffer src) throws IOException { } return alreadyWritten; } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldState & ~MASK_STATE | state; + stateUpdater.set(this, oldState & ~MASK_STATE | state); } } @@ -576,14 +575,14 @@ public long write(final ByteBuffer[] srcs, final int offset, final int length) t } return length == 1 ? next.write(srcs[offset]) : next.write(srcs, offset, length); } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -630,7 +629,7 @@ public long transferFrom(final FileChannel src, final long position, final long } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -661,14 +660,14 @@ public long transferFrom(final StreamSourceChannel source, final long count, fin } return next.transferFrom(source, count, throughBuffer); } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -698,14 +697,14 @@ public boolean flush() throws IOException { log.trace("Delegating flush"); return next.flush(); } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -717,7 +716,7 @@ public void terminateWrites() throws IOException { next.terminateWrites(); return; } - this.state = oldVal | FLAG_SHUTDOWN; + stateUpdater.set(this, oldVal | FLAG_SHUTDOWN); } public void truncateWrites() throws IOException { @@ -734,7 +733,7 @@ public void truncateWrites() throws IOException { } return; } - this.state = oldVal & ~MASK_STATE | FLAG_SHUTDOWN; + stateUpdater.set(this, oldVal & ~MASK_STATE | FLAG_SHUTDOWN); throw new TruncatedResponseException(); } @@ -746,7 +745,24 @@ public void freeBuffers() { if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; - this.state = state & ~MASK_STATE | FLAG_SHUTDOWN; + //this.state = state & ~MASK_STATE | FLAG_SHUTDOWN; + clearFlags(MASK_STATE); + setFlags(FLAG_SHUTDOWN); } } + + + private void setFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old | flags)); + } + + private void clearFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old & ~flags)); + } } diff --git a/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java index c8379ef24c..c8d324b49f 100644 --- a/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java @@ -19,6 +19,7 @@ package io.undertow.conduits; import io.undertow.UndertowLogger; + import org.xnio.Buffers; import org.xnio.channels.FixedLengthOverflowException; import org.xnio.channels.StreamSourceChannel; @@ -31,6 +32,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import static java.lang.Math.min; import static org.xnio.Bits.allAreClear; @@ -46,7 +48,9 @@ public abstract class AbstractFixedLengthStreamSinkConduit extends AbstractStreamSinkConduit { private int config; - private long state; + @SuppressWarnings("unused") + private volatile long state; + protected static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater(AbstractFixedLengthStreamSinkConduit.class, "state"); private boolean broken = false; @@ -78,7 +82,7 @@ public AbstractFixedLengthStreamSinkConduit(final StreamSinkConduit next, final } protected void reset(long contentLength, boolean propagateClose) { - this.state = contentLength; + stateUpdater.set(this, contentLength); if (propagateClose) { config |= CONF_FLAG_PASS_CLOSE; } else { @@ -256,7 +260,7 @@ public void terminateWrites() throws IOException { next.truncateWrites(); } finally { if (!anyAreSet(state, FLAG_FINISHED_CALLED)) { - state |= FLAG_FINISHED_CALLED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED_CALLED, (current, flag)-> current | flag); channelFinished(); } } @@ -270,7 +274,7 @@ public void terminateWrites() throws IOException { public void truncateWrites() throws IOException { try { if (!anyAreSet(state, FLAG_FINISHED_CALLED)) { - state |= FLAG_FINISHED_CALLED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED_CALLED, (current, flag)-> current | flag); channelFinished(); } } finally { @@ -298,7 +302,7 @@ public long getRemaining() { private void exitWrite(long oldVal, long consumed) { long newVal = oldVal - consumed; - state = newVal; + stateUpdater.set(this, newVal); } private void exitFlush(long oldVal, boolean flushed) { @@ -311,7 +315,7 @@ private void exitFlush(long oldVal, boolean flushed) { newVal |= FLAG_FINISHED_CALLED; callFinish = true; } - state = newVal; + stateUpdater.set(this, newVal); if (callFinish) { channelFinished(); } @@ -322,18 +326,20 @@ protected void channelFinished() { } private long enterShutdown() { - long oldVal, newVal; - oldVal = state; + long oldVal; + oldVal = stateUpdater.get(this); if (anyAreSet(oldVal, FLAG_CLOSE_REQUESTED | FLAG_CLOSE_COMPLETE)) { // no action necessary return oldVal; } - newVal = oldVal | FLAG_CLOSE_REQUESTED; + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); if (anyAreSet(oldVal, MASK_COUNT)) { // error: channel not filled. set both close flags. - newVal |= FLAG_CLOSE_COMPLETE; + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); + } else { + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); } - state = newVal; + //state = newVal; return oldVal; } diff --git a/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java index fe233e47dd..37da2d0f72 100644 --- a/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java @@ -19,6 +19,7 @@ package io.undertow.conduits; import io.undertow.UndertowMessages; + import org.xnio.Buffers; import org.xnio.IoUtils; import io.undertow.connector.PooledByteBuffer; @@ -33,6 +34,7 @@ import java.nio.channels.FileChannel; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.allAreClear; import static org.xnio.Bits.anyAreSet; @@ -58,8 +60,9 @@ public class AbstractFramedStreamSinkConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( + AbstractFramedStreamSinkConduit.class, "state"); private static final int FLAG_WRITES_TERMINATED = 1; private static final int FLAG_DELEGATE_SHUTDOWN = 2; @@ -191,11 +194,12 @@ public void terminateWrites() throws IOException { return; } queueCloseFrames(); - state |= FLAG_WRITES_TERMINATED; if (queuedData == 0) { - state |= FLAG_DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_TERMINATED | FLAG_DELEGATE_SHUTDOWN, (current, flag) -> current | flag); doTerminateWrites(); finished(); + } else { + stateUpdater.accumulateAndGet(this, FLAG_WRITES_TERMINATED, (current, flag) -> current | flag); } } @@ -212,7 +216,7 @@ protected boolean flushQueuedData() throws IOException { } if (anyAreSet(state, FLAG_WRITES_TERMINATED) && allAreClear(state, FLAG_DELEGATE_SHUTDOWN)) { doTerminateWrites(); - state |= FLAG_DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_DELEGATE_SHUTDOWN, (current, flag) -> current | flag); finished(); } return next.flush(); diff --git a/core/src/main/java/io/undertow/conduits/ChunkReader.java b/core/src/main/java/io/undertow/conduits/ChunkReader.java index f4816e3f92..79eeb111f3 100644 --- a/core/src/main/java/io/undertow/conduits/ChunkReader.java +++ b/core/src/main/java/io/undertow/conduits/ChunkReader.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.xnio.conduits.Conduit; import io.undertow.UndertowMessages; @@ -44,12 +45,17 @@ class ChunkReader { private static final long FLAG_READING_TILL_END_OF_LINE = 1L << 60L; private static final long FLAG_READING_NEWLINE = 1L << 59L; private static final long FLAG_READING_AFTER_LAST = 1L << 58L; + private static final long FLAG_COMPOUND = FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST; private static final long MASK_COUNT = longBitMask(0, 56); private static final long LIMIT = Long.MAX_VALUE >> 4; - private long state; + @SuppressWarnings("unused") + private volatile long state; + private static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater( + ChunkReader.class, "state"); + private final Attachable attachable; private final AttachmentKey trailerAttachmentKey; /** @@ -71,7 +77,7 @@ public long readChunk(final ByteBuffer buf) throws IOException { long oldVal = state; long chunkRemaining = state & MASK_COUNT; - if (chunkRemaining > 0 && !anyAreSet(state, FLAG_READING_AFTER_LAST | FLAG_READING_LENGTH | FLAG_READING_NEWLINE | FLAG_READING_TILL_END_OF_LINE)) { + if (chunkRemaining > 0 && !anyAreSet(oldVal, FLAG_READING_AFTER_LAST | FLAG_READING_LENGTH | FLAG_READING_NEWLINE | FLAG_READING_TILL_END_OF_LINE)) { return chunkRemaining; } long newVal = oldVal & ~MASK_COUNT; @@ -144,7 +150,7 @@ public long readChunk(final ByteBuffer buf) throws IOException { } return chunkRemaining; } finally { - state = newVal | chunkRemaining; + stateUpdater.set(this, newVal | chunkRemaining); } } @@ -152,24 +158,25 @@ public long getChunkRemaining() { if (anyAreSet(state, FLAG_FINISHED)) { return -1; } - if (anyAreSet(state, FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST)) { + if (anyAreSet(state, FLAG_COMPOUND)) { return 0; } return state & MASK_COUNT; } public void setChunkRemaining(final long remaining) { - if (remaining < 0 || anyAreSet(state, FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST)) { + long old = state; + if (remaining < 0 || anyAreSet(old, FLAG_COMPOUND)) { return; } - long old = state; + long oldRemaining = old & MASK_COUNT; if (remaining == 0 && oldRemaining != 0) { //if oldRemaining is zero it could be that no data has been read yet //and the correct state is READING_LENGTH old |= FLAG_READING_NEWLINE; } - state = (old & ~MASK_COUNT) | remaining; + stateUpdater.set(this, (old & ~MASK_COUNT) | remaining); } private int handleChunkedRequestEnd(ByteBuffer buffer) throws IOException { diff --git a/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java index ec8a8e3a61..f874c3ea97 100644 --- a/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java @@ -25,6 +25,7 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; import io.undertow.UndertowLogger; @@ -82,7 +83,9 @@ public class ChunkedStreamSinkConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ChunkedStreamSinkConduit.class, "state"); private int chunkleft = 0; private final ByteBuffer chunkingBuffer = ByteBuffer.allocate(12); //12 is the most @@ -136,7 +139,7 @@ int doWrite(final ByteBuffer src) throws IOException { if(src.remaining() == 0) { return 0; } - this.state |= FLAG_FIRST_DATA_WRITTEN; + stateUpdater.accumulateAndGet(this, FLAG_FIRST_DATA_WRITTEN, (currentState, flag)-> currentState | flag); int oldLimit = src.limit(); boolean dataRemaining = false; //set to true if there is data in src that still needs to be written out if (chunkleft == 0 && !chunkingSepBuffer.hasRemaining()) { @@ -147,7 +150,7 @@ int doWrite(final ByteBuffer src) throws IOException { chunkingSepBuffer.clear(); chunkingSepBuffer.put(CRLF); chunkingSepBuffer.flip(); - state |= FLAG_WRITTEN_FIRST_CHUNK; + stateUpdater.accumulateAndGet(this, FLAG_WRITTEN_FIRST_CHUNK, (currentState, flag)-> currentState | flag); chunkleft = src.remaining(); } else { if (src.remaining() > chunkleft) { @@ -172,10 +175,10 @@ int doWrite(final ByteBuffer src) throws IOException { result = next.write(buf, 0, buf.length); } if (!src.hasRemaining()) { - state |= FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_SHUTDOWN, (currentState, flag)-> currentState | flag); } if (!lastChunkBuffer.getBuffer().hasRemaining()) { - state |= FLAG_NEXT_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_NEXT_SHUTDOWN, (currentState, flag)-> currentState | flag); lastChunkBuffer.close(); } } @@ -265,7 +268,7 @@ public boolean flush() throws IOException { if (anyAreSet(state, FLAG_FINISHED)) { return true; } - this.state |= FLAG_FIRST_DATA_WRITTEN; + stateUpdater.accumulateAndGet(this, FLAG_FIRST_DATA_WRITTEN, (currentState, flag)-> currentState | flag); if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) { if (anyAreSet(state, FLAG_NEXT_SHUTDOWN)) { boolean val = next.flush(); @@ -280,7 +283,7 @@ public boolean flush() throws IOException { if (anyAreSet(config, CONF_FLAG_PASS_CLOSE)) { next.terminateWrites(); } - state |= FLAG_NEXT_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_NEXT_SHUTDOWN, (currentState, flag)-> currentState | flag); boolean val = next.flush(); if (val && allAreClear(state, FLAG_FINISHED)) { invokeFinishListener(); @@ -296,7 +299,7 @@ public boolean flush() throws IOException { } private void invokeFinishListener() { - state |= FLAG_FINISHED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); if (finishListener != null) { finishListener.handleEvent(this); } @@ -313,7 +316,7 @@ public void terminateWrites() throws IOException { //todo: should we make this behaviour configurable? responseHeaders.put(Headers.CONTENT_LENGTH, "0"); //according to the spec we don't actually need this, but better to be safe responseHeaders.remove(Headers.TRANSFER_ENCODING); - state |= FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN, (currentState, flag)-> currentState | flag); try { flush(); } catch (IOException ignore) { @@ -325,7 +328,7 @@ public void terminateWrites() throws IOException { } } else { createLastChunk(false); - state |= FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_SHUTDOWN, (currentState, flag)-> currentState | flag); try { flush(); } catch (IOException ignore) { diff --git a/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java index 24acce45dc..6dad5b21f5 100644 --- a/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java @@ -27,6 +27,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.zip.Deflater; import io.undertow.server.Connectors; @@ -77,7 +78,9 @@ public class DeflatingStreamSinkConduit implements StreamSinkConduit { */ private ByteBuffer trailerBuffer; - private int state = 0; + @SuppressWarnings("unused") + private volatile int state = 0; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(DeflatingStreamSinkConduit.class, "state"); private static final int SHUTDOWN = 1; private static final int NEXT_SHUTDOWN = 1 << 1; @@ -220,7 +223,7 @@ public XnioWorker getWorker() { @Override public void suspendWrites() { if (next == null) { - state = state & ~WRITES_RESUMED; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~WRITES_RESUMED, (currentState, flag)-> currentState & flag); } else { next.suspendWrites(); } @@ -248,7 +251,7 @@ public void wakeupWrites() { @Override public void resumeWrites() { if (next == null) { - state |= WRITES_RESUMED; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, WRITES_RESUMED, (currentState, flag)-> currentState | flag); queueWriteListener(); } else { next.resumeWrites(); @@ -278,7 +281,7 @@ public void terminateWrites() throws IOException { if (deflater != null) { deflater.finish(); } - state |= SHUTDOWN; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, SHUTDOWN, (currentState, flag)-> currentState | flag); } @Override @@ -343,7 +346,7 @@ public boolean flush() throws IOException { } final ByteBuffer buffer = currentBuffer.getBuffer(); if (allAreClear(state, WRITTEN_TRAILER)) { - state |= WRITTEN_TRAILER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, WRITTEN_TRAILER, (currentState, flag)-> currentState | flag); byte[] data = getTrailer(); if (data != null) { Connectors.updateResponseBytesSent(exchange, data.length); @@ -364,14 +367,14 @@ public boolean flush() throws IOException { //ok the deflater is flushed, now we need to flush the buffer if (!anyAreSet(state, FLUSHING_BUFFER)) { buffer.flip(); - state |= FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, FLUSHING_BUFFER, (currentState, flag)-> currentState | flag); if (next == null) { nextCreated = true; this.next = createNextChannel(); } } if (performFlushIfRequired()) { - state |= NEXT_SHUTDOWN; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, NEXT_SHUTDOWN, (currentState, flag)-> currentState | flag); freeBuffer(); next.terminateWrites(); return next.flush(); @@ -389,7 +392,7 @@ public boolean flush() throws IOException { if(allAreClear(state, FLUSHING_BUFFER)) { //deflateData can cause this to be change currentBuffer.getBuffer().flip(); - this.state |= FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, FLUSHING_BUFFER, (currentState, flag)-> currentState | flag); } } if(!performFlushIfRequired()) { @@ -450,7 +453,7 @@ private boolean performFlushIfRequiredSingleBuffer() throws IOException { } while (total < totalLength); } currentBuffer.getBuffer().clear(); - state = state & ~FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~FLUSHING_BUFFER, (currentState, flag)-> currentState & flag); return true; } @@ -472,7 +475,7 @@ private boolean performFlushIfRequiredAdditionalBuffer() throws IOException { } trailerBuffer = null; currentBuffer.getBuffer().clear(); - state = state & ~FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~FLUSHING_BUFFER, (currentState, flag)-> currentState & flag); return true; } @@ -514,7 +517,7 @@ private void deflateData(boolean force) throws IOException { Connectors.updateResponseBytesSent(exchange, count); if (!outputBuffer.hasRemaining()) { outputBuffer.flip(); - this.state |= FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, FLUSHING_BUFFER, (currentState, flag)-> currentState | flag); if (next == null) { nextCreated = true; this.next = createNextChannel(); @@ -540,7 +543,7 @@ private void deflateData(boolean force) throws IOException { @Override public void truncateWrites() throws IOException { freeBuffer(); - state |= CLOSED; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, CLOSED, (currentState, flag)-> currentState | flag); next.truncateWrites(); } @@ -548,7 +551,7 @@ private void freeBuffer() { if (currentBuffer != null) { currentBuffer.close(); currentBuffer = null; - state = state & ~FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~FLUSHING_BUFFER, (currentState, flag)-> currentState & flag); } if (deflater != null) { deflater = null; diff --git a/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java b/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java index 1757210309..b8a6214073 100644 --- a/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java +++ b/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import static java.lang.Math.min; import static org.xnio.Bits.allAreClear; @@ -60,7 +61,9 @@ public final class FixedLengthStreamSourceConduit extends AbstractStreamSourceCo private final ConduitListener finishListener; @SuppressWarnings("unused") - private long state; + private volatile long state; + private static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater( + FixedLengthStreamSourceConduit.class, "state"); private static final long FLAG_CLOSED = 1L << 63L; private static final long FLAG_FINISHED = 1L << 62L; @@ -169,11 +172,11 @@ private void checkMaxSize(long state) throws IOException { Connectors.terminateRequest(exchange); exchange.setPersistent(false); finishListener.handleEvent(this); - this.state |= FLAG_FINISHED | FLAG_CLOSED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED | FLAG_CLOSED, (current, flag ) -> current | flag ); throw UndertowMessages.MESSAGES.requestEntityWasTooLarge(exchange.getMaxEntitySize()); } } - this.state |= FLAG_LENGTH_CHECKED; + stateUpdater.accumulateAndGet(this, FLAG_LENGTH_CHECKED, (current, flag ) -> current | flag ); } } @@ -336,7 +339,7 @@ private long enterShutdownReads() { return oldVal; } newVal = oldVal | FLAG_CLOSED; - state = newVal; + stateUpdater.set(this, newVal); return oldVal; } @@ -360,7 +363,7 @@ private void exitRead(long consumed, Throwable readError) throws IOException { if(consumed == -1) { if (anyAreSet(oldVal, MASK_COUNT)) { invokeFinishListener(); - state &= ~MASK_COUNT; + stateUpdater.accumulateAndGet(this, ~MASK_COUNT, (current, flag ) -> current & flag ); final IOException couldNotReadAll = UndertowMessages.MESSAGES.couldNotReadContentLengthData(); if (readError != null) { couldNotReadAll.addSuppressed(readError); @@ -370,11 +373,11 @@ private void exitRead(long consumed, Throwable readError) throws IOException { return; } long newVal = oldVal - consumed; - state = newVal; + stateUpdater.set(this, newVal); } private void invokeFinishListener() { - this.state |= FLAG_FINISHED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED, (current, flag ) -> current | flag ); finishListener.handleEvent(this); } diff --git a/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java index aad1232380..0a3b78deab 100644 --- a/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.xnio.IoUtils; import org.xnio.channels.StreamSourceChannel; @@ -43,7 +44,9 @@ public final class HeadStreamSinkConduit extends AbstractStreamSinkConduit finishListener; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HeadStreamSinkConduit.class, "state"); private final boolean shutdownDelegate; private static final int FLAG_CLOSE_REQUESTED = 1; @@ -122,29 +125,26 @@ public long transferFrom(final StreamSourceChannel source, final long count, fin } public boolean flush() throws IOException { - int val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return true; } boolean flushed = false; try { return flushed = next.flush(); } finally { - exitFlush(val, flushed); + exitFlush(flushed); } } public void suspendWrites() { - long val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return; } next.suspendWrites(); } public void resumeWrites() { - long val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return; } next.resumeWrites(); @@ -156,37 +156,31 @@ public boolean isWriteResumed() { } public void wakeupWrites() { - long val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return; } next.wakeupWrites(); } public void terminateWrites() throws IOException { - int oldVal, newVal; - oldVal = state; - if (anyAreSet(oldVal, FLAG_CLOSE_REQUESTED | FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_REQUESTED | FLAG_CLOSE_COMPLETE)) { // no action necessary return; } - newVal = oldVal | FLAG_CLOSE_REQUESTED; - state = newVal; + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); if(shutdownDelegate) { next.terminateWrites(); } } - private void exitFlush(int oldVal, boolean flushed) { - int newVal = oldVal; + private void exitFlush( boolean flushed) { boolean callFinish = false; - if (anyAreSet(oldVal, FLAG_CLOSE_REQUESTED) && flushed) { - newVal |= FLAG_CLOSE_COMPLETE; - if (!anyAreSet(oldVal, FLAG_FINISHED_CALLED)) { - newVal |= FLAG_FINISHED_CALLED; + if (anyAreSet(state, FLAG_CLOSE_REQUESTED) && flushed) { + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_COMPLETE, (currentState, flag)-> currentState | flag); + if (!anyAreSet(state, FLAG_FINISHED_CALLED)) { + stateUpdater.accumulateAndGet(this, FLAG_FINISHED_CALLED, (currentState, flag)-> currentState | flag); callFinish = true; } - state = newVal; if (callFinish) { if (finishListener != null) { finishListener.handleEvent(this); diff --git a/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java index 2255e6fdd4..942267836a 100644 --- a/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java @@ -33,6 +33,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.allAreClear; import static org.xnio.Bits.anyAreSet; @@ -52,7 +53,9 @@ public class PreChunkedStreamSinkConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( + PreChunkedStreamSinkConduit.class, "state"); final ChunkReader chunkReader; /** @@ -183,7 +186,7 @@ public boolean flush() throws IOException { } private void invokeFinishListener() { - state |= FLAG_FINISHED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED, (current, flag) -> current | flag); if (finishListener != null) { finishListener.handleEvent(this); } @@ -197,7 +200,7 @@ public void terminateWrites() throws IOException { if (chunkReader.getChunkRemaining() != -1) { throw UndertowMessages.MESSAGES.chunkedChannelClosedMidChunk(); } - state |= FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_SHUTDOWN, (current, flag) -> current | flag); } @Override diff --git a/core/src/main/java/io/undertow/io/UndertowInputStream.java b/core/src/main/java/io/undertow/io/UndertowInputStream.java index 576c715dcf..17e17d174f 100644 --- a/core/src/main/java/io/undertow/io/UndertowInputStream.java +++ b/core/src/main/java/io/undertow/io/UndertowInputStream.java @@ -30,6 +30,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static io.undertow.UndertowLogger.REQUEST_IO_LOGGER; import static io.undertow.UndertowOptions.DEFAULT_READ_TIMEOUT; @@ -56,7 +57,9 @@ public class UndertowInputStream extends InputStream { private static final int FLAG_CLOSED = 1; private static final int FLAG_FINISHED = 1 << 1; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(UndertowInputStream.class, "state"); private PooledByteBuffer pooled; public UndertowInputStream(final HttpServerExchange exchange) { @@ -133,7 +136,7 @@ private void readIntoBuffer() throws IOException { int res = Channels.readBlocking(channel, pooled.getBuffer(), readTimeout, TimeUnit.MILLISECONDS); pooled.getBuffer().flip(); if (res == -1) { - state |= FLAG_FINISHED; + stateUpdater.getAndAccumulate(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); pooled.close(); pooled = null; } else if (res == 0) { @@ -153,7 +156,7 @@ private void readIntoBufferNonBlocking() throws IOException { } pooled.getBuffer().flip(); if (res == -1) { - state |= FLAG_FINISHED; + stateUpdater.getAndAccumulate(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); pooled.close(); pooled = null; } @@ -180,7 +183,7 @@ public void close() throws IOException { if (anyAreSet(state, FLAG_CLOSED)) { return; } - state |= FLAG_CLOSED; + stateUpdater.getAndAccumulate(this, FLAG_CLOSED, (currentState, flag)-> currentState | flag); try { while (allAreClear(state, FLAG_FINISHED)) { readIntoBuffer(); @@ -195,7 +198,7 @@ public void close() throws IOException { pooled = null; } channel.shutdownReads(); - state |= FLAG_FINISHED; + stateUpdater.getAndAccumulate(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); } } } diff --git a/core/src/main/java/io/undertow/io/UndertowOutputStream.java b/core/src/main/java/io/undertow/io/UndertowOutputStream.java index 22b7b49709..eaeff45be7 100644 --- a/core/src/main/java/io/undertow/io/UndertowOutputStream.java +++ b/core/src/main/java/io/undertow/io/UndertowOutputStream.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import io.undertow.UndertowMessages; import io.undertow.server.HttpServerExchange; @@ -51,7 +52,9 @@ public class UndertowOutputStream extends OutputStream implements BufferWritable private ByteBuffer buffer; private PooledByteBuffer pooledBuffer; private StreamSinkChannel channel; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(UndertowOutputStream.class, "state"); private long written; private final long contentLength; @@ -230,7 +233,7 @@ public void write(ByteBuffer[] buffers) throws IOException { channel = exchange.getResponseChannel(); } Channels.writeBlocking(channel, buffers, 0, buffers.length); - state |= FLAG_WRITE_STARTED; + stateUpdater.getAndAccumulate(this, FLAG_WRITE_STARTED, (currentState, flag)-> currentState | flag); } else { ByteBuffer buffer = buffer(); if (len < buffer.remaining()) { @@ -249,7 +252,7 @@ public void write(ByteBuffer[] buffers) throws IOException { Channels.writeBlocking(channel, newBuffers, 0, newBuffers.length); buffer.clear(); } - state |= FLAG_WRITE_STARTED; + stateUpdater.getAndAccumulate(this, FLAG_WRITE_STARTED, (currentState, flag)-> currentState | flag); } } updateWritten(len); @@ -304,7 +307,7 @@ private void writeBufferBlocking(final boolean writeFinal) throws IOException { } } buffer.clear(); - state |= FLAG_WRITE_STARTED; + stateUpdater.getAndAccumulate(this, FLAG_WRITE_STARTED, (currentState, flag)-> currentState | flag); } @Override @@ -330,7 +333,7 @@ public void transferFrom(FileChannel source) throws IOException { public void close() throws IOException { if (anyAreSet(state, FLAG_CLOSED)) return; try { - state |= FLAG_CLOSED; + stateUpdater.getAndAccumulate(this, FLAG_CLOSED, (currentState, flag)-> currentState | flag); if (anyAreClear(state, FLAG_WRITE_STARTED) && channel == null && !isHeadRequestWithContentLength(exchange)) { diff --git a/core/src/main/java/io/undertow/server/HttpServerExchange.java b/core/src/main/java/io/undertow/server/HttpServerExchange.java index 89f61c0c25..db1c7fc83d 100644 --- a/core/src/main/java/io/undertow/server/HttpServerExchange.java +++ b/core/src/main/java/io/undertow/server/HttpServerExchange.java @@ -1709,7 +1709,7 @@ HttpServerExchange setRequestStartTime(long requestStartTime) { * If the exchange is already complete this method is a noop */ public HttpServerExchange endExchange() { - final int state = this.state; + //TODO: check if this required or even desired? if (allAreSet(state, FLAG_REQUEST_TERMINATED | FLAG_RESPONSE_TERMINATED)) { if(blockingHttpExchange != null) { //we still have to close the blocking exchange in this case, @@ -1854,7 +1854,7 @@ public void handleEvent(final StreamSinkChannel channel) { channel.suspendWrites(); channel.getWriteSetter().set(null); //defensive programming, should never happen - if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) { + if (anyAreClear(HttpServerExchange.this.state, FLAG_RESPONSE_TERMINATED)) { //make sure the listeners have been invoked invokeExchangeCompleteListeners(); UndertowLogger.ROOT_LOGGER.responseWasNotTerminated(connection, HttpServerExchange.this); @@ -2105,14 +2105,14 @@ private class WriteDispatchChannel extends DetachableStreamSinkChannel implement @Override protected boolean isFinished() { - return allAreSet(state, FLAG_RESPONSE_TERMINATED); + return allAreSet(HttpServerExchange.this.state, FLAG_RESPONSE_TERMINATED); } @Override public void resumeWrites() { if (isInCall()) { setFlags(FLAG_SHOULD_RESUME_WRITES); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else if(!isFinished()){ @@ -2134,7 +2134,7 @@ public void wakeupWrites() { if (isInCall()) { wakeup = true; setFlags(FLAG_SHOULD_RESUME_WRITES); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else { @@ -2144,7 +2144,7 @@ public void wakeupWrites() { @Override public boolean isWriteResumed() { - return anyAreSet(state, FLAG_SHOULD_RESUME_WRITES) || super.isWriteResumed(); + return anyAreSet(HttpServerExchange.this.state, FLAG_SHOULD_RESUME_WRITES) || super.isWriteResumed(); } public void runResume() { @@ -2277,7 +2277,7 @@ private final class ReadDispatchChannel extends DetachableStreamSourceChannel im @Override protected boolean isFinished() { - return allAreSet(state, FLAG_REQUEST_TERMINATED); + return allAreSet(HttpServerExchange.this.state, FLAG_REQUEST_TERMINATED); } @Override @@ -2285,7 +2285,7 @@ public void resumeReads() { readsResumed = true; if (isInCall()) { setFlags(FLAG_SHOULD_RESUME_READS); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else if (!isFinished()) { @@ -2298,7 +2298,7 @@ public void wakeupReads() { if (isInCall()) { wakeup = true; setFlags(FLAG_SHOULD_RESUME_READS); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else { @@ -2480,7 +2480,7 @@ public boolean isReadResumed() { if(isFinished()) { return false; } - return anyAreSet(state, FLAG_SHOULD_RESUME_READS) || super.isReadResumed(); + return anyAreSet(HttpServerExchange.this.state, FLAG_SHOULD_RESUME_READS) || super.isReadResumed(); } @Override diff --git a/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java b/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java index 263b79565f..d65b9106c9 100644 --- a/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java +++ b/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java @@ -63,6 +63,7 @@ public class GracefulShutdownHandler implements HttpHandler { private final Object lock = new Object(); + @SuppressWarnings("unused") private volatile long state = 0; private static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater(GracefulShutdownHandler.class, "state"); diff --git a/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java b/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java index f8fdfe916c..49c605d8f2 100644 --- a/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java +++ b/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java @@ -41,6 +41,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.allAreClear; import static org.xnio.Bits.allAreSet; @@ -57,8 +58,6 @@ final class HttpResponseConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( + HttpResponseConduit.class, "state"); + HttpResponseConduit(final StreamSinkConduit next, final ByteBufferPool pool, HttpServerConnection connection) { super(next); this.pool = pool; @@ -102,9 +105,8 @@ final class HttpResponseConduit extends AbstractStreamSinkConduit= 50; // append protocol HttpString protocol = exchange.getProtocol(); @@ -232,7 +234,7 @@ private int processWrite(int state, final Object userData, int pos, int length) this.headerValues = headerValues; this.valueIdx = valueIdx; this.charIndex = 0; - this.state = STATE_HDR_NAME; + stateUpdater.set(this, STATE_HDR_NAME); buffer.flip(); return processStatefulWrite(STATE_HDR_NAME, userData, pos, length); } @@ -247,7 +249,7 @@ private int processWrite(int state, final Object userData, int pos, int length) this.headerValues = headerValues; this.valueIdx = valueIdx; this.charIndex = 0; - this.state = STATE_HDR_VAL; + stateUpdater.set(this, STATE_HDR_VAL); buffer.flip(); return processStatefulWrite(STATE_HDR_VAL, userData, pos, length); } @@ -298,7 +300,7 @@ private int processWrite(int state, final Object userData, int pos, int length) } finally { if (buffer != null) { bufferDone(); - this.state &= ~POOLED_BUFFER_IN_USE; + clearFlags(POOLED_BUFFER_IN_USE); } } } @@ -642,7 +644,7 @@ public int write(final ByteBuffer src) throws IOException { } return alreadyWritten; } finally { - this.state = oldState & ~MASK_STATE | state; + stateUpdater.set(this, oldState & ~MASK_STATE | state); } } catch(IOException|RuntimeException|Error e) { IoUtils.safeClose(connection); @@ -681,7 +683,7 @@ public long write(final ByteBuffer[] srcs, final int offset, final int length) t IoUtils.safeClose(connection); throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -784,7 +786,7 @@ public boolean flush() throws IOException { IoUtils.safeClose(connection); throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -796,7 +798,7 @@ public void terminateWrites() throws IOException { next.terminateWrites(); return; } - this.state = oldVal | FLAG_SHUTDOWN; + stateUpdater.set(this, oldVal | FLAG_SHUTDOWN); } catch (IOException | RuntimeException | Error e) { IoUtils.safeClose(connection); throw e; @@ -834,4 +836,18 @@ void freeBuffers() { pooledFileTransferBuffer = null; } } + + private void setFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old | flags)); + } + + private void clearFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old & ~flags)); + } } diff --git a/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java b/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java index 5f8fe4a574..c826076751 100644 --- a/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java @@ -24,6 +24,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import io.undertow.UndertowLogger; import io.undertow.server.HttpServerExchange; @@ -59,8 +60,9 @@ public class PipeliningBufferingStreamSinkConduit extends AbstractStreamSinkCond private static final int DELEGATE_SHUTDOWN = 1 << 1; private static final int FLUSHING = 1 << 3; - private int state; - + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PipeliningBufferingStreamSinkConduit.class, "state"); private final ByteBufferPool pool; private PooledByteBuffer buffer; @@ -157,7 +159,7 @@ private long flushBufferWithUserData(final ByteBuffer[] byteBuffers, int offset, } if (!anyAreSet(state, FLUSHING)) { - state |= FLUSHING; + stateUpdater.accumulateAndGet(this, FLUSHING, (current, flag) -> current | flag); byteBuffer.flip(); } int originalBufferedRemaining = byteBuffer.remaining(); @@ -178,7 +180,7 @@ private long flushBufferWithUserData(final ByteBuffer[] byteBuffers, int offset, if (written > originalBufferedRemaining) { buffer.close(); this.buffer = null; - state &= ~FLUSHING; + stateUpdater.accumulateAndGet(this, ~FLUSHING, (current, flag) -> current & flag); return written - originalBufferedRemaining; } return 0; @@ -186,7 +188,7 @@ private long flushBufferWithUserData(final ByteBuffer[] byteBuffers, int offset, } while (written < toWrite); buffer.close(); this.buffer = null; - state &= ~FLUSHING; + stateUpdater.accumulateAndGet(this, ~FLUSHING, (current, flag) -> current & flag); return written - originalBufferedRemaining; } @@ -221,7 +223,7 @@ private boolean flushBuffer() throws IOException { } final ByteBuffer byteBuffer = buffer.getBuffer(); if (!anyAreSet(state, FLUSHING)) { - state |= FLUSHING; + stateUpdater.accumulateAndGet(this, FLUSHING, (current, flag) -> current | flag); byteBuffer.flip(); } while (byteBuffer.hasRemaining()) { @@ -234,7 +236,7 @@ private boolean flushBuffer() throws IOException { } buffer.close(); this.buffer = null; - state &= ~FLUSHING; + stateUpdater.accumulateAndGet(this, ~FLUSHING, (current, flag) -> current & flag); return true; } @@ -266,7 +268,7 @@ public boolean flush() throws IOException { } if (anyAreSet(state, SHUTDOWN) && anyAreClear(state, DELEGATE_SHUTDOWN)) { - state |= DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, DELEGATE_SHUTDOWN, (current, flag) -> current | flag); next.terminateWrites(); } return next.flush(); @@ -276,9 +278,9 @@ public boolean flush() throws IOException { @Override public void terminateWrites() throws IOException { - state |= SHUTDOWN; + stateUpdater.accumulateAndGet(this, SHUTDOWN, (current, flag) -> current | flag); if (buffer == null) { - state |= DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, DELEGATE_SHUTDOWN, (current, flag) -> current | flag); next.terminateWrites(); } }