Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2013, Red Hat Middleware LLC, and individual contributors
   * as indicated by the @author tags.
   * See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   * This copyrighted material is made available to anyone wishing to use,
   * modify, copy, or redistribute it subject to the terms and conditions
   * of the GNU Lesser General Public License, v. 2.1.
   * This program is distributed in the hope that it will be useful, but WITHOUT A
  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  * You should have received a copy of the GNU Lesser General Public License,
  * v.2.1 along with this distribution; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  * MA  02110-1301, USA.
  *
  * (C) 2013
  * @author JBoss Inc.
  */
 package com.arjuna.ats.internal.jta.recovery.arjunacore;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 
This CommitMarkableResourceRecord assumes the following table has been created: create table xids (xid varbinary(255), transactionManagerID varchar(255)) (ora/syb/mysql) create table xids (xid bytea, transactionManagerID varchar(255)) (psql) sp_configure "lock scheme",0,datarows (syb) The CommitMarkableResourceRecord does not support nested transactions TODO you have to set max_allowed_packet for large reaps on mysql
 
 	// 'type' within the Object Store for AtomicActions.
 	private static final String CONNECTABLE_ATOMIC_ACTION_TYPE =
 
 
 	private List<StringjndiNamesToContact = new ArrayList<String>();
 
 	private Map<XidStringcommittedXidsToJndiNames = new HashMap<XidString>();
 
 
 	// Reference to the Object Store.
 	private static RecoveryStore recoveryStore = null;

This map contains items that were in the database, we use it in phase 2 to work out what we can GC
 
 	private Map<StringMap<XidUid>> jndiNamesToPossibleXidsForGC = new HashMap<StringMap<XidUid>>();

