1/**
2 * $RCSfile$
3 * $Revision$
4 * $Date$
5 *
6 * Copyright 2003-2007 Jive Software.
7 *
8 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 *     http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21package org.jivesoftware.smack;
22
23import org.jivesoftware.smack.packet.Packet;
24
25import java.io.IOException;
26import java.io.Writer;
27import java.util.concurrent.ArrayBlockingQueue;
28import java.util.concurrent.BlockingQueue;
29
30/**
31 * Writes packets to a XMPP server. Packets are sent using a dedicated thread. Packet
32 * interceptors can be registered to dynamically modify packets before they're actually
33 * sent. Packet listeners can be registered to listen for all outgoing packets.
34 *
35 * @see Connection#addPacketInterceptor
36 * @see Connection#addPacketSendingListener
37 *
38 * @author Matt Tucker
39 */
40class PacketWriter {
41
42    private Thread writerThread;
43    private Thread keepAliveThread;
44    private Writer writer;
45    private XMPPConnection connection;
46    private final BlockingQueue<Packet> queue;
47    volatile boolean done;
48
49    /**
50     * Creates a new packet writer with the specified connection.
51     *
52     * @param connection the connection.
53     */
54    protected PacketWriter(XMPPConnection connection) {
55        this.queue = new ArrayBlockingQueue<Packet>(500, true);
56        this.connection = connection;
57        init();
58    }
59
60    /**
61    * Initializes the writer in order to be used. It is called at the first connection and also
62    * is invoked if the connection is disconnected by an error.
63    */
64    protected void init() {
65        this.writer = connection.writer;
66        done = false;
67
68        writerThread = new Thread() {
69            public void run() {
70                writePackets(this);
71            }
72        };
73        writerThread.setName("Smack Packet Writer (" + connection.connectionCounterValue + ")");
74        writerThread.setDaemon(true);
75    }
76
77    /**
78     * Sends the specified packet to the server.
79     *
80     * @param packet the packet to send.
81     */
82    public void sendPacket(Packet packet) {
83        if (!done) {
84            // Invoke interceptors for the new packet that is about to be sent. Interceptors
85            // may modify the content of the packet.
86            connection.firePacketInterceptors(packet);
87
88            try {
89                queue.put(packet);
90            }
91            catch (InterruptedException ie) {
92                ie.printStackTrace();
93                return;
94            }
95            synchronized (queue) {
96                queue.notifyAll();
97            }
98
99            // Process packet writer listeners. Note that we're using the sending
100            // thread so it's expected that listeners are fast.
101            connection.firePacketSendingListeners(packet);
102        }
103    }
104
105    /**
106     * Starts the packet writer thread and opens a connection to the server. The
107     * packet writer will continue writing packets until {@link #shutdown} or an
108     * error occurs.
109     */
110    public void startup() {
111        writerThread.start();
112    }
113
114    void setWriter(Writer writer) {
115        this.writer = writer;
116    }
117
118    /**
119     * Shuts down the packet writer. Once this method has been called, no further
120     * packets will be written to the server.
121     */
122    public void shutdown() {
123        done = true;
124        synchronized (queue) {
125            queue.notifyAll();
126        }
127        // Interrupt the keep alive thread if one was created
128        if (keepAliveThread != null)
129                keepAliveThread.interrupt();
130    }
131
132    /**
133     * Cleans up all resources used by the packet writer.
134     */
135    void cleanup() {
136        connection.interceptors.clear();
137        connection.sendListeners.clear();
138    }
139
140    /**
141     * Returns the next available packet from the queue for writing.
142     *
143     * @return the next packet for writing.
144     */
145    private Packet nextPacket() {
146        Packet packet = null;
147        // Wait until there's a packet or we're done.
148        while (!done && (packet = queue.poll()) == null) {
149            try {
150                synchronized (queue) {
151                    queue.wait();
152                }
153            }
154            catch (InterruptedException ie) {
155                // Do nothing
156            }
157        }
158        return packet;
159    }
160
161    private void writePackets(Thread thisThread) {
162        try {
163            // Open the stream.
164            openStream();
165            // Write out packets from the queue.
166            while (!done && (writerThread == thisThread)) {
167                Packet packet = nextPacket();
168                if (packet != null) {
169                    writer.write(packet.toXML());
170                    if (queue.isEmpty()) {
171                        writer.flush();
172                    }
173                }
174            }
175            // Flush out the rest of the queue. If the queue is extremely large, it's possible
176            // we won't have time to entirely flush it before the socket is forced closed
177            // by the shutdown process.
178            try {
179                while (!queue.isEmpty()) {
180                    Packet packet = queue.remove();
181                    writer.write(packet.toXML());
182                }
183                writer.flush();
184            }
185            catch (Exception e) {
186                e.printStackTrace();
187            }
188
189            // Delete the queue contents (hopefully nothing is left).
190            queue.clear();
191
192            // Close the stream.
193            try {
194                writer.write("</stream:stream>");
195                writer.flush();
196            }
197            catch (Exception e) {
198                // Do nothing
199            }
200            finally {
201                try {
202                    writer.close();
203                }
204                catch (Exception e) {
205                    // Do nothing
206                }
207            }
208        }
209        catch (IOException ioe) {
210            // The exception can be ignored if the the connection is 'done'
211            // or if the it was caused because the socket got closed
212            if (!(done || connection.isSocketClosed())) {
213                done = true;
214                // packetReader could be set to null by an concurrent disconnect() call.
215                // Therefore Prevent NPE exceptions by checking packetReader.
216                if (connection.packetReader != null) {
217                        connection.notifyConnectionError(ioe);
218                }
219            }
220        }
221    }
222
223    /**
224     * Sends to the server a new stream element. This operation may be requested several times
225     * so we need to encapsulate the logic in one place. This message will be sent while doing
226     * TLS, SASL and resource binding.
227     *
228     * @throws IOException If an error occurs while sending the stanza to the server.
229     */
230    void openStream() throws IOException {
231        StringBuilder stream = new StringBuilder();
232        stream.append("<stream:stream");
233        stream.append(" to=\"").append(connection.getServiceName()).append("\"");
234        stream.append(" xmlns=\"jabber:client\"");
235        stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
236        stream.append(" version=\"1.0\">");
237        writer.write(stream.toString());
238        writer.flush();
239    }
240}
241