Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.infinispan.client.hotrod.impl.transport.tcp;
  
  import java.util.HashSet;
  import java.util.Map;
 import java.util.Set;
 
 
 

Author(s):
Mircea.Markus@jboss.com
Since:
4.1
 
 public class TcpTransportFactory implements TransportFactory {
 
    private static final Log log = LogFactory.getLog(TcpTransportFactory.classLog.class);

   
 
    private final Object lock = new Object();
    // The connection pool implementation is assumed to be thread-safe, so we need to synchronize just the access to this field and not the method calls
    // Per cache request balancing strategy
    private Map<byte[], FailoverRequestBalancingStrategybalancers;
    private Configuration configuration;
    // the primitive fields are often accessed separately from the rest so it makes sense not to require synchronization for them
    private volatile boolean tcpNoDelay;
    private volatile boolean tcpKeepAlive;
    private volatile int soTimeout;
    private volatile int connectTimeout;
    private volatile int maxRetries;
    private volatile SSLContext sslContext;
    private volatile ClientListenerNotifier listenerNotifier;
    @GuardedBy("lock")
    private volatile TopologyInfo topologyInfo;
 
    @Override
    public void start(Codec codecConfiguration configurationAtomicInteger defaultCacheTopologyIdClientListenerNotifier listenerNotifier) {
       synchronized () {
          this. = listenerNotifier;
          this. = configuration;
          boolean pingOnStartup = configuration.pingOnStartup();
          Collection<SocketAddressservers = new ArrayList<>();
           = new ArrayList<>();
          for(ServerConfiguration server : configuration.servers()) {
             servers.add(new InetSocketAddress(server.host(), server.port()));
          }
          .addAll(servers);
           = new TopologyInfo(defaultCacheTopologyId, Collections.unmodifiableCollection(servers), configuration);
           = configuration.tcpNoDelay();
           = configuration.tcpKeepAlive();
           = configuration.socketTimeout();
           = configuration.connectionTimeout();
           = configuration.maxRetries();
          if (configuration.security().ssl().enabled()) {
             SslConfiguration ssl = configuration.security().ssl();
             if (ssl.sslContext() != null) {
                 = ssl.sslContext();
             } else {
                 = SslContextFactory.getContext(ssl.keyStoreFileName(), ssl.keyStorePassword(), ssl.trustStoreFileName(), ssl.trustStorePassword());
             }
          }
         if (.isDebugEnabled()) {
            .debugf("Statically configured servers: %s"servers);
            .debugf("Load balancer class: %s"configuration.balancingStrategy().getName());
            .debugf("Tcp no delay = %b; client socket timeout = %d ms; connect timeout = %d ms",
                       );
         }
         TransportObjectFactory connectionFactory;
         if (configuration.security().authentication().enabled()) {
            connectionFactory = new SaslTransportObjectFactory(codecthisdefaultCacheTopologyIdpingOnStartupconfiguration.security().authentication());
         } else {
            connectionFactory = new TransportObjectFactory(codecthisdefaultCacheTopologyIdpingOnStartup);
         }
         PropsKeyedObjectPoolFactory<SocketAddressTcpTransportpoolFactory =
               new PropsKeyedObjectPoolFactory<SocketAddressTcpTransport>(
                     connectionFactory,
                     configuration.connectionPool());
         createAndPreparePool(poolFactory);
          = CollectionFactory.makeMap(., AnyEquivalence.getInstance());
         addBalancer(RemoteCacheManager.cacheNameBytes());
      }
      if (configuration.pingOnStartup())
         pingServers();
   }
   private FailoverRequestBalancingStrategy addBalancer(byte[] cacheName) {
      FailoverRequestBalancingStrategy balancer =
         (cfgBalancer instanceof FailoverRequestBalancingStrategy)
            ? (FailoverRequestBalancingStrategycfgBalancer
            : new FailoverToRequestBalancingStrategyDelegate(cfgBalancer);
      .put(cacheNamebalancer);
      balancer.setServers(.getServers());
      return balancer;
   }
   private void pingServers() {
      Collection<SocketAddressservers = .getServers();
      for (SocketAddress addr : servers) {
         try {
            // Go through all statically configured nodes and force a
            // connection to be established and a ping message to be sent.
            pool.returnObject(addrpool.borrowObject(addr));
         } catch (Exception e) {
            // Ping's objective is to retrieve a potentially newer
            // version of the Hot Rod cluster topology, so ignore
            // exceptions from nodes that might not be up any more.
            if (.isTraceEnabled())
               .tracef(e"Ignoring exception pinging configured servers %s to establish a connection",
                     servers);
         }
      }
   }

   
This will makes sure that, when the evictor thread kicks in the minIdle is set. We don't want to do this is the caller's thread, as this is the user.
            poolFactory.createPool();
      Collection<SocketAddressservers = .getServers();
      for (SocketAddress addrservers) {
         .preparePool(addrfalse);
      }
   }
   public void destroy() {
      synchronized () {
         .clear();
         try {
            .close();
         } catch (Exception e) {
            .warn("Exception while shutting down the connection pool."e);
         }
      }
   }
   public CacheTopologyInfo getCacheTopologyInfo(byte[] cacheName) {
      synchronized () {
         return .getCacheTopologyInfo(cacheName);
      }
   }
   public void updateHashFunction(Map<SocketAddressSet<Integer>> servers2Hashint numKeyOwnersshort hashFunctionVersionint hashSpacebyte[] cacheName) {
      synchronized () {
         .updateTopology(servers2HashnumKeyOwnershashFunctionVersionhashSpacecacheName);
      }
   }
   public void updateHashFunction(SocketAddress[][] segmentOwnersint numSegmentsshort hashFunctionVersionbyte[] cacheName) {
      synchronized () {
         .updateTopology(segmentOwnersnumSegmentshashFunctionVersioncacheName);
      }
   }
   public Transport getTransport(Set<SocketAddressfailedServersbyte[] cacheName) {
      SocketAddress server;
      synchronized () {
         server = getNextServer(failedServerscacheName);
      }
      return borrowTransportFromPool(server);
   }
   // To be called from within `lock` synchronized block
   private SocketAddress getNextServer(Set<SocketAddressfailedServersbyte[] cacheName) {
      SocketAddress server = balancer.nextServer(failedServers);
      if (.isTraceEnabled())
         .tracef("Using the balancer for determining the server: %s"server);
      return server;
   }
      FailoverRequestBalancingStrategy balancer = .get(cacheName);
      if (balancer == null)
         balancer = addBalancer(cacheName);
      return balancer;
   }
      return borrowTransportFromPool(server);
   }
   public Transport getTransport(byte[] keySet<SocketAddressfailedServersbyte[] cacheName) {
      SocketAddress server;
      synchronized () {
         Optional<SocketAddresshashAwareServer = .getHashAwareServer(keycacheName);
         server = hashAwareServer.orElse(getNextServer(failedServerscacheName));
      }
      return borrowTransportFromPool(server);
   }
   public void releaseTransport(Transport transport) {
      // The invalidateObject()/returnObject() calls could take a long time, so we hold the lock only until we get the connection pool reference
      TcpTransport tcpTransport = (TcpTransporttransport;
      if (!tcpTransport.isValid()) {
         try {
            if (.isTraceEnabled()) {
               .tracef("Dropping connection as it is no longer valid: %s"tcpTransport);
            }
            pool.invalidateObject(tcpTransport.getServerAddress(), tcpTransport);
         } catch (Exception e) {
            .couldNoInvalidateConnection(tcpTransporte);
         }
      } else {
         try {
            pool.returnObject(tcpTransport.getServerAddress(), tcpTransport);
         } catch (Exception e) {
            .couldNotReleaseConnection(tcpTransporte);
         } finally {
            logConnectionInfo(tcpTransport.getServerAddress());
         }
      }
   }
   public void invalidateTransport(SocketAddress serverAddressTransport transport) {
      try {
         // Transport could be null, in which case all connections
         // to the server address will be invalidated
         pool.invalidateObject(serverAddress, (TcpTransporttransport);
      } catch (Exception e) {
         .unableToInvalidateTransport(serverAddress);
      }
   }
   public void updateServers(Collection<SocketAddressnewServersbyte[] cacheNameboolean quiet) {
      synchronized () {
         Collection<SocketAddressservers = .getServers();
         Set<SocketAddressaddedServers = new HashSet<>(newServers);
         addedServers.removeAll(servers);
         Set<SocketAddressfailedServers = new HashSet<>(servers);
         failedServers.removeAll(newServers);
         if (.isTraceEnabled()) {
            .tracef("Current list: %s"servers);
            .tracef("New list: %s"newServers);
            .tracef("Added servers: %s"addedServers);
            .tracef("Removed servers: %s"failedServers);
         }
         if (failedServers.isEmpty() && addedServers.isEmpty()) {
            .debug("Same list of servers, not changing the pool");
            return;
         }
         //1. first add new servers. For servers that went down, the returned transport will fail for now
         for (SocketAddress server : addedServers) {
            .newServerAdded(server);
            try {
               .addObject(server);
            } catch (Exception e) {
               if (!quiet.failedAddingNewServer(servere);
            }
         }
         //2. Remove failed servers
         for (SocketAddress server : failedServers) {
            .removingServer(server);
            .clear(server);
         }
         servers = Collections.unmodifiableList(new ArrayList(newServers));
         .updateServers(servers);
         if (!failedServers.isEmpty()) {
            .failoverClientListeners(failedServers);
         }
         FailoverRequestBalancingStrategy balancer = getOrCreateIfAbsentBalancer(cacheName);
         balancer.setServers(servers);
      }
   }
      synchronized () {
         return .getServers();
      }
   }
   private void logConnectionInfo(SocketAddress server) {
      if (.isTraceEnabled()) {
         .tracef("For server %s: active = %d; idle = %d",
               serverpool.getNumActive(server), pool.getNumIdle(server));
      }
   }
      // The borrowObject() call could take a long time, so we hold the lock only until we get the connection pool reference
      try {
         return pool.borrowObject(server);
      } catch (Exception e) {
         String message = "Could not fetch transport";
         .debug(messagee);
         throw new TransportException(messageeserver);
      } finally {
         logConnectionInfo(server);
      }
   }

   
Note that the returned ConsistentHash may not be thread-safe.
   public ConsistentHash getConsistentHash(byte[] cacheName) {
      synchronized () {
         return .getConsistentHash(cacheName);
      }
   }
   }
   public boolean isTcpNoDelay() {
      return ;
   }
   public boolean isTcpKeepAlive() {
      return ;
   }
   public int getMaxRetries() {
      if (Thread.currentThread().isInterrupted()) {
         return -1;
      }
      return ;
   }
   public int getSoTimeout() {
      return ;
   }
   public int getConnectTimeout() {
      return ;
   }
   public SSLContext getSSLContext() {
      return ;
   }
   public void reset(byte[] cacheName) {
      updateServers(cacheNametrue);
   }

   
Note that the returned RequestBalancingStrategy may not be thread-safe.
   public RequestBalancingStrategy getBalancer(byte[] cacheName) {
      synchronized () {
         return .get(cacheName);
      }
   }
      synchronized () {
         return ;
      }
   }
         this. = delegate;
      }
      @Override
      public void setServers(Collection<SocketAddressservers) {
         .setServers(servers);
      }
      @Override
      public SocketAddress nextServer() {
         return .nextServer();
      }
      @Override
      public SocketAddress nextServer(Set<SocketAddressfailedServers) {
         return .nextServer();
      }
   }
New to GrepCode? Check out our FAQ X