Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //
 //  The Original Code is RabbitMQ.
 //
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
 //
 
 package com.rabbitmq.client.impl;
 
 import java.util.List;
 
Class responsible for piecing together a command from a series of Frames.

Concurrency
This class is thread-safe, since all methods are synchronised. Callers should not synchronise on objects of this class unless they are sole owners.

See also:
AMQCommand
 
 final class CommandAssembler {
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    
Current state, used to decide how to handle each incoming frame.
 
     private enum CAState {
         EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE;
     }
     private CAState state;

    
The method for this command
 
     private Method method;
    
    
The content header for this command
 
     private AMQContentHeader contentHeader;

    
The fragments of this command's content body - a list of byte[]
 
     private final List<byte[]> bodyN;
    
sum of the lengths of all fragments
 
     private int bodyLength;

    
No bytes of content body not yet accumulated
 
     private long remainingBodyBytes;
 
     public CommandAssembler(Method methodAMQContentHeader contentHeaderbyte[] body) {
         this. = method;
         this. = contentHeader;
         this. = new ArrayList<byte[]>(2);
         this. = 0;
         this. = 0;
         appendBodyFragment(body);
         if (method == null) {
             this. = .;
         } else if (contentHeader == null) {
             this. = method.hasContent() ? . : .;
         } else {
             this. = contentHeader.getBodySize() - this.;
             updateContentBodyState();
         }
     }
 
     public synchronized Method getMethod() {
         return this.;
     }
 
     public synchronized AMQContentHeader getContentHeader() {
         return this.;
     }

    

Returns:
true if the command is complete
 
     public synchronized boolean isComplete() {
         return (this. == .);
     }

    
Decides whether more body frames are expected
 
     private void updateContentBodyState() {
         this. = (this. > 0) ? . : .;
     }
 
     private void consumeMethodFrame(Frame fthrows IOException {
         if (f.type == .) {
             this. = AMQImpl.readMethodFrom(f.getInputStream());
             this. = this..hasContent() ? . : .;
         } else {
             throw new UnexpectedFrameError(f.);
         }
     }
 
    private void consumeHeaderFrame(Frame fthrows IOException {
        if (f.type == .) {
            this. = AMQImpl.readContentHeaderFrom(f.getInputStream());
            this. = this..getBodySize();
            updateContentBodyState();
        } else {
            throw new UnexpectedFrameError(f.);
        }
    }
    private void consumeBodyFrame(Frame f) {
        if (f.type == .) {
            byte[] fragment = f.getPayload();
            this. -= fragment.length;
            updateContentBodyState();
            if (this. < 0) {
                throw new UnsupportedOperationException("%%%%%% FIXME unimplemented");
            }
            appendBodyFragment(fragment);
        } else {
            throw new UnexpectedFrameError(f.);
        }
    }

    
Stitches together a fragmented content body into a single byte array
    private byte[] coalesceContentBody() {
        if (this. == 0) return ;
        if (this..size() == 1) return this..get(0);
        byte[] body = new byte[];
        int offset = 0;
        for (byte[] fragment : this.) {
            System.arraycopy(fragment, 0, bodyoffsetfragment.length);
            offset += fragment.length;
        }
        this..clear();
        this..add(body);
        return body;
    }
    public synchronized byte[] getContentBody() {
        return coalesceContentBody();
    }
    private void appendBodyFragment(byte[] fragment) {
        if (fragment == null || fragment.length == 0) return;
        .add(fragment);
         += fragment.length;
    }

    

Parameters:
f frame to be incorporated
Returns:
true if command becomes complete
Throws:
java.io.IOException if error reading frame
    public synchronized boolean handleFrame(Frame fthrows IOException
    {
        switch (this.) {
          case :          consumeMethodFrame(f); break;
          case :  consumeHeaderFrame(f); break;
          case :    consumeBodyFrame(f);   break;
          default:
              throw new AssertionError("Bad Command State " + this.);
        }
        return isComplete();
    }
New to GrepCode? Check out our FAQ X