Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2013 Jeanfrancois Arcand
   *
   * Licensed under the Apache License, Version 2.0 (the "License"); you may not
   * use this file except in compliance with the License. You may obtain a copy of
   * the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  * License for the specific language governing permissions and limitations under
  * the License.
  */
 package org.atmosphere.wasync.impl;
 
 
 import java.util.List;
 import java.util.Map;
Default implementation of the org.atmosphere.wasync.Socket

Author(s):
Jeanfrancois Arcand
 
 public class DefaultSocket implements Socket {
 
     private final static Logger logger = LoggerFactory.getLogger(DefaultSocket.class);
 
     protected Request request;
     protected SocketRuntime socketRuntime;
     protected final List<FunctionWrapperfunctions = new ArrayList<FunctionWrapper>();
     protected Transport transportInUse;
     protected final Options options;
 
     public DefaultSocket(Options options) {
         this. = options;
     }

    
 
     @Override
     public Future fire(Object datathrows IOException {
         checkState();
         if (.status().equals(.) ||
                 .status().equals(.)) {
             .error(new IOException("Invalid Socket Status " + .status().name()));
             return .;
         }
 
         return .write(data);
     }

    
 
     @Override
     public Socket on(Function<? extends Objectfunction) {
         return on(""function);
     }

    
 
     @Override
     public Socket on(String functionNameFunction<? extends Objectfunction) {
         .add(new FunctionWrapper(functionNamefunction));
         return this;
     }
 
     @Override
    public Socket on(Event eventFunction<?> function) {
        return on(event.name(), function);
    }
    public Socket open(Request requestthrows IOException {
        return open(request, -1, .);
    }

    
    @Override
    public Socket open(Request requestlong timeoutTimeUnit tuthrows IOException {
        this. = request;
        RequestBuilder r = new RequestBuilder();
        r.setUrl(request.uri())
                .setMethod(request.method().name())
                .setHeaders(request.headers())
                .setQueryParameters(decodeQueryString(request));
        List<Transporttransports = getTransport(rrequest);
        return connect(rtransportstimeouttu);
    }
    static FluentStringsMap decodeQueryString(Request request) {
        Map<StringList<String>> c = request.queryString();
        FluentStringsMap f = new FluentStringsMap();
        f.putAll(c);
        return f;
    }
    protected Socket connect(final RequestBuilder rfinal List<Transporttransportsthrows IOException {
        return connect(rtransports, -1, .);
    }
    protected Socket connect(final RequestBuilder rfinal List<Transporttransportsfinal long timeoutfinal TimeUnit tuthrows IOException {
        if (transports.size() > 0) {
             = transports.get(0);
        } else {
            throw new IOException("No suitable transport supported");
        }
        DefaultFuture f = new DefaultFuture(this);
         = createRuntime(f);
        addFunction(timeouttu);
            r.setUrl(.uri().replace("http""ws"));
            try {
                java.util.concurrent.Future<WebSocketfw = .runtime().prepareRequest(r.build()).execute(
                        (AsyncHandler<WebSocket>) );
                fw.get(timeouttu);
            } catch (ExecutionException t) {
                Throwable e = t.getCause();
                if (TransportNotSupported.class.isAssignableFrom(e.getClass())) {
                    return this;
                }
                .close();
                closeRuntime(true);
                if (!.errorHandled() && TimeoutException.class.isAssignableFrom(e.getClass())) {
                    .error(new IOException("Invalid state: " + e.getMessage()));
                }
                return new VoidSocket();
            } catch (Throwable t) {
                .onThrowable(t);
                return new VoidSocket();
            }
        } else {
            r.setUrl(.uri().replace("ws""http"));
            try {
                if (.waitBeforeUnlocking() > 0) {
                    .info("Waiting {}, allowing the http connection to get handled by the server. To reduce the delay, make sure some bytes get written when the connection is suspendeded on the server".waitBeforeUnlocking());
                }
                if (.queryString().containsKey("X-atmo-protocol")) {
                    f.get();
                } else {
                    f.get(.waitBeforeUnlocking(), .);
                }
            } catch (Throwable t) {
                // Swallow  LOG ME
                .trace(""t);
            } finally {
                f.done();
            }
        }
        return this;
    }
    protected void addFunction(final long timeoutfinal TimeUnit tu) {
        .add(new FunctionWrapper(""new Function<TransportNotSupported>() {
            @Override
            public void on(TransportNotSupported transportNotSupported) {
                .transport().remove(0);
                if (.transport().size() > 0) {
                    try {
                        open(timeouttu);
                    } catch (IOException e) {
                        .error(""e);
                    }
                } else {
                    throw new Error("No suitable transport supported by the server");
                }
            }
        }));
    }

    
    @Override
    public void close() {
        // Not connected, but close the underlying AHC.
        if ( == null) {
            closeRuntime(false);
        } else if ( != null && !.status().equals(.)) {
            .close();
            closeRuntime(true);
        }
    }
    protected void closeRuntime(boolean async) {
        if (!.runtimeShared() && !.runtime().isClosed()) {
            if (async) {
                // AHC is broken when calling closeAsynchronously.
                // https://github.com/AsyncHttpClient/async-http-client/issues/290
                final ExecutorService e= Executors.newSingleThreadExecutor();
                e.submit(new Runnable() {
                    @Override
                    public void run() {
                        .runtime().close();
                        e.shutdown();
                    }
                });
            }
            else
                .runtime().close();
        } else if (.runtimeShared()) {
            .warn("Cannot close underlying AsyncHttpClient because it is shared. Make sure you close it manually.");
        }
    }
    @Override
    public Socket.STATUS status() {
        if ( == null) {
            return .;
        } else {
            return .status();
        }
    }
    protected SocketRuntime internalSocket() {
        return ;
    }
    protected List<TransportgetTransport(RequestBuilder rRequest requestthrows IOException {
        List<Transporttransports = new ArrayList<Transport>();
        if (request.transport().size() == 0) {
            transports.add(new WebSocketTransport(rrequest));
            transports.add(new LongPollingTransport(rrequest));
        }
        for (Request.TRANSPORT t : request.transport()) {
            if (t.equals(..)) {
                transports.add(new WebSocketTransport(rrequest));
            } else if (t.equals(..)) {
                transports.add(new SSETransport(rrequest));
            } else if (t.equals(..)) {
                transports.add(new LongPollingTransport(rrequest));
            } else if (t.equals(..)) {
                transports.add(new StreamTransport(rrequest));
            }
        }
        return transports;
    }
    protected Request request() {
        return ;
    }
    private final static class VoidSocket implements Socket {
        @Override
        public Future fire(Object datathrows IOException {
            throw new IllegalStateException("An error occurred during connection. Please add a Function(Throwable) to debug.");
        }
        @Override
        public Socket on(Function<? extends Objectfunction) {
            throw new IllegalStateException("An error occurred during connection. Please add a Function(Throwable) to debug.");
        }
        @Override
        public Socket on(String functionMessageFunction<? extends Objectfunction) {
            throw new IllegalStateException("An error occurred during connection. Please add a Function(Throwable) to debug.");
        }
        @Override
        public Socket on(Event eventFunction<?> function) {
            throw new IllegalStateException("An error occurred during connection. Please add a Function(Throwable) to debug.");
        }
        @Override
        public Socket open(Request requestthrows IOException {
            throw new IllegalStateException("An error occurred during connection. Please add a Function(Throwable) to debug.");
        }
        @Override
        public void close() {
            throw new IllegalStateException("An error occurred during connection. Please add a Function(Throwable) to debug.");
        }
        @Override
        public STATUS status() {
            return .;
        }
        @Override
        public Socket open(Request requestlong timeoutTimeUnit tuthrows IOException {
            throw new IllegalStateException("An error occured during connection. Please add a Function(Throwable) to debug.");
        }
    }
    void checkState() {
        if ( == null) {
            throw new IllegalStateException("Invalid Socket Status : Not Connected");
        }
    }
    public SocketRuntime createRuntime(DefaultFuture futureOptions optionsList<FunctionWrapperfunctions) {
        return new SocketRuntime(optionsfuturefunctions);
    }
New to GrepCode? Check out our FAQ X