Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * $Id: SftpMessageReceiver.java 25282 2013-02-16 14:43:58Z pablo.lagreca $
   * --------------------------------------------------------------------------------------
   * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
   *
   * The software in this package is published under the terms of the CPAL v1.0
   * license, a copy of which has been included with this distribution in the
   * LICENSE.txt file.
   */
 
 package org.mule.transport.sftp;
 
 
/**
 * <code>SftpMessageReceiver</code> polls and receives files from an sftp service using jsch.
 * This receiver produces an InputStream payload, which can be materialized in a
 * MessageDispatcher or Component.
 */
 
 {
 
     // Helper constructed from the inbound endpoint; presumably the object used to list and
     // retrieve remote files (the receivers of those calls are missing in this copy) — TODO confirm.
     private SftpReceiverRequesterUtil sftpRRUtil = null;
     // Factory for the per-file locks taken in poll().
     // NOTE(review): no assignment to this field is visible in this file — confirm where it is set.
     private LockFactory lockFactory;
     // Flag computed in doInitialise(); presumably the value returned by pollOnPrimaryInstanceOnly().
     // NOTE(review): "pool" looks like a typo for "poll".
     private boolean poolOnPrimaryInstanceOnly;
 
     public SftpMessageReceiver(SftpConnector connector,
                                FlowConstruct flow,
                                InboundEndpoint endpoint,
                                long frequencythrows CreateException
     {
         super(connectorflowendpoint);
 
         this.setFrequency(frequency);
 
          = new SftpReceiverRequesterUtil(endpoint);
     }
 
     public SftpMessageReceiver(SftpConnector connectorFlowConstruct flowInboundEndpoint endpointthrows CreateException
     {
         this(connectorflowendpoint);
     }
 
     public void poll() throws Exception
     {
         if (.isDebugEnabled())
         {
             .debug("Polling. Called at endpoint " + .getEndpointURI());
         }
         try
         {
             String[] files = .getAvailableFiles(false);
 
             if (files.length == 0)
             {
                 if (.isDebugEnabled())
                 {
                     .debug("Polling. No matching files found at endpoint " + .getEndpointURI());
                 }
             }
             else
             {
                 if (.isDebugEnabled())
                 {
                     .debug("Polling. " + files.length + " files found at " + .getEndpointURI()
                                  + ":" + Arrays.toString(files));
                 }
                 for (String file : files)
                 {
                     if (getLifecycleState().isStopping())
                     {
                         break;
                     }
                     Lock fileLock = .createLock(.getName() + file);
                     if (fileLock.tryLock(10, .))
                     {
                         try
                         {
                             routeFile(file);
                         }
                         catch (Exception e)
                        {
                            fileLock.unlock();
                        }
                    }
                }
                if (.isDebugEnabled())
                {
                    .debug("Polling. Routed all " + files.length + " files found at "
                                 + .getEndpointURI());
                }
            }
        }
        catch (MessagingException e)
        {
            //Already handled by TransactionTemplate
        }
        catch (Exception e)
        {
            .error("Error in poll"e);
            throw e;
        }
    }
    @Override
    protected void doInitialise() throws InitialisationException
    {
        boolean synchronousProcessing = false;
        if (getFlowConstruct() instanceof Flow)
        {
            synchronousProcessing = ((Flow)getFlowConstruct()).getProcessingStrategy() instanceof SynchronousProcessingStrategy;
        }
        this. = Boolean.valueOf(System.getProperty("mule.transport.sftp.singlepollinstance","false")) || !synchronousProcessing;
    }
    @Override
    protected boolean pollOnPrimaryInstanceOnly()
    {
        return ;
    }
    protected void routeFile(final String paththrows Exception
    {
        ExecutionTemplate<MuleEventexecutionTemplate = createExecutionTemplate();
        executionTemplate.execute(new ExecutionCallback<MuleEvent>()
        {
            @Override
            public MuleEvent process() throws Exception
            {
                // A bit tricky initialization of the notifier in this case since we don't
                // have access to the message yet...
                SftpNotifier notifier = new SftpNotifier((SftpConnectorcreateNullMuleMessage(),
                        .getName());
                InputStream inputStream = .retrieveFile(pathnotifier);
                if (.isDebugEnabled())
                {
                    .debug("Routing file: " + path);
                }
                MuleMessage message = createMuleMessage(inputStream);
                message.setOutboundProperty(.path);
                message.setOutboundProperty(.path);
                // Now we have access to the message, update the notifier with the message
                notifier.setMessage(message);
                routeMessage(message);
                if (.isDebugEnabled())
                {
                    .debug("Routed file: " + path);
                }
                return null;
            }
        });
    }

    
    // SFTP-35
    @Override 
    protected MuleMessage handleUnacceptedFilter(MuleMessage message) {
        .debug("the filter said no, now trying to close the payload stream");
        try {
            final SftpInputStream payload = (SftpInputStreammessage.getPayload();
            payload.close();
        }
        catch (Exception e) {
            .debug("unable to close payload stream"e);
        }
        return super.handleUnacceptedFilter(message);
    }
    /**
     * Connect hook; intentionally empty, so this receiver does nothing at connect time.
     *
     * @throws Exception declared by the signature; never thrown by this implementation
     */
    public void doConnect() throws Exception
    {
        // no op
    }
    /**
     * Disconnect hook; intentionally empty, so this receiver does nothing at disconnect time.
     *
     * @throws Exception declared by the signature; never thrown by this implementation
     */
    public void doDisconnect() throws Exception
    {
        // no op
    }
    /**
     * Dispose hook; intentionally empty, so this receiver releases nothing here.
     */
    protected void doDispose()
    {
        // no op
    }
New to GrepCode? Check out our FAQ X