Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * $Id: SftpConnector.java 23692 2012-01-21 12:50:42Z pablo.kraan $
   * --------------------------------------------------------------------------------------
   * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
   *
   * The software in this package is published under the terms of the CPAL v1.0
   * license, a copy of which has been included with this distribution in the
   * LICENSE.txt file.
   */
 
 package org.mule.transport.sftp;
 
 
 import java.util.Map;
 
 /**
  * <code>SftpConnector</code> sends and receives file messages over SFTP using the jsch
  * library. It improves on the SFTP-with-VFS connector in the following ways:
  * <ol>
  * <li>Streams files instead of reading them into memory. The SftpMessageReceiver is a
  * "non-materializing stream receiver" which does not read the file into memory. The
  * SftpMessageDispatcher also never materializes the stream and delegates materialization
  * to the jsch library.</li>
  * <li>Uses the jsch library directly instead of using VFS as a middle-man.</li>
  * <li>More explicit connection lifecycle management.</li>
  * <li>Leverages sftp stat to determine if a file's size changes (simpler and also less
  * memory intensive).</li>
  * </ol>
  */
 
 public class SftpConnector extends AbstractConnector
 {
 
     // String constants naming the configuration properties of this transport.
     // NOTE(review): presumably these are the keys used to look properties up on an endpoint
     // (createReceiver reads one via endpoint.getProperty) - the usages of most of them are
     // not visible in this file.
     public static final String PROPERTY_POLLING_FREQUENCY = "pollingFrequency";
     public static final String PROPERTY_DIRECTORY = "directory";
     public static final String PROPERTY_OUTPUT_PATTERN = "outputPattern";
     public static final String PROPERTY_FILENAME = "filename";
     public static final String PROPERTY_ORIGINAL_FILENAME = "originalFilename";
     public static final String PROPERTY_SELECT_EXPRESSION = "selectExpression";
     public static final String PROPERTY_FILE_EXTENSION = "fileExtension";
     public static final String PROPERTY_INCLUDE_SUBFOLDERS = "includeSubfolders";
     public static final String PROPERTY_IDENTITY_FILE = "identityFile";
     public static final String PROPERTY_PASS_PHRASE = "passphrase";
     public static final String PROPERTY_FILE_AGE = "fileAge";
     public static final String PROPERTY_TEMP_DIR = "tempDir";
     public static final String PROPERTY_SIZE_CHECK_WAIT_TIME = "sizeCheckWaitTime";
     public static final String PROPERTY_ARCHIVE_DIR = "archiveDir";
     public static final String PROPERTY_ARCHIVE_TEMP_RECEIVING_DIR = "archiveTempReceivingDir";
     public static final String PROPERTY_ARCHIVE_TEMP_SENDING_DIR = "archiveTempSendingDir";
     public static final String PROPERTY_DUPLICATE_HANDLING = "duplicateHandling";
     public static final String PROPERTY_USE_TEMP_FILE_TIMESTAMP_SUFFIX = "useTempFileTimestampSuffix";
     // The next three look like the allowed values of the "duplicateHandling" property rather
     // than property names - TODO confirm against the code that reads duplicateHandling.
     public static final String PROPERTY_DUPLICATE_HANDLING_THROW_EXCEPTION = "throwException";
     public static final String PROPERTY_DUPLICATE_HANDLING_OVERWRITE = "overwrite";
     // NOTE(review): "ASS_SEQ_NO" looks like a typo for "ADD_SEQ_NO" (the value is "addSeqNo");
     // the name is public, so renaming it would break callers.
     public static final String PROPERTY_DUPLICATE_HANDLING_ASS_SEQ_NO = "addSeqNo";
     public static final String PROPERTY_MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize";
     public static final String PROPERTY_KEEP_FILE_ON_ERROR = "keepFileOnError";
 
     // Default polling frequency; presumably milliseconds - the unit is not stated in this file.
     public static final int DEFAULT_POLLING_FREQUENCY = 1000;

    
     /** Logger used by this class. */
 
     protected final static Log logger = LogFactory.getLog(SftpConnector.class);
 
 
     // Connector-level configuration, exposed through the getters and setters further down.

     // Polling frequency configured on the connector (no initializer, so it starts at 0).
     private long pollingFrequency;
     // Defaults to true.
     private boolean autoDelete = true;
     private String outputPattern;
 
     // Identity file and its passphrase, as configured on the connector.
     private String identityFile;
     private String passphrase;
 
     // File-age check: off by default, with an age of 0.
     private boolean checkFileAge = false;
     private long fileAge = 0;
 
     // Temporary directories for the inbound and outbound side; null until configured.
     private String tempDirInbound = null;
     private String tempDirOutbound = null;
 
 
     // Left null until explicitly configured.
     private String duplicateHandling = null;
     private Boolean useTempFileTimestampSuffix = null;
     private Long sizeCheckWaitTime = null;
    // Archive directories default to the empty string rather than null.
    private String archiveDir = "";
    private String archiveTempReceivingDir = "";
    private String archiveTempSendingDir = "";

    
    /** Should the file be kept if an error occurs when writing the file on the outbound endpoint? */
    private Boolean keepFileOnError;

    
    /** Max pool size: 0 for no pool, -1 for no limit, otherwise the specified value. */
    private int maxConnectionPoolSize;

    
    /**
     * Value that can be set via the system property 'mule.sftp.transport.maxConnectionPoolSize'.
     * If it is set, this value is used instead of maxConnectionPoolSize.
     */
    private static final Integer overrideMaxConnectionPoolSize;
    static
    {
        String propValue = System.getProperty("mule.sftp.transport.maxConnectionPoolSize");
        if (propValue != null)
        {
            .info("Will override the maxConnectionPoolSize to " + propValue
                        + " from the system property 'mule.sftp.transport.maxConnectionPoolSize'.");
             = Integer.parseInt(propValue);
        }
        else
        {
             = null;
        }
    }
    /**
     * Creates the connector for the given Mule context and installs a new
     * ExpressionFilenameParser.
     * <p>
     * NOTE(review): the assignment target of the second statement is missing in this copy of
     * the source (the line starts with a bare "="). Presumably it is the connector's filename
     * parser field, which is not declared in the visible part of the file - confirm against
     * the original before compiling.
     *
     * @param context passed straight to the superclass constructor
     */
    public SftpConnector(MuleContext context)
    {
        super(context);
         = new ExpressionFilenameParser();
    }
    public String getProtocol()
    {
        return "sftp";
    }
    /**
     * Creates the message receiver for an inbound endpoint, handing it the effective polling
     * value as its only extra argument.
     * <p>
     * The polling value starts from a connector-level value, is overridden by a string property
     * read from the endpoint when that property is present (parsed with Long.parseLong), and is
     * replaced by a fallback value when the result is zero or negative.
     * <p>
     * NOTE(review): several tokens are missing in this copy of the source: the commas in the
     * parameter list, the initial and the fallback polling value, the property key, a closing
     * parenthesis in the cast, the logger reference and the object createMessageReceiver is
     * called on. Presumably the values are pollingFrequency, DEFAULT_POLLING_FREQUENCY and
     * PROPERTY_POLLING_FREQUENCY - confirm against the original before compiling.
     *
     * @return the receiver created for the endpoint
     * @throws Exception a NumberFormatException if the endpoint property is not a valid long,
     *         or whatever the receiver creation throws
     */
    @Override
    public MessageReceiver createReceiver(FlowConstruct flowInboundEndpoint endpointthrows Exception
    {
        long polling = ;
        // A value set on the endpoint takes precedence over the connector-level value
        String tempPolling = (Stringendpoint.getProperty();
        if (tempPolling != null)
        {
            polling = Long.parseLong(tempPolling);
        }
        // Zero and negative values are not usable; substitute the fallback
        if (polling <= 0)
        {
            polling = ;
        }
        if (.isDebugEnabled())
        {
            .debug("Set polling frequency to: " + polling);
        }
        return .createMessageReceiver(thisflowendpointnew Object[]{polling});
    }
    /**
     * Convenience overload that obtains a client for the endpoint without a notifier: it
     * delegates to the two-argument createSftpClient with null as the second argument.
     * <p>
     * NOTE(review): the comma between the two call arguments and the separator before
     * "throws" are missing in this copy of the source.
     *
     * @throws Exception whatever the two-argument overload throws
     */
    public SftpClient createSftpClient(ImmutableEndpoint endpointthrows Exception
    {
        return createSftpClient(endpointnull);
    }
    /**
     * Obtains an SFTP client for the endpoint and positions it in the endpoint's directory.
     * <p>
     * The client is borrowed from the endpoint's pool when pooling is enabled, otherwise a new
     * one is created through SftpConnectionFactory. Its working directory is then changed to
     * the path of the endpoint URI and the given notifier is set on it.
     * <p>
     * If a step fails after the client has been obtained, the client is handed to releaseClient
     * before the exception propagates.
     * <p>
     * NOTE(review): the commas in the parameter list, a closing parenthesis in the cast, the
     * comma in the releaseClient call and the logger reference are missing in this copy of the
     * source.
     * <p>
     * NOTE(review): if releaseClient itself throws inside the finally block, that exception
     * replaces the original failure.
     *
     * @return the client, with working directory and notifier set
     * @throws Exception if the client cannot be obtained or prepared
     */
    public SftpClient createSftpClient(ImmutableEndpoint endpointSftpNotifier notifierthrows Exception
    {
        SftpClient client = null;
        // Stays false until every step has succeeded; drives the clean-up in finally
        boolean ok = false;
        try
        {
            if (useConnectionPool())
            {
                ObjectPool pool = getClientPool(endpoint);
                client = (SftpClientpool.borrowObject();
            }
            else
            {
                client = SftpConnectionFactory.createClient(endpoint);
            }
            // We have to set the working directory before returning
            String dir = endpoint.getEndpointURI().getPath();
            client.changeWorkingDirectory(dir);
            if (.isDebugEnabled())
            {
                .debug("Successfully changed working directory to: " + dir);
            }
            // TODO ML: Is this always necessary?
            client.setNotifier(notifier);
            ok = true;
        }
        finally
        {
            // Release the client if it was created but something failed after that,
            // otherwise we start to waste ssh-processes...
            if (!ok && client != null)
            {
                releaseClient(endpointclient);
            }
        }
        return client;
    }

    

    /**
     * @return true if connection pooling is used, otherwise false
     */
    public boolean useConnectionPool()
    {
        return getMaxConnectionPoolSize() != 0;
    }
    public void releaseClient(ImmutableEndpoint endpointSftpClient clientthrows Exception
    {
        if (useConnectionPool())
        {
            if (getDispatcherFactory().isCreateDispatcherPerRequest())
            {
                destroyClient(endpointclient);
            }
            else
            {
                if (client != null && client.isConnected())
                {
                    ObjectPool pool = getClientPool(endpoint);
                    if (.isDebugEnabled())
                    {
                        .debug("Releasing connection for endpoint " + endpoint.getEndpointURI());
                    }
                    pool.returnObject(client);
                }
            }
        }
        else
        {
            client.disconnect();
        }
    }
    /**
     * Invalidates a client in the endpoint's pool, so that it is not handed out again.
     * <p>
     * Only acts when pooling is enabled and the client is non-null and still connected.
     * <p>
     * NOTE(review): when pooling is disabled this method does nothing, so the client is not
     * disconnected here. The commas in the parameter list are missing in this copy of the
     * source.
     *
     * @throws Exception if the pool fails to invalidate the client
     */
    public void destroyClient(ImmutableEndpoint endpointSftpClient clientthrows Exception
    {
        if (useConnectionPool())
        {
            if ((client != null) && (client.isConnected()))
            {
                ObjectPool pool = getClientPool(endpoint);
                pool.invalidateObject(client);
            }
        }
    }
    /**
     * Returns the client pool for the endpoint, creating it on first use.
     * <p>
     * Pools are looked up and stored under the endpoint URI. A new pool is a GenericObjectPool
     * built from an SftpConnectionFactory for the endpoint and sized by
     * getMaxConnectionPoolSize(); its test-on-borrow flag is taken from isValidateConnections().
     * The method is synchronized, so lookup and creation cannot interleave between callers of
     * this method.
     * <p>
     * NOTE(review): the reference to the collection holding the pools and the logger reference
     * are missing in this copy of the source; the collection's declaration is not visible here.
     *
     * @param endpoint the endpoint whose URI identifies the pool
     * @return the existing or newly created pool
     */
    protected synchronized ObjectPool getClientPool(ImmutableEndpoint endpoint)
    {
        GenericObjectPool pool = .get(endpoint.getEndpointURI());
        if (pool == null)
        {
            if (.isDebugEnabled())
            {
                .debug("Pool is null - creating one for endpoint " + endpoint.getEndpointURI()
                             + " with max size " + getMaxConnectionPoolSize());
            }
            pool = new GenericObjectPool(new SftpConnectionFactory(endpoint), getMaxConnectionPoolSize());
            pool.setTestOnBorrow(isValidateConnections());
            .put(endpoint.getEndpointURI(), pool);
        }
        else
        {
            if (.isDebugEnabled())
            {
                .debug("Using existing pool for endpoint " + endpoint.getEndpointURI() + ". Active: "
                             + pool.getNumActive() + ", Idle:" + pool.getNumIdle());
            }
        }
        return pool;
    }
    /*
     * (non-Javadoc)
     * @see org.mule.transport.AbstractConnector#doConnect()
     */
    protected void doConnect() throws Exception
    {
        // Intentionally empty: no connector-level connection is opened here. SFTP clients are
        // obtained on demand through createSftpClient.
    }
    /*
     * (non-Javadoc)
     * @see org.mule.transport.AbstractConnector#doDisconnect()
     */
    protected void doDisconnect() throws Exception
    {
        // Intentionally empty: individual clients are given back through releaseClient and
        // destroyClient, so there is nothing to tear down in this hook.
    }
    /*
     * (non-Javadoc)
     * @see org.mule.transport.AbstractConnector#doDispose()
     */
    protected void doDispose()
    {
        // Intentionally empty: the client pools are closed in doStop, not here.
    }
    /*
     * (non-Javadoc)
     * @see org.mule.transport.AbstractConnector#doInitialise()
     *
     * Calls setMuleContext on a collaborator when that collaborator is non-null.
     *
     * NOTE(review): the collaborator reference and the argument of setMuleContext are missing
     * in this copy of the source. Judging by setFilenameParser, which makes the same call,
     * this is presumably the filename parser receiving the connector's Mule context - confirm
     * against the original before compiling.
     */
    protected void doInitialise() throws InitialisationException
    {
        if ( != null)
        {
            .setMuleContext();
        }
    }
    /*
     * (non-Javadoc)
     * @see org.mule.transport.AbstractConnector#doStart()
     */
    protected void doStart() throws MuleException
    {
        // Intentionally empty: client pools are created on first use in getClientPool, so
        // there is nothing to prepare when the connector starts.
    }
    /*
     * (non-Javadoc)
     * @see org.mule.transport.AbstractConnector#doStop()
     *
     * Closes every client pool and then empties the collection that holds them. A failure
     * while closing is rethrown wrapped in a ConnectorException; the collection is cleared in
     * either case.
     *
     * NOTE(review): the whole loop sits inside a single try block, so the first pool whose
     * close() throws prevents the remaining pools from being closed, although they are still
     * dropped by the clear() in the finally block.
     *
     * NOTE(review): the logger reference and the reference to the pool collection are missing
     * in this copy of the source, as is the comma between the last two constructor arguments
     * of ConnectorException.
     */
    protected void doStop() throws MuleException
    {
        if (.isDebugEnabled())
        {
            .debug("Stopping all pools");
        }
        try
        {
            for (ObjectPool pool : .values())
            {
                pool.close();
            }
        }
        catch (Exception e)
        {
            throw new ConnectorException(CoreMessages.failedToStop("SFTP Connector"), thise);
        }
        finally
        {
            // Forget the pools whether or not closing them succeeded
            .clear();
        }
    }
    /**
     * Getter for the connector-level polling frequency.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source (bare
     * "return ;"); presumably the pollingFrequency field.
     */
    public long getPollingFrequency()
    {
        return ;
    }
    /**
     * Setter for the connector-level polling frequency.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source
     * ("this. = ..."); presumably this.pollingFrequency.
     *
     * @param pollingFrequency the value to store
     */
    public void setPollingFrequency(long pollingFrequency)
    {
        this. = pollingFrequency;
    }
    // NOTE(review): the method signature that should precede this body is missing in this copy
    // of the source, and so is the returned expression. Judging by the setFilenameParser method
    // that follows, this is presumably the filename-parser getter - restore both from the
    // original before compiling.
    {
        return ;
    }
    /**
     * Stores the filename parser and, when it is non-null, calls setMuleContext on it.
     * <p>
     * NOTE(review): the assignment target ("this. = ...") and the argument of setMuleContext
     * are missing in this copy of the source; presumably this.filenameParser and the
     * connector's Mule context - neither is declared in the visible part of the file.
     *
     * @param filenameParser the parser to use, may be null
     */
    public void setFilenameParser(FilenameParser filenameParser)
    {
        this. = filenameParser;
        if (filenameParser != null)
        {
            filenameParser.setMuleContext();
        }
    }
    /**
     * Getter for the output pattern.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the outputPattern field.
     */
    public String getOutputPattern()
    {
        return ;
    }
    /**
     * Setter for the output pattern.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.outputPattern.
     *
     * @param outputPattern the value to store
     */
    public void setOutputPattern(String outputPattern)
    {
        this. = outputPattern;
    }
    /**
     * Getter for the auto-delete flag.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the autoDelete field, which is declared with the initial value true.
     */
    public boolean isAutoDelete()
    {
        return ;
    }
    /**
     * Setter for the auto-delete flag.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.autoDelete.
     *
     * @param autoDelete the value to store
     */
    public void setAutoDelete(boolean autoDelete)
    {
        this. = autoDelete;
    }
    /**
     * Getter for the identity file setting.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the identityFile field.
     */
    public String getIdentityFile()
    {
        return ;
    }
    /**
     * Setter for the identity file setting.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.identityFile.
     *
     * @param identityFile the value to store
     */
    public void setIdentityFile(String identityFile)
    {
        this. = identityFile;
    }
    /**
     * Getter for the passphrase setting.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the passphrase field.
     */
    public String getPassphrase()
    {
        return ;
    }
    /**
     * Setter for the passphrase setting.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.passphrase.
     *
     * @param passphrase the value to store
     */
    public void setPassphrase(String passphrase)
    {
        this. = passphrase;
    }

    
    /**
     * Returns the file age.
     *
     * @return the fileAge in milliseconds
     */
    public long getFileAge()
    {
        // NOTE(review): the returned expression is missing in this copy of the source (bare
        // "return ;"); presumably the fileAge field.
        return ;
    }

    
    /**
     * Sets the file age.
     *
     * @param fileAge the fileAge in milliseconds to set
     */
    public void setFileAge(long fileAge)
    {
        // NOTE(review): both assignment targets are missing in this copy of the source.
        // Presumably the first is this.fileAge and the second this.checkFileAge, i.e. setting
        // an age also switches the file-age check on - confirm against the original.
        this. = fileAge;
        this. = true;
    }
    /**
     * Getter for the flag that tells whether the file-age check is active.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the checkFileAge field, which is declared with the initial value false.
     */
    public boolean getCheckFileAge()
    {
        return ;
    }
    /**
     * Getter for the inbound temporary directory.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the tempDirInbound field.
     */
    public String getTempDirInbound()
    {
        return ;
    }
    /**
     * Setter for the inbound temporary directory.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * the tempDirInbound field.
     *
     * @param pTempDirInbound the value to store
     */
    public void setTempDirInbound(String pTempDirInbound)
    {
         = pTempDirInbound;
    }
    /**
     * Getter for the outbound temporary directory.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the tempDirOutbound field.
     */
    public String getTempDirOutbound()
    {
        return ;
    }
    /**
     * Setter for the outbound temporary directory.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * the tempDirOutbound field.
     *
     * @param pTempDirOutbound the value to store
     */
    public void setTempDirOutbound(String pTempDirOutbound)
    {
         = pTempDirOutbound;
    }
    // Need this method to be public for SftpNotifier
    @Override
    public boolean isEnableMessageEvents()
    {
        return super.isEnableMessageEvents();
    }
    /**
     * Setter for the duplicate-handling setting.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.duplicateHandling.
     *
     * @param duplicateHandling the value to store
     */
    public void setDuplicateHandling(String duplicateHandling)
    {
        this. = duplicateHandling;
    }
    /**
     * Getter for the duplicate-handling setting.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the duplicateHandling field, which is null until configured.
     */
    public String getDuplicateHandling()
    {
        return ;
    }
    /**
     * Setter for the temp-file timestamp-suffix flag.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.useTempFileTimestampSuffix.
     *
     * @param useTempFileTimestampSuffix the value to store, may be null
     */
    public void setUseTempFileTimestampSuffix(Boolean useTempFileTimestampSuffix)
    {
        this. = useTempFileTimestampSuffix;
    }
    // NOTE(review): the method signature that should precede this body is missing in this copy
    // of the source, and so is the returned expression. Given its position directly after
    // setUseTempFileTimestampSuffix this is presumably the matching getter - restore both from
    // the original before compiling.
    {
        return ;
    }
    /**
     * Setter for the size-check wait time.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.sizeCheckWaitTime.
     *
     * @param sizeCheckWaitTime the value to store, may be null
     */
    public void setSizeCheckWaitTime(Long sizeCheckWaitTime)
    {
        this. = sizeCheckWaitTime;
    }
    /**
     * Getter for the size-check wait time.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the sizeCheckWaitTime field, which is null until configured.
     */
    public Long getSizeCheckWaitTime()
    {
        return ;
    }
    /**
     * Setter for the archive directory.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.archiveDir.
     *
     * @param archiveDir the value to store
     */
    public void setArchiveDir(String archiveDir)
    {
        this. = archiveDir;
    }
    /**
     * Getter for the archive directory.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the archiveDir field, which is declared with the empty string as its initial value.
     */
    public String getArchiveDir()
    {
        return ;
    }
    /**
     * Setter for the temporary receiving directory used for archiving.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.archiveTempReceivingDir.
     *
     * @param archiveTempReceivingDir the value to store
     */
    public void setArchiveTempReceivingDir(String archiveTempReceivingDir)
    {
        this. = archiveTempReceivingDir;
    }
    // NOTE(review): the method signature that should precede this body is missing in this copy
    // of the source, and so is the returned expression. Given its position directly after
    // setArchiveTempReceivingDir this is presumably the matching getter - restore both from
    // the original before compiling.
    {
        return ;
    }
    /**
     * Setter for the temporary sending directory used for archiving.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * this.archiveTempSendingDir.
     *
     * @param archiveTempSendingDir the value to store
     */
    public void setArchiveTempSendingDir(String archiveTempSendingDir)
    {
        this. = archiveTempSendingDir;
    }
    // NOTE(review): the method signature that should precede this body is missing in this copy
    // of the source, and so is the returned expression. Given its position directly after
    // setArchiveTempSendingDir this is presumably the matching getter - restore both from the
    // original before compiling.
    {
        return ;
    }

    

    /**
     * @see SftpConnector#maxConnectionPoolSize
     */
    public void setMaxConnectionPoolSize(int maxConnectionPoolSize)
    {
        // NOTE(review): the assignment target is missing in this copy of the source
        // ("this. = ..."); presumably this.maxConnectionPoolSize.
        this. = maxConnectionPoolSize;
    }

    

    /**
     * @return the max connection pool size. If the system property
     *         mule.sftp.transport.maxConnectionPoolSize is set, that value is used instead.
     */
    public int getMaxConnectionPoolSize()
    {
        // NOTE(review): the tested reference and both returned expressions are missing in this
        // copy of the source. Presumably the static overrideMaxConnectionPoolSize is tested and
        // returned when it is non-null, and the maxConnectionPoolSize field is returned
        // otherwise - confirm against the original before compiling.
        if ( != null)
        {
            return ;
        }
        return ;
    }
    /**
     * Getter for the keep-file-on-error flag.
     * <p>
     * NOTE(review): the returned expression is missing in this copy of the source; presumably
     * the keepFileOnError field, which has no initializer and is therefore null until set.
     */
    public Boolean isKeepFileOnError()
    {
        return ;
    }
    /**
     * Setter for the keep-file-on-error flag.
     * <p>
     * NOTE(review): the assignment target is missing in this copy of the source; presumably
     * the keepFileOnError field.
     *
     * @param pKeepFileOnError the value to store, may be null
     */
    public void setKeepFileOnError(Boolean pKeepFileOnError)
    {
         = pKeepFileOnError;
    }
New to GrepCode? Check out our FAQ X