001package com.github.sarxos.webcam;
002
003import java.util.concurrent.ExecutorService;
004import java.util.concurrent.Executors;
005import java.util.concurrent.RejectedExecutionException;
006import java.util.concurrent.SynchronousQueue;
007import java.util.concurrent.ThreadFactory;
008import java.util.concurrent.atomic.AtomicBoolean;
009import java.util.concurrent.atomic.AtomicInteger;
010
011
012public class WebcamProcessor {
013
014        /**
015         * Thread factory for processor.
016         * 
017         * @author Bartosz Firyn (SarXos)
018         */
019        private static final class ProcessorThreadFactory implements ThreadFactory {
020
021                private static final AtomicInteger N = new AtomicInteger(0);
022
023                @Override
024                public Thread newThread(Runnable r) {
025                        Thread t = new Thread(r, String.format("atomic-processor-%d", N.incrementAndGet()));
026                        t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance());
027                        t.setDaemon(true);
028                        return t;
029                }
030        }
031
032        /**
033         * Heart of overall processing system. This class process all native calls
034         * wrapped in tasks, by doing this all tasks executions are
035         * super-synchronized.
036         * 
037         * @author Bartosz Firyn (SarXos)
038         */
039        private static final class AtomicProcessor implements Runnable {
040
041                private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true);
042                private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true);
043
044                /**
045                 * Process task.
046                 * 
047                 * @param task the task to be processed
048                 * @return Processed task
049                 * @throws InterruptedException when thread has been interrupted
050                 */
051                public void process(WebcamTask task) throws InterruptedException {
052
053                        inbound.put(task);
054
055                        Throwable t = outbound.take().getThrowable();
056                        if (t != null) {
057                                throw new WebcamException("Cannot execute task", t);
058                        }
059                }
060
061                @Override
062                public void run() {
063                        while (true) {
064                                WebcamTask t = null;
065                                try {
066                                        (t = inbound.take()).handle();
067                                } catch (InterruptedException e) {
068                                        break;
069                                } catch (Throwable e) {
070                                        if (t != null) {
071                                                t.setThrowable(e);
072                                        }
073                                } finally {
074                                        if (t != null) {
075                                                try {
076                                                        outbound.put(t);
077                                                } catch (InterruptedException e) {
078                                                        break;
079                                                } catch (Exception e) {
080                                                        throw new RuntimeException("Cannot put task into outbound queue", e);
081                                                }
082                                        }
083                                }
084                        }
085                }
086        }
087
088        /**
089         * Is processor started?
090         */
091        private static final AtomicBoolean started = new AtomicBoolean(false);
092
093        /**
094         * Execution service.
095         */
096        private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory());
097
098        /**
099         * Static processor.
100         */
101        private static final AtomicProcessor processor = new AtomicProcessor();
102
103        /**
104         * Singleton instance.
105         */
106        private static final WebcamProcessor INSTANCE = new WebcamProcessor();;
107
108        private WebcamProcessor() {
109        }
110
111        /**
112         * Process single webcam task.
113         * 
114         * @param task the task to be processed
115         * @throws InterruptedException when thread has been interrupted
116         */
117        public void process(WebcamTask task) throws InterruptedException {
118                if (started.compareAndSet(false, true)) {
119                        runner.execute(processor);
120                }
121                if (!runner.isShutdown()) {
122                        processor.process(task);
123                } else {
124                        throw new RejectedExecutionException("Cannot process because processor runner has been already shut down");
125                }
126        }
127
128        public static synchronized WebcamProcessor getInstance() {
129                return INSTANCE;
130        }
131}