Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source.
   * Copyright 2014 Red Hat, Inc., and individual contributors
   * as indicated by the @author tags.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
 
 package io.undertow.server.protocol.ajp;
 
 
 
 import static org.xnio.IoUtils.safeClose;

Author(s):
Stuart Douglas
 
 
 final class AjpReadListener implements ChannelListener<StreamSourceChannel> {
 
     private static final byte[] CPONG = {'A''B', 0, 1, 9}; //CPONG response data
 
     private final AjpServerConnection connection;
     private final String scheme;
     private final boolean recordRequestStartTime;
     private AjpRequestParseState state = new AjpRequestParseState();
 
     private volatile int read = 0;
     private final int maxRequestSize;
     private final long maxEntitySize;
     private final AjpRequestParser parser;
 
 
     AjpReadListener(final AjpServerConnection connectionfinal String schemeAjpRequestParser parser) {
         this. = connection;
         this. = scheme;
         this. = parser;
         this. = new WriteReadyHandler.ChannelListenerHandler<>(connection.getChannel().getSinkChannel());
         int requestParseTimeout = connection.getUndertowOptions().get(., -1);
         int requestIdleTimeout = connection.getUndertowOptions().get(., -1);
         if(requestIdleTimeout < 0 && requestParseTimeout < 0) {
             this. = null;
         } else {
             this. = new ParseTimeoutUpdater(connectionrequestParseTimeoutrequestIdleTimeout);
             connection.addCloseListener();
         }
     }
 
     public void startRequest() {
         .resetChannel();
          = new AjpRequestParseState();
          = 0;
         if( != null) {
             .connectionIdle();
         }
     }
 
     public void handleEvent(final StreamSourceChannel channel) {
            safeClose();
            channel.suspendReads();
            return;
        }
        Pooled<ByteBufferexisting = .getExtraBytes();
        final Pooled<ByteBufferpooled = existing == null ? .getBufferPool().allocate() : existing;
        final ByteBuffer buffer = pooled.getResource();
        boolean free = true;
        boolean bytesRead = false;
        try {
            int res;
            do {
                if (existing == null) {
                    buffer.clear();
                    try {
                        res = channel.read(buffer);
                    } catch (IOException e) {
                        ..ioException(e);
                        safeClose(channel);
                        return;
                    }
                } else {
                    res = buffer.remaining();
                }
                if (res == 0) {
                    if(bytesRead &&  != null) {
                        .failedParse();
                    }
                    if (!channel.isReadResumed()) {
                        channel.getReadSetter().set(this);
                        channel.resumeReads();
                    }
                    return;
                }
                if (res == -1) {
                    try {
                        channel.shutdownReads();
                        final StreamSinkChannel responseChannel = .getChannel().getSinkChannel();
                        responseChannel.shutdownWrites();
                        safeClose();
                    } catch (IOException e) {
                        ..ioException(e);
                        // fuck it, it's all ruined
                        safeClose();
                        return;
                    }
                    return;
                }
                bytesRead = true;
                //TODO: we need to handle parse errors
                if (existing != null) {
                    existing = null;
                    .setExtraBytes(null);
                } else {
                    buffer.flip();
                }
                int begin = buffer.remaining();
                .parse(buffer);
                 += begin - buffer.remaining();
                if (buffer.hasRemaining()) {
                    free = false;
                    .setExtraBytes(pooled);
                }
                if ( > ) {
                    safeClose();
                    return;
                }
            } while (!.isComplete());
            if( != null) {
                .requestStarted();
            }
            if (. != .) {
                if (. == .) {
                    ..debug("Received CPING, sending CPONG");
                    handleCPing();
                } else if (. == .) {
                    ..debug("Received CPONG, starting next request");
                     = new AjpRequestParseState();
                    channel.getReadSetter().set(this);
                    channel.resumeReads();
                } else {
                    safeClose();
                }
                return;
            }
            // we remove ourselves as the read listener from the channel;
            // if the http handler doesn't set any then reads will suspend, which is the right thing to do
            channel.getReadSetter().set(null);
            channel.suspendReads();
            final HttpServerExchange httpServerExchange = this.;
            final AjpServerResponseConduit responseConduit = new AjpServerResponseConduit(.getChannel().getSinkChannel().getConduit(), .getBufferPool(), httpServerExchangenew ConduitListener<AjpServerResponseConduit>() {
                @Override
                public void handleEvent(AjpServerResponseConduit channel) {
                    Connectors.terminateResponse(httpServerExchange);
                }
            }, httpServerExchange.getRequestMethod().equals(.));
            .getChannel().getSinkChannel().setConduit(responseConduit);
            .getChannel().getSourceChannel().setConduit(createSourceConduit(.getChannel().getSourceChannel().getConduit(), responseConduithttpServerExchange));
            //we need to set the write ready handler. This allows the response conduit to wrap it
            responseConduit.setWriteReadyHandler();
            try {
                .setSSLSessionInfo(.createSslSessionInfo());
                httpServerExchange.setSourceAddress(.createPeerAddress());
                httpServerExchange.setDestinationAddress(.createDestinationAddress());
                if( != null) {
                    httpServerExchange.setRequestScheme();
                }
                 = null;
                this. = null;
                httpServerExchange.setPersistent(true);
                if() {
                    Connectors.setRequestStartTime(httpServerExchange);
                }
                .setCurrentExchange(httpServerExchange);
                Connectors.executeRootHandler(.getRootHandler(), httpServerExchange);
            } catch (Throwable t) {
                //TODO: we should attempt to return a 500 status code in this situation
                ..exceptionProcessingRequest(t);
                safeClose(channel);
                safeClose();
            }
        } catch (Exception e) {
            safeClose(.getChannel());
        } finally {
            if (freepooled.free();
        }
    }
    private void handleCPing() {
         = new AjpRequestParseState();
        final StreamConnection underlyingChannel = .getChannel();
        underlyingChannel.getSourceChannel().suspendReads();
        final ByteBuffer buffer = ByteBuffer.wrap();
        int res;
        try {
            do {
                res = underlyingChannel.getSinkChannel().write(buffer);
                if (res == 0) {
                    underlyingChannel.getSinkChannel().setWriteListener(new ChannelListener<ConduitStreamSinkChannel>() {
                        @Override
                        public void handleEvent(ConduitStreamSinkChannel channel) {
                            int res;
                            do {
                                try {
                                    res = channel.write(buffer);
                                    if (res == 0) {
                                        return;
                                    }
                                } catch (IOException e) {
                                    ..ioException(e);
                                    safeClose();
                                }
                            } while (buffer.hasRemaining());
                            channel.suspendWrites();
                            AjpReadListener.this.handleEvent(underlyingChannel.getSourceChannel());
                        }
                    });
                    underlyingChannel.getSinkChannel().resumeWrites();
                    return;
                }
            } while (buffer.hasRemaining());
            AjpReadListener.this.handleEvent(underlyingChannel.getSourceChannel());
        } catch (IOException e) {
            ..ioException(e);
            safeClose();
        }
    }
    public void exchangeComplete(final HttpServerExchange exchange) {
        if (!exchange.isUpgrade() && exchange.isPersistent()) {
            startRequest();
            ConduitStreamSourceChannel channel = ((AjpServerConnectionexchange.getConnection()).getChannel().getSourceChannel();
            channel.getReadSetter().set(this);
            channel.wakeupReads();
        } else if(!exchange.isPersistent()) {
            safeClose(exchange.getConnection());
        }
    }
    private StreamSourceConduit createSourceConduit(StreamSourceConduit underlyingConduitAjpServerResponseConduit responseConduitfinal HttpServerExchange exchange) {
        ReadDataStreamSourceConduit conduit = new ReadDataStreamSourceConduit(underlyingConduit, (AbstractServerConnectionexchange.getConnection());
        final HeaderMap requestHeaders = exchange.getRequestHeaders();
        HttpString transferEncoding = .;
        Long length;
        final String teHeader = requestHeaders.getLast(.);
        boolean hasTransferEncoding = teHeader != null;
        if (hasTransferEncoding) {
            transferEncoding = new HttpString(teHeader);
        }
        final String requestContentLength = requestHeaders.getFirst(.);
        if (hasTransferEncoding && !transferEncoding.equals(.)) {
            length = null//unknown length
        } else if (requestContentLength != null) {
            final long contentLength = Long.parseLong(requestContentLength);
            if (contentLength == 0L) {
                ..trace("No content, starting next request");
                // no content - immediately start the next request, returning an empty stream for this one
                Connectors.terminateRequest();
                return new EmptyStreamSourceConduit(conduit.getReadThread());
            } else {
                length = contentLength;
            }
        } else {
            ..trace("No content length or transfer coding, starting next request");
            // no content - immediately start the next request, returning an empty stream for this one
            Connectors.terminateRequest(exchange);
            return new EmptyStreamSourceConduit(conduit.getReadThread());
        }
        return new AjpServerRequestConduit(conduitexchangeresponseConduitlengthnew ConduitListener<AjpServerRequestConduit>() {
            @Override
            public void handleEvent(AjpServerRequestConduit channel) {
                Connectors.terminateRequest(exchange);
            }
        });
    }
New to GrepCode? Check out our FAQ X