Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source.
   * Copyright 2014 Red Hat, Inc., and individual contributors
   * as indicated by the @author tags.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
 
 package io.undertow.server.handlers.proxy;
 
 import java.net.URI;
 
 /**
  * A pool of connections to a target host. This pool can also be used to open connections in exclusive mode, in which
  * case they will not be added to the connection pool. In this case the caller is responsible for closing any
  * connections.
  *
  * @author Stuart Douglas
  */
 
 public class ProxyConnectionPool implements Closeable {
 
     // Connection parameters for the target host. The constructor's assignment targets were lost
     // in extraction, so which parameter lands in which field is inferred from the matching
     // names - confirm against the upstream source.
     private final URI uri;
 
     private final InetSocketAddress bindAddress;
 
     private final XnioSsl ssl;
 
     private final UndertowClient client;
 
 
     // NOTE(review): the constructor performs one more assignment (of connectionPoolManager) than
     // there are assignable fields visible in this class; a declaration may have been dropped
     // around this gap.
     private final OptionMap options;

    
     /** Set to true when the connection pool is closed. */
 
     private volatile boolean closed;
 
     // Limits read from the ConnectionPoolManager in the constructor (getMaxConnections,
     // getMaxCachedConnections, getSMaxConnections, getMaxQueueSize, getTtl).
     // NOTE(review): the field/getter pairing is inferred from the names and the assignment
     // order; the assignment targets themselves were lost in extraction.
     private final int maxConnections;
     private final int maxCachedConnections;
     private final int sMaxConnections;
     private final int maxRequestQueueSize;
     private final long ttl;
 
     // NOTE(review): the generic arguments and the field name run together on this line;
     // presumably ConcurrentMap<XnioIoThread, HostThreadData> hostThreadData, i.e. the map that
     // getData() looks up by IO thread - confirm.
     private final ConcurrentMap<XnioIoThreadHostThreadDatahostThreadData = new CopyOnWriteMap<>();
 
     /**
      * Convenience constructor taking a pool manager, target URI, client and options; delegates
      * to another overload.
      * <p>
      * NOTE(review): the separators between parameters and between arguments were lost in
      * extraction. The {@code null} visible in the delegating call presumably fills the SSL
      * argument this overload does not take - confirm.
      */
     public ProxyConnectionPool(ConnectionPoolManager connectionPoolManagerURI uriUndertowClient clientOptionMap options) {
         this(connectionPoolManagerurinullclientoptions);
     }
 
     /**
      * Convenience constructor that additionally takes a bind address; delegates to another
      * overload.
      * <p>
      * NOTE(review): argument separators were lost in extraction. The {@code null} visible in
      * the delegating call presumably fills the SSL argument this overload does not take -
      * confirm.
      */
     public ProxyConnectionPool(ConnectionPoolManager connectionPoolManager,InetSocketAddress bindAddressURI uriUndertowClient clientOptionMap options) {
         this(connectionPoolManagerbindAddressurinullclientoptions);
     }
 
     /**
      * Convenience constructor that takes an SSL provider but no bind address; delegates to
      * another overload.
      * <p>
      * NOTE(review): argument separators were lost in extraction. The {@code null} visible in
      * the delegating call presumably fills the bind-address argument this overload does not
      * take - confirm.
      */
     public ProxyConnectionPool(ConnectionPoolManager connectionPoolManagerURI uriXnioSsl sslUndertowClient clientOptionMap options) {
         this(connectionPoolManagernullurisslclientoptions);
     }
 
     /**
      * Primary constructor: stores the supplied collaborators and snapshots the pool limits
      * from the connection pool manager.
      * <p>
      * NOTE(review): every assignment target below was lost in extraction ({@code this. = ...});
      * the intended fields are presumably the ones whose names match the right-hand sides -
      * confirm against the upstream source.
      */
     public ProxyConnectionPool(ConnectionPoolManager connectionPoolManagerInetSocketAddress bindAddress,URI uriXnioSsl sslUndertowClient clientOptionMap options) {
         this. = connectionPoolManager;
         // Clamped so that at least one connection is always allowed
         this. = Math.max(connectionPoolManager.getMaxConnections(), 1);
         // The remaining limits are clamped to be non-negative
         this. = Math.max(connectionPoolManager.getMaxCachedConnections(), 0);
         this. = Math.max(connectionPoolManager.getSMaxConnections(), 0);
         this. = Math.max(connectionPoolManager.getMaxQueueSize(), 0);
         // The ttl is stored exactly as reported, without clamping
         this. = connectionPoolManager.getTtl();
        this. = bindAddress;
        this. = uri;
        this. = ssl;
        this. = client;
        this. = options;
    }
    /**
     * Returns the URI associated with this pool. Within this class the result is passed to the
     * connect calls made by {@code openConnection} and the failed-host retry task, and is used
     * in their log messages.
     * <p>
     * NOTE(review): the returned expression was lost in extraction; presumably the {@code uri}
     * field - confirm.
     *
     * @return the pool's URI
     */
    public URI getUri() {
        return ;
    }
    /**
     * Returns the bind address associated with this pool.
     * <p>
     * NOTE(review): the returned expression was lost in extraction; presumably the
     * {@code bindAddress} field, which may be {@code null} when a constructor overload without
     * a bind address was used - confirm.
     *
     * @return the bind address
     */
    public InetSocketAddress getBindAddress() {
        return ;
    }
    /**
     * Marks the pool as closed and closes idle connections held by the per-thread pools.
     * <p>
     * NOTE(review): the assignment target and the map being iterated lost their identifiers in
     * extraction; presumably the {@code closed} flag and the per-thread data map - confirm.
     * <p>
     * NOTE(review): only one holder is polled from each thread's idle queue, so any further
     * idle connections in that queue are not closed by this method. Confirm whether a drain
     * loop was intended, and whether touching the per-thread queues from the calling thread
     * is safe.
     */
    public void close() {
        this. = true;
        for (HostThreadData data : .values()) {
            // poll() removes at most the head of this thread's idle queue
            final ConnectionHolder holder = data.availableConnections.poll();
            if (holder != null) {
                IoUtils.safeClose(holder.clientConnection);
            }
        }
    }

    
    /**
     * Called when the IO thread has completed a successful request.
     *
     * @param connectionHolder The client connection holder
     */
    private void returnConnection(final ConnectionHolder connectionHolder) {
        // Hands a connection back to the calling IO thread's pool: it is either given to a
        // queued waiter, cached as idle, or (if upgraded) treated as closed.
        // getData() throws unless the caller is an XnioIoThread.
        HostThreadData hostData = getData();
        // NOTE(review): this condition lost its identifier in extraction; going by the comment
        // inside the branch it presumably tests the closed flag - confirm
        if () {
            //the host has been closed
            // Close the returned connection and every idle connection cached for this thread
            IoUtils.safeClose(connectionHolder.clientConnection);
            ConnectionHolder con = hostData.availableConnections.poll();
            while (con != null) {
                IoUtils.safeClose(con.clientConnection);
                con = hostData.availableConnections.poll();
            }
            // Notify every request still queued on this thread that it will not be served
            redistributeQueued(hostData);
            return;
        }
        //only do something if the connection is open. If it is closed then
        //the close setter will handle creating a new connection and decrementing
        //the connection count
        final ClientConnection connection = connectionHolder.clientConnection;
        if (connection.isOpen() && !connection.isUpgraded()) {
            // Find the next queued waiter, skipping any that were cancelled
            CallbackHolder callback = hostData.awaitingConnections.poll();
            while (callback != null && callback.isCancelled()) {
                callback = hostData.awaitingConnections.poll();
            }
            if (callback != null) {
                // The waiter is being served, so its scheduled timeout task is removed
                if (callback.getTimeoutKey() != null) {
                    callback.getTimeoutKey().remove();
                }
                // Anything waiting for a connection is not expecting exclusivity.
                connectionReady(connectionHoldercallback.getCallback(), callback.getExchange(), false);
            } else {
                // Nobody is waiting: cache the connection as idle
                final int cachedConnectionCount = hostData.availableConnections.size();
                // NOTE(review): the limit operand was lost in extraction; presumably
                // maxCachedConnections - confirm
                if (cachedConnectionCount >= ) {
                    // Close the longest idle connection instead of the current one
                    final ConnectionHolder holder = hostData.availableConnections.poll();
                    if (holder != null) {
                        IoUtils.safeClose(holder.clientConnection);
                    }
                }
                // Appended at the tail, so the head of the queue is the entry added earliest
                hostData.availableConnections.add(connectionHolder);
                // If the soft max and ttl are configured
                // NOTE(review): both operands were lost in extraction; presumably
                // sMaxConnections and ttl - confirm
                if ( >= 0 &&  > 0) {
                    // Stamp the holder's expiry time, then run the idle-connection sweep
                    final long currentTime = System.currentTimeMillis();
                    connectionHolder.timeout = currentTime + ;
                    timeoutConnections(currentTimehostData);
                }
            }
        } else if (connection.isOpen() && connection.isUpgraded()) {
            //we treat upgraded connections as closed
            //as we do not want the connection pool filled with upgraded connections
            //if the connection is actually closed the close setter will handle it
            // The close listener is cleared first so the bookkeeping below is not repeated
            // when the upgraded connection eventually closes
            connection.getCloseSetter().set(null);
            handleClosedConnection(hostDataconnectionHolder);
        }
    }
    /**
     * Updates the per-thread bookkeeping after a pooled connection has gone away: decrements
     * the connection count, removes the holder from the idle queue, and, if the count is now
     * below a limit, opens a new non-exclusive connection for the next non-cancelled waiter.
     * <p>
     * NOTE(review): the limit operand in the comparison was lost in extraction; presumably
     * {@code maxConnections} - confirm. The parameter and argument separators were lost too.
     */
    private void handleClosedConnection(HostThreadData hostDatafinal ConnectionHolder connection) {
        int connections = --hostData.connections;
        // No-op if the holder was not sitting in the idle queue
        hostData.availableConnections.remove(connection);
        if (connections < ) {
            // Skip waiters that were cancelled while queued
            CallbackHolder task = hostData.awaitingConnections.poll();
            while (task != null && task.isCancelled()) {
                task = hostData.awaitingConnections.poll();
            }
            if (task != null) {
                openConnection(task.exchangetask.callbackhostDatafalse);
            }
        }
    }
    /**
     * Opens a new connection to the target and hands it to the callback once established.
     * Non-exclusive connections are counted against the per-thread total and get a close
     * listener that keeps that count in sync; exclusive connections get neither.
     * <p>
     * NOTE(review): the statement that opens the anonymous callback was lost in extraction
     * (the body jumps from the counter increment straight to {@code @Override}); judging by the
     * trailing arguments and the similar call in the retry task it is presumably a client
     * {@code connect(new ClientCallback<ClientConnection>() {...}, ...)} call - confirm. The
     * last argument of that call, the logger receiver and the receiver of
     * {@code handleError()} were lost as well.
     */
    private void openConnection(final HttpServerExchange exchangefinal ProxyCallback<ProxyConnectioncallbackfinal HostThreadData datafinal boolean exclusive) {
        if (!exclusive) {
            // Counted before the attempt; rolled back in failed() below
            data.connections++;
        }
            @Override
            public void completed(final ClientConnection result) {
                final ConnectionHolder connectionHolder = new ConnectionHolder(result);
                if (!exclusive) {
                    // When the connection closes, update the count and possibly serve a waiter
                    result.getCloseSetter().set(new ChannelListener<ClientConnection>() {
                        @Override
                        public void handleEvent(ClientConnection channel) {
                            handleClosedConnection(dataconnectionHolder);
                        }
                    });
                }
                connectionReady(connectionHoldercallbackexchangeexclusive);
            }
            @Override
            public void failed(IOException e) {
                if (!exclusive) {
                    data.connections--;
                }
                ..debug("Failed to connect"e);
                // When handleError() returns false, the queued requests on the current thread
                // are notified and a periodic retry is scheduled
                if (!.handleError()) {
                    redistributeQueued(getData());
                    scheduleFailedHostRetry(exchange);
                }
                // The requesting callback is always told about the failure
                callback.failed(exchange);
            }
        }, getUri(), exchange.getIoThread(), exchange.getConnection().getBufferPool(), );
    }
    /**
     * Drains the queue of requests waiting for a connection on the given thread data and
     * notifies each one that is not cancelled. A waiter whose expire time is positive and
     * already in the past gets {@code failed(...)}; every other waiter gets
     * {@code queuedRequestFailed(...)}.
     *
     * @param hostData the per-thread data whose waiting queue is drained
     */
    private void redistributeQueued(HostThreadData hostData) {
        CallbackHolder callback = hostData.awaitingConnections.poll();
        while (callback != null) {
            // Remove any scheduled timeout task, whether or not the waiter was cancelled
            if (callback.getTimeoutKey() != null) {
                callback.getTimeoutKey().remove();
            }
            if (!callback.isCancelled()) {
                long time = System.currentTimeMillis();
                // Waiters created without a timeout carry an expire time of -1 and therefore
                // always take the else branch
                if (callback.getExpireTime() > 0 && callback.getExpireTime() < time) {
                    callback.getCallback().failed(callback.getExchange());
                } else {
                    callback.getCallback().queuedRequestFailed(callback.getExchange());
                }
            }
            callback = hostData.awaitingConnections.poll();
        }
    }
    /**
     * Delivers a usable connection to the callback. The listener body below returns a
     * non-exclusive connection to the pool via {@code returnConnection} when its exchange event
     * fires; exclusive connections are not returned.
     * <p>
     * NOTE(review): the statement that registers the anonymous listener was lost in extraction
     * (the body starts at {@code @Override}); presumably a completion listener is added to the
     * exchange - confirm. The receivers of the two {@code getPath()} calls and the argument
     * separators were lost too, so the exact constructor arguments of {@code ProxyConnection}
     * cannot be read from here; what is visible is a fallback to {@code "/"} when a path is
     * {@code null}.
     */
    private void connectionReady(final ConnectionHolder resultfinal ProxyCallback<ProxyConnectioncallbackfinal HttpServerExchange exchangefinal boolean exclusive) {
            @Override
            public void exchangeEvent(HttpServerExchange exchangeNextListener nextListener) {
                if (!exclusive) {
                    returnConnection(result);
                }
                // Always let the next listener in the chain run
                nextListener.proceed();
            }
        });
        callback.completed(exchangenew ProxyConnection(result.clientConnection.getPath() == null ? "/" : .getPath()));
    }
    /**
     * Reports the availability of this pool as seen from the calling IO thread.
     * <p>
     * NOTE(review): every returned constant, the first condition, the receiver of
     * {@code isAvailable()} and both limit operands were lost in extraction, so the mapping
     * from check to {@link AvailabilityType} value cannot be read from this text. The checks
     * run in this order: a lost condition (presumably the closed flag), {@code isAvailable()}
     * on a lost receiver, connection count below a limit, idle connections present, waiting
     * queue at or above a limit, and finally a default result - confirm against upstream.
     *
     * @return the availability of this pool
     */
    public AvailabilityType available() {
        if () {
            return .;
        }
        if (!.isAvailable()) {
            return .;
        }
        // Remaining checks use the calling thread's own counters; getData() throws unless the
        // caller is an XnioIoThread
        HostThreadData data = getData();
        if (data.connections < ) {
            return .;
        }
        if (!data.availableConnections.isEmpty()) {
            return .;
        }
        if (data.awaitingConnections.size() >= ) {
            return .;
        }
        return .;
    }

    
    /**
     * If a host fails we periodically retry.
     *
     * @param exchange The server exchange
     */
    private void scheduleFailedHostRetry(final HttpServerExchange exchange) {
        // Schedules a delayed reconnect attempt on the exchange's IO thread; the task
        // reschedules itself until a connection succeeds and clearError() returns true.
        // NOTE(review): the receivers of getProblemServerRetry(), isAvailable(), clearError()
        // and handleError(), the logger, the connect() receiver, its last argument and the time
        // unit passed to executeAfter were lost in extraction - confirm against upstream.
        final int retry = .getProblemServerRetry();
        // only schedule a retry task if the node is not available
        // (a retry value of zero or less also disables scheduling)
        if (retry > 0 && !.isAvailable()) {
            exchange.getIoThread().executeAfter(new Runnable() {
                @Override
                public void run() {
                    // NOTE(review): lost condition; presumably stops retrying once the pool
                    // has been closed - confirm
                    if () {
                        return;
                    }
                    ..debugf("Attempting to reconnect to failed host %s"getUri());
                    .connect(new ClientCallback<ClientConnection>() {
                        @Override
                        public void completed(ClientConnection result) {
                            ..debugf("Connected to previously failed host %s, returning to service"getUri());
                            if (.clearError()) {
                                // In case the node is available now, return the connection
                                final ConnectionHolder connectionHolder = new ConnectionHolder(result);
                                final HostThreadData data = getData();
                                // Same registration openConnection performs for pooled
                                // connections: close listener plus connection count
                                result.getCloseSetter().set(new ChannelListener<ClientConnection>() {
                                    @Override
                                    public void handleEvent(ClientConnection channel) {
                                        handleClosedConnection(dataconnectionHolder);
                                    }
                                });
                                data.connections++;
                                // Either serves a queued waiter or caches the connection as idle
                                returnConnection(connectionHolder);
                            } else {
                                // Otherwise reschedule the retry task
                                // NOTE(review): 'result' is not closed or pooled on this path -
                                // confirm whether it is released elsewhere
                                scheduleFailedHostRetry(exchange);
                            }
                        }
                        @Override
                        public void failed(IOException e) {
                            ..debugf("Failed to reconnect to failed host %s"getUri());
                            .handleError();
                            scheduleFailedHostRetry(exchange);
                        }
                    }, getUri(), exchange.getIoThread(), exchange.getConnection().getBufferPool(), );
                }
            }, retry.);
        }
    }

    
    /**
     * Timeout idle connections which are above the soft max cached connections limit.
     *
     * @param currentTime the current time
     * @param data the local host thread data
     */
    private void timeoutConnections(final long currentTimefinal HostThreadData data) {
        // Walks the idle queue from its head, closing expired connections while the idle count
        // is at or above a limit, and (re)schedules data.timeoutTask for the first connection
        // that has not expired yet.
        // NOTE(review): the limit operand in the loop condition and the time unit passed to
        // executeAfter were lost in extraction; presumably sMaxConnections and milliseconds
        // (timeouts are computed from System.currentTimeMillis()) - confirm.
        int idleConnections = data.availableConnections.size();
        for (;;) {
            ConnectionHolder holder;
            if (idleConnections > 0 && idleConnections >=  && (holder = data.availableConnections.peek()) != null) {
                if (!holder.clientConnection.isOpen()) {
                    // Already closed connections decrease the available connections
                    // NOTE(review): the holder is only peeked, not removed, so the next
                    // iteration sees the same closed holder again until the count check fails
                    idleConnections--;
                } else if (currentTime >= holder.timeout) {
                    // If the timeout is reached already, just close
                    holder = data.availableConnections.poll();
                    IoUtils.safeClose(holder.clientConnection);
                    idleConnections--;
                } else {
                    // If the next run is after the connection timeout don't reschedule the task
                    if (data.timeoutKey == null || data.nextTimeout > holder.timeout) {
                        // Replace any task that is scheduled to run later than this holder expires
                        if (data.timeoutKey != null) {
                            data.timeoutKey.remove();
                            data.timeoutKey = null;
                        }
                        // Schedule a timeout task
                        // (+1 so the task runs just after the expiry time, not before it)
                        final long remaining = holder.timeout - currentTime + 1;
                        data.nextTimeout = holder.timeout;
                        data.timeoutKey = holder.clientConnection.getIoThread().executeAfter(data.timeoutTaskremaining.);
                    }
                    return;
                }
            } else {
                // If we are below the soft limit, just cancel the task
                if (data.timeoutKey != null) {
                    data.timeoutKey.remove();
                    data.timeoutKey = null;
                }
                return;
            }
        }
    }

    
    /**
     * Gets the host data for this thread.
     *
     * @return The data for this thread
     */
    private HostThreadData getData() {
        // Returns the calling thread's HostThreadData, creating and registering it on first use.
        // NOTE(review): the map receiver of get()/putIfAbsent(), the receiver of
        // canOnlyBeCalledByIoThread() and the closing parenthesis of the cast were lost in
        // extraction - confirm against upstream.
        Thread thread = Thread.currentThread();
        if (!(thread instanceof XnioIoThread)) {
            // Pool state is kept per IO thread, so callers on any other thread are rejected
            throw ..canOnlyBeCalledByIoThread();
        }
        XnioIoThread ioThread = (XnioIoThreadthread;
        HostThreadData data = .get(ioThread);
        if (data != null) {
            return data;
        }
        data = new HostThreadData();
        // putIfAbsent keeps an existing entry if one is already registered; prefer that one
        HostThreadData existing = .putIfAbsent(ioThreaddata);
        if (existing != null) {
            return existing;
        }
        return data;
    }

    

    /**
     * @param exclusive whether the connection is for the exclusive use of one client
     */
    public void connect(ProxyClient.ProxyTarget proxyTargetHttpServerExchange exchangeProxyCallback<ProxyConnectioncallbackfinal long timeoutfinal TimeUnit timeUnitboolean exclusive) {
        // Obtains a connection for the exchange, in order of preference: reuse an open idle
        // connection, open a new one, or queue the request until a connection is returned.
        // Must run on an IO thread (getData() throws otherwise).
        // NOTE(review): parameter/argument separators and both limit operands below were lost
        // in extraction; the limits are presumably maxConnections and maxRequestQueueSize -
        // confirm.
        HostThreadData data = getData();
        // Discard idle holders whose connection has closed in the meantime
        ConnectionHolder connectionHolder = data.availableConnections.poll();
        while (connectionHolder != null && !connectionHolder.clientConnection.isOpen()) {
            connectionHolder = data.availableConnections.poll();
        }
        if (connectionHolder != null) {
            if (exclusive) {
                // An exclusive connection leaves the pool, so it no longer counts against it.
                // NOTE(review): the close listener installed by openConnection is not cleared
                // here, so handleClosedConnection would decrement the count a second time when
                // this connection closes - confirm this is intended.
                data.connections--;
            }
            connectionReady(connectionHoldercallbackexchangeexclusive);
        } else if (exclusive || data.connections < ) {
            // Exclusive requests always open a new connection, regardless of the limit
            openConnection(exchangecallbackdataexclusive);
        } else {
            // Reject the request directly if we reached the max request queue size
            if (data.awaitingConnections.size() >= ) {
                callback.queuedRequestFailed(exchange);
                return;
            }
            CallbackHolder holder;
            if (timeout > 0) {
                // Record the absolute expiry time and schedule the holder itself (a Runnable)
                // to run after the timeout elapses
                long time = System.currentTimeMillis();
                holder = new CallbackHolder(proxyTargetcallbackexchangetime + timeUnit.toMillis(timeout));
                holder.setTimeoutKey(exchange.getIoThread().executeAfter(holdertimeouttimeUnit));
            } else {
                // No timeout: expire time -1 and no scheduled task
                holder = new CallbackHolder(proxyTargetcallbackexchange, -1);
            }
            data.awaitingConnections.add(holder);
        }
    }
    /**
     * Pool state for a single IO thread, looked up through {@code getData()}. It is a
     * non-static inner class because {@code timeoutTask} calls the enclosing pool's
     * {@code timeoutConnections}.
     * <p>
     * NOTE(review): the generic arguments and field names of the two deques run together in
     * this extraction; presumably {@code Deque<ConnectionHolder> availableConnections} and
     * {@code Deque<CallbackHolder> awaitingConnections} - confirm.
     */
    private final class HostThreadData {
        // Number of non-exclusive connections currently counted for this thread
        int connections = 0;
        // Handle of the scheduled idle sweep, or null when none is scheduled
        XnioIoThread.Key timeoutKey;
        // Expiry time the scheduled sweep was set up for
        long nextTimeout;
        // Idle connections; entries are added at the tail and polled from the head
        final Deque<ConnectionHolderavailableConnections = new ArrayDeque<>();
        // Requests waiting for a connection to become available
        final Deque<CallbackHolderawaitingConnections = new ArrayDeque<>();
        // Task scheduled by timeoutConnections; re-runs the sweep with the current time
        final Runnable timeoutTask = new Runnable() {
            @Override
            public void run() {
                final long currentTime = System.currentTimeMillis();
                timeoutConnections(currentTimeHostThreadData.this);
            }
        };
    }
    /**
     * Pairs a client connection with the time at which it may be closed as idle.
     */
    private static final class ConnectionHolder {
        // Expiry time, set by returnConnection from System.currentTimeMillis() plus an offset
        // when the connection is cached
        private long timeout;
        private final ClientConnection clientConnection;
        // NOTE(review): the assignment target was lost in extraction; presumably
        // clientConnection - confirm
        private ConnectionHolder(ClientConnection clientConnection) {
            this. = clientConnection;
        }
    }
    /**
     * A request queued while waiting for a connection. The holder is also the {@link Runnable}
     * that {@code connect} schedules when a timeout is requested.
     * <p>
     * NOTE(review): assignment targets, returned expressions and the body of {@code run()} lost
     * their identifiers in extraction. Presumably each accessor returns the field matching its
     * name, and {@code run()} sets {@code cancelled} and fails the callback for the stored
     * exchange - confirm against upstream.
     */
    private static final class CallbackHolder implements Runnable {
        final ProxyClient.ProxyTarget proxyTarget;
        final ProxyCallback<ProxyConnectioncallback;
        final HttpServerExchange exchange;
        // Absolute expiry time from System.currentTimeMillis(), or -1 when queued without a timeout
        final long expireTime;
        // Handle of the scheduled timeout task; stays null when no timeout was requested
        XnioExecutor.Key timeoutKey;
        // Cancelled holders are skipped by the code that serves the waiting queue
        boolean cancelled = false;
        private CallbackHolder(ProxyClient.ProxyTarget proxyTargetProxyCallback<ProxyConnectioncallbackHttpServerExchange exchangelong expireTime) {
            this. = proxyTarget;
            this. = callback;
            this. = exchange;
            this. = expireTime;
        }
        private ProxyCallback<ProxyConnectiongetCallback() {
            return ;
        }
        private HttpServerExchange getExchange() {
            return ;
        }
        private long getExpireTime() {
            return ;
        }
        private XnioExecutor.Key getTimeoutKey() {
            return ;
        }
        private boolean isCancelled() {
            return ;
        }
        private void setTimeoutKey(XnioExecutor.Key timeoutKey) {
            this. = timeoutKey;
        }
        // Scheduled by connect() to run once the requested wait timeout has elapsed
        @Override
        public void run() {
             = true;
            .failed();
        }
        public ProxyClient.ProxyTarget getProxyTarget() {
            return ;
        }
    }
    public enum AvailabilityType {
        
        /** The host is ready to accept requests. */
        AVAILABLE,
        
        /** The host is stopped. No requests should be forwarded that are not tied to this node via sticky sessions. */
        DRAIN,
        
        /** All connections are in use, connections will be queued. */
        FULL,
        
        /** All connections are in use and the queue is full. Requests will be rejected. */
        FULL_QUEUE,
        
        /** The host is probably down, only try as a last resort. */
        PROBLEM,
        
        /** The host is closed. Connections will always fail. */
        CLOSED;
    }
New to GrepCode? Check out our FAQ X