/* * Copyright 2009 Mike Cumings * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.kenai.jbosh; import com.kenai.jbosh.ComposableBody.Builder; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; /** * BOSH Client session instance. Each communication session with a remote * connection manager is represented and handled by an instance of this * class. This is the main entry point for client-side communications. * To create a new session, a client configuration must first be created * and then used to create a client instance: *
 * BOSHClientConfig cfg = BOSHClientConfig.Builder.create(
 *         "http://server:1234/httpbind", "jabber.org")
 *     .setFrom("user@jabber.org")
 *     .build();
 * BOSHClient client = BOSHClient.create(cfg);
 * 
* Additional client configuration options are available. See the * {@code BOSHClientConfig.Builder} class for more information. *

* Once a {@code BOSHClient} instance has been created, communication with * the remote connection manager can begin. No attempt will be made to * establish a connection to the connection manager until the first call * is made to the {@code send(ComposableBody)} method. Note that it is * possible to send an empty body to cause an immediate connection attempt * to the connection manager. Sending an empty message would look like * the following: *

 * client.send(ComposableBody.builder().build());
 * 
* For more information on creating body messages with content, see the * {@code ComposableBody.Builder} class documentation. *

* Once a session has been successfully started, the client instance can be * used to send arbitrary payload data. All aspects of the BOSH * protocol involving setting and processing attributes in the BOSH * namespace will be handled by the client code transparently and behind the * scenes. The user of the client instance can therefore concentrate * entirely on the content of the message payload, leaving the semantics of * the BOSH protocol to the client implementation. *

* To be notified of incoming messages from the remote connection manager, * a {@code BOSHClientResponseListener} should be added to the client instance. * All incoming messages will be published to all response listeners as they * arrive and are processed. As with the transmission of payload data via * the {@code send(ComposableBody)} method, there is no need to worry about * handling of the BOSH attributes, since this is handled behind the scenes. *

* If the connection to the remote connection manager is terminated (either * explicitly or due to a terminal condition of some sort), all connection * listeners will be notified. After the connection has been closed, the * client instance is considered dead and a new one must be created in order * to resume communications with the remote server. *

* Instances of this class are thread-safe. * * @see BOSHClientConfig.Builder * @see BOSHClientResponseListener * @see BOSHClientConnListener * @see ComposableBody.Builder */ public final class BOSHClient { /** * Logger. */ private static final Logger LOG = Logger.getLogger( BOSHClient.class.getName()); /** * Value of the 'type' attribute used for session termination. */ private static final String TERMINATE = "terminate"; /** * Value of the 'type' attribute used for recoverable errors. */ private static final String ERROR = "error"; /** * Message to use for interrupted exceptions. */ private static final String INTERRUPTED = "Interrupted"; /** * Message used for unhandled exceptions. */ private static final String UNHANDLED = "Unhandled Exception"; /** * Message used whena null listener is detected. */ private static final String NULL_LISTENER = "Listener may not b enull"; /** * Default empty request delay. */ private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100; /** * Amount of time to wait before sending an empty request, in * milliseconds. */ private static final int EMPTY_REQUEST_DELAY = Integer.getInteger( BOSHClient.class.getName() + ".emptyRequestDelay", DEFAULT_EMPTY_REQUEST_DELAY); /** * Default value for the pause margin. */ private static final int DEFAULT_PAUSE_MARGIN = 500; /** * The amount of time in milliseconds which will be reserved as a * safety margin when scheduling empty requests against a maxpause * value. This should give us enough time to build the message * and transport it to the remote host. */ private static final int PAUSE_MARGIN = Integer.getInteger( BOSHClient.class.getName() + ".pauseMargin", DEFAULT_PAUSE_MARGIN); /** * Flag indicating whether or not we want to perform assertions. */ private static final boolean ASSERTIONS; /** * Connection listeners. */ private final Set connListeners = new CopyOnWriteArraySet(); /** * Request listeners. */ private final Set requestListeners = new CopyOnWriteArraySet(); /** * Response listeners. */ private final Set responseListeners = new CopyOnWriteArraySet(); /** * Lock instance. */ private final ReentrantLock lock = new ReentrantLock(); /** * Condition indicating that there are messages to be exchanged. */ private final Condition notEmpty = lock.newCondition(); /** * Condition indicating that there are available slots for sending * messages. */ private final Condition notFull = lock.newCondition(); /** * Condition indicating that there are no outstanding connections. */ private final Condition drained = lock.newCondition(); /** * Session configuration. */ private final BOSHClientConfig cfg; /** * Processor thread runnable instance. */ private final Runnable procRunnable = new Runnable() { /** * Process incoming messages. */ public void run() { processMessages(); } }; /** * Processor thread runnable instance. */ private final Runnable emptyRequestRunnable = new Runnable() { /** * Process incoming messages. */ public void run() { sendEmptyRequest(); } }; /** * HTTPSender instance. */ private final HTTPSender httpSender = new ApacheHTTPSender(); /** * Storage for test hook implementation. */ private final AtomicReference exchInterceptor = new AtomicReference(); /** * Request ID sequence to use for the session. */ private final RequestIDSequence requestIDSeq = new RequestIDSequence(); /** * ScheduledExcecutor to use for deferred tasks. */ private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); /************************************************************ * The following vars must be accessed via the lock instance. */ /** * Thread which is used to process responses from the connection * manager. Becomes null when session is terminated. */ private Thread procThread; /** * Future for sending a deferred empty request, if needed. */ private ScheduledFuture emptyRequestFuture; /** * Connection Manager session parameters. Only available when in a * connected state. */ private CMSessionParams cmParams; /** * List of active/outstanding requests. */ private Queue exchanges = new LinkedList(); /** * Set of RIDs which have been received, for the purpose of sending * response acknowledgements. */ private SortedSet pendingResponseAcks = new TreeSet(); /** * The highest RID that we've already received a response for. This value * is used to implement response acks. */ private Long responseAck = Long.valueOf(-1L); /** * List of requests which have been made but not yet acknowledged. This * list remains unpopulated if the CM is not acking requests. */ private List pendingRequestAcks = new ArrayList(); /////////////////////////////////////////////////////////////////////////// // Classes: /** * Class used in testing to dynamically manipulate received exchanges * at test runtime. */ abstract static class ExchangeInterceptor { /** * Limit construction. */ ExchangeInterceptor() { // Empty; } /** * Hook to manipulate an HTTPExchange as is is about to be processed. * * @param exch original exchange that would be processed * @return replacement exchange instance, or {@code null} to skip * processing of this exchange */ abstract HTTPExchange interceptExchange(final HTTPExchange exch); } /////////////////////////////////////////////////////////////////////////// // Constructors: /** * Determine whether or not we should perform assertions. Assertions * can be specified via system property explicitly, or defaulted to * the JVM assertions status. */ static { final String prop = BOSHClient.class.getSimpleName() + ".assertionsEnabled"; boolean enabled = false; if (System.getProperty(prop) == null) { assert enabled = true; } else { enabled = Boolean.getBoolean(prop); } ASSERTIONS = enabled; } /** * Prevent direct construction. */ private BOSHClient(final BOSHClientConfig sessCfg) { cfg = sessCfg; init(); } /////////////////////////////////////////////////////////////////////////// // Public methods: /** * Create a new BOSH client session using the client configuration * information provided. * * @param clientCfg session configuration * @return BOSH session instance */ public static BOSHClient create(final BOSHClientConfig clientCfg) { if (clientCfg == null) { throw(new IllegalArgumentException( "Client configuration may not be null")); } return new BOSHClient(clientCfg); } /** * Get the client configuration that was used to create this client * instance. * * @return client configuration */ public BOSHClientConfig getBOSHClientConfig() { return cfg; } /** * Adds a connection listener to the session. * * @param listener connection listener to add, if not already added */ public void addBOSHClientConnListener( final BOSHClientConnListener listener) { if (listener == null) { throw(new IllegalArgumentException(NULL_LISTENER)); } connListeners.add(listener); } /** * Removes a connection listener from the session. * * @param listener connection listener to remove, if previously added */ public void removeBOSHClientConnListener( final BOSHClientConnListener listener) { if (listener == null) { throw(new IllegalArgumentException(NULL_LISTENER)); } connListeners.remove(listener); } /** * Adds a request message listener to the session. * * @param listener request listener to add, if not already added */ public void addBOSHClientRequestListener( final BOSHClientRequestListener listener) { if (listener == null) { throw(new IllegalArgumentException(NULL_LISTENER)); } requestListeners.add(listener); } /** * Removes a request message listener from the session, if previously * added. * * @param listener instance to remove */ public void removeBOSHClientRequestListener( final BOSHClientRequestListener listener) { if (listener == null) { throw(new IllegalArgumentException(NULL_LISTENER)); } requestListeners.remove(listener); } /** * Adds a response message listener to the session. * * @param listener response listener to add, if not already added */ public void addBOSHClientResponseListener( final BOSHClientResponseListener listener) { if (listener == null) { throw(new IllegalArgumentException(NULL_LISTENER)); } responseListeners.add(listener); } /** * Removes a response message listener from the session, if previously * added. * * @param listener instance to remove */ public void removeBOSHClientResponseListener( final BOSHClientResponseListener listener) { if (listener == null) { throw(new IllegalArgumentException(NULL_LISTENER)); } responseListeners.remove(listener); } /** * Send the provided message data to the remote connection manager. The * provided message body does not need to have any BOSH-specific attribute * information set. It only needs to contain the actual message payload * that should be delivered to the remote server. *

