Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012 The Netty Project
   *
   * The Netty Project licenses this file to you under the Apache License,
   * version 2.0 (the "License"); you may not use this file except in compliance
   * with the License. You may obtain a copy of the License at:
   *
   *   http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  * License for the specific language governing permissions and limitations
  * under the License.
  */
 package divconq.net;
 
 
 import java.util.List;

io.netty.channel.ChannelInboundHandlerAdapter which decodes bytes in a stream-like fashion from one io.netty.buffer.ByteBuf to an other Message type. For example here is an implementation which reads all readable bytes from the input io.netty.buffer.ByteBuf and create a new io.netty.buffer.ByteBuf.
     public class SquareDecoder extends ByteToMessageDecoder {
         @Override
         public void decode(io.netty.channel.ChannelHandlerContext ctx, io.netty.buffer.ByteBuf in, List<Object> out)
                 throws java.lang.Exception {
             out.add(in.readBytes(in.readableBytes()));
         }
     }
 
Be aware that sub-classes of ByteToMessageDecoder MUST NOT annotated with Sharable.
 
 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
 
     ByteBuf cumulation;
     private boolean singleDecode;
     private boolean decodeWasNull;
     private boolean first;
 
     protected ByteToMessageDecoder() {
         if (this.isSharable()) {
             throw new IllegalStateException("@Sharable annotation is not allowed");
         }
     }
 
     /*
      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
      *
      * Default is {@code false} as this has performance impacts.
      * 
      * @param v 	set to single decoder mode
      */
     public void setSingleDecode(boolean v) {
         this. = v;
     }
 
     /*
      * If {@code true} then only one message is decoded on each
      * {@link #channelRead(ChannelHandlerContext, Object)} call.
      *
      * Default is {@code false} as this has performance impacts.
      * 
      * @return true if this is in single decoder mode
      */
     public boolean isSingleDecode() {
         return this.;
     }
 
     /*
      * Returns the actual number of readable bytes in the internal cumulative
      * buffer of this decoder. You usually do not need to rely on this value
      * to write a decoder. Use it only when you must use it at your own risk.
      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
      * 
      * @return readable bytes in the cumulative buffer 
      */
     protected int actualReadableBytes() {
         return this.internalBuffer().readableBytes();
     }
 
     /*
      * Returns the internal cumulative buffer of this decoder. You usually
      * do not need to access the internal buffer directly to write a decoder.
     * Use it only when you must use it at your own risk.
     * 
     * @return the cumulative buffer
     */
    protected ByteBuf internalBuffer() {
        if (this. != null) {
            return this.;
        } 
        else {
            return .;
        }
    }
    @Override
    public final void handlerRemoved(ChannelHandlerContext ctxthrows Exception {
        ByteBuf buf = internalBuffer();
        int readable = buf.readableBytes();
        if (buf.isReadable()) {
            ByteBuf bytes = buf.readBytes(readable);
            buf.release();
            ctx.fireChannelRead(bytes);
        } else {
            buf.release();
        }
        this. = null;
        ctx.fireChannelReadComplete();
        this.handlerRemoved0(ctx);
    }
    /*
     * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
     * events anymore.
     * 
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @throws Exception    is thrown if an error accour
     */
    protected void handlerRemoved0(ChannelHandlerContext ctxthrows Exception { }
    @Override
    public void channelRead(ChannelHandlerContext ctxObject msgthrows Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            
            try {
                ByteBuf data = (ByteBufmsg;
                this. = this. == null;
                
                if (this.) {
                	this. = data;
                } 
                else {
                    if (this..writerIndex() > this..maxCapacity() - data.readableBytes()
                            || this..refCnt() > 1) {
                        // Expand cumulation (by replace it) when either there is not more room in the buffer
                        // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                        // duplicate().retain().
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/2327
                        // - https://github.com/netty/netty/issues/1764
                    	this.expandCumulation(ctxdata.readableBytes());
                    }
                    
                    this..writeBytes(data);
                    data.release();
                }
                
                this.callDecode(ctxthis.out);
            } 
            catch (DecoderException e) {
                throw e;
            } 
            catch (Throwable t) {
                throw new DecoderException(t);
            } 
            finally {
                if (this. != null && !this..isReadable()) {
                	this..release();
                	this. = null;
                }
                int size = out.size();
                this. = size == 0;
                for (int i = 0; i < sizei ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    private void expandCumulation(ChannelHandlerContext ctxint readable) {
        ByteBuf oldCumulation = this.;
        this. = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);
        this..writeBytes(oldCumulation);
        oldCumulation.release();
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctxthrows Exception {
        if (this. != null && !this. && this..refCnt() == 1) {
            // discard some bytes if possible to make more room in the
            // buffer but only if the refCnt == 1  as otherwise the user may have
            // used slice().retain() or duplicate().retain().
            //
            // See:
            // - https://github.com/netty/netty/issues/2327
            // - https://github.com/netty/netty/issues/1764
        	this..discardSomeReadBytes();
        }
        if (this.) {
        	this. = false;
        	
            if (ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
        ctx.fireChannelReadComplete();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctxthrows Exception {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        try {
            if (this. != null) {
                callDecode(ctxthis.out);
                decodeLast(ctxthis.out);
            } else {
                decodeLast(ctx.out);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            try {
                if (this. != null) {
                	this..release();
                	this. = null;
                }
                int size = out.size();
                for (int i = 0; i < sizei++) {
                    ctx.fireChannelRead(out.get(i));
                }
                if (size > 0) {
                    // Something was read, call fireChannelReadComplete()
                    ctx.fireChannelReadComplete();
                }
                ctx.fireChannelInactive();
            } finally {
                // recycle in all cases
                out.recycle();
            }
        }
    }
    /*
     * Called once data should be decoded from the given {@link ByteBuf}. This method will call
     * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     */
    protected void callDecode(ChannelHandlerContext ctxByteBuf inList<Objectout) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                
                this.decode(ctxinout);
                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }
                if (this.isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
    /*
     * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
     * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
     * {@link ByteBuf}.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     * @throws Exception    is thrown if an error occur
     */
    protected abstract void decode(ChannelHandlerContext ctxByteBuf inList<Objectoutthrows Exception;
    /*
     * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
     * {@link #channelInactive(ChannelHandlerContext)} was triggered.
     *
     * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
     * override this for some special cleanup operation.
     * 
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     * @throws Exception    is thrown if an error occur
     */
    protected void decodeLast(ChannelHandlerContext ctxByteBuf inList<Objectoutthrows Exception {
        this.decode(ctxinout);
    }
New to GrepCode? Check out our FAQ X