Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.channel.file;
 
 
 
Base class for records in data file: Put, Take, Rollback, Commit
 
 public abstract class TransactionEventRecord implements Writable {
   private static final Logger LOG = LoggerFactory
       .getLogger(TransactionEventRecord.class);
   private final long transactionID;
   private long logWriteOrderID;
 
   protected TransactionEventRecord(long transactionIDlong logWriteOrderID) {
     this. = transactionID;
     this. = logWriteOrderID;
   }
 
   @Override
   public void readFields(DataInput inthrows IOException {
 
   }
   @Override
   public void write(DataOutput outthrows IOException {
 
   }
 
   abstract void writeProtos(OutputStream outthrows IOException;
 
   abstract void readProtos(InputStream inthrows IOExceptionCorruptEventException;
 
   long getLogWriteOrderID() {
     return ;
   }
   long getTransactionID() {
     return ;
   }
 
   abstract short getRecordType();


  
Provides a minimum guarantee we are not reading complete junk
 
   static final int MAGIC_HEADER = 0xdeadbeef;
 
   static enum Type {
     PUT((short)1),
     TAKE((short)2),
     ROLLBACK((short)3),
     COMMIT((short)4);
 
     private short id;
     Type(short id) {
       this. = id;
     }
     public short get() {
       return ;
     }
  }
  private static final ImmutableMap<ShortConstructor<? extends TransactionEventRecord>> TYPES;
  static {
    ImmutableMap.Builder<ShortConstructor<? extends TransactionEventRecord>> builder =
        ImmutableMap.<ShortConstructor<? extends TransactionEventRecord>>builder();
    try {
      builder.put(..get(),
          Put.class.getDeclaredConstructor(Long.classLong.class));
      builder.put(..get(),
          Take.class.getDeclaredConstructor(Long.classLong.class));
      builder.put(..get(),
          Rollback.class.getDeclaredConstructor(Long.classLong.class));
      builder.put(..get(),
          Commit.class.getDeclaredConstructor(Long.classLong.class));
    } catch (Exception e) {
      Throwables.propagate(e);
    }
     = builder.build();
  }
    ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(512);
    DataOutputStream dataOutput = new DataOutputStream(byteOutput);
    try {
      dataOutput.writeInt();
      dataOutput.writeShort(record.getRecordType());
      dataOutput.writeLong(record.getTransactionID());
      dataOutput.writeLong(record.getLogWriteOrderID());
      record.write(dataOutput);
      dataOutput.flush();
      // TODO toByteArray does an unneeded copy
      return ByteBuffer.wrap(byteOutput.toByteArray());
    } catch(IOException e) {
      // near impossible
      throw Throwables.propagate(e);
    } finally {
      if(dataOutput != null) {
        try {
          dataOutput.close();
        } catch (IOException e) {
          .warn("Error closing byte array output stream"e);
        }
      }
    }
  }
      throws IOException {
    int header = in.readInt();
    if(header != ) {
      throw new IOException("Header " + Integer.toHexString(header) +
          " is not the required value: " + Integer.toHexString());
    }
    short type = in.readShort();
    long transactionID = in.readLong();
    long writeOrderID = in.readLong();
    TransactionEventRecord entry = newRecordForType(typetransactionID,
        writeOrderID);
    entry.readFields(in);
    return entry;
  }
    ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(512);
    try {
      ProtosFactory.TransactionEventHeader.Builder headerBuilder =
          ProtosFactory.TransactionEventHeader.newBuilder();
      headerBuilder.setType(record.getRecordType());
      headerBuilder.setTransactionID(record.getTransactionID());
      headerBuilder.setWriteOrderID(record.getLogWriteOrderID());
      headerBuilder.build().writeDelimitedTo(byteOutput);
      record.writeProtos(byteOutput);
          ProtosFactory.TransactionEventFooter.newBuilder().build();
      footer.writeDelimitedTo(byteOutput);
      return ByteBuffer.wrap(byteOutput.toByteArray());
    } catch(IOException e) {
      throw Throwables.propagate(e);
    } finally {
      if(byteOutput != null) {
        try {
          byteOutput.close();
        } catch (IOException e) {
          .warn("Error closing byte array output stream"e);
        }
      }
    }
  }
  static TransactionEventRecord fromByteArray(byte[] buffer)
      throws IOExceptionCorruptEventException {
    ByteArrayInputStream in = new ByteArrayInputStream(buffer);
    try {
      ProtosFactory.TransactionEventHeader header = Preconditions.
          checkNotNull(ProtosFactory.TransactionEventHeader.
              parseDelimitedFrom(in), "Header cannot be null");
      short type = (short)header.getType();
      long transactionID = header.getTransactionID();
      long writeOrderID = header.getWriteOrderID();
      TransactionEventRecord transactionEvent =
          newRecordForType(typetransactionIDwriteOrderID);
      transactionEvent.readProtos(in);
      @SuppressWarnings("unused")
      ProtosFactory.TransactionEventFooter footer = Preconditions.checkNotNull(
          ProtosFactory.TransactionEventFooter.
          parseDelimitedFrom(in), "Footer cannot be null");
      return transactionEvent;
    } catch (InvalidProtocolBufferException ex) {
      throw new CorruptEventException(
        "Could not parse event from data file."ex);
    } finally {
      try {
        in.close();
      } catch (IOException e) {
        .warn("Error closing byte array input stream"e);
      }
    }
  }
  static String getName(short type) {
    Constructor<? extends TransactionEventRecordconstructor = .get(type);
    Preconditions.checkNotNull(constructor"Unknown action " +
        Integer.toHexString(type));
    return constructor.getDeclaringClass().getSimpleName();
  }
  private static TransactionEventRecord newRecordForType(short type,
      long transactionIDlong writeOrderID) {
    Constructor<? extends TransactionEventRecordconstructor = .get(type);
    Preconditions.checkNotNull(constructor"Unknown action " +
        Integer.toHexString(type));
    try {
      return constructor.newInstance(transactionIDwriteOrderID);
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
New to GrepCode? Check out our FAQ X