/* net/tipc/socket.c — revision f662c07058f7e6365ae65080d772f9122f6f50a9 */
1/*
2 * net/tipc/socket.c: TIPC socket API
3 *
4 * Copyright (c) 2001-2007, Ericsson AB
5 * Copyright (c) 2004-2008, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 *    contributors may be used to endorse or promote products derived from
18 *    this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include <linux/module.h>
38#include <linux/types.h>
39#include <linux/net.h>
40#include <linux/socket.h>
41#include <linux/errno.h>
42#include <linux/mm.h>
43#include <linux/poll.h>
44#include <linux/fcntl.h>
45#include <linux/gfp.h>
46#include <asm/string.h>
47#include <asm/atomic.h>
48#include <net/sock.h>
49
50#include <linux/tipc.h>
51#include <linux/tipc_config.h>
52#include <net/tipc/tipc_msg.h>
53#include <net/tipc/tipc_port.h>
54
55#include "core.h"
56
/* Pseudo socket states, stored in sock->state alongside the standard
 * SS_* values (both are negative so they cannot collide with them)
 */
#define SS_LISTENING	-1	/* socket is listening */
#define SS_READY	-2	/* socket is connectionless */

#define OVERLOAD_LIMIT_BASE	5000	/* base for rx-queue overload checks */
#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */

/**
 * struct tipc_sock - TIPC socket structure
 * @sk: generic socket (must be first, so a struct sock * can be cast
 *      directly to a struct tipc_sock * -- see tipc_sk() below)
 * @p: associated TIPC port
 * @peer_name: port ID of the connected peer (valid once connected)
 */
struct tipc_sock {
	struct sock sk;
	struct tipc_port *p;
	struct tipc_portid peer_name;
};

/* Converters between the generic socket and its TIPC-specific parts */
#define tipc_sk(sk) ((struct tipc_sock *)(sk))
#define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))

/* Forward declarations for the port-layer callbacks defined below */
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;

static struct proto tipc_proto;

static int sockets_enabled = 0;

