001    package com.github.sarxos.webcam;
002    
003    import java.util.concurrent.ExecutorService;
004    import java.util.concurrent.Executors;
005    import java.util.concurrent.SynchronousQueue;
006    import java.util.concurrent.ThreadFactory;
007    import java.util.concurrent.atomic.AtomicBoolean;
008    
009    
010    public class WebcamProcessor {
011    
012            /**
013             * Thread factory for processor.
014             * 
015             * @author Bartosz Firyn (SarXos)
016             */
017            private static final class ProcessorThreadFactory implements ThreadFactory {
018    
019                    @Override
020                    public Thread newThread(Runnable r) {
021                            Thread t = new Thread(r, "atomic-processor");
022                            t.setDaemon(true);
023                            return t;
024                    }
025            }
026    
027            /**
028             * Heart of overall processing system. This class process all native calls
029             * wrapped in tasks, by doing this all tasks executions are
030             * super-synchronized.
031             * 
032             * @author Bartosz Firyn (SarXos)
033             */
034            private static final class AtomicProcessor implements Runnable {
035    
036                    private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true);
037                    private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true);
038    
039                    /**
040                     * Process task.
041                     * 
042                     * @param task the task to be processed
043                     * @return Processed task
044                     * @throws InterruptedException when thread has been interrupted
045                     */
046                    public void process(WebcamTask task) throws InterruptedException {
047    
048                            inbound.put(task);
049    
050                            Throwable t = outbound.take().getThrowable();
051                            if (t != null) {
052                                    throw new WebcamException("Cannot execute task", t);
053                            }
054                    }
055    
056                    @Override
057                    public void run() {
058                            while (true) {
059                                    WebcamTask t = null;
060                                    try {
061                                            (t = inbound.take()).handle();
062                                    } catch (InterruptedException e) {
063                                            break;
064                                    } catch (Throwable e) {
065                                            t.setThrowable(e);
066                                    } finally {
067                                            try {
068                                                    if (t != null) {
069                                                            outbound.put(t);
070                                                    }
071                                            } catch (InterruptedException e) {
072                                                    break;
073                                            }
074                                    }
075                            }
076                    }
077            }
078    
079            /**
080             * Is processor started?
081             */
082            private static final AtomicBoolean started = new AtomicBoolean(false);
083    
084            /**
085             * Execution service.
086             */
087            private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory());
088    
089            /**
090             * Static processor.
091             */
092            private static final AtomicProcessor processor = new AtomicProcessor();
093    
094            /**
095             * Singleton instance.
096             */
097            private static WebcamProcessor instance = null;
098    
099            private WebcamProcessor() {
100            }
101    
102            /**
103             * Process single webcam task.
104             * 
105             * @param task the task to be processed
106             * @throws InterruptedException when thread has been interrupted
107             */
108            public void process(WebcamTask task) throws InterruptedException {
109                    if (started.compareAndSet(false, true)) {
110                            runner.execute(processor);
111                    }
112                    processor.process(task);
113            }
114    
115            public static synchronized WebcamProcessor getInstance() {
116                    if (instance == null) {
117                            instance = new WebcamProcessor();
118                    }
119                    return instance;
120            }
121    }