Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* ************************************************************************
  #
  #  DivConq
  #
  #  http://divconq.com/
  #
  #  Copyright:
  #    Copyright 2014 eTimeline, LLC. All rights reserved.
  #
 #  License:
 #    See the license.txt file in the project's top-level directory for details.
 #
 #  Authors:
 #    * Andy White
 #
 ************************************************************************ */
 package divconq.api.internal;
 
 import java.util.Map;
 
 
     protected Channel dest = null;
     protected ReadableByteChannel src = null;
     protected Map<StringCookiecookies = new HashMap<>();
     protected OperationCallback callback = null;
 	
 	public Channel allocateChannel(final HyperSession parentOperationResult or) {
 		final AtomicReference<Future<Channel>> sslready = new AtomicReference<>();
 		
         Bootstrap b = new Bootstrap();
         
         b.group(..getEventLoopGroup())
          .channel(NioSocketChannel.class)
          .handler(new ChannelInitializer<SocketChannel>() {
              @Override
              public void initChannel(SocketChannel chthrows Exception {
                  ChannelPipeline pipeline = ch.pipeline();
                  
                  if (parent.getInfo().isSecurel()) {
                 	 SslHandler sh = new SslHandler(parent.getSsl().getClientEngine());
                 	 sslready.set(sh.handshakeFuture());
                 	 pipeline.addLast("ssl"sh);
                  }
                  
                  pipeline.addLast("codec"new HttpClientCodec());
 
                  // so we can get the upload response (200 or not)
                  pipeline.addLast("handler"UploadPutHandler.this);
              }
          });
 
         or.info("Web Data Client connecting");
         
         try {
         	// must wait here to make sure we don't release connectLock too soon
         	// we want chanel init (above) to complete before we try connect again
         	ChannelFuture f = b.connect(parent.getInfo().getAddress()).sync();
         	
         	if (!f.isSuccess()) {
             	or.error(1, "Web Client unable to successfully connect: " + f.cause());
         	}
        	
        	// it has appeared that sometimes we "overshoot" the ssl handshake in code - to prevent
        	// that lets wait for the handshake to be done for sure
        	if (sslready.get() != null) {
        		Future<Channelsf = sslready.get().sync();
            	
            	if (!sf.isSuccess()) {
                	or.error(1, "Web Client unable to securely connect: " + sf.cause());
            	}
        	}
        	
        	return f.channel();
        }
        catch (InterruptedException x) {
        	or.error(1, "Web Client interrupted while connecting: " + x);
        }
        catch (Exception x) {
        	or.error(1, "Web Client unable to connect: " + x);
        }
        
        return null;
	}
    
	public void start(final HyperSession parentScatteringByteChannel srcString chanidMap<StringCookiecookieslong sizelong offsetfinal OperationCallback callback) {
    	this. = src;
    	this. = cookies;
    	this. = callback;
    	this. = this.allocateChannel(parentcallback);
    	
    	if (this..hasErrors()) {
        	callback.complete();
        	return;
    	}
    	
		// send a request to get things going		
		HttpRequest req = new DefaultHttpRequest(.."/upload/" + chanid + "/Final");
		req.headers().set(.parent.getInfo().getHost());
		req.headers().set(."DivConq HyperAPI Client 1.0");
        req.headers().set(., ClientCookieEncoder.encode(this..values()));
        req.headers().set(..size - offset);
        // send request headers - must flush here in case CL = 0
        this..writeAndFlush(req);
        // now start sending the file
		long sent = offset;
		callback.getContext().setAmountCompleted((int)(sent * 100 / size));
		ByteBuf bb = null;
		try {
			bb = ..getBufferAllocator().directBuffer(64 * 1024);		// TODO review if direct is best here
			long toskip = offset;
			if (src instanceof SeekableByteChannel) {
			}
			else {
				while (toskip > 0) {
					int skip = (int) Math.min(bb.capacity(), toskip);
					toskip -= bb.writeBytes(srcskip);
					bb.clear();
				}
			}
			// now start writing the upload
			int amt = bb.writeBytes(srcbb.capacity());
			while (amt != -1) {
				bb.retain();		// this ups ref cnt to 2 - we plan to reuse the buffer
		        this..writeAndFlush(bb).sync();
				sent += amt;
				if (size > 0) 
					callback.getContext().setAmountCompleted((int) (sent * 100 / size));
				// by the time we get here, that buffer has been used up and we can use it for the next buffer
				if (bb.refCnt() != 1) 
					throw new IOException("Buffer reference count is not correct");
				// stop writing if canceled
				if (!this..isOpen()) {
					this.finish();		// might already be finished but to be sure (this is helpful when api.abortStream is called)
					break;
				}
				bb.clear();
				amt = bb.writeBytes(srcbb.capacity());
			}
			// we are now done with it
			bb.release();
		catch (Exception x) {
			try {
				if (bb != null)
					bb.release();
			}
			catch (Exception x2) {
			}
			callback.error(1, "Local read error: " + x);
			this.finish();
	}
	public void finish() {
		//System.out.println("client finished with content");
		try {
			if (this. != null)
				this..close();
		catch (IOException x) {
		}
		this.closeDest();
	}
	public void channelRead0(ChannelHandlerContext ctxHttpObject msgthrows Exception {
		if (msg instanceof HttpResponse) {
			if (((HttpResponse)msg).getStatus().code() != 200)
				Logger.error("Upload Put Handler got unexpected read, non http response");
		}
			Logger.error("Upload Put Handler got unexpected read, non http response");
		this.finish();
	}
    @Override
    public void channelInactive(ChannelHandlerContext ctxthrows Exception {
        ..println("Web Data Client disconnected!");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctxThrowable causethrows Exception {
    	// TODO logging
        cause.printStackTrace();        
        ctx.close();
    }
	public void closeDest() {
		try {
			if (this. != null)
				this..close().await(2000);
		catch (InterruptedException x) {
			// ignore 
		}
	}
New to GrepCode? Check out our FAQ X