Typically the whereFilter will restrict this node to recovering the database solely for itself but it is possible to recover for different nodes. It uses the JTAEnvironmentBean::getXaRecoveryNodes().
	private static JTAEnvironmentBean jtaEnvironmentBean = BeanPopulator
	private Map<StringList<Xid>> completedBranches = new HashMap<StringList<Xid>>();
    private boolean inFirstPass;
		JTAEnvironmentBean jtaEnvironmentBean = BeanPopulator
		.addAll(jtaEnvironmentBean
		List<StringxaRecoveryNodes = jtaEnvironmentBean.getXaRecoveryNodes();
		if (xaRecoveryNodes
else {
			StringBuffer buffer = new StringBuffer();
			Iterator<Stringiterator = xaRecoveryNodes.iterator();
			while (iterator.hasNext()) {
				buffer.append("\'" + iterator.next() + "\',");
			}
			 = " where transactionManagerID in ( "
buffer.substring(0, buffer.length() - 1) + ")";
		}
		if ( == null) {
			 = StoreManager.getRecoveryStore();
		}
	}
	public void notifyOfCompletedBranch(String commitMarkableResourceJndiName,
			Xid xid) {
		synchronized () {
			List<XidcompletedXids = 
					.get(commitMarkableResourceJndiName);
			if (completedXids == null) {
				completedXids = new ArrayList<Xid>();
				.put(commitMarkableResourceJndiName,
						completedXids);
			}
			completedXids.add(xid);
		}
	}
	public synchronized void periodicWorkFirstPass() {
        if () {
            return;
        }
	     = true;
		// TODO - this is one shot only due to a
		// remove in the function, if this delete fails only normal
		// recovery is possible
		Map<StringList<Xid>> completedBranches2 = new HashMap<StringList<Xid>>();
		synchronized () {
			while (iterator.hasNext()) {
				String jndiName = iterator.next();
				List<XidcompletedXids = .remove(jndiName);
				completedBranches2.put(jndiNamecompletedXids);
			}
		}
		Iterator<Stringiterator2 = completedBranches2.keySet().iterator();
		while (iterator2.hasNext()) {
			String jndiName = iterator2.next();
			List<XidcompletedXids = completedBranches2.get(jndiName);
			delete(jndiNamecompletedXids);
		}
		if (..isTraceEnabled()) {
					.trace("CommitMarkableResourceRecordRecoveryModule::periodicWorkFirstPass");
		}
		// The algorithm occurs in three stages:
		// 1. We query the database to find all the branches that were committed
		// 3. We check for previously moved AtomicActions where the resource
		// manager was offline but is now online and move them back for
		// processing
		// 3. We check for in doubt AtomicActions that have incomplete branches
		// where the resource manager is now online and update them with the
		// outcome
		// Stage 1
		// Talk to all the known resource managers that support
		// CommitMarkableResourceRecord to find out what transactions have
		// committed
		try {
			while (iterator.hasNext()) {
				String jndiName = iterator.next();
				try {
					DataSource dataSource = (DataSource
							.lookup(jndiName);
					Connection connection = dataSource.getConnection();
					try {
						Statement createStatement = connection
						try {
									.get(jndiName);
							if (tableName == null) {
								tableName = ;
							}
							ResultSet rs = createStatement
									.executeQuery("SELECT xid,actionuid from "
tableName + );
							try {
								int i = 0;
								while (rs.next()) {
									i++;
									byte[] xidAsBytes = rs.getBytes(1);
											xidAsBytes);
											bais);
									XID _theXid = new XID();
									_theXid.formatID = dis.readInt();
									_theXid.gtrid_length = dis.readInt();
									_theXid.bqual_length = dis.readInt();
									int dataLength = dis.readInt();
									_theXid.data = new byte[dataLength];
									dis.read(_theXid.data, 0, dataLength);
									XidImple xid = new XidImple(_theXid);
									byte[] actionuidAsBytes = new byte[.];
									byte[] bytes = rs.getBytes(2);
									System.arraycopy(bytes, 0,
											actionuidAsBytes, 0, bytes.length);
									if (..isTraceEnabled()) {
												.trace("committedXidsToJndiNames.put"
xid + " " + jndiName);
									}
									// Populate the map of possible GCable Xids
									Uid actionuid = new Uid(actionuidAsBytes);
											.get(jndiName);
									if (map == null) {
										map = new HashMap<XidUid>();
												jndiNamemap);
									}
									map.put(xidactionuid);
								}
finally {
								try {
									rs.close();
catch (SQLException e) {
											"Could not close resultset"e);
								}
							}
finally {
							try {
								createStatement.close();
catch (SQLException e) {
										"Could not close statement"e);
							}
						}
finally {
						try {
							connection.close();
catch (SQLException e) {
							..warn("Could not close connection",
									e);
						}
					}
catch (NamingException e) {
							.warn("Could not lookup CommitMarkableResource: "
jndiName);
							"Could not lookup CommitMarkableResource: "
jndiNamee);
catch (SQLException e) {
					..warn("Could not handle connection"e);
catch (IOException e) {
							"Could not lookup write data to select"e);
				}
			}
			// Stage 2
			// Look in the object store for atomic actions that had a connected
			// resource that was not online in a previous scan but is now.
			// Also look for CONNECTABLE_ATOMIC_ACTION_TYPE that have a matching
			// ATOMIC_ACTION_TYPE and remove the CONNECTABLE_ATOMIC_ACTION_TYPE
			// reference
			try {
				ObjectStoreIterator transactionUidEnum = new ObjectStoreIterator(
				Uid currentUid = transactionUidEnum.iterate();
				while (Uid.nullUid().notEquals(currentUid)) {
					// Make sure it isn't garbage from a failure to move before:
							currentUid);
					if (state != null) {
                                )) {
                            ..debug("Could not remove a: "
currentUid);
						}
else {
    				    state = .read_committed(currentUid);
    				    // TX may have been in progress and cleaned up by now 
    	                if (state != null) {
    								currentUidstate);
    
    							String commitMarkableResourceJndiName = rcaa
    							// Check if the resource manager is online yet
    									.contains(commitMarkableResourceJndiName)) {
    
    								// If it is remove the CRR and move it back and
    								// let
    								// the
    								// next stage update it
    								moveRecord(currentUid,
    							}
    						} else {
    						    if (..isTraceEnabled()) {
    						        ..trace("Moving " + currentUid + " back to being an AA");
    						    }
                                // It is now safe to move it back to being an AA so that it can call getNewXAResourceRecord
                                moveRecord(currentUid,
                                        ,
                                        );    						    
    						}
                        }
					}
					currentUid = transactionUidEnum.iterate();
				}
catch (ObjectStoreException ex) {
				..warn("Could not query objectstore: "ex);
catch (IOException ex) {
				..warn("Could not query objectstore: "ex);
			}
			// Stage 3
			// Look for crashed AtomicActions that had a
			// CommitMarkableResourceRecord
			// and see if it is in the list from stage 1, will include all
			// records
			// moved in stage 2
			if (..isDebugEnabled()) {
				..debug("processing " + 
" transactions");
			}
			try {
				ObjectStoreIterator transactionUidEnum = new ObjectStoreIterator(
				Uid currentUid = transactionUidEnum.iterate();
				while (Uid.nullUid().notEquals(currentUid)) {
					// Retrieve the transaction status from its
					// original
					// process.
									currentUid))) {
	                            currentUid);
	                    if (state != null) {
    	                    // Try to load it is a BasicAction that has a
    						// ConnectedResourceRecord
    								currentUidstate);
    						// Check if it did have a ConnectedResourceRecord
    							String commitMarkableResourceJndiName = rcaa
    							// If it did, check if the resource manager was
    							// online
    									.contains(commitMarkableResourceJndiName)) {
    								// If the resource manager wasn't online, move
    								// it
    								moveRecord(currentUid,
    							} else {
    								// Update the completed outcome for the 1PC
    								// resource
                                    rcaa.updateCommitMarkableResourceRecord(.get(rcaa.getXid()) != null);
                                    // Swap the type to avoid the rest of recovery round processing this TX as it already called getNewXAResourceRecord
                                    moveRecord(currentUid,
                                            );
                                    
    							}
    						}
	                    }
					}
					currentUid = transactionUidEnum.iterate();
				}
catch (ObjectStoreException ex) {
				..warn("Could not query objectstore: "ex);
catch (IOException ex) {
				..warn("Could not query objectstore: "ex);
			}
catch (IllegalStateException e) {
			// Thrown when AS is shutting down and we attempt a lookup
					"Could not lookup datasource, AS is shutting down: "
e.getMessage(), e);
		}
         = false;
	}
	public synchronized void periodicWorkSecondPass() {
This is the list of AtomicActions that were prepared but not completed.
		Set<UidpreparedAtomicActions = new HashSet<Uid>();
		try {
			// Refresh our list of all the indoubt atomic actions
				preparedAtomicActions.addAll(convertToList(aa_uids));
				// Refresh our list of all the indoubt connectable atomic
				// actions
						aa_uids)) {
					preparedAtomicActions.addAll(convertToList(aa_uids));
					// Iterate the list that we were able to contact
					while (jndiNames.hasNext()) {
						String jndiName = jndiNames.next();
						List<XidtoDelete = new ArrayList<Xid>();
								.get(jndiName);
						if (map != null) {
							for (Map.Entry<XidUidentry : map.entrySet()) {
								Xid next = entry.getKey();
								Uid uid = entry.getValue();
								if (!preparedAtomicActions.contains(uid)) {
									toDelete.add(next);
								}
							}
						}
						delete(jndiNametoDelete);
					}
else {
							.warn("Could not read data from object store");
				}
else {
						.warn("Could not read "
" from object store");
			}
catch (ObjectStoreException e) {
			..warn("Could not read " + 
" from object store"e);
		}
	}

