Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  *
  */
 package org.apache.qpid.server.store.berkeleydb.upgrade;
 
 import java.util.List;
 import java.util.Set;
 
 
 import  com.sleepycat.bind.tuple.ByteBinding;
 import  com.sleepycat.bind.tuple.LongBinding;
 import  com.sleepycat.bind.tuple.TupleBase;
 import  com.sleepycat.bind.tuple.TupleBinding;
 import  com.sleepycat.bind.tuple.TupleInput;
 import  com.sleepycat.bind.tuple.TupleOutput;
 import  com.sleepycat.je.Database;
 import  com.sleepycat.je.DatabaseEntry;
 import  com.sleepycat.je.DatabaseException;
 import  com.sleepycat.je.Environment;
 import  com.sleepycat.je.Transaction;
 
 public class UpgradeFrom4To5 extends AbstractStoreUpgrade
 {
     private static final String OLD_DELIVERY_DB = "deliveryDb_v4";
     private static final String NEW_DELIVERY_DB = "deliveryDb_v5";
     private static final String EXCHANGE_DB_NAME = "exchangeDb_v4";
     private static final String OLD_BINDINGS_DB_NAME = "queueBindingsDb_v4";
     private static final String NEW_BINDINGS_DB_NAME = "queueBindingsDb_v5";
     private static final String OLD_QUEUE_DB_NAME = "queueDb_v4";
     private static final String NEW_QUEUE_DB_NAME = "queueDb_v5";
     private static final String OLD_METADATA_DB_NAME = "messageMetaDataDb_v4";
     private static final String NEW_METADATA_DB_NAME = "messageMetaDataDb_v5";
     private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v4";
     private static final String NEW_CONTENT_DB_NAME = "messageContentDb_v5";
 
     private static final byte COLON = (byte':';
 
     private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class);
 
     public void performUpgrade(final Environment environmentfinal UpgradeInteractionHandler handlerString virtualHostNamethrows DatabaseException, AMQStoreException
     {
         Transaction transaction = null;
         try
         {
             reportStarting(environment, 4);
 
             transaction = environment.beginTransaction(nullnull);
 
             // find all queues which are bound to a topic exchange and which have a colon in their name
             final List<AMQShortStringpotentialDurableSubs = findPotentialDurableSubscriptions(environmenttransaction);
 
             Set<StringexistingQueues = upgradeQueues(environmenthandlerpotentialDurableSubstransaction);
             upgradeQueueBindings(environmenthandlerpotentialDurableSubstransaction);
             Set<LongmessagesToDiscard = upgradeDelivery(environmentexistingQueueshandlertransaction);
             upgradeContent(environmenthandlermessagesToDiscardtransaction);
             upgradeMetaData(environmenthandlermessagesToDiscardtransaction);
             renameRemainingDatabases(environmenthandlertransaction);
             transaction.commit();
 
             reportFinished(environment, 5);
 
        }
        catch (Exception e)
        {
            transaction.abort();
            if (e instanceof DatabaseException)
            {
                throw (DatabaseException) e;
            }
            else if (e instanceof AMQStoreException)
            {
                throw (AMQStoreExceptione;
            }
            else
            {
                throw new AMQStoreException("Unexpected exception"e);
            }
        }
    }
    private void upgradeQueueBindings(Environment environmentUpgradeInteractionHandler handlerfinal List<AMQShortStringpotentialDurableSubs,
            Transaction transaction)
    {
        if (environment.getDatabaseNames().contains())
        {
            .info("Queue Bindings");
            final BindingTuple bindingTuple = new BindingTuple();
            CursorOperation databaseOperation = new CursorOperation()
            {
                @Override
                public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                        DatabaseEntry key, DatabaseEntry value)
                {
                    // All the information required in binding entries is actually in the *key* not value.
                    BindingRecord oldBindingRecord = bindingTuple.entryToObject(key);
                    AMQShortString queueName = oldBindingRecord.getQueueName();
                    AMQShortString exchangeName = oldBindingRecord.getExchangeName();
                    AMQShortString routingKey = oldBindingRecord.getRoutingKey();
                    FieldTable arguments = oldBindingRecord.getArguments();
                    if (.isDebugEnabled())
                    {
                        .debug(String.format(
                                "Processing binding for queue %s, exchange %s, routingKey %s arguments %s"queueName,
                                exchangeNameroutingKeyarguments));
                    }
                    // if the queue name is in the gathered list then inspect its binding arguments
                    // only topic exchange should have a JMS selector key in binding
                    if (potentialDurableSubs.contains(queueName)
                            && exchangeName.equals(AMQShortString.valueOf(.)))
                    {
                        if (arguments == null)
                        {
                            arguments = new FieldTable();
                        }
                        AMQShortString selectorFilterKey = ..getValue();
                        if (!arguments.containsKey(selectorFilterKey))
                        {
                            if (.isDebugEnabled())
                            {
                                .info("adding the empty string (i.e. 'no selector') value for " + queueName
                                        + " and exchange " + exchangeName);
                            }
                            arguments.put(selectorFilterKey"");
                        }
                    }
                    addBindingToDatabase(bindingTupletargetDatabasetransactionqueueNameexchangeNameroutingKey,
                            arguments);
                }
            };
            new DatabaseTemplate(environmenttransaction)
                    .run(databaseOperation);
            environment.removeDatabase(transaction);
            .info(databaseOperation.getRowCount() + " Queue Binding entries");
        }
    }
    private Set<StringupgradeQueues(final Environment environmentfinal UpgradeInteractionHandler handler,
            List<AMQShortStringpotentialDurableSubs, Transaction transaction)
    {
        .info("Queues");
        final Set<StringexistingQueues = new HashSet<String>();
        if (environment.getDatabaseNames().contains())
        {
            final QueueRecordBinding binding = new QueueRecordBinding(potentialDurableSubs);
            CursorOperation databaseOperation = new CursorOperation()
            {
                @Override
                public void processEntry(final Database sourceDatabasefinal Database targetDatabase,
                        final Transaction transactionfinal DatabaseEntry keyfinal DatabaseEntry value)
                {
                    QueueRecord record = binding.entryToObject(value);
                    DatabaseEntry newValue = new DatabaseEntry();
                    binding.objectToEntry(recordnewValue);
                    targetDatabase.put(transactionkeynewValue);
                    existingQueues.add(record.getNameShortString().asString());
                    sourceDatabase.delete(transactionkey);
                }
            };
            new DatabaseTemplate(environmenttransaction).run(databaseOperation);
            environment.removeDatabase(transaction);
            .info(databaseOperation.getRowCount() + " Queue entries");
        }
        return existingQueues;
    }
    private List<AMQShortStringfindPotentialDurableSubscriptions(final Environment environment,
            Transaction transaction)
    {
        final List<AMQShortStringexchangeNames = findTopicExchanges(environment);
        final List<AMQShortStringqueues = new ArrayList<AMQShortString>();
        final PartialBindingRecordBinding binding = new PartialBindingRecordBinding();
        CursorOperation databaseOperation = new CursorOperation()
        {
            @Override
            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                    DatabaseEntry key, DatabaseEntry value)
            {
                PartialBindingRecord record = binding.entryToObject(key);
                if (exchangeNames.contains(record.getExchangeName()) && record.getQueueName().contains())
                {
                    queues.add(record.getQueueName());
                }
            }
        };
        new DatabaseTemplate(environmenttransaction).run(databaseOperation);
        return queues;
    }
    private Set<LongupgradeDelivery(final Environment environmentfinal Set<StringexistingQueues,
            final UpgradeInteractionHandler handler, Transaction transaction)
    {
        final Set<LongmessagesToDiscard = new HashSet<Long>();
        final Set<StringqueuesToDiscard = new HashSet<String>();
        final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding();
        .info("Delivery Records");
        CursorOperation databaseOperation = new CursorOperation()
        {
            @Override
            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                    DatabaseEntry key, DatabaseEntry value)
            {
                QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
                Long messageId = entryKey.getMessageId();
                final String queueName = entryKey.getQueueName().asString();
                if (!existingQueues.contains(queueName))
                {
                    if (queuesToDiscard.contains(queueName))
                    {
                        messagesToDiscard.add(messageId);
                    }
                    else
                    {
                        String lineSeparator = System.getProperty("line.separator");
                        String question = MessageFormat.format("Found persistent messages for non-durable queue ''{1}''. "
                                + " Do you with to create this queue and move all the messages into it?" + lineSeparator
                                + "NOTE: Answering No will result in these messages being discarded!"queueName);
                        UpgradeInteractionResponse response = handler.requireResponse(question.toString(),
                                ..,
                                ..);
                        if (response == .)
                        {
                            createQueue(environmenttransactionqueueName);
                            existingQueues.add(queueName);
                        }
                        else if (response == .)
                        {
                            queuesToDiscard.add(queueName);
                            messagesToDiscard.add(messageId);
                        }
                        else
                        {
                            throw new RuntimeException("Unable is aborted!");
                        }
                    }
                }
                if (!messagesToDiscard.contains(messageId))
                {
                    DatabaseEntry newKey = new DatabaseEntry();
                    queueEntryKeyBinding.objectToEntry(entryKeynewKey);
                    targetDatabase.put(transactionnewKeyvalue);
                }
            }
        };
        new DatabaseTemplate(environmenttransaction).run(databaseOperation);
        if (!messagesToDiscard.isEmpty())
        {
            databaseOperation = new CursorOperation()
            {
                @Override
                public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                        DatabaseEntry key, DatabaseEntry value)
                {
                    QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
                    Long messageId = entryKey.getMessageId();
                    if (messagesToDiscard.contains(messageId))
                    {
                        messagesToDiscard.remove(messageId);
                    }
                }
            };
            new DatabaseTemplate(environmenttransaction).run(databaseOperation);
        }
        .info(databaseOperation.getRowCount() + " Delivery Records entries ");
        environment.removeDatabase(transaction);
        return messagesToDiscard;
    }
    protected void createQueue(final Environment environment, Transaction transactionfinal String queueName)
    {
        final QueueRecordBinding binding = new QueueRecordBinding(null);
        final BindingTuple bindingTuple = new BindingTuple();
        DatabaseRunnable queueCreateOperation = new DatabaseRunnable()
        {
            @Override
            public void run(Database newQueueDatabase, Database newBindingsDatabase, Transaction transaction)
            {
                AMQShortString queueNameAMQ = new AMQShortString(queueName);
                QueueRecord record = new QueueRecord(queueNameAMQnullfalsenull);
                DatabaseEntry key = new DatabaseEntry();
                TupleOutput output = new TupleOutput();
                AMQShortStringEncoding.writeShortString(record.getNameShortString(), output);
                TupleBase.outputToEntry(outputkey);
                DatabaseEntry newValue = new DatabaseEntry();
                binding.objectToEntry(recordnewValue);
                newQueueDatabase.put(transactionkeynewValue);
                FieldTable emptyArguments = new FieldTable();
                addBindingToDatabase(bindingTuplenewBindingsDatabasetransactionqueueNameAMQ,
                        AMQShortString.valueOf(.), queueNameAMQemptyArguments);
                // TODO QPID-3490 we should not persist a default exchange binding
                addBindingToDatabase(bindingTuplenewBindingsDatabasetransactionqueueNameAMQ,
                        AMQShortString.valueOf(.), queueNameAMQemptyArguments);
            }
        };
        new DatabaseTemplate(environmenttransaction).run(queueCreateOperation);
    }
    private List<AMQShortStringfindTopicExchanges(final Environment environment)
    {
        final List<AMQShortStringtopicExchanges = new ArrayList<AMQShortString>();
        final ExchangeRecordBinding binding = new ExchangeRecordBinding();
        CursorOperation databaseOperation = new CursorOperation()
        {
            @Override
            public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                    DatabaseEntry key, DatabaseEntry value)
            {
                ExchangeRecord record = binding.entryToObject(value);
                if (AMQShortString.valueOf(.).equals(record.getType()))
                {
                    topicExchanges.add(record.getName());
                }
            }
        };
        new DatabaseTemplate(environmentnull).run(databaseOperation);
        return topicExchanges;
    }
    private void upgradeMetaData(final Environment environmentfinal UpgradeInteractionHandler handler,
            final Set<LongmessagesToDiscard, Transaction transaction)
    {
        .info("Message MetaData");
        if (environment.getDatabaseNames().contains())
        {
            final MessageMetaDataBinding binding = new MessageMetaDataBinding();
            CursorOperation databaseOperation = new CursorOperation()
            {
                @Override
                public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                        DatabaseEntry key, DatabaseEntry value)
                {
                    StorableMessageMetaData metaData = binding.entryToObject(value);
                    // get message id
                    Long messageId = LongBinding.entryToLong(key);
                    // ONLY copy data if message is delivered to existing queue
                    if (messagesToDiscard.contains(messageId))
                    {
                        return;
                    }
                    DatabaseEntry newValue = new DatabaseEntry();
                    binding.objectToEntry(metaDatanewValue);
                    targetDatabase.put(transactionkeynewValue);
                    targetDatabase.put(transactionkeynewValue);
                    deleteCurrent();
                }
            };
            new DatabaseTemplate(environmenttransaction)
                    .run(databaseOperation);
            environment.removeDatabase(transaction);
            .info(databaseOperation.getRowCount() + " Message MetaData entries");
        }
    }
    private void upgradeContent(final Environment environmentfinal UpgradeInteractionHandler handler,
            final Set<LongmessagesToDiscard, Transaction transaction)
    {
        .info("Message Contents");
        if (environment.getDatabaseNames().contains())
        {
            final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding();
            final ContentBinding contentBinding = new ContentBinding();
            CursorOperation cursorOperation = new CursorOperation()
            {
                private long _prevMsgId = -1;
                private int _bytesSeenSoFar;
                @Override
                public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
                        DatabaseEntry key, DatabaseEntry value)
                {
                    // determine the msgId of the current entry
                    MessageContentKey contentKey = keyBinding.entryToObject(key);
                    long msgId = contentKey.getMessageId();
                    // ONLY copy data if message is delivered to existing queue
                    if (messagesToDiscard.contains(msgId))
                    {
                        return;
                    }
                    // if this is a new message, restart the byte offset count.
                    if ( != msgId)
                    {
                         = 0;
                    }
                    // determine the content size
                    ByteBuffer content = contentBinding.entryToObject(value);
                    int contentSize = content.limit();
                    // create the new key: id + previously seen data count
                    MessageContentKey newKey = new MessageContentKey(msgId);
                    DatabaseEntry newKeyEntry = new DatabaseEntry();
                    keyBinding.objectToEntry(newKeynewKeyEntry);
                    DatabaseEntry newValueEntry = new DatabaseEntry();
                    contentBinding.objectToEntry(contentnewValueEntry);
                    targetDatabase.put(nullnewKeyEntrynewValueEntry);
                     = msgId;
                     += contentSize;
                }
            };
            new DatabaseTemplate(environmenttransaction).run(cursorOperation);
            environment.removeDatabase(transaction);
            .info(cursorOperation.getRowCount() + " Message Content entries");
        }
    }

    
