Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (c) 2010-2011. Axon Framework
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.axonframework.eventsourcing;
 
 
 import java.util.List;
 import java.util.Set;
Abstract repository implementation that allows easy implementation of an Event Sourcing mechanism. It will automatically publish new events to the given org.axonframework.eventhandling.EventBus and delegate event storage to the provided org.axonframework.eventstore.EventStore.

 
 public class EventSourcingRepository<T extends EventSourcedAggregateRootextends LockingRepository<T> {
 
     private volatile EventStore eventStore;
     private final AggregateFactory<T> aggregateFactory;

    
Initializes a repository with the default locking strategy.

Deprecated:
This constructor will be removed in future releases. Use EventSourcingRepository(org.axonframework.eventsourcing.AggregateFactory) instead.
See also:
org.axonframework.repository.LockingRepository.org.axonframework.repository.LockingRepository.()
 
     @Deprecated
     protected EventSourcingRepository() {
          = null;
     }

    
Initialize a repository with the given locking strategy.

Deprecated:
This constructor will be removed in future release. Use EventSourcingRepository(org.axonframework.eventsourcing.AggregateFactory,org.axonframework.repository.LockingStrategy) instead.
Parameters:
lockingStrategy the locking strategy to apply to this
 
     @Deprecated
     protected EventSourcingRepository(final LockingStrategy lockingStrategy) {
         super(lockingStrategy);
          = null;
     }

    
Initializes a repository with the default locking strategy.

Parameters:
aggregateFactory The factory for new aggregate instances
See also:
org.axonframework.repository.LockingRepository.org.axonframework.repository.LockingRepository.()
 
     public EventSourcingRepository(final AggregateFactory<T> aggregateFactory) {
         this. = aggregateFactory;
     }

    
Initialize a repository with the given locking strategy.

Parameters:
aggregateFactory The factory for new aggregate instances
lockingStrategy the locking strategy to apply to this
    public EventSourcingRepository(final AggregateFactory<T> aggregateFactory,
                                   final LockingStrategy lockingStrategy) {
        super(lockingStrategy);
        this. = aggregateFactory;
    }

    
Perform the actual saving of the aggregate. All necessary locks have been verified.

Parameters:
aggregate the aggregate to store
    @Override
    protected void doSaveWithLock(T aggregate) {
        DomainEventStream eventStream = aggregate.getUncommittedEvents();
        while (iterator.hasNext()) {
            eventStream = iterator.next().decorateForAppend(getTypeIdentifier(), aggregateeventStream);
        }
        .appendEvents(getTypeIdentifier(), eventStream);
    }

    
Delegates to doSaveWithLock(org.axonframework.eventsourcing.EventSourcedAggregateRoot), as Event Sourcing generally doesn't delete aggregates (not their events).

This method may be safely overridden for special cases that do require deleting an Aggregate's Events.

Parameters:
aggregate the aggregate to delete
    @Override
    protected void doDeleteWithLock(T aggregate) {
        doSaveWithLock(aggregate);
    }

    
Perform the actual loading of an aggregate. The necessary locks have been obtained.

Parameters:
aggregateIdentifier the identifier of the aggregate to load
expectedVersion The expected version of the loaded aggregate
Returns:
the fully initialized aggregate
Throws:
AggregateDeletedException in case an aggregate existed in the past, but has been deleted
org.axonframework.repository.AggregateNotFoundException when an aggregate with the given identifier does not exist
    @Override
    protected T doLoad(AggregateIdentifier aggregateIdentifierfinal Long expectedVersion) {
        DomainEventStream events;
        try {
            events = .readEvents(getTypeIdentifier(), aggregateIdentifier);
        } catch (EventStreamNotFoundException e) {
            throw new AggregateNotFoundException(aggregateIdentifier"The aggregate was not found"e);
        }
        for (EventStreamDecorator decorator : ) {
            events = decorator.decorateForRead(getTypeIdentifier(), aggregateIdentifierevents);
        }
        final T aggregate = createAggregate(aggregateIdentifierevents.peek());
        List<DomainEventunseenEvents = new ArrayList<DomainEvent>();
        aggregate.initializeState(new CapturingEventStream(eventsunseenEventsexpectedVersion));
        CurrentUnitOfWork.get().registerListener(new ConflictResolvingListener(aggregateunseenEvents));
        return aggregate;
    }

    
Returns the factory used by this repository.

Returns:
the factory used by this repository
    public AggregateFactory<T> getAggregateFactory() {
        return ;
    }
    private List<DomainEventasList(DomainEventStream domainEventStream) {
        List<DomainEventunseenEvents = new ArrayList<DomainEvent>();
        while (domainEventStream.hasNext()) {
            unseenEvents.add(domainEventStream.next());
        }
        return unseenEvents;
    }

    

This implementation is aware of the AggregateSnapshot type events. When firstEvent is an instance of AggregateSnapshot, the aggregate is extracted from the event. Otherwise, aggregate creation is delegated to the abstract instantiateAggregate(org.axonframework.domain.AggregateIdentifier, org.axonframework.domain.DomainEvent) method.

    @Deprecated
    @SuppressWarnings({"unchecked"})
    protected T createAggregate(AggregateIdentifier aggregateIdentifierDomainEvent firstEvent) {
        T aggregate;
        if (AggregateSnapshot.class.isInstance(firstEvent)) {
            aggregate = (T) ((AggregateSnapshotfirstEvent).getAggregate();
        } else {
            aggregate = instantiateAggregate(aggregateIdentifierfirstEvent);
        }
        return aggregate;
    }

    
Instantiate the aggregate using the given aggregate identifier and first event. The first event of the event stream is passed to allow the repository to identify the actual implementation type of the aggregate to create. The first event can be either the event that created the aggregate or, when using event sourcing, a snapshot event. In either case, the event should be designed, such that these events contain enough information to deduct the actual aggregate type.

Note that aggregate state should *not* be initialized by this method. That means, no events should be applied by a call to this method. The first event is passed to allow the implementation to base the exact type of aggregate to instantiate on that event.

Deprecated:
Explicit use of this method is deprecated. Instead of overriding this method, use theEventSourcingRepository(org.axonframework.eventsourcing.AggregateFactory) or EventSourcingRepository(org.axonframework.eventsourcing.AggregateFactory,org.axonframework.repository.LockingStrategy) constructor to indicate how an aggregate should be created.
Parameters:
aggregateIdentifier the aggregate identifier of the aggregate to instantiate
firstEvent The first event in the event stream. This is either the event generated during creation of the aggregate, or a snapshot event
Returns:
an aggregate ready for initialization using a DomainEventStream.
    @Deprecated
    protected T instantiateAggregate(AggregateIdentifier aggregateIdentifierDomainEvent firstEvent) {
        if ( == null) {
            throw new IllegalStateException("Either an aggregate factory must be configured (recommended), "
                                                    + "or the instantiateAggregate() method must be overridden.");
        }
        return .createAggregate(aggregateIdentifierfirstEvent);
    }

    
Return the type identifier belonging to the AggregateFactory of this repository.

Returns:
the type identifier belonging to the AggregateFactory of this repository
    public String getTypeIdentifier() {
        if ( == null) {
            throw new IllegalStateException("Either an aggregate factory must be configured (recommended), "
                                                    + "or the getTypeIdentifier() method must be overridden.");
        }
        return .getTypeIdentifier();
    }

    

This implementation will do nothing if a conflict resolver (See setConflictResolver(org.axonframework.eventsourcing.ConflictResolver) is set. Otherwise, it will call super.validateOnLoad(...).

    @Override
    protected void validateOnLoad(T aggregateLong expectedVersion) {
        if ( == null) {
            super.validateOnLoad(aggregateexpectedVersion);
        }
    }

    
Sets the event store that would physically store the events.

Parameters:
eventStore the event bus to publish events to
    @Resource
    public void setEventStore(EventStore eventStore) {
        this. = eventStore;
    }

    
Sets the Event Stream Decorators that will process the event in the DomainEventStream when read, or written to the event store.

When appending events to the event store, the processors are invoked in the reverse order, causing the first decorator in this list to receive each event first. When reading from events, the decorators are invoked in the order given.

Parameters:
eventProcessors The processors to that will process events in the DomainEventStream
    public void setEventStreamDecorators(List<? extends EventStreamDecoratoreventProcessors) {
        this..addAll(eventProcessors);
    }

    
Sets the snapshotter trigger for this repository.

Parameters:
snapshotterTrigger the snapshotter trigger for this repository.
    public void setSnapshotterTrigger(SnapshotterTrigger snapshotterTrigger) {
        this..add(snapshotterTrigger);
    }

    
Sets the conflict resolver to use for this repository. If not set (or null), the repository will throw an exception if any unexpected changes appear in loaded aggregates.

Parameters:
conflictResolver The conflict resolver to use for this repository
    public void setConflictResolver(ConflictResolver conflictResolver) {
        this. = conflictResolver;
    }
    private final class ConflictResolvingListener extends UnitOfWorkListenerAdapter {
        private final T aggregate;
        private final List<DomainEventunseenEvents;
        private ConflictResolvingListener(T aggregateList<DomainEventunseenEvents) {
            this. = aggregate;
            this. = unseenEvents;
        }
        @Override
        public void onPrepareCommit(Set<AggregateRootaggregateRootsList<Eventevents) {
            if (hasPotentialConflicts()) {
            }
        }
        private boolean hasPotentialConflicts() {
            return .getUncommittedEventCount() > 0
                    && .getVersion() != null
                    && !.isEmpty();
        }
    }

    
Wrapper around a DomainEventStream that captures all passing events of which the sequence number is larger than the expected version number.
    private static final class CapturingEventStream implements DomainEventStream {
        private final DomainEventStream eventStream;
        private final List<DomainEventunseenEvents;
        private final Long expectedVersion;
        private CapturingEventStream(DomainEventStream events,
                                     List<DomainEventunseenEvents,
                                     Long expectedVersion) {
             = events;
            this. = unseenEvents;
            this. = expectedVersion;
        }
        @Override
        public boolean hasNext() {
            return .hasNext();
        }
        @Override
        public DomainEvent next() {
            DomainEvent next = .next();
            if ( != null && next.getSequenceNumber() > ) {
                .add(next);
            }
            return next;
        }
        @Override
        public DomainEvent peek() {
            return .peek();
        }
    }
New to GrepCode? Check out our FAQ X