/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/slab.h>
#include <linux/udp.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * How long to wait before scheduling ACK generation after seeing a
 * packet with RXRPC_REQUEST_ACK set (in jiffies).
 */
unsigned rxrpc_requested_ack_delay = 1;

/*
 * How long to wait before scheduling an ACK with subtype DELAY (in jiffies).
 *
 * We use this when we've received new data packets.  If those packets aren't
 * all consumed within this time, we will send a DELAY ACK (if an ACK was not
 * requested) to let the sender know it doesn't need to resend.
 */
unsigned rxrpc_soft_ack_delay = 1 * HZ;

/*
 * How long to wait before scheduling an ACK with subtype IDLE (in jiffies).
 *
 * We use this when we've consumed some previously soft-ACK'd packets but
 * further packets aren't immediately received, to decide when to send an IDLE
 * ACK to let the other end know that it can free up its Tx buffer space.
 */
unsigned rxrpc_idle_ack_delay = 0.5 * HZ;

/*
 * Receive window size in packets.  This indicates the maximum number of
 * unconsumed received packets we're willing to retain in memory.  Once this
 * limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
 * packets.
 */
unsigned rxrpc_rx_window_size = 32;

/*
 * Maximum Rx MTU size.  This indicates to the sender the maximum size of
 * jumbo packet, made by gluing normal packets together, that we're willing
 * to handle.
 */
unsigned rxrpc_rx_mtu = 5692;

/*
 * The maximum number of fragments in a received jumbo packet that we tell the
 * sender that we're willing to handle.
 */
unsigned rxrpc_rx_jumbo_max = 4;

static const char *rxrpc_acks(u8 reason)
{
	static const char *const str[] = {
		"---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY",
		"IDL", "-?-"
	};

	if (reason >= ARRAY_SIZE(str))
		reason = ARRAY_SIZE(str) - 1;
	return str[reason];
}

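/*
 * Relative precedence of the ACK reasons.  A proposed ACK only replaces a
 * pending one of strictly lower priority; index 0 represents "no ACK
 * currently proposed".
 */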
static const s8 rxrpc_ack_priority[] = {
	[0]				= 0,
	[RXRPC_ACK_DELAY]		= 1,
	[RXRPC_ACK_REQUESTED]		= 2,
	[RXRPC_ACK_IDLE]		= 3,
	[RXRPC_ACK_PING_RESPONSE]	= 4,
	[RXRPC_ACK_DUPLICATE]		= 5,
	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 6,
	[RXRPC_ACK_EXCEEDS_WINDOW]	= 7,
	[RXRPC_ACK_NOSPACE]		= 8,
};

/*
 * propose an ACK be sent
 */
void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
			 __be32 serial, bool immediate)
{
	unsigned long expiry;
	s8 prior = rxrpc_ack_priority[ack_reason];

	ASSERTCMP(prior, >, 0);

	_enter("{%d},%s,%%%x,%u",
	       call->debug_id, rxrpc_acks(ack_reason), ntohl(serial),
	       immediate);

	if (prior < rxrpc_ack_priority[call->ackr_reason]) {
		if (immediate)
			goto cancel_timer;
		return;
	}

	/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
	 * numbers */
	if (prior == rxrpc_ack_priority[call->ackr_reason]) {
		if (prior <= 4)
			call->ackr_serial = serial;
		if (immediate)
			goto cancel_timer;
		return;
	}

	call->ackr_reason = ack_reason;
	call->ackr_serial = serial;

	switch (ack_reason) {
	case RXRPC_ACK_DELAY:
		_debug("run delay timer");
		expiry = rxrpc_soft_ack_delay;
		goto run_timer;

	case RXRPC_ACK_IDLE:
		if (!immediate) {
			_debug("run defer timer");
			expiry = rxrpc_idle_ack_delay;
			goto run_timer;
		}
		goto cancel_timer;

	case RXRPC_ACK_REQUESTED:
		expiry = rxrpc_requested_ack_delay;
		if (!expiry)
			goto cancel_timer;
		if (!immediate || serial == cpu_to_be32(1)) {
			_debug("run defer timer");
			goto run_timer;
		}

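		/* fall through - otherwise send the ACK immediately */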
	default:
		_debug("immediate ACK");
		goto cancel_timer;
	}

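/* Arm the ACK timer, but only ever bring its expiry forward - a proposal
 * expiring later must not delay an ACK that is already due.
 */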
run_timer:
	expiry += jiffies;
	if (!timer_pending(&call->ack_timer) ||
	    time_after(call->ack_timer.expires, expiry))
		mod_timer(&call->ack_timer, expiry);
	return;

cancel_timer:
	_debug("cancel timer %%%u", ntohl(serial));
	try_to_del_timer_sync(&call->ack_timer);
	read_lock_bh(&call->state_lock);
	if (call->state <= RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
}

/*
 * propose an ACK be sent, locking the call structure
 */
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
		       __be32 serial, bool immediate)
{
	s8 prior = rxrpc_ack_priority[ack_reason];

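	/* checking ackr_reason without the lock is just a fast-path
	 * optimisation; the priority is re-evaluated under call->lock in
	 * __rxrpc_propose_ACK()
	 */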
	if (prior > rxrpc_ack_priority[call->ackr_reason]) {
		spin_lock_bh(&call->lock);
		__rxrpc_propose_ACK(call, ack_reason, serial, immediate);
		spin_unlock_bh(&call->lock);
	}
}

/*
 * set the resend timer
 */
static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
			     unsigned long resend_at)
{
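	/* bit 0 of the resend code requests an immediate resend pass; bit 1
	 * requests that the resend timer be set for the earliest resend_at
	 * value that was seen
	 */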
	read_lock_bh(&call->state_lock);
	if (call->state >= RXRPC_CALL_COMPLETE)
		resend = 0;

	if (resend & 1) {
		_debug("SET RESEND");
		set_bit(RXRPC_CALL_RESEND, &call->events);
	}

	if (resend & 2) {
		_debug("MODIFY RESEND TIMER");
		set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
		mod_timer(&call->resend_timer, resend_at);
	} else {
		_debug("KILL RESEND TIMER");
		del_timer_sync(&call->resend_timer);
		clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	}
	read_unlock_bh(&call->state_lock);
}

/*
 * resend packets
 */
static void rxrpc_resend(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct rxrpc_header *hdr;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	bool stop;
	int loop;
	u8 resend;

	_enter("{%d,%d,%d,%d},",
	       call->acks_hard, call->acks_unacked,
	       atomic_read(&call->sequence),
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

	stop = false;
	resend = 0;
	resend_at = 0;

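	/* walk the Tx window from the hard-ACK point at the tail up to the
	 * head; slots with bit 0 set hold soft-ACK'd packets that need no
	 * retransmission.  The index arithmetic relies on acks_winsz being a
	 * power of two (e.g. with a window of 64, (63 + 1) & 63 wraps to 0).
	 */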
	for (loop = call->acks_tail;
	     loop != call->acks_head && !stop;
	     loop = (loop + 1) &  (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		if (*p_txb & 1)
			continue;

		txb = (struct sk_buff *) *p_txb;
		sp = rxrpc_skb(txb);

		if (sp->need_resend) {
			sp->need_resend = false;

			/* each Tx packet has a new serial number */
			sp->hdr.serial =
				htonl(atomic_inc_return(&call->conn->serial));

			hdr = (struct rxrpc_header *) txb->head;
			hdr->serial = sp->hdr.serial;

			_proto("Tx DATA %%%u { #%d }",
			       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
			if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
				stop = true;
				sp->resend_at = jiffies + 3;
			} else {
				sp->resend_at =
					jiffies + rxrpc_resend_timeout * HZ;
			}
		}

		if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = true;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave("");
}

/*
 * handle resend timer expiry
 */
static void rxrpc_resend_timer(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	int loop;
	u8 resend;

	_enter("%d,%d,%d",
	       call->acks_tail, call->acks_unacked, call->acks_head);

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	resend = 0;
	resend_at = 0;

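	/* only the slots after the last soft-ACK'd point need be checked;
	 * the resend state of everything before acks_unacked was settled
	 * when the soft ACKs were processed
	 */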
	for (loop = call->acks_unacked;
	     loop != call->acks_head;
	     loop = (loop + 1) &  (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		ASSERT(!(*p_txb & 1));

		if (sp->need_resend) {
			;
		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = true;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave("");
}

/*
 * process soft ACKs of our transmitted packets
 * - these indicate packets the peer has or has not received, but hasn't yet
 *   given to the consumer, and so can still be discarded and re-requested
 */
static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
				   struct rxrpc_ackpacket *ack,
				   struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	int loop;
	u8 sacks[RXRPC_MAXACKS], resend;

	_enter("{%d,%d},{%d},",
	       call->acks_hard,
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
	       ack->nAcks);

	if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
		goto protocol_error;

	resend = 0;
	resend_at = 0;
	for (loop = 0; loop < ack->nAcks; loop++) {
		p_txb = call->acks_window;
		p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		switch (sacks[loop]) {
		case RXRPC_ACK_TYPE_ACK:
			sp->need_resend = false;
			*p_txb |= 1;
			break;
		case RXRPC_ACK_TYPE_NACK:
			sp->need_resend = true;
			*p_txb &= ~1;
			resend = 1;
			break;
		default:
			_debug("Unsupported ACK type %d", sacks[loop]);
			goto protocol_error;
		}
	}

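	/* make sure the soft-ACK changes to the window slots are visible
	 * before the unacked pointer is advanced past them
	 */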
	smp_mb();
	call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);

	/* anything not explicitly ACK'd is implicitly NACK'd, but may just not
	 * have been received or processed yet by the far end */
	for (loop = call->acks_unacked;
	     loop != call->acks_head;
	     loop = (loop + 1) &  (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		if (*p_txb & 1) {
			/* packet must have been discarded */
			sp->need_resend = true;
			*p_txb &= ~1;
			resend |= 1;
		} else if (sp->need_resend) {
			;
		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = true;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave(" = 0");
	return 0;

protocol_error:
	_leave(" = -EPROTO");
	return -EPROTO;
}

/*
 * discard hard-ACK'd packets from the Tx window
 */
static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
{
	unsigned long _skb;
	int tail = call->acks_tail, old_tail;
	int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);

	_enter("{%u,%u},%u", call->acks_hard, win, hard);

	ASSERTCMP(hard - call->acks_hard, <=, win);

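	/* free each skb between the old and the new hard-ACK point; bit 0 of
	 * a window slot merely flags a soft ACK and is masked off to recover
	 * the skb pointer
	 */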
	while (call->acks_hard < hard) {
		smp_read_barrier_depends();
		_skb = call->acks_window[tail] & ~1;
		rxrpc_free_skb((struct sk_buff *) _skb);
		old_tail = tail;
		tail = (tail + 1) & (call->acks_winsz - 1);
		call->acks_tail = tail;
		if (call->acks_unacked == old_tail)
			call->acks_unacked = tail;
		call->acks_hard++;
	}

	wake_up(&call->tx_waitq);
}

/*
 * clear the Tx window in the event of a failure
 */
static void rxrpc_clear_tx_window(struct rxrpc_call *call)
{
	rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
}

/*
 * drain the out of sequence received packet queue into the packet Rx queue
 */
static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	bool terminal;
	int ret;

	_enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);

	spin_lock_bh(&call->lock);

	ret = -ECONNRESET;
	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		goto socket_unavailable;

	skb = skb_dequeue(&call->rx_oos_queue);
	if (skb) {
		sp = rxrpc_skb(skb);

		_debug("drain OOS packet %d [%d]",
		       ntohl(sp->hdr.seq), call->rx_first_oos);

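		/* if the first queued packet still isn't the next one
		 * expected, the gap hasn't been filled yet; put it back and
		 * note the new front of the queue
		 */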
		if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
			skb_queue_head(&call->rx_oos_queue, skb);
			call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
			_debug("requeue %p {%u}", skb, call->rx_first_oos);
		} else {
			skb->mark = RXRPC_SKB_MARK_DATA;
			terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
				!(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
			ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
			BUG_ON(ret < 0);
			_debug("drain #%u", call->rx_data_post);
			call->rx_data_post++;

			/* find out what the next packet is */
			skb = skb_peek(&call->rx_oos_queue);
			if (skb)
				call->rx_first_oos =
					ntohl(rxrpc_skb(skb)->hdr.seq);
			else
				call->rx_first_oos = 0;
			_debug("peek %p {%u}", skb, call->rx_first_oos);
		}
	}

	ret = 0;
socket_unavailable:
	spin_unlock_bh(&call->lock);
	_leave(" = %d", ret);
	return ret;
}

/*
 * insert an out of sequence packet into the buffer
 */
static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
				    struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp, *psp;
	struct sk_buff *p;
	u32 seq;

	sp = rxrpc_skb(skb);
	seq = ntohl(sp->hdr.seq);
	_enter(",,{%u}", seq);

	skb->destructor = rxrpc_packet_destructor;
	ASSERTCMP(sp->call, ==, NULL);
	sp->call = call;
	rxrpc_get_call(call);

	/* insert into the buffer in sequence order */
	spin_lock_bh(&call->lock);

	skb_queue_walk(&call->rx_oos_queue, p) {
		psp = rxrpc_skb(p);
		if (ntohl(psp->hdr.seq) > seq) {
			_debug("insert oos #%u before #%u",
			       seq, ntohl(psp->hdr.seq));
			skb_insert(p, skb, &call->rx_oos_queue);
			goto inserted;
		}
	}

	_debug("append oos #%u", seq);
	skb_queue_tail(&call->rx_oos_queue, skb);
inserted:

	/* we might now have a new front to the queue */
	if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
		call->rx_first_oos = seq;

	read_lock(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    call->rx_data_post == call->rx_first_oos) {
		_debug("drain rx oos now");
		set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
	}
	read_unlock(&call->state_lock);

	spin_unlock_bh(&call->lock);
	_leave(" [stored #%u]", call->rx_first_oos);
}

/*
 * clear the Tx window on final ACK reception
 */
static void rxrpc_zap_tx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	unsigned long _skb, *acks_window;
	u8 winsz = call->acks_winsz;
	int tail;

	acks_window = call->acks_window;
	call->acks_window = NULL;
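
	/* we assume that by the time the final ACK is processed nothing else
	 * is still inserting into the Tx window, so the detached buffer can
	 * be emptied without taking the call lock
	 */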
	while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
		tail = call->acks_tail;
		smp_read_barrier_depends();
		_skb = acks_window[tail] & ~1;
		smp_mb();
		call->acks_tail = (call->acks_tail + 1) & (winsz - 1);

		skb = (struct sk_buff *) _skb;
		sp = rxrpc_skb(skb);
		_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
		rxrpc_free_skb(skb);
	}

	kfree(acks_window);
}

/*
 * process the extra information that may be appended to an ACK packet
 */
static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
				  unsigned int latest, int nAcks)
{
	struct rxrpc_ackinfo ackinfo;
	struct rxrpc_peer *peer;
	unsigned int mtu;

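	/* the ackinfo trailer sits beyond the soft-ACK array, which is padded
	 * to a four-byte boundary on transmission - hence the nAcks + 3
	 * offset (the fixed ACK header was already pulled off by the caller)
	 */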
	if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
		_leave(" [no ackinfo]");
		return;
	}

	_proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
	       latest,
	       ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
	       ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));

	mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));

	peer = call->conn->trans->peer;
	if (mtu < peer->maxdata) {
		spin_lock_bh(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock_bh(&peer->lock);
		_net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
	}
}

/*
 * process packets in the reception queue
 */
static int rxrpc_process_rx_queue(struct rxrpc_call *call,
				  u32 *_abort_code)
{
	struct rxrpc_ackpacket ack;
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	bool post_ACK;
	int latest;
	u32 hard, tx;

	_enter("");

process_further:
	skb = skb_dequeue(&call->rx_queue);
	if (!skb)
		return -EAGAIN;

	_net("deferred skb %p", skb);

	sp = rxrpc_skb(skb);

	_debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);

	post_ACK = false;

	switch (sp->hdr.type) {
		/* data packets that wind up here have been received out of
		 * order, need security processing or are jumbo packets */
	case RXRPC_PACKET_TYPE_DATA:
		_proto("OOSQ DATA %%%u { #%u }",
		       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));

		/* secured packets must be verified and possibly decrypted */
		if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
			goto protocol_error;

		rxrpc_insert_oos_packet(call, skb);
		goto process_further;

		/* partial ACK to process */
	case RXRPC_PACKET_TYPE_ACK:
		if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
			_debug("extraction failure");
			goto protocol_error;
		}
		if (!skb_pull(skb, sizeof(ack)))
			BUG();

		latest = ntohl(sp->hdr.serial);
		hard = ntohl(ack.firstPacket);
		tx = atomic_read(&call->sequence);

		_proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
		       latest,
		       ntohs(ack.maxSkew),
		       hard,
		       ntohl(ack.previousPacket),
		       ntohl(ack.serial),
		       rxrpc_acks(ack.reason),
		       ack.nAcks);

		rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);

		if (ack.reason == RXRPC_ACK_PING) {
			_proto("Rx ACK %%%u PING Request", latest);
			rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
					  sp->hdr.serial, true);
		}

		/* discard any out-of-order or duplicate ACKs */
		if (latest - call->acks_latest <= 0) {
			_debug("discard ACK %d <= %d",
			       latest, call->acks_latest);
			goto discard;
		}
		call->acks_latest = latest;

		if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
		    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
		    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
		    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
			goto discard;

		_debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);

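		/* firstPacket is the first as-yet-unacknowledged packet, so
		 * everything before it has been hard-ACK'd - received and
		 * consumed by the far end - and can leave the Tx window;
		 * soft ACKs that follow may still be rescinded later
		 */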
		if (hard > 0) {
			if (hard - 1 > tx) {
				_debug("hard-ACK'd packet %d not transmitted"
				       " (%d top)",
				       hard - 1, tx);
				goto protocol_error;
			}

			if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
			     call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
			    hard > tx)
				goto all_acked;

			smp_rmb();
			rxrpc_rotate_tx_window(call, hard - 1);
		}

		if (ack.nAcks > 0) {
			if (hard - 1 + ack.nAcks > tx) {
				_debug("soft-ACK'd packet %d+%d not"
				       " transmitted (%d top)",
				       hard - 1, ack.nAcks, tx);
				goto protocol_error;
			}

			if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
				goto protocol_error;
		}
		goto discard;

		/* complete ACK to process */
	case RXRPC_PACKET_TYPE_ACKALL:
		goto all_acked;

		/* abort and busy are handled elsewhere */
	case RXRPC_PACKET_TYPE_BUSY:
	case RXRPC_PACKET_TYPE_ABORT:
		BUG();

		/* connection level events - also handled elsewhere */
	case RXRPC_PACKET_TYPE_CHALLENGE:
	case RXRPC_PACKET_TYPE_RESPONSE:
	case RXRPC_PACKET_TYPE_DEBUG:
		BUG();
	}

	/* if we've had a hard ACK that covers all the packets we've sent, then
	 * that ends that phase of the operation */
all_acked:
	write_lock_bh(&call->state_lock);
	_debug("ack all %d", call->state);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
		break;
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		_debug("srv complete");
		call->state = RXRPC_CALL_COMPLETE;
		post_ACK = true;
		break;
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
		goto protocol_error_unlock; /* can't occur yet */
	default:
		write_unlock_bh(&call->state_lock);
		goto discard; /* assume packet left over from earlier phase */
	}

	write_unlock_bh(&call->state_lock);

	/* if all the packets we sent are hard-ACK'd, then we can discard
	 * whatever we've got left */
	_debug("clear Tx %d",
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

	del_timer_sync(&call->resend_timer);
	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);

	if (call->acks_window)
		rxrpc_zap_tx_window(call);

	if (post_ACK) {
		/* post the final ACK message for userspace to pick up */
		_debug("post ACK");
		skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
		sp->call = call;
		rxrpc_get_call(call);
		spin_lock_bh(&call->lock);
		if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
			BUG();
		spin_unlock_bh(&call->lock);
		goto process_further;
	}

discard:
	rxrpc_free_skb(skb);
	goto process_further;

protocol_error_unlock:
	write_unlock_bh(&call->state_lock);
protocol_error:
	rxrpc_free_skb(skb);
	_leave(" = -EPROTO");
	return -EPROTO;
}

/*
 * post a message to the socket Rx queue for recvmsg() to pick up
 */
static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
			      bool fatal)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	int ret;

	_enter("{%d,%lx},%u,%u,%d",
	       call->debug_id, call->flags, mark, error, fatal);

	/* remove timers and things for fatal messages */
	if (fatal) {
		del_timer_sync(&call->resend_timer);
		del_timer_sync(&call->ack_timer);
		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	}

	if (mark != RXRPC_SKB_MARK_NEW_CALL &&
	    !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		_leave("[no userid]");
		return 0;
	}

	if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
		skb = alloc_skb(0, GFP_NOFS);
		if (!skb)
			return -ENOMEM;

		rxrpc_new_skb(skb);

		skb->mark = mark;

		sp = rxrpc_skb(skb);
		memset(sp, 0, sizeof(*sp));
		sp->error = error;
		sp->call = call;
		rxrpc_get_call(call);

		spin_lock_bh(&call->lock);
		ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
		spin_unlock_bh(&call->lock);
		BUG_ON(ret < 0);
	}

	return 0;
}

/*
 * handle background processing of incoming call packets and ACK / abort
 * generation
 */
void rxrpc_process_call(struct work_struct *work)
{
	struct rxrpc_call *call =
		container_of(work, struct rxrpc_call, processor);
	struct rxrpc_ackpacket ack;
	struct rxrpc_ackinfo ackinfo;
	struct rxrpc_header hdr;
	struct msghdr msg;
	struct kvec iov[5];
	unsigned long bits;
	__be32 data, pad;
	size_t len;
	int genbit, loop, nbit, ioc, ret, mtu;
	u32 abort_code = RX_PROTOCOL_ERROR;
	u8 *acks = NULL;

	//printk("\n--------------------\n");
	_enter("{%d,%s,%lx} [%lu]",
	       call->debug_id, rxrpc_call_states[call->state], call->events,
	       (jiffies - call->creation_jif) / (HZ / 10));

	if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
		_debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
		return;
	}

	/* there's a good chance we're going to have to send a message, so set
	 * one up in advance */
	msg.msg_name	= &call->conn->trans->peer->srx.transport.sin;
	msg.msg_namelen	= sizeof(call->conn->trans->peer->srx.transport.sin);
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	hdr.epoch	= call->conn->epoch;
	hdr.cid		= call->cid;
	hdr.callNumber	= call->call_id;
	hdr.seq		= 0;
	hdr.type	= RXRPC_PACKET_TYPE_ACK;
	hdr.flags	= call->conn->out_clientflag;
	hdr.userStatus	= 0;
	hdr.securityIndex = call->conn->security_ix;
	hdr._rsvd	= 0;
	hdr.serviceId	= call->conn->service_id;

	memset(iov, 0, sizeof(iov));
	iov[0].iov_base	= &hdr;
	iov[0].iov_len	= sizeof(hdr);

	/* deal with events of a final nature */
	if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
		rxrpc_release_call(call);
		clear_bit(RXRPC_CALL_RELEASE, &call->events);
	}

	if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
		int error;

		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_ABORT, &call->events);

		error = call->conn->trans->peer->net_error;
		_debug("post net error %d", error);

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
				       error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
		goto kill_ACKs;
	}

	if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_ABORT, &call->events);

		_debug("post conn abort");

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       call->conn->error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
		goto kill_ACKs;
	}

	if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
		hdr.type = RXRPC_PACKET_TYPE_BUSY;
		genbit = RXRPC_CALL_REJECT_BUSY;
		goto send_message;
	}

	if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       ECONNABORTED, true) < 0)
			goto no_mem;
		hdr.type = RXRPC_PACKET_TYPE_ABORT;
		data = htonl(call->abort_code);
		iov[1].iov_base = &data;
		iov[1].iov_len = sizeof(data);
		genbit = RXRPC_CALL_ABORT;
		goto send_message;
	}

	if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
		genbit = RXRPC_CALL_ACK_FINAL;

		ack.bufferSpace	= htons(8);
		ack.maxSkew	= 0;
		ack.serial	= 0;
		ack.reason	= RXRPC_ACK_IDLE;
		ack.nAcks	= 0;
		call->ackr_reason = 0;

		spin_lock_bh(&call->lock);
		ack.serial = call->ackr_serial;
		ack.previousPacket = call->ackr_prev_seq;
		ack.firstPacket = htonl(call->rx_data_eaten + 1);
		spin_unlock_bh(&call->lock);

		pad = 0;

		iov[1].iov_base = &ack;
		iov[1].iov_len	= sizeof(ack);
		iov[2].iov_base = &pad;
		iov[2].iov_len	= 3;
		iov[3].iov_base = &ackinfo;
		iov[3].iov_len	= sizeof(ackinfo);
		goto send_ACK;
	}

	if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
			    (1 << RXRPC_CALL_RCVD_ABORT))
	    ) {
		u32 mark;

		if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
			mark = RXRPC_SKB_MARK_REMOTE_ABORT;
		else
			mark = RXRPC_SKB_MARK_BUSY;

		_debug("post abort/busy");
		rxrpc_clear_tx_window(call);
		if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
			goto no_mem;

		clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
		goto kill_ACKs;
	}

	if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
		_debug("do implicit ackall");
		rxrpc_clear_tx_window(call);
	}

	if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
		write_lock_bh(&call->state_lock);
		if (call->state <= RXRPC_CALL_COMPLETE) {
			call->state = RXRPC_CALL_LOCALLY_ABORTED;
			call->abort_code = RX_CALL_TIMEOUT;
			set_bit(RXRPC_CALL_ABORT, &call->events);
		}
		write_unlock_bh(&call->state_lock);

		_debug("post timeout");
		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       ETIME, true) < 0)
			goto no_mem;

		clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
		goto kill_ACKs;
	}

	/* deal with assorted inbound messages */
	if (!skb_queue_empty(&call->rx_queue)) {
		switch (rxrpc_process_rx_queue(call, &abort_code)) {
		case 0:
		case -EAGAIN:
			break;
		case -ENOMEM:
			goto no_mem;
		case -EKEYEXPIRED:
		case -EKEYREJECTED:
		case -EPROTO:
			rxrpc_abort_call(call, abort_code);
			goto kill_ACKs;
		}
	}

	/* handle resending */
	if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
		rxrpc_resend_timer(call);
	if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
		rxrpc_resend(call);

	/* consider sending an ordinary ACK */
	if (test_bit(RXRPC_CALL_ACK, &call->events)) {
		_debug("send ACK: window: %d - %d { %lx }",
		       call->rx_data_eaten, call->ackr_win_top,
		       call->ackr_window[0]);

		if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
		    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
			/* ACK by sending reply DATA packet in this state */
			clear_bit(RXRPC_CALL_ACK, &call->events);
			goto maybe_reschedule;
		}

		genbit = RXRPC_CALL_ACK;

		acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
			       GFP_NOFS);
		if (!acks)
			goto no_mem;

		//hdr.flags	= RXRPC_SLOW_START_OK;
		ack.bufferSpace	= htons(8);
		ack.maxSkew	= 0;
		ack.serial	= 0;
		ack.reason	= 0;

		spin_lock_bh(&call->lock);
		ack.reason = call->ackr_reason;
		ack.serial = call->ackr_serial;
		ack.previousPacket = call->ackr_prev_seq;
		ack.firstPacket = htonl(call->rx_data_eaten + 1);

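		/* transcribe the ACK window bitmap into the soft-ACK array;
		 * nAcks ends up covering everything up to the highest bit
		 * seen, with holes left as 0 (RXRPC_ACK_TYPE_NACK)
		 */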
		ack.nAcks = 0;
		for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
			nbit = loop * BITS_PER_LONG;
			for (bits = call->ackr_window[loop]; bits; bits >>= 1
			     ) {
				_debug("- l=%d n=%d b=%lx", loop, nbit, bits);
				if (bits & 1) {
					acks[nbit] = RXRPC_ACK_TYPE_ACK;
					ack.nAcks = nbit + 1;
				}
				nbit++;
			}
		}
		call->ackr_reason = 0;
		spin_unlock_bh(&call->lock);

		pad = 0;

		iov[1].iov_base = &ack;
		iov[1].iov_len	= sizeof(ack);
		iov[2].iov_base = acks;
		iov[2].iov_len	= ack.nAcks;
		iov[3].iov_base = &pad;
		iov[3].iov_len	= 3;
		iov[4].iov_base = &ackinfo;
		iov[4].iov_len	= sizeof(ackinfo);

		switch (ack.reason) {
		case RXRPC_ACK_REQUESTED:
		case RXRPC_ACK_DUPLICATE:
		case RXRPC_ACK_OUT_OF_SEQUENCE:
		case RXRPC_ACK_EXCEEDS_WINDOW:
		case RXRPC_ACK_NOSPACE:
		case RXRPC_ACK_PING:
		case RXRPC_ACK_PING_RESPONSE:
			goto send_ACK_with_skew;
		case RXRPC_ACK_DELAY:
		case RXRPC_ACK_IDLE:
			goto send_ACK;
		}
	}

	/* handle completion of security negotiations on an incoming
	 * connection */
	if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
		_debug("secured");
		spin_lock_bh(&call->lock);

		if (call->state == RXRPC_CALL_SERVER_SECURING) {
			_debug("securing");
			write_lock(&call->conn->lock);
			if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
			    !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
				_debug("not released");
				call->state = RXRPC_CALL_SERVER_ACCEPTING;
				list_move_tail(&call->accept_link,
					       &call->socket->acceptq);
			}
			write_unlock(&call->conn->lock);
			read_lock(&call->state_lock);
			if (call->state < RXRPC_CALL_COMPLETE)
				set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
			read_unlock(&call->state_lock);
		}

		spin_unlock_bh(&call->lock);
		if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
			goto maybe_reschedule;
	}

	/* post a notification of an acceptable connection to the app */
	if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
		_debug("post accept");
		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
				       0, false) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
		goto maybe_reschedule;
	}

	/* handle incoming call acceptance */
	if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
		_debug("accepted");
		ASSERTCMP(call->rx_data_post, ==, 0);
		call->rx_data_post = 1;
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_COMPLETE)
			set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
		read_unlock_bh(&call->state_lock);
	}

	/* drain the out of sequence received packet queue into the packet Rx
	 * queue */
	if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
		while (call->rx_data_post == call->rx_first_oos)
			if (rxrpc_drain_rx_oos_queue(call) < 0)
				break;
		goto maybe_reschedule;
	}

	/* other events may have been raised since we started checking */
	goto maybe_reschedule;

send_ACK_with_skew:
	ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
			    ntohl(ack.serial));
send_ACK:
	mtu = call->conn->trans->peer->if_mtu;
	mtu -= call->conn->trans->peer->hdrsize;
	ackinfo.maxMTU	= htonl(mtu);
	ackinfo.rwind	= htonl(rxrpc_rx_window_size);

	/* permit the peer to send us jumbo packets if it wants to */
	ackinfo.rxMTU	= htonl(rxrpc_rx_mtu);
	ackinfo.jumbo_max = htonl(rxrpc_rx_jumbo_max);

	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
	_proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
	       ntohl(hdr.serial),
	       ntohs(ack.maxSkew),
	       ntohl(ack.firstPacket),
	       ntohl(ack.previousPacket),
	       ntohl(ack.serial),
	       rxrpc_acks(ack.reason),
	       ack.nAcks);

	del_timer_sync(&call->ack_timer);
	if (ack.nAcks > 0)
		set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
	goto send_message_2;

send_message:
	_debug("send message");

	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
	_proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
send_message_2:

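	/* work out how many iovs are actually in use; they are always filled
	 * in contiguously from iov[1] onwards
	 */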
	len = iov[0].iov_len;
	ioc = 1;
	if (iov[4].iov_len) {
		ioc = 5;
		len += iov[4].iov_len;
		len += iov[3].iov_len;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[3].iov_len) {
		ioc = 4;
		len += iov[3].iov_len;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[2].iov_len) {
		ioc = 3;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[1].iov_len) {
		ioc = 2;
		len += iov[1].iov_len;
	}

	ret = kernel_sendmsg(call->conn->trans->local->socket,
			     &msg, iov, ioc, len);
	if (ret < 0) {
		_debug("sendmsg failed: %d", ret);
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_DEAD)
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
		goto error;
	}

	switch (genbit) {
	case RXRPC_CALL_ABORT:
		clear_bit(genbit, &call->events);
		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
		goto kill_ACKs;

	case RXRPC_CALL_ACK_FINAL:
		write_lock_bh(&call->state_lock);
		if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
			call->state = RXRPC_CALL_COMPLETE;
		write_unlock_bh(&call->state_lock);
		goto kill_ACKs;

	default:
		clear_bit(genbit, &call->events);
		switch (call->state) {
		case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		case RXRPC_CALL_CLIENT_RECV_REPLY:
		case RXRPC_CALL_SERVER_RECV_REQUEST:
		case RXRPC_CALL_SERVER_ACK_REQUEST:
			_debug("start ACK timer");
			rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
					  call->ackr_serial, false);
		default:
			break;
		}
		goto maybe_reschedule;
	}

kill_ACKs:
	del_timer_sync(&call->ack_timer);
	if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
		rxrpc_put_call(call);
	clear_bit(RXRPC_CALL_ACK, &call->events);

maybe_reschedule:
	if (call->events || !skb_queue_empty(&call->rx_queue)) {
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_DEAD)
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
	}

	/* don't leave aborted connections on the accept queue */
	if (call->state >= RXRPC_CALL_COMPLETE &&
	    !list_empty(&call->accept_link)) {
		_debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
		       call, call->events, call->flags,
		       ntohl(call->conn->cid));

		read_lock_bh(&call->state_lock);
		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
		    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
	}

error:
	clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
	kfree(acks);

	/* because we don't want two CPUs both processing the work item for one
	 * call at the same time, we use a flag to note when it's busy; however
	 * this means there's a race between clearing the flag and setting the
	 * work pending bit and the work item being processed again */
	if (call->events && !work_pending(&call->processor)) {
		_debug("jumpstart %x", ntohl(call->conn->cid));
		rxrpc_queue_call(call);
	}

	_leave("");
	return;

no_mem:
	_debug("out of memory");
	goto maybe_reschedule;
}