For all databases which haven't been otherwise upgraded, we still need to rename them from _v4 to _v5
    private void renameRemainingDatabases(final Environment environmentfinal UpgradeInteractionHandler handler,
            Transaction transaction)
    {
        for (String dbName : environment.getDatabaseNames())
        {
            if (dbName.endsWith("_v4"))
            {
                String newName = dbName.substring(0, dbName.length() - 3) + "_v5";
                .info("Renaming " + dbName + " into " + newName);
                environment.renameDatabase(transactiondbNamenewName);
            }
        }
    }
    private void addBindingToDatabase(final BindingTuple bindingTuple, Database targetDatabase, Transaction transaction,
            AMQShortString queueNameAMQShortString exchangeNameAMQShortString routingKeyFieldTable arguments)
    {
        DatabaseEntry newKey = new DatabaseEntry();
        bindingTuple.objectToEntry(new BindingRecord(exchangeNamequeueNameroutingKeyarguments), newKey);
        DatabaseEntry newValue = new DatabaseEntry();
        ByteBinding.byteToEntry((byte) 0, newValue);
        targetDatabase.put(transactionnewKeynewValue);
    }
    private static final class ExchangeRecord
    {
        private final AMQShortString _name;
        private final AMQShortString _type;
        private ExchangeRecord(final AMQShortString namefinal AMQShortString type)
        {
             = name;
             = type;
        }
        public AMQShortString getName()
        {
            return ;
        }
        public AMQShortString getType()
        {
            return ;
        }
    }
    private static final class ExchangeRecordBinding extends TupleBinding<ExchangeRecord>
    {
        @Override
        public ExchangeRecord entryToObject(final TupleInput input)
        {
            return new ExchangeRecord(AMQShortStringEncoding.readShortString(input),
                    AMQShortStringEncoding.readShortString(input));
        }
        @Override
        public void objectToEntry(final ExchangeRecord objectfinal TupleOutput output)
        {
            AMQShortStringEncoding.writeShortString(object.getName(), output);
            AMQShortStringEncoding.writeShortString(object.getType(), output);
            output.writeBoolean(false);
        }
    }
    private static final class PartialBindingRecord
    {
        private final AMQShortString _exchangeName;
        private final AMQShortString _queueName;
        private PartialBindingRecord(final AMQShortString namefinal AMQShortString type)
        {
             = name;
             = type;
        }
        public AMQShortString getExchangeName()
        {
            return ;
        }
        public AMQShortString getQueueName()
        {
            return ;
        }
    }
    private static final class PartialBindingRecordBinding extends TupleBinding<PartialBindingRecord>
    {
        @Override
        public PartialBindingRecord entryToObject(final TupleInput input)
        {
            return new PartialBindingRecord(AMQShortStringEncoding.readShortString(input),
                    AMQShortStringEncoding.readShortString(input));
        }
        @Override
        public void objectToEntry(final PartialBindingRecord objectfinal TupleOutput output)
        {
            throw new UnsupportedOperationException();
        }
    }
    static final class QueueRecord
    {
        private final AMQShortString _queueName;
        private final AMQShortString _owner;
        private final FieldTable _arguments;
        private final boolean _exclusive;
        public QueueRecord(AMQShortString queueNameAMQShortString ownerboolean exclusiveFieldTable arguments)
        {
             = queueName;
             = owner;
             = exclusive;
             = arguments;
        }
        public AMQShortString getNameShortString()
        {
            return ;
        }
        public AMQShortString getOwner()
        {
            return ;
        }
        public boolean isExclusive()
        {
            return ;
        }
        public FieldTable getArguments()
        {
            return ;
        }
    }
    static final class QueueRecordBinding extends TupleBinding<QueueRecord>
    {
        private final List<AMQShortString_durableSubNames;
        QueueRecordBinding(final List<AMQShortStringdurableSubNames)
        {
             = durableSubNames;
        }
        @Override
        public QueueRecord entryToObject(final TupleInput input)
        {
            AMQShortString name = AMQShortStringEncoding.readShortString(input);
            AMQShortString owner = AMQShortStringEncoding.readShortString(input);
            FieldTable arguments = FieldTableEncoding.readFieldTable(input);
            boolean exclusive = input.available() > 0 && input.readBoolean();
            exclusive = exclusive || .contains(name);
            return new QueueRecord(nameownerexclusivearguments);
        }
        @Override
        public void objectToEntry(final QueueRecord recordfinal TupleOutput output)
        {
            AMQShortStringEncoding.writeShortString(record.getNameShortString(), output);
            AMQShortStringEncoding.writeShortString(record.getOwner(), output);
            FieldTableEncoding.writeFieldTable(record.getArguments(), output);
            output.writeBoolean(record.isExclusive());
        }
    }
    static final class MessageMetaDataBinding extends TupleBinding<StorableMessageMetaData>
    {
        @Override
        public MessageMetaData entryToObject(final TupleInput input)
        {
            try
            {
                final MessagePublishInfo publishBody = readMessagePublishInfo(input);
                final ContentHeaderBody contentHeaderBody = readContentHeaderBody(input);
                final int contentChunkCount = input.readInt();
                return new MessageMetaData(publishBodycontentHeaderBodycontentChunkCount);
            }
            catch (Exception e)
            {
                .error("Error converting entry to object: " + ee);
                // annoyingly just have to return null since we cannot throw
                return null;
            }
        }
        private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
        {
            final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
            final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
            final boolean mandatory = tupleInput.readBoolean();
            final boolean immediate = tupleInput.readBoolean();
            return new MessagePublishInfo()
            {
                public AMQShortString getExchange()
                {
                    return exchange;
                }
                public void setExchange(AMQShortString exchange)
                {
                }
                public boolean isImmediate()
                {
                    return immediate;
                }
                public boolean isMandatory()
                {
                    return mandatory;
                }
                public AMQShortString getRoutingKey()
                {
                    return routingKey;
                }
            };
        }
        private ContentHeaderBody readContentHeaderBody(TupleInput tupleInputthrows AMQFrameDecodingException,
                AMQProtocolVersionException
        {
            int bodySize = tupleInput.readInt();
            byte[] underlying = new byte[bodySize];
            tupleInput.readFast(underlying);
            try
            {
                return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)),
                        bodySize);
            }
            catch (IOException e)
            {
                throw new AMQFrameDecodingException(nulle.getMessage(), e);
            }
        }
        @Override
        public void objectToEntry(final StorableMessageMetaData metaDatafinal TupleOutput output)
        {
            final int bodySize = 1 + metaData.getStorableSize();
            byte[] underlying = new byte[bodySize];
            underlying[0] = (bytemetaData.getType().ordinal();
            java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
            buf.position(1);
            buf = buf.slice();
            metaData.writeToBuffer(0, buf);
            output.writeInt(bodySize);
            output.writeFast(underlying);
        }
    }
    static final class MessageContentKey
    {
        private long _messageId;
        private int _chunk;
        public MessageContentKey(long messageIdint chunkNo)
        {
             = messageId;
             = chunkNo;
        }
        public int getChunk()
        {
            return ;
        }
        public long getMessageId()
        {
            return ;
        }
    }
    static final class MessageContentKeyBinding extends TupleBinding<MessageContentKey>
    {
        public MessageContentKey entryToObject(TupleInput tupleInput)
        {
            long messageId = tupleInput.readLong();
            int chunk = tupleInput.readInt();
            return new MessageContentKey(messageIdchunk);
        }
        public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
        {
            final MessageContentKey mk = object;
            tupleOutput.writeLong(mk.getMessageId());
            tupleOutput.writeInt(mk.getChunk());
        }
    }
    static final class ContentBinding extends TupleBinding<ByteBuffer>
    {
        public ByteBuffer entryToObject(TupleInput tupleInput)
        {
            final int size = tupleInput.readInt();
            byte[] underlying = new byte[size];
            tupleInput.readFast(underlying);
            return ByteBuffer.wrap(underlying);
        }
        public void objectToEntry(ByteBuffer src, TupleOutput tupleOutput)
        {
            src = src.slice();
            byte[] chunkData = new byte[src.limit()];
            src.duplicate().get(chunkData);
            tupleOutput.writeInt(chunkData.length);
            tupleOutput.writeFast(chunkData);
        }
    }
    static final class QueueEntryKey
    {
        private AMQShortString _queueName;
        private long _messageId;
        public QueueEntryKey(AMQShortString queueNamelong messageId)
        {
             = queueName;
             = messageId;
        }
        public AMQShortString getQueueName()
        {
            return ;
        }
        public long getMessageId()
        {
            return ;
        }
    }
    static final class QueueEntryKeyBinding extends TupleBinding<QueueEntryKey>
    {
        public QueueEntryKey entryToObject(TupleInput tupleInput)
        {
            AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
            long messageId = tupleInput.readLong();
            return new QueueEntryKey(queueNamemessageId);
        }
        public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
        {
            AMQShortStringEncoding.writeShortString(mk.getQueueName(), tupleOutput);
            tupleOutput.writeLong(mk.getMessageId());
        }
    }
    static final class BindingRecord extends Object
    {
        private final AMQShortString _exchangeName;
        private final AMQShortString _queueName;
        private final AMQShortString _routingKey;
        private final FieldTable _arguments;
        public BindingRecord(AMQShortString exchangeNameAMQShortString queueNameAMQShortString routingKey,
                FieldTable arguments)
        {
             = exchangeName;
             = queueName;
             = routingKey;
             = arguments;
        }
        public AMQShortString getExchangeName()
        {
            return ;
        }
        public AMQShortString getQueueName()
        {
            return ;
        }
        public AMQShortString getRoutingKey()
        {
            return ;
        }
        public FieldTable getArguments()
        {
            return ;
        }
    }
    static final class BindingTuple extends TupleBinding<BindingRecord>
    {
        public BindingRecord entryToObject(TupleInput tupleInput)
        {
            AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
            AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
            AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
            return new BindingRecord(exchangeNamequeueNameroutingKeyarguments);
        }
        public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput)
        {
            AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
            AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
            AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
            FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
        }
    }