001 package com.github.sarxos.webcam; 002 003 import java.util.concurrent.ExecutorService; 004 import java.util.concurrent.Executors; 005 import java.util.concurrent.SynchronousQueue; 006 import java.util.concurrent.ThreadFactory; 007 import java.util.concurrent.atomic.AtomicBoolean; 008 009 010 public class WebcamProcessor { 011 012 /** 013 * Thread factory for processor. 014 * 015 * @author Bartosz Firyn (SarXos) 016 */ 017 private static final class ProcessorThreadFactory implements ThreadFactory { 018 019 @Override 020 public Thread newThread(Runnable r) { 021 Thread t = new Thread(r, "atomic-processor"); 022 t.setDaemon(true); 023 return t; 024 } 025 } 026 027 /** 028 * Heart of overall processing system. This class process all native calls 029 * wrapped in tasks, by doing this all tasks executions are 030 * super-synchronized. 031 * 032 * @author Bartosz Firyn (SarXos) 033 */ 034 private static final class AtomicProcessor implements Runnable { 035 036 private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true); 037 private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true); 038 039 /** 040 * Process task. 041 * 042 * @param task the task to be processed 043 * @return Processed task 044 * @throws InterruptedException when thread has been interrupted 045 */ 046 public void process(WebcamTask task) throws InterruptedException { 047 048 inbound.put(task); 049 050 Throwable t = outbound.take().getThrowable(); 051 if (t != null) { 052 throw new WebcamException("Cannot execute task", t); 053 } 054 } 055 056 @Override 057 public void run() { 058 while (true) { 059 WebcamTask t = null; 060 try { 061 (t = inbound.take()).handle(); 062 } catch (InterruptedException e) { 063 break; 064 } catch (Throwable e) { 065 t.setThrowable(e); 066 } finally { 067 try { 068 if (t != null) { 069 outbound.put(t); 070 } 071 } catch (InterruptedException e) { 072 break; 073 } 074 } 075 } 076 } 077 } 078 079 /** 080 * Is processor started? 081 */ 082 private static final AtomicBoolean started = new AtomicBoolean(false); 083 084 /** 085 * Execution service. 086 */ 087 private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory()); 088 089 /** 090 * Static processor. 091 */ 092 private static final AtomicProcessor processor = new AtomicProcessor(); 093 094 /** 095 * Singleton instance. 096 */ 097 private static WebcamProcessor instance = null; 098 099 private WebcamProcessor() { 100 } 101 102 /** 103 * Process single webcam task. 104 * 105 * @param task the task to be processed 106 * @throws InterruptedException when thread has been interrupted 107 */ 108 public void process(WebcamTask task) throws InterruptedException { 109 if (started.compareAndSet(false, true)) { 110 runner.execute(processor); 111 } 112 processor.process(task); 113 } 114 115 public static synchronized WebcamProcessor getInstance() { 116 if (instance == null) { 117 instance = new WebcamProcessor(); 118 } 119 return instance; 120 } 121 }