/* Global count of buffers queued on all TIPC socket receive queues */
static atomic_t tipc_queue_size = ATOMIC_INIT(0);
85
86/*
87 * Revised TIPC socket locking policy:
88 *
89 * Most socket operations take the standard socket lock when they start
90 * and hold it until they finish (or until they need to sleep).  Acquiring
91 * this lock grants the owner exclusive access to the fields of the socket
92 * data structures, with the exception of the backlog queue.  A few socket
93 * operations can be done without taking the socket lock because they only
94 * read socket information that never changes during the life of the socket.
95 *
96 * Socket operations may acquire the lock for the associated TIPC port if they
97 * need to perform an operation on the port.  If any routine needs to acquire
98 * both the socket lock and the port lock it must take the socket lock first
99 * to avoid the risk of deadlock.
100 *
101 * The dispatcher handling incoming messages cannot grab the socket lock in
 * the standard fashion, since it is invoked at the BH level and cannot block.
103 * Instead, it checks to see if the socket lock is currently owned by someone,
104 * and either handles the message itself or adds it to the socket's backlog
105 * queue; in the latter case the queued message is processed once the process
106 * owning the socket lock releases it.
107 *
108 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
109 * the problem of a blocked socket operation preventing any other operations
110 * from occurring.  However, applications must be careful if they have
111 * multiple threads trying to send (or receive) on the same socket, as these
112 * operations might interfere with each other.  For example, doing a connect
113 * and a receive at the same time might allow the receive to consume the
114 * ACK message meant for the connect.  While additional work could be done
115 * to try and overcome this, it doesn't seem to be worthwhile at the present.
116 *
117 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
118 * that another operation that must be performed in a non-blocking manner is
119 * not delayed for very long because the lock has already been taken.
120 *
121 * NOTE: This code assumes that certain fields of a port/socket pair are
122 * constant over its lifetime; such fields can be examined without taking
123 * the socket lock and/or port lock, and do not need to be re-read even
124 * after resuming processing after waiting.  These fields include:
125 *   - socket type
126 *   - pointer to socket sk structure (aka tipc_sock structure)
127 *   - pointer to port structure
128 *   - port reference
129 */
130
131/**
132 * advance_rx_queue - discard first buffer in socket receive queue
133 *
134 * Caller must hold socket lock
135 */
136
137static void advance_rx_queue(struct sock *sk)
138{
139	buf_discard(__skb_dequeue(&sk->sk_receive_queue));
140	atomic_dec(&tipc_queue_size);
141}
142
143/**
144 * discard_rx_queue - discard all buffers in socket receive queue
145 *
146 * Caller must hold socket lock
147 */
148
149static void discard_rx_queue(struct sock *sk)
150{
151	struct sk_buff *buf;
152
153	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
154		atomic_dec(&tipc_queue_size);
155		buf_discard(buf);
156	}
157}
158
159/**
160 * reject_rx_queue - reject all buffers in socket receive queue
161 *
162 * Caller must hold socket lock
163 */
164
165static void reject_rx_queue(struct sock *sk)
166{
167	struct sk_buff *buf;
168
169	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
170		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
171		atomic_dec(&tipc_queue_size);
172	}
173}
174
175/**
176 * tipc_create - create a TIPC socket
177 * @net: network namespace (must be default network)
178 * @sock: pre-allocated socket structure
179 * @protocol: protocol indicator (must be 0)
180 * @kern: caused by kernel or by userspace?
181 *
182 * This routine creates additional data structures used by the TIPC socket,
183 * initializes them, and links them together.
184 *
185 * Returns 0 on success, errno otherwise
186 */
187
188static int tipc_create(struct net *net, struct socket *sock, int protocol,
189		       int kern)
190{
191	const struct proto_ops *ops;
192	socket_state state;
193	struct sock *sk;
194	struct tipc_port *tp_ptr;
195
196	/* Validate arguments */
197
198	if (!net_eq(net, &init_net))
199		return -EAFNOSUPPORT;
200
201	if (unlikely(protocol != 0))
202		return -EPROTONOSUPPORT;
203
204	switch (sock->type) {
205	case SOCK_STREAM:
206		ops = &stream_ops;
207		state = SS_UNCONNECTED;
208		break;
209	case SOCK_SEQPACKET:
210		ops = &packet_ops;
211		state = SS_UNCONNECTED;
212		break;
213	case SOCK_DGRAM:
214	case SOCK_RDM:
215		ops = &msg_ops;
216		state = SS_READY;
217		break;
218	default:
219		return -EPROTOTYPE;
220	}
221
222	/* Allocate socket's protocol area */
223
224	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
225	if (sk == NULL)
226		return -ENOMEM;
227
228	/* Allocate TIPC port for socket to use */
229
230	tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
231				     TIPC_LOW_IMPORTANCE);
232	if (unlikely(!tp_ptr)) {
233		sk_free(sk);
234		return -ENOMEM;
235	}
236
237	/* Finish initializing socket data structures */
238
239	sock->ops = ops;
240	sock->state = state;
241
242	sock_init_data(sock, sk);
243	sk->sk_rcvtimeo = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
244	sk->sk_backlog_rcv = backlog_rcv;
245	tipc_sk(sk)->p = tp_ptr;
246
247	spin_unlock_bh(tp_ptr->lock);
248
249	if (sock->state == SS_READY) {
250		tipc_set_portunreturnable(tp_ptr->ref, 1);
251		if (sock->type == SOCK_DGRAM)
252			tipc_set_portunreliable(tp_ptr->ref, 1);
253	}
254
255	atomic_inc(&tipc_user_count);
256	return 0;
257}
258
259/**
260 * release - destroy a TIPC socket
261 * @sock: socket to destroy
262 *
263 * This routine cleans up any messages that are still queued on the socket.
264 * For DGRAM and RDM socket types, all queued messages are rejected.
265 * For SEQPACKET and STREAM socket types, the first message is rejected
266 * and any others are discarded.  (If the first message on a STREAM socket
267 * is partially-read, it is discarded and the next one is rejected instead.)
268 *
269 * NOTE: Rejected messages are not necessarily returned to the sender!  They
270 * are returned or discarded according to the "destination droppable" setting
271 * specified for the message by the sender.
272 *
273 * Returns 0 on success, errno otherwise
274 */
275
276static int release(struct socket *sock)
277{
278	struct sock *sk = sock->sk;
279	struct tipc_port *tport;
280	struct sk_buff *buf;
281	int res;
282
283	/*
284	 * Exit if socket isn't fully initialized (occurs when a failed accept()
285	 * releases a pre-allocated child socket that was never used)
286	 */
287
288	if (sk == NULL)
289		return 0;
290
291	tport = tipc_sk_port(sk);
292	lock_sock(sk);
293
294	/*
295	 * Reject all unreceived messages, except on an active connection
296	 * (which disconnects locally & sends a 'FIN+' to peer)
297	 */
298
299	while (sock->state != SS_DISCONNECTING) {
300		buf = __skb_dequeue(&sk->sk_receive_queue);
301		if (buf == NULL)
302			break;
303		atomic_dec(&tipc_queue_size);
304		if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf)))
305			buf_discard(buf);
306		else {
307			if ((sock->state == SS_CONNECTING) ||
308			    (sock->state == SS_CONNECTED)) {
309				sock->state = SS_DISCONNECTING;
310				tipc_disconnect(tport->ref);
311			}
312			tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
313		}
314	}
315
316	/*
317	 * Delete TIPC port; this ensures no more messages are queued
318	 * (also disconnects an active connection & sends a 'FIN-' to peer)
319	 */
320
321	res = tipc_deleteport(tport->ref);
322
323	/* Discard any remaining (connection-based) messages in receive queue */
324
325	discard_rx_queue(sk);
326
327	/* Reject any messages that accumulated in backlog queue */
328
329	sock->state = SS_DISCONNECTING;
330	release_sock(sk);
331
332	sock_put(sk);
333	sock->sk = NULL;
334
335	atomic_dec(&tipc_user_count);
336	return res;
337}
338
339/**
340 * bind - associate or disassocate TIPC name(s) with a socket
341 * @sock: socket structure
342 * @uaddr: socket address describing name(s) and desired operation
343 * @uaddr_len: size of socket address data structure
344 *
345 * Name and name sequence binding is indicated using a positive scope value;
346 * a negative scope value unbinds the specified name.  Specifying no name
347 * (i.e. a socket address length of 0) unbinds all names from the socket.
348 *
349 * Returns 0 on success, errno otherwise
350 *
351 * NOTE: This routine doesn't need to take the socket lock since it doesn't
352 *       access any non-constant socket information.
353 */
354
355static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
356{
357	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
358	u32 portref = tipc_sk_port(sock->sk)->ref;
359
360	if (unlikely(!uaddr_len))
361		return tipc_withdraw(portref, 0, NULL);
362
363	if (uaddr_len < sizeof(struct sockaddr_tipc))
364		return -EINVAL;
365	if (addr->family != AF_TIPC)
366		return -EAFNOSUPPORT;
367
368	if (addr->addrtype == TIPC_ADDR_NAME)
369		addr->addr.nameseq.upper = addr->addr.nameseq.lower;
370	else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
371		return -EAFNOSUPPORT;
372
373	return (addr->scope > 0) ?
374		tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
375		tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
376}
377
378/**
379 * get_name - get port ID of socket or peer socket
380 * @sock: socket structure
381 * @uaddr: area for returned socket address
382 * @uaddr_len: area for returned length of socket address
383 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
384 *
385 * Returns 0 on success, errno otherwise
386 *
387 * NOTE: This routine doesn't need to take the socket lock since it only
388 *       accesses socket information that is unchanging (or which changes in
389 * 	 a completely predictable manner).
390 */
391
392static int get_name(struct socket *sock, struct sockaddr *uaddr,
393		    int *uaddr_len, int peer)
394{
395	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
396	struct tipc_sock *tsock = tipc_sk(sock->sk);
397
398	if (peer) {
399		if ((sock->state != SS_CONNECTED) &&
400			((peer != 2) || (sock->state != SS_DISCONNECTING)))
401			return -ENOTCONN;
402		addr->addr.id.ref = tsock->peer_name.ref;
403		addr->addr.id.node = tsock->peer_name.node;
404	} else {
405		tipc_ownidentity(tsock->p->ref, &addr->addr.id);
406	}
407
408	*uaddr_len = sizeof(*addr);
409	addr->addrtype = TIPC_ADDR_ID;
410	addr->family = AF_TIPC;
411	addr->scope = 0;
412	addr->addr.name.domain = 0;
413
414	return 0;
415}
416
417/**
418 * poll - read and possibly block on pollmask
419 * @file: file structure associated with the socket
420 * @sock: socket for which to calculate the poll bits
421 * @wait: ???
422 *
423 * Returns pollmask value
424 *
425 * COMMENTARY:
426 * It appears that the usual socket locking mechanisms are not useful here
427 * since the pollmask info is potentially out-of-date the moment this routine
428 * exits.  TCP and other protocols seem to rely on higher level poll routines
429 * to handle any preventable race conditions, so TIPC will do the same ...
430 *
431 * TIPC sets the returned events as follows:
432 *
433 * socket state		flags set
434 * ------------		---------
435 * unconnected		no read flags
436 *			no write flags
437 *
438 * connecting		POLLIN/POLLRDNORM if ACK/NACK in rx queue
439 *			no write flags
440 *
441 * connected		POLLIN/POLLRDNORM if data in rx queue
442 *			POLLOUT if port is not congested
443 *
444 * disconnecting	POLLIN/POLLRDNORM/POLLHUP
445 *			no write flags
446 *
447 * listening		POLLIN if SYN in rx queue
448 *			no write flags
449 *
450 * ready		POLLIN/POLLRDNORM if data in rx queue
451 * [connectionless]	POLLOUT (since port cannot be congested)
452 *
453 * IMPORTANT: The fact that a read or write operation is indicated does NOT
454 * imply that the operation will succeed, merely that it should be performed
455 * and will not block.
456 */
457
458static unsigned int poll(struct file *file, struct socket *sock,
459			 poll_table *wait)
460{
461	struct sock *sk = sock->sk;
462	u32 mask = 0;
463
464	poll_wait(file, sk_sleep(sk), wait);
465
466	switch ((int)sock->state) {
467	case SS_READY:
468	case SS_CONNECTED:
469		if (!tipc_sk_port(sk)->congested)
470			mask |= POLLOUT;
471		/* fall thru' */
472	case SS_CONNECTING:
473	case SS_LISTENING:
474		if (!skb_queue_empty(&sk->sk_receive_queue))
475			mask |= (POLLIN | POLLRDNORM);
476		break;
477	case SS_DISCONNECTING:
478		mask = (POLLIN | POLLRDNORM | POLLHUP);
479		break;
480	}
481
482	return mask;
483}
484
485/**
486 * dest_name_check - verify user is permitted to send to specified port name
487 * @dest: destination address
488 * @m: descriptor for message to be sent
489 *
490 * Prevents restricted configuration commands from being issued by
491 * unauthorized users.
492 *
493 * Returns 0 if permission is granted, otherwise errno
494 */
495
496static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
497{
498	struct tipc_cfg_msg_hdr hdr;
499
500	if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
501		return 0;
502	if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
503		return 0;
504	if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
505		return -EACCES;
506
507	if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
508		return -EFAULT;
509	if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
510		return -EACCES;
511
512	return 0;
513}
514
515/**
516 * send_msg - send message in connectionless manner
517 * @iocb: if NULL, indicates that socket lock is already held
518 * @sock: socket structure
519 * @m: message to send
520 * @total_len: length of message
521 *
522 * Message must have an destination specified explicitly.
523 * Used for SOCK_RDM and SOCK_DGRAM messages,
524 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
525 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
526 *
527 * Returns the number of bytes sent on success, or errno otherwise
528 */
529
530static int send_msg(struct kiocb *iocb, struct socket *sock,
531		    struct msghdr *m, size_t total_len)
532{
533	struct sock *sk = sock->sk;
534	struct tipc_port *tport = tipc_sk_port(sk);
535	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
536	int needs_conn;
537	int res = -EINVAL;
538
539	if (unlikely(!dest))
540		return -EDESTADDRREQ;
541	if (unlikely((m->msg_namelen < sizeof(*dest)) ||
542		     (dest->family != AF_TIPC)))
543		return -EINVAL;
544
545	if (iocb)
546		lock_sock(sk);
547
548	needs_conn = (sock->state != SS_READY);
549	if (unlikely(needs_conn)) {
550		if (sock->state == SS_LISTENING) {
551			res = -EPIPE;
552			goto exit;
553		}
554		if (sock->state != SS_UNCONNECTED) {
555			res = -EISCONN;
556			goto exit;
557		}
558		if ((tport->published) ||
559		    ((sock->type == SOCK_STREAM) && (total_len != 0))) {
560			res = -EOPNOTSUPP;
561			goto exit;
562		}
563		if (dest->addrtype == TIPC_ADDR_NAME) {
564			tport->conn_type = dest->addr.name.name.type;
565			tport->conn_instance = dest->addr.name.name.instance;
566		}
567
568		/* Abort any pending connection attempts (very unlikely) */
569
570		reject_rx_queue(sk);
571	}
572
573	do {
574		if (dest->addrtype == TIPC_ADDR_NAME) {
575			if ((res = dest_name_check(dest, m)))
576				break;
577			res = tipc_send2name(tport->ref,
578					     &dest->addr.name.name,
579					     dest->addr.name.domain,
580					     m->msg_iovlen,
581					     m->msg_iov);
582		}
583		else if (dest->addrtype == TIPC_ADDR_ID) {
584			res = tipc_send2port(tport->ref,
585					     &dest->addr.id,
586					     m->msg_iovlen,
587					     m->msg_iov);
588		}
589		else if (dest->addrtype == TIPC_ADDR_MCAST) {
590			if (needs_conn) {
591				res = -EOPNOTSUPP;
592				break;
593			}
594			if ((res = dest_name_check(dest, m)))
595				break;
596			res = tipc_multicast(tport->ref,
597					     &dest->addr.nameseq,
598					     0,
599					     m->msg_iovlen,
600					     m->msg_iov);
601		}
602		if (likely(res != -ELINKCONG)) {
603			if (needs_conn && (res >= 0)) {
604				sock->state = SS_CONNECTING;
605			}
606			break;
607		}
608		if (m->msg_flags & MSG_DONTWAIT) {
609			res = -EWOULDBLOCK;
610			break;
611		}
612		release_sock(sk);
613		res = wait_event_interruptible(*sk_sleep(sk),
614					       !tport->congested);
615		lock_sock(sk);
616		if (res)
617			break;
618	} while (1);
619
620exit:
621	if (iocb)
622		release_sock(sk);
623	return res;
624}
625
626/**
627 * send_packet - send a connection-oriented message
628 * @iocb: if NULL, indicates that socket lock is already held
629 * @sock: socket structure
630 * @m: message to send
631 * @total_len: length of message
632 *
633 * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
634 *
635 * Returns the number of bytes sent on success, or errno otherwise
636 */
637
638static int send_packet(struct kiocb *iocb, struct socket *sock,
639		       struct msghdr *m, size_t total_len)
640{
641	struct sock *sk = sock->sk;
642	struct tipc_port *tport = tipc_sk_port(sk);
643	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
644	int res;
645
646	/* Handle implied connection establishment */
647
648	if (unlikely(dest))
649		return send_msg(iocb, sock, m, total_len);
650
651	if (iocb)
652		lock_sock(sk);
653
654	do {
655		if (unlikely(sock->state != SS_CONNECTED)) {
656			if (sock->state == SS_DISCONNECTING)
657				res = -EPIPE;
658			else
659				res = -ENOTCONN;
660			break;
661		}
662
663		res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov);
664		if (likely(res != -ELINKCONG)) {
665			break;
666		}
667		if (m->msg_flags & MSG_DONTWAIT) {
668			res = -EWOULDBLOCK;
669			break;
670		}
671		release_sock(sk);
672		res = wait_event_interruptible(*sk_sleep(sk),
673			(!tport->congested || !tport->connected));
674		lock_sock(sk);
675		if (res)
676			break;
677	} while (1);
678
679	if (iocb)
680		release_sock(sk);
681	return res;
682}
683
684/**
685 * send_stream - send stream-oriented data
686 * @iocb: (unused)
687 * @sock: socket structure
688 * @m: data to send
689 * @total_len: total length of data to be sent
690 *
691 * Used for SOCK_STREAM data.
692 *
693 * Returns the number of bytes sent on success (or partial success),
694 * or errno if no data sent
695 */
696
697static int send_stream(struct kiocb *iocb, struct socket *sock,
698		       struct msghdr *m, size_t total_len)
699{
700	struct sock *sk = sock->sk;
701	struct tipc_port *tport = tipc_sk_port(sk);
702	struct msghdr my_msg;
703	struct iovec my_iov;
704	struct iovec *curr_iov;
705	int curr_iovlen;
706	char __user *curr_start;
707	u32 hdr_size;
708	int curr_left;
709	int bytes_to_send;
710	int bytes_sent;
711	int res;
712
713	lock_sock(sk);
714
715	/* Handle special cases where there is no connection */
716
717	if (unlikely(sock->state != SS_CONNECTED)) {
718		if (sock->state == SS_UNCONNECTED) {
719			res = send_packet(NULL, sock, m, total_len);
720			goto exit;
721		} else if (sock->state == SS_DISCONNECTING) {
722			res = -EPIPE;
723			goto exit;
724		} else {
725			res = -ENOTCONN;
726			goto exit;
727		}
728	}
729
730	if (unlikely(m->msg_name)) {
731		res = -EISCONN;
732		goto exit;
733	}
734
735	/*
736	 * Send each iovec entry using one or more messages
737	 *
738	 * Note: This algorithm is good for the most likely case
739	 * (i.e. one large iovec entry), but could be improved to pass sets
740	 * of small iovec entries into send_packet().
741	 */
742
743	curr_iov = m->msg_iov;
744	curr_iovlen = m->msg_iovlen;
745	my_msg.msg_iov = &my_iov;
746	my_msg.msg_iovlen = 1;
747	my_msg.msg_flags = m->msg_flags;
748	my_msg.msg_name = NULL;
749	bytes_sent = 0;
750
751	hdr_size = msg_hdr_sz(&tport->phdr);
752
753	while (curr_iovlen--) {
754		curr_start = curr_iov->iov_base;
755		curr_left = curr_iov->iov_len;
756
757		while (curr_left) {
758			bytes_to_send = tport->max_pkt - hdr_size;
759			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
760				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
761			if (curr_left < bytes_to_send)
762				bytes_to_send = curr_left;
763			my_iov.iov_base = curr_start;
764			my_iov.iov_len = bytes_to_send;
765			if ((res = send_packet(NULL, sock, &my_msg, 0)) < 0) {
766				if (bytes_sent)
767					res = bytes_sent;
768				goto exit;
769			}
770			curr_left -= bytes_to_send;
771			curr_start += bytes_to_send;
772			bytes_sent += bytes_to_send;
773		}
774
775		curr_iov++;
776	}
777	res = bytes_sent;
778exit:
779	release_sock(sk);
780	return res;
781}
782
783/**
784 * auto_connect - complete connection setup to a remote port
785 * @sock: socket structure
786 * @msg: peer's response message
787 *
788 * Returns 0 on success, errno otherwise
789 */
790
791static int auto_connect(struct socket *sock, struct tipc_msg *msg)
792{
793	struct tipc_sock *tsock = tipc_sk(sock->sk);
794
795	if (msg_errcode(msg)) {
796		sock->state = SS_DISCONNECTING;
797		return -ECONNREFUSED;
798	}
799
800	tsock->peer_name.ref = msg_origport(msg);
801	tsock->peer_name.node = msg_orignode(msg);
802	tipc_connect2port(tsock->p->ref, &tsock->peer_name);
803	tipc_set_portimportance(tsock->p->ref, msg_importance(msg));
804	sock->state = SS_CONNECTED;
805	return 0;
806}
807
808/**
809 * set_orig_addr - capture sender's address for received message
810 * @m: descriptor for message info
811 * @msg: received message header
812 *
813 * Note: Address is not captured if not requested by receiver.
814 */
815
816static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
817{
818	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
819
820	if (addr) {
821		addr->family = AF_TIPC;
822		addr->addrtype = TIPC_ADDR_ID;
823		addr->addr.id.ref = msg_origport(msg);
824		addr->addr.id.node = msg_orignode(msg);
825		addr->addr.name.domain = 0;   	/* could leave uninitialized */
826		addr->scope = 0;   		/* could leave uninitialized */
827		m->msg_namelen = sizeof(struct sockaddr_tipc);
828	}
829}
830
831/**
832 * anc_data_recv - optionally capture ancillary data for received message
833 * @m: descriptor for message info
834 * @msg: received message header
835 * @tport: TIPC port associated with message
836 *
837 * Note: Ancillary data is not captured if not requested by receiver.
838 *
839 * Returns 0 if successful, otherwise errno
840 */
841
842static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
843				struct tipc_port *tport)
844{
845	u32 anc_data[3];
846	u32 err;
847	u32 dest_type;
848	int has_name;
849	int res;
850
851	if (likely(m->msg_controllen == 0))
852		return 0;
853
854	/* Optionally capture errored message object(s) */
855
856	err = msg ? msg_errcode(msg) : 0;
857	if (unlikely(err)) {
858		anc_data[0] = err;
859		anc_data[1] = msg_data_sz(msg);
860		if ((res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data)))
861			return res;
862		if (anc_data[1] &&
863		    (res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
864				    msg_data(msg))))
865			return res;
866	}
867
868	/* Optionally capture message destination object */
869
870	dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
871	switch (dest_type) {
872	case TIPC_NAMED_MSG:
873		has_name = 1;
874		anc_data[0] = msg_nametype(msg);
875		anc_data[1] = msg_namelower(msg);
876		anc_data[2] = msg_namelower(msg);
877		break;
878	case TIPC_MCAST_MSG:
879		has_name = 1;
880		anc_data[0] = msg_nametype(msg);
881		anc_data[1] = msg_namelower(msg);
882		anc_data[2] = msg_nameupper(msg);
883		break;
884	case TIPC_CONN_MSG:
885		has_name = (tport->conn_type != 0);
886		anc_data[0] = tport->conn_type;
887		anc_data[1] = tport->conn_instance;
888		anc_data[2] = tport->conn_instance;
889		break;
890	default:
891		has_name = 0;
892	}
893	if (has_name &&
894	    (res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data)))
895		return res;
896
897	return 0;
898}
899
900/**
901 * recv_msg - receive packet-oriented message
902 * @iocb: (unused)
903 * @m: descriptor for message info
904 * @buf_len: total size of user buffer area
905 * @flags: receive flags
906 *
907 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
908 * If the complete message doesn't fit in user area, truncate it.
909 *
910 * Returns size of returned message data, errno otherwise
911 */
912
913static int recv_msg(struct kiocb *iocb, struct socket *sock,
914		    struct msghdr *m, size_t buf_len, int flags)
915{
916	struct sock *sk = sock->sk;
917	struct tipc_port *tport = tipc_sk_port(sk);
918	struct sk_buff *buf;
919	struct tipc_msg *msg;
920	unsigned int sz;
921	u32 err;
922	int res;
923
924	/* Catch invalid receive requests */
925
926	if (m->msg_iovlen != 1)
927		return -EOPNOTSUPP;   /* Don't do multiple iovec entries yet */
928
929	if (unlikely(!buf_len))
930		return -EINVAL;
931
932	lock_sock(sk);
933
934	if (unlikely(sock->state == SS_UNCONNECTED)) {
935		res = -ENOTCONN;
936		goto exit;
937	}
938
939restart:
940
941	/* Look for a message in receive queue; wait if necessary */
942
943	while (skb_queue_empty(&sk->sk_receive_queue)) {
944		if (sock->state == SS_DISCONNECTING) {
945			res = -ENOTCONN;
946			goto exit;
947		}
948		if (flags & MSG_DONTWAIT) {
949			res = -EWOULDBLOCK;
950			goto exit;
951		}
952		release_sock(sk);
953		res = wait_event_interruptible(*sk_sleep(sk),
954			(!skb_queue_empty(&sk->sk_receive_queue) ||
955			 (sock->state == SS_DISCONNECTING)));
956		lock_sock(sk);
957		if (res)
958			goto exit;
959	}
960
961	/* Look at first message in receive queue */
962
963	buf = skb_peek(&sk->sk_receive_queue);
964	msg = buf_msg(buf);
965	sz = msg_data_sz(msg);
966	err = msg_errcode(msg);
967
968	/* Complete connection setup for an implied connect */
969
970	if (unlikely(sock->state == SS_CONNECTING)) {
971		res = auto_connect(sock, msg);
972		if (res)
973			goto exit;
974	}
975
976	/* Discard an empty non-errored message & try again */
977
978	if ((!sz) && (!err)) {
979		advance_rx_queue(sk);
980		goto restart;
981	}
982
983	/* Capture sender's address (optional) */
984
985	set_orig_addr(m, msg);
986
987	/* Capture ancillary data (optional) */
988
989	res = anc_data_recv(m, msg, tport);
990	if (res)
991		goto exit;
992
993	/* Capture message data (if valid) & compute return value (always) */
994
995	if (!err) {
996		if (unlikely(buf_len < sz)) {
997			sz = buf_len;
998			m->msg_flags |= MSG_TRUNC;
999		}
1000		if (unlikely(copy_to_user(m->msg_iov->iov_base, msg_data(msg),
1001					  sz))) {
1002			res = -EFAULT;
1003			goto exit;
1004		}
1005		res = sz;
1006	} else {
1007		if ((sock->state == SS_READY) ||
1008		    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1009			res = 0;
1010		else
1011			res = -ECONNRESET;
1012	}
1013
1014	/* Consume received message (optional) */
1015
1016	if (likely(!(flags & MSG_PEEK))) {
1017		if ((sock->state != SS_READY) &&
1018		    (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1019			tipc_acknowledge(tport->ref, tport->conn_unacked);
1020		advance_rx_queue(sk);
1021	}
1022exit:
1023	release_sock(sk);
1024	return res;
1025}
1026
1027/**
1028 * recv_stream - receive stream-oriented data
1029 * @iocb: (unused)
1030 * @m: descriptor for message info
1031 * @buf_len: total size of user buffer area
1032 * @flags: receive flags
1033 *
1034 * Used for SOCK_STREAM messages only.  If not enough data is available
1035 * will optionally wait for more; never truncates data.
1036 *
1037 * Returns size of returned message data, errno otherwise
1038 */
1039
1040static int recv_stream(struct kiocb *iocb, struct socket *sock,
1041		       struct msghdr *m, size_t buf_len, int flags)
1042{
1043	struct sock *sk = sock->sk;
1044	struct tipc_port *tport = tipc_sk_port(sk);
1045	struct sk_buff *buf;
1046	struct tipc_msg *msg;
1047	unsigned int sz;
1048	int sz_to_copy, target, needed;
1049	int sz_copied = 0;
1050	char __user *crs = m->msg_iov->iov_base;
1051	unsigned char *buf_crs;
1052	u32 err;
1053	int res = 0;
1054
1055	/* Catch invalid receive attempts */
1056
1057	if (m->msg_iovlen != 1)
1058		return -EOPNOTSUPP;   /* Don't do multiple iovec entries yet */
1059
1060	if (unlikely(!buf_len))
1061		return -EINVAL;
1062
1063	lock_sock(sk);
1064
1065	if (unlikely((sock->state == SS_UNCONNECTED) ||
1066		     (sock->state == SS_CONNECTING))) {
1067		res = -ENOTCONN;
1068		goto exit;
1069	}
1070
1071	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1072
1073restart:
1074
1075	/* Look for a message in receive queue; wait if necessary */
1076
1077	while (skb_queue_empty(&sk->sk_receive_queue)) {
1078		if (sock->state == SS_DISCONNECTING) {
1079			res = -ENOTCONN;
1080			goto exit;
1081		}
1082		if (flags & MSG_DONTWAIT) {
1083			res = -EWOULDBLOCK;
1084			goto exit;
1085		}
1086		release_sock(sk);
1087		res = wait_event_interruptible(*sk_sleep(sk),
1088			(!skb_queue_empty(&sk->sk_receive_queue) ||
1089			 (sock->state == SS_DISCONNECTING)));
1090		lock_sock(sk);
1091		if (res)
1092			goto exit;
1093	}
1094
1095	/* Look at first message in receive queue */
1096
1097	buf = skb_peek(&sk->sk_receive_queue);
1098	msg = buf_msg(buf);
1099	sz = msg_data_sz(msg);
1100	err = msg_errcode(msg);
1101
1102	/* Discard an empty non-errored message & try again */
1103
1104	if ((!sz) && (!err)) {
1105		advance_rx_queue(sk);
1106		goto restart;
1107	}
1108
1109	/* Optionally capture sender's address & ancillary data of first msg */
1110
1111	if (sz_copied == 0) {
1112		set_orig_addr(m, msg);
1113		res = anc_data_recv(m, msg, tport);
1114		if (res)
1115			goto exit;
1116	}
1117
1118	/* Capture message data (if valid) & compute return value (always) */
1119
1120	if (!err) {
1121		buf_crs = (unsigned char *)(TIPC_SKB_CB(buf)->handle);
1122		sz = (unsigned char *)msg + msg_size(msg) - buf_crs;
1123
1124		needed = (buf_len - sz_copied);
1125		sz_to_copy = (sz <= needed) ? sz : needed;
1126		if (unlikely(copy_to_user(crs, buf_crs, sz_to_copy))) {
1127			res = -EFAULT;
1128			goto exit;
1129		}
1130		sz_copied += sz_to_copy;
1131
1132		if (sz_to_copy < sz) {
1133			if (!(flags & MSG_PEEK))
1134				TIPC_SKB_CB(buf)->handle = buf_crs + sz_to_copy;
1135			goto exit;
1136		}
1137
1138		crs += sz_to_copy;
1139	} else {
1140		if (sz_copied != 0)
1141			goto exit; /* can't add error msg to valid data */
1142
1143		if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1144			res = 0;
1145		else
1146			res = -ECONNRESET;
1147	}
1148
1149	/* Consume received message (optional) */
1150
1151	if (likely(!(flags & MSG_PEEK))) {
1152		if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1153			tipc_acknowledge(tport->ref, tport->conn_unacked);
1154		advance_rx_queue(sk);
1155	}
1156
1157	/* Loop around if more data is required */
1158
1159	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
1160	    (!skb_queue_empty(&sk->sk_receive_queue) ||
1161	    (sz_copied < target)) &&	/* and more is ready or required */
1162	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */
1163	    (!err))			/* and haven't reached a FIN */
1164		goto restart;
1165
1166exit:
1167	release_sock(sk);
1168	return sz_copied ? sz_copied : res;
1169}
1170
1171/**
1172 * rx_queue_full - determine if receive queue can accept another message
1173 * @msg: message to be added to queue
1174 * @queue_size: current size of queue
1175 * @base: nominal maximum size of queue
1176 *
1177 * Returns 1 if queue is unable to accept message, 0 otherwise
1178 */
1179
1180static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1181{
1182	u32 threshold;
1183	u32 imp = msg_importance(msg);
1184
1185	if (imp == TIPC_LOW_IMPORTANCE)
1186		threshold = base;
1187	else if (imp == TIPC_MEDIUM_IMPORTANCE)
1188		threshold = base * 2;
1189	else if (imp == TIPC_HIGH_IMPORTANCE)
1190		threshold = base * 100;
1191	else
1192		return 0;
1193
1194	if (msg_connected(msg))
1195		threshold *= 4;
1196
1197	return (queue_size >= threshold);
1198}
1199
1200/**
1201 * filter_rcv - validate incoming message
1202 * @sk: socket
1203 * @buf: message
1204 *
1205 * Enqueues message on receive queue if acceptable; optionally handles
1206 * disconnect indication for a connected socket.
1207 *
1208 * Called with socket lock already taken; port lock may also be taken.
1209 *
1210 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1211 */
1212
1213static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1214{
1215	struct socket *sock = sk->sk_socket;
1216	struct tipc_msg *msg = buf_msg(buf);
1217	u32 recv_q_len;
1218
1219	/* Reject message if it is wrong sort of message for socket */
1220
1221	/*
1222	 * WOULD IT BE BETTER TO JUST DISCARD THESE MESSAGES INSTEAD?
1223	 * "NO PORT" ISN'T REALLY THE RIGHT ERROR CODE, AND THERE MAY
1224	 * BE SECURITY IMPLICATIONS INHERENT IN REJECTING INVALID TRAFFIC
1225	 */
1226
1227	if (sock->state == SS_READY) {
1228		if (msg_connected(msg)) {
1229			msg_dbg(msg, "dispatch filter 1\n");
1230			return TIPC_ERR_NO_PORT;
1231		}
1232	} else {
1233		if (msg_mcast(msg)) {
1234			msg_dbg(msg, "dispatch filter 2\n");
1235			return TIPC_ERR_NO_PORT;
1236		}
1237		if (sock->state == SS_CONNECTED) {
1238			if (!msg_connected(msg)) {
1239				msg_dbg(msg, "dispatch filter 3\n");
1240				return TIPC_ERR_NO_PORT;
1241			}
1242		}
1243		else if (sock->state == SS_CONNECTING) {
1244			if (!msg_connected(msg) && (msg_errcode(msg) == 0)) {
1245				msg_dbg(msg, "dispatch filter 4\n");
1246				return TIPC_ERR_NO_PORT;
1247			}
1248		}
1249		else if (sock->state == SS_LISTENING) {
1250			if (msg_connected(msg) || msg_errcode(msg)) {
1251				msg_dbg(msg, "dispatch filter 5\n");
1252				return TIPC_ERR_NO_PORT;
1253			}
1254		}
1255		else if (sock->state == SS_DISCONNECTING) {
1256			msg_dbg(msg, "dispatch filter 6\n");
1257			return TIPC_ERR_NO_PORT;
1258		}
1259		else /* (sock->state == SS_UNCONNECTED) */ {
1260			if (msg_connected(msg) || msg_errcode(msg)) {
1261				msg_dbg(msg, "dispatch filter 7\n");
1262				return TIPC_ERR_NO_PORT;
1263			}
1264		}
1265	}
1266
1267	/* Reject message if there isn't room to queue it */
1268
1269	recv_q_len = (u32)atomic_read(&tipc_queue_size);
1270	if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1271		if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1272			return TIPC_ERR_OVERLOAD;
1273	}
1274	recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1275	if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1276		if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1277			return TIPC_ERR_OVERLOAD;
1278	}
1279
1280	/* Enqueue message (finally!) */
1281
1282	msg_dbg(msg, "<DISP<: ");
1283	TIPC_SKB_CB(buf)->handle = msg_data(msg);
1284	atomic_inc(&tipc_queue_size);
1285	__skb_queue_tail(&sk->sk_receive_queue, buf);
1286
1287	/* Initiate connection termination for an incoming 'FIN' */
1288
1289	if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1290		sock->state = SS_DISCONNECTING;
1291		tipc_disconnect_port(tipc_sk_port(sk));
1292	}
1293
1294	if (waitqueue_active(sk_sleep(sk)))
1295		wake_up_interruptible(sk_sleep(sk));
1296	return TIPC_OK;
1297}
1298
1299/**
1300 * backlog_rcv - handle incoming message from backlog queue
1301 * @sk: socket
1302 * @buf: message
1303 *
1304 * Caller must hold socket lock, but not port lock.
1305 *
1306 * Returns 0
1307 */
1308
1309static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1310{
1311	u32 res;
1312
1313	res = filter_rcv(sk, buf);
1314	if (res)
1315		tipc_reject_msg(buf, res);
1316	return 0;
1317}
1318
1319/**
1320 * dispatch - handle incoming message
1321 * @tport: TIPC port that received message
1322 * @buf: message
1323 *
1324 * Called with port lock already taken.
1325 *
1326 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1327 */
1328
1329static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1330{
1331	struct sock *sk = (struct sock *)tport->usr_handle;
1332	u32 res;
1333
1334	/*
1335	 * Process message if socket is unlocked; otherwise add to backlog queue
1336	 *
1337	 * This code is based on sk_receive_skb(), but must be distinct from it
1338	 * since a TIPC-specific filter/reject mechanism is utilized
1339	 */
1340
1341	bh_lock_sock(sk);
1342	if (!sock_owned_by_user(sk)) {
1343		res = filter_rcv(sk, buf);
1344	} else {
1345		if (sk_add_backlog(sk, buf))
1346			res = TIPC_ERR_OVERLOAD;
1347		else
1348			res = TIPC_OK;
1349	}
1350	bh_unlock_sock(sk);
1351
1352	return res;
1353}
1354
1355/**
1356 * wakeupdispatch - wake up port after congestion
1357 * @tport: port to wakeup
1358 *
1359 * Called with port lock already taken.
1360 */
1361
1362static void wakeupdispatch(struct tipc_port *tport)
1363{
1364	struct sock *sk = (struct sock *)tport->usr_handle;
1365
1366	if (waitqueue_active(sk_sleep(sk)))
1367		wake_up_interruptible(sk_sleep(sk));
1368}
1369
1370/**
1371 * connect - establish a connection to another TIPC port
1372 * @sock: socket structure
1373 * @dest: socket address for destination port
1374 * @destlen: size of socket address data structure
1375 * @flags: file-related flags associated with socket
1376 *
1377 * Returns 0 on success, errno otherwise
1378 */
1379
1380static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1381		   int flags)
1382{
1383	struct sock *sk = sock->sk;
1384	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1385	struct msghdr m = {NULL,};
1386	struct sk_buff *buf;
1387	struct tipc_msg *msg;
1388	int res;
1389
1390	lock_sock(sk);
1391
1392	/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1393
1394	if (sock->state == SS_READY) {
1395		res = -EOPNOTSUPP;
1396		goto exit;
1397	}
1398
1399	/* For now, TIPC does not support the non-blocking form of connect() */
1400
1401	if (flags & O_NONBLOCK) {
1402		res = -EOPNOTSUPP;
1403		goto exit;
1404	}
1405
1406	/* Issue Posix-compliant error code if socket is in the wrong state */
1407
1408	if (sock->state == SS_LISTENING) {
1409		res = -EOPNOTSUPP;
1410		goto exit;
1411	}
1412	if (sock->state == SS_CONNECTING) {
1413		res = -EALREADY;
1414		goto exit;
1415	}
1416	if (sock->state != SS_UNCONNECTED) {
1417		res = -EISCONN;
1418		goto exit;
1419	}
1420
1421	/*
1422	 * Reject connection attempt using multicast address
1423	 *
1424	 * Note: send_msg() validates the rest of the address fields,
1425	 *       so there's no need to do it here
1426	 */
1427
1428	if (dst->addrtype == TIPC_ADDR_MCAST) {
1429		res = -EINVAL;
1430		goto exit;
1431	}
1432
1433	/* Reject any messages already in receive queue (very unlikely) */
1434
1435	reject_rx_queue(sk);
1436
1437	/* Send a 'SYN-' to destination */
1438
1439	m.msg_name = dest;
1440	m.msg_namelen = destlen;
1441	res = send_msg(NULL, sock, &m, 0);
1442	if (res < 0) {
1443		goto exit;
1444	}
1445
1446	/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1447
1448	release_sock(sk);
1449	res = wait_event_interruptible_timeout(*sk_sleep(sk),
1450			(!skb_queue_empty(&sk->sk_receive_queue) ||
1451			(sock->state != SS_CONNECTING)),
1452			sk->sk_rcvtimeo);
1453	lock_sock(sk);
1454
1455	if (res > 0) {
1456		buf = skb_peek(&sk->sk_receive_queue);
1457		if (buf != NULL) {
1458			msg = buf_msg(buf);
1459			res = auto_connect(sock, msg);
1460			if (!res) {
1461				if (!msg_data_sz(msg))
1462					advance_rx_queue(sk);
1463			}
1464		} else {
1465			if (sock->state == SS_CONNECTED) {
1466				res = -EISCONN;
1467			} else {
1468				res = -ECONNREFUSED;
1469			}
1470		}
1471	} else {
1472		if (res == 0)
1473			res = -ETIMEDOUT;
1474		else
1475			; /* leave "res" unchanged */
1476		sock->state = SS_DISCONNECTING;
1477	}
1478
1479exit:
1480	release_sock(sk);
1481	return res;
1482}
1483
1484/**
1485 * listen - allow socket to listen for incoming connections
1486 * @sock: socket structure
1487 * @len: (unused)
1488 *
1489 * Returns 0 on success, errno otherwise
1490 */
1491
1492static int listen(struct socket *sock, int len)
1493{
1494	struct sock *sk = sock->sk;
1495	int res;
1496
1497	lock_sock(sk);
1498
1499	if (sock->state == SS_READY)
1500		res = -EOPNOTSUPP;
1501	else if (sock->state != SS_UNCONNECTED)
1502		res = -EINVAL;
1503	else {
1504		sock->state = SS_LISTENING;
1505		res = 0;
1506	}
1507
1508	release_sock(sk);
1509	return res;
1510}
1511
1512/**
1513 * accept - wait for connection request
1514 * @sock: listening socket
1515 * @newsock: new socket that is to be connected
1516 * @flags: file-related flags associated with socket
1517 *
1518 * Returns 0 on success, errno otherwise
1519 */
1520
1521static int accept(struct socket *sock, struct socket *new_sock, int flags)
1522{
1523	struct sock *sk = sock->sk;
1524	struct sk_buff *buf;
1525	int res;
1526
1527	lock_sock(sk);
1528
1529	if (sock->state == SS_READY) {
1530		res = -EOPNOTSUPP;
1531		goto exit;
1532	}
1533	if (sock->state != SS_LISTENING) {
1534		res = -EINVAL;
1535		goto exit;
1536	}
1537
1538	while (skb_queue_empty(&sk->sk_receive_queue)) {
1539		if (flags & O_NONBLOCK) {
1540			res = -EWOULDBLOCK;
1541			goto exit;
1542		}
1543		release_sock(sk);
1544		res = wait_event_interruptible(*sk_sleep(sk),
1545				(!skb_queue_empty(&sk->sk_receive_queue)));
1546		lock_sock(sk);
1547		if (res)
1548			goto exit;
1549	}
1550
1551	buf = skb_peek(&sk->sk_receive_queue);
1552
1553	res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1554	if (!res) {
1555		struct sock *new_sk = new_sock->sk;
1556		struct tipc_sock *new_tsock = tipc_sk(new_sk);
1557		struct tipc_port *new_tport = new_tsock->p;
1558		u32 new_ref = new_tport->ref;
1559		struct tipc_msg *msg = buf_msg(buf);
1560
1561		lock_sock(new_sk);
1562
1563		/*
1564		 * Reject any stray messages received by new socket
1565		 * before the socket lock was taken (very, very unlikely)
1566		 */
1567
1568		reject_rx_queue(new_sk);
1569
1570		/* Connect new socket to it's peer */
1571
1572		new_tsock->peer_name.ref = msg_origport(msg);
1573		new_tsock->peer_name.node = msg_orignode(msg);
1574		tipc_connect2port(new_ref, &new_tsock->peer_name);
1575		new_sock->state = SS_CONNECTED;
1576
1577		tipc_set_portimportance(new_ref, msg_importance(msg));
1578		if (msg_named(msg)) {
1579			new_tport->conn_type = msg_nametype(msg);
1580			new_tport->conn_instance = msg_nameinst(msg);
1581		}
1582
1583		/*
1584		 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1585		 * Respond to 'SYN+' by queuing it on new socket.
1586		 */
1587
1588		msg_dbg(msg,"<ACC<: ");
1589		if (!msg_data_sz(msg)) {
1590			struct msghdr m = {NULL,};
1591
1592			advance_rx_queue(sk);
1593			send_packet(NULL, new_sock, &m, 0);
1594		} else {
1595			__skb_dequeue(&sk->sk_receive_queue);
1596			__skb_queue_head(&new_sk->sk_receive_queue, buf);
1597		}
1598		release_sock(new_sk);
1599	}
1600exit:
1601	release_sock(sk);
1602	return res;
1603}
1604
1605/**
1606 * shutdown - shutdown socket connection
1607 * @sock: socket structure
1608 * @how: direction to close (must be SHUT_RDWR)
1609 *
1610 * Terminates connection (if necessary), then purges socket's receive queue.
1611 *
1612 * Returns 0 on success, errno otherwise
1613 */
1614
1615static int shutdown(struct socket *sock, int how)
1616{
1617	struct sock *sk = sock->sk;
1618	struct tipc_port *tport = tipc_sk_port(sk);
1619	struct sk_buff *buf;
1620	int res;
1621
1622	if (how != SHUT_RDWR)
1623		return -EINVAL;
1624
1625	lock_sock(sk);
1626
1627	switch (sock->state) {
1628	case SS_CONNECTING:
1629	case SS_CONNECTED:
1630
1631		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1632restart:
1633		buf = __skb_dequeue(&sk->sk_receive_queue);
1634		if (buf) {
1635			atomic_dec(&tipc_queue_size);
1636			if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) {
1637				buf_discard(buf);
1638				goto restart;
1639			}
1640			tipc_disconnect(tport->ref);
1641			tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1642		} else {
1643			tipc_shutdown(tport->ref);
1644		}
1645
1646		sock->state = SS_DISCONNECTING;
1647
1648		/* fall through */
1649
1650	case SS_DISCONNECTING:
1651
1652		/* Discard any unreceived messages; wake up sleeping tasks */
1653
1654		discard_rx_queue(sk);
1655		if (waitqueue_active(sk_sleep(sk)))
1656			wake_up_interruptible(sk_sleep(sk));
1657		res = 0;
1658		break;
1659
1660	default:
1661		res = -ENOTCONN;
1662	}
1663
1664	release_sock(sk);
1665	return res;
1666}
1667
1668/**
1669 * setsockopt - set socket option
1670 * @sock: socket structure
1671 * @lvl: option level
1672 * @opt: option identifier
1673 * @ov: pointer to new option value
1674 * @ol: length of option value
1675 *
1676 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1677 * (to ease compatibility).
1678 *
1679 * Returns 0 on success, errno otherwise
1680 */
1681
1682static int setsockopt(struct socket *sock,
1683		      int lvl, int opt, char __user *ov, unsigned int ol)
1684{
1685	struct sock *sk = sock->sk;
1686	struct tipc_port *tport = tipc_sk_port(sk);
1687	u32 value;
1688	int res;
1689
1690	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1691		return 0;
1692	if (lvl != SOL_TIPC)
1693		return -ENOPROTOOPT;
1694	if (ol < sizeof(value))
1695		return -EINVAL;
1696	if ((res = get_user(value, (u32 __user *)ov)))
1697		return res;
1698
1699	lock_sock(sk);
1700
1701	switch (opt) {
1702	case TIPC_IMPORTANCE:
1703		res = tipc_set_portimportance(tport->ref, value);
1704		break;
1705	case TIPC_SRC_DROPPABLE:
1706		if (sock->type != SOCK_STREAM)
1707			res = tipc_set_portunreliable(tport->ref, value);
1708		else
1709			res = -ENOPROTOOPT;
1710		break;
1711	case TIPC_DEST_DROPPABLE:
1712		res = tipc_set_portunreturnable(tport->ref, value);
1713		break;
1714	case TIPC_CONN_TIMEOUT:
1715		sk->sk_rcvtimeo = msecs_to_jiffies(value);
1716		/* no need to set "res", since already 0 at this point */
1717		break;
1718	default:
1719		res = -EINVAL;
1720	}
1721
1722	release_sock(sk);
1723
1724	return res;
1725}
1726
1727/**
1728 * getsockopt - get socket option
1729 * @sock: socket structure
1730 * @lvl: option level
1731 * @opt: option identifier
1732 * @ov: receptacle for option value
1733 * @ol: receptacle for length of option value
1734 *
1735 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1736 * (to ease compatibility).
1737 *
1738 * Returns 0 on success, errno otherwise
1739 */
1740
1741static int getsockopt(struct socket *sock,
1742		      int lvl, int opt, char __user *ov, int __user *ol)
1743{
1744	struct sock *sk = sock->sk;
1745	struct tipc_port *tport = tipc_sk_port(sk);
1746	int len;
1747	u32 value;
1748	int res;
1749
1750	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1751		return put_user(0, ol);
1752	if (lvl != SOL_TIPC)
1753		return -ENOPROTOOPT;
1754	if ((res = get_user(len, ol)))
1755		return res;
1756
1757	lock_sock(sk);
1758
1759	switch (opt) {
1760	case TIPC_IMPORTANCE:
1761		res = tipc_portimportance(tport->ref, &value);
1762		break;
1763	case TIPC_SRC_DROPPABLE:
1764		res = tipc_portunreliable(tport->ref, &value);
1765		break;
1766	case TIPC_DEST_DROPPABLE:
1767		res = tipc_portunreturnable(tport->ref, &value);
1768		break;
1769	case TIPC_CONN_TIMEOUT:
1770		value = jiffies_to_msecs(sk->sk_rcvtimeo);
1771		/* no need to set "res", since already 0 at this point */
1772		break;
1773	 case TIPC_NODE_RECVQ_DEPTH:
1774		value = (u32)atomic_read(&tipc_queue_size);
1775		break;
1776	 case TIPC_SOCK_RECVQ_DEPTH:
1777		value = skb_queue_len(&sk->sk_receive_queue);
1778		break;
1779	default:
1780		res = -EINVAL;
1781	}
1782
1783	release_sock(sk);
1784
1785	if (res) {
1786		/* "get" failed */
1787	}
1788	else if (len < sizeof(value)) {
1789		res = -EINVAL;
1790	}
1791	else if (copy_to_user(ov, &value, sizeof(value))) {
1792		res = -EFAULT;
1793	}
1794	else {
1795		res = put_user(sizeof(value), ol);
1796	}
1797
1798	return res;
1799}
1800
1801/**
1802 * Protocol switches for the various types of TIPC sockets
1803 */
1804
1805static const struct proto_ops msg_ops = {
1806	.owner 		= THIS_MODULE,
1807	.family		= AF_TIPC,
1808	.release	= release,
1809	.bind		= bind,
1810	.connect	= connect,
1811	.socketpair	= sock_no_socketpair,
1812	.accept		= accept,
1813	.getname	= get_name,
1814	.poll		= poll,
1815	.ioctl		= sock_no_ioctl,
1816	.listen		= listen,
1817	.shutdown	= shutdown,
1818	.setsockopt	= setsockopt,
1819	.getsockopt	= getsockopt,
1820	.sendmsg	= send_msg,
1821	.recvmsg	= recv_msg,
1822	.mmap		= sock_no_mmap,
1823	.sendpage	= sock_no_sendpage
1824};
1825
/* Ops table whose data path is send_packet/recv_msg; presumably serves
 * SOCK_SEQPACKET sockets -- confirm against tipc_create()
 */
