001package com.github.sarxos.webcam; 002 003import java.util.concurrent.ExecutorService; 004import java.util.concurrent.Executors; 005import java.util.concurrent.RejectedExecutionException; 006import java.util.concurrent.SynchronousQueue; 007import java.util.concurrent.ThreadFactory; 008import java.util.concurrent.atomic.AtomicBoolean; 009import java.util.concurrent.atomic.AtomicInteger; 010 011 012public class WebcamProcessor { 013 014 /** 015 * Thread factory for processor. 016 * 017 * @author Bartosz Firyn (SarXos) 018 */ 019 private static final class ProcessorThreadFactory implements ThreadFactory { 020 021 private static final AtomicInteger N = new AtomicInteger(0); 022 023 @Override 024 public Thread newThread(Runnable r) { 025 Thread t = new Thread(r, String.format("atomic-processor-%d", N.incrementAndGet())); 026 t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance()); 027 t.setDaemon(true); 028 return t; 029 } 030 } 031 032 /** 033 * Heart of overall processing system. This class process all native calls 034 * wrapped in tasks, by doing this all tasks executions are 035 * super-synchronized. 036 * 037 * @author Bartosz Firyn (SarXos) 038 */ 039 private static final class AtomicProcessor implements Runnable { 040 041 private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true); 042 private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true); 043 044 /** 045 * Process task. 046 * 047 * @param task the task to be processed 048 * @return Processed task 049 * @throws InterruptedException when thread has been interrupted 050 */ 051 public void process(WebcamTask task) throws InterruptedException { 052 053 inbound.put(task); 054 055 Throwable t = outbound.take().getThrowable(); 056 if (t != null) { 057 throw new WebcamException("Cannot execute task", t); 058 } 059 } 060 061 @Override 062 public void run() { 063 while (true) { 064 WebcamTask t = null; 065 try { 066 (t = inbound.take()).handle(); 067 } catch (InterruptedException e) { 068 break; 069 } catch (Throwable e) { 070 if (t != null) { 071 t.setThrowable(e); 072 } 073 } finally { 074 if (t != null) { 075 try { 076 outbound.put(t); 077 } catch (InterruptedException e) { 078 break; 079 } catch (Exception e) { 080 throw new RuntimeException("Cannot put task into outbound queue", e); 081 } 082 } 083 } 084 } 085 } 086 } 087 088 /** 089 * Is processor started? 090 */ 091 private static final AtomicBoolean started = new AtomicBoolean(false); 092 093 /** 094 * Execution service. 095 */ 096 private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory()); 097 098 /** 099 * Static processor. 100 */ 101 private static final AtomicProcessor processor = new AtomicProcessor(); 102 103 /** 104 * Singleton instance. 105 */ 106 private static final WebcamProcessor INSTANCE = new WebcamProcessor();; 107 108 private WebcamProcessor() { 109 } 110 111 /** 112 * Process single webcam task. 113 * 114 * @param task the task to be processed 115 * @throws InterruptedException when thread has been interrupted 116 */ 117 public void process(WebcamTask task) throws InterruptedException { 118 if (started.compareAndSet(false, true)) { 119 runner.execute(processor); 120 } 121 if (!runner.isShutdown()) { 122 processor.process(task); 123 } else { 124 throw new RejectedExecutionException("Cannot process because processor runner has been already shut down"); 125 } 126 } 127 128 public static synchronized WebcamProcessor getInstance() { 129 return INSTANCE; 130 } 131}