Connector.java revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5package org.chromium.mojo.bindings;
6
7import org.chromium.mojo.system.AsyncWaiter;
8import org.chromium.mojo.system.Core;
9import org.chromium.mojo.system.MessagePipeHandle;
10import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult;
11import org.chromium.mojo.system.MojoException;
12import org.chromium.mojo.system.MojoResult;
13import org.chromium.mojo.system.ResultAnd;
14
15import java.nio.ByteBuffer;
16
17/**
18 * A {@link Connector} owns a {@link MessagePipeHandle} and will send any received messages to the
19 * registered {@link MessageReceiver}. It also acts as a {@link MessageReceiver} and will send any
20 * message through the handle.
21 * <p>
22 * The method |start| must be called before the {@link Connector} will start listening to incoming
23 * messages.
24 */
25public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle> {
26
27    /**
28     * The callback that is notified when the state of the owned handle changes.
29     */
30    private final AsyncWaiterCallback mAsyncWaiterCallback = new AsyncWaiterCallback();
31
32    /**
33     * The owned message pipe.
34     */
35    private final MessagePipeHandle mMessagePipeHandle;
36
37    /**
38     * A waiter which is notified when a new message is available on the owned message pipe.
39     */
40    private final AsyncWaiter mAsyncWaiter;
41
42    /**
43     * The {@link MessageReceiver} to which received messages are sent.
44     */
45    private MessageReceiver mIncomingMessageReceiver;
46
47    /**
48     * The Cancellable for the current wait. Is |null| when not currently waiting for new messages.
49     */
50    private AsyncWaiter.Cancellable mCancellable;
51
52    /**
53     * The error handler to notify of errors.
54     */
55    private ConnectionErrorHandler mErrorHandler;
56
57    /**
58     * Create a new connector over a |messagePipeHandle|. The created connector will use the default
59     * {@link AsyncWaiter} from the {@link Core} implementation of |messagePipeHandle|.
60     */
61    public Connector(MessagePipeHandle messagePipeHandle) {
62        this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(messagePipeHandle));
63    }
64
65    /**
66     * Create a new connector over a |messagePipeHandle| using the given {@link AsyncWaiter} to get
67     * notified of changes on the handle.
68     */
69    public Connector(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaiter) {
70        mCancellable = null;
71        mMessagePipeHandle = messagePipeHandle;
72        mAsyncWaiter = asyncWaiter;
73    }
74
75    /**
76     * Set the {@link MessageReceiver} that will receive message from the owned message pipe.
77     */
78    public void setIncomingMessageReceiver(MessageReceiver incomingMessageReceiver) {
79        mIncomingMessageReceiver = incomingMessageReceiver;
80    }
81
82    /**
83     * Set the {@link ConnectionErrorHandler} that will be notified of errors on the owned message
84     * pipe.
85     */
86    public void setErrorHandler(ConnectionErrorHandler errorHandler) {
87        mErrorHandler = errorHandler;
88    }
89
90    /**
91     * Start listening for incoming messages.
92     */
93    public void start() {
94        assert mCancellable == null;
95        registerAsyncWaiterForRead();
96    }
97
98    /**
99     * @see MessageReceiver#accept(Message)
100     */
101    @Override
102    public boolean accept(Message message) {
103        try {
104            mMessagePipeHandle.writeMessage(message.getData(),
105                    message.getHandles(), MessagePipeHandle.WriteFlags.NONE);
106            return true;
107        } catch (MojoException e) {
108            onError(e);
109            return false;
110        }
111    }
112
113    /**
114     * Pass the owned handle of the connector. After this, the connector is disconnected. It cannot
115     * accept new message and it isn't listening to the handle anymore.
116     *
117     * @see org.chromium.mojo.bindings.HandleOwner#passHandle()
118     */
119    @Override
120    public MessagePipeHandle passHandle() {
121        cancelIfActive();
122        MessagePipeHandle handle = mMessagePipeHandle.pass();
123        if (mIncomingMessageReceiver != null) {
124            mIncomingMessageReceiver.close();
125        }
126        return handle;
127    }
128
129    /**
130     * @see java.io.Closeable#close()
131     */
132    @Override
133    public void close() {
134        cancelIfActive();
135        mMessagePipeHandle.close();
136        if (mIncomingMessageReceiver != null) {
137            MessageReceiver incomingMessageReceiver = mIncomingMessageReceiver;
138            mIncomingMessageReceiver = null;
139            incomingMessageReceiver.close();
140        }
141    }
142
143    private class AsyncWaiterCallback implements AsyncWaiter.Callback {
144
145        /**
146         * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int)
147         */
148        @Override
149        public void onResult(int result) {
150            Connector.this.onAsyncWaiterResult(result);
151        }
152
153        /**
154         * @see org.chromium.mojo.system.AsyncWaiter.Callback#onError(MojoException)
155         */
156        @Override
157        public void onError(MojoException exception) {
158            mCancellable = null;
159            Connector.this.onError(exception);
160        }
161
162    }
163
164    /**
165     * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int)
166     */
167    private void onAsyncWaiterResult(int result) {
168        mCancellable = null;
169        if (result == MojoResult.OK) {
170            readOutstandingMessages();
171        } else {
172            onError(new MojoException(result));
173        }
174    }
175
176    private void onError(MojoException exception) {
177        close();
178        assert mCancellable == null;
179        if (mErrorHandler != null) {
180            mErrorHandler.onConnectionError(exception);
181        }
182    }
183
184    /**
185     * Register to be called back when a new message is available on the owned message pipe.
186     */
187    private void registerAsyncWaiterForRead() {
188        assert mCancellable == null;
189        if (mAsyncWaiter != null) {
190            mCancellable = mAsyncWaiter.asyncWait(mMessagePipeHandle, Core.HandleSignals.READABLE,
191                    Core.DEADLINE_INFINITE, mAsyncWaiterCallback);
192        } else {
193            onError(new MojoException(MojoResult.INVALID_ARGUMENT));
194        }
195    }
196
197    /**
198     * Read all available messages on the owned message pipe.
199     */
200    private void readOutstandingMessages() {
201        ResultAnd<Boolean> result;
202        do {
203            try {
204                result = readAndDispatchMessage(mMessagePipeHandle, mIncomingMessageReceiver);
205            } catch (MojoException e) {
206                onError(e);
207                return;
208            }
209        } while (result.getValue());
210        if (result.getMojoResult() == MojoResult.SHOULD_WAIT) {
211            registerAsyncWaiterForRead();
212        } else {
213            onError(new MojoException(result.getMojoResult()));
214        }
215    }
216
217    private void cancelIfActive() {
218        if (mCancellable != null) {
219            mCancellable.cancel();
220            mCancellable = null;
221        }
222    }
223
224    /**
225     * Read a message, and pass it to the given |MessageReceiver| if not null. If the
226     * |MessageReceiver| is null, the message is lost.
227     *
228     * @param receiver The {@link MessageReceiver} that will receive the read {@link Message}. Can
229     *            be <code>null</code>, in which case the message is discarded.
230     */
231    static ResultAnd<Boolean> readAndDispatchMessage(
232            MessagePipeHandle handle, MessageReceiver receiver) {
233        // TODO(qsr) Allow usage of a pool of pre-allocated buffer for performance.
234        ResultAnd<ReadMessageResult> result =
235                handle.readMessage(null, 0, MessagePipeHandle.ReadFlags.NONE);
236        if (result.getMojoResult() != MojoResult.RESOURCE_EXHAUSTED) {
237            return new ResultAnd<Boolean>(result.getMojoResult(), false);
238        }
239        ReadMessageResult readResult = result.getValue();
240        assert readResult != null;
241        ByteBuffer buffer = ByteBuffer.allocateDirect(readResult.getMessageSize());
242        result = handle.readMessage(
243                buffer, readResult.getHandlesCount(), MessagePipeHandle.ReadFlags.NONE);
244        if (receiver != null && result.getMojoResult() == MojoResult.OK) {
245            boolean accepted = receiver.accept(new Message(buffer, result.getValue().getHandles()));
246            return new ResultAnd<Boolean>(result.getMojoResult(), accepted);
247        }
248        return new ResultAnd<Boolean>(result.getMojoResult(), false);
249    }
250}
251