Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source.
   * Copyright 2014 Red Hat, Inc., and individual contributors
   * as indicated by the @author tags.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
 
 package io.undertow.server.protocol.http;
 
 
 /**
  * Listener which reads requests and headers off of an HTTP stream.
  *
  * @author David M. Lloyd
  */
 
 
     // Raw HTTP/1.1 "400 Bad Request" response: empty body, "Connection: close".
     // NOTE(review): no use site is visible in this copy; presumably written by sendBadRequestAndClose - confirm.
     private static final String BAD_REQUEST = "HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
 
     // Connection this listener is attached to (constructor argument).
     private final HttpServerConnection connection;
     // Parse state object, created once per listener instance.
     private final ParseState state = new ParseState();
     // Request parser supplied to the constructor.
     private final HttpRequestParser parser;
 
 
     // NOTE(review): presumably the number of bytes consumed for the current request, checked
     // against maxRequestSize in handleEventWithNoRunningRequest; the identifiers at the use
     // sites are missing in this copy - confirm.
     private int read = 0;
     // NOTE(review): the three final fields below are not assigned anywhere in the visible
     // constructor; the assignments appear to have been lost from this copy.
     private final int maxRequestSize;
     private final long maxEntitySize;
     private final boolean recordRequestStartTime;
 
     //0 = new request ok, reads resumed
     //1 = request running, new request not ok
     //2 = suspending/resuming in progress
     private volatile int requestState;
 
     // Field updater giving atomic get/set/compareAndSet access to the volatile requestState field.
     private static final AtomicIntegerFieldUpdater<HttpReadListenerrequestStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpReadListener.class"requestState");
 
 
     /**
      * Creates a read listener for the given connection and parser.
      *
      * NOTE(review): this copy of the source has lost several tokens (the assignment targets
      * after "this.", the option keys passed to get(...), argument separators and the
      * argument of addCloseListener). The comments below describe only what is still visible.
      *
      * @param connection the server connection; its Undertow options are read here
      * @param parser     the request parser to be stored by this listener
      */
     HttpReadListener(final HttpServerConnection connectionfinal HttpRequestParser parser) {
         this. = connection;
         this. = parser;
         // Two timeout values are looked up in the connection options; -1 is passed as the
         // second argument (presumably the default when the option is unset - confirm).
         int requestParseTimeout = connection.getUndertowOptions().get(., -1);
         int requestIdleTimeout = connection.getUndertowOptions().get(., -1);
         if(requestIdleTimeout < 0 && requestParseTimeout < 0) {
             // Both timeouts negative: no timeout updater is created (field set to null).
             this. = null;
         } else {
             // At least one timeout is non-negative: create the updater and register a close
             // listener on the connection (the listener argument is missing in this copy).
             this. = new ParseTimeoutUpdater(connectionrequestParseTimeoutrequestIdleTimeout);
             connection.addCloseListener();
         }
     }
 
     /**
      * Prepares this listener for the next request on the connection. Called from
      * exchangeComplete before reads are resumed or the listener is re-run.
      *
      * NOTE(review): every receiver in this body is missing in this copy. Presumably the
      * parse state is reset, the read counter is zeroed and, when a timeout updater exists,
      * it is told the connection is idle - confirm against the original source.
      */
     public void newRequest() {
         .reset();
          = 0;
         if( != null) {
             .connectionIdle();
         }
     }
 
     /**
      * Read callback for the source channel. If the state value is non-zero the reads are
      * suspended and the method returns; otherwise the event is processed by
      * handleEventWithNoRunningRequest.
      *
      * NOTE(review): the receiver of get/compareAndSet/set is missing in this copy; it is
      * presumably requestStateUpdater (states: 0 = new request ok, 1 = request running,
      * 2 = suspending/resuming in progress).
      *
      * @param channel the source channel that became readable
      */
     public void handleEvent(final ConduitStreamSourceChannel channel) {
         while (.get(this) != 0) {
            //if the CAS fails it is because another thread is in the process of changing state
            //we just immediately retry
            if (.compareAndSet(this, 1, 2)) {
                // State 2 is held only while reads are being suspended, then state 1 is restored.
                channel.suspendReads();
                .set(this, 1);
                return;
            }
        }
        handleEventWithNoRunningRequest(channel);
    }
    /**
     * Reads request data, either from leftover bytes returned by getExtraBytes() or from
     * the channel, passes each buffer to handle(...) until isComplete() reports true, and
     * then hands the resulting exchange to the root handler.
     *
     * NOTE(review): most field receivers are missing in this copy (connection, parser, state,
     * timeout updater, exchange, logger, size counters). Comments are limited to what the
     * remaining tokens show.
     *
     * @param channel the source channel to read from when there are no leftover bytes
     */
    public void handleEventWithNoRunningRequest(final ConduitStreamSourceChannel channel) {
        Pooled<ByteBufferexisting = .getExtraBytes();
        // Nothing buffered and the read side is shut down, or the write side is shut down:
        // close and stop reading.
        if ((existing == null && .getOriginalSourceConduit().isReadShutdown()) || .getOriginalSinkConduit().isWriteShutdown()) {
            IoUtils.safeClose();
            channel.suspendReads();
            return;
        }
        // Reuse the leftover buffer if there is one, otherwise allocate a fresh pooled buffer.
        final Pooled<ByteBufferpooled = existing == null ? .getBufferPool().allocate() : existing;
        final ByteBuffer buffer = pooled.getResource();
        // Whether the finally block should free the pooled buffer; cleared when the buffer is
        // handed over via setExtraBytes(pooled).
        boolean free = true;
        try {
            int res;
            boolean bytesRead = false;
            do {
                if (existing == null) {
                    buffer.clear();
                    try {
                        res = channel.read(buffer);
                    } catch (IOException e) {
                        ..debug("Error reading request"e);
                        IoUtils.safeClose();
                        return;
                    }
                } else {
                    // Leftover data: treat the bytes already in the buffer as the "read" result.
                    res = buffer.remaining();
                }
                if (res <= 0) {
                    // No data (0) or end of stream (-1). If some bytes were already consumed in
                    // this call and the guarded field is non-null, failedParse() is invoked first.
                    if(bytesRead &&  != null) {
                        .failedParse();
                    }
                    handleFailedRead(channelres);
                    return;
                } else {
                    bytesRead = true;
                }
                if (existing != null) {
                    // Leftover buffer is consumed exactly once; it is already in read mode, so no flip.
                    existing = null;
                    .setExtraBytes(null);
                } else {
                    buffer.flip();
                }
                .handle(buffer);
                if (buffer.hasRemaining()) {
                    // Unconsumed bytes stay in the pooled buffer and are stored as extra bytes,
                    // so the buffer must not be freed here.
                    free = false;
                    .setExtraBytes(pooled);
                }
                // Size accounting: a running total is increased by res and compared with a limit.
                // NOTE(review): presumably read vs. maxRequestSize - the identifiers are missing.
                int total =  + res;
                 = total;
                if ( > ) {
                    IoUtils.safeClose();
                    return;
                }
            } while (!.isComplete());
            if( != null) {
                .requestStarted();
            }
            final HttpServerExchange httpServerExchange = this.;
            // Scheme is "https" when an SSL session is present, otherwise "http".
            httpServerExchange.setRequestScheme(.getSslSession() != null ? "https" : "http");
            this. = null;
            // NOTE(review): presumably marks the request as running (state 1) before dispatch.
            .set(this, 1);
            HttpTransferEncoding.setupRequest(httpServerExchange);
            if () {
                Connectors.setRequestStartTime(httpServerExchange);
            }
            .setCurrentExchange(httpServerExchange);
            Connectors.executeRootHandler(.getRootHandler(), httpServerExchange);
        } catch (Exception e) {
            // Any exception from the read/parse/dispatch path is routed to sendBadRequestAndClose.
            sendBadRequestAndClose(.getChannel(), e);
            return;
        } finally {
            if (freepooled.free();
        }
    }
    /**
     * Handles a read that produced no data.
     *
     * When res is 0, this listener is re-registered as the read listener and reads are
     * resumed. When res is -1, the connection is closed via handleConnectionClose. Any
     * other value is ignored.
     *
     * @param channel the source channel the read was attempted on
     * @param res     the result of the read attempt
     */
    private void handleFailedRead(ConduitStreamSourceChannel channelint res) {
        if (res == 0) {
            channel.setReadListener(this);
            channel.resumeReads();
        } else if (res == -1) {
            handleConnectionClose(channel);
        }
    }
    /**
     * Shuts the connection down: suspends and shuts down reads on the given channel, shuts
     * down writes on the sink channel, then calls IoUtils.safeClose. If any step throws an
     * IOException it is logged at debug level and safeClose is called anyway.
     *
     * NOTE(review): the safeClose arguments and the logger receiver are missing in this copy.
     *
     * @param channel the source channel to stop reading from
     */
    private void handleConnectionClose(StreamSourceChannel channel) {
        try {
            channel.suspendReads();
            channel.shutdownReads();
            final StreamSinkChannel responseChannel = this..getChannel().getSinkChannel();
            responseChannel.shutdownWrites();
            IoUtils.safeClose();
        } catch (IOException e) {
            ..debug("Error reading request"e);
            // orderly shutdown failed; nothing left to do but close
            IoUtils.safeClose();
        }
    }
    /**
     * Suspends reads on the connection and sets up a writer on its sink channel; the
     * connection is closed once the write is done or fails.
     *
     * NOTE(review): the statement that instantiates the anonymous class (between
     * suspendReads() and the first override) is missing in this copy, so the base type and
     * the data being written are not visible here. Presumably the BAD_REQUEST response is
     * written - confirm. The exception parameter is not used in the visible code.
     *
     * @param connection the stream connection to answer and close
     * @param exception  the exception that caused the request to be rejected
     */
    private void sendBadRequestAndClose(final StreamConnection connectionfinal Exception exception) {
        connection.getSourceChannel().suspendReads();
            @Override
            protected void writeDone(final StreamSinkChannel c) {
                // Write finished: stop write notifications and close the connection.
                super.writeDone(c);
                c.suspendWrites();
                IoUtils.safeClose(connection);
            }
            @Override
            protected void handleError(StreamSinkChannel channelIOException e) {
                // Write failed: just close the connection.
                IoUtils.safeClose(connection);
            }
        }.setup(connection.getSinkChannel());
    }
    /**
     * Called when an exchange has completed; decides what happens to the connection next.
     *
     * - Persistent and not an upgrade: prepare for the next request. With no extra bytes
     *   buffered, this listener is re-registered and reads are resumed; with extra bytes
     *   buffered, this object is submitted to an executor via execute(this).
     * - Not persistent: the connection is closed.
     * - Upgrade: the sink channel is flushed and the upgrade listener is invoked.
     *
     * NOTE(review): the receiver of set/compareAndSet is missing in this copy; presumably
     * requestStateUpdater (0 = new request ok, 1 = request running, 2 = transition in progress).
     *
     * @param exchange the exchange that has just completed
     */
    public void exchangeComplete(final HttpServerExchange exchange) {
        .clearChannel();
        final HttpServerConnection connection = this.;
        if (exchange.isPersistent() && !exchange.isUpgrade()) {
            final StreamConnection channel = connection.getChannel();
            if (connection.getExtraBytes() == null) {
                //if we are not pipelining we just register a listener
                //we have to resume from within the io thread
                if (exchange.isInIoThread()) {
                    //no need for CAS, we are in the IO thread
                    newRequest();
                    channel.getSourceChannel().setReadListener(HttpReadListener.this);
                    channel.getSourceChannel().resumeReads();
                    .set(this, 0);
                } else {
                    // Not on the IO thread: spin until the 1 -> 2 CAS succeeds, closing instead
                    // if either side of the connection has been shut down in the meantime.
                    while (true) {
                        if (connection.getOriginalSourceConduit().isReadShutdown() || connection.getOriginalSinkConduit().isWriteShutdown()) {
                            channel.getSourceChannel().suspendReads();
                            channel.getSinkChannel().suspendWrites();
                            IoUtils.safeClose(connection);
                            return;
                        } else {
                            if (.compareAndSet(this, 1, 2)) {
                                newRequest();
                                channel.getSourceChannel().setReadListener(HttpReadListener.this);
                                // State is set to 0 before reads are resumed in this branch.
                                .set(this, 0);
                                channel.getSourceChannel().resumeReads();
                                break;
                            }
                        }
                    }
                }
            } else {
                // Extra bytes are already buffered (pipelined data): run this listener as a task
                // instead of waiting for a read event.
                if (exchange.isInIoThread()) {
                    .set(this, 0); //no need to CAS, as we don't actually resume
                    newRequest();
                    //no need to suspend reads here, the task will always run before the read listener anyway
                    channel.getIoThread().execute(this);
                } else {
                    while (true) {
                        if (connection.getOriginalSinkConduit().isWriteShutdown()) {
                            channel.getSourceChannel().suspendReads();
                            channel.getSinkChannel().suspendWrites();
                            IoUtils.safeClose(connection);
                            return;
                        } else if (.compareAndSet(this, 1, 2)) {
                            newRequest();
                            channel.getSourceChannel().suspendReads();
                            .set(this, 0);
                            break;
                        }
                    }
                    // Use the exchange's dispatch executor, falling back to the connection's worker.
                    Executor executor = exchange.getDispatchExecutor();
                    if (executor == null) {
                        executor = exchange.getConnection().getWorker();
                    }
                    executor.execute(this);
                }
            }
        } else if (!exchange.isPersistent()) {
            IoUtils.safeClose(connection);
        } else if (exchange.isUpgrade()) {
            if (connection.getExtraBytes() != null) {
                // Wrap the source conduit in a ReadDataStreamSourceConduit when extra bytes are buffered.
                connection.getChannel().getSourceChannel().setConduit(new ReadDataStreamSourceConduit(connection.getChannel().getSourceChannel().getConduit(), connection));
            }
            try {
                if (!connection.getChannel().getSinkChannel().flush()) {
                    // Flush incomplete: finish it through a write listener and invoke the
                    // upgrade listener once flushing is done.
                    connection.getChannel().getSinkChannel().setWriteListener(ChannelListeners.flushingChannelListener(new ChannelListener<ConduitStreamSinkChannel>() {
                        @Override
                        public void handleEvent(ConduitStreamSinkChannel conduitStreamSinkChannel) {
                            connection.getUpgradeListener().handleUpgrade(connection.getChannel(), exchange);
                        }
                    }, new ClosingChannelExceptionHandler<ConduitStreamSinkChannel>(connection)));
                    connection.getChannel().getSinkChannel().resumeWrites();
                    return;
                }
                // Flush completed immediately: invoke the upgrade listener directly.
                connection.getUpgradeListener().handleUpgrade(connection.getChannel(), exchange);
            } catch (IOException e) {
                ..ioException(e);
                IoUtils.safeClose(connection);
            }
        }
    }
    /**
     * Task entry point; exchangeComplete submits this object to an executor via execute(this)
     * when extra bytes are already buffered for the next request.
     *
     * NOTE(review): the body is empty in this copy of the source. Presumably it should
     * process the buffered data (e.g. by invoking handleEvent on the connection's source
     * channel) - confirm against the original source.
     */
    @Override
    public void run() {
    }
New to GrepCode? Check out our FAQ X