Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   **** BEGIN LICENSE BLOCK *****
   * Version: CPL 1.0/GPL 2.0/LGPL 2.1
   *
   * The contents of this file are subject to the Common Public
   * License Version 1.0 (the "License"); you may not use this file
   * except in compliance with the License. You may obtain a copy of
   * the License at http://www.eclipse.org/legal/cpl-v10.html
   *
  * Software distributed under the License is distributed on an "AS
  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  * implied. See the License for the specific language governing
  * rights and limitations under the License.
  *
  * Copyright (C) 2008 The JRuby Community <www.jruby.org>
  * 
  * Alternatively, the contents of this file may be used under the terms of
  * either of the GNU General Public License Version 2 or later (the "GPL"),
  * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
  * in which case the provisions of the GPL or the LGPL are applicable instead
  * of those above. If you wish to allow use of your version of this file only
  * under the terms of either the GPL or the LGPL, and not to allow others to
  * use your version of this file under the terms of the CPL, indicate your
  * decision by deleting the provisions above and replace them with the notice
  * and other provisions required by the GPL or the LGPL. If you do not delete
  * the provisions above, a recipient may use your version of this file under
  * the terms of any one of the CPL, the GPL or the LGPL.
  ***** END LICENSE BLOCK *****/
 
 package org.jruby.util.io;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.WritableByteChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 /**
  * A utility class to emulate blocking I/O operations on non-blocking channels.
  */
 
 public class BlockingIO {
     public static final class Condition {
         private final IOChannel channel;
         Condition(IOChannel channel) {
             this. = channel;
         }
         public void cancel() {
             .wakeup(false);
         }
         public void interrupt() {
             .interrupt();
         }
         public boolean await() throws InterruptedException {
             return .await();
         }
         public boolean await(long timeoutTimeUnit unitthrows InterruptedException {
             return .await(timeoutunit);
         }
     }
     static final class IOChannel {
         final SelectableChannel channel;
         final int ops;        
         private final Object monitor;
         private boolean woken = false;
         private boolean ready = false;
         private boolean interrupted = false;
         
         IOChannel(SelectableChannel channelint opsObject monitor) {
             this. = channel;
             this. = ops;
             this. = monitor;
         }
         public final void wakeup(boolean ready) {
             synchronized () {
                 this. = true;
                 this. = ready;
                 .notifyAll();
             }
         }
         public final void interrupt() {
             synchronized () {
                 this. = true;
                 this. = true;
                 .notifyAll();
             }
         }
        public final boolean await() throws InterruptedException {
            return await(0, .);
        }
        public final boolean await(final long timeoutTimeUnit unitthrows InterruptedException {
            synchronized () {
                if (!) {
                    .wait(..convert(timeoutunit));
                }
                if () {
                    throw new InterruptedException("Interrupted");
                }
                return ;
            }
        }
    }
    static final class IOSelector implements Runnable {
        private final Selector selector;
        private final ConcurrentLinkedQueue<IOChannelregistrationQueue;
        public IOSelector(SelectorProvider providerthrows IOException {
             = SelectorFactory.openWithRetryFrom(nullprovider);
             = new ConcurrentLinkedQueue<IOChannel>();
        }
        public void run() {
            for ( ; ; ) {
                try {
                    //
                    // Wake up any channels that became unblocked
                    //
                    Set<SelectionKeyselected = new HashSet<SelectionKey>(.selectedKeys());
                    for (SelectionKey k : selected) {
                        List<IOChannelwaitq = (List<IOChannel>) k.attachment();
                        for (IOChannel ch : waitq) {
                            ch.wakeup(true);
                        }
                        waitq.clear();
                    }
                    //
                    // Register any new blocking I/O requests
                    //
                    IOChannel ch;
                    Set<SelectableChanneladded = new HashSet<SelectableChannel>();
                    while ((ch = .poll()) != null) {
                        SelectionKey k = ch.channel.keyFor();
                        List<IOChannelwaitq = k == null
                                ? new LinkedList<IOChannel>()
                                : (List<IOChannel>) k.attachment();
                        ch.channel.register(ch.opswaitq);
                        waitq.add(ch);
                        added.add(ch.channel);
                    }
                    // Now clear out any previously selected channels
                    for (SelectionKey k : selected) {
                        if (!added.contains(k.channel())) {
                            k.cancel();
                        }
                    }
                    //
                    // Wait for I/O on any channel
                    //
                    .select();
                } catch (IOException ex) {
                }
            }
        }
        Condition add(Channel channelint opsObject monitor) {
            IOChannel io = new IOChannel((SelectableChannelchannelopsmonitor);
            .add(io);
            .wakeup();
            return new Condition(io);
        }
        public void await(Channel channelint opthrows InterruptedException {
            add(channelopnew Object()).await();
        }
    }
    static final private Map<SelectorProviderIOSelectorselectors
            = new ConcurrentHashMap<SelectorProviderIOSelector>();
    private static IOSelector getSelector(SelectorProvider providerthrows IOException {
        IOSelector sel = .get(provider);
        if (sel != null) {
            return sel;
        }
        //
        // Synchronize and re-check to avoid creating more than one Selector per provider
        //
        synchronized (provider) {
            sel = .get(provider);
            if (sel == null) {
                sel = new IOSelector(provider);
                .put(providersel);
                Thread t = new Thread(sel);
                t.setDaemon(true);
                t.start();
            }
        }
        return sel;
    }
    private static IOSelector getSelector(Channel channelthrows IOException {
        if (!(channel instanceof SelectableChannel)) {
            throw new IllegalArgumentException("channel must be a SelectableChannel");
        }        
        return getSelector(((SelectableChannelchannel).provider());
    }
    public static final Condition newCondition(Channel channelint opsObject monitorthrows IOException {
        return getSelector(channel).add(channelopsmonitor);
    }
    public static final Condition newCondition(Channel channelint opsthrows IOException {
        return newCondition(channelopsnew Object());
    }
    public static void waitForIO(Channel channelint opthrows InterruptedExceptionIOException {
        getSelector(channel).await(channelop);
    }
    public static void awaitReadable(ReadableByteChannel channelthrows InterruptedExceptionIOException {
        waitForIO(channel.);
    }
    public static void awaitWritable(WritableByteChannel channelthrows InterruptedExceptionIOException {
        waitForIO(channel.);
    }
    public static int read(ReadableByteChannel channelByteBuffer bufboolean blockingthrows IOException {
        do {
            int n = channel.read(buf);
            if (n != 0 || !blocking || !(channel instanceof SelectableChannel) || !buf.hasRemaining()) {
                return n;
            }
            try {
                awaitReadable(channel);
            } catch (InterruptedException ex) {
                throw new InterruptedIOException(ex.getMessage());
            }
        } while (true);
    }
    public static int write(WritableByteChannel channelByteBuffer bufboolean blockingthrows IOException {
        do {
            int n = channel.write(buf);
            if (n != 0 || !blocking || !(channel instanceof SelectableChannel) || !buf.hasRemaining()) {
                return n;
            }
            try {
                awaitWritable(channel);
            } catch (InterruptedException ex) {
                throw new InterruptedIOException(ex.getMessage());
            }
        } while (true);
    }
    public static int blockingRead(ReadableByteChannel channelByteBuffer bufthrows IOException {
        return read(channelbuftrue);
    }
    public static int blockingWrite(WritableByteChannel channelByteBuffer bufthrows IOException {
        return write(channelbuftrue);
    }
New to GrepCode? Check out our FAQ X