1/*
2 * Copyright (C) 2013 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.accessorydisplay.common;
18
19import android.os.Handler;
20import android.os.Looper;
21import android.os.Message;
22import android.util.SparseArray;
23
24import java.io.IOException;
25import java.nio.ByteBuffer;
26
27/**
28 * A simple message transport.
29 * <p>
30 * This object's interface is thread-safe, however incoming messages
31 * are always delivered on the {@link Looper} thread on which the transport
32 * was created.
33 * </p>
34 */
35public abstract class Transport {
36    private static final int MAX_INPUT_BUFFERS = 8;
37
38    private final Logger mLogger;
39
40    // The transport thread looper and handler.
41    private final TransportHandler mHandler;
42
43    // Lock to guard all mutable state.
44    private final Object mLock = new Object();
45
46    // The output buffer.  Set to null when the transport is closed.
47    private ByteBuffer mOutputBuffer;
48
49    // The input buffer pool.
50    private BufferPool mInputBufferPool;
51
52    // The reader thread.  Initialized when reading starts.
53    private ReaderThread mThread;
54
55    // The list of callbacks indexed by service id.
56    private final SparseArray<Callback> mServices = new SparseArray<Callback>();
57
58    public Transport(Logger logger, int maxPacketSize) {
59        mLogger = logger;
60        mHandler = new TransportHandler();
61        mOutputBuffer = ByteBuffer.allocate(maxPacketSize);
62        mInputBufferPool = new BufferPool(
63                maxPacketSize, Protocol.MAX_ENVELOPE_SIZE, MAX_INPUT_BUFFERS);
64    }
65
66    /**
67     * Gets the logger for debugging.
68     */
69    public Logger getLogger() {
70        return mLogger;
71    }
72
73    /**
74     * Gets the handler on the transport's thread.
75     */
76    public Handler getHandler() {
77        return mHandler;
78    }
79
80    /**
81     * Closes the transport.
82     */
83    public void close() {
84        synchronized (mLock) {
85            if (mOutputBuffer != null) {
86                if (mThread == null) {
87                    ioClose();
88                } else {
89                    // If the thread was started then it will be responsible for
90                    // closing the stream when it quits because it may currently
91                    // be in the process of reading from the stream so we can't simply
92                    // shut it down right now.
93                    mThread.quit();
94                }
95                mOutputBuffer = null;
96            }
97        }
98    }
99
100    /**
101     * Sends a message.
102     *
103     * @param service The service to whom the message is addressed.
104     * @param what The message type.
105     * @param content The content, or null if there is none.
106     * @return True if the message was sent successfully, false if an error occurred.
107     */
108    public boolean sendMessage(int service, int what, ByteBuffer content) {
109        checkServiceId(service);
110        checkMessageId(what);
111
112        try {
113            synchronized (mLock) {
114                if (mOutputBuffer == null) {
115                    mLogger.logError("Send message failed because transport was closed.");
116                    return false;
117                }
118
119                final byte[] outputArray = mOutputBuffer.array();
120                final int capacity = mOutputBuffer.capacity();
121                mOutputBuffer.clear();
122                mOutputBuffer.putShort((short)service);
123                mOutputBuffer.putShort((short)what);
124                if (content == null) {
125                    mOutputBuffer.putInt(0);
126                } else {
127                    final int contentLimit = content.limit();
128                    int contentPosition = content.position();
129                    int contentRemaining = contentLimit - contentPosition;
130                    if (contentRemaining > Protocol.MAX_CONTENT_SIZE) {
131                        throw new IllegalArgumentException("Message content too large: "
132                                + contentRemaining + " > " + Protocol.MAX_CONTENT_SIZE);
133                    }
134                    mOutputBuffer.putInt(contentRemaining);
135                    while (contentRemaining != 0) {
136                        final int outputAvailable = capacity - mOutputBuffer.position();
137                        if (contentRemaining <= outputAvailable) {
138                            mOutputBuffer.put(content);
139                            break;
140                        }
141                        content.limit(contentPosition + outputAvailable);
142                        mOutputBuffer.put(content);
143                        content.limit(contentLimit);
144                        ioWrite(outputArray, 0, capacity);
145                        contentPosition += outputAvailable;
146                        contentRemaining -= outputAvailable;
147                        mOutputBuffer.clear();
148                    }
149                }
150                ioWrite(outputArray, 0, mOutputBuffer.position());
151                return true;
152            }
153        } catch (IOException ex) {
154            mLogger.logError("Send message failed: " + ex);
155            return false;
156        }
157    }
158
159    /**
160     * Starts reading messages on a separate thread.
161     */
162    public void startReading() {
163        synchronized (mLock) {
164            if (mOutputBuffer == null) {
165                throw new IllegalStateException("Transport has been closed");
166            }
167
168            mThread = new ReaderThread();
169            mThread.start();
170        }
171    }
172
173    /**
174     * Registers a service and provides a callback to receive messages.
175     *
176     * @param service The service id.
177     * @param callback The callback to use.
178     */
179    public void registerService(int service, Callback callback) {
180        checkServiceId(service);
181        if (callback == null) {
182            throw new IllegalArgumentException("callback must not be null");
183        }
184
185        synchronized (mLock) {
186            mServices.put(service, callback);
187        }
188    }
189
190    /**
191     * Unregisters a service.
192     *
193     * @param service The service to unregister.
194     */
195    public void unregisterService(int service) {
196        checkServiceId(service);
197
198        synchronized (mLock) {
199            mServices.remove(service);
200        }
201    }
202
203    private void dispatchMessageReceived(int service, int what, ByteBuffer content) {
204        final Callback callback;
205        synchronized (mLock) {
206            callback = mServices.get(service);
207        }
208        if (callback != null) {
209            callback.onMessageReceived(service, what, content);
210        } else {
211            mLogger.log("Discarding message " + what
212                    + " for unregistered service " + service);
213        }
214    }
215
216    private static void checkServiceId(int service) {
217        if (service < 0 || service > 0xffff) {
218            throw new IllegalArgumentException("service id out of range: " + service);
219        }
220    }
221
222    private static void checkMessageId(int what) {
223        if (what < 0 || what > 0xffff) {
224            throw new IllegalArgumentException("message id out of range: " + what);
225        }
226    }
227
228    // The IO methods must be safe to call on any thread.
229    // They may be called concurrently.
230    protected abstract void ioClose();
231    protected abstract int ioRead(byte[] buffer, int offset, int count)
232            throws IOException;
233    protected abstract void ioWrite(byte[] buffer, int offset, int count)
234            throws IOException;
235
236    /**
237     * Callback for services that handle received messages.
238     */
239    public interface Callback {
240        /**
241         * Indicates that a message was received.
242         *
243         * @param service The service to whom the message is addressed.
244         * @param what The message type.
245         * @param content The content, or null if there is none.
246         */
247        public void onMessageReceived(int service, int what, ByteBuffer content);
248    }
249
250    final class TransportHandler extends Handler {
251        @Override
252        public void handleMessage(Message msg) {
253            final ByteBuffer buffer = (ByteBuffer)msg.obj;
254            try {
255                final int limit = buffer.limit();
256                while (buffer.position() < limit) {
257                    final int service = buffer.getShort() & 0xffff;
258                    final int what = buffer.getShort() & 0xffff;
259                    final int contentSize = buffer.getInt();
260                    if (contentSize == 0) {
261                        dispatchMessageReceived(service, what, null);
262                    } else {
263                        final int end = buffer.position() + contentSize;
264                        buffer.limit(end);
265                        dispatchMessageReceived(service, what, buffer);
266                        buffer.limit(limit);
267                        buffer.position(end);
268                    }
269                }
270            } finally {
271                mInputBufferPool.release(buffer);
272            }
273        }
274    }
275
276    final class ReaderThread extends Thread {
277        // Set to true when quitting.
278        private volatile boolean mQuitting;
279
280        public ReaderThread() {
281            super("Accessory Display Transport");
282        }
283
284        @Override
285        public void run() {
286            loop();
287            ioClose();
288        }
289
290        private void loop() {
291            ByteBuffer buffer = null;
292            int length = Protocol.HEADER_SIZE;
293            int contentSize = -1;
294            outer: while (!mQuitting) {
295                // Get a buffer.
296                if (buffer == null) {
297                    buffer = mInputBufferPool.acquire(length);
298                } else {
299                    buffer = mInputBufferPool.grow(buffer, length);
300                }
301
302                // Read more data until needed number of bytes obtained.
303                int position = buffer.position();
304                int count;
305                try {
306                    count = ioRead(buffer.array(), position, buffer.capacity() - position);
307                    if (count < 0) {
308                        break; // end of stream
309                    }
310                } catch (IOException ex) {
311                    mLogger.logError("Read failed: " + ex);
312                    break; // error
313                }
314                position += count;
315                buffer.position(position);
316                if (contentSize < 0 && position >= Protocol.HEADER_SIZE) {
317                    contentSize = buffer.getInt(4);
318                    if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
319                        mLogger.logError("Encountered invalid content size: " + contentSize);
320                        break; // malformed stream
321                    }
322                    length += contentSize;
323                }
324                if (position < length) {
325                    continue; // need more data
326                }
327
328                // There is at least one complete message in the buffer.
329                // Find the end of a contiguous chunk of complete messages.
330                int next = length;
331                int remaining;
332                for (;;) {
333                    length = Protocol.HEADER_SIZE;
334                    remaining = position - next;
335                    if (remaining < length) {
336                        contentSize = -1;
337                        break; // incomplete header, need more data
338                    }
339                    contentSize = buffer.getInt(next + 4);
340                    if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
341                        mLogger.logError("Encountered invalid content size: " + contentSize);
342                        break outer; // malformed stream
343                    }
344                    length += contentSize;
345                    if (remaining < length) {
346                        break; // incomplete content, need more data
347                    }
348                    next += length;
349                }
350
351                // Post the buffer then don't modify it anymore.
352                // Now this is kind of sneaky.  We know that no other threads will
353                // be acquiring buffers from the buffer pool so we can keep on
354                // referring to this buffer as long as we don't modify its contents.
355                // This allows us to operate in a single-buffered mode if desired.
356                buffer.limit(next);
357                buffer.rewind();
358                mHandler.obtainMessage(0, buffer).sendToTarget();
359
360                // If there is an incomplete message at the end, then we will need
361                // to copy it to a fresh buffer before continuing.  In the single-buffered
362                // case, we may acquire the same buffer as before which is fine.
363                if (remaining == 0) {
364                    buffer = null;
365                } else {
366                    final ByteBuffer oldBuffer = buffer;
367                    buffer = mInputBufferPool.acquire(length);
368                    System.arraycopy(oldBuffer.array(), next, buffer.array(), 0, remaining);
369                    buffer.position(remaining);
370                }
371            }
372
373            if (buffer != null) {
374                mInputBufferPool.release(buffer);
375            }
376        }
377
378        public void quit() {
379            mQuitting = true;
380        }
381    }
382}
383