Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (c) 2010-2011. Axon Framework
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.axonframework.eventstore.jpa;
 
 
 import java.util.List;
An EventStore implementation that uses JPA to store DomainEvents in a database. The actual DomainEvent is stored as a serialized blob of bytes. Other columns are used to store meta-data that allow quick finding of DomainEvents for a specific aggregate in the correct order.

This EventStore supports snapshots pruning, which can enabled by configuring a maximum number of snapshots to archive. By default snapshot pruning is configured to archive only DEFAULT_MAX_SNAPSHOTS_ARCHIVED snapshot per aggregate.

The serializer used to serialize the events is configurable. By default, the org.axonframework.eventstore.XStreamEventSerializer is used.

Author(s):
Allard Buijze
Since:
0.5
 
 public class JpaEventStore implements SnapshotEventStoreEventStoreManagement {
 
     private static final Logger logger = LoggerFactory.getLogger(JpaEventStore.class);
 
     private EntityManager entityManager;
 
     private final Serializer<? super DomainEventeventSerializer;
     private static final int DEFAULT_BATCH_SIZE = 100;
     private int batchSize = ;
     private static final int DEFAULT_MAX_SNAPSHOTS_ARCHIVED = 1;
     private final EventEntryStore eventEntryStore;
 
Initialize a JpaEventStore using an org.axonframework.eventstore.XStreamEventSerializer, which serializes events as XML and the default Event Entry store.

The JPA Persistence context is required to contain two entities: DomainEventEntry and SnapshotEventEntry.

 
     public JpaEventStore() {
         this(new XStreamEventSerializer(), new DefaultEventEntryStore());
     }

    
Initialize a JpaEventStore which serializes events using the given eventSerializer and the default Event Entry store.

The JPA Persistence context is required to contain two entities: DomainEventEntry and SnapshotEventEntry.

Deprecated:
Use JpaEventStore(org.axonframework.serializer.Serializer) instead
Parameters:
eventSerializer The serializer to (de)serialize domain events with.
 
     @Deprecated
     public JpaEventStore(EventSerializer eventSerializer) {
         this(new LegacyEventSerializerWrapper(eventSerializer), new DefaultEventEntryStore());
     }

    
Initialize a JpaEventStore which serializes events using the given eventSerializer and the default Event Entry store.

The JPA Persistence context is required to contain two entities: DomainEventEntry and SnapshotEventEntry.

Parameters:
eventSerializer The serializer to (de)serialize domain events with.
    public JpaEventStore(Serializer<? super DomainEventeventSerializer) {
        this(eventSerializernew DefaultEventEntryStore());
    }

    
Initialize a JpaEventStore using the given eventEntryStore and an org.axonframework.eventstore.XStreamEventSerializer, which serializes events as XML.

Parameters:
eventEntryStore The instance providing persistence logic for Domain Event entries
    public JpaEventStore(EventEntryStore eventEntryStore) {
        this(new XStreamEventSerializer(), eventEntryStore);
    }

    
Initialize a JpaEventStore which serializes events using the given eventSerializer and stores the events in the database using the given eventEntryStore.

Parameters:
eventSerializer The serializer to (de)serialize domain events with.
eventEntryStore The instance providing persistence logic for Domain Event entries
    public JpaEventStore(Serializer<? super DomainEventeventSerializerEventEntryStore eventEntryStore) {
        this. = eventSerializer;
        this. = eventEntryStore;
    }

    
    @Override
    public void appendEvents(String typeDomainEventStream events) {
        DomainEvent event = null;
        try {
            while (events.hasNext()) {
                event = events.next();
                .persistEvent(typeevent.serialize(event), );
            }
        } catch (RuntimeException exception) {
            if ( != null
                    && .isDuplicateKeyViolation(exception)) {
                throw new ConcurrencyException(
                        String.format("Concurrent modification detected for Aggregate identifier [%s], sequence: [%s]",
                                      event.getAggregateIdentifier(),
                                      event.getSequenceNumber().toString()),
                        exception);
            }
            throw exception;
        }
    }

    
    @Override
    public DomainEventStream readEvents(String typeAggregateIdentifier identifier) {
        long snapshotSequenceNumber = -1;
        byte[] lastSnapshotEvent = .loadLastSnapshotEvent(typeidentifier);
        DomainEvent snapshotEvent = null;
        if (lastSnapshotEvent != null) {
            try {
                snapshotEvent = (DomainEvent.deserialize(lastSnapshotEvent);
                snapshotSequenceNumber = snapshotEvent.getSequenceNumber();
            } catch (RuntimeException ex) {
                .warn("Error while reading snapshot event entry. "
                                    + "Reconstructing aggregate on entire event stream. Caused by: {} {}",
                            ex.getClass().getName(),
                            ex.getMessage());
            }
        }
        List<DomainEventevents = fetchBatch(typeidentifiersnapshotSequenceNumber + 1);
        if (snapshotEvent != null) {
            events.add(0, snapshotEvent);
        }
        if (events.isEmpty()) {
            throw new EventStreamNotFoundException(typeidentifier);
        }
        return new BatchingDomainEventStream(eventsidentifiertype);
    }
    @SuppressWarnings({"unchecked"})
    private List<DomainEventfetchBatch(String typeAggregateIdentifier identifierlong firstSequenceNumber) {
        List<byte[]> entries = .fetchBatch(typeidentifierfirstSequenceNumber,
                                                          );
        List<DomainEventevents = new ArrayList<DomainEvent>(entries.size());
        for (byte[] entry : entries) {
            events.add((DomainEvent.deserialize(entry));
        }
        return events;
    }

    

Upon appending a snapshot, this particular EventStore implementation also prunes snapshots which are considered redundant because they fall outside of the range of maximum snapshots to archive.

    @Override
    public void appendSnapshotEvent(String typeDomainEvent snapshotEvent) {
        // Persist snapshot before pruning redundant archived ones, in order to prevent snapshot misses when reloading
        // an aggregate, which may occur when a READ_UNCOMMITTED transaction isolation level is used.
        .persistSnapshot(typesnapshotEvent.serialize(snapshotEvent), );
        if ( > 0) {
            .pruneSnapshots(typesnapshotEvent);
        }
    }
    @Override
    public void visitEvents(EventVisitor visitor) {
        int first = 0;
        List<byte[]> batch;
        boolean shouldContinue = true;
        while (shouldContinue) {
            batch = .fetchBatch(first);
            for (byte[] entry : batch) {
                visitor.doWithEvent((DomainEvent.deserialize(entry));
            }
            shouldContinue = (batch.size() >= );
            first += ;
        }
    }

    
Sets the EntityManager for this EventStore to use.

Parameters:
entityManager the EntityManager to use.
    public void setEntityManager(EntityManager entityManager) {
        this. = entityManager;
    }

    
Registers the data source that allows the EventStore to detect the database type and define the error codes that represent concurrent access failures.

Should not be used in combination with setPersistenceExceptionResolver(org.axonframework.eventstore.jpa.PersistenceExceptionResolver), but rather as a shorthand alternative for most common database types.

Parameters:
dataSource A data source providing access to the backing database
Throws:
java.sql.SQLException If an error occurs while accessing the dataSource
    public void setDataSource(DataSource dataSourcethrows SQLException {
        if ( == null) {
             = new SQLErrorCodesResolver(dataSource);
        }
    }

    
Sets the persistenceExceptionResolver that will help detect concurrency exceptions from the backing database.

Parameters:
persistenceExceptionResolver the persistenceExceptionResolver that will help detect concurrency exceptions
    public void setPersistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
        this. = persistenceExceptionResolver;
    }

    
Sets the number of events that should be read at each database access. When more than this number of events must be read to rebuild an aggregate's state, the events are read in batches of this size. Defaults to 100.

Tip: if you use a snapshotter, make sure to choose snapshot trigger and batch size such that a single batch will generally retrieve all events required to rebuild an aggregate's state.

Parameters:
batchSize the number of events to read on each database access. Default to 100.
    public void setBatchSize(int batchSize) {
        this. = batchSize;
    }

    
Sets the maximum number of snapshots to archive for an aggregate. The EventStore will keep at most this number of snapshots per aggregate.

Defaults to DEFAULT_MAX_SNAPSHOTS_ARCHIVED.

Parameters:
maxSnapshotsArchived The maximum number of snapshots to archive for an aggregate. A value less than 1 disables pruning of snapshots.
    public void setMaxSnapshotsArchived(int maxSnapshotsArchived) {
        this. = maxSnapshotsArchived;
    }
    private final class BatchingDomainEventStream implements DomainEventStream {
        private int currentBatchSize;
        private Iterator<DomainEventcurrentBatch;
        private DomainEvent next;
        private final AggregateIdentifier id;
        private final String typeId;
        private BatchingDomainEventStream(List<DomainEventfirstBatchAggregateIdentifier id,
                                          String typeId) {
            this. = id;
            this. = typeId;
            this. = firstBatch.size();
            this. = firstBatch.iterator();
            if (.hasNext()) {
                 = .next();
            }
        }
        @Override
        public boolean hasNext() {
            return  != null;
        }
        @Override
        public DomainEvent next() {
            DomainEvent nextEvent = ;
            if (!.hasNext() &&  >= ) {
                .debug("Fetching new batch for Aggregate [{}]".asString());
                 = fetchBatch(.getSequenceNumber() + 1).iterator();
            }
             = .hasNext() ? .next() : null;
            return nextEvent;
        }
        @Override
        public DomainEvent peek() {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X