001package com.github.sarxos.webcam;
002
003import java.awt.image.BufferedImage;
004import java.io.BufferedOutputStream;
005import java.io.BufferedReader;
006import java.io.ByteArrayOutputStream;
007import java.io.Closeable;
008import java.io.IOException;
009import java.io.InputStreamReader;
010import java.net.ServerSocket;
011import java.net.Socket;
012import java.net.SocketException;
013import java.util.concurrent.ExecutorService;
014import java.util.concurrent.Executors;
015import java.util.concurrent.ThreadFactory;
016import java.util.concurrent.atomic.AtomicBoolean;
017
018import javax.imageio.ImageIO;
019
020import org.slf4j.Logger;
021import org.slf4j.LoggerFactory;
022
023
024public class WebcamStreamer implements ThreadFactory, WebcamListener {
025
026        private static final Logger LOG = LoggerFactory.getLogger(WebcamStreamer.class);
027
028        private static final String BOUNDARY = "mjpegframe";
029
030        private static final String CRLF = "\r\n";
031
032        private class Acceptor implements Runnable {
033
034                @Override
035                public void run() {
036                        try {
037                                ServerSocket server = new ServerSocket(port);
038                                while (started.get()) {
039                                        Socket socket = server.accept();
040                                        LOG.info("New connection from {}", socket.getRemoteSocketAddress());
041                                        executor.execute(new Connection(socket));
042                                }
043                        } catch (Exception e) {
044                                LOG.error("Cannot accept socket connection", e);
045                        }
046                }
047        }
048
049        private class Connection implements Runnable {
050
051                private Socket socket = null;
052
053                public Connection(Socket socket) {
054                        this.socket = socket;
055                }
056
057                @Override
058                public void run() {
059
060                        BufferedReader br = null;
061                        BufferedOutputStream bos = null;
062                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
063
064                        try {
065                                br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
066                                bos = new BufferedOutputStream(socket.getOutputStream());
067                        } catch (IOException e) {
068                                LOG.error("Fatal I/O exception when creating socket streams", e);
069                                try {
070                                        socket.close();
071                                } catch (IOException e1) {
072                                        LOG.error("Canot close socket connection from " + socket.getRemoteSocketAddress(), e1);
073                                }
074                                return;
075                        }
076
077                        // consume whole input
078
079                        try {
080                                while (br.ready()) {
081                                        br.readLine();
082                                }
083                        } catch (IOException e) {
084                                LOG.error("Error when reading input", e);
085                                return;
086                        }
087
088                        // stream
089
090                        try {
091
092                                socket.setSoTimeout(0);
093                                socket.setKeepAlive(false);
094                                socket.setTcpNoDelay(true);
095
096                                while (started.get()) {
097
098                                        StringBuilder sb = new StringBuilder();
099                                        sb.append("HTTP/1.0 200 OK").append(CRLF);
100                                        sb.append("Connection: close").append(CRLF);
101                                        sb.append("Cache-Control: no-cache").append(CRLF);
102                                        sb.append("Cache-Control: private").append(CRLF);
103                                        sb.append("Pragma: no-cache").append(CRLF);
104                                        sb.append("Content-type: multipart/x-mixed-replace; boundary=--").append(BOUNDARY).append(CRLF);
105                                        sb.append(CRLF);
106
107                                        bos.write(sb.toString().getBytes());
108
109                                        do {
110
111                                                if (!webcam.isOpen() || socket.isInputShutdown() || socket.isClosed()) {
112                                                        br.close();
113                                                        bos.close();
114                                                        return;
115                                                }
116
117                                                baos.reset();
118
119                                                long now = System.currentTimeMillis();
120                                                if (now > last + delay) {
121                                                        image = webcam.getImage();
122                                                }
123
124                                                ImageIO.write(image, "JPG", baos);
125
126                                                sb.delete(0, sb.length());
127                                                sb.append("--").append(BOUNDARY).append(CRLF);
128                                                sb.append("Content-type: image/jpeg").append(CRLF);
129                                                sb.append("Content-Length: ").append(baos.size()).append(CRLF);
130                                                sb.append(CRLF);
131
132                                                try {
133                                                        bos.write(sb.toString().getBytes());
134                                                        bos.write(baos.toByteArray());
135                                                        bos.write(CRLF.getBytes());
136                                                        bos.flush();
137                                                } catch (SocketException e) {
138                                                        LOG.error("Socket exception from " + socket.getRemoteSocketAddress(), e);
139                                                        br.close();
140                                                        bos.close();
141                                                        return;
142                                                }
143
144                                                Thread.sleep(delay);
145
146                                        } while (started.get());
147                                }
148                        } catch (Exception e) {
149
150                                String message = e.getMessage();
151
152                                if (message != null) {
153                                        if (message.startsWith("Software caused connection abort")) {
154                                                LOG.info("User closed stream");
155                                                return;
156                                        }
157                                        if (message.startsWith("Broken pipe")) {
158                                                LOG.info("User connection broken");
159                                                return;
160                                        }
161                                }
162
163                                LOG.error("Error", e);
164
165                                try {
166                                        bos.write("HTTP/1.0 501 Internal Server Error\r\n\r\n\r\n".getBytes());
167                                } catch (IOException e1) {
168                                        LOG.error("Not ablte to write to output stream", e);
169                                }
170
171                        } finally {
172                                for (Closeable closeable : new Closeable[] { br, bos, baos }) {
173                                        try {
174                                                closeable.close();
175                                        } catch (IOException e) {
176                                                LOG.error("Cannot close socket", e);
177                                        }
178                                }
179                                try {
180                                        socket.close();
181                                } catch (IOException e) {
182                                        LOG.error("Cannot close socket", e);
183                                }
184                        }
185                }
186        }
187
188        private Webcam webcam = null;
189        private double fps = 0;
190        private int number = 0;
191        private int port = 0;
192        private long last = -1;
193        private long delay = -1;
194        private BufferedImage image = null;
195        private ExecutorService executor = Executors.newCachedThreadPool(this);
196        private AtomicBoolean started = new AtomicBoolean(false);
197
198        public WebcamStreamer(int port, Webcam webcam, double fps, boolean start) {
199
200                if (webcam == null) {
201                        throw new IllegalArgumentException("Webcam for streaming cannot be null");
202                }
203
204                this.port = port;
205                this.webcam = webcam;
206                this.fps = fps;
207                this.delay = (long) (1000 / fps);
208
209                if (start) {
210                        start();
211                }
212        }
213
214        @Override
215        public Thread newThread(Runnable r) {
216                Thread thread = new Thread(r, String.format("streamer-thread-%s", number++));
217                thread.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance());
218                thread.setDaemon(true);
219                return thread;
220        }
221
222        public void start() {
223                if (started.compareAndSet(false, true)) {
224                        webcam.addWebcamListener(this);
225                        webcam.open();
226                        executor.execute(new Acceptor());
227                }
228        }
229
230        public void stop() {
231                if (started.compareAndSet(true, false)) {
232                        executor.shutdown();
233                        webcam.removeWebcamListener(this);
234                        webcam.close();
235                }
236        }
237
238        @Override
239        public void webcamOpen(WebcamEvent we) {
240                start();
241        }
242
243        @Override
244        public void webcamClosed(WebcamEvent we) {
245                stop();
246        }
247
248        @Override
249        public void webcamDisposed(WebcamEvent we) {
250        }
251
252        @Override
253        public void webcamImageObtained(WebcamEvent we) {
254        }
255
256        public double getFPS() {
257                return fps;
258        }
259
260        public boolean isInitialized() {
261                return started.get();
262        }
263
264        public int getPort() {
265                return port;
266        }
267
268}