Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.jruby.ext.fiber;
  
  import java.util.Map;
  import org.jruby.Ruby;
 
 public class ThreadFiber extends RubyObject implements ExecutionContext {
     public ThreadFiber(Ruby runtimeRubyClass klass) {
         super(runtimeklass);
     }
     
     public static void initRootFiber(ThreadContext context) {
         Ruby runtime = context.runtime;
         
         ThreadFiber rootFiber = new ThreadFiber(runtimeruntime.getClass("Fiber")); // FIXME: getFiber()
         
         assert runtime.getClass("SizedQueue") != null : "SizedQueue has not been loaded";
         rootFiber.data = new FiberData(new SizedQueue(runtimeruntime.getClass("SizedQueue")), nullrootFiber);
         rootFiber.thread = context.getThread();
         context.setRootFiber(rootFiber);
     }
     
     @JRubyMethod
     public IRubyObject initialize(ThreadContext contextBlock block) {
         Ruby runtime = context.runtime;
         
         if (!block.isGiven()) throw runtime.newArgumentError("tried to create Proc object without block");
         
          = new FiberData(new SizedQueue(runtimeruntime.getClass("SizedQueue")), context.getFiberCurrentThread(), this);
         
         FiberData currentFiberData = context.getFiber().;
         
          = createThread(runtimecurrentFiberData.queueblock);
         
         return context.nil;
     }
     
     @JRubyMethod(rest = true)
     public IRubyObject resume(ThreadContext contextIRubyObject[] values) {
         Ruby runtime = context.runtime;
         
         if (. != null || .throw runtime.newFiberError("double resume");
         
         if (!alive()) throw runtime.newFiberError("dead fiber called");
         
         FiberData currentFiberData = context.getFiber().;
         
         if (this. == currentFiberData) {
             switch (values.length) {
                 case 0: return context.nil;
                 case 1: return values[0];
                 defaultreturn runtime.newArrayNoCopyLight(values);
             }
         }
         
         IRubyObject val;
         switch (values.length) {
             case 0: val = break;
             case 1: val = values[0]; break;
             defaultval = runtime.newArrayNoCopyLight(values);
         }
         
         if (. != context.getFiberCurrentThread()) throw runtime.newFiberError("fiber called across threads");
         
         . = context.getFiber();
         
         try {
             ..push(contextval);
             IRubyObject result = currentFiberData.queue.pop(context);
             if (result == result = context.nil;
             return result;
         } finally {
             . = null;
         }
     }
     
     @JRubyMethod(rest = true)
     public IRubyObject __transfer__(ThreadContext contextIRubyObject[] values) {
         Ruby runtime = context.runtime;
         
         if (. != nullthrow runtime.newFiberError("double resume");
         
         if (!alive()) throw runtime.newFiberError("dead fiber called");
         
         FiberData currentFiberData = context.getFiber().;
        
        if (this. == currentFiberData) {
            switch (values.length) {
                case 0: return context.nil;
                case 1: return values[0];
                defaultreturn runtime.newArrayNoCopyLight(values);
            }
        }
        
        IRubyObject val;
        switch (values.length) {
            case 0: val = break;
            case 1: val = values[0]; break;
            defaultval = runtime.newArrayNoCopyLight(values);
        }
        
        if (. != context.getFiberCurrentThread()) throw runtime.newFiberError("fiber called across threads");
        
        if (currentFiberData.prev != null) {
            // new fiber should answer to current prev and this fiber is marked as transferred
            . = currentFiberData.prev;
            currentFiberData.prev = null;
            currentFiberData.transferred = true;
        } else {
            . = context.getFiber();
        }
        
        try {
            ..push(contextval);
            
            IRubyObject result = currentFiberData.queue.pop(context);
            if (result == result = context.nil;
            return result;
        } finally {
            . = null;
            currentFiberData.transferred = false;
        }
    }
    
    @JRubyMethod(meta = true)
    public static IRubyObject yield(ThreadContext contextIRubyObject recv) {
        return yield(contextrecvcontext.nil);
    }
    
    @JRubyMethod(meta = true)
    public static IRubyObject yield(ThreadContext contextIRubyObject recvIRubyObject value) {
        Ruby runtime = context.runtime;
        
        FiberData currentFiberData = context.getFiber().;
        
        if (currentFiberData.parent == nullthrow runtime.newFiberError("can't yield from root fiber");
        
        FiberData prevFiberData = currentFiberData.prev.data;
        
        prevFiberData.queue.push(contextvalue);
        
        IRubyObject result = currentFiberData.queue.pop(context);
        if (result == result = context.nil;
        return result;
    }
    
    public IRubyObject __alive__(ThreadContext context) {
        return context.runtime.newBoolean( != null && .isAlive());
    }
    
    @JRubyMethod(meta = true)
    public static IRubyObject __current__(ThreadContext contextIRubyObject recv) {
        return context.getFiber();
    }
    @Override
    public Map<ObjectIRubyObjectgetContextVariables() {
        return .getContextVariables();
    }
    
    boolean alive() {
        return  != null && .isAlive();
    }
    
    static RubyThread createThread(final Ruby runtimefinal FiberData datafinal SizedQueue queuefinal Block block) {
        final AtomicReference<RubyThreadfiberThread = new AtomicReference();
        Thread thread = new Thread() {
            public void run() {
                ThreadContext context = runtime.getCurrentContext();
                context.setFiber(data.fiber.get());
                context.setRootThread(data.parent);
                fiberThread.set(context.getThread());
                
                IRubyObject init = data.queue.pop(context);
                
                try {
                    IRubyObject result;
                    
                    if (init == ) {
                        result = block.yieldSpecific(context);
                    } else {
                        result = block.yieldArray(contextinitnullnull);
                    }
                    
                    data.prev.data.queue.push(contextresult);
                } catch (JumpException.FlowControlException fce) {
                    if (data.prev != null) {
                        data.prev.thread.raise(fce.buildException(runtime).getException());
                    }
                } catch (RaiseException re) {
                    if (data.prev != null) {
                        data.prev.thread.raise(re.getException());
                    }
                } catch (Throwable t) {
                    if (data.prev != null) {
                        data.prev.thread.raise(JavaUtil.convertJavaToUsableRubyObject(runtimet));
                    }
                } finally {
                    data.queue.shutdown();
                }
            }
        };
        thread.setDaemon(true);
        thread.setName("FiberThread#" + data.fiber.get().id());
        thread.start();
        
        while (fiberThread.get() == null) {}
        
        return fiberThread.get();
    }
    
    protected void finalize() throws Throwable {
        try {
            FiberData data = this.;
            if (data != null) {
                // we never interrupt or shutdown root fibers
                if (data.parent == nullreturn;
                
                data.queue.shutdown();
            }
            RubyThread thread = this.;
            if (thread != null) {
                thread.dieFromFinalizer();
                // interrupt Ruby thread to break out of queue sleep, blocking IO
                thread.interrupt();
                // null out references to aid GC
                data = null;
                thread = null;
            }
        } finally {
            super.finalize();
        }
    }
    
    private static class FiberData {
        FiberData(SizedQueue queueRubyThread parentThreadFiber fiber) {
            this. = queue;
            this. = parent;
            this. = new WeakReference<ThreadFiber>(fiber);
        }
        
        final SizedQueue queue;
        volatile ThreadFiber prev;
        final RubyThread parent;
        final WeakReference<ThreadFiberfiber;
        volatile boolean transferred;
    }
    
    volatile FiberData data;
    volatile RubyThread thread;
New to GrepCode? Check out our FAQ X