  /*
   * Copyright (c) 2010-2011. Axon Framework
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 
 package org.axonframework.eventstore.fs;
 
 
 import java.io.File;
 
 import static org.axonframework.eventstore.fs.EventSerializationUtils.*;

Implementation of the org.axonframework.eventstore.EventStore that serializes objects using XStream and writes them to files on disk. Each aggregate is represented by a single file, where each event of that aggregate is a line in that file. Events are serialized to XML, which keeps them readable by both humans and machines.

Use setBaseDir(java.io.File) to specify the directory where event files should be stored.

Note that the resource supplied must point to a folder and should contain a trailing slash. See org.springframework.core.io.FileSystemResource.FileSystemResource(java.lang.String).

Author(s):
Allard Buijze
Since:
0.5
 
 public class FileSystemEventStore implements EventStore, SnapshotEventStore {
 
     private static final Logger logger = LoggerFactory.getLogger(FileSystemEventStore.class);
 
     private final Serializer<? super DomainEvent> eventSerializer;
     private EventFileResolver eventFileResolver;

    
Basic initialization of the event store. The actual serialization and deserialization is delegated to an org.axonframework.eventstore.XStreamEventSerializer.
 
     public FileSystemEventStore() {
         this.eventSerializer = new XStreamEventSerializer();
     }

    
Customized initialization of the event store. The actual serialization and deserialization is delegated to the provided eventSerializer.

Deprecated:
Use FileSystemEventStore(org.axonframework.serializer.Serializer) instead
Parameters:
eventSerializer The serializer to serialize DomainEvents with
 
     @Deprecated
     public FileSystemEventStore(final EventSerializer eventSerializer) {
         this(new LegacyEventSerializerWrapper(eventSerializer));
     }

    
Initialize the FileSystemEventStore using the given serializer. The serializer must be capable of serializing at least DomainEvents.

Parameters:
serializer The serializer capable of serializing (at least) DomainEvents
 
     public FileSystemEventStore(Serializer<? super DomainEvent> serializer) {
         this.eventSerializer = serializer;
     }

    

This implementation writes events to an event log on the file system. It uses a directory per type of aggregate, containing 1 file per aggregate.

    @Override
    public void appendEvents(String type, DomainEventStream eventsToStore) {
        if (!eventsToStore.hasNext()) {
            return;
        }
        OutputStream out = null;
        try {
            DomainEvent next = eventsToStore.next();
            out = eventFileResolver.openEventFileForWriting(type, next.getAggregateIdentifier());
            do {
                byte[] bytes = eventSerializer.serialize(next);
                String timeStamp = next.getTimestamp().toString();
                writeEventEntry(out, next.getSequenceNumber(), timeStamp, bytes);
                if (eventsToStore.hasNext()) {
                    next = eventsToStore.next();
                } else {
                    next = null;
                }
            } while (next != null);
        } catch (IOException e) {
            throw new EventStoreException("Unable to store given entity due to an IOException", e);
        } finally {
            IOUtils.closeQuietly(out);
        }
    }

    
    @Override
    public DomainEventStream readEvents(String type, AggregateIdentifier identifier) {
        try {
            if (!eventFileResolver.eventFileExists(type, identifier)) {
                throw new EventStreamNotFoundException(type, identifier);
            }
            InputStream eventFileInputStream = eventFileResolver.openEventFileForReading(type, identifier);
            return readEvents(type, identifier, eventFileInputStream);
        } catch (IOException e) {
            throw new EventStoreException(
                    String.format("An error occurred while trying to open the event file "
                                          + "for aggregate type [%s] with identifier [%s]",
                                  type,
                                  identifier.asString()), e);
        }
    }

    

Throws:
org.axonframework.eventstore.EventStoreException when an error occurs while reading or writing to the event logs.
    @Override
    public void appendSnapshotEvent(String type, DomainEvent snapshotEvent) {
        AggregateIdentifier aggregateIdentifier = snapshotEvent.getAggregateIdentifier();
        OutputStream fileOutputStream = null;
        try {
            byte[] serializedEvent = eventSerializer.serialize(snapshotEvent);
            long offset = calculateOffset(type, aggregateIdentifier, snapshotEvent.getSequenceNumber());
            long sequenceNumber = snapshotEvent.getSequenceNumber();
            String timeStamp = snapshotEvent.getTimestamp().toString();
            SnapshotEventEntry snapshotEntry = new SnapshotEventEntry(serializedEvent,
                                                                      sequenceNumber,
                                                                      timeStamp,
                                                                      offset);
            fileOutputStream = eventFileResolver.openSnapshotFileForWriting(type, aggregateIdentifier);
            EventSerializationUtils.writeSnapshotEntry(fileOutputStream, snapshotEntry);
        } catch (IOException e) {
            throw new EventStoreException("Error writing a snapshot event due to an IO exception", e);
        } finally {
            IOUtils.closeQuietly(fileOutputStream);
        }
    }

    private long calculateOffset(String type, AggregateIdentifier aggregateIdentifier, long sequenceNumber)
            throws IOException {
        CountingInputStream countingInputStream = null;
        try {
            InputStream eventInputStream = eventFileResolver.openEventFileForReading(type, aggregateIdentifier);
            countingInputStream = new CountingInputStream(new BufferedInputStream(eventInputStream));
            long lastReadSequenceNumber = -1;
            while (lastReadSequenceNumber < sequenceNumber) {
                EventEntry entry = readEventEntry(countingInputStream);
                lastReadSequenceNumber = entry.getSequenceNumber();
            }
            return countingInputStream.getByteCount();
        } finally {
            IOUtils.closeQuietly(countingInputStream);
        }
    }

    private DomainEventStream readEvents(String type, AggregateIdentifier identifier, InputStream eventFileInputStream)
            throws IOException {
        SnapshotEventEntry snapshotEntry = readSnapshotEvent(type, identifier, eventFileInputStream);
        InputStream is = eventFileInputStream;
        if (snapshotEntry != null) {
            String timeStamp = snapshotEntry.getTimeStamp();
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            writeEventEntry(baos, snapshotEntry.getSequenceNumber(), timeStamp, snapshotEntry.getBytes());
            is = new SequenceInputStream(new ByteArrayInputStream(baos.toByteArray()), eventFileInputStream);
        }
        return new BufferedReaderDomainEventStream(is, eventSerializer);
    }

    private SnapshotEventEntry readSnapshotEvent(String type, AggregateIdentifier identifier,
                                                 InputStream eventFileInputStream)
            throws IOException {
        SnapshotEventEntry snapshotEvent = null;
        if (eventFileResolver.snapshotFileExists(type, identifier)) {
            InputStream snapshotFileInputStream = eventFileResolver.openSnapshotFileForReading(type, identifier);
            try {
                snapshotEvent = readLastSnapshotEntry(snapshotFileInputStream);
                long actuallySkipped = eventFileInputStream.skip(snapshotEvent.getOffset());
                if (actuallySkipped != snapshotEvent.getOffset()) {
                    logger.warn(
                            "The skip operation did not actually skip the expected amount of bytes. "
                                    + "The event log of aggregate of type {} and identifier {} might be corrupt.",
                            type,
                            identifier.asString());
                }
            } finally {
                IOUtils.closeQuietly(snapshotFileInputStream);
            }
        }
        return snapshotEvent;
    }

    
Sets the base directory where the event store will store all events.

Parameters:
baseDir the location to store event files
    public void setBaseDir(File baseDir) {
        this.eventFileResolver = new SimpleEventFileResolver(baseDir);
    }

    
Sets the event file resolver to use. This setter is an alternative to the setBaseDir(java.io.File) one.

Parameters:
eventFileResolver The EventFileResolver providing access to event files
    public void setEventFileResolver(EventFileResolver eventFileResolver) {
        this.eventFileResolver = eventFileResolver;
    }

    
DomainEventStream implementation that reads DomainEvents from an InputStream. Entries in the input stream must be formatted as described by EventSerializationUtils.
    private static class BufferedReaderDomainEventStream implements DomainEventStream {
        private DomainEvent next;
        private final InputStream inputStream;
        private final Serializer<? super DomainEvent> serializer;

        
Initialize a BufferedReaderDomainEventStream using the given inputStream and serializer. The inputStream must provide a serialized DomainEvent, prefixed with a UTF-8 encoded number indicating the number of bytes to read and a number representing the sequence number of the event. In between each number and the serialized DomainEvent, there must be at least a single whitespace character.

Example:
1234 The serialized domain event using 1234 bytes...

The input stream will be closed when the last event has been read from it, or when an exception occurs while reading or deserializing an event.

Parameters:
inputStream The inputStream providing serialized DomainEvents
serializer The serializer to deserialize the DomainEvents
        public BufferedReaderDomainEventStream(InputStream inputStream, Serializer<? super DomainEvent> serializer) {
            this.inputStream = new BufferedInputStream(inputStream);
            this.serializer = serializer;
            this.next = doReadNext();
        }

        
        @Override
        public boolean hasNext() {
            return next != null;
        }

        
        @Override
        public DomainEvent next() {
            DomainEvent toReturn = next;
            next = doReadNext();
            return toReturn;
        }

        @Override
        public DomainEvent peek() {
            return next;
        }

        private DomainEvent doReadNext() {
            try {
                EventEntry serializedEvent = readEventEntry(inputStream);
                if (serializedEvent == null) {
                    IOUtils.closeQuietly(inputStream);
                    return null;
                }
                return serializedEvent.deserialize(serializer);
            } catch (IOException e) {
                IOUtils.closeQuietly(inputStream);
                throw new EventStoreException("An error occurred while reading from the underlying source", e);
            } catch (RuntimeException e) {
                IOUtils.closeQuietly(inputStream);
                throw e;
            }
        }
    }
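
The entry layout that BufferedReaderDomainEventStream expects, and the way a stored snapshot offset lets a reader skip older events, can be tried out without Axon on the classpath. What follows is only a minimal sketch, not the real EventSerializationUtils code (which is not part of this listing). The class EventLogSketch, its Entry type, the helper offsetAfter and the exact "<sequenceNumber> <byteCount> <payload>" layout with a newline terminator are all assumptions chosen to resemble the description above; the actual on-disk format may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the on-disk entry format; NOT the Axon EventSerializationUtils.
final class EventLogSketch {

    // One decoded log entry: the event's sequence number and its serialized bytes.
    static final class Entry {
        final long sequenceNumber;
        final byte[] payload;

        Entry(long sequenceNumber, byte[] payload) {
            this.sequenceNumber = sequenceNumber;
            this.payload = payload;
        }
    }

    // Assumed layout: "<sequenceNumber> <byteCount> <payload>\n".
    static void writeEntry(OutputStream out, long sequenceNumber, byte[] payload) throws IOException {
        String header = sequenceNumber + " " + payload.length + " ";
        out.write(header.getBytes(StandardCharsets.UTF_8));
        out.write(payload);
        out.write('\n');
    }

    // Reads the next entry, or returns null once the stream is exhausted.
    static Entry readEntry(InputStream in) throws IOException {
        String sequenceToken = readToken(in);
        if (sequenceToken == null) {
            return null;
        }
        String lengthToken = readToken(in);
        if (lengthToken == null) {
            throw new EOFException("Truncated entry header");
        }
        byte[] payload = new byte[Integer.parseInt(lengthToken)];
        int read = 0;
        while (read < payload.length) {
            int n = in.read(payload, read, payload.length - read);
            if (n < 0) {
                throw new EOFException("Truncated entry payload");
            }
            read += n;
        }
        return new Entry(Long.parseLong(sequenceToken), payload);
    }

    // Skips leading whitespace, reads one token, and consumes the single whitespace byte after it.
    private static String readToken(InputStream in) throws IOException {
        int b = in.read();
        while (b != -1 && Character.isWhitespace(b)) {
            b = in.read();
        }
        if (b == -1) {
            return null;
        }
        StringBuilder token = new StringBuilder();
        while (b != -1 && !Character.isWhitespace(b)) {
            token.append((char) b);
            b = in.read();
        }
        return token.toString();
    }

    // Byte offset just past the first entry whose sequence number is >= the given one.
    static long offsetAfter(byte[] log, long sequenceNumber) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(log);
        Entry entry;
        do {
            entry = readEntry(in);
        } while (entry != null && entry.sequenceNumber < sequenceNumber);
        // For a ByteArrayInputStream, available() is exactly the number of unread bytes.
        return log.length - in.available();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream log = new ByteArrayOutputStream();
        writeEntry(log, 0, "a".getBytes(StandardCharsets.UTF_8));
        writeEntry(log, 1, "bb".getBytes(StandardCharsets.UTF_8));
        writeEntry(log, 2, "ccc".getBytes(StandardCharsets.UTF_8));
        byte[] bytes = log.toByteArray();

        // Pretend a snapshot was taken at sequence number 1, then resume reading after it.
        long offset = offsetAfter(bytes, 1);
        InputStream in = new ByteArrayInputStream(bytes);
        long skipped = in.skip(offset);
        Entry first = readEntry(in);
        System.out.println("skipped " + skipped + " bytes, next sequence number " + first.sequenceNumber);
    }
}
```

The sketch mirrors the shape of calculateOffset and readSnapshotEvent above: the offset is computed once when the snapshot is written, and later readers skip that many bytes instead of re-reading older entries. It also guards against a null entry at end of stream, which this illustration adds on its own initiative.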