1/** 2 * $RCSfile$ 3 * $Revision$ 4 * $Date$ 5 * 6 * Copyright 2003-2007 Jive Software. 7 * 8 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21package org.jivesoftware.smack; 22 23import org.jivesoftware.smack.packet.Packet; 24 25import java.io.IOException; 26import java.io.Writer; 27import java.util.concurrent.ArrayBlockingQueue; 28import java.util.concurrent.BlockingQueue; 29 30/** 31 * Writes packets to a XMPP server. Packets are sent using a dedicated thread. Packet 32 * interceptors can be registered to dynamically modify packets before they're actually 33 * sent. Packet listeners can be registered to listen for all outgoing packets. 34 * 35 * @see Connection#addPacketInterceptor 36 * @see Connection#addPacketSendingListener 37 * 38 * @author Matt Tucker 39 */ 40class PacketWriter { 41 42 private Thread writerThread; 43 private Thread keepAliveThread; 44 private Writer writer; 45 private XMPPConnection connection; 46 private final BlockingQueue<Packet> queue; 47 volatile boolean done; 48 49 /** 50 * Creates a new packet writer with the specified connection. 51 * 52 * @param connection the connection. 53 */ 54 protected PacketWriter(XMPPConnection connection) { 55 this.queue = new ArrayBlockingQueue<Packet>(500, true); 56 this.connection = connection; 57 init(); 58 } 59 60 /** 61 * Initializes the writer in order to be used. It is called at the first connection and also 62 * is invoked if the connection is disconnected by an error. 63 */ 64 protected void init() { 65 this.writer = connection.writer; 66 done = false; 67 68 writerThread = new Thread() { 69 public void run() { 70 writePackets(this); 71 } 72 }; 73 writerThread.setName("Smack Packet Writer (" + connection.connectionCounterValue + ")"); 74 writerThread.setDaemon(true); 75 } 76 77 /** 78 * Sends the specified packet to the server. 79 * 80 * @param packet the packet to send. 81 */ 82 public void sendPacket(Packet packet) { 83 if (!done) { 84 // Invoke interceptors for the new packet that is about to be sent. Interceptors 85 // may modify the content of the packet. 86 connection.firePacketInterceptors(packet); 87 88 try { 89 queue.put(packet); 90 } 91 catch (InterruptedException ie) { 92 ie.printStackTrace(); 93 return; 94 } 95 synchronized (queue) { 96 queue.notifyAll(); 97 } 98 99 // Process packet writer listeners. Note that we're using the sending 100 // thread so it's expected that listeners are fast. 101 connection.firePacketSendingListeners(packet); 102 } 103 } 104 105 /** 106 * Starts the packet writer thread and opens a connection to the server. The 107 * packet writer will continue writing packets until {@link #shutdown} or an 108 * error occurs. 109 */ 110 public void startup() { 111 writerThread.start(); 112 } 113 114 void setWriter(Writer writer) { 115 this.writer = writer; 116 } 117 118 /** 119 * Shuts down the packet writer. Once this method has been called, no further 120 * packets will be written to the server. 121 */ 122 public void shutdown() { 123 done = true; 124 synchronized (queue) { 125 queue.notifyAll(); 126 } 127 // Interrupt the keep alive thread if one was created 128 if (keepAliveThread != null) 129 keepAliveThread.interrupt(); 130 } 131 132 /** 133 * Cleans up all resources used by the packet writer. 134 */ 135 void cleanup() { 136 connection.interceptors.clear(); 137 connection.sendListeners.clear(); 138 } 139 140 /** 141 * Returns the next available packet from the queue for writing. 142 * 143 * @return the next packet for writing. 144 */ 145 private Packet nextPacket() { 146 Packet packet = null; 147 // Wait until there's a packet or we're done. 148 while (!done && (packet = queue.poll()) == null) { 149 try { 150 synchronized (queue) { 151 queue.wait(); 152 } 153 } 154 catch (InterruptedException ie) { 155 // Do nothing 156 } 157 } 158 return packet; 159 } 160 161 private void writePackets(Thread thisThread) { 162 try { 163 // Open the stream. 164 openStream(); 165 // Write out packets from the queue. 166 while (!done && (writerThread == thisThread)) { 167 Packet packet = nextPacket(); 168 if (packet != null) { 169 writer.write(packet.toXML()); 170 if (queue.isEmpty()) { 171 writer.flush(); 172 } 173 } 174 } 175 // Flush out the rest of the queue. If the queue is extremely large, it's possible 176 // we won't have time to entirely flush it before the socket is forced closed 177 // by the shutdown process. 178 try { 179 while (!queue.isEmpty()) { 180 Packet packet = queue.remove(); 181 writer.write(packet.toXML()); 182 } 183 writer.flush(); 184 } 185 catch (Exception e) { 186 e.printStackTrace(); 187 } 188 189 // Delete the queue contents (hopefully nothing is left). 190 queue.clear(); 191 192 // Close the stream. 193 try { 194 writer.write("</stream:stream>"); 195 writer.flush(); 196 } 197 catch (Exception e) { 198 // Do nothing 199 } 200 finally { 201 try { 202 writer.close(); 203 } 204 catch (Exception e) { 205 // Do nothing 206 } 207 } 208 } 209 catch (IOException ioe) { 210 // The exception can be ignored if the the connection is 'done' 211 // or if the it was caused because the socket got closed 212 if (!(done || connection.isSocketClosed())) { 213 done = true; 214 // packetReader could be set to null by an concurrent disconnect() call. 215 // Therefore Prevent NPE exceptions by checking packetReader. 216 if (connection.packetReader != null) { 217 connection.notifyConnectionError(ioe); 218 } 219 } 220 } 221 } 222 223 /** 224 * Sends to the server a new stream element. This operation may be requested several times 225 * so we need to encapsulate the logic in one place. This message will be sent while doing 226 * TLS, SASL and resource binding. 227 * 228 * @throws IOException If an error occurs while sending the stanza to the server. 229 */ 230 void openStream() throws IOException { 231 StringBuilder stream = new StringBuilder(); 232 stream.append("<stream:stream"); 233 stream.append(" to=\"").append(connection.getServiceName()).append("\""); 234 stream.append(" xmlns=\"jabber:client\""); 235 stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\""); 236 stream.append(" version=\"1.0\">"); 237 writer.write(stream.toString()); 238 writer.flush(); 239 } 240} 241