1/*
2 * Copyright 2009 Mike Cumings
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *   http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.kenai.jbosh;
18
19import com.kenai.jbosh.ComposableBody.Builder;
20import java.util.ArrayList;
21import java.util.Iterator;
22import java.util.LinkedList;
23import java.util.List;
24import java.util.Queue;
25import java.util.Set;
26import java.util.SortedSet;
27import java.util.TreeSet;
28import java.util.concurrent.CopyOnWriteArraySet;
29import java.util.concurrent.Executors;
30import java.util.concurrent.RejectedExecutionException;
31import java.util.concurrent.ScheduledExecutorService;
32import java.util.concurrent.ScheduledFuture;
33import java.util.concurrent.TimeUnit;
34import java.util.concurrent.atomic.AtomicReference;
35import java.util.concurrent.locks.Condition;
36import java.util.concurrent.locks.ReentrantLock;
37import java.util.logging.Level;
38import java.util.logging.Logger;
39
40/**
41 * BOSH Client session instance.  Each communication session with a remote
42 * connection manager is represented and handled by an instance of this
43 * class.  This is the main entry point for client-side communications.
44 * To create a new session, a client configuration must first be created
45 * and then used to create a client instance:
46 * <pre>
47 * BOSHClientConfig cfg = BOSHClientConfig.Builder.create(
48 *         "http://server:1234/httpbind", "jabber.org")
49 *     .setFrom("user@jabber.org")
50 *     .build();
51 * BOSHClient client = BOSHClient.create(cfg);
52 * </pre>
53 * Additional client configuration options are available.  See the
54 * {@code BOSHClientConfig.Builder} class for more information.
55 * <p/>
56 * Once a {@code BOSHClient} instance has been created, communication with
57 * the remote connection manager can begin.  No attempt will be made to
58 * establish a connection to the connection manager until the first call
59 * is made to the {@code send(ComposableBody)} method.  Note that it is
60 * possible to send an empty body to cause an immediate connection attempt
61 * to the connection manager.  Sending an empty message would look like
62 * the following:
63 * <pre>
64 * client.send(ComposableBody.builder().build());
65 * </pre>
66 * For more information on creating body messages with content, see the
67 * {@code ComposableBody.Builder} class documentation.
68 * <p/>
69 * Once a session has been successfully started, the client instance can be
70 * used to send arbitrary payload data.  All aspects of the BOSH
71 * protocol involving setting and processing attributes in the BOSH
72 * namespace will be handled by the client code transparently and behind the
73 * scenes.  The user of the client instance can therefore concentrate
74 * entirely on the content of the message payload, leaving the semantics of
75 * the BOSH protocol to the client implementation.
76 * <p/>
77 * To be notified of incoming messages from the remote connection manager,
78 * a {@code BOSHClientResponseListener} should be added to the client instance.
79 * All incoming messages will be published to all response listeners as they
80 * arrive and are processed.  As with the transmission of payload data via
81 * the {@code send(ComposableBody)} method, there is no need to worry about
82 * handling of the BOSH attributes, since this is handled behind the scenes.
83 * <p/>
84 * If the connection to the remote connection manager is terminated (either
85 * explicitly or due to a terminal condition of some sort), all connection
86 * listeners will be notified.  After the connection has been closed, the
87 * client instance is considered dead and a new one must be created in order
88 * to resume communications with the remote server.
89 * <p/>
90 * Instances of this class are thread-safe.
91 *
92 * @see BOSHClientConfig.Builder
93 * @see BOSHClientResponseListener
94 * @see BOSHClientConnListener
95 * @see ComposableBody.Builder
96 */
97public final class BOSHClient {
98
99    /**
100     * Logger.
101     */
102    private static final Logger LOG = Logger.getLogger(
103            BOSHClient.class.getName());
104
105    /**
106     * Value of the 'type' attribute used for session termination.
107     */
108    private static final String TERMINATE = "terminate";
109
110    /**
111     * Value of the 'type' attribute used for recoverable errors.
112     */
113    private static final String ERROR = "error";
114
115    /**
116     * Message to use for interrupted exceptions.
117     */
118    private static final String INTERRUPTED = "Interrupted";
119
120    /**
121     * Message used for unhandled exceptions.
122     */
123    private static final String UNHANDLED = "Unhandled Exception";
124
125    /**
126     * Message used whena null listener is detected.
127     */
128    private static final String NULL_LISTENER = "Listener may not b enull";
129
130    /**
131     * Default empty request delay.
132     */
133    private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100;
134
135    /**
136     * Amount of time to wait before sending an empty request, in
137     * milliseconds.
138     */
139    private static final int EMPTY_REQUEST_DELAY = Integer.getInteger(
140            BOSHClient.class.getName() + ".emptyRequestDelay",
141            DEFAULT_EMPTY_REQUEST_DELAY);
142
143    /**
144     * Default value for the pause margin.
145     */
146    private static final int DEFAULT_PAUSE_MARGIN = 500;
147
148    /**
149     * The amount of time in milliseconds which will be reserved as a
150     * safety margin when scheduling empty requests against a maxpause
151     * value.   This should give us enough time to build the message
152     * and transport it to the remote host.
153     */
154    private static final int PAUSE_MARGIN = Integer.getInteger(
155            BOSHClient.class.getName() + ".pauseMargin",
156            DEFAULT_PAUSE_MARGIN);
157
158    /**
159     * Flag indicating whether or not we want to perform assertions.
160     */
161    private static final boolean ASSERTIONS;
162
163    /**
164     * Connection listeners.
165     */
166    private final Set<BOSHClientConnListener> connListeners =
167            new CopyOnWriteArraySet<BOSHClientConnListener>();
168
169    /**
170     * Request listeners.
171     */
172    private final Set<BOSHClientRequestListener> requestListeners =
173            new CopyOnWriteArraySet<BOSHClientRequestListener>();
174
175    /**
176     * Response listeners.
177     */
178    private final Set<BOSHClientResponseListener> responseListeners =
179            new CopyOnWriteArraySet<BOSHClientResponseListener>();
180
181    /**
182     * Lock instance.
183     */
184    private final ReentrantLock lock = new ReentrantLock();
185
186    /**
187     * Condition indicating that there are messages to be exchanged.
188     */
189    private final Condition notEmpty = lock.newCondition();
190
191    /**
192     * Condition indicating that there are available slots for sending
193     * messages.
194     */
195    private final Condition notFull = lock.newCondition();
196
197    /**
198     * Condition indicating that there are no outstanding connections.
199     */
200    private final Condition drained = lock.newCondition();
201
202    /**
203     * Session configuration.
204     */
205    private final BOSHClientConfig cfg;
206
207    /**
208     * Processor thread runnable instance.
209     */
210    private final Runnable procRunnable = new Runnable() {
211        /**
212         * Process incoming messages.
213         */
214        public void run() {
215            processMessages();
216        }
217    };
218
    /**
     * Empty request runnable instance.  When run, it delegates to
     * {@code sendEmptyRequest()}.
     */
    private final Runnable emptyRequestRunnable = new Runnable() {
        /**
         * Send an empty request.
         */
        public void run() {
            sendEmptyRequest();
        }
    };
230
231    /**
232     * HTTPSender instance.
233     */
234    private final HTTPSender httpSender =
235            new ApacheHTTPSender();
236
237    /**
238     * Storage for test hook implementation.
239     */
240    private final AtomicReference<ExchangeInterceptor> exchInterceptor =
241            new AtomicReference<ExchangeInterceptor>();
242
243    /**
244     * Request ID sequence to use for the session.
245     */
246    private final RequestIDSequence requestIDSeq = new RequestIDSequence();
247
248    /**
     * ScheduledExecutor to use for deferred tasks.
250     */
251    private final ScheduledExecutorService schedExec =
252            Executors.newSingleThreadScheduledExecutor();
253
254    /************************************************************
255     * The following vars must be accessed via the lock instance.
256     */
257
258    /**
259     * Thread which is used to process responses from the connection
260     * manager.  Becomes null when session is terminated.
261     */
262    private Thread procThread;
263
264    /**
265     * Future for sending a deferred empty request, if needed.
266     */
267    private ScheduledFuture emptyRequestFuture;
268
269    /**
270     * Connection Manager session parameters.  Only available when in a
271     * connected state.
272     */
273    private CMSessionParams cmParams;
274
275    /**
276     * List of active/outstanding requests.
277     */
278    private Queue<HTTPExchange> exchanges = new LinkedList<HTTPExchange>();
279
280    /**
281     * Set of RIDs which have been received, for the purpose of sending
282     * response acknowledgements.
283     */
284    private SortedSet<Long> pendingResponseAcks = new TreeSet<Long>();
285
286    /**
287     * The highest RID that we've already received a response for.  This value
288     * is used to implement response acks.
289     */
290    private Long responseAck = Long.valueOf(-1L);
291
292    /**
293     * List of requests which have been made but not yet acknowledged.  This
294     * list remains unpopulated if the CM is not acking requests.
295     */
296    private List<ComposableBody> pendingRequestAcks =
297            new ArrayList<ComposableBody>();
298
299    ///////////////////////////////////////////////////////////////////////////
300    // Classes:
301
302    /**
303     * Class used in testing to dynamically manipulate received exchanges
304     * at test runtime.
305     */
306    abstract static class ExchangeInterceptor {
307        /**
308         * Limit construction.
309         */
310        ExchangeInterceptor() {
311            // Empty;
312        }
313
314        /**
315         * Hook to manipulate an HTTPExchange as is is about to be processed.
316         *
317         * @param exch original exchange that would be processed
318         * @return replacement exchange instance, or {@code null} to skip
319         *  processing of this exchange
320         */
321        abstract HTTPExchange interceptExchange(final HTTPExchange exch);
322    }
323
324    ///////////////////////////////////////////////////////////////////////////
325    // Constructors:
326
327    /**
328     * Determine whether or not we should perform assertions.  Assertions
329     * can be specified via system property explicitly, or defaulted to
330     * the JVM assertions status.
331     */
332    static {
333        final String prop =
334                BOSHClient.class.getSimpleName() + ".assertionsEnabled";
335        boolean enabled = false;
336        if (System.getProperty(prop) == null) {
337            assert enabled = true;
338        } else {
339            enabled = Boolean.getBoolean(prop);
340        }
341        ASSERTIONS = enabled;
342    }
343
344    /**
345     * Prevent direct construction.
346     */
347    private BOSHClient(final BOSHClientConfig sessCfg) {
348        cfg = sessCfg;
349        init();
350    }
351
352    ///////////////////////////////////////////////////////////////////////////
353    // Public methods:
354
355    /**
356     * Create a new BOSH client session using the client configuration
357     * information provided.
358     *
359     * @param clientCfg session configuration
360     * @return BOSH session instance
361     */
362    public static BOSHClient create(final BOSHClientConfig clientCfg) {
363        if (clientCfg == null) {
364            throw(new IllegalArgumentException(
365                    "Client configuration may not be null"));
366        }
367        return new BOSHClient(clientCfg);
368    }
369
370    /**
371     * Get the client configuration that was used to create this client
372     * instance.
373     *
374     * @return client configuration
375     */
376    public BOSHClientConfig getBOSHClientConfig() {
377        return cfg;
378    }
379
380    /**
381     * Adds a connection listener to the session.
382     *
383     * @param listener connection listener to add, if not already added
384     */
385    public void addBOSHClientConnListener(
386            final BOSHClientConnListener listener) {
387        if (listener == null) {
388            throw(new IllegalArgumentException(NULL_LISTENER));
389        }
390        connListeners.add(listener);
391    }
392
393    /**
394     * Removes a connection listener from the session.
395     *
396     * @param listener connection listener to remove, if previously added
397     */
398    public void removeBOSHClientConnListener(
399            final BOSHClientConnListener listener) {
400        if (listener == null) {
401            throw(new IllegalArgumentException(NULL_LISTENER));
402        }
403        connListeners.remove(listener);
404    }
405
406    /**
407     * Adds a request message listener to the session.
408     *
409     * @param listener request listener to add, if not already added
410     */
411    public void addBOSHClientRequestListener(
412            final BOSHClientRequestListener listener) {
413        if (listener == null) {
414            throw(new IllegalArgumentException(NULL_LISTENER));
415        }
416        requestListeners.add(listener);
417    }
418
419    /**
420     * Removes a request message listener from the session, if previously
421     * added.
422     *
423     * @param listener instance to remove
424     */
425    public void removeBOSHClientRequestListener(
426            final BOSHClientRequestListener listener) {
427        if (listener == null) {
428            throw(new IllegalArgumentException(NULL_LISTENER));
429        }
430        requestListeners.remove(listener);
431    }
432
433    /**
434     * Adds a response message listener to the session.
435     *
436     * @param listener response listener to add, if not already added
437     */
438    public void addBOSHClientResponseListener(
439            final BOSHClientResponseListener listener) {
440        if (listener == null) {
441            throw(new IllegalArgumentException(NULL_LISTENER));
442        }
443        responseListeners.add(listener);
444    }
445
446    /**
447     * Removes a response message listener from the session, if previously
448     * added.
449     *
450     * @param listener instance to remove
451     */
452    public void removeBOSHClientResponseListener(
453            final BOSHClientResponseListener listener) {
454        if (listener == null) {
455            throw(new IllegalArgumentException(NULL_LISTENER));
456        }
457        responseListeners.remove(listener);
458    }
459
460    /**
461     * Send the provided message data to the remote connection manager.  The
462     * provided message body does not need to have any BOSH-specific attribute
463     * information set.  It only needs to contain the actual message payload
464     * that should be delivered to the remote server.
465     * <p/>
466     * The first call to this method will result in a connection attempt
467     * to the remote connection manager.  Subsequent calls to this method
468     * will block until the underlying session state allows for the message
469     * to be transmitted.  In certain scenarios - such as when the maximum
470     * number of outbound connections has been reached - calls to this method
471     * will block for short periods of time.
472     *
473     * @param body message data to send to remote server
474     * @throws BOSHException on message transmission failure
475     */
476    public void send(final ComposableBody body) throws BOSHException {
477        assertUnlocked();
478        if (body == null) {
479            throw(new IllegalArgumentException(
480                    "Message body may not be null"));
481        }
482
483        HTTPExchange exch;
484        CMSessionParams params;
485        lock.lock();
486        try {
487            blockUntilSendable(body);
488            if (!isWorking() && !isTermination(body)) {
489                throw(new BOSHException(
490                        "Cannot send message when session is closed"));
491            }
492
493            long rid = requestIDSeq.getNextRID();
494            ComposableBody request = body;
495            params = cmParams;
496            if (params == null && exchanges.isEmpty()) {
497                // This is the first message being sent
498                request = applySessionCreationRequest(rid, body);
499            } else {
500                request = applySessionData(rid, body);
501                if (cmParams.isAckingRequests()) {
502                    pendingRequestAcks.add(request);
503                }
504            }
505            exch = new HTTPExchange(request);
506            exchanges.add(exch);
507            notEmpty.signalAll();
508            clearEmptyRequest();
509        } finally {
510            lock.unlock();
511        }
512        AbstractBody finalReq = exch.getRequest();
513        HTTPResponse resp = httpSender.send(params, finalReq);
514        exch.setHTTPResponse(resp);
515        fireRequestSent(finalReq);
516    }
517
518    /**
519     * Attempt to pause the current session.  When supported by the remote
520     * connection manager, pausing the session will result in the connection
521     * manager closing out all outstanding requests (including the pause
522     * request) and increases the inactivity timeout of the session.  The
523     * exact value of the temporary timeout is dependent upon the connection
524     * manager.  This method should be used if a client encounters an
525     * exceptional temporary situation during which it will be unable to send
526     * requests to the connection manager for a period of time greater than
527     * the maximum inactivity period.
528     *
529     * The session will revert back to it's normal, unpaused state when the
530     * client sends it's next message.
531     *
532     * @return {@code true} if the connection manager supports session pausing,
533     *  {@code false} if the connection manager does not support session
534     *  pausing or if the session has not yet been established
535     */
536    public boolean pause() {
537        assertUnlocked();
538        lock.lock();
539        AttrMaxPause maxPause = null;
540        try {
541            if (cmParams == null) {
542                return false;
543            }
544
545            maxPause = cmParams.getMaxPause();
546            if (maxPause == null) {
547                return false;
548            }
549        } finally {
550            lock.unlock();
551        }
552        try {
553            send(ComposableBody.builder()
554                    .setAttribute(Attributes.PAUSE, maxPause.toString())
555                    .build());
556        } catch (BOSHException boshx) {
557            LOG.log(Level.FINEST, "Could not send pause", boshx);
558        }
559        return true;
560    }
561
562    /**
563     * End the BOSH session by disconnecting from the remote BOSH connection
564     * manager.
565     *
566     * @throws BOSHException when termination message cannot be sent
567     */
568    public void disconnect() throws BOSHException {
569        disconnect(ComposableBody.builder().build());
570    }
571
572    /**
573     * End the BOSH session by disconnecting from the remote BOSH connection
574     * manager, sending the provided content in the final connection
575     * termination message.
576     *
577     * @param msg final message to send
578     * @throws BOSHException when termination message cannot be sent
579     */
580    public void disconnect(final ComposableBody msg) throws BOSHException {
581        if (msg == null) {
582            throw(new IllegalArgumentException(
583                    "Message body may not be null"));
584        }
585
586        Builder builder = msg.rebuild();
587        builder.setAttribute(Attributes.TYPE, TERMINATE);
588        send(builder.build());
589    }
590
591    /**
592     * Forcibly close this client session instance.  The preferred mechanism
593     * to close the connection is to send a disconnect message and wait for
594     * organic termination.  Calling this method simply shuts down the local
595     * session without sending a termination message, releasing all resources
596     * associated with the session.
597     */
598    public void close() {
599        dispose(new BOSHException("Session explicitly closed by caller"));
600    }
601
602    ///////////////////////////////////////////////////////////////////////////
603    // Package-private methods:
604
605    /**
606     * Get the current CM session params.
607     *
608     * @return current session params, or {@code null}
609     */
610    CMSessionParams getCMSessionParams() {
611        lock.lock();
612        try {
613            return cmParams;
614        } finally {
615            lock.unlock();
616        }
617    }
618
619    /**
620     * Wait until no more messages are waiting to be processed.
621     */
622    void drain() {
623        lock.lock();
624        try {
625            LOG.finest("Waiting while draining...");
626            while (isWorking()
627                    && (emptyRequestFuture == null
628                    || emptyRequestFuture.isDone())) {
629                try {
630                    drained.await();
631                } catch (InterruptedException intx) {
632                    LOG.log(Level.FINEST, INTERRUPTED, intx);
633                }
634            }
635            LOG.finest("Drained");
636        } finally {
637            lock.unlock();
638        }
639    }
640
641    /**
642     * Test method used to forcibly discard next exchange.
643     *
644     * @param interceptor exchange interceptor
645     */
646    void setExchangeInterceptor(final ExchangeInterceptor interceptor) {
647        exchInterceptor.set(interceptor);
648    }
649
650
651    ///////////////////////////////////////////////////////////////////////////
652    // Private methods:
653
654    /**
655     * Initialize the session.  This initializes the underlying HTTP
656     * transport implementation and starts the receive thread.
657     */
658    private void init() {
659        assertUnlocked();
660
661        lock.lock();
662        try {
663            httpSender.init(cfg);
664            procThread = new Thread(procRunnable);
665            procThread.setDaemon(true);
666            procThread.setName(BOSHClient.class.getSimpleName()
667                    + "[" + System.identityHashCode(this)
668                    + "]: Receive thread");
669            procThread.start();
670        } finally {
671            lock.unlock();
672        }
673    }
674
    /**
     * Destroy this session.  Disposal happens at most once: the first call
     * marks the session as no longer working, notifies the connection
     * listeners, releases the session state, wakes every thread waiting on
     * one of the lock conditions, and finally shuts down the HTTP sender
     * and the scheduled executor.  Subsequent calls return immediately.
     * <p/>
     * Must be called without the lock held.
     *
     * @param cause the reason for the session termination, or {@code null}
     *  for normal termination
     */
    private void dispose(final Throwable cause) {
        assertUnlocked();

        // Phase 1: atomically flip the "working" state.  isWorking() is
        // defined as procThread != null, so clearing it both marks the
        // session as closed and makes concurrent dispose() calls no-ops.
        lock.lock();
        try {
            if (procThread == null) {
                // Already disposed
                return;
            }
            procThread = null;
        } finally {
            lock.unlock();
        }

        // Listeners are notified while the lock is NOT held.
        if (cause == null) {
            fireConnectionClosed();
        } else {
            fireConnectionClosedOnError(cause);
        }

        // Phase 2: release the session state and wake up any thread
        // blocked on one of the conditions so it can observe the closed
        // session.
        lock.lock();
        try {
            clearEmptyRequest();
            exchanges = null;
            cmParams = null;
            pendingResponseAcks = null;
            pendingRequestAcks = null;
            notEmpty.signalAll();
            notFull.signalAll();
            drained.signalAll();
        } finally {
            lock.unlock();
        }

        // Release the transport and the deferred-task executor.
        httpSender.destroy();
        schedExec.shutdownNow();
    }
718
719    /**
720     * Determines if the message body specified indicates a request to
721     * pause the session.
722     *
723     * @param msg message to evaluate
724     * @return {@code true} if the message is a pause request, {@code false}
725     *  otherwise
726     */
727    private static boolean isPause(final AbstractBody msg) {
728        return msg.getAttribute(Attributes.PAUSE) != null;
729    }
730
731    /**
732     * Determines if the message body specified indicates a termination of
733     * the session.
734     *
735     * @param msg message to evaluate
736     * @return {@code true} if the message is a session termination,
737     *  {@code false} otherwise
738     */
739    private static boolean isTermination(final AbstractBody msg) {
740        return TERMINATE.equals(msg.getAttribute(Attributes.TYPE));
741    }
742
743    /**
744     * Evaluates the HTTP response code and response message and returns the
745     * terminal binding condition that it describes, if any.
746     *
747     * @param respCode HTTP response code
748     * @param respBody response body
749     * @return terminal binding condition, or {@code null} if not a terminal
750     *  binding condition message
751     */
752    private TerminalBindingCondition getTerminalBindingCondition(
753            final int respCode,
754            final AbstractBody respBody) {
755        assertLocked();
756
757        if (isTermination(respBody)) {
758            String str = respBody.getAttribute(Attributes.CONDITION);
759            return TerminalBindingCondition.forString(str);
760        }
761        // Check for deprecated HTTP Error Conditions
762        if (cmParams != null && cmParams.getVersion() == null) {
763            return TerminalBindingCondition.forHTTPResponseCode(respCode);
764        }
765        return null;
766    }
767
768    /**
769     * Determines if the message specified is immediately sendable or if it
770     * needs to block until the session state changes.
771     *
772     * @param msg message to evaluate
773     * @return {@code true} if the message can be immediately sent,
774     *  {@code false} otherwise
775     */
776    private boolean isImmediatelySendable(final AbstractBody msg) {
777        assertLocked();
778
779        if (cmParams == null) {
780            // block if we're waiting for a response to our first request
781            return exchanges.isEmpty();
782        }
783
784        AttrRequests requests = cmParams.getRequests();
785        if (requests == null) {
786            return true;
787        }
788        int maxRequests = requests.intValue();
789        if (exchanges.size() < maxRequests) {
790            return true;
791        }
792        if (exchanges.size() == maxRequests
793                && (isTermination(msg) || isPause(msg))) {
794            // One additional terminate or pause message is allowed
795            return true;
796        }
797        return false;
798    }
799
800    /**
801     * Determines whether or not the session is still active.
802     *
803     * @return {@code true} if it is, {@code false} otherwise
804     */
805    private boolean isWorking() {
806        assertLocked();
807
808        return procThread != null;
809    }
810
811    /**
812     * Blocks until either the message provided becomes immediately
813     * sendable or until the session is terminated.
814     *
815     * @param msg message to evaluate
816     */
817    private void blockUntilSendable(final AbstractBody msg) {
818        assertLocked();
819
820        while (isWorking() && !isImmediatelySendable(msg)) {
821            try {
822                notFull.await();
823            } catch (InterruptedException intx) {
824                LOG.log(Level.FINEST, INTERRUPTED, intx);
825            }
826        }
827    }
828
829    /**
830     * Modifies the specified body message such that it becomes a new
831     * BOSH session creation request.
832     *
833     * @param rid request ID to use
834     * @param orig original body to modify
835     * @return modified message which acts as a session creation request
836     */
837    private ComposableBody applySessionCreationRequest(
838            final long rid, final ComposableBody orig) throws BOSHException {
839        assertLocked();
840
841        Builder builder = orig.rebuild();
842        builder.setAttribute(Attributes.TO, cfg.getTo());
843        builder.setAttribute(Attributes.XML_LANG, cfg.getLang());
844        builder.setAttribute(Attributes.VER,
845                AttrVersion.getSupportedVersion().toString());
846        builder.setAttribute(Attributes.WAIT, "60");
847        builder.setAttribute(Attributes.HOLD, "1");
848        builder.setAttribute(Attributes.RID, Long.toString(rid));
849        applyRoute(builder);
850        applyFrom(builder);
851        builder.setAttribute(Attributes.ACK, "1");
852
853        // Make sure the following are NOT present (i.e., during retries)
854        builder.setAttribute(Attributes.SID, null);
855        return builder.build();
856    }
857
858    /**
859     * Applies routing information to the request message who's builder has
860     * been provided.
861     *
862     * @param builder builder instance to add routing information to
863     */
864    private void applyRoute(final Builder builder) {
865        assertLocked();
866
867        String route = cfg.getRoute();
868        if (route != null) {
869            builder.setAttribute(Attributes.ROUTE, route);
870        }
871    }
872
873    /**
874     * Applies the local station ID information to the request message who's
875     * builder has been provided.
876     *
877     * @param builder builder instance to add station ID information to
878     */
879    private void applyFrom(final Builder builder) {
880        assertLocked();
881
882        String from = cfg.getFrom();
883        if (from != null) {
884            builder.setAttribute(Attributes.FROM, from);
885        }
886    }
887
888    /**
889     * Applies existing session data to the outbound request, returning the
890     * modified request.
891     *
892     * This method assumes the lock is currently held.
893     *
894     * @param rid request ID to use
895     * @param orig original/raw request
896     * @return modified request with session information applied
897     */
898    private ComposableBody applySessionData(
899            final long rid,
900            final ComposableBody orig) throws BOSHException {
901        assertLocked();
902
903        Builder builder = orig.rebuild();
904        builder.setAttribute(Attributes.SID,
905                cmParams.getSessionID().toString());
906        builder.setAttribute(Attributes.RID, Long.toString(rid));
907        applyResponseAcknowledgement(builder, rid);
908        return builder.build();
909    }
910
911    /**
912     * Sets the 'ack' attribute of the request to the value of the highest
913     * 'rid' of a request for which it has already received a response in the
914     * case where it has also received all responses associated with lower
915     * 'rid' values.  The only exception is that, after its session creation
916     * request, the client SHOULD NOT include an 'ack' attribute in any request
917     * if it has received responses to all its previous requests.
918     *
919     * @param builder message builder
920     * @param rid current request RID
921     */
922    private void applyResponseAcknowledgement(
923            final Builder builder,
924            final long rid) {
925        assertLocked();
926
927        if (responseAck.equals(Long.valueOf(-1L))) {
928            // We have not received any responses yet
929            return;
930        }
931
932        Long prevRID = Long.valueOf(rid - 1L);
933        if (responseAck.equals(prevRID)) {
934            // Implicit ack
935            return;
936        }
937
938        builder.setAttribute(Attributes.ACK, responseAck.toString());
939    }
940
941    /**
942     * While we are "connected", process received responses.
943     *
944     * This method is run in the processing thread.
945     */
946    private void processMessages() {
947        LOG.log(Level.FINEST, "Processing thread starting");
948        try {
949            HTTPExchange exch;
950            do {
951                exch = nextExchange();
952                if (exch == null) {
953                    break;
954                }
955
956                // Test hook to manipulate what the client sees:
957                ExchangeInterceptor interceptor = exchInterceptor.get();
958                if (interceptor != null) {
959                    HTTPExchange newExch = interceptor.interceptExchange(exch);
960                    if (newExch == null) {
961                        LOG.log(Level.FINE, "Discarding exchange on request "
962                                + "of test hook: RID="
963                                + exch.getRequest().getAttribute(
964                                    Attributes.RID));
965                        lock.lock();
966                        try {
967                            exchanges.remove(exch);
968                        } finally {
969                            lock.unlock();
970                        }
971                        continue;
972                    }
973                    exch = newExch;
974                }
975
976                processExchange(exch);
977            } while (true);
978        } finally {
979            LOG.log(Level.FINEST, "Processing thread exiting");
980        }
981
982    }
983
984    /**
985     * Get the next message exchange to process, blocking until one becomes
986     * available if nothing is already waiting for processing.
987     *
988     * @return next available exchange to process, or {@code null} if no
989     *  exchanges are immediately available
990     */
991    private HTTPExchange nextExchange() {
992        assertUnlocked();
993
994        final Thread thread = Thread.currentThread();
995        HTTPExchange exch = null;
996        lock.lock();
997        try {
998            do {
999                if (!thread.equals(procThread)) {
1000                    break;
1001                }
1002                exch = exchanges.peek();
1003                if (exch == null) {
1004                    try {
1005                        notEmpty.await();
1006                    } catch (InterruptedException intx) {
1007                        LOG.log(Level.FINEST, INTERRUPTED, intx);
1008                    }
1009                }
1010            } while (exch == null);
1011        } finally {
1012            lock.unlock();
1013        }
1014        return exch;
1015    }
1016
1017    /**
1018     * Process the next, provided exchange.  This is the main processing
1019     * method of the receive thread.
1020     *
1021     * @param exch message exchange to process
1022     */
1023    private void processExchange(final HTTPExchange exch) {
1024        assertUnlocked();
1025
1026        HTTPResponse resp;
1027        AbstractBody body;
1028        int respCode;
1029        try {
1030            resp = exch.getHTTPResponse();
1031            body = resp.getBody();
1032            respCode = resp.getHTTPStatus();
1033        } catch (BOSHException boshx) {
1034            LOG.log(Level.FINEST, "Could not obtain response", boshx);
1035            dispose(boshx);
1036            return;
1037        } catch (InterruptedException intx) {
1038            LOG.log(Level.FINEST, INTERRUPTED, intx);
1039            dispose(intx);
1040            return;
1041        }
1042        fireResponseReceived(body);
1043
1044        // Process the message with the current session state
1045        AbstractBody req = exch.getRequest();
1046        CMSessionParams params;
1047        List<HTTPExchange> toResend = null;
1048        lock.lock();
1049        try {
1050            // Check for session creation response info, if needed
1051            if (cmParams == null) {
1052                cmParams = CMSessionParams.fromSessionInit(req, body);
1053
1054                // The following call handles the lock. It's not an escape.
1055                fireConnectionEstablished();
1056            }
1057            params = cmParams;
1058
1059            checkForTerminalBindingConditions(body, respCode);
1060            if (isTermination(body)) {
1061                // Explicit termination
1062                lock.unlock();
1063                dispose(null);
1064                return;
1065            }
1066
1067            if (isRecoverableBindingCondition(body)) {
1068                // Retransmit outstanding requests
1069                if (toResend == null) {
1070                    toResend = new ArrayList<HTTPExchange>(exchanges.size());
1071                }
1072                for (HTTPExchange exchange : exchanges) {
1073                    HTTPExchange resendExch =
1074                            new HTTPExchange(exchange.getRequest());
1075                    toResend.add(resendExch);
1076                }
1077                for (HTTPExchange exchange : toResend) {
1078                    exchanges.add(exchange);
1079                }
1080            } else {
1081                // Process message as normal
1082                processRequestAcknowledgements(req, body);
1083                processResponseAcknowledgementData(req);
1084                HTTPExchange resendExch =
1085                        processResponseAcknowledgementReport(body);
1086                if (resendExch != null && toResend == null) {
1087                    toResend = new ArrayList<HTTPExchange>(1);
1088                    toResend.add(resendExch);
1089                    exchanges.add(resendExch);
1090                }
1091            }
1092        } catch (BOSHException boshx) {
1093            LOG.log(Level.FINEST, "Could not process response", boshx);
1094            lock.unlock();
1095            dispose(boshx);
1096            return;
1097        } finally {
1098            if (lock.isHeldByCurrentThread()) {
1099                try {
1100                    exchanges.remove(exch);
1101                    if (exchanges.isEmpty()) {
1102                        scheduleEmptyRequest(processPauseRequest(req));
1103                    }
1104                    notFull.signalAll();
1105                } finally {
1106                    lock.unlock();
1107                }
1108            }
1109        }
1110
1111        if (toResend != null) {
1112            for (HTTPExchange resend : toResend) {
1113                HTTPResponse response =
1114                        httpSender.send(params, resend.getRequest());
1115                resend.setHTTPResponse(response);
1116                fireRequestSent(resend.getRequest());
1117            }
1118        }
1119    }
1120
1121    /**
1122     * Clears any scheduled empty requests.
1123     */
1124    private void clearEmptyRequest() {
1125        assertLocked();
1126
1127        if (emptyRequestFuture != null) {
1128            emptyRequestFuture.cancel(false);
1129            emptyRequestFuture = null;
1130        }
1131    }
1132
1133    /**
1134     * Calculates the default empty request delay/interval to use for the
1135     * active session.
1136     *
1137     * @return delay in milliseconds
1138     */
1139    private long getDefaultEmptyRequestDelay() {
1140        assertLocked();
1141
1142        // Figure out how long we should wait before sending an empty request
1143        AttrPolling polling = cmParams.getPollingInterval();
1144        long delay;
1145        if (polling == null) {
1146            delay = EMPTY_REQUEST_DELAY;
1147        } else {
1148            delay = polling.getInMilliseconds();
1149        }
1150        return delay;
1151    }
1152
1153    /**
1154     * Schedule an empty request to be sent if no other requests are
1155     * sent in a reasonable amount of time.
1156     */
1157    private void scheduleEmptyRequest(long delay) {
1158        assertLocked();
1159        if (delay < 0L) {
1160            throw(new IllegalArgumentException(
1161                    "Empty request delay must be >= 0 (was: " + delay + ")"));
1162        }
1163
1164        clearEmptyRequest();
1165        if (!isWorking()) {
1166            return;
1167        }
1168
1169        // Schedule the transmission
1170        if (LOG.isLoggable(Level.FINER)) {
1171            LOG.finer("Scheduling empty request in " + delay + "ms");
1172        }
1173        try {
1174            emptyRequestFuture = schedExec.schedule(emptyRequestRunnable,
1175                    delay, TimeUnit.MILLISECONDS);
1176        } catch (RejectedExecutionException rex) {
1177            LOG.log(Level.FINEST, "Could not schedule empty request", rex);
1178        }
1179        drained.signalAll();
1180    }
1181
1182    /**
1183     * Sends an empty request to maintain session requirements.  If a request
1184     * is sent within a reasonable time window, the empty request transmission
1185     * will be cancelled.
1186     */
1187    private void sendEmptyRequest() {
1188        assertUnlocked();
1189        // Send an empty request
1190        LOG.finest("Sending empty request");
1191        try {
1192            send(ComposableBody.builder().build());
1193        } catch (BOSHException boshx) {
1194            dispose(boshx);
1195        }
1196    }
1197
1198    /**
1199     * Assert that the internal lock is held.
1200     */
1201    private void assertLocked() {
1202        if (ASSERTIONS) {
1203            if (!lock.isHeldByCurrentThread()) {
1204                throw(new AssertionError("Lock is not held by current thread"));
1205            }
1206            return;
1207        }
1208    }
1209
1210    /**
1211     * Assert that the internal lock is *not* held.
1212     */
1213    private void assertUnlocked() {
1214        if (ASSERTIONS) {
1215            if (lock.isHeldByCurrentThread()) {
1216                throw(new AssertionError("Lock is held by current thread"));
1217            }
1218            return;
1219        }
1220    }
1221
1222    /**
1223     * Checks to see if the response indicates a terminal binding condition
1224     * (as per XEP-0124 section 17).  If it does, an exception is thrown.
1225     *
1226     * @param body response body to evaluate
1227     * @param code HTTP response code
1228     * @throws BOSHException if a terminal binding condition is detected
1229     */
1230    private void checkForTerminalBindingConditions(
1231            final AbstractBody body,
1232            final int code)
1233            throws BOSHException {
1234        TerminalBindingCondition cond =
1235                getTerminalBindingCondition(code, body);
1236        if (cond != null) {
1237            throw(new BOSHException(
1238                    "Terminal binding condition encountered: "
1239                    + cond.getCondition() + "  ("
1240                    + cond.getMessage() + ")"));
1241        }
1242    }
1243
1244    /**
1245     * Determines whether or not the response indicates a recoverable
1246     * binding condition (as per XEP-0124 section 17).
1247     *
1248     * @param resp response body
1249     * @return {@code true} if it does, {@code false} otherwise
1250     */
1251    private static boolean isRecoverableBindingCondition(
1252            final AbstractBody resp) {
1253        return ERROR.equals(resp.getAttribute(Attributes.TYPE));
1254    }
1255
1256    /**
1257     * Process the request to determine if the empty request delay
1258     * can be determined by looking to see if the request is a pause
1259     * request.  If it can, the request's delay is returned, otherwise
1260     * the default delay is returned.
1261     *
1262     * @return delay in milliseconds that should elapse prior to an
1263     *  empty message being sent
1264     */
1265    private long processPauseRequest(
1266            final AbstractBody req) {
1267        assertLocked();
1268
1269        if (cmParams != null && cmParams.getMaxPause() != null) {
1270            try {
1271                AttrPause pause = AttrPause.createFromString(
1272                        req.getAttribute(Attributes.PAUSE));
1273                if (pause != null) {
1274                    long delay = pause.getInMilliseconds() - PAUSE_MARGIN;
1275                    if (delay < 0) {
1276                        delay = EMPTY_REQUEST_DELAY;
1277                    }
1278                    return delay;
1279                }
1280            } catch (BOSHException boshx) {
1281                LOG.log(Level.FINEST, "Could not extract", boshx);
1282            }
1283        }
1284
1285        return getDefaultEmptyRequestDelay();
1286    }
1287
1288    /**
1289     * Check the response for request acknowledgements and take appropriate
1290     * action.
1291     *
1292     * This method assumes the lock is currently held.
1293     *
1294     * @param req request
1295     * @param resp response
1296     */
1297    private void processRequestAcknowledgements(
1298            final AbstractBody req, final AbstractBody resp) {
1299        assertLocked();
1300
1301        if (!cmParams.isAckingRequests()) {
1302            return;
1303        }
1304
1305        // If a report or time attribute is set, we aren't acking anything
1306        if (resp.getAttribute(Attributes.REPORT) != null) {
1307            return;
1308        }
1309
1310        // Figure out what the highest acked RID is
1311        String acked = resp.getAttribute(Attributes.ACK);
1312        Long ackUpTo;
1313        if (acked == null) {
1314            // Implicit ack of all prior requests up until RID
1315            ackUpTo = Long.parseLong(req.getAttribute(Attributes.RID));
1316        } else {
1317            ackUpTo = Long.parseLong(acked);
1318        }
1319
1320        // Remove the acked requests from the list
1321        if (LOG.isLoggable(Level.FINEST)) {
1322            LOG.finest("Removing pending acks up to: " + ackUpTo);
1323        }
1324        Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
1325        while (iter.hasNext()) {
1326            AbstractBody pending = iter.next();
1327            Long pendingRID = Long.parseLong(
1328                    pending.getAttribute(Attributes.RID));
1329            if (pendingRID.compareTo(ackUpTo) <= 0) {
1330                iter.remove();
1331            }
1332        }
1333    }
1334
1335    /**
1336     * Process the response in order to update the response acknowlegement
1337     * data.
1338     *
1339     * This method assumes the lock is currently held.
1340     *
1341     * @param req request
1342     */
1343    private void processResponseAcknowledgementData(
1344            final AbstractBody req) {
1345        assertLocked();
1346
1347        Long rid = Long.parseLong(req.getAttribute(Attributes.RID));
1348        if (responseAck.equals(Long.valueOf(-1L))) {
1349            // This is the first request
1350            responseAck = rid;
1351        } else {
1352            pendingResponseAcks.add(rid);
1353            // Remove up until the first missing response (or end of queue)
1354            Long whileVal = responseAck;
1355            while (whileVal.equals(pendingResponseAcks.first())) {
1356                responseAck = whileVal;
1357                pendingResponseAcks.remove(whileVal);
1358                whileVal = Long.valueOf(whileVal.longValue() + 1);
1359            }
1360        }
1361    }
1362
1363    /**
1364     * Process the response in order to check for and respond to any potential
1365     * ack reports.
1366     *
1367     * This method assumes the lock is currently held.
1368     *
1369     * @param resp response
1370     * @return exchange to transmit if a resend is to be performed, or
1371     *  {@code null} if no resend is necessary
1372     * @throws BOSHException when a a retry is needed but cannot be performed
1373     */
1374    private HTTPExchange processResponseAcknowledgementReport(
1375            final AbstractBody resp)
1376            throws BOSHException {
1377        assertLocked();
1378
1379        String reportStr = resp.getAttribute(Attributes.REPORT);
1380        if (reportStr == null) {
1381            // No report on this message
1382            return null;
1383        }
1384
1385        Long report = Long.parseLong(reportStr);
1386        Long time = Long.parseLong(resp.getAttribute(Attributes.TIME));
1387        if (LOG.isLoggable(Level.FINE)) {
1388            LOG.fine("Received report of missing request (RID="
1389                    + report + ", time=" + time + "ms)");
1390        }
1391
1392        // Find the missing request
1393        Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
1394        AbstractBody req = null;
1395        while (iter.hasNext() && req == null) {
1396            AbstractBody pending = iter.next();
1397            Long pendingRID = Long.parseLong(
1398                    pending.getAttribute(Attributes.RID));
1399            if (report.equals(pendingRID)) {
1400                req = pending;
1401            }
1402        }
1403
1404        if (req == null) {
1405            throw(new BOSHException("Report of missing message with RID '"
1406                    + reportStr
1407                    + "' but local copy of that request was not found"));
1408        }
1409
1410        // Resend the missing request
1411        HTTPExchange exch = new HTTPExchange(req);
1412        exchanges.add(exch);
1413        notEmpty.signalAll();
1414        return exch;
1415    }
1416
1417    /**
1418     * Notifies all request listeners that the specified request is being
1419     * sent.
1420     *
1421     * @param request request being sent
1422     */
1423    private void fireRequestSent(final AbstractBody request) {
1424        assertUnlocked();
1425
1426        BOSHMessageEvent event = null;
1427        for (BOSHClientRequestListener listener : requestListeners) {
1428            if (event == null) {
1429                event = BOSHMessageEvent.createRequestSentEvent(this, request);
1430            }
1431            try {
1432                listener.requestSent(event);
1433            } catch (Exception ex) {
1434                LOG.log(Level.WARNING, UNHANDLED, ex);
1435            }
1436        }
1437    }
1438
1439    /**
1440     * Notifies all response listeners that the specified response has been
1441     * received.
1442     *
1443     * @param response response received
1444     */
1445    private void fireResponseReceived(final AbstractBody response) {
1446        assertUnlocked();
1447
1448        BOSHMessageEvent event = null;
1449        for (BOSHClientResponseListener listener : responseListeners) {
1450            if (event == null) {
1451                event = BOSHMessageEvent.createResponseReceivedEvent(
1452                        this, response);
1453            }
1454            try {
1455                listener.responseReceived(event);
1456            } catch (Exception ex) {
1457                LOG.log(Level.WARNING, UNHANDLED, ex);
1458            }
1459        }
1460    }
1461
1462    /**
1463     * Notifies all connection listeners that the session has been successfully
1464     * established.
1465     */
1466    private void fireConnectionEstablished() {
1467        final boolean hadLock = lock.isHeldByCurrentThread();
1468        if (hadLock) {
1469            lock.unlock();
1470        }
1471        try {
1472            BOSHClientConnEvent event = null;
1473            for (BOSHClientConnListener listener : connListeners) {
1474                if (event == null) {
1475                    event = BOSHClientConnEvent
1476                            .createConnectionEstablishedEvent(this);
1477                }
1478                try {
1479                    listener.connectionEvent(event);
1480                } catch (Exception ex) {
1481                    LOG.log(Level.WARNING, UNHANDLED, ex);
1482                }
1483            }
1484        } finally {
1485            if (hadLock) {
1486                lock.lock();
1487            }
1488        }
1489    }
1490
1491    /**
1492     * Notifies all connection listeners that the session has been
1493     * terminated normally.
1494     */
1495    private void fireConnectionClosed() {
1496        assertUnlocked();
1497
1498        BOSHClientConnEvent event = null;
1499        for (BOSHClientConnListener listener : connListeners) {
1500            if (event == null) {
1501                event = BOSHClientConnEvent.createConnectionClosedEvent(this);
1502            }
1503            try {
1504                listener.connectionEvent(event);
1505            } catch (Exception ex) {
1506                LOG.log(Level.WARNING, UNHANDLED, ex);
1507            }
1508        }
1509    }
1510
1511    /**
1512     * Notifies all connection listeners that the session has been
1513     * terminated due to the exceptional condition provided.
1514     *
1515     * @param cause cause of the termination
1516     */
1517    private void fireConnectionClosedOnError(
1518            final Throwable cause) {
1519        assertUnlocked();
1520
1521        BOSHClientConnEvent event = null;
1522        for (BOSHClientConnListener listener : connListeners) {
1523            if (event == null) {
1524                event = BOSHClientConnEvent
1525                        .createConnectionClosedOnErrorEvent(
1526                        this, pendingRequestAcks, cause);
1527            }
1528            try {
1529                listener.connectionEvent(event);
1530            } catch (Exception ex) {
1531                LOG.log(Level.WARNING, UNHANDLED, ex);
1532            }
1533        }
1534    }
1535
1536}
1537