Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   ***** BEGIN LICENSE BLOCK *****
   * Version: CPL 1.0/GPL 2.0/LGPL 2.1
   *
   * The contents of this file are subject to the Common Public
   * License Version 1.0 (the "License"); you may not use this file
   * except in compliance with the License. You may obtain a copy of
   * the License at http://www.eclipse.org/legal/cpl-v10.html
   *
  * Software distributed under the License is distributed on an "AS
  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  * implied. See the License for the specific language governing
  * rights and limitations under the License.
  * 
  * Alternatively, the contents of this file may be used under the terms of
  * either of the GNU General Public License Version 2 or later (the "GPL"),
  * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
  * in which case the provisions of the GPL or the LGPL are applicable instead
  * of those above. If you wish to allow use of your version of this file only
  * under the terms of either the GPL or the LGPL, and not to allow others to
  * use your version of this file under the terms of the CPL, indicate your
  * decision by deleting the provisions above and replace them with the notice
  * and other provisions required by the GPL or the LGPL. If you do not delete
  * the provisions above, a recipient may use your version of this file under
  * the terms of any one of the CPL, the GPL or the LGPL.
  ***** END LICENSE BLOCK *****/
 package org.jruby.util.io;
 
 import org.jruby.Ruby;
 
 import java.util.*;
 /**
  * This is a reimplementation of MRI's IO#select logic. It has been rewritten
  * from an earlier version in JRuby to improve performance and readability.
  *
  * This version avoids allocating a selector or any data structures to hold
  * data about the channels/IOs being selected unless absolutely necessary. It
  * also uses simple boolean arrays to track characteristics like whether an IO
  * is pending or unselectable, rather than maintaining Set structures. It avoids
  * hitting Java Integration code to get IO objects out of the incoming Array.
  * Finally, it tries to build a minimal number of data structures and reuse
  * them as much as possible.
  */
 
 public class SelectBlob {
     /**
      * Entry point of the select operation.
      *
      * Registers the IOs found in args[0] (reads) and args[1] (writes),
      * type-checks args[2] when present, derives a millisecond timeout from
      * args[3] when present, and then either sleeps (when the first three
      * arguments are all nil) or runs the select and gathers the results.
      * Every selector in the selector map is closed in the finally block.
      *
      * NOTE(review): this copy of the source has lost field references and the
      * separators between parameters/arguments (for example "this. = runtime");
      * the comments describe only what is still visible.
      *
      * @return nil when the three fields compared against null near the end are
      *         all null (presumably the read/write/error result arrays - confirm),
      *         otherwise the value built by constructResults
      */
     public IRubyObject goForIt(ThreadContext contextRuby runtimeIRubyObject[] args) {
         // Stores the runtime in a field (the field name was lost in this copy).
         this. = runtime;
         try {
             processReads(runtimeargscontext);
             processWrites(runtimeargscontext);
             if (args.length > 2 && !args[2].isNil()) {
                 // The error list is only validated as an Array; nothing is registered for it.
                 checkArrayType(runtimeargs[2]);
                 // Java's select doesn't do anything about this, so we leave it be.
             }
             boolean has_timeout = args.length > 3 && !args[3].isNil();
             // Timeout is in milliseconds (getTimeoutFromArg multiplies by 1000); 0 when absent.
             long timeout = !has_timeout ? 0 : getTimeoutFromArg(args[3], runtime);
             
             // NOTE(review): getTimeoutFromArg already throws for negative values,
             // so this branch looks unreachable - confirm.
             if (timeout < 0) {
                 throw runtime.newArgumentError("time interval must be positive");
             }
             
             // If all streams are nil, just sleep the specified time (JRUBY-4699)
             // NOTE(review): args[1] and args[2] are indexed here without the
             // length checks used above; fewer than three arguments would raise
             // ArrayIndexOutOfBoundsException.
             if (args[0].isNil() && args[1].isNil() && args[2].isNil()) {
                 if (timeout > 0) {
                     RubyThread thread = context.getThread();
                     long now = System.currentTimeMillis();
                     thread.sleep(timeout);
                     // Guard against spurious wakeup
                     while (System.currentTimeMillis() < now + timeout) {
                         thread.sleep(1);
                     }
 
                 }
             } else {
                 // Select, collect ready keys, add pending/unselectable IOs, then
                 // close selectors and restore blocking modes.
                 doSelect(runtimehas_timeouttimeout);
                 processSelectedKeys(runtime);
                 processPendingAndUnselectable();
                 tidyUp();
             }
             
             // Nothing was collected at all: return nil instead of three empty arrays.
             if ( == null &&  == null &&  == null) {
                 return runtime.getNil();
             }
             return constructResults(runtime);
         } catch (BadDescriptorException e) {
            throw runtime.newErrnoEBADFError();
        } catch (IOException e) {
            throw runtime.newIOErrorFromException(e);
        } catch (InterruptedException ie) {
            throw runtime.newThreadError("select interrupted");
        } finally {
            // Best-effort cleanup: close every selector and ignore close failures.
            for (Selector selector : .values()) {
                try {
                    selector.close();
                } catch (Exception e) {
                }
            }
        }
    }
    /**
     * Prepares the read side of the select from args[0].
     *
     * When args[0] is not nil it must be a RubyArray. For each element the IO is
     * resolved and stored, its blocking state and buffered-read state are
     * recorded, and an attempt is made to register it for read selection.
     * An empty array clears the stored reference and does nothing else.
     *
     * NOTE(review): the assignment targets and the loop bound were lost in this
     * copy of the source; presumably readArray, readSize and readIOs - confirm.
     */
    private void processReads(Ruby runtimeIRubyObject[] argsThreadContext contextthrows BadDescriptorExceptionIOException {
        if (!args[0].isNil()) {
            // read
            checkArrayType(runtimeargs[0]);
             = (RubyArrayargs[0];
             = .size();
            if ( == 0) {
                // clear reference; we aren't going to do anything
                 = null;
            } else {
                 = new RubyIO[];
                // One map is reused for every element; registerSelect copies its
                // entries (putAll) rather than keeping this instance.
                Map<Character,Integerattachment = new HashMap<Character,Integer>(1);
                for (int i = 0; i < i++) {
                    RubyIO ioObj = saveReadIO(icontext);
                    saveReadBlocking(ioObji);
                    saveBufferedRead(ioObji);                    
                    // 'r' maps to this IO's index in the read list.
                    attachment.clear();
                    attachment.put('r'i);
                    trySelectRead(contextattachmentioObj);
                }
            }
        }
    }
    /**
     * Converts element i of the read list to a RubyIO via RubyIO.convertToIO,
     * stores it at index i of an IO array and returns it.
     *
     * NOTE(review): the source array and the destination array names were lost
     * in this copy; presumably readArray and readIOs - confirm.
     */
    private RubyIO saveReadIO(int iThreadContext context) {
        IRubyObject obj = .eltOk(i);
        RubyIO ioObj = RubyIO.convertToIO(contextobj);
        [i] = ioObj;
        return ioObj;
    }
    /**
     * Records the current blocking mode of the IO's channel at index i of the
     * read-blocking array. Nothing is recorded for channels that are not
     * SelectableChannel, so their slot stays null.
     */
    private void saveReadBlocking(RubyIO ioObjint i) {
        // save blocking state
        if (ioObj.getChannel() instanceof SelectableChannel) {
            getReadBlocking()[i] = ((SelectableChannelioObj.getChannel()).isBlocking();
        }
    }
    /**
     * Flags index i in the unselectable-reads array when the IO's main stream
     * reports that read data is already buffered.
     */
    private void saveBufferedRead(RubyIO ioObjint ithrows BadDescriptorException {
        // already buffered data? don't bother selecting
        if (ioObj.getOpenFile().getMainStreamSafe().readDataBuffered()) {
            getUnselectableReads()[i] = true;
        }
    }
    /**
     * Tries to register the IO's channel for read selection.
     *
     * On success a counter is incremented and, if the IO reports buffered write
     * data, the index stored under 'r' in the attachment is flagged as a pending
     * read. When the channel is not selectable or registration returns false,
     * the index is flagged as an unselectable read if the open-file mode has a
     * particular bit set.
     *
     * NOTE(review): the counter name, the selection-op constants and the mode
     * flag were lost in this copy of the source - confirm against upstream.
     */
    private void trySelectRead(ThreadContext contextMap<Character,IntegerattachmentRubyIO ioObjthrows IOException {
        if (ioObj.getChannel() instanceof SelectableChannel && registerSelect(contextgetSelector(context, (SelectableChannel)ioObj.getChannel()), attachmentioObj. | .)) {
            ++;
            if (ioObj.writeDataBuffered()) {
                getPendingReads()[(Integer)attachment.get('r')] = true;
            }
        } else {
            if ((ioObj.getOpenFile().getMode() & .) != 0) {
                getUnselectableReads()[(Integer)attachment.get('r')] = true;
            }
        }
    }
    /**
     * Prepares the write side of the select from args[1].
     *
     * When args[1] exists and is not nil it must be a RubyArray. For each
     * element the IO is resolved and stored, its blocking state is recorded,
     * and an attempt is made to register it for write selection. An empty array
     * clears the stored reference and does nothing else.
     *
     * NOTE(review): the assignment targets and the loop bound were lost in this
     * copy of the source; presumably writeArray, writeSize and writeIOs - confirm.
     */
    private void processWrites(Ruby runtimeIRubyObject[] argsThreadContext contextthrows IOException {
        if (args.length > 1 && !args[1].isNil()) {
            // write
            checkArrayType(runtimeargs[1]);
             = (RubyArrayargs[1];
             = .size();
            if (.size() == 0) {
                // clear reference; we aren't going to do anything
                 = null;
            } else {
                 = new RubyIO[];
                // One map is reused for every element; registerSelect copies its
                // entries (putAll) rather than keeping this instance.
                Map<Character,Integerattachment = new HashMap<Character,Integer>(1);
                for (int i = 0; i < i++) {
                    RubyIO ioObj = saveWriteIO(icontext);
                    saveWriteBlocking(ioObji);
                    // 'w' maps to this IO's index in the write list.
                    attachment.clear();                    
                    attachment.put('w'i);
                    trySelectWrite(contextattachmentioObj);
                }
            }
        }
    }
    /**
     * Converts element i of the write list to a RubyIO via RubyIO.convertToIO,
     * stores it at index i of an IO array and returns it.
     *
     * NOTE(review): the source array and the destination array names were lost
     * in this copy; presumably writeArray and writeIOs - confirm.
     */
    private RubyIO saveWriteIO(int iThreadContext context) {
        IRubyObject obj = .eltOk(i);
        RubyIO ioObj = RubyIO.convertToIO(contextobj);
        [i] = ioObj;
        return ioObj;
    }
    /**
     * Records the current blocking mode of the IO's channel at index i of the
     * write-blocking array, but only for SelectableChannel channels and only
     * when the same IO object was not found by fastSearch (identity search) in
     * an array that is checked for null first.
     *
     * NOTE(review): the null-checked field and the first fastSearch argument
     * were lost in this copy; presumably readIOs, so an IO that is also in the
     * read list keeps the blocking state saved there - confirm.
     */
    private void saveWriteBlocking(RubyIO ioObjint i) {
        if (ioObj.getChannel() instanceof SelectableChannel) {
            // save blocking state
            if ( != null) {
                // some read has saved blocking state
                // find obj
                int readIndex = fastSearch(ioObj);
                if (readIndex == -1) {
                    // save blocking only if not found
                    getWriteBlocking()[i] = ((SelectableChannelioObj.getChannel()).isBlocking();
                }
            } else {
                getWriteBlocking()[i] = ((SelectableChannelioObj.getChannel()).isBlocking();
            }
        }
    }
    /**
     * Tries to register the IO's channel for write selection.
     *
     * When the channel is not selectable or registration returns false, a
     * counter is incremented and the index stored under 'w' in the attachment is
     * flagged as an unselectable write if the open-file mode has a particular
     * bit set. Nothing further happens here when registration succeeds.
     *
     * NOTE(review): unlike trySelectRead, the counter is incremented in the
     * not-registered branch. The counter name, the selection-op constants and
     * the mode flag were lost in this copy of the source - confirm against
     * upstream whether the increment is intended.
     */
    private void trySelectWrite(ThreadContext contextMap<Character,IntegerattachmentRubyIO ioObjthrows IOException {
        if (!(ioObj.getChannel() instanceof SelectableChannel)
                || !registerSelect(contextgetSelector(context, (SelectableChannel)ioObj.getChannel()), attachmentioObj. | .)) {
            ++;
            if ((ioObj.getOpenFile().getMode() & .) != 0) {
                getUnselectableWrites()[(Integer)attachment.get('w')] = true;
            }
        }
    }
    /**
     * Converts the Ruby timeout argument to a millisecond count.
     *
     * RubyFloat and RubyFixnum values are multiplied by 1000 and rounded with
     * Math.round. Any other type raises a Ruby TypeError, and a negative result
     * raises a Ruby ArgumentError.
     *
     * @return the timeout in milliseconds, never negative
     */
    private static long getTimeoutFromArg(IRubyObject timeArgRuby runtime) {
        long timeout = 0;
        if (timeArg instanceof RubyFloat) {
            timeout = Math.round(((RubyFloattimeArg).getDoubleValue() * 1000);
        } else if (timeArg instanceof RubyFixnum) {
            timeout = Math.round(((RubyFixnumtimeArg).getDoubleValue() * 1000);
        } else {
            // TODO: MRI can also handle Bignum here
            throw runtime.newTypeError("can't convert " + timeArg.getMetaClass().getName() + " into time interval");
        }
        if (timeout < 0) {
            throw runtime.newArgumentError("negative timeout given");
        }
        return timeout;
    }
    /**
     * Performs the actual selection.
     *
     * Inside the outer null check: when three further fields are all null, a
     * zero timeout polls every selector with selectNow(); otherwise each
     * ENXIOSelector is submitted to the runtime's executor, a blocking select
     * is issued, the ENXIO selectors are woken up and their futures awaited.
     * When any of the three fields is non-null, every selector is only polled.
     * Afterwards, for each ENXIOSelector whose pipe source key was selected, the
     * key is removed from the selected set and one byte is read from the pipe.
     *
     * NOTE(review): the fields tested for null, the selector map, the ENXIO
     * selector list and the receiver of select(...) were lost in this copy;
     * presumably mainSelector, the pending/unselectable flag arrays, selectors
     * and a list of ENXIOSelector - confirm against upstream.
     */
    private void doSelect(Ruby runtimefinal boolean has_timeoutlong timeoutthrows IOException {
        if ( != null) {
            if ( == null &&  == null &&  == null) {
                if (has_timeout && timeout == 0) {
                    // Explicit zero timeout: poll without blocking.
                    for (Selector selector : .values()) selector.selectNow();
                } else {
                    List<Futurefutures = new ArrayList<Future>(.size());
                    for (ENXIOSelector enxioSelector : ) {
                        futures.add(runtime.getExecutor().submit(enxioSelector));
                    }
                    // Selector.select(0) blocks without a time limit, so passing 0
                    // when no timeout was given waits until something is ready.
                    .select(has_timeout ? timeout : 0);
                    for (ENXIOSelector enxioSelector : enxioSelector.selector.wakeup();
                    // ensure all the enxio threads have finished
                    for (Future f : futurestry {
                        f.get();
                    } catch (InterruptedException iex) {
                        // Interruption while waiting for a helper task is ignored here.
                    } catch (ExecutionException eex) {
                        // Only IOException causes are propagated; other causes are dropped.
                        if (eex.getCause() instanceof IOException) {
                            throw (IOExceptioneex.getCause();
                        }
                    }
                }
            } else {
                for (Selector selector : .values()) selector.selectNow();
            }
        }
        // If any enxio selectors woke up, remove them from the selected key set of the main selector
        for (ENXIOSelector enxioSelector : ) {
            Pipe.SourceChannel source = enxioSelector.pipe.source();
            SelectionKey key = source.keyFor();
            if (key != null && .selectedKeys().contains(key)) {
                .selectedKeys().remove(key);
                // Drain the single wake-up byte written by ENXIOSelector.call().
                ByteBuffer buf = ByteBuffer.allocate(1);
                source.read(buf);
            }
        }
    }
    /**
     * Walks the selected keys of every selector and appends the matching Ruby
     * objects to the read and write result arrays.
     *
     * The per-key attachment is a Map from 'r'/'w' to the IO's index in the
     * read/write list. For ready write keys on a SocketChannel with a pending
     * connection, finishConnect() is called. When a CancelledKeyException is
     * raised, the IO is instead added to the error results unless it is already
     * present there.
     *
     * NOTE(review): most field references and the SelectionKey op constants
     * were lost in this copy of the source, so the exact conditions cannot be
     * read from here - confirm against upstream.
     */
    @SuppressWarnings("unchecked")
    private void processSelectedKeys(Ruby runtimethrows IOException {
        for (Selector selector : .values()) {
            for (Iterator i = selector.selectedKeys().iterator(); i.hasNext();) {
                SelectionKey key = (SelectionKeyi.next();
                // Declared outside the try so the catch block can still use them;
                // they stay 0 if the exception is raised before they are assigned.
                int readIoIndex = 0;
                int writeIoIndex = 0;
                try {
                    // Only operations that were both requested and are ready count.
                    int interestAndReady = key.interestOps() & key.readyOps();
                    if ( != null && (interestAndReady & (. | .)) != 0) {
                        readIoIndex = ((Map<Character,Integer>)key.attachment()).get('r');
                        getReadResults().append(.eltOk(readIoIndex));
                        // Clear a flag for this index so it is not appended a second time
                        // later (presumably pendingReads - confirm).
                        if ( != null) {
                            [readIoIndex] = false;
                        }
                    }
                    if ( != null && (interestAndReady & (. | .)) != 0) {
                        writeIoIndex = ((Map<Character,Integer>)key.attachment()).get('w');
                        getWriteResults().append(.eltOk(writeIoIndex));
                        // not-great logic for JRUBY-5165; we should move finishConnect into RubySocket logic, I think
                        if (key.channel() instanceof SocketChannel) {
                            SocketChannel socketChannel = (SocketChannel)key.channel();
                            if (socketChannel.isConnectionPending()) {
                                socketChannel.finishConnect();
                            }
                        }
                    }
                } catch (CancelledKeyException cke) {
                    // TODO: is this the right thing to do?
                    // NOTE(review): SelectionKey.interestOps() is documented to throw
                    // CancelledKeyException on a cancelled key, so this call may itself
                    // throw from inside the handler - confirm.
                    int interest = key.interestOps();
                    if ( != null && (interest & (. | . | .)) != 0) {
                        if ( != null) {
                            [readIoIndex] = false;
                        }
                        // NOTE(review): this allocates only when the field is already
                        // non-null; if the field is the error-results array the check
                        // looks inverted - confirm.
                        if ( != null) {
                             = RubyArray.newArray(runtime.size() + .size());
                        }
                        if (fastSearch(.toJavaArrayUnsafe(), [readIoIndex]) == -1) {
                            // only add to error if not there
                            getErrorResults().append(.eltOk(readIoIndex));
                        }
                    }
                    if ( != null && (interest & (.)) != 0) {
                        if (fastSearch(.toJavaArrayUnsafe(), [writeIoIndex]) == -1) {
                            // only add to error if not there
                            .append(.eltOk(writeIoIndex));
                        }
                    }
                }
            }
        }
    }
    /**
     * Appends IOs that were flagged rather than selected to the result arrays.
     *
     * Three flag arrays are scanned in turn, each skipped when null. Entries set
     * in the first two arrays are appended to the read results; entries set in
     * the third are appended to the write results.
     *
     * NOTE(review): the flag-array and source-array names were lost in this
     * copy; presumably pendingReads, unselectableReads and unselectableWrites,
     * reading from readArray/writeArray - confirm.
     */
    private void processPendingAndUnselectable() {
        if ( != null) {
            for (int i = 0; i < .i++) {
                if ([i]) {
                    getReadResults().append(.eltOk(i));
                }
            }
        }
        if ( != null) {
            for (int i = 0; i < .i++) {
                if ([i]) {
                    getReadResults().append(.eltOk(i));
                }
            }
        }
        if ( != null) {
            for (int i = 0; i < .i++) {
                if ([i]) {
                    getWriteResults().append(.eltOk(i));
                }
            }
        }
    }
    /**
     * Releases selection resources and restores saved blocking modes.
     *
     * Closes every selector, closes both ends of each ENXIOSelector pipe, then
     * walks two arrays of saved blocking states and calls configureBlocking
     * with the saved value for every non-null entry. An
     * IllegalBlockingModeException is reported as a Ruby ConcurrencyError.
     *
     * NOTE(review): the array names and the receiver of newConcurrencyError
     * were lost in this copy; presumably readBlocking/readIOs,
     * writeBlocking/writeIOs and the runtime field - confirm.
     */
    private void tidyUp() throws IOException {
        // make all sockets blocking as configured again
        for (Selector selector : .values()) {
            selector.close(); // close unregisters all channels, so we can safely reset blocking modes
        }
        for (ENXIOSelector enxioSelector : ) {
            enxioSelector.pipe.sink().close();
            enxioSelector.pipe.source().close();
        }
        if ( != null) {
            for (int i = 0; i < .i++) {
                // A null slot means no blocking state was saved for this index.
                if ([i] != null) {
                    try {
                        ((SelectableChannel[i].getChannel()).configureBlocking([i]);
                    } catch (IllegalBlockingModeException ibme) {
                        throw .newConcurrencyError("can not set IO blocking after select; concurrent select detected?");
                    }
                }
            }
        }
        if ( != null) {
            for (int i = 0; i < .i++) {
                // A null slot means no blocking state was saved for this index.
                if ([i] != null) {
                    try {
                        ((SelectableChannel[i].getChannel()).configureBlocking([i]);
                    } catch (IllegalBlockingModeException ibme) {
                        throw .newConcurrencyError("can not set IO blocking after select; concurrent select detected?");
                    }
                }
            }
        }
    }
    /**
     * Returns the read-results array, creating it on first use with a capacity
     * taken from another collection's size().
     *
     * NOTE(review): the field names and part of the newArray argument list were
     * lost in this copy; presumably readResults sized from readArray - confirm.
     */
    private RubyArray getReadResults() {
        if ( == null) {
             = RubyArray.newArray(.size());
        }
        return ;
    }
    /**
     * Returns the write-results array, creating it on first use with a capacity
     * taken from another collection's size().
     *
     * NOTE(review): the field names and part of the newArray argument list were
     * lost in this copy; presumably writeResults sized from writeArray - confirm.
     */
    private RubyArray getWriteResults() {
        if ( == null) {
             = RubyArray.newArray(.size());
        }
        return ;
    }
    private RubyArray getErrorResults() {
        if ( != null) {
             = RubyArray.newArray(.size() + .size());
        }
        return ;
    }
    /**
     * Returns the selector associated with the channel's SelectorProvider,
     * opening and caching one on first use.
     *
     * When the new selector's provider differs from the default
     * SelectorProvider, a Pipe is opened, wrapped together with the selector in
     * an ENXIOSelector, and the pipe's source is registered (non-blocking) with
     * the selector obtained for the pipe source's own provider via a recursive
     * call. Otherwise, the new selector is stored in a field if that field is
     * still null.
     *
     * NOTE(review): the map/list/field names and the registration op constant
     * were lost in this copy; presumably selectors, a list of ENXIOSelector and
     * mainSelector - confirm. The isEmpty() checks appear to swap a shared
     * empty collection for a mutable one before the first insert.
     */
    private Selector getSelector(ThreadContext contextSelectableChannel channelthrows IOException {
        Selector selector = .get(channel.provider());
        if (selector == null) {
            selector = SelectorFactory.openWithRetryFrom(context.runtimechannel.provider());
            if (.isEmpty()) {
                 = new HashMap<SelectorProviderSelector>();
            }
            .put(channel.provider(), selector);
            if (!selector.provider().equals(SelectorProvider.provider())) {
                // need to create pipe between alt impl selector and native NIO selector
                Pipe pipe = Pipe.open();
                ENXIOSelector enxioSelector = new ENXIOSelector(selectorpipe);
                if (.isEmpty())  = new ArrayList<ENXIOSelector>();
                .add(enxioSelector);
                pipe.source().configureBlocking(false);
                pipe.source().register(getSelector(contextpipe.source()), .enxioSelector);
            } else if ( == null) {
                 = selector;
            }
        }
        return selector;
    }
    /**
     * Returns the array of saved blocking states for read IOs, allocating it on
     * first use. Elements are boxed Booleans, so an unset slot is null.
     *
     * NOTE(review): the field name and array length were lost in this copy;
     * presumably readBlocking sized by readSize - confirm.
     */
    private Boolean[] getReadBlocking() {
        if ( == null) {
             = new Boolean[];
        }
        return ;
    }
    /**
     * Returns the array of saved blocking states for write IOs, allocating it
     * on first use. Elements are boxed Booleans, so an unset slot is null.
     *
     * NOTE(review): the field name and array length were lost in this copy;
     * presumably writeBlocking sized by writeSize - confirm.
     */
    private Boolean[] getWriteBlocking() {
        if ( == null) {
             = new Boolean[];
        }
        return ;
    }
    /**
     * Returns the flag array marking read IOs that are not put through the
     * selector, allocating it on first use.
     *
     * NOTE(review): the field name and array length were lost in this copy;
     * presumably unselectableReads sized by readSize - confirm.
     */
    private boolean[] getUnselectableReads() {
        if ( == null) {
             = new boolean[];
        }
        return ;
    }
    /**
     * Returns the flag array marking write IOs that are not put through the
     * selector, allocating it on first use.
     *
     * NOTE(review): the field name and array length were lost in this copy;
     * presumably unselectableWrites sized by writeSize - confirm.
     */
    private boolean[] getUnselectableWrites() {
        if ( == null) {
             = new boolean[];
        }
        return ;
    }
    /**
     * Returns the flag array marking read IOs considered pending, allocating it
     * on first use.
     *
     * NOTE(review): the field name and array length were lost in this copy;
     * presumably pendingReads sized by readSize - confirm.
     */
    private boolean[] getPendingReads() {
        if ( == null) {
             = new boolean[];
        }
        return ;
    }
    /**
     * Builds the value returned to Ruby: an array of three elements, where each
     * element is a stored result array or a new empty array when that stored
     * array is null.
     *
     * NOTE(review): the three field names were lost in this copy; presumably
     * readResults, writeResults and errorResults in that order - confirm.
     */
    private IRubyObject constructResults(Ruby runtime) {
        return RubyArray.newArrayLight(
                runtime,
                 == null ? RubyArray.newEmptyArray(runtime) : ,
                 == null ? RubyArray.newEmptyArray(runtime) : ,
                 == null ? RubyArray.newEmptyArray(runtime) : );
    }
    /**
     * Linear search for obj in ary using reference equality (==), not equals().
     *
     * @return the index of the first element that is the same object as obj,
     *         or -1 when there is none
     */
    private int fastSearch(Object[] aryObject obj) {
        for (int i = 0; i < ary.lengthi++) {
            if (ary[i] == obj) {
                return i;
            }
        }
        return -1;
    }
    /**
     * Raises a Ruby TypeError naming the object's class unless obj is a
     * RubyArray.
     */
    private static void checkArrayType(Ruby runtimeIRubyObject obj) {
        if (!(obj instanceof RubyArray)) {
            throw runtime.newTypeError("wrong argument type "
                    + obj.getMetaClass().getName() + " (expected Array)");
        }
    }
    /**
     * Registers the IO's channel with the selector for the requested operations.
     *
     * The channel is switched to non-blocking mode, and the requested ops are
     * masked with the channel's validOps(). If the channel has no key for this
     * selector yet, it is registered with a fresh copy of the attachment map;
     * otherwise the new ops are OR-ed into the existing interest set and the
     * entries of obj are merged into the existing attachment.
     *
     * @return false when the IO has no channel or the channel is not a
     *         SelectableChannel, true once the channel is registered or updated
     */
    @SuppressWarnings("unchecked")
    private static boolean registerSelect(ThreadContext contextSelector selectorMap<Character,IntegerobjRubyIO ioObjint opsthrows IOException {
        Channel channel = ioObj.getChannel();
        if (channel == null || !(channel instanceof SelectableChannel)) {
            return false;
        }
        // Channels must be non-blocking before they can be registered with a selector.
        ((SelectableChannelchannel).configureBlocking(false);
        int real_ops = ((SelectableChannelchannel).validOps() & ops;
        SelectionKey key = ((SelectableChannelchannel).keyFor(selector);
        if (key == null) {
            // Copy the caller's map: callers clear and reuse it for the next IO.
            Map<Character,Integer>  attachment = new HashMap<Character,Integer> (1);
            attachment.putAll(obj);
            ((SelectableChannelchannel).register(selectorreal_opsattachment );
        } else {
            // Same channel already registered (e.g. listed for both read and
            // write): widen the interest set and merge the index entries.
            key.interestOps(key.interestOps() | real_ops);
            Map<Character,Integeratt = (Map<Character,Integer>)key.attachment();
            att.putAll(obj);
            key.attach(att);
        }
        return true;
    }
    /**
     * Callable that runs a blocking select() on a selector and then writes a
     * single zero byte to the sink of its pipe, whether or not select() threw.
     * doSelect submits instances to an executor and later reads that byte from
     * the pipe's source.
     *
     * NOTE(review): the field names on the left of the constructor assignments
     * and the receivers inside call() were lost in this copy; presumably
     * selector and pipe - confirm.
     */
    private static final class ENXIOSelector implements Callable<Object> {
        private final Selector selector;
        private final Pipe pipe;
        private ENXIOSelector(Selector selectorPipe pipe) {
            this. = selector;
            this. = pipe;
        }
        public Object call() throws Exception {
            try {
                .select();
            } finally {
                // Always signal completion through the pipe, even on failure.
                ByteBuffer buf = ByteBuffer.allocate(1);
                buf.put((byte) 0);
                buf.flip();
                .sink().write(buf);
            }
            return null;
        }
    }
    
    // Per-call state. All fields are package-private and most start out null so
    // that the lazy getters above can allocate them only when needed.
    // NOTE(review): the references to these fields elsewhere in this copy of
    // the source were lost, so the per-field notes below are inferred from the
    // names and should be confirmed against upstream.

    // Runtime handed to goForIt.
    Ruby runtime;
    // Read side: the incoming list, its size, the resolved IOs and per-index flags.
    RubyArray readArray = null;
    int readSize = 0;
    RubyIO[] readIOs = null;
    boolean[] unselectableReads = null;
    boolean[] pendingReads = null;
    Boolean[] readBlocking = null;
    int selectedReads = 0;
    // Write side: the incoming list, its size, the resolved IOs and per-index flags.
    RubyArray writeArray = null;
    int writeSize = 0;
    RubyIO[] writeIOs = null;
    boolean[] unselectableWrites = null;
    Boolean[] writeBlocking = null;
    int selectedWrites = 0;
    // Selectors: the selector for the default provider and a per-provider map.
    Selector mainSelector = null;
    Map<SelectorProviderSelectorselectors = Collections.emptyMap();
    // Result arrays, created lazily.
    RubyArray readResults = null;
    RubyArray writeResults = null;
    RubyArray errorResults = null;
New to GrepCode? Check out our FAQ X