Can only be called after the first phase has executed

Parameters:
xid
Returns:
	public synchronized boolean wasCommitted(String jndiNameXid xid)
		if (!.contains(jndiName) || .get(xid) == null) {
		}
            throw new ObjectStoreException(jndiName + " was not online");
        }
        String committed = .get(xid);
		if (..isTraceEnabled()) {
			..trace("wasCommitted" + xid + " " + committed);
		}
		return committed != null;
	}
	private List<UidconvertToList(InputObjectState aa_uids) {
		List<Uiduids = new ArrayList<Uid>();
		boolean moreUids = true;
		while (moreUids) {
			Uid theUid = null;
			try {
				theUid = UidHelper.unpackFrom(aa_uids);
				if (theUid.equals(Uid.nullUid())) {
					moreUids = false;
else {
					Uid newUid = new Uid(theUid);
					if (..isDebugEnabled()) {
						..debug("found transaction " + newUid);
					}
					uids.add(newUid);
				}
catch (IOException ex) {
				moreUids = false;
			}
		}
		return uids;
	}
	private boolean isTransactionInMidFlight(int status) {
		boolean inFlight = false;
		switch (status) {
		// these states can only come from a process that is still alive
			inFlight = true;
			break;
		// the transaction is apparently still there, but has completed its
		// phase2. should be safe to redo it.
			inFlight = false;
			break;
		// this shouldn't happen
		default:
			inFlight = false;
		}
		return inFlight;
	}
	private void moveRecord(Uid uidString fromString to)
		RecoveryStore recoveryStore = StoreManager.getRecoveryStore();
		InputObjectState state = recoveryStore.read_committed(uidfrom);
		if (state != null) {
			if (!recoveryStore.write_committed(uidtonew OutputObjectState(
					state))) {
				..error("Could not move an: " + to + " uid: "
uid);
else if (!recoveryStore.remove_committed(uidfrom)) {
				..error("Could not remove a: " + from + " uid: "
uid);
			}
else {
					.error("Could not read an: " + from + " uid: " + uid);
		}
	}
	private void delete(String jndiNameList<XidcompletedXids) {
		int batchSize = 
						jndiName);
		if (integer != null) {
			batchSize = integer;
		}
		try {
			while (completedXids.size() > 0) {
				int sendingSize = batchSize < 0 ? completedXids.size()
completedXids.size() < batchSize ? completedXids
								.size() : batchSize;
				StringBuffer buffer = new StringBuffer();
				for (int i = 0; i < sendingSizei++) {
					buffer.append("?,");
				}
				if (buffer.length() > 0) {
					Connection connection = null;
					DataSource dataSource = (DataSource
							.lookup(jndiName);
					try {
						connection = dataSource.getConnection();
						connection.setAutoCommit(false);
								.get(jndiName);
						if (tableName == null) {
							tableName = ;
						}
						String sql = "DELETE from " + tableName
" where xid in ("
buffer.substring(0, buffer.length() - 1)
")";
						if (..isTraceEnabled()) {
									.trace("Attempting to delete number of entries: "
buffer.length());
						}
						PreparedStatement prepareStatement = connection
						List<Xiddeleted = new ArrayList<Xid>();
						try {
							for (int i = 0; i < sendingSizei++) {
								XidImple xid = (XidImplecompletedXids
										.remove(0);
								deleted.add(xid);
								XID toSave = xid.getXID();
										baos);
								dos.writeInt(toSave.formatID);
								dos.writeInt(toSave.gtrid_length);
								dos.writeInt(toSave.bqual_length);
								dos.writeInt(toSave.data.length);
								dos.write(toSave.data);
								dos.flush();
								prepareStatement.setBytes(i + 1,
										baos.toByteArray());
							}
							int executeUpdate = prepareStatement
							if (executeUpdate != sendingSize) {
										.error("Update was not successful, expected: "
sendingSize
" actual:"
executeUpdate);
								connection.rollback();
else {
								connection.commit();
								Iterator<Xiditerator = deleted.iterator();
								while (iterator.hasNext()) {
									XidImple xid = (XidImpleiterator.next();
								}
							}
catch (IOException e) {
									.warn("Could not generate prepareStatement paramaters",
											e);
finally {
							try {
								prepareStatement.close();
catch (SQLException e) {
										.warn("Could not close the prepared statement",
												e);
							}
						}
catch (SQLException e) {
						..warn("Could not handle the connection",
								e);
						// the connection is unavailable so try again later
						break;
finally {
						if (connection != null) {
							try {
								connection.close();
catch (SQLException e) {
										"Could not close the connection"e);
							}
						}
					}
				}
			}
catch (NamingException e) {
					.warn("Could not lookup commitMarkable: " + jndiName);
			..debug("Could not lookup commitMarkable: "
jndiNamee);
catch (IllegalStateException e) {
			// Thrown when AS is shutting down and we attempt a lookup
					"Could not lookup datasource, AS is shutting down: "
e.getMessage(), e);
		}
	}
New to GrepCode? Check out our FAQ X