static const struct proto_ops packet_ops = {
	.owner 		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= release,
	.bind		= bind,
	.connect	= connect,
	.socketpair	= sock_no_socketpair,
	.accept		= accept,
	.getname	= get_name,
	.poll		= poll,
	.ioctl		= sock_no_ioctl,
	.listen		= listen,
	.shutdown	= shutdown,
	.setsockopt	= setsockopt,
	.getsockopt	= getsockopt,
	.sendmsg	= send_packet,
	.recvmsg	= recv_msg,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};
1846
/* Ops table whose data path is send_stream/recv_stream (SOCK_STREAM) */
static const struct proto_ops stream_ops = {
	.owner 		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= release,
	.bind		= bind,
	.connect	= connect,
	.socketpair	= sock_no_socketpair,
	.accept		= accept,
	.getname	= get_name,
	.poll		= poll,
	.ioctl		= sock_no_ioctl,
	.listen		= listen,
	.shutdown	= shutdown,
	.setsockopt	= setsockopt,
	.getsockopt	= getsockopt,
	.sendmsg	= send_stream,
	.recvmsg	= recv_stream,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};
1867
/* Address-family descriptor registered with the socket layer; tipc_create()
 * is invoked for every socket(AF_TIPC, ...) call
 */
static const struct net_proto_family tipc_family_ops = {
	.owner 		= THIS_MODULE,
	.family		= AF_TIPC,
	.create		= tipc_create
};
1873
/* Protocol descriptor; obj_size tells the core how much to allocate per sock */
static struct proto tipc_proto = {
	.name		= "TIPC",
	.owner		= THIS_MODULE,
	.obj_size	= sizeof(struct tipc_sock)
};
1879
1880/**
1881 * tipc_socket_init - initialize TIPC socket interface
1882 *
1883 * Returns 0 on success, errno otherwise
1884 */
1885int tipc_socket_init(void)
1886{
1887	int res;
1888
1889	res = proto_register(&tipc_proto, 1);
1890	if (res) {
1891		err("Failed to register TIPC protocol type\n");
1892		goto out;
1893	}
1894
1895	res = sock_register(&tipc_family_ops);
1896	if (res) {
1897		err("Failed to register TIPC socket type\n");
1898		proto_unregister(&tipc_proto);
1899		goto out;
1900	}
1901
1902	sockets_enabled = 1;
1903 out:
1904	return res;
1905}
1906
1907/**
1908 * tipc_socket_stop - stop TIPC socket interface
1909 */
1910
1911void tipc_socket_stop(void)
1912{
1913	if (!sockets_enabled)
1914		return;
1915
1916	sockets_enabled = 0;
1917	sock_unregister(tipc_family_ops.family);
1918	proto_unregister(&tipc_proto);
1919}
1920
1921