/* RxRPC individual remote procedure call handling
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/slab.h>
#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/hashtable.h>
#include <linux/spinlock_types.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Maximum lifetime of a call (in jiffies).
 */
unsigned rxrpc_max_call_lifetime = 60 * HZ;

/*
 * Time until a dead call expires after its last use (in jiffies).
 */
unsigned rxrpc_dead_call_expiry = 2 * HZ;
const char *const rxrpc_call_states[] = {
	[RXRPC_CALL_CLIENT_SEND_REQUEST]	= "ClSndReq",
	[RXRPC_CALL_CLIENT_AWAIT_REPLY]		= "ClAwtRpl",
	[RXRPC_CALL_CLIENT_RECV_REPLY]		= "ClRcvRpl",
	[RXRPC_CALL_CLIENT_FINAL_ACK]		= "ClFnlACK",
	[RXRPC_CALL_SERVER_SECURING]		= "SvSecure",
	[RXRPC_CALL_SERVER_ACCEPTING]		= "SvAccept",
	[RXRPC_CALL_SERVER_RECV_REQUEST]	= "SvRcvReq",
	[RXRPC_CALL_SERVER_ACK_REQUEST]		= "SvAckReq",
	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
	[RXRPC_CALL_COMPLETE]			= "Complete",
	[RXRPC_CALL_SERVER_BUSY]		= "SvBusy  ",
	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
	[RXRPC_CALL_DEAD]			= "Dead    ",
};

struct kmem_cache *rxrpc_call_jar;
LIST_HEAD(rxrpc_calls);
DEFINE_RWLOCK(rxrpc_call_lock);

static void rxrpc_destroy_call(struct work_struct *work);
static void rxrpc_call_life_expired(unsigned long _call);
static void rxrpc_dead_call_expired(unsigned long _call);
static void rxrpc_ack_time_expired(unsigned long _call);
static void rxrpc_resend_time_expired(unsigned long _call);

static DEFINE_SPINLOCK(rxrpc_call_hash_lock);
static DEFINE_HASHTABLE(rxrpc_call_hash, 10);

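/*
 * Note that DEFINE_HASHTABLE() takes the table size in bits, so the call
 * hash above has 2^10 = 1024 buckets.  The spinlock serialises additions
 * and removals only; lookups use the RCU-safe iterators.
 */
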
/*
 * Hash function for rxrpc_call_hash
 */
static unsigned long rxrpc_call_hashfunc(
	u8		clientflag,
	__be32		cid,
	__be32		call_id,
	__be32		epoch,
	__be16		service_id,
	sa_family_t	proto,
	void		*localptr,
	unsigned int	addr_size,
	const u8	*peer_addr)
{
	const u16 *p;
	unsigned int i;
	unsigned long key;
	u32 hcid = ntohl(cid);

	_enter("");

	key = (unsigned long)localptr;
	/* We just want to add up the __be32 values, so forcing the
	 * cast should be okay.
	 */
	key += (__force u32)epoch;
	key += (__force u16)service_id;
	key += (__force u32)call_id;
	key += (hcid & RXRPC_CIDMASK) >> RXRPC_CIDSHIFT;
	key += hcid & RXRPC_CHANNELMASK;
	key += clientflag;
	key += proto;
	/* Step through the peer address in 16-bit portions for speed */
	for (i = 0, p = (const u16 *)peer_addr; i < addr_size >> 1; i++, p++)
		key += *p;
	_leave(" key = 0x%lx", key);
	return key;
}
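
/*
 * Illustration only: for an IPv4 peer the loop above folds the 4-byte
 * address in as two 16-bit words, so 192.0.2.1 contributes 0xc000 + 0x0201
 * on a big-endian host (the sum differs on little-endian, but a hash key
 * only needs to be consistent within one machine).
 */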

/*
 * Add a call to the hashtable
 */
static void rxrpc_call_hash_add(struct rxrpc_call *call)
{
	unsigned long key;
	unsigned int addr_size = 0;

	_enter("");
	switch (call->proto) {
	case AF_INET:
		addr_size = sizeof(call->peer_ip.ipv4_addr);
		break;
	case AF_INET6:
		addr_size = sizeof(call->peer_ip.ipv6_addr);
		break;
	default:
		break;
	}
	key = rxrpc_call_hashfunc(call->in_clientflag, call->cid,
				  call->call_id, call->epoch,
				  call->service_id, call->proto,
				  call->conn->trans->local, addr_size,
				  call->peer_ip.ipv6_addr);
	/* Store the full key in the call */
	call->hash_key = key;
	spin_lock(&rxrpc_call_hash_lock);
	hash_add_rcu(rxrpc_call_hash, &call->hash_node, key);
	spin_unlock(&rxrpc_call_hash_lock);
	_leave("");
}
133
134/*
135 * Remove a call from the hashtable
136 */
137static void rxrpc_call_hash_del(struct rxrpc_call *call)
138{
139	_enter("");
140	spin_lock(&rxrpc_call_hash_lock);
141	hash_del_rcu(&call->hash_node);
142	spin_unlock(&rxrpc_call_hash_lock);
143	_leave("");
144}

/*
 * Find a call in the hashtable and return it, or NULL if it
 * isn't there.
 */
struct rxrpc_call *rxrpc_find_call_hash(
	u8		clientflag,
	__be32		cid,
	__be32		call_id,
	__be32		epoch,
	__be16		service_id,
	void		*localptr,
	sa_family_t	proto,
	const u8	*peer_addr)
{
	unsigned long key;
	unsigned int addr_size = 0;
	struct rxrpc_call *call = NULL;
	struct rxrpc_call *ret = NULL;

	_enter("");
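	/* We're assumed to be inside an RCU read-side critical section (or
	 * an equivalent context): only writers take rxrpc_call_hash_lock,
	 * and the lookup below relies on hash_for_each_possible_rcu().
	 *
	 * Note also that @call is still NULL here; that's safe because
	 * sizeof() is an unevaluated context and never dereferences it.
	 */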
	switch (proto) {
	case AF_INET:
		addr_size = sizeof(call->peer_ip.ipv4_addr);
		break;
	case AF_INET6:
		addr_size = sizeof(call->peer_ip.ipv6_addr);
		break;
	default:
		break;
	}

	key = rxrpc_call_hashfunc(clientflag, cid, call_id, epoch,
				  service_id, proto, localptr, addr_size,
				  peer_addr);
	hash_for_each_possible_rcu(rxrpc_call_hash, call, hash_node, key) {
		if (call->hash_key == key &&
		    call->call_id == call_id &&
		    call->cid == cid &&
		    call->in_clientflag == clientflag &&
		    call->service_id == service_id &&
		    call->proto == proto &&
		    call->local == localptr &&
		    memcmp(call->peer_ip.ipv6_addr, peer_addr,
			   addr_size) == 0 &&
		    call->epoch == epoch) {
			ret = call;
			break;
		}
	}
	_leave(" = %p", ret);
	return ret;
}
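
/*
 * A hypothetical lookup from an incoming-packet path might look like this
 * (sketch only - the header fields are passed still in network byte order,
 * exactly as the hash function expects):
 *
 *	call = rxrpc_find_call_hash(hdr->flags & RXRPC_CLIENT_INITIATED,
 *				    hdr->cid, hdr->callNumber, hdr->epoch,
 *				    hdr->serviceId, local, AF_INET,
 *				    (const u8 *)&ip_hdr(skb)->saddr);
 */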

/*
 * allocate a new call
 */
static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
{
	struct rxrpc_call *call;

	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
	if (!call)
		return NULL;

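	/* The ACK window size must remain a power of two:
	 * rxrpc_cleanup_call() advances acks_tail with a mask of
	 * (acks_winsz - 1).
	 */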
	call->acks_winsz = 16;
	call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
				    gfp);
	if (!call->acks_window) {
		kmem_cache_free(rxrpc_call_jar, call);
		return NULL;
	}

	setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
		    (unsigned long) call);
	setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
		    (unsigned long) call);
	setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
		    (unsigned long) call);
	setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
		    (unsigned long) call);
	INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
	INIT_WORK(&call->processor, &rxrpc_process_call);
	INIT_LIST_HEAD(&call->accept_link);
	skb_queue_head_init(&call->rx_queue);
	skb_queue_head_init(&call->rx_oos_queue);
	init_waitqueue_head(&call->tx_waitq);
	spin_lock_init(&call->lock);
	rwlock_init(&call->state_lock);
	atomic_set(&call->usage, 1);
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
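	/* (Assume a client call to begin with - rxrpc_incoming_call()
	 * overrides this for calls coming in from a peer.)
	 */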

	memset(&call->sock_node, 0xed, sizeof(call->sock_node));

	call->rx_data_expect = 1;
	call->rx_data_eaten = 0;
	call->rx_first_oos = 0;
	call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size;
	call->creation_jif = jiffies;
	return call;
}

/*
 * allocate a new client call and attempt to get a connection slot for it
 */
static struct rxrpc_call *rxrpc_alloc_client_call(
	struct rxrpc_sock *rx,
	struct rxrpc_transport *trans,
	struct rxrpc_conn_bundle *bundle,
	gfp_t gfp)
{
	struct rxrpc_call *call;
	int ret;

	_enter("");

	ASSERT(rx != NULL);
	ASSERT(trans != NULL);
	ASSERT(bundle != NULL);

	call = rxrpc_alloc_call(gfp);
	if (!call)
		return ERR_PTR(-ENOMEM);

	sock_hold(&rx->sk);
	call->socket = rx;
	call->rx_data_post = 1;

	ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
	if (ret < 0) {
		/* undo the sock_hold() above and the ACK window allocated by
		 * rxrpc_alloc_call() (assuming rxrpc_connect_call() doesn't
		 * consume the call on failure) */
		kfree(call->acks_window);
		sock_put(&rx->sk);
		kmem_cache_free(rxrpc_call_jar, call);
		return ERR_PTR(ret);
	}

	/* Record copies of information for hashtable lookup */
	call->proto = rx->proto;
	call->local = trans->local;
	switch (call->proto) {
	case AF_INET:
		call->peer_ip.ipv4_addr =
			trans->peer->srx.transport.sin.sin_addr.s_addr;
		break;
	case AF_INET6:
		memcpy(call->peer_ip.ipv6_addr,
		       trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
		       sizeof(call->peer_ip.ipv6_addr));
		break;
	}
	call->epoch = call->conn->epoch;
	call->service_id = call->conn->service_id;
	call->in_clientflag = call->conn->in_clientflag;
	/* Add the new call to the hashtable */
	rxrpc_call_hash_add(call);

	spin_lock(&call->conn->trans->peer->lock);
	list_add(&call->error_link, &call->conn->trans->peer->error_targets);
	spin_unlock(&call->conn->trans->peer->lock);

	call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
	add_timer(&call->lifetimer);

	_leave(" = %p", call);
	return call;
}

/*
 * set up a call for the given data
 * - called in process context with IRQs enabled
 */
struct rxrpc_call *rxrpc_get_client_call(struct rxrpc_sock *rx,
					 struct rxrpc_transport *trans,
					 struct rxrpc_conn_bundle *bundle,
					 unsigned long user_call_ID,
					 int create,
					 gfp_t gfp)
{
	struct rxrpc_call *call, *candidate;
	struct rb_node *p, *parent, **pp;

	_enter("%p,%d,%d,%lx,%d",
	       rx, trans ? trans->debug_id : -1, bundle ? bundle->debug_id : -1,
	       user_call_ID, create);

	/* search the extant calls first for one that matches the specified
	 * user ID */
	read_lock(&rx->call_lock);

	p = rx->calls.rb_node;
	while (p) {
		call = rb_entry(p, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			p = p->rb_left;
		else if (user_call_ID > call->user_call_ID)
			p = p->rb_right;
		else
			goto found_extant_call;
	}

	read_unlock(&rx->call_lock);

	if (!create || !trans)
		return ERR_PTR(-EBADSLT);

	/* not yet present - create a candidate for a new record and then
	 * redo the search */
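	/* (The redo is needed because another thread may have added a call
	 * with the same user ID while the lock was dropped; if one has, the
	 * candidate is discarded at found_extant_second.)
	 */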
	candidate = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
	if (IS_ERR(candidate)) {
		_leave(" = %ld", PTR_ERR(candidate));
		return candidate;
	}

	candidate->user_call_ID = user_call_ID;
	__set_bit(RXRPC_CALL_HAS_USERID, &candidate->flags);

	write_lock(&rx->call_lock);

	pp = &rx->calls.rb_node;
	parent = NULL;
	while (*pp) {
		parent = *pp;
		call = rb_entry(parent, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			pp = &(*pp)->rb_left;
		else if (user_call_ID > call->user_call_ID)
			pp = &(*pp)->rb_right;
		else
			goto found_extant_second;
	}

	/* second search also failed; add the new call */
	call = candidate;
	candidate = NULL;
	rxrpc_get_call(call);

	rb_link_node(&call->sock_node, parent, pp);
	rb_insert_color(&call->sock_node, &rx->calls);
	write_unlock(&rx->call_lock);

	write_lock_bh(&rxrpc_call_lock);
	list_add_tail(&call->link, &rxrpc_calls);
	write_unlock_bh(&rxrpc_call_lock);

	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);

	_leave(" = %p [new]", call);
	return call;

	/* we found the call in the list immediately */
found_extant_call:
	rxrpc_get_call(call);
	read_unlock(&rx->call_lock);
	_leave(" = %p [extant %d]", call, atomic_read(&call->usage));
	return call;

	/* we found the call on the second time through the list */
found_extant_second:
	rxrpc_get_call(call);
	write_unlock(&rx->call_lock);
	rxrpc_put_call(candidate);
	_leave(" = %p [second %d]", call, atomic_read(&call->usage));
	return call;
}

/*
 * set up an incoming call
 * - called in process context with IRQs enabled
 */
struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
				       struct rxrpc_connection *conn,
				       struct rxrpc_header *hdr,
				       gfp_t gfp)
{
	struct rxrpc_call *call, *candidate;
	struct rb_node **p, *parent;
	__be32 call_id;

	_enter(",%d,,%x", conn->debug_id, gfp);

	ASSERT(rx != NULL);

	candidate = rxrpc_alloc_call(gfp);
	if (!candidate)
		return ERR_PTR(-EBUSY);

	candidate->socket = rx;
	candidate->conn = conn;
	candidate->cid = hdr->cid;
	candidate->call_id = hdr->callNumber;
	candidate->channel = ntohl(hdr->cid) & RXRPC_CHANNELMASK;
	candidate->rx_data_post = 0;
	candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
	if (conn->security_ix > 0)
		candidate->state = RXRPC_CALL_SERVER_SECURING;
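	/* (A call on a connection with a security class is assumed to wait
	 * in the SECURING state until the security layer reports the keys
	 * are in place; an unsecured call can go straight to ACCEPTING.)
	 */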

	write_lock_bh(&conn->lock);

	/* set the channel for this call */
	call = conn->channels[candidate->channel];
	_debug("channel[%u] is %p", candidate->channel, call);
	if (call && call->call_id == hdr->callNumber) {
		/* already set; must've been a duplicate packet */
		_debug("extant call [%d]", call->state);
		ASSERTCMP(call->conn, ==, conn);

		read_lock(&call->state_lock);
		switch (call->state) {
		case RXRPC_CALL_LOCALLY_ABORTED:
			if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
				rxrpc_queue_call(call);
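			/* fall through */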
		case RXRPC_CALL_REMOTELY_ABORTED:
			read_unlock(&call->state_lock);
			goto aborted_call;
		default:
			rxrpc_get_call(call);
			read_unlock(&call->state_lock);
			goto extant_call;
		}
	}

	if (call) {
		/* it seems the channel is still in use from the previous call
		 * - ditch the old binding if its call is now complete */
		_debug("CALL: %u { %s }",
		       call->debug_id, rxrpc_call_states[call->state]);

		if (call->state >= RXRPC_CALL_COMPLETE) {
			conn->channels[call->channel] = NULL;
		} else {
			write_unlock_bh(&conn->lock);
			kmem_cache_free(rxrpc_call_jar, candidate);
			_leave(" = -EBUSY");
			return ERR_PTR(-EBUSY);
		}
	}

	/* check that the call number isn't a duplicate */
	_debug("check dup");
	call_id = hdr->callNumber;
	p = &conn->calls.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		call = rb_entry(parent, struct rxrpc_call, conn_node);

		/* The tree is sorted in order of the __be32 value without
		 * turning it into host order.
		 */
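		/* (Any self-consistent ordering is good enough for duplicate
		 * detection, so comparing the raw __be32 values works and
		 * saves a byte swap on every node visited.)
		 */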
		if ((__force u32)call_id < (__force u32)call->call_id)
			p = &(*p)->rb_left;
		else if ((__force u32)call_id > (__force u32)call->call_id)
			p = &(*p)->rb_right;
		else
			goto old_call;
	}

	/* make the call available */
	_debug("new call");
	call = candidate;
	candidate = NULL;
	rb_link_node(&call->conn_node, parent, p);
	rb_insert_color(&call->conn_node, &conn->calls);
	conn->channels[call->channel] = call;
	sock_hold(&rx->sk);
	atomic_inc(&conn->usage);
	write_unlock_bh(&conn->lock);

	spin_lock(&conn->trans->peer->lock);
	list_add(&call->error_link, &conn->trans->peer->error_targets);
	spin_unlock(&conn->trans->peer->lock);

	write_lock_bh(&rxrpc_call_lock);
	list_add_tail(&call->link, &rxrpc_calls);
	write_unlock_bh(&rxrpc_call_lock);

	/* Record copies of information for hashtable lookup */
	call->proto = rx->proto;
	call->local = conn->trans->local;
	switch (call->proto) {
	case AF_INET:
		call->peer_ip.ipv4_addr =
			conn->trans->peer->srx.transport.sin.sin_addr.s_addr;
		break;
	case AF_INET6:
		memcpy(call->peer_ip.ipv6_addr,
		       conn->trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
		       sizeof(call->peer_ip.ipv6_addr));
		break;
	default:
		break;
	}
	call->epoch = conn->epoch;
	call->service_id = conn->service_id;
	call->in_clientflag = conn->in_clientflag;
	/* Add the new call to the hashtable */
	rxrpc_call_hash_add(call);

	_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);

	call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
	add_timer(&call->lifetimer);
	_leave(" = %p {%d} [new]", call, call->debug_id);
	return call;

extant_call:
	write_unlock_bh(&conn->lock);
	kmem_cache_free(rxrpc_call_jar, candidate);
	_leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
	return call;

aborted_call:
	write_unlock_bh(&conn->lock);
	kmem_cache_free(rxrpc_call_jar, candidate);
	_leave(" = -ECONNABORTED");
	return ERR_PTR(-ECONNABORTED);

old_call:
	write_unlock_bh(&conn->lock);
	kmem_cache_free(rxrpc_call_jar, candidate);
	_leave(" = -ECONNRESET [old]");
	return ERR_PTR(-ECONNRESET);
}

/*
 * find an extant server call
 * - called in process context with IRQs enabled
 */
struct rxrpc_call *rxrpc_find_server_call(struct rxrpc_sock *rx,
					  unsigned long user_call_ID)
{
	struct rxrpc_call *call;
	struct rb_node *p;

	_enter("%p,%lx", rx, user_call_ID);

	/* search the extant calls for one that matches the specified user
	 * ID */
	read_lock(&rx->call_lock);

	p = rx->calls.rb_node;
	while (p) {
		call = rb_entry(p, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			p = p->rb_left;
		else if (user_call_ID > call->user_call_ID)
			p = p->rb_right;
		else
			goto found_extant_call;
	}

	read_unlock(&rx->call_lock);
	_leave(" = NULL");
	return NULL;

	/* we found the call in the list immediately */
found_extant_call:
	rxrpc_get_call(call);
	read_unlock(&rx->call_lock);
	_leave(" = %p [%d]", call, atomic_read(&call->usage));
	return call;
}

/*
 * detach a call from a socket and set up for release
 */
void rxrpc_release_call(struct rxrpc_call *call)
{
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_sock *rx = call->socket;

	_enter("{%d,%d,%d,%d}",
	       call->debug_id, atomic_read(&call->usage),
	       atomic_read(&call->ackr_not_idle),
	       call->rx_first_oos);

	spin_lock_bh(&call->lock);
	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();
	spin_unlock_bh(&call->lock);

	/* dissociate from the socket
	 * - the socket's ref on the call is passed to the death timer
	 */
	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);

	write_lock_bh(&rx->call_lock);
	if (!list_empty(&call->accept_link)) {
		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
		       call, call->events, call->flags);
		ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
		list_del_init(&call->accept_link);
		sk_acceptq_removed(&rx->sk);
	} else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		rb_erase(&call->sock_node, &rx->calls);
		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
	}
	write_unlock_bh(&rx->call_lock);

	/* free up the channel for reuse */
	spin_lock(&conn->trans->client_lock);
	write_lock_bh(&conn->lock);
	write_lock(&call->state_lock);

	if (conn->channels[call->channel] == call)
		conn->channels[call->channel] = NULL;

	if (conn->out_clientflag && conn->bundle) {
		conn->avail_calls++;
		switch (conn->avail_calls) {
		case 1:
			list_move_tail(&conn->bundle_link,
				       &conn->bundle->avail_conns);
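			/* fall through */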
		case 2 ... RXRPC_MAXCALLS - 1:
			ASSERT(conn->channels[0] == NULL ||
			       conn->channels[1] == NULL ||
			       conn->channels[2] == NULL ||
			       conn->channels[3] == NULL);
			break;
		case RXRPC_MAXCALLS:
			list_move_tail(&conn->bundle_link,
				       &conn->bundle->unused_conns);
			ASSERT(conn->channels[0] == NULL &&
			       conn->channels[1] == NULL &&
			       conn->channels[2] == NULL &&
			       conn->channels[3] == NULL);
			break;
		default:
			printk(KERN_ERR "RxRPC: conn->avail_calls=%d\n",
			       conn->avail_calls);
			BUG();
		}
	}

	spin_unlock(&conn->trans->client_lock);

	if (call->state < RXRPC_CALL_COMPLETE &&
	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
		_debug("+++ ABORTING STATE %d +++\n", call->state);
		call->state = RXRPC_CALL_LOCALLY_ABORTED;
		call->abort_code = RX_CALL_DEAD;
		set_bit(RXRPC_CALL_ABORT, &call->events);
		rxrpc_queue_call(call);
	}
	write_unlock(&call->state_lock);
	write_unlock_bh(&conn->lock);

	/* clean up the Rx queue */
	if (!skb_queue_empty(&call->rx_queue) ||
	    !skb_queue_empty(&call->rx_oos_queue)) {
		struct rxrpc_skb_priv *sp;
		struct sk_buff *skb;

		_debug("purge Rx queues");

		spin_lock_bh(&call->lock);
		while ((skb = skb_dequeue(&call->rx_queue)) ||
		       (skb = skb_dequeue(&call->rx_oos_queue))) {
			sp = rxrpc_skb(skb);
			if (sp->call) {
				ASSERTCMP(sp->call, ==, call);
				rxrpc_put_call(call);
				sp->call = NULL;
			}
			skb->destructor = NULL;
			spin_unlock_bh(&call->lock);

			_debug("- zap %s %%%u #%u",
			       rxrpc_pkts[sp->hdr.type],
			       ntohl(sp->hdr.serial),
			       ntohl(sp->hdr.seq));
			rxrpc_free_skb(skb);
			spin_lock_bh(&call->lock);
		}
		spin_unlock_bh(&call->lock);

		ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
	}

	del_timer_sync(&call->resend_timer);
	del_timer_sync(&call->ack_timer);
	del_timer_sync(&call->lifetimer);
	call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
	add_timer(&call->deadspan);

	_leave("");
}

/*
 * handle a dead call being ready for reaping
 */
static void rxrpc_dead_call_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	_enter("{%d}", call->debug_id);

	write_lock_bh(&call->state_lock);
	call->state = RXRPC_CALL_DEAD;
	write_unlock_bh(&call->state_lock);
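	/* this drops the ref that rxrpc_release_call() handed to the death
	 * timer */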
	rxrpc_put_call(call);
}

/*
 * mark a call as to be released, aborting it if it's still in progress
 * - called with softirqs disabled
 */
static void rxrpc_mark_call_released(struct rxrpc_call *call)
{
	bool sched;

	write_lock(&call->state_lock);
	if (call->state < RXRPC_CALL_DEAD) {
		sched = false;
		if (call->state < RXRPC_CALL_COMPLETE) {
			_debug("abort call %p", call);
			call->state = RXRPC_CALL_LOCALLY_ABORTED;
			call->abort_code = RX_CALL_DEAD;
			if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
				sched = true;
		}
		if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
			sched = true;
		if (sched)
			rxrpc_queue_call(call);
	}
	write_unlock(&call->state_lock);
}

/*
 * release all the calls associated with a socket
 */
void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
{
	struct rxrpc_call *call;
	struct rb_node *p;

	_enter("%p", rx);

	read_lock_bh(&rx->call_lock);

	/* mark all the calls as no longer wanting incoming packets */
	for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
		call = rb_entry(p, struct rxrpc_call, sock_node);
		rxrpc_mark_call_released(call);
	}

	/* kill the not-yet-accepted incoming calls */
	list_for_each_entry(call, &rx->secureq, accept_link) {
		rxrpc_mark_call_released(call);
	}

	list_for_each_entry(call, &rx->acceptq, accept_link) {
		rxrpc_mark_call_released(call);
	}

	read_unlock_bh(&rx->call_lock);
	_leave("");
}

/*
 * release a call
 */
void __rxrpc_put_call(struct rxrpc_call *call)
{
	ASSERT(call != NULL);

	_enter("%p{u=%d}", call, atomic_read(&call->usage));

	ASSERTCMP(atomic_read(&call->usage), >, 0);

	if (atomic_dec_and_test(&call->usage)) {
		_debug("call %d dead", call->debug_id);
		ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
		rxrpc_queue_work(&call->destroyer);
	}
	_leave("");
}

/*
 * clean up a call
 */
static void rxrpc_cleanup_call(struct rxrpc_call *call)
{
	_net("DESTROY CALL %d", call->debug_id);

	ASSERT(call->socket);

	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));

	del_timer_sync(&call->lifetimer);
	del_timer_sync(&call->deadspan);
	del_timer_sync(&call->ack_timer);
	del_timer_sync(&call->resend_timer);

	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
	ASSERTCMP(call->events, ==, 0);
	if (work_pending(&call->processor)) {
		_debug("defer destroy");
		rxrpc_queue_work(&call->destroyer);
		return;
	}

	if (call->conn) {
		spin_lock(&call->conn->trans->peer->lock);
		list_del(&call->error_link);
		spin_unlock(&call->conn->trans->peer->lock);

		write_lock_bh(&call->conn->lock);
		rb_erase(&call->conn_node, &call->conn->calls);
		write_unlock_bh(&call->conn->lock);
		rxrpc_put_connection(call->conn);
	}

	/* Remove the call from the hash */
	rxrpc_call_hash_del(call);

	if (call->acks_window) {
		_debug("kill Tx window %d",
		       CIRC_CNT(call->acks_head, call->acks_tail,
				call->acks_winsz));
		smp_mb();
		while (CIRC_CNT(call->acks_head, call->acks_tail,
				call->acks_winsz) > 0) {
			struct rxrpc_skb_priv *sp;
			unsigned long _skb;

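			/* bit 0 of a window slot is used as a flag by the Tx
			 * path and is not part of the skb pointer, so mask
			 * it off */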
			_skb = call->acks_window[call->acks_tail] & ~1;
			sp = rxrpc_skb((struct sk_buff *) _skb);
			_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
			rxrpc_free_skb((struct sk_buff *) _skb);
			call->acks_tail =
				(call->acks_tail + 1) & (call->acks_winsz - 1);
		}

		kfree(call->acks_window);
	}

	rxrpc_free_skb(call->tx_pending);

	rxrpc_purge_queue(&call->rx_queue);
	ASSERT(skb_queue_empty(&call->rx_oos_queue));
	sock_put(&call->socket->sk);
	kmem_cache_free(rxrpc_call_jar, call);
}

/*
 * destroy a call
 */
static void rxrpc_destroy_call(struct work_struct *work)
{
	struct rxrpc_call *call =
		container_of(work, struct rxrpc_call, destroyer);

	_enter("%p{%d,%d,%p}",
	       call, atomic_read(&call->usage), call->channel, call->conn);

	ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);

	write_lock_bh(&rxrpc_call_lock);
	list_del_init(&call->link);
	write_unlock_bh(&rxrpc_call_lock);

	rxrpc_cleanup_call(call);
	_leave("");
}

/*
 * preemptively destroy all the call records rather than waiting for them
 * to time out
 */
void __exit rxrpc_destroy_all_calls(void)
{
	struct rxrpc_call *call;

	_enter("");
	write_lock_bh(&rxrpc_call_lock);

	while (!list_empty(&rxrpc_calls)) {
		call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
		_debug("Zapping call %p", call);

		list_del_init(&call->link);

		switch (atomic_read(&call->usage)) {
		case 0:
			ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
			break;
		case 1:
			if (del_timer_sync(&call->deadspan) != 0 &&
			    call->state != RXRPC_CALL_DEAD)
				rxrpc_dead_call_expired((unsigned long) call);
			if (call->state != RXRPC_CALL_DEAD)
				break;
		default:
			printk(KERN_ERR "RXRPC:"
			       " Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
			       call, atomic_read(&call->usage),
			       atomic_read(&call->ackr_not_idle),
			       rxrpc_call_states[call->state],
			       call->flags, call->events);
			if (!skb_queue_empty(&call->rx_queue))
				printk(KERN_ERR "RXRPC: Rx queue occupied\n");
			if (!skb_queue_empty(&call->rx_oos_queue))
				printk(KERN_ERR "RXRPC: OOS queue occupied\n");
			break;
		}

		write_unlock_bh(&rxrpc_call_lock);
		cond_resched();
		write_lock_bh(&rxrpc_call_lock);
	}

	write_unlock_bh(&rxrpc_call_lock);
	_leave("");
}

/*
 * handle call lifetime being exceeded
 */
static void rxrpc_call_life_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	_enter("{%d}", call->debug_id);
	read_lock_bh(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE) {
		set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
		rxrpc_queue_call(call);
	}
	read_unlock_bh(&call->state_lock);
}

/*
 * handle resend timer expiry
 * - may not take call->state_lock as this can deadlock against del_timer_sync()
 */
static void rxrpc_resend_time_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	_enter("{%d}", call->debug_id);

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	if (!test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
		rxrpc_queue_call(call);
}

/*
 * handle ACK timer expiry
 */
static void rxrpc_ack_time_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	_enter("{%d}", call->debug_id);

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	read_lock_bh(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
}