mqueue.c revision 7eafd7c74c3f2e67c27621b987b28397110d643f
/*
 * POSIX message queues filesystem for Linux.
 *
 * Copyright (C) 2003,2004  Krzysztof Benedyczak    (golbi@mat.uni.torun.pl)
 *                          Michal Wronski          (michal.wronski@gmail.com)
 *
 * Spinlocks:               Mohamed Abbas           (abbas.mohamed@intel.com)
 * Lockless receive & send, fd based notify:
 *			    Manfred Spraul	    (manfred@colorfullife.com)
 *
 * Audit:                   George Wilson           (ltcgcw@us.ibm.com)
 *
 * This file is released under the GPL.
 */

#include <linux/capability.h>
#include <linux/init.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/sysctl.h>
#include <linux/poll.h>
#include <linux/mqueue.h>
#include <linux/msg.h>
#include <linux/skbuff.h>
#include <linux/netlink.h>
#include <linux/syscalls.h>
#include <linux/audit.h>
#include <linux/signal.h>
#include <linux/mutex.h>
#include <linux/nsproxy.h>
#include <linux/pid.h>
#include <linux/ipc_namespace.h>

#include <net/sock.h>
#include "util.h"

#define MQUEUE_MAGIC	0x19800202
#define DIRENT_SIZE	20
#define FILENT_SIZE	80

#define SEND		0
#define RECV		1

#define STATE_NONE	0
#define STATE_PENDING	1
#define STATE_READY	2
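/*
 * The STATE_* values implement the handshake used by the lockless
 * pipelined transfer below: a sleeping waiter starts in STATE_NONE, a
 * peer marks it STATE_PENDING while handing a message over outside the
 * queue lock, and STATE_READY once the transfer is complete (see
 * wq_sleep(), pipelined_send() and pipelined_receive()).
 */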

/*
 * Define the ranges various user-specified maximum values can
 * be set to.
 */
#define MIN_MSGMAX	1		/* min value for msg_max */
#define MAX_MSGMAX	HARD_MSGMAX	/* max value for msg_max */
#define MIN_MSGSIZEMAX	128		/* min value for msgsize_max */
#define MAX_MSGSIZEMAX	(8192*128)	/* max value for msgsize_max */

struct ext_wait_queue {		/* queue of sleeping tasks */
	struct task_struct *task;
	struct list_head list;
	struct msg_msg *msg;	/* ptr of loaded message */
	int state;		/* one of STATE_* values */
};

struct mqueue_inode_info {
	spinlock_t lock;
	struct inode vfs_inode;
	wait_queue_head_t wait_q;

	struct msg_msg **messages;
	struct mq_attr attr;

	struct sigevent notify;
	struct pid *notify_owner;
	struct user_struct *user;	/* user who created, for accounting */
	struct sock *notify_sock;
	struct sk_buff *notify_cookie;

	/* for tasks waiting for free space and messages, respectively */
	struct ext_wait_queue e_wait_q[2];

	unsigned long qsize; /* size of queue in memory (sum of all msgs) */
};

static const struct inode_operations mqueue_dir_inode_operations;
static const struct file_operations mqueue_file_operations;
static struct super_operations mqueue_super_ops;
static void remove_notification(struct mqueue_inode_info *info);

static struct kmem_cache *mqueue_inode_cachep;

static struct ctl_table_header *mq_sysctl_table;

static inline struct mqueue_inode_info *MQUEUE_I(struct inode *inode)
{
	return container_of(inode, struct mqueue_inode_info, vfs_inode);
}

/*
 * This routine should be called with the mq_lock held.
 */
static inline struct ipc_namespace *__get_ns_from_inode(struct inode *inode)
{
	return get_ipc_ns(inode->i_sb->s_fs_info);
}

static struct ipc_namespace *get_ns_from_inode(struct inode *inode)
{
	struct ipc_namespace *ns;

	spin_lock(&mq_lock);
	ns = __get_ns_from_inode(inode);
	spin_unlock(&mq_lock);
	return ns;
}

static struct inode *mqueue_get_inode(struct super_block *sb,
		struct ipc_namespace *ipc_ns, int mode,
		struct mq_attr *attr)
{
	struct user_struct *u = current_user();
	struct inode *inode;

	inode = new_inode(sb);
	if (inode) {
		inode->i_mode = mode;
		inode->i_uid = current_fsuid();
		inode->i_gid = current_fsgid();
		inode->i_mtime = inode->i_ctime = inode->i_atime =
				CURRENT_TIME;

		if (S_ISREG(mode)) {
			struct mqueue_inode_info *info;
			struct task_struct *p = current;
			unsigned long mq_bytes, mq_msg_tblsz;

			inode->i_fop = &mqueue_file_operations;
			inode->i_size = FILENT_SIZE;
			/* mqueue specific info */
			info = MQUEUE_I(inode);
			spin_lock_init(&info->lock);
			init_waitqueue_head(&info->wait_q);
			INIT_LIST_HEAD(&info->e_wait_q[0].list);
			INIT_LIST_HEAD(&info->e_wait_q[1].list);
			info->messages = NULL;
			info->notify_owner = NULL;
			info->qsize = 0;
			info->user = NULL;	/* set when all is ok */
			memset(&info->attr, 0, sizeof(info->attr));
			info->attr.mq_maxmsg = ipc_ns->mq_msg_max;
			info->attr.mq_msgsize = ipc_ns->mq_msgsize_max;
			if (attr) {
				info->attr.mq_maxmsg = attr->mq_maxmsg;
				info->attr.mq_msgsize = attr->mq_msgsize;
			}
			mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *);
			mq_bytes = (mq_msg_tblsz +
				(info->attr.mq_maxmsg * info->attr.mq_msgsize));

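			/*
			 * Charge the worst-case footprint (the pointer
			 * table plus mq_maxmsg messages of mq_msgsize
			 * bytes each) against the creating user's
			 * RLIMIT_MSGQUEUE; the first test below also
			 * catches overflow of u->mq_bytes + mq_bytes.
			 */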
			spin_lock(&mq_lock);
			if (u->mq_bytes + mq_bytes < u->mq_bytes ||
			    u->mq_bytes + mq_bytes >
			    p->signal->rlim[RLIMIT_MSGQUEUE].rlim_cur) {
				spin_unlock(&mq_lock);
				goto out_inode;
			}
			u->mq_bytes += mq_bytes;
			spin_unlock(&mq_lock);

			info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL);
			if (!info->messages) {
				spin_lock(&mq_lock);
				u->mq_bytes -= mq_bytes;
				spin_unlock(&mq_lock);
				goto out_inode;
			}
			/* all is ok */
			info->user = get_uid(u);
		} else if (S_ISDIR(mode)) {
			inc_nlink(inode);
			/* Some things misbehave if size == 0 on a directory */
			inode->i_size = 2 * DIRENT_SIZE;
			inode->i_op = &mqueue_dir_inode_operations;
			inode->i_fop = &simple_dir_operations;
		}
	}
	return inode;
out_inode:
	make_bad_inode(inode);
	iput(inode);
	return NULL;
}

static int mqueue_fill_super(struct super_block *sb, void *data, int silent)
{
	struct inode *inode;
	struct ipc_namespace *ns = data;
	int error = 0;

	sb->s_blocksize = PAGE_CACHE_SIZE;
	sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
	sb->s_magic = MQUEUE_MAGIC;
	sb->s_op = &mqueue_super_ops;

	inode = mqueue_get_inode(sb, ns, S_IFDIR | S_ISVTX | S_IRWXUGO,
				NULL);
	if (!inode) {
		error = -ENOMEM;
		goto out;
	}

	sb->s_root = d_alloc_root(inode);
	if (!sb->s_root) {
		iput(inode);
		error = -ENOMEM;
	}

out:
	return error;
}

static int mqueue_get_sb(struct file_system_type *fs_type,
			 int flags, const char *dev_name,
			 void *data, struct vfsmount *mnt)
{
	if (!(flags & MS_KERNMOUNT))
		data = current->nsproxy->ipc_ns;
	return get_sb_ns(fs_type, flags, data, mqueue_fill_super, mnt);
}

static void init_once(void *foo)
{
	struct mqueue_inode_info *p = (struct mqueue_inode_info *) foo;

	inode_init_once(&p->vfs_inode);
}

static struct inode *mqueue_alloc_inode(struct super_block *sb)
{
	struct mqueue_inode_info *ei;

	ei = kmem_cache_alloc(mqueue_inode_cachep, GFP_KERNEL);
	if (!ei)
		return NULL;
	return &ei->vfs_inode;
}

static void mqueue_destroy_inode(struct inode *inode)
{
	kmem_cache_free(mqueue_inode_cachep, MQUEUE_I(inode));
}

static void mqueue_delete_inode(struct inode *inode)
{
	struct mqueue_inode_info *info;
	struct user_struct *user;
	unsigned long mq_bytes;
	int i;
	struct ipc_namespace *ipc_ns;

	if (S_ISDIR(inode->i_mode)) {
		clear_inode(inode);
		return;
	}
	ipc_ns = get_ns_from_inode(inode);
	info = MQUEUE_I(inode);
	spin_lock(&info->lock);
	for (i = 0; i < info->attr.mq_curmsgs; i++)
		free_msg(info->messages[i]);
	kfree(info->messages);
	spin_unlock(&info->lock);

	clear_inode(inode);

	mq_bytes = (info->attr.mq_maxmsg * sizeof(struct msg_msg *) +
		   (info->attr.mq_maxmsg * info->attr.mq_msgsize));
	user = info->user;
	if (user) {
		spin_lock(&mq_lock);
		user->mq_bytes -= mq_bytes;
		/*
		 * get_ns_from_inode() ensures that the
		 * (ipc_ns = sb->s_fs_info) is either a valid ipc_ns
		 * to which we now hold a reference, or it is NULL.
		 * We can't put it here under mq_lock, though.
		 */
		if (ipc_ns)
			ipc_ns->mq_queues_count--;
		spin_unlock(&mq_lock);
		free_uid(user);
	}
	if (ipc_ns)
		put_ipc_ns(ipc_ns);
}

static int mqueue_create(struct inode *dir, struct dentry *dentry,
				int mode, struct nameidata *nd)
{
	struct inode *inode;
	struct mq_attr *attr = dentry->d_fsdata;
	int error;
	struct ipc_namespace *ipc_ns;

	spin_lock(&mq_lock);
	ipc_ns = __get_ns_from_inode(dir);
	if (!ipc_ns) {
		error = -EACCES;
		goto out_unlock;
	}
	if (ipc_ns->mq_queues_count >= ipc_ns->mq_queues_max &&
			!capable(CAP_SYS_RESOURCE)) {
		error = -ENOSPC;
		goto out_unlock;
	}
	ipc_ns->mq_queues_count++;
	spin_unlock(&mq_lock);

	inode = mqueue_get_inode(dir->i_sb, ipc_ns, mode, attr);
	if (!inode) {
		error = -ENOMEM;
		spin_lock(&mq_lock);
		ipc_ns->mq_queues_count--;
		goto out_unlock;
	}

	put_ipc_ns(ipc_ns);
	dir->i_size += DIRENT_SIZE;
	dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME;

	d_instantiate(dentry, inode);
	dget(dentry);
	return 0;
out_unlock:
	spin_unlock(&mq_lock);
	if (ipc_ns)
		put_ipc_ns(ipc_ns);
	return error;
}

static int mqueue_unlink(struct inode *dir, struct dentry *dentry)
{
	struct inode *inode = dentry->d_inode;

	dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME;
	dir->i_size -= DIRENT_SIZE;
	drop_nlink(inode);
	dput(dentry);
	return 0;
}

/*
 * This routine handles reads from the queue file. To avoid a mess of
 * implementing some form of mq_receive here, we only allow reading the
 * queue size and notification info (the only values that are interesting
 * from the user's point of view and aren't accessible through the
 * standard routines).
 */
static ssize_t mqueue_read_file(struct file *filp, char __user *u_data,
				size_t count, loff_t *off)
{
	struct mqueue_inode_info *info = MQUEUE_I(filp->f_path.dentry->d_inode);
	char buffer[FILENT_SIZE];
	ssize_t ret;

	spin_lock(&info->lock);
	snprintf(buffer, sizeof(buffer),
			"QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
			info->qsize,
			info->notify_owner ? info->notify.sigev_notify : 0,
			(info->notify_owner &&
			 info->notify.sigev_notify == SIGEV_SIGNAL) ?
				info->notify.sigev_signo : 0,
			pid_vnr(info->notify_owner));
	spin_unlock(&info->lock);
	buffer[sizeof(buffer)-1] = '\0';

	ret = simple_read_from_buffer(u_data, count, off, buffer,
				strlen(buffer));
	if (ret <= 0)
		return ret;

	filp->f_path.dentry->d_inode->i_atime =
		filp->f_path.dentry->d_inode->i_ctime = CURRENT_TIME;
	return ret;
}

static int mqueue_flush_file(struct file *filp, fl_owner_t id)
{
	struct mqueue_inode_info *info = MQUEUE_I(filp->f_path.dentry->d_inode);

	spin_lock(&info->lock);
	if (task_tgid(current) == info->notify_owner)
		remove_notification(info);

	spin_unlock(&info->lock);
	return 0;
}

static unsigned int mqueue_poll_file(struct file *filp,
					struct poll_table_struct *poll_tab)
{
	struct mqueue_inode_info *info = MQUEUE_I(filp->f_path.dentry->d_inode);
	int retval = 0;

	poll_wait(filp, &info->wait_q, poll_tab);

	spin_lock(&info->lock);
	if (info->attr.mq_curmsgs)
		retval = POLLIN | POLLRDNORM;

	if (info->attr.mq_curmsgs < info->attr.mq_maxmsg)
		retval |= POLLOUT | POLLWRNORM;
	spin_unlock(&info->lock);

	return retval;
}

/* Adds current to info->e_wait_q[sr] before element with smaller prio */
static void wq_add(struct mqueue_inode_info *info, int sr,
			struct ext_wait_queue *ewp)
{
	struct ext_wait_queue *walk;

	ewp->task = current;

	list_for_each_entry(walk, &info->e_wait_q[sr].list, list) {
		if (walk->task->static_prio <= current->static_prio) {
			list_add_tail(&ewp->list, &walk->list);
			return;
		}
	}
	list_add_tail(&ewp->list, &info->e_wait_q[sr].list);
}

/*
 * Puts current task to sleep. Caller must hold queue lock. After return
 * lock isn't held.
 * sr: SEND or RECV
 */
static int wq_sleep(struct mqueue_inode_info *info, int sr,
			long timeout, struct ext_wait_queue *ewp)
{
	int retval;
	signed long time;

	wq_add(info, sr, ewp);

	for (;;) {
		set_current_state(TASK_INTERRUPTIBLE);

		spin_unlock(&info->lock);
		time = schedule_timeout(timeout);

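		/*
		 * A pipelined sender/receiver may have marked us
		 * STATE_PENDING while handing a message over outside
		 * the queue lock; spin until the transfer is finished
		 * before trusting our state (see pipelined_send()).
		 */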
		while (ewp->state == STATE_PENDING)
			cpu_relax();

		if (ewp->state == STATE_READY) {
			retval = 0;
			goto out;
		}
		spin_lock(&info->lock);
		if (ewp->state == STATE_READY) {
			retval = 0;
			goto out_unlock;
		}
		if (signal_pending(current)) {
			retval = -ERESTARTSYS;
			break;
		}
		if (time == 0) {
			retval = -ETIMEDOUT;
			break;
		}
	}
	list_del(&ewp->list);
out_unlock:
	spin_unlock(&info->lock);
out:
	return retval;
}

/*
 * Returns waiting task that should be serviced first or NULL if none exists
 */
static struct ext_wait_queue *wq_get_first_waiter(
		struct mqueue_inode_info *info, int sr)
{
	struct list_head *ptr;

	ptr = info->e_wait_q[sr].list.prev;
	if (ptr == &info->e_wait_q[sr].list)
		return NULL;
	return list_entry(ptr, struct ext_wait_queue, list);
}

/* Auxiliary functions to manipulate messages' list */
static void msg_insert(struct msg_msg *ptr, struct mqueue_inode_info *info)
{
	int k;

	k = info->attr.mq_curmsgs - 1;
	while (k >= 0 && info->messages[k]->m_type >= ptr->m_type) {
		info->messages[k + 1] = info->messages[k];
		k--;
	}
	info->attr.mq_curmsgs++;
	info->qsize += ptr->m_ts;
	info->messages[k + 1] = ptr;
}

static inline struct msg_msg *msg_get(struct mqueue_inode_info *info)
{
	info->qsize -= info->messages[--info->attr.mq_curmsgs]->m_ts;
	return info->messages[info->attr.mq_curmsgs];
}
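
/*
 * Together these keep info->messages[] sorted by m_type (the message
 * priority) in ascending order: msg_get() pops the highest-priority
 * message from the top, and because msg_insert() places a new message
 * below existing entries of equal m_type, messages of equal priority
 * come out in FIFO order.
 */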

static inline void set_cookie(struct sk_buff *skb, char code)
{
	((char *)skb->data)[NOTIFY_COOKIE_LEN-1] = code;
}

/*
 * This function exists only to keep sys_mq_timedsend() from growing
 * too long.
 */
static void __do_notify(struct mqueue_inode_info *info)
{
	/* notification
	 * is invoked when there is a registered process, no process is
	 * waiting synchronously for a message AND the state of the queue
	 * changed from empty to not empty. Here we are sure that no one
	 * is waiting synchronously. */
	if (info->notify_owner &&
	    info->attr.mq_curmsgs == 1) {
		struct siginfo sig_i;
		switch (info->notify.sigev_notify) {
		case SIGEV_NONE:
			break;
		case SIGEV_SIGNAL:
			/* sends signal */

			sig_i.si_signo = info->notify.sigev_signo;
			sig_i.si_errno = 0;
			sig_i.si_code = SI_MESGQ;
			sig_i.si_value = info->notify.sigev_value;
			sig_i.si_pid = task_tgid_nr_ns(current,
						ns_of_pid(info->notify_owner));
			sig_i.si_uid = current_uid();

			kill_pid_info(info->notify.sigev_signo,
				      &sig_i, info->notify_owner);
			break;
		case SIGEV_THREAD:
			set_cookie(info->notify_cookie, NOTIFY_WOKENUP);
			netlink_sendskb(info->notify_sock, info->notify_cookie);
			break;
		}
		/* the registration is one-shot: unregister the process
		 * after the notification has been delivered */
		put_pid(info->notify_owner);
		info->notify_owner = NULL;
	}
	wake_up(&info->wait_q);
}
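
/*
 * Note: for SIGEV_THREAD the kernel's role ends with sending the stored
 * cookie skb to the notification socket; the user-space library is
 * expected to turn that into the thread start required by POSIX.
 */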

static long prepare_timeout(struct timespec *p)
{
	struct timespec nowts;
	long timeout;

	if (p) {
		if (unlikely(p->tv_nsec < 0 || p->tv_sec < 0
			|| p->tv_nsec >= NSEC_PER_SEC))
			return -EINVAL;
		nowts = CURRENT_TIME;
		/* first subtract as jiffies can't be too big */
		p->tv_sec -= nowts.tv_sec;
		if (p->tv_nsec < nowts.tv_nsec) {
			p->tv_nsec += NSEC_PER_SEC;
			p->tv_sec--;
		}
		p->tv_nsec -= nowts.tv_nsec;
		if (p->tv_sec < 0)
			return 0;

		timeout = timespec_to_jiffies(p) + 1;
	} else
		return MAX_SCHEDULE_TIMEOUT;

	return timeout;
}
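
/*
 * Worked example: with CURRENT_TIME = {100, 300000000}, an absolute
 * timeout of {102, 0} is rewritten in place to the relative
 * {1, 700000000} and becomes timespec_to_jiffies(1.7s) + 1; an absolute
 * time already in the past yields 0, so wq_sleep() times out on its
 * first pass.
 */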

static void remove_notification(struct mqueue_inode_info *info)
{
	if (info->notify_owner != NULL &&
	    info->notify.sigev_notify == SIGEV_THREAD) {
		set_cookie(info->notify_cookie, NOTIFY_REMOVED);
		netlink_sendskb(info->notify_sock, info->notify_cookie);
	}
	put_pid(info->notify_owner);
	info->notify_owner = NULL;
}

static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr)
{
	if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
		return 0;
	if (capable(CAP_SYS_RESOURCE)) {
		if (attr->mq_maxmsg > HARD_MSGMAX)
			return 0;
	} else {
		if (attr->mq_maxmsg > ipc_ns->mq_msg_max ||
				attr->mq_msgsize > ipc_ns->mq_msgsize_max)
			return 0;
	}
	/* check for overflow */
	if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg)
		return 0;
	if ((unsigned long)(attr->mq_maxmsg * attr->mq_msgsize) +
	    (attr->mq_maxmsg * sizeof(struct msg_msg *)) <
	    (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize))
		return 0;
	return 1;
}
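
/*
 * mq_attr_ok() accepts attributes that are positive, within the
 * per-namespace limits (or HARD_MSGMAX with CAP_SYS_RESOURCE), and small
 * enough that mq_maxmsg * mq_msgsize plus the message pointer table
 * cannot wrap an unsigned long; the same total is what gets charged
 * against RLIMIT_MSGQUEUE in mqueue_get_inode().
 */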

/*
 * Invoked when creating a new queue via sys_mq_open
 */
static struct file *do_create(struct ipc_namespace *ipc_ns, struct dentry *dir,
			struct dentry *dentry, int oflag, mode_t mode,
			struct mq_attr *attr)
{
	const struct cred *cred = current_cred();
	struct file *result;
	int ret;

	if (attr) {
		ret = -EINVAL;
		if (!mq_attr_ok(ipc_ns, attr))
			goto out;
		/* store for use during create */
		dentry->d_fsdata = attr;
	}

	mode &= ~current_umask();
	ret = mnt_want_write(ipc_ns->mq_mnt);
	if (ret)
		goto out;
	ret = vfs_create(dir->d_inode, dentry, mode, NULL);
	dentry->d_fsdata = NULL;
	if (ret)
		goto out_drop_write;

	result = dentry_open(dentry, ipc_ns->mq_mnt, oflag, cred);
	/*
	 * dentry_open() took a persistent mnt_want_write(),
	 * so we can now drop this one.
	 */
	mnt_drop_write(ipc_ns->mq_mnt);
	return result;

out_drop_write:
	mnt_drop_write(ipc_ns->mq_mnt);
out:
	dput(dentry);
	mntput(ipc_ns->mq_mnt);
	return ERR_PTR(ret);
}

/* Opens existing queue */
static struct file *do_open(struct ipc_namespace *ipc_ns,
				struct dentry *dentry, int oflag)
{
	const struct cred *cred = current_cred();

	static const int oflag2acc[O_ACCMODE] = { MAY_READ, MAY_WRITE,
						  MAY_READ | MAY_WRITE };

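	/*
	 * The two O_ACCMODE bits encode 0 = O_RDONLY, 1 = O_WRONLY and
	 * 2 = O_RDWR; the fourth encoding, 3 = (O_RDWR | O_WRONLY), is
	 * invalid and must be rejected before indexing oflag2acc[].
	 */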
	if ((oflag & O_ACCMODE) == (O_RDWR | O_WRONLY)) {
		dput(dentry);
		mntput(ipc_ns->mq_mnt);
		return ERR_PTR(-EINVAL);
	}

	if (inode_permission(dentry->d_inode, oflag2acc[oflag & O_ACCMODE])) {
		dput(dentry);
		mntput(ipc_ns->mq_mnt);
		return ERR_PTR(-EACCES);
	}

	return dentry_open(dentry, ipc_ns->mq_mnt, oflag, cred);
}

SYSCALL_DEFINE4(mq_open, const char __user *, u_name, int, oflag, mode_t, mode,
		struct mq_attr __user *, u_attr)
{
	struct dentry *dentry;
	struct file *filp;
	char *name;
	struct mq_attr attr;
	int fd, error;
	struct ipc_namespace *ipc_ns = current->nsproxy->ipc_ns;

	if (u_attr && copy_from_user(&attr, u_attr, sizeof(struct mq_attr)))
		return -EFAULT;

	audit_mq_open(oflag, mode, u_attr ? &attr : NULL);

	if (IS_ERR(name = getname(u_name)))
		return PTR_ERR(name);

	fd = get_unused_fd_flags(O_CLOEXEC);
	if (fd < 0)
		goto out_putname;

	mutex_lock(&ipc_ns->mq_mnt->mnt_root->d_inode->i_mutex);
	dentry = lookup_one_len(name, ipc_ns->mq_mnt->mnt_root, strlen(name));
	if (IS_ERR(dentry)) {
		error = PTR_ERR(dentry);
		goto out_err;
	}
	mntget(ipc_ns->mq_mnt);

	if (oflag & O_CREAT) {
		if (dentry->d_inode) {	/* entry already exists */
			audit_inode(name, dentry);
			error = -EEXIST;
			if (oflag & O_EXCL)
				goto out;
			filp = do_open(ipc_ns, dentry, oflag);
		} else {
			filp = do_create(ipc_ns, ipc_ns->mq_mnt->mnt_root,
						dentry, oflag, mode,
						u_attr ? &attr : NULL);
		}
	} else {
		error = -ENOENT;
		if (!dentry->d_inode)
			goto out;
		audit_inode(name, dentry);
		filp = do_open(ipc_ns, dentry, oflag);
	}

	if (IS_ERR(filp)) {
		error = PTR_ERR(filp);
		goto out_putfd;
	}

	fd_install(fd, filp);
	goto out_upsem;

out:
	dput(dentry);
	mntput(ipc_ns->mq_mnt);
out_putfd:
	put_unused_fd(fd);
out_err:
	fd = error;
out_upsem:
	mutex_unlock(&ipc_ns->mq_mnt->mnt_root->d_inode->i_mutex);
out_putname:
	putname(name);
	return fd;
}
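
/*
 * User-space usage sketch (illustrative only, via the <mqueue.h> librt
 * wrappers around this syscall; the queue name is looked up as a single
 * component under the per-namespace mqueue mount):
 *
 *	struct mq_attr attr = { .mq_maxmsg = 10, .mq_msgsize = 128 };
 *	mqd_t q = mq_open("/myqueue", O_CREAT | O_RDWR, 0600, &attr);
 *
 * The returned descriptor is an ordinary fd, installed with O_CLOEXEC.
 */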

SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name)
{
	int err;
	char *name;
	struct dentry *dentry;
	struct inode *inode = NULL;
	struct ipc_namespace *ipc_ns = current->nsproxy->ipc_ns;

	name = getname(u_name);
	if (IS_ERR(name))
		return PTR_ERR(name);

	mutex_lock_nested(&ipc_ns->mq_mnt->mnt_root->d_inode->i_mutex,
			I_MUTEX_PARENT);
	dentry = lookup_one_len(name, ipc_ns->mq_mnt->mnt_root, strlen(name));
	if (IS_ERR(dentry)) {
		err = PTR_ERR(dentry);
		goto out_unlock;
	}

	if (!dentry->d_inode) {
		err = -ENOENT;
		goto out_err;
	}

	inode = dentry->d_inode;
	if (inode)
		atomic_inc(&inode->i_count);
	err = mnt_want_write(ipc_ns->mq_mnt);
	if (err)
		goto out_err;
	err = vfs_unlink(dentry->d_parent->d_inode, dentry);
	mnt_drop_write(ipc_ns->mq_mnt);
out_err:
	dput(dentry);

out_unlock:
	mutex_unlock(&ipc_ns->mq_mnt->mnt_root->d_inode->i_mutex);
	putname(name);
	if (inode)
		iput(inode);

	return err;
}

/* Pipelined send and receive functions.
 *
 * If a receiver finds no waiting message, then it registers itself in the
 * list of waiting receivers. A sender checks that list before adding the new
 * message into the message array. If there is a waiting receiver, then it
 * bypasses the message array and directly hands the message over to the
 * receiver.
 * The receiver accepts the message and returns without grabbing the queue
 * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
 * are necessary. The same algorithm is used for sysv semaphores, see
 * ipc/sem.c for more details.
 *
 * The same algorithm is used for senders.
 */

/* pipelined_send() - send a message directly to the task waiting in
 * sys_mq_timedreceive() (without inserting message into a queue).
 */
static inline void pipelined_send(struct mqueue_inode_info *info,
				  struct msg_msg *message,
				  struct ext_wait_queue *receiver)
{
	receiver->msg = message;
	list_del(&receiver->list);
	receiver->state = STATE_PENDING;
	wake_up_process(receiver->task);
	smp_wmb();
	receiver->state = STATE_READY;
}
/* pipelined_receive() - if a task is waiting in sys_mq_timedsend(), take
 * its message and insert it into the queue (there is guaranteed to be one
 * free slot). */
static inline void pipelined_receive(struct mqueue_inode_info *info)
{
	struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);

	if (!sender) {
		/* for poll */
		wake_up_interruptible(&info->wait_q);
		return;
	}
	msg_insert(sender->msg, info);
	list_del(&sender->list);
	sender->state = STATE_PENDING;
	wake_up_process(sender->task);
	smp_wmb();
	sender->state = STATE_READY;
}
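
/*
 * In both helpers the smp_wmb() orders the message hand-over (and the
 * list_del()) before the STATE_READY store becomes visible: the woken
 * task spins on STATE_PENDING in wq_sleep() and only uses ewp->msg once
 * it observes STATE_READY.
 */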

SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
		size_t, msg_len, unsigned int, msg_prio,
		const struct timespec __user *, u_abs_timeout)
{
	struct file *filp;
	struct inode *inode;
	struct ext_wait_queue wait;
	struct ext_wait_queue *receiver;
	struct msg_msg *msg_ptr;
	struct mqueue_inode_info *info;
	struct timespec ts, *p = NULL;
	long timeout;
	int ret;

	if (u_abs_timeout) {
		if (copy_from_user(&ts, u_abs_timeout,
					sizeof(struct timespec)))
			return -EFAULT;
		p = &ts;
	}

	if (unlikely(msg_prio >= (unsigned long) MQ_PRIO_MAX))
		return -EINVAL;

	audit_mq_sendrecv(mqdes, msg_len, msg_prio, p);
	timeout = prepare_timeout(p);

	ret = -EBADF;
	filp = fget(mqdes);
	if (unlikely(!filp))
		goto out;

	inode = filp->f_path.dentry->d_inode;
	if (unlikely(filp->f_op != &mqueue_file_operations))
		goto out_fput;
	info = MQUEUE_I(inode);
	audit_inode(NULL, filp->f_path.dentry);

	if (unlikely(!(filp->f_mode & FMODE_WRITE)))
		goto out_fput;

	if (unlikely(msg_len > info->attr.mq_msgsize)) {
		ret = -EMSGSIZE;
		goto out_fput;
	}

	/* First try to allocate memory, before doing anything with
	 * existing queues. */
	msg_ptr = load_msg(u_msg_ptr, msg_len);
	if (IS_ERR(msg_ptr)) {
		ret = PTR_ERR(msg_ptr);
		goto out_fput;
	}
	msg_ptr->m_ts = msg_len;
	msg_ptr->m_type = msg_prio;

	spin_lock(&info->lock);

	if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
		if (filp->f_flags & O_NONBLOCK) {
			spin_unlock(&info->lock);
			ret = -EAGAIN;
		} else if (unlikely(timeout < 0)) {
			spin_unlock(&info->lock);
			ret = timeout;
		} else {
			wait.task = current;
			wait.msg = (void *) msg_ptr;
			wait.state = STATE_NONE;
			ret = wq_sleep(info, SEND, timeout, &wait);
		}
		if (ret < 0)
			free_msg(msg_ptr);
	} else {
		receiver = wq_get_first_waiter(info, RECV);
		if (receiver) {
			pipelined_send(info, msg_ptr, receiver);
		} else {
			/* adds message to the queue */
			msg_insert(msg_ptr, info);
			__do_notify(info);
		}
		inode->i_atime = inode->i_mtime = inode->i_ctime =
				CURRENT_TIME;
		spin_unlock(&info->lock);
		ret = 0;
	}
out_fput:
	fput(filp);
out:
	return ret;
}

SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
		size_t, msg_len, unsigned int __user *, u_msg_prio,
		const struct timespec __user *, u_abs_timeout)
{
	long timeout;
	ssize_t ret;
	struct msg_msg *msg_ptr;
	struct file *filp;
	struct inode *inode;
	struct mqueue_inode_info *info;
	struct ext_wait_queue wait;
	struct timespec ts, *p = NULL;

	if (u_abs_timeout) {
		if (copy_from_user(&ts, u_abs_timeout,
					sizeof(struct timespec)))
			return -EFAULT;
		p = &ts;
	}

	audit_mq_sendrecv(mqdes, msg_len, 0, p);
	timeout = prepare_timeout(p);

	ret = -EBADF;
	filp = fget(mqdes);
	if (unlikely(!filp))
		goto out;

	inode = filp->f_path.dentry->d_inode;
	if (unlikely(filp->f_op != &mqueue_file_operations))
		goto out_fput;
	info = MQUEUE_I(inode);
	audit_inode(NULL, filp->f_path.dentry);

	if (unlikely(!(filp->f_mode & FMODE_READ)))
		goto out_fput;

	/* checks if buffer is big enough */
	if (unlikely(msg_len < info->attr.mq_msgsize)) {
		ret = -EMSGSIZE;
		goto out_fput;
	}

	spin_lock(&info->lock);
	if (info->attr.mq_curmsgs == 0) {
		if (filp->f_flags & O_NONBLOCK) {
			spin_unlock(&info->lock);
			ret = -EAGAIN;
			msg_ptr = NULL;
		} else if (unlikely(timeout < 0)) {
			spin_unlock(&info->lock);
			ret = timeout;
			msg_ptr = NULL;
		} else {
			wait.task = current;
			wait.state = STATE_NONE;
			ret = wq_sleep(info, RECV, timeout, &wait);
			msg_ptr = wait.msg;
		}
	} else {
		msg_ptr = msg_get(info);

		inode->i_atime = inode->i_mtime = inode->i_ctime =
				CURRENT_TIME;

		/* There is now free space in queue. */
		pipelined_receive(info);
		spin_unlock(&info->lock);
		ret = 0;
	}
	if (ret == 0) {
		ret = msg_ptr->m_ts;

		if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) ||
			store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) {
			ret = -EFAULT;
		}
		free_msg(msg_ptr);
	}
out_fput:
	fput(filp);
out:
	return ret;
}
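
/*
 * User-space sketch of the send/receive pair (illustrative only, via the
 * <mqueue.h> wrappers; the receive buffer must be at least mq_msgsize
 * bytes or the call fails with EMSGSIZE, matching the check above):
 *
 *	char buf[128];
 *	unsigned int prio;
 *	mq_send(q, "hello", 5, 1);
 *	ssize_t n = mq_receive(q, buf, sizeof(buf), &prio);
 */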

/*
 * Note: if the caller asks us to deregister (by passing a NULL pointer)
 * but isn't currently the owner of the notification, the request is
 * silently ignored. POSIX does not explicitly define this case.
 */
SYSCALL_DEFINE2(mq_notify, mqd_t, mqdes,
		const struct sigevent __user *, u_notification)
{
	int ret;
	struct file *filp;
	struct sock *sock;
	struct inode *inode;
	struct sigevent notification;
	struct mqueue_inode_info *info;
	struct sk_buff *nc;

	if (u_notification) {
		if (copy_from_user(&notification, u_notification,
					sizeof(struct sigevent)))
			return -EFAULT;
	}

	audit_mq_notify(mqdes, u_notification ? &notification : NULL);

	nc = NULL;
	sock = NULL;
	if (u_notification != NULL) {
		if (unlikely(notification.sigev_notify != SIGEV_NONE &&
			     notification.sigev_notify != SIGEV_SIGNAL &&
			     notification.sigev_notify != SIGEV_THREAD))
			return -EINVAL;
		if (notification.sigev_notify == SIGEV_SIGNAL &&
			!valid_signal(notification.sigev_signo)) {
			return -EINVAL;
		}
		if (notification.sigev_notify == SIGEV_THREAD) {
			long timeo;

			/* create the notify skb */
			nc = alloc_skb(NOTIFY_COOKIE_LEN, GFP_KERNEL);
			ret = -ENOMEM;
			if (!nc)
				goto out;
			ret = -EFAULT;
			if (copy_from_user(nc->data,
					notification.sigev_value.sival_ptr,
					NOTIFY_COOKIE_LEN)) {
				goto out;
			}

			/* TODO: add a header? */
			skb_put(nc, NOTIFY_COOKIE_LEN);
			/* and attach it to the socket */
retry:
			filp = fget(notification.sigev_signo);
			ret = -EBADF;
			if (!filp)
				goto out;
			sock = netlink_getsockbyfilp(filp);
			fput(filp);
			if (IS_ERR(sock)) {
				ret = PTR_ERR(sock);
				sock = NULL;
				goto out;
			}

			timeo = MAX_SCHEDULE_TIMEOUT;
			ret = netlink_attachskb(sock, nc, &timeo, NULL);
			if (ret == 1)
				goto retry;
			if (ret) {
				sock = NULL;
				nc = NULL;
				goto out;
			}
		}
	}

	ret = -EBADF;
	filp = fget(mqdes);
	if (!filp)
		goto out;

	inode = filp->f_path.dentry->d_inode;
	if (unlikely(filp->f_op != &mqueue_file_operations))
		goto out_fput;
	info = MQUEUE_I(inode);

	ret = 0;
	spin_lock(&info->lock);
	if (u_notification == NULL) {
		if (info->notify_owner == task_tgid(current)) {
			remove_notification(info);
			inode->i_atime = inode->i_ctime = CURRENT_TIME;
		}
	} else if (info->notify_owner != NULL) {
		ret = -EBUSY;
	} else {
		switch (notification.sigev_notify) {
		case SIGEV_NONE:
			info->notify.sigev_notify = SIGEV_NONE;
			break;
		case SIGEV_THREAD:
			info->notify_sock = sock;
			info->notify_cookie = nc;
			sock = NULL;
			nc = NULL;
			info->notify.sigev_notify = SIGEV_THREAD;
			break;
		case SIGEV_SIGNAL:
			info->notify.sigev_signo = notification.sigev_signo;
			info->notify.sigev_value = notification.sigev_value;
			info->notify.sigev_notify = SIGEV_SIGNAL;
			break;
		}

		info->notify_owner = get_pid(task_tgid(current));
		inode->i_atime = inode->i_ctime = CURRENT_TIME;
	}
	spin_unlock(&info->lock);
out_fput:
	fput(filp);
out:
	if (sock) {
		netlink_detachskb(sock, nc);
	} else if (nc) {
		dev_kfree_skb(nc);
	}
	return ret;
}

SYSCALL_DEFINE3(mq_getsetattr, mqd_t, mqdes,
		const struct mq_attr __user *, u_mqstat,
		struct mq_attr __user *, u_omqstat)
{
	int ret;
	struct mq_attr mqstat, omqstat;
	struct file *filp;
	struct inode *inode;
	struct mqueue_inode_info *info;

	if (u_mqstat != NULL) {
		if (copy_from_user(&mqstat, u_mqstat, sizeof(struct mq_attr)))
			return -EFAULT;
		if (mqstat.mq_flags & (~O_NONBLOCK))
			return -EINVAL;
	}

	ret = -EBADF;
	filp = fget(mqdes);
	if (!filp)
		goto out;

	inode = filp->f_path.dentry->d_inode;
	if (unlikely(filp->f_op != &mqueue_file_operations))
		goto out_fput;
	info = MQUEUE_I(inode);

	spin_lock(&info->lock);

	omqstat = info->attr;
	omqstat.mq_flags = filp->f_flags & O_NONBLOCK;
	if (u_mqstat) {
		audit_mq_getsetattr(mqdes, &mqstat);
		spin_lock(&filp->f_lock);
		if (mqstat.mq_flags & O_NONBLOCK)
			filp->f_flags |= O_NONBLOCK;
		else
			filp->f_flags &= ~O_NONBLOCK;
		spin_unlock(&filp->f_lock);

		inode->i_atime = inode->i_ctime = CURRENT_TIME;
	}

	spin_unlock(&info->lock);

	ret = 0;
	if (u_omqstat != NULL && copy_to_user(u_omqstat, &omqstat,
						sizeof(struct mq_attr)))
		ret = -EFAULT;

out_fput:
	fput(filp);
out:
	return ret;
}

static const struct inode_operations mqueue_dir_inode_operations = {
	.lookup = simple_lookup,
	.create = mqueue_create,
	.unlink = mqueue_unlink,
};

static const struct file_operations mqueue_file_operations = {
	.flush = mqueue_flush_file,
	.poll = mqueue_poll_file,
	.read = mqueue_read_file,
};

static struct super_operations mqueue_super_ops = {
	.alloc_inode = mqueue_alloc_inode,
	.destroy_inode = mqueue_destroy_inode,
	.statfs = simple_statfs,
	.delete_inode = mqueue_delete_inode,
	.drop_inode = generic_delete_inode,
};

static struct file_system_type mqueue_fs_type = {
	.name = "mqueue",
	.get_sb = mqueue_get_sb,
	.kill_sb = kill_litter_super,
};

int mq_init_ns(struct ipc_namespace *ns)
{
	ns->mq_queues_count  = 0;
	ns->mq_queues_max    = DFLT_QUEUESMAX;
	ns->mq_msg_max       = DFLT_MSGMAX;
	ns->mq_msgsize_max   = DFLT_MSGSIZEMAX;

	ns->mq_mnt = kern_mount_data(&mqueue_fs_type, ns);
	if (IS_ERR(ns->mq_mnt)) {
		int err = PTR_ERR(ns->mq_mnt);
		ns->mq_mnt = NULL;
		return err;
	}
	return 0;
}

void mq_clear_sbinfo(struct ipc_namespace *ns)
{
	ns->mq_mnt->mnt_sb->s_fs_info = NULL;
}

void mq_put_mnt(struct ipc_namespace *ns)
{
	mntput(ns->mq_mnt);
}

static int msg_max_limit_min = MIN_MSGMAX;
static int msg_max_limit_max = MAX_MSGMAX;

static int msg_maxsize_limit_min = MIN_MSGSIZEMAX;
static int msg_maxsize_limit_max = MAX_MSGSIZEMAX;

static ctl_table mq_sysctls[] = {
	{
		.procname	= "queues_max",
		.data		= &init_ipc_ns.mq_queues_max,
		.maxlen		= sizeof(int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
	},
	{
		.procname	= "msg_max",
		.data		= &init_ipc_ns.mq_msg_max,
		.maxlen		= sizeof(int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.extra1		= &msg_max_limit_min,
		.extra2		= &msg_max_limit_max,
	},
	{
		.procname	= "msgsize_max",
		.data		= &init_ipc_ns.mq_msgsize_max,
		.maxlen		= sizeof(int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.extra1		= &msg_maxsize_limit_min,
		.extra2		= &msg_maxsize_limit_max,
	},
	{ .ctl_name = 0 }
};

static ctl_table mq_sysctl_dir[] = {
	{
		.procname	= "mqueue",
		.mode		= 0555,
		.child		= mq_sysctls,
	},
	{ .ctl_name = 0 }
};

static ctl_table mq_sysctl_root[] = {
	{
		.ctl_name	= CTL_FS,
		.procname	= "fs",
		.mode		= 0555,
		.child		= mq_sysctl_dir,
	},
	{ .ctl_name = 0 }
};

static int __init init_mqueue_fs(void)
{
	int error;

	mqueue_inode_cachep = kmem_cache_create("mqueue_inode_cache",
				sizeof(struct mqueue_inode_info), 0,
				SLAB_HWCACHE_ALIGN, init_once);
	if (mqueue_inode_cachep == NULL)
		return -ENOMEM;

	/* ignore failures - they are not fatal */
	mq_sysctl_table = register_sysctl_table(mq_sysctl_root);

	error = register_filesystem(&mqueue_fs_type);
	if (error)
		goto out_sysctl;

	spin_lock_init(&mq_lock);

	init_ipc_ns.mq_mnt = kern_mount_data(&mqueue_fs_type, &init_ipc_ns);
	if (IS_ERR(init_ipc_ns.mq_mnt)) {
		error = PTR_ERR(init_ipc_ns.mq_mnt);
		goto out_filesystem;
	}

	return 0;

out_filesystem:
	unregister_filesystem(&mqueue_fs_type);
out_sysctl:
	if (mq_sysctl_table)
		unregister_sysctl_table(mq_sysctl_table);
	kmem_cache_destroy(mqueue_inode_cachep);
	return error;
}

__initcall(init_mqueue_fs);