Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2013 Jeanfrancois Arcand
   *
   * Licensed under the Apache License, Version 2.0 (the "License"); you may not
   * use this file except in compliance with the License. You may obtain a copy of
   * the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  * License for the specific language governing permissions and limitations under
  * the License.
  */
 package org.atmosphere.wasync.transport;
 
 
 import java.util.List;
 import java.util.Map;
 
 import static org.atmosphere.wasync.Event.CLOSE;
 import static org.atmosphere.wasync.Event.ERROR;
 import static org.atmosphere.wasync.Event.HEADERS;
 import static org.atmosphere.wasync.Event.MESSAGE;
 import static org.atmosphere.wasync.Event.OPEN;
 import static org.atmosphere.wasync.Event.REOPENED;
 import static org.atmosphere.wasync.Event.STATUS;
 import static org.atmosphere.wasync.Event.TRANSPORT;
 import static org.atmosphere.wasync.Socket.STATUS;

Streaming org.atmosphere.wasync.Transport implementation

Author(s):
Jeanfrancois Arcand
 
 public class StreamTransport implements AsyncHandler<String>, Transport {
     private final static String DEFAULT_CHARSET = "UTF-8";
     private final Logger logger = LoggerFactory.getLogger(StreamTransport.class);
 
     protected final List<FunctionWrapperfunctions;
     protected final List<Decoder<? extends Object, ?>> decoders;
     //TODO fix me
     protected String charSet = ;
     protected final FunctionResolver resolver;
     protected final Options options;
     protected final RequestBuilder requestBuilder;
     protected final Request request;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     protected final boolean isBinary;
     protected STATUS status = ..;
     protected final AtomicBoolean errorHandled = new AtomicBoolean();
     protected ListenableFuture underlyingFuture;
     protected boolean protocolReceived = false;
     protected Future connectdFuture;
     protected final boolean protocolEnabled;
 
     public StreamTransport(RequestBuilder requestBuilderOptions optionsRequest requestList<FunctionWrapperfunctions) {
         this. = request.decoders();
 
         if (.size() == 0) {
             .add(new Decoder<StringObject>() {
                 @Override
                 public Object decode(Event eString s) {
                     return s;
                 }
             });
         }
         this. = functions;
         this. = request.functionResolver();
         this. = options;
         this. = requestBuilder;
         this. = request;
 
          = request.queryString().get("X-atmo-protocol") != null;
          = request.headers().get("Content-Type") != null ?
                request.headers().get("Content-Type").contains("application/octet-stream") : false;
    }

    
    @Override
    public Transport registerF(FunctionWrapper function) {
        .add(function);
        return this;
    }

    
    @Override
    public void onThrowable(Throwable t) {
        if (CancellationException.class.isAssignableFrom(t.getClass())) return;
        .warn(""t);
         = ..;
        .set(TransportsUtil.invokeFunction(t.getClass(), t.name(), ));
    }

    
    @Override
    public STATE onBodyPartReceived(HttpResponseBodyPart bodyPartthrows Exception {
        if () {
            byte[] payload = bodyPart.getBodyPartBytes();
            if (!whiteSpace(payload)) {
                TransportsUtil.invokeFunction(payload.getClass(), payload.name(), );
            }
        } else {
            String m = new String(bodyPart.getBodyPartBytes(), ).trim();
            if (m.length() > 0) {
                TransportsUtil.invokeFunction(m.getClass(), m.name(), );
            }
        }
        if ( != null.done();
        return ..;
    }

    
    @Override
    public STATE onHeadersReceived(HttpResponseHeaders headersthrows Exception {
        TransportsUtil.invokeFunction(Map.classheaders.getHeaders(), .name(), );
        // TODO: Parse charset
        return ..;
    }

    
    @Override
    public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatusthrows Exception {
        TransportsUtil.invokeFunction(Request.TRANSPORT.classname(), .name(), );
        .set(false);
        .set(false);
        Event newStatus = .equals(..) ?  : ;
        TransportsUtil.invokeFunction(newStatus,
                String.classnewStatus.name(), newStatus.name(), );
        TransportsUtil.invokeFunction(Integer.classnew Integer(responseStatus.getStatusCode()), .name(), );
        return ..;
    }

    
    @Override
    public String onCompleted() throws Exception {
        if (.get()) return "";
        if ( == ..) {
            return "";
        }
        close();
        if (.reconnect()) {
            // We can't let the STATUS to close as fire() method won't work.
             = ..;
            if (.reconnectInSeconds() > 0) {
                ScheduledExecutorService e = .runtime().getConfig().reaper();
                e.schedule(new Runnable() {
                    public void run() {
                        reconnect();
                    }
                }, .reconnectInSeconds(), .);
            } else {
                reconnect();
            }
        }
        return "";
    }
    void reconnect() {
        Map<StringList<String>> c = .queryString();
        FluentStringsMap f = new FluentStringsMap();
        f.putAll(c);
        try {
        } catch (IOException e) {
            .error(""e);
        }
    }

    
    @Override
    public Request.TRANSPORT name() {
        return ..;
    }

    
    @Override
    public void close() {
        if (.getAndSet(true)) return;
         = ..;
        TransportsUtil.invokeFunction(String.class.name(), .name(), );
        if ( != null.cancel(false);
    }

    
    @Override
    public STATUS status() {
        return ;
    }

    
    @Override
    public boolean errorHandled() {
        return .get();
    }

    
    @Override
    public void error(Throwable t) {
        .warn(""t);
        TransportsUtil.invokeFunction(t.getClass(), t.name(), );
    }

    
    @Override
    public void future(ListenableFuture f) {
        this. = f;
    }
    @Override
    public void connectedFuture(Future f) {
        this. = f;
    }
    protected final static boolean whiteSpace(byte[] b) {
        int i = b.length;
        while (i-- > 0 && (b[i] == 10 || b[i] == 32)) {
        }
        return i == -1;
    }
New to GrepCode? Check out our FAQ X