RouterImpl.java revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5package org.chromium.mojo.bindings;
6
7import android.annotation.SuppressLint;
8
9import org.chromium.mojo.system.AsyncWaiter;
10import org.chromium.mojo.system.Core;
11import org.chromium.mojo.system.MessagePipeHandle;
12
13import java.util.HashMap;
14import java.util.Map;
15import java.util.concurrent.Executor;
16
17/**
18 * Implementation of {@link Router}.
19 */
20@SuppressLint("UseSparseArrays")  // https://crbug.com/600699
21public class RouterImpl implements Router {
22
23    /**
24     * {@link MessageReceiver} used as the {@link Connector} callback.
25     */
26    private class HandleIncomingMessageThunk implements MessageReceiver {
27
28        /**
29         * @see MessageReceiver#accept(Message)
30         */
31        @Override
32        public boolean accept(Message message) {
33            return handleIncomingMessage(message);
34        }
35
36        /**
37         * @see MessageReceiver#close()
38         */
39        @Override
40        public void close() {
41            handleConnectorClose();
42        }
43
44    }
45
46    /**
47     *
48     * {@link MessageReceiver} used to return responses to the caller.
49     */
50    class ResponderThunk implements MessageReceiver {
51        private boolean mAcceptWasInvoked = false;
52
53        /**
54         * @see
55         * MessageReceiver#accept(Message)
56         */
57        @Override
58        public boolean accept(Message message) {
59            mAcceptWasInvoked = true;
60            return RouterImpl.this.accept(message);
61        }
62
63        /**
64         * @see MessageReceiver#close()
65         */
66        @Override
67        public void close() {
68            RouterImpl.this.close();
69        }
70
71        @Override
72        protected void finalize() throws Throwable {
73            if (!mAcceptWasInvoked) {
74                // We close the pipe here as a way of signaling to the calling application that an
75                // error condition occurred. Without this the calling application would have no
76                // way of knowing it should stop waiting for a response.
77                RouterImpl.this.closeOnHandleThread();
78            }
79            super.finalize();
80        }
81    }
82
83    /**
84     * The {@link Connector} which is connected to the handle.
85     */
86    private final Connector mConnector;
87
88    /**
89     * The {@link MessageReceiverWithResponder} that will consume the messages received from the
90     * pipe.
91     */
92    private MessageReceiverWithResponder mIncomingMessageReceiver;
93
94    /**
95     * The next id to use for a request id which needs a response. It is auto-incremented.
96     */
97    private long mNextRequestId = 1;
98
99    /**
100     * The map from request ids to {@link MessageReceiver} of request currently in flight.
101     */
102    private Map<Long, MessageReceiver> mResponders = new HashMap<Long, MessageReceiver>();
103
104    /**
105     * An Executor that will run on the thread associated with the MessagePipe to which
106     * this Router is bound. This may be {@code Null} if the MessagePipeHandle passed
107     * in to the constructor is not valid.
108     */
109    private final Executor mExecutor;
110
111    /**
112     * Constructor that will use the default {@link AsyncWaiter}.
113     *
114     * @param messagePipeHandle The {@link MessagePipeHandle} to route message for.
115     */
116    public RouterImpl(MessagePipeHandle messagePipeHandle) {
117        this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(messagePipeHandle));
118    }
119
120    /**
121     * Constructor.
122     *
123     * @param messagePipeHandle The {@link MessagePipeHandle} to route message for.
124     * @param asyncWaiter the {@link AsyncWaiter} to use to get notification of new messages on the
125     *            handle.
126     */
127    public RouterImpl(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaiter) {
128        mConnector = new Connector(messagePipeHandle, asyncWaiter);
129        mConnector.setIncomingMessageReceiver(new HandleIncomingMessageThunk());
130        Core core = messagePipeHandle.getCore();
131        if (core != null) {
132            mExecutor = ExecutorFactory.getExecutorForCurrentThread(core);
133        } else {
134            mExecutor = null;
135        }
136    }
137
138    /**
139     * @see org.chromium.mojo.bindings.Router#start()
140     */
141    @Override
142    public void start() {
143        mConnector.start();
144    }
145
146    /**
147     * @see Router#setIncomingMessageReceiver(MessageReceiverWithResponder)
148     */
149    @Override
150    public void setIncomingMessageReceiver(MessageReceiverWithResponder incomingMessageReceiver) {
151        this.mIncomingMessageReceiver = incomingMessageReceiver;
152    }
153
154    /**
155     * @see MessageReceiver#accept(Message)
156     */
157    @Override
158    public boolean accept(Message message) {
159        // A message without responder is directly forwarded to the connector.
160        return mConnector.accept(message);
161    }
162
163    /**
164     * @see MessageReceiverWithResponder#acceptWithResponder(Message, MessageReceiver)
165     */
166    @Override
167    public boolean acceptWithResponder(Message message, MessageReceiver responder) {
168        // The message must have a header.
169        ServiceMessage messageWithHeader = message.asServiceMessage();
170        // Checking the message expects a response.
171        assert messageWithHeader.getHeader().hasFlag(MessageHeader.MESSAGE_EXPECTS_RESPONSE_FLAG);
172
173        // Compute a request id for being able to route the response.
174        // TODO(lhchavez): Remove this hack. See b/28986534 for details.
175        synchronized (mResponders) {
176            long requestId = mNextRequestId++;
177            // Reserve 0 in case we want it to convey special meaning in the future.
178            if (requestId == 0) {
179                requestId = mNextRequestId++;
180            }
181            if (mResponders.containsKey(requestId)) {
182                throw new IllegalStateException("Unable to find a new request identifier.");
183            }
184            messageWithHeader.setRequestId(requestId);
185            if (!mConnector.accept(messageWithHeader)) {
186                return false;
187            }
188            // Only keep the responder is the message has been accepted.
189            mResponders.put(requestId, responder);
190        }
191        return true;
192    }
193
194    /**
195     * @see org.chromium.mojo.bindings.HandleOwner#passHandle()
196     */
197    @Override
198    public MessagePipeHandle passHandle() {
199        return mConnector.passHandle();
200    }
201
202    /**
203     * @see java.io.Closeable#close()
204     */
205    @Override
206    public void close() {
207        mConnector.close();
208    }
209
210    /**
211     * @see Router#setErrorHandler(ConnectionErrorHandler)
212     */
213    @Override
214    public void setErrorHandler(ConnectionErrorHandler errorHandler) {
215        mConnector.setErrorHandler(errorHandler);
216    }
217
218    /**
219     * Receive a message from the connector. Returns |true| if the message has been handled.
220     */
221    private boolean handleIncomingMessage(Message message) {
222        MessageHeader header = message.asServiceMessage().getHeader();
223        if (header.hasFlag(MessageHeader.MESSAGE_EXPECTS_RESPONSE_FLAG)) {
224            if (mIncomingMessageReceiver != null) {
225                return mIncomingMessageReceiver.acceptWithResponder(message, new ResponderThunk());
226            }
227            // If we receive a request expecting a response when the client is not
228            // listening, then we have no choice but to tear down the pipe.
229            close();
230            return false;
231        } else if (header.hasFlag(MessageHeader.MESSAGE_IS_RESPONSE_FLAG)) {
232            long requestId = header.getRequestId();
233            MessageReceiver responder;
234            // TODO(lhchavez): Remove this hack. See b/28986534 for details.
235            synchronized (mResponders) {
236                responder = mResponders.get(requestId);
237                if (responder == null) {
238                    return false;
239                }
240                mResponders.remove(requestId);
241            }
242            return responder.accept(message);
243        } else {
244            if (mIncomingMessageReceiver != null) {
245                return mIncomingMessageReceiver.accept(message);
246            }
247            // OK to drop the message.
248        }
249        return false;
250    }
251
252    private void handleConnectorClose() {
253        if (mIncomingMessageReceiver != null) {
254            mIncomingMessageReceiver.close();
255        }
256    }
257
258    /**
259     * Invokes {@link #close()} asynchronously on the thread associated with
260     * this Router's Handle. If this Router was constructed with an invalid
261     * handle then this method does nothing.
262     */
263    private void closeOnHandleThread() {
264        if (mExecutor != null) {
265            mExecutor.execute(new Runnable() {
266
267                @Override
268                public void run() {
269                    close();
270                }
271            });
272        }
273    }
274}
275