1/*
2* Conditions Of Use
3*
4* This software was developed by employees of the National Institute of
5* Standards and Technology (NIST), an agency of the Federal Government.
6* Pursuant to title 15 United States Code Section 105, works of NIST
7* employees are not subject to copyright protection in the United States
8* and are considered to be in the public domain.  As a result, a formal
9* license is not needed to use the software.
10*
11* This software is provided by NIST as a service and is expressly
12* provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
13* OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
14* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
15* AND DATA ACCURACY.  NIST does not warrant or make any representations
16* regarding the use of the software or the results thereof, including but
17* not limited to the correctness, accuracy, reliability or usefulness of
18* the software.
19*
20* Permission to use this software is contingent upon your acceptance
21* of the terms of this agreement
22*
23* .
24*
25*/
26/*******************************************************************************
27 *   Product of NIST/ITL Advanced Networking Technologies Division (ANTD).     *
28 *******************************************************************************/
29package gov.nist.javax.sip.stack;
30
31import java.io.IOException;
32import java.util.LinkedList;
33import java.net.*;
34
35import gov.nist.core.*;
36
37/**
38 * Sit in a loop and handle incoming udp datagram messages. For each Datagram
39 * packet, a new UDPMessageChannel is created (upto the max thread pool size).
40 * Each UDP message is processed in its own thread).
41 *
42 * @version 1.2 $Revision: 1.37 $ $Date: 2009/11/14 20:06:16 $
43 *
44 * @author M. Ranganathan  <br/>
45 *
46 *
47 *
48 * <a href="{@docRoot}/../uml/udp-request-processing-sequence-diagram.jpg">
49 * See the implementation sequence diagram for processing incoming requests.
50 * </a>
51 *
52 *
53 * Acknowledgement: Jeff Keyser contributed ideas on starting and stoppping the
54 * stack that were incorporated into this code. Niklas Uhrberg suggested that
55 * thread pooling be added to limit the number of threads and improve
56 * performance.
57 */
58public class UDPMessageProcessor extends MessageProcessor {
59    /**
60     * The Mapped port (in case STUN suport is enabled)
61     */
62    private int port;
63
64    /**
65     * Incoming messages are queued here.
66     */
67    protected LinkedList messageQueue;
68
69    /**
70     * A list of message channels that we have started.
71     */
72    protected LinkedList messageChannels;
73
74    /**
75     * Max # of udp message channels
76     */
77    protected int threadPoolSize;
78
79    protected DatagramSocket sock;
80
81    /**
82     * A flag that is set to false to exit the message processor (suggestion by
83     * Jeff Keyser).
84     */
85    protected boolean isRunning;
86
87    private static final int HIGHWAT=5000;
88
89    private static final int LOWAT=2500;
90
91    /**
92     * Constructor.
93     *
94     * @param sipStack
95     *            pointer to the stack.
96     */
97    protected UDPMessageProcessor(InetAddress ipAddress,
98            SIPTransactionStack sipStack, int port) throws IOException {
99        super(ipAddress, port, "udp",sipStack);
100
101        this.sipStack = sipStack;
102
103        this.messageQueue = new LinkedList();
104
105        this.port = port;
106        try {
107            this.sock = sipStack.getNetworkLayer().createDatagramSocket(port,
108                    ipAddress);
109            // Create a new datagram socket.
110            sock.setReceiveBufferSize(sipStack.getReceiveUdpBufferSize());
111            sock.setSendBufferSize(sipStack.getSendUdpBufferSize());
112
113            /**
114             * If the thread auditor is enabled, define a socket timeout value in order to
115             * prevent sock.receive() from blocking forever
116             */
117            if (sipStack.getThreadAuditor().isEnabled()) {
118                sock.setSoTimeout((int) sipStack.getThreadAuditor().getPingIntervalInMillisecs());
119            }
120            if ( ipAddress.getHostAddress().equals(IN_ADDR_ANY)  ||
121                 ipAddress.getHostAddress().equals(IN6_ADDR_ANY)){
122                // Store the address to which we are actually bound
123                // Note that on WINDOWS this is actually broken. It will
124                // return IN_ADDR_ANY again. On linux it will return the
125                // address to which the socket was actually bound.
126                super.setIpAddress( sock.getLocalAddress() );
127
128            }
129        } catch (SocketException ex) {
130            throw new IOException(ex.getMessage());
131        }
132    }
133
134
135
136    /**
137     * Get port on which to listen for incoming stuff.
138     *
139     * @return port on which I am listening.
140     */
141    public int getPort() {
142        return this.port;
143    }
144
145    /**
146     * Start our processor thread.
147     */
148    public void start() throws IOException {
149
150
151        this.isRunning = true;
152        Thread thread = new Thread(this);
153        thread.setDaemon(true);
154        // Issue #32 on java.net
155        thread.setName("UDPMessageProcessorThread");
156        // Issue #184
157        thread.setPriority(Thread.MAX_PRIORITY);
158        thread.start();
159    }
160
161    /**
162     * Thread main routine.
163     */
164    public void run() {
165        // Check for running flag.
166        this.messageChannels = new LinkedList();
167        // start all our messageChannels (unless the thread pool size is
168        // infinity.
169        if (sipStack.threadPoolSize != -1) {
170            for (int i = 0; i < sipStack.threadPoolSize; i++) {
171                UDPMessageChannel channel = new UDPMessageChannel(sipStack,
172                        this);
173                this.messageChannels.add(channel);
174
175            }
176        }
177
178        // Ask the auditor to monitor this thread
179        ThreadAuditor.ThreadHandle threadHandle = sipStack.getThreadAuditor().addCurrentThread();
180
181        // Somebody asked us to exit. if isRunnning is set to false.
182        while (this.isRunning) {
183
184            try {
185                // Let the thread auditor know we're up and running
186                threadHandle.ping();
187
188                int bufsize = sock.getReceiveBufferSize();
189                byte message[] = new byte[bufsize];
190                DatagramPacket packet = new DatagramPacket(message, bufsize);
191                sock.receive(packet);
192
193
194
195             // This is a simplistic congestion control algorithm.
196             // It accepts packets if queuesize is < LOWAT. It drops
197             // requests if the queue size exceeds a HIGHWAT and accepts
198             // requests with probability p proportional to the difference
199             // between current queue size and LOWAT in the range
200             // of queue sizes between HIGHWAT and LOWAT.
201             // TODO -- penalize spammers by looking at the source
202             // port and IP address.
203             if ( sipStack.stackDoesCongestionControl ) {
204             if ( this.messageQueue.size() >= HIGHWAT) {
205                    if (sipStack.isLoggingEnabled()) {
206                        sipStack.getStackLogger().logDebug("Dropping message -- queue length exceeded");
207
208                    }
209                    //System.out.println("HIGHWAT Drop!");
210                    continue;
211                } else if ( this.messageQueue.size() > LOWAT && this .messageQueue.size() < HIGHWAT ) {
212                    // Drop the message with a probabilty that is linear in the range 0 to 1
213                    float threshold = ((float)(messageQueue.size() - LOWAT))/ ((float)(HIGHWAT - LOWAT));
214                    boolean decision = Math.random() > 1.0 - threshold;
215                    if ( decision ) {
216                        if (sipStack.isLoggingEnabled()) {
217                            sipStack.getStackLogger().logDebug("Dropping message with probability  " + (1.0 - threshold));
218
219                        }
220                        //System.out.println("RED Drop!");
221                        continue;
222                    }
223
224                }
225             }
226
227
228
229                // Count of # of packets in process.
230                // this.useCount++;
231                if (sipStack.threadPoolSize != -1) {
232                    // Note: the only condition watched for by threads
233                    // synchronizing on the messageQueue member is that it is
234                    // not empty. As soon as you introduce some other
235                    // condition you will have to call notifyAll instead of
236                    // notify below.
237
238                    synchronized (this.messageQueue) {
239                        // was addLast
240                        this.messageQueue.add(packet);
241                        this.messageQueue.notify();
242                    }
243                } else {
244                    new UDPMessageChannel(sipStack, this, packet);
245                }
246            } catch (SocketTimeoutException ex) {
247              // This socket timeout alows us to ping the thread auditor periodically
248            } catch (SocketException ex) {
249                if (sipStack.isLoggingEnabled())
250                    getSIPStack().getStackLogger()
251                            .logDebug("UDPMessageProcessor: Stopping");
252                isRunning = false;
253                // The notifyAll should be in a synchronized block.
254                // ( bug report by Niklas Uhrberg ).
255                synchronized (this.messageQueue) {
256                    this.messageQueue.notifyAll();
257                }
258            } catch (IOException ex) {
259                isRunning = false;
260                ex.printStackTrace();
261                if (sipStack.isLoggingEnabled())
262                    getSIPStack().getStackLogger()
263                            .logDebug("UDPMessageProcessor: Got an IO Exception");
264            } catch (Exception ex) {
265                if (sipStack.isLoggingEnabled())
266                    getSIPStack().getStackLogger()
267                            .logDebug("UDPMessageProcessor: Unexpected Exception - quitting");
268                InternalErrorHandler.handleException(ex);
269                return;
270            }
271        }
272    }
273
274    /**
275     * Shut down the message processor. Close the socket for recieving incoming
276     * messages.
277     */
278    public void stop() {
279        synchronized (this.messageQueue) {
280            this.isRunning = false;
281            this.messageQueue.notifyAll();
282            sock.close();
283
284
285        }
286    }
287
288    /**
289     * Return the transport string.
290     *
291     * @return the transport string
292     */
293    public String getTransport() {
294        return "udp";
295    }
296
297    /**
298     * Returns the stack.
299     *
300     * @return my sip stack.
301     */
302    public SIPTransactionStack getSIPStack() {
303        return sipStack;
304    }
305
306    /**
307     * Create and return new TCPMessageChannel for the given host/port.
308     */
309    public MessageChannel createMessageChannel(HostPort targetHostPort)
310            throws UnknownHostException {
311        return new UDPMessageChannel(targetHostPort.getInetAddress(),
312                targetHostPort.getPort(), sipStack, this);
313    }
314
315    public MessageChannel createMessageChannel(InetAddress host, int port)
316            throws IOException {
317        return new UDPMessageChannel(host, port, sipStack, this);
318    }
319
320    /**
321     * Default target port for UDP
322     */
323    public int getDefaultTargetPort() {
324        return 5060;
325    }
326
327    /**
328     * UDP is not a secure protocol.
329     */
330    public boolean isSecure() {
331        return false;
332    }
333
334    /**
335     * UDP can handle a message as large as the MAX_DATAGRAM_SIZE.
336     */
337    public int getMaximumMessageSize() {
338        return 8*1024;
339    }
340
341    /**
342     * Return true if there are any messages in use.
343     */
344    public boolean inUse() {
345        synchronized (messageQueue) {
346            return messageQueue.size() != 0;
347        }
348    }
349
350}
351