* The first call to this method will result in a connection attempt * to the remote connection manager. Subsequent calls to this method * will block until the underlying session state allows for the message * to be transmitted. In certain scenarios - such as when the maximum * number of outbound connections has been reached - calls to this method * will block for short periods of time. * * @param body message data to send to remote server * @throws BOSHException on message transmission failure */ public void send(final ComposableBody body) throws BOSHException { assertUnlocked(); if (body == null) { throw(new IllegalArgumentException( "Message body may not be null")); } HTTPExchange exch; CMSessionParams params; lock.lock(); try { blockUntilSendable(body); if (!isWorking() && !isTermination(body)) { throw(new BOSHException( "Cannot send message when session is closed")); } long rid = requestIDSeq.getNextRID(); ComposableBody request = body; params = cmParams; if (params == null && exchanges.isEmpty()) { // This is the first message being sent request = applySessionCreationRequest(rid, body); } else { request = applySessionData(rid, body); if (cmParams.isAckingRequests()) { pendingRequestAcks.add(request); } } exch = new HTTPExchange(request); exchanges.add(exch); notEmpty.signalAll(); clearEmptyRequest(); } finally { lock.unlock(); } AbstractBody finalReq = exch.getRequest(); HTTPResponse resp = httpSender.send(params, finalReq); exch.setHTTPResponse(resp); fireRequestSent(finalReq); } /** * Attempt to pause the current session. When supported by the remote * connection manager, pausing the session will result in the connection * manager closing out all outstanding requests (including the pause * request) and increases the inactivity timeout of the session. The * exact value of the temporary timeout is dependent upon the connection * manager. This method should be used if a client encounters an * exceptional temporary situation during which it will be unable to send * requests to the connection manager for a period of time greater than * the maximum inactivity period. * * The session will revert back to it's normal, unpaused state when the * client sends it's next message. * * @return {@code true} if the connection manager supports session pausing, * {@code false} if the connection manager does not support session * pausing or if the session has not yet been established */ public boolean pause() { assertUnlocked(); lock.lock(); AttrMaxPause maxPause = null; try { if (cmParams == null) { return false; } maxPause = cmParams.getMaxPause(); if (maxPause == null) { return false; } } finally { lock.unlock(); } try { send(ComposableBody.builder() .setAttribute(Attributes.PAUSE, maxPause.toString()) .build()); } catch (BOSHException boshx) { LOG.log(Level.FINEST, "Could not send pause", boshx); } return true; } /** * End the BOSH session by disconnecting from the remote BOSH connection * manager. * * @throws BOSHException when termination message cannot be sent */ public void disconnect() throws BOSHException { disconnect(ComposableBody.builder().build()); } /** * End the BOSH session by disconnecting from the remote BOSH connection * manager, sending the provided content in the final connection * termination message. * * @param msg final message to send * @throws BOSHException when termination message cannot be sent */ public void disconnect(final ComposableBody msg) throws BOSHException { if (msg == null) { throw(new IllegalArgumentException( "Message body may not be null")); } Builder builder = msg.rebuild(); builder.setAttribute(Attributes.TYPE, TERMINATE); send(builder.build()); } /** * Forcibly close this client session instance. The preferred mechanism * to close the connection is to send a disconnect message and wait for * organic termination. Calling this method simply shuts down the local * session without sending a termination message, releasing all resources * associated with the session. */ public void close() { dispose(new BOSHException("Session explicitly closed by caller")); } /////////////////////////////////////////////////////////////////////////// // Package-private methods: /** * Get the current CM session params. * * @return current session params, or {@code null} */ CMSessionParams getCMSessionParams() { lock.lock(); try { return cmParams; } finally { lock.unlock(); } } /** * Wait until no more messages are waiting to be processed. */ void drain() { lock.lock(); try { LOG.finest("Waiting while draining..."); while (isWorking() && (emptyRequestFuture == null || emptyRequestFuture.isDone())) { try { drained.await(); } catch (InterruptedException intx) { LOG.log(Level.FINEST, INTERRUPTED, intx); } } LOG.finest("Drained"); } finally { lock.unlock(); } } /** * Test method used to forcibly discard next exchange. * * @param interceptor exchange interceptor */ void setExchangeInterceptor(final ExchangeInterceptor interceptor) { exchInterceptor.set(interceptor); } /////////////////////////////////////////////////////////////////////////// // Private methods: /** * Initialize the session. This initializes the underlying HTTP * transport implementation and starts the receive thread. */ private void init() { assertUnlocked(); lock.lock(); try { httpSender.init(cfg); procThread = new Thread(procRunnable); procThread.setDaemon(true); procThread.setName(BOSHClient.class.getSimpleName() + "[" + System.identityHashCode(this) + "]: Receive thread"); procThread.start(); } finally { lock.unlock(); } } /** * Destroy this session. * * @param cause the reason for the session termination, or {@code null} * for normal termination */ private void dispose(final Throwable cause) { assertUnlocked(); lock.lock(); try { if (procThread == null) { // Already disposed return; } procThread = null; } finally { lock.unlock(); } if (cause == null) { fireConnectionClosed(); } else { fireConnectionClosedOnError(cause); } lock.lock(); try { clearEmptyRequest(); exchanges = null; cmParams = null; pendingResponseAcks = null; pendingRequestAcks = null; notEmpty.signalAll(); notFull.signalAll(); drained.signalAll(); } finally { lock.unlock(); } httpSender.destroy(); schedExec.shutdownNow(); } /** * Determines if the message body specified indicates a request to * pause the session. * * @param msg message to evaluate * @return {@code true} if the message is a pause request, {@code false} * otherwise */ private static boolean isPause(final AbstractBody msg) { return msg.getAttribute(Attributes.PAUSE) != null; } /** * Determines if the message body specified indicates a termination of * the session. * * @param msg message to evaluate * @return {@code true} if the message is a session termination, * {@code false} otherwise */ private static boolean isTermination(final AbstractBody msg) { return TERMINATE.equals(msg.getAttribute(Attributes.TYPE)); } /** * Evaluates the HTTP response code and response message and returns the * terminal binding condition that it describes, if any. * * @param respCode HTTP response code * @param respBody response body * @return terminal binding condition, or {@code null} if not a terminal * binding condition message */ private TerminalBindingCondition getTerminalBindingCondition( final int respCode, final AbstractBody respBody) { assertLocked(); if (isTermination(respBody)) { String str = respBody.getAttribute(Attributes.CONDITION); return TerminalBindingCondition.forString(str); } // Check for deprecated HTTP Error Conditions if (cmParams != null && cmParams.getVersion() == null) { return TerminalBindingCondition.forHTTPResponseCode(respCode); } return null; } /** * Determines if the message specified is immediately sendable or if it * needs to block until the session state changes. * * @param msg message to evaluate * @return {@code true} if the message can be immediately sent, * {@code false} otherwise */ private boolean isImmediatelySendable(final AbstractBody msg) { assertLocked(); if (cmParams == null) { // block if we're waiting for a response to our first request return exchanges.isEmpty(); } AttrRequests requests = cmParams.getRequests(); if (requests == null) { return true; } int maxRequests = requests.intValue(); if (exchanges.size() < maxRequests) { return true; } if (exchanges.size() == maxRequests && (isTermination(msg) || isPause(msg))) { // One additional terminate or pause message is allowed return true; } return false; } /** * Determines whether or not the session is still active. * * @return {@code true} if it is, {@code false} otherwise */ private boolean isWorking() { assertLocked(); return procThread != null; } /** * Blocks until either the message provided becomes immediately * sendable or until the session is terminated. * * @param msg message to evaluate */ private void blockUntilSendable(final AbstractBody msg) { assertLocked(); while (isWorking() && !isImmediatelySendable(msg)) { try { notFull.await(); } catch (InterruptedException intx) { LOG.log(Level.FINEST, INTERRUPTED, intx); } } } /** * Modifies the specified body message such that it becomes a new * BOSH session creation request. * * @param rid request ID to use * @param orig original body to modify * @return modified message which acts as a session creation request */ private ComposableBody applySessionCreationRequest( final long rid, final ComposableBody orig) throws BOSHException { assertLocked(); Builder builder = orig.rebuild(); builder.setAttribute(Attributes.TO, cfg.getTo()); builder.setAttribute(Attributes.XML_LANG, cfg.getLang()); builder.setAttribute(Attributes.VER, AttrVersion.getSupportedVersion().toString()); builder.setAttribute(Attributes.WAIT, "60"); builder.setAttribute(Attributes.HOLD, "1"); builder.setAttribute(Attributes.RID, Long.toString(rid)); applyRoute(builder); applyFrom(builder); builder.setAttribute(Attributes.ACK, "1"); // Make sure the following are NOT present (i.e., during retries) builder.setAttribute(Attributes.SID, null); return builder.build(); } /** * Applies routing information to the request message who's builder has * been provided. * * @param builder builder instance to add routing information to */ private void applyRoute(final Builder builder) { assertLocked(); String route = cfg.getRoute(); if (route != null) { builder.setAttribute(Attributes.ROUTE, route); } } /** * Applies the local station ID information to the request message who's * builder has been provided. * * @param builder builder instance to add station ID information to */ private void applyFrom(final Builder builder) { assertLocked(); String from = cfg.getFrom(); if (from != null) { builder.setAttribute(Attributes.FROM, from); } } /** * Applies existing session data to the outbound request, returning the * modified request. * * This method assumes the lock is currently held. * * @param rid request ID to use * @param orig original/raw request * @return modified request with session information applied */ private ComposableBody applySessionData( final long rid, final ComposableBody orig) throws BOSHException { assertLocked(); Builder builder = orig.rebuild(); builder.setAttribute(Attributes.SID, cmParams.getSessionID().toString()); builder.setAttribute(Attributes.RID, Long.toString(rid)); applyResponseAcknowledgement(builder, rid); return builder.build(); } /** * Sets the 'ack' attribute of the request to the value of the highest * 'rid' of a request for which it has already received a response in the * case where it has also received all responses associated with lower * 'rid' values. The only exception is that, after its session creation * request, the client SHOULD NOT include an 'ack' attribute in any request * if it has received responses to all its previous requests. * * @param builder message builder * @param rid current request RID */ private void applyResponseAcknowledgement( final Builder builder, final long rid) { assertLocked(); if (responseAck.equals(Long.valueOf(-1L))) { // We have not received any responses yet return; } Long prevRID = Long.valueOf(rid - 1L); if (responseAck.equals(prevRID)) { // Implicit ack return; } builder.setAttribute(Attributes.ACK, responseAck.toString()); } /** * While we are "connected", process received responses. * * This method is run in the processing thread. */ private void processMessages() { LOG.log(Level.FINEST, "Processing thread starting"); try { HTTPExchange exch; do { exch = nextExchange(); if (exch == null) { break; } // Test hook to manipulate what the client sees: ExchangeInterceptor interceptor = exchInterceptor.get(); if (interceptor != null) { HTTPExchange newExch = interceptor.interceptExchange(exch); if (newExch == null) { LOG.log(Level.FINE, "Discarding exchange on request " + "of test hook: RID=" + exch.getRequest().getAttribute( Attributes.RID)); lock.lock(); try { exchanges.remove(exch); } finally { lock.unlock(); } continue; } exch = newExch; } processExchange(exch); } while (true); } finally { LOG.log(Level.FINEST, "Processing thread exiting"); } } /** * Get the next message exchange to process, blocking until one becomes * available if nothing is already waiting for processing. * * @return next available exchange to process, or {@code null} if no * exchanges are immediately available */ private HTTPExchange nextExchange() { assertUnlocked(); final Thread thread = Thread.currentThread(); HTTPExchange exch = null; lock.lock(); try { do { if (!thread.equals(procThread)) { break; } exch = exchanges.peek(); if (exch == null) { try { notEmpty.await(); } catch (InterruptedException intx) { LOG.log(Level.FINEST, INTERRUPTED, intx); } } } while (exch == null); } finally { lock.unlock(); } return exch; } /** * Process the next, provided exchange. This is the main processing * method of the receive thread. * * @param exch message exchange to process */ private void processExchange(final HTTPExchange exch) { assertUnlocked(); HTTPResponse resp; AbstractBody body; int respCode; try { resp = exch.getHTTPResponse(); body = resp.getBody(); respCode = resp.getHTTPStatus(); } catch (BOSHException boshx) { LOG.log(Level.FINEST, "Could not obtain response", boshx); dispose(boshx); return; } catch (InterruptedException intx) { LOG.log(Level.FINEST, INTERRUPTED, intx); dispose(intx); return; } fireResponseReceived(body); // Process the message with the current session state AbstractBody req = exch.getRequest(); CMSessionParams params; List toResend = null; lock.lock(); try { // Check for session creation response info, if needed if (cmParams == null) { cmParams = CMSessionParams.fromSessionInit(req, body); // The following call handles the lock. It's not an escape. fireConnectionEstablished(); } params = cmParams; checkForTerminalBindingConditions(body, respCode); if (isTermination(body)) { // Explicit termination lock.unlock(); dispose(null); return; } if (isRecoverableBindingCondition(body)) { // Retransmit outstanding requests if (toResend == null) { toResend = new ArrayList(exchanges.size()); } for (HTTPExchange exchange : exchanges) { HTTPExchange resendExch = new HTTPExchange(exchange.getRequest()); toResend.add(resendExch); } for (HTTPExchange exchange : toResend) { exchanges.add(exchange); } } else { // Process message as normal processRequestAcknowledgements(req, body); processResponseAcknowledgementData(req); HTTPExchange resendExch = processResponseAcknowledgementReport(body); if (resendExch != null && toResend == null) { toResend = new ArrayList(1); toResend.add(resendExch); exchanges.add(resendExch); } } } catch (BOSHException boshx) { LOG.log(Level.FINEST, "Could not process response", boshx); lock.unlock(); dispose(boshx); return; } finally { if (lock.isHeldByCurrentThread()) { try { exchanges.remove(exch); if (exchanges.isEmpty()) { scheduleEmptyRequest(processPauseRequest(req)); } notFull.signalAll(); } finally { lock.unlock(); } } } if (toResend != null) { for (HTTPExchange resend : toResend) { HTTPResponse response = httpSender.send(params, resend.getRequest()); resend.setHTTPResponse(response); fireRequestSent(resend.getRequest()); } } } /** * Clears any scheduled empty requests. */ private void clearEmptyRequest() { assertLocked(); if (emptyRequestFuture != null) { emptyRequestFuture.cancel(false); emptyRequestFuture = null; } } /** * Calculates the default empty request delay/interval to use for the * active session. * * @return delay in milliseconds */ private long getDefaultEmptyRequestDelay() { assertLocked(); // Figure out how long we should wait before sending an empty request AttrPolling polling = cmParams.getPollingInterval(); long delay; if (polling == null) { delay = EMPTY_REQUEST_DELAY; } else { delay = polling.getInMilliseconds(); } return delay; } /** * Schedule an empty request to be sent if no other requests are * sent in a reasonable amount of time. */ private void scheduleEmptyRequest(long delay) { assertLocked(); if (delay < 0L) { throw(new IllegalArgumentException( "Empty request delay must be >= 0 (was: " + delay + ")")); } clearEmptyRequest(); if (!isWorking()) { return; } // Schedule the transmission if (LOG.isLoggable(Level.FINER)) { LOG.finer("Scheduling empty request in " + delay + "ms"); } try { emptyRequestFuture = schedExec.schedule(emptyRequestRunnable, delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException rex) { LOG.log(Level.FINEST, "Could not schedule empty request", rex); } drained.signalAll(); } /** * Sends an empty request to maintain session requirements. If a request * is sent within a reasonable time window, the empty request transmission * will be cancelled. */ private void sendEmptyRequest() { assertUnlocked(); // Send an empty request LOG.finest("Sending empty request"); try { send(ComposableBody.builder().build()); } catch (BOSHException boshx) { dispose(boshx); } } /** * Assert that the internal lock is held. */ private void assertLocked() { if (ASSERTIONS) { if (!lock.isHeldByCurrentThread()) { throw(new AssertionError("Lock is not held by current thread")); } return; } } /** * Assert that the internal lock is *not* held. */ private void assertUnlocked() { if (ASSERTIONS) { if (lock.isHeldByCurrentThread()) { throw(new AssertionError("Lock is held by current thread")); } return; } } /** * Checks to see if the response indicates a terminal binding condition * (as per XEP-0124 section 17). If it does, an exception is thrown. * * @param body response body to evaluate * @param code HTTP response code * @throws BOSHException if a terminal binding condition is detected */ private void checkForTerminalBindingConditions( final AbstractBody body, final int code) throws BOSHException { TerminalBindingCondition cond = getTerminalBindingCondition(code, body); if (cond != null) { throw(new BOSHException( "Terminal binding condition encountered: " + cond.getCondition() + " (" + cond.getMessage() + ")")); } } /** * Determines whether or not the response indicates a recoverable * binding condition (as per XEP-0124 section 17). * * @param resp response body * @return {@code true} if it does, {@code false} otherwise */ private static boolean isRecoverableBindingCondition( final AbstractBody resp) { return ERROR.equals(resp.getAttribute(Attributes.TYPE)); } /** * Process the request to determine if the empty request delay * can be determined by looking to see if the request is a pause * request. If it can, the request's delay is returned, otherwise * the default delay is returned. * * @return delay in milliseconds that should elapse prior to an * empty message being sent */ private long processPauseRequest( final AbstractBody req) { assertLocked(); if (cmParams != null && cmParams.getMaxPause() != null) { try { AttrPause pause = AttrPause.createFromString( req.getAttribute(Attributes.PAUSE)); if (pause != null) { long delay = pause.getInMilliseconds() - PAUSE_MARGIN; if (delay < 0) { delay = EMPTY_REQUEST_DELAY; } return delay; } } catch (BOSHException boshx) { LOG.log(Level.FINEST, "Could not extract", boshx); } } return getDefaultEmptyRequestDelay(); } /** * Check the response for request acknowledgements and take appropriate * action. * * This method assumes the lock is currently held. * * @param req request * @param resp response */ private void processRequestAcknowledgements( final AbstractBody req, final AbstractBody resp) { assertLocked(); if (!cmParams.isAckingRequests()) { return; } // If a report or time attribute is set, we aren't acking anything if (resp.getAttribute(Attributes.REPORT) != null) { return; } // Figure out what the highest acked RID is String acked = resp.getAttribute(Attributes.ACK); Long ackUpTo; if (acked == null) { // Implicit ack of all prior requests up until RID ackUpTo = Long.parseLong(req.getAttribute(Attributes.RID)); } else { ackUpTo = Long.parseLong(acked); } // Remove the acked requests from the list if (LOG.isLoggable(Level.FINEST)) { LOG.finest("Removing pending acks up to: " + ackUpTo); } Iterator iter = pendingRequestAcks.iterator(); while (iter.hasNext()) { AbstractBody pending = iter.next(); Long pendingRID = Long.parseLong( pending.getAttribute(Attributes.RID)); if (pendingRID.compareTo(ackUpTo) <= 0) { iter.remove(); } } } /** * Process the response in order to update the response acknowlegement * data. * * This method assumes the lock is currently held. * * @param req request */ private void processResponseAcknowledgementData( final AbstractBody req) { assertLocked(); Long rid = Long.parseLong(req.getAttribute(Attributes.RID)); if (responseAck.equals(Long.valueOf(-1L))) { // This is the first request responseAck = rid; } else { pendingResponseAcks.add(rid); // Remove up until the first missing response (or end of queue) Long whileVal = responseAck; while (whileVal.equals(pendingResponseAcks.first())) { responseAck = whileVal; pendingResponseAcks.remove(whileVal); whileVal = Long.valueOf(whileVal.longValue() + 1); } } } /** * Process the response in order to check for and respond to any potential * ack reports. * * This method assumes the lock is currently held. * * @param resp response * @return exchange to transmit if a resend is to be performed, or * {@code null} if no resend is necessary * @throws BOSHException when a a retry is needed but cannot be performed */ private HTTPExchange processResponseAcknowledgementReport( final AbstractBody resp) throws BOSHException { assertLocked(); String reportStr = resp.getAttribute(Attributes.REPORT); if (reportStr == null) { // No report on this message return null; } Long report = Long.parseLong(reportStr); Long time = Long.parseLong(resp.getAttribute(Attributes.TIME)); if (LOG.isLoggable(Level.FINE)) { LOG.fine("Received report of missing request (RID=" + report + ", time=" + time + "ms)"); } // Find the missing request Iterator iter = pendingRequestAcks.iterator(); AbstractBody req = null; while (iter.hasNext() && req == null) { AbstractBody pending = iter.next(); Long pendingRID = Long.parseLong( pending.getAttribute(Attributes.RID)); if (report.equals(pendingRID)) { req = pending; } } if (req == null) { throw(new BOSHException("Report of missing message with RID '" + reportStr + "' but local copy of that request was not found")); } // Resend the missing request HTTPExchange exch = new HTTPExchange(req); exchanges.add(exch); notEmpty.signalAll(); return exch; } /** * Notifies all request listeners that the specified request is being * sent. * * @param request request being sent */ private void fireRequestSent(final AbstractBody request) { assertUnlocked(); BOSHMessageEvent event = null; for (BOSHClientRequestListener listener : requestListeners) { if (event == null) { event = BOSHMessageEvent.createRequestSentEvent(this, request); } try { listener.requestSent(event); } catch (Exception ex) { LOG.log(Level.WARNING, UNHANDLED, ex); } } } /** * Notifies all response listeners that the specified response has been * received. * * @param response response received */ private void fireResponseReceived(final AbstractBody response) { assertUnlocked(); BOSHMessageEvent event = null; for (BOSHClientResponseListener listener : responseListeners) { if (event == null) { event = BOSHMessageEvent.createResponseReceivedEvent( this, response); } try { listener.responseReceived(event); } catch (Exception ex) { LOG.log(Level.WARNING, UNHANDLED, ex); } } } /** * Notifies all connection listeners that the session has been successfully * established. */ private void fireConnectionEstablished() { final boolean hadLock = lock.isHeldByCurrentThread(); if (hadLock) { lock.unlock(); } try { BOSHClientConnEvent event = null; for (BOSHClientConnListener listener : connListeners) { if (event == null) { event = BOSHClientConnEvent .createConnectionEstablishedEvent(this); } try { listener.connectionEvent(event); } catch (Exception ex) { LOG.log(Level.WARNING, UNHANDLED, ex); } } } finally { if (hadLock) { lock.lock(); } } } /** * Notifies all connection listeners that the session has been * terminated normally. */ private void fireConnectionClosed() { assertUnlocked(); BOSHClientConnEvent event = null; for (BOSHClientConnListener listener : connListeners) { if (event == null) { event = BOSHClientConnEvent.createConnectionClosedEvent(this); } try { listener.connectionEvent(event); } catch (Exception ex) { LOG.log(Level.WARNING, UNHANDLED, ex); } } } /** * Notifies all connection listeners that the session has been * terminated due to the exceptional condition provided. * * @param cause cause of the termination */ private void fireConnectionClosedOnError( final Throwable cause) { assertUnlocked(); BOSHClientConnEvent event = null; for (BOSHClientConnListener listener : connListeners) { if (event == null) { event = BOSHClientConnEvent .createConnectionClosedOnErrorEvent( this, pendingRequestAcks, cause); } try { listener.connectionEvent(event); } catch (Exception ex) { LOG.log(Level.WARNING, UNHANDLED, ex); } } } }