// Connector.java revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5package org.chromium.mojo.bindings; 6 7import org.chromium.mojo.system.AsyncWaiter; 8import org.chromium.mojo.system.Core; 9import org.chromium.mojo.system.MessagePipeHandle; 10import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult; 11import org.chromium.mojo.system.MojoException; 12import org.chromium.mojo.system.MojoResult; 13import org.chromium.mojo.system.ResultAnd; 14 15import java.nio.ByteBuffer; 16 17/** 18 * A {@link Connector} owns a {@link MessagePipeHandle} and will send any received messages to the 19 * registered {@link MessageReceiver}. It also acts as a {@link MessageReceiver} and will send any 20 * message through the handle. 21 * <p> 22 * The method |start| must be called before the {@link Connector} will start listening to incoming 23 * messages. 24 */ 25public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle> { 26 27 /** 28 * The callback that is notified when the state of the owned handle changes. 29 */ 30 private final AsyncWaiterCallback mAsyncWaiterCallback = new AsyncWaiterCallback(); 31 32 /** 33 * The owned message pipe. 34 */ 35 private final MessagePipeHandle mMessagePipeHandle; 36 37 /** 38 * A waiter which is notified when a new message is available on the owned message pipe. 39 */ 40 private final AsyncWaiter mAsyncWaiter; 41 42 /** 43 * The {@link MessageReceiver} to which received messages are sent. 44 */ 45 private MessageReceiver mIncomingMessageReceiver; 46 47 /** 48 * The Cancellable for the current wait. Is |null| when not currently waiting for new messages. 49 */ 50 private AsyncWaiter.Cancellable mCancellable; 51 52 /** 53 * The error handler to notify of errors. 54 */ 55 private ConnectionErrorHandler mErrorHandler; 56 57 /** 58 * Create a new connector over a |messagePipeHandle|. 
The created connector will use the default 59 * {@link AsyncWaiter} from the {@link Core} implementation of |messagePipeHandle|. 60 */ 61 public Connector(MessagePipeHandle messagePipeHandle) { 62 this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(messagePipeHandle)); 63 } 64 65 /** 66 * Create a new connector over a |messagePipeHandle| using the given {@link AsyncWaiter} to get 67 * notified of changes on the handle. 68 */ 69 public Connector(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaiter) { 70 mCancellable = null; 71 mMessagePipeHandle = messagePipeHandle; 72 mAsyncWaiter = asyncWaiter; 73 } 74 75 /** 76 * Set the {@link MessageReceiver} that will receive message from the owned message pipe. 77 */ 78 public void setIncomingMessageReceiver(MessageReceiver incomingMessageReceiver) { 79 mIncomingMessageReceiver = incomingMessageReceiver; 80 } 81 82 /** 83 * Set the {@link ConnectionErrorHandler} that will be notified of errors on the owned message 84 * pipe. 85 */ 86 public void setErrorHandler(ConnectionErrorHandler errorHandler) { 87 mErrorHandler = errorHandler; 88 } 89 90 /** 91 * Start listening for incoming messages. 92 */ 93 public void start() { 94 assert mCancellable == null; 95 registerAsyncWaiterForRead(); 96 } 97 98 /** 99 * @see MessageReceiver#accept(Message) 100 */ 101 @Override 102 public boolean accept(Message message) { 103 try { 104 mMessagePipeHandle.writeMessage(message.getData(), 105 message.getHandles(), MessagePipeHandle.WriteFlags.NONE); 106 return true; 107 } catch (MojoException e) { 108 onError(e); 109 return false; 110 } 111 } 112 113 /** 114 * Pass the owned handle of the connector. After this, the connector is disconnected. It cannot 115 * accept new message and it isn't listening to the handle anymore. 
116 * 117 * @see org.chromium.mojo.bindings.HandleOwner#passHandle() 118 */ 119 @Override 120 public MessagePipeHandle passHandle() { 121 cancelIfActive(); 122 MessagePipeHandle handle = mMessagePipeHandle.pass(); 123 if (mIncomingMessageReceiver != null) { 124 mIncomingMessageReceiver.close(); 125 } 126 return handle; 127 } 128 129 /** 130 * @see java.io.Closeable#close() 131 */ 132 @Override 133 public void close() { 134 cancelIfActive(); 135 mMessagePipeHandle.close(); 136 if (mIncomingMessageReceiver != null) { 137 MessageReceiver incomingMessageReceiver = mIncomingMessageReceiver; 138 mIncomingMessageReceiver = null; 139 incomingMessageReceiver.close(); 140 } 141 } 142 143 private class AsyncWaiterCallback implements AsyncWaiter.Callback { 144 145 /** 146 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int) 147 */ 148 @Override 149 public void onResult(int result) { 150 Connector.this.onAsyncWaiterResult(result); 151 } 152 153 /** 154 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onError(MojoException) 155 */ 156 @Override 157 public void onError(MojoException exception) { 158 mCancellable = null; 159 Connector.this.onError(exception); 160 } 161 162 } 163 164 /** 165 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int) 166 */ 167 private void onAsyncWaiterResult(int result) { 168 mCancellable = null; 169 if (result == MojoResult.OK) { 170 readOutstandingMessages(); 171 } else { 172 onError(new MojoException(result)); 173 } 174 } 175 176 private void onError(MojoException exception) { 177 close(); 178 assert mCancellable == null; 179 if (mErrorHandler != null) { 180 mErrorHandler.onConnectionError(exception); 181 } 182 } 183 184 /** 185 * Register to be called back when a new message is available on the owned message pipe. 
186 */ 187 private void registerAsyncWaiterForRead() { 188 assert mCancellable == null; 189 if (mAsyncWaiter != null) { 190 mCancellable = mAsyncWaiter.asyncWait(mMessagePipeHandle, Core.HandleSignals.READABLE, 191 Core.DEADLINE_INFINITE, mAsyncWaiterCallback); 192 } else { 193 onError(new MojoException(MojoResult.INVALID_ARGUMENT)); 194 } 195 } 196 197 /** 198 * Read all available messages on the owned message pipe. 199 */ 200 private void readOutstandingMessages() { 201 ResultAnd<Boolean> result; 202 do { 203 try { 204 result = readAndDispatchMessage(mMessagePipeHandle, mIncomingMessageReceiver); 205 } catch (MojoException e) { 206 onError(e); 207 return; 208 } 209 } while (result.getValue()); 210 if (result.getMojoResult() == MojoResult.SHOULD_WAIT) { 211 registerAsyncWaiterForRead(); 212 } else { 213 onError(new MojoException(result.getMojoResult())); 214 } 215 } 216 217 private void cancelIfActive() { 218 if (mCancellable != null) { 219 mCancellable.cancel(); 220 mCancellable = null; 221 } 222 } 223 224 /** 225 * Read a message, and pass it to the given |MessageReceiver| if not null. If the 226 * |MessageReceiver| is null, the message is lost. 227 * 228 * @param receiver The {@link MessageReceiver} that will receive the read {@link Message}. Can 229 * be <code>null</code>, in which case the message is discarded. 230 */ 231 static ResultAnd<Boolean> readAndDispatchMessage( 232 MessagePipeHandle handle, MessageReceiver receiver) { 233 // TODO(qsr) Allow usage of a pool of pre-allocated buffer for performance. 
234 ResultAnd<ReadMessageResult> result = 235 handle.readMessage(null, 0, MessagePipeHandle.ReadFlags.NONE); 236 if (result.getMojoResult() != MojoResult.RESOURCE_EXHAUSTED) { 237 return new ResultAnd<Boolean>(result.getMojoResult(), false); 238 } 239 ReadMessageResult readResult = result.getValue(); 240 assert readResult != null; 241 ByteBuffer buffer = ByteBuffer.allocateDirect(readResult.getMessageSize()); 242 result = handle.readMessage( 243 buffer, readResult.getHandlesCount(), MessagePipeHandle.ReadFlags.NONE); 244 if (receiver != null && result.getMojoResult() == MojoResult.OK) { 245 boolean accepted = receiver.accept(new Message(buffer, result.getValue().getHandles())); 246 return new ResultAnd<Boolean>(result.getMojoResult(), accepted); 247 } 248 return new ResultAnd<Boolean>(result.getMojoResult(), false); 249 } 250} 251