verbs.c revision 73806c8832b3438ef0439603dab1f3cfc61cb6cd
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses.  You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 *      Redistributions of source code must retain the above copyright
15 *      notice, this list of conditions and the following disclaimer.
16 *
17 *      Redistributions in binary form must reproduce the above
18 *      copyright notice, this list of conditions and the following
19 *      disclaimer in the documentation and/or other materials provided
20 *      with the distribution.
21 *
22 *      Neither the name of the Network Appliance, Inc. nor the names of
23 *      its contributors may be used to endorse or promote products
24 *      derived from this software without specific prior written
25 *      permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * verbs.c
42 *
43 * Encapsulates the major functions managing:
44 *  o adapters
45 *  o endpoints
46 *  o connections
47 *  o buffer memory
48 */
49
50#include <linux/interrupt.h>
51#include <linux/slab.h>
52#include <asm/bitops.h>
53
54#include "xprt_rdma.h"
55
56/*
57 * Globals/Macros
58 */
59
60#ifdef RPC_DEBUG
61# define RPCDBG_FACILITY	RPCDBG_TRANS
62#endif
63
64/*
65 * internal functions
66 */
67
68/*
69 * Handle replies in tasklet context, using a single global list.
70 * The tasklet function simply walks the list and calls the reply
71 * handler (rr_func) for each reply queued on it.
72 */
73
74static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75static LIST_HEAD(rpcrdma_tasklets_g);
76
77static void
78rpcrdma_run_tasklet(unsigned long data)
79{
80	struct rpcrdma_rep *rep;
81	void (*func)(struct rpcrdma_rep *);
82	unsigned long flags;
83
84	data = data;	/* the tasklet's data argument is unused */
85	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86	while (!list_empty(&rpcrdma_tasklets_g)) {
87		rep = list_entry(rpcrdma_tasklets_g.next,
88				 struct rpcrdma_rep, rr_list);
89		list_del(&rep->rr_list);
90		func = rep->rr_func;
91		rep->rr_func = NULL;
92		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93
94		if (func)
95			func(rep);
96		else
97			rpcrdma_recv_buffer_put(rep);
98
99		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100	}
101	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102}
103
104static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105
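/*
 * Queue a completed reply on the global tasklet list and kick the
 * tasklet, which will invoke rr_func (or return the buffer) for it.
 */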
106static inline void
107rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108{
109	unsigned long flags;
110
111	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114	tasklet_schedule(&rpcrdma_tasklet_g);
115}
116
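/*
 * Asynchronous QP error: mark a connected endpoint failed (-EIO),
 * notify the transport via rep_func, and wake anyone waiting to connect.
 */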
117static void
118rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119{
120	struct rpcrdma_ep *ep = context;
121
122	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123		__func__, event->event, event->device->name, context);
124	if (ep->rep_connected == 1) {
125		ep->rep_connected = -EIO;
126		ep->rep_func(ep);
127		wake_up_all(&ep->rep_connect_wait);
128	}
129}
130
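/*
 * Asynchronous CQ error: handled exactly like an asynchronous QP error.
 */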
131static void
132rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133{
134	struct rpcrdma_ep *ep = context;
135
136	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137		__func__, event->event, event->device->name, context);
138	if (ep->rep_connected == 1) {
139		ep->rep_connected = -EIO;
140		ep->rep_func(ep);
141		wake_up_all(&ep->rep_connect_wait);
142	}
143}
144
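/*
 * Track FRMR state from send-side completions: FAST_REG_MR marks the
 * MR valid, LOCAL_INV marks it invalid. A zero wr_id indicates an
 * ordinary SEND, and failed completions are ignored here.
 */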
145static void
146rpcrdma_sendcq_process_wc(struct ib_wc *wc)
147{
148	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
149
150	dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
151		__func__, frmr, wc->status, wc->opcode);
152
153	if (wc->wr_id == 0ULL)
154		return;
155	if (wc->status != IB_WC_SUCCESS)
156		return;
157
158	if (wc->opcode == IB_WC_FAST_REG_MR)
159		frmr->r.frmr.state = FRMR_IS_VALID;
160	else if (wc->opcode == IB_WC_LOCAL_INV)
161		frmr->r.frmr.state = FRMR_IS_INVALID;
162}
163
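/*
 * Poll the send CQ in batches of RPCRDMA_POLLSIZE completions, up to
 * RPCRDMA_WC_BUDGET in total, so a busy CQ cannot monopolize the caller.
 */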
164static int
165rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
166{
167	struct ib_wc *wcs;
168	int budget, count, rc;
169
170	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
171	do {
172		wcs = ep->rep_send_wcs;
173
174		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
175		if (rc <= 0)
176			return rc;
177
178		count = rc;
179		while (count-- > 0)
180			rpcrdma_sendcq_process_wc(wcs++);
181	} while (rc == RPCRDMA_POLLSIZE && --budget);
182	return 0;
183}
184
185/*
186 * Handle send, fast_reg_mr, and local_inv completions.
187 *
188 * Send events are typically suppressed and thus do not result
189 * in an upcall. Occasionally one is signaled, however. This
190 * prevents the provider's completion queue from wrapping and
191 * losing a completion.
192 */
193static void
194rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
195{
196	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
197	int rc;
198
199	rc = rpcrdma_sendcq_poll(cq, ep);
200	if (rc) {
201		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
202			__func__, rc);
203		return;
204	}
205
206	rc = ib_req_notify_cq(cq,
207			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
208	if (rc == 0)
209		return;
210	if (rc < 0) {
211		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
212			__func__, rc);
213		return;
214	}
215
216	rpcrdma_sendcq_poll(cq, ep);
217}
218
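/*
 * Record the length of a received reply, sync it for CPU access, and
 * snapshot the credit count advertised in the RPC/RDMA header before
 * handing the reply off to the tasklet.
 */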
219static void
220rpcrdma_recvcq_process_wc(struct ib_wc *wc)
221{
222	struct rpcrdma_rep *rep =
223			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
224
225	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
226		__func__, rep, wc->status, wc->opcode, wc->byte_len);
227
228	if (wc->status != IB_WC_SUCCESS) {
229		rep->rr_len = ~0U;
230		goto out_schedule;
231	}
232	if (wc->opcode != IB_WC_RECV)
233		return;
234
235	rep->rr_len = wc->byte_len;
236	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
237			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
238
239	if (rep->rr_len >= 16) {
240		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
241		unsigned int credits = ntohl(p->rm_credit);
242
243		if (credits == 0)
244			credits = 1;	/* don't deadlock */
245		else if (credits > rep->rr_buffer->rb_max_requests)
246			credits = rep->rr_buffer->rb_max_requests;
247		atomic_set(&rep->rr_buffer->rb_credits, credits);
248	}
249
250out_schedule:
251	rpcrdma_schedule_tasklet(rep);
252}
253
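/* Poll the receive CQ with the same batching and budget as the send CQ */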
254static int
255rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
256{
257	struct ib_wc *wcs;
258	int budget, count, rc;
259
260	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
261	do {
262		wcs = ep->rep_recv_wcs;
263
264		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
265		if (rc <= 0)
266			return rc;
267
268		count = rc;
269		while (count-- > 0)
270			rpcrdma_recvcq_process_wc(wcs++);
271	} while (rc == RPCRDMA_POLLSIZE && --budget);
272	return 0;
273}
274
275/*
276 * Handle receive completions.
277 *
278 * The handler is reentrant but processes events one at a time to
279 * preserve receive ordering, on which server credit accounting depends.
280 *
281 * It is the responsibility of the scheduled tasklet to return
282 * recv buffers to the pool. NOTE: this affects synchronization of
283 * connection shutdown. That is, the structures required for
284 * the completion of the reply handler must remain intact until
285 * all memory has been reclaimed.
286 */
287static void
288rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
289{
290	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
291	int rc;
292
293	rc = rpcrdma_recvcq_poll(cq, ep);
294	if (rc) {
295		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
296			__func__, rc);
297		return;
298	}
299
300	rc = ib_req_notify_cq(cq,
301			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
302	if (rc == 0)
303		return;
304	if (rc < 0) {
305		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
306			__func__, rc);
307		return;
308	}
309
310	rpcrdma_recvcq_poll(cq, ep);
311}
312
313#ifdef RPC_DEBUG
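/* Human-readable names for RDMA_CM_EVENT_* codes, used in debug messages */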
314static const char * const conn[] = {
315	"address resolved",
316	"address error",
317	"route resolved",
318	"route error",
319	"connect request",
320	"connect response",
321	"connect error",
322	"unreachable",
323	"rejected",
324	"established",
325	"disconnected",
326	"device removal"
327};
328#endif
329
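/*
 * RDMA connection manager event handler. Address and route resolution
 * results complete ri_done; connection state transitions update
 * rep_connected and wake anything sleeping on rep_connect_wait.
 */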
330static int
331rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
332{
333	struct rpcrdma_xprt *xprt = id->context;
334	struct rpcrdma_ia *ia = &xprt->rx_ia;
335	struct rpcrdma_ep *ep = &xprt->rx_ep;
336#ifdef RPC_DEBUG
337	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
338#endif
339	struct ib_qp_attr attr;
340	struct ib_qp_init_attr iattr;
341	int connstate = 0;
342
343	switch (event->event) {
344	case RDMA_CM_EVENT_ADDR_RESOLVED:
345	case RDMA_CM_EVENT_ROUTE_RESOLVED:
346		ia->ri_async_rc = 0;
347		complete(&ia->ri_done);
348		break;
349	case RDMA_CM_EVENT_ADDR_ERROR:
350		ia->ri_async_rc = -EHOSTUNREACH;
351		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
352			__func__, ep);
353		complete(&ia->ri_done);
354		break;
355	case RDMA_CM_EVENT_ROUTE_ERROR:
356		ia->ri_async_rc = -ENETUNREACH;
357		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
358			__func__, ep);
359		complete(&ia->ri_done);
360		break;
361	case RDMA_CM_EVENT_ESTABLISHED:
362		connstate = 1;
363		ib_query_qp(ia->ri_id->qp, &attr,
364			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
365			&iattr);
366		dprintk("RPC:       %s: %d responder resources"
367			" (%d initiator)\n",
368			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
369		goto connected;
370	case RDMA_CM_EVENT_CONNECT_ERROR:
371		connstate = -ENOTCONN;
372		goto connected;
373	case RDMA_CM_EVENT_UNREACHABLE:
374		connstate = -ENETDOWN;
375		goto connected;
376	case RDMA_CM_EVENT_REJECTED:
377		connstate = -ECONNREFUSED;
378		goto connected;
379	case RDMA_CM_EVENT_DISCONNECTED:
380		connstate = -ECONNABORTED;
381		goto connected;
382	case RDMA_CM_EVENT_DEVICE_REMOVAL:
383		connstate = -ENODEV;
384connected:
385		dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
386			__func__,
387			(event->event <= 11) ? conn[event->event] :
388						"unknown connection error",
389			&addr->sin_addr.s_addr,
390			ntohs(addr->sin_port),
391			ep, event->event);
392		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
393		dprintk("RPC:       %s: %sconnected\n",
394					__func__, connstate > 0 ? "" : "dis");
395		ep->rep_connected = connstate;
396		ep->rep_func(ep);
397		wake_up_all(&ep->rep_connect_wait);
398		break;
399	default:
400		dprintk("RPC:       %s: unexpected CM event %d\n",
401			__func__, event->event);
402		break;
403	}
404
405#ifdef RPC_DEBUG
406	if (connstate == 1) {
407		int ird = attr.max_dest_rd_atomic;
408		int tird = ep->rep_remote_cma.responder_resources;
409		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
410			"on %s, memreg %d slots %d ird %d%s\n",
411			&addr->sin_addr.s_addr,
412			ntohs(addr->sin_port),
413			ia->ri_id->device->name,
414			ia->ri_memreg_strategy,
415			xprt->rx_buf.rb_max_requests,
416			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
417	} else if (connstate < 0) {
418		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
419			&addr->sin_addr.s_addr,
420			ntohs(addr->sin_port),
421			connstate);
422	}
423#endif
424
425	return 0;
426}
427
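/*
 * Create an RDMA CM ID and synchronously resolve the server's address
 * and route, waiting up to RDMA_RESOLVE_TIMEOUT for each step.
 */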
428static struct rdma_cm_id *
429rpcrdma_create_id(struct rpcrdma_xprt *xprt,
430			struct rpcrdma_ia *ia, struct sockaddr *addr)
431{
432	struct rdma_cm_id *id;
433	int rc;
434
435	init_completion(&ia->ri_done);
436
437	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
438	if (IS_ERR(id)) {
439		rc = PTR_ERR(id);
440		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
441			__func__, rc);
442		return id;
443	}
444
445	ia->ri_async_rc = -ETIMEDOUT;
446	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
447	if (rc) {
448		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
449			__func__, rc);
450		goto out;
451	}
452	wait_for_completion_interruptible_timeout(&ia->ri_done,
453				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
454	rc = ia->ri_async_rc;
455	if (rc)
456		goto out;
457
458	ia->ri_async_rc = -ETIMEDOUT;
459	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
460	if (rc) {
461		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
462			__func__, rc);
463		goto out;
464	}
465	wait_for_completion_interruptible_timeout(&ia->ri_done,
466				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
467	rc = ia->ri_async_rc;
468	if (rc)
469		goto out;
470
471	return id;
472
473out:
474	rdma_destroy_id(id);
475	return ERR_PTR(rc);
476}
477
478/*
479 * Drain any cq, prior to teardown.
480 */
481static void
482rpcrdma_clean_cq(struct ib_cq *cq)
483{
484	struct ib_wc wc;
485	int count = 0;
486
487	while (1 == ib_poll_cq(cq, 1, &wc))
488		++count;
489
490	if (count)
491		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
492			__func__, count, wc.opcode);
493}
494
495/*
496 * Exported functions.
497 */
498
499/*
500 * Open and initialize an Interface Adapter.
501 *  o initializes fields of struct rpcrdma_ia, including
502 *    interface and provider attributes and the protection domain.
503 */
504int
505rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
506{
507	int rc, mem_priv;
508	struct ib_device_attr devattr;
509	struct rpcrdma_ia *ia = &xprt->rx_ia;
510
511	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
512	if (IS_ERR(ia->ri_id)) {
513		rc = PTR_ERR(ia->ri_id);
514		goto out1;
515	}
516
517	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
518	if (IS_ERR(ia->ri_pd)) {
519		rc = PTR_ERR(ia->ri_pd);
520		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
521			__func__, rc);
522		goto out2;
523	}
524
525	/*
526	 * Query the device to determine if the requested memory
527	 * registration strategy is supported. If it isn't, set the
528	 * strategy to a globally supported model.
529	 */
530	rc = ib_query_device(ia->ri_id->device, &devattr);
531	if (rc) {
532		dprintk("RPC:       %s: ib_query_device failed %d\n",
533			__func__, rc);
534		goto out2;
535	}
536
537	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
538		ia->ri_have_dma_lkey = 1;
539		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
540	}
541
542	if (memreg == RPCRDMA_FRMR) {
543		/* Requires both frmr reg and local dma lkey */
544		if ((devattr.device_cap_flags &
545		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
546		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
547			dprintk("RPC:       %s: FRMR registration "
548				"not supported by HCA\n", __func__);
549			memreg = RPCRDMA_MTHCAFMR;
550		} else {
551			/* Mind the ia limit on FRMR page list depth */
552			ia->ri_max_frmr_depth = min_t(unsigned int,
553				RPCRDMA_MAX_DATA_SEGS,
554				devattr.max_fast_reg_page_list_len);
555		}
556	}
557	if (memreg == RPCRDMA_MTHCAFMR) {
558		if (!ia->ri_id->device->alloc_fmr) {
559			dprintk("RPC:       %s: MTHCAFMR registration "
560				"not supported by HCA\n", __func__);
561#if RPCRDMA_PERSISTENT_REGISTRATION
562			memreg = RPCRDMA_ALLPHYSICAL;
563#else
564			rc = -ENOMEM;
565			goto out2;
566#endif
567		}
568	}
569
570	/*
571	 * Optionally obtain an underlying physical identity mapping in
572	 * order to do a memory window-based bind. This base registration
573	 * is protected from remote access - that is enabled only by binding
574	 * for the specific bytes targeted during each RPC operation, and
575	 * revoked after the corresponding completion similar to a storage
576	 * adapter.
577	 */
578	switch (memreg) {
579	case RPCRDMA_FRMR:
580		break;
581#if RPCRDMA_PERSISTENT_REGISTRATION
582	case RPCRDMA_ALLPHYSICAL:
583		mem_priv = IB_ACCESS_LOCAL_WRITE |
584				IB_ACCESS_REMOTE_WRITE |
585				IB_ACCESS_REMOTE_READ;
586		goto register_setup;
587#endif
588	case RPCRDMA_MTHCAFMR:
589		if (ia->ri_have_dma_lkey)
590			break;
591		mem_priv = IB_ACCESS_LOCAL_WRITE;
592#if RPCRDMA_PERSISTENT_REGISTRATION
593	register_setup:
594#endif
595		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
596		if (IS_ERR(ia->ri_bind_mem)) {
597			printk(KERN_ALERT "%s: ib_get_dma_mr for "
598				"phys register failed with %lX\n",
599				__func__, PTR_ERR(ia->ri_bind_mem));
600			rc = -ENOMEM;
601			goto out2;
602		}
603		break;
604	default:
605		printk(KERN_ERR "RPC: Unsupported memory "
606				"registration mode: %d\n", memreg);
607		rc = -ENOMEM;
608		goto out2;
609	}
610	dprintk("RPC:       %s: memory registration strategy is %d\n",
611		__func__, memreg);
612
613	/* Else will do memory reg/dereg for each chunk */
614	ia->ri_memreg_strategy = memreg;
615
616	rwlock_init(&ia->ri_qplock);
617	return 0;
618out2:
619	rdma_destroy_id(ia->ri_id);
620	ia->ri_id = NULL;
621out1:
622	return rc;
623}
624
625/*
626 * Clean up/close an IA.
627 *   o if event handles and PD have been initialized, free them.
628 *   o close the IA
629 */
630void
631rpcrdma_ia_close(struct rpcrdma_ia *ia)
632{
633	int rc;
634
635	dprintk("RPC:       %s: entering\n", __func__);
636	if (ia->ri_bind_mem != NULL) {
637		rc = ib_dereg_mr(ia->ri_bind_mem);
638		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
639			__func__, rc);
640	}
641	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
642		if (ia->ri_id->qp)
643			rdma_destroy_qp(ia->ri_id);
644		rdma_destroy_id(ia->ri_id);
645		ia->ri_id = NULL;
646	}
647	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
648		rc = ib_dealloc_pd(ia->ri_pd);
649		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
650			__func__, rc);
651	}
652}
653
654/*
655 * Create unconnected endpoint.
656 */
657int
658rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
659				struct rpcrdma_create_data_internal *cdata)
660{
661	struct ib_device_attr devattr;
662	struct ib_cq *sendcq, *recvcq;
663	int rc, err;
664
665	rc = ib_query_device(ia->ri_id->device, &devattr);
666	if (rc) {
667		dprintk("RPC:       %s: ib_query_device failed %d\n",
668			__func__, rc);
669		return rc;
670	}
671
672	/* check provider's send/recv wr limits */
673	if (cdata->max_requests > devattr.max_qp_wr)
674		cdata->max_requests = devattr.max_qp_wr;
675
676	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
677	ep->rep_attr.qp_context = ep;
678	/* send_cq and recv_cq initialized below */
679	ep->rep_attr.srq = NULL;
680	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
681	switch (ia->ri_memreg_strategy) {
682	case RPCRDMA_FRMR: {
683		int depth = 7;
684
685		/* Add room for frmr register and invalidate WRs.
686		 * 1. FRMR reg WR for head
687		 * 2. FRMR invalidate WR for head
688		 * 3. N FRMR reg WRs for pagelist
689		 * 4. N FRMR invalidate WRs for pagelist
690		 * 5. FRMR reg WR for tail
691		 * 6. FRMR invalidate WR for tail
692		 * 7. The RDMA_SEND WR
693		 */
694
695		/* Calculate N if the device max FRMR depth is smaller than
696		 * RPCRDMA_MAX_DATA_SEGS.
697		 */
698		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
699			int delta = RPCRDMA_MAX_DATA_SEGS -
700				    ia->ri_max_frmr_depth;
701
702			do {
703				depth += 2; /* FRMR reg + invalidate */
704				delta -= ia->ri_max_frmr_depth;
705			} while (delta > 0);
706
707		}
708		ep->rep_attr.cap.max_send_wr *= depth;
709		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
710			cdata->max_requests = devattr.max_qp_wr / depth;
711			if (!cdata->max_requests)
712				return -EINVAL;
713			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
714						       depth;
715		}
716		break;
717	}
718	default:
719		break;
720	}
721	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
722	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
723	ep->rep_attr.cap.max_recv_sge = 1;
724	ep->rep_attr.cap.max_inline_data = 0;
725	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
726	ep->rep_attr.qp_type = IB_QPT_RC;
727	ep->rep_attr.port_num = ~0;
728
729	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
730		"iovs: send %d recv %d\n",
731		__func__,
732		ep->rep_attr.cap.max_send_wr,
733		ep->rep_attr.cap.max_recv_wr,
734		ep->rep_attr.cap.max_send_sge,
735		ep->rep_attr.cap.max_recv_sge);
736
737	/* Request a signaled send completion roughly every half send queue, so the send CQ cannot overflow */
738	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
739	if (ep->rep_cqinit <= 2)
740		ep->rep_cqinit = 0;
741	INIT_CQCOUNT(ep);
742	ep->rep_ia = ia;
743	init_waitqueue_head(&ep->rep_connect_wait);
744	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
745
746	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
747				  rpcrdma_cq_async_error_upcall, ep,
748				  ep->rep_attr.cap.max_send_wr + 1, 0);
749	if (IS_ERR(sendcq)) {
750		rc = PTR_ERR(sendcq);
751		dprintk("RPC:       %s: failed to create send CQ: %i\n",
752			__func__, rc);
753		goto out1;
754	}
755
756	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
757	if (rc) {
758		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
759			__func__, rc);
760		goto out2;
761	}
762
763	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
764				  rpcrdma_cq_async_error_upcall, ep,
765				  ep->rep_attr.cap.max_recv_wr + 1, 0);
766	if (IS_ERR(recvcq)) {
767		rc = PTR_ERR(recvcq);
768		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
769			__func__, rc);
770		goto out2;
771	}
772
773	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
774	if (rc) {
775		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
776			__func__, rc);
777		ib_destroy_cq(recvcq);
778		goto out2;
779	}
780
781	ep->rep_attr.send_cq = sendcq;
782	ep->rep_attr.recv_cq = recvcq;
783
784	/* Initialize cma parameters */
785
786	/* RPC/RDMA does not use private data */
787	ep->rep_remote_cma.private_data = NULL;
788	ep->rep_remote_cma.private_data_len = 0;
789
790	/* Client offers RDMA Read but does not initiate */
791	ep->rep_remote_cma.initiator_depth = 0;
792	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
793		ep->rep_remote_cma.responder_resources = 32;
794	else
795		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
796
797	ep->rep_remote_cma.retry_count = 7;
798	ep->rep_remote_cma.flow_control = 0;
799	ep->rep_remote_cma.rnr_retry_count = 0;
800
801	return 0;
802
803out2:
804	err = ib_destroy_cq(sendcq);
805	if (err)
806		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
807			__func__, err);
808out1:
809	return rc;
810}
811
812/*
813 * rpcrdma_ep_destroy
814 *
815 * Disconnect and destroy endpoint. After this, the only
816 * valid operations on the ep are to free it (if dynamically
817 * allocated) or re-create it.
818 */
819void
820rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
821{
822	int rc;
823
824	dprintk("RPC:       %s: entering, connected is %d\n",
825		__func__, ep->rep_connected);
826
827	cancel_delayed_work_sync(&ep->rep_connect_worker);
828
829	if (ia->ri_id->qp) {
830		rc = rpcrdma_ep_disconnect(ep, ia);
831		if (rc)
832			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
833				" returned %i\n", __func__, rc);
834		rdma_destroy_qp(ia->ri_id);
835		ia->ri_id->qp = NULL;
836	}
837
838	/* padding - could be done in rpcrdma_buffer_destroy... */
839	if (ep->rep_pad_mr) {
840		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
841		ep->rep_pad_mr = NULL;
842	}
843
844	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
845	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
846	if (rc)
847		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
848			__func__, rc);
849
850	rpcrdma_clean_cq(ep->rep_attr.send_cq);
851	rc = ib_destroy_cq(ep->rep_attr.send_cq);
852	if (rc)
853		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
854			__func__, rc);
855}
856
857/*
858 * Connect unconnected endpoint.
859 */
860int
861rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
862{
863	struct rdma_cm_id *id, *old;
864	int rc = 0;
865	int retry_count = 0;
866
867	if (ep->rep_connected != 0) {
868		struct rpcrdma_xprt *xprt;
869retry:
870		dprintk("RPC:       %s: reconnecting...\n", __func__);
871		rc = rpcrdma_ep_disconnect(ep, ia);
872		if (rc && rc != -ENOTCONN)
873			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
874				" status %i\n", __func__, rc);
875
876		rpcrdma_clean_cq(ep->rep_attr.recv_cq);
877		rpcrdma_clean_cq(ep->rep_attr.send_cq);
878
879		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
880		id = rpcrdma_create_id(xprt, ia,
881				(struct sockaddr *)&xprt->rx_data.addr);
882		if (IS_ERR(id)) {
883			rc = -EHOSTUNREACH;
884			goto out;
885		}
886		/* TEMP TEMP TEMP - fail if new device:
887		 * Deregister/remarshal *all* requests!
888		 * Close and recreate adapter, pd, etc!
889		 * Re-determine all attributes still sane!
890		 * More stuff I haven't thought of!
891		 * Rrrgh!
892		 */
893		if (ia->ri_id->device != id->device) {
894			printk("RPC:       %s: can't reconnect on "
895				"different device!\n", __func__);
896			rdma_destroy_id(id);
897			rc = -ENETUNREACH;
898			goto out;
899		}
900		/* END TEMP */
901		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
902		if (rc) {
903			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
904				__func__, rc);
905			rdma_destroy_id(id);
906			rc = -ENETUNREACH;
907			goto out;
908		}
909
910		write_lock(&ia->ri_qplock);
911		old = ia->ri_id;
912		ia->ri_id = id;
913		write_unlock(&ia->ri_qplock);
914
915		rdma_destroy_qp(old);
916		rdma_destroy_id(old);
917	} else {
918		dprintk("RPC:       %s: connecting...\n", __func__);
919		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
920		if (rc) {
921			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
922				__func__, rc);
923			/* do not update ep->rep_connected */
924			return -ENETUNREACH;
925		}
926	}
927
928	ep->rep_connected = 0;
929
930	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
931	if (rc) {
932		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
933				__func__, rc);
934		goto out;
935	}
936
937	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
938
939	/*
940	 * Check state. A non-peer reject indicates no listener
941	 * (ECONNREFUSED), which may be a transient state. All
942	 * other errors indicate a transport condition for which a
943	 * best-effort recovery attempt has already been made.
944	 */
945	if (ep->rep_connected == -ECONNREFUSED &&
946	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
947		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
948		goto retry;
949	}
950	if (ep->rep_connected <= 0) {
951		/* Sometimes, the only way to reliably connect to remote
952		 * CMs is to use the same nonzero values for ORD and IRD. */
953		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
954		    (ep->rep_remote_cma.responder_resources == 0 ||
955		     ep->rep_remote_cma.initiator_depth !=
956				ep->rep_remote_cma.responder_resources)) {
957			if (ep->rep_remote_cma.responder_resources == 0)
958				ep->rep_remote_cma.responder_resources = 1;
959			ep->rep_remote_cma.initiator_depth =
960				ep->rep_remote_cma.responder_resources;
961			goto retry;
962		}
963		rc = ep->rep_connected;
964	} else {
965		dprintk("RPC:       %s: connected\n", __func__);
966	}
967
968out:
969	if (rc)
970		ep->rep_connected = rc;
971	return rc;
972}
973
974/*
975 * rpcrdma_ep_disconnect
976 *
977 * This is separate from destroy to facilitate the ability
978 * to reconnect without recreating the endpoint.
979 *
980 * This call is not reentrant, and must not be made in parallel
981 * on the same endpoint.
982 */
983int
984rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
985{
986	int rc;
987
988	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
989	rpcrdma_clean_cq(ep->rep_attr.send_cq);
990	rc = rdma_disconnect(ia->ri_id);
991	if (!rc) {
992		/* returns without wait if not connected */
993		wait_event_interruptible(ep->rep_connect_wait,
994							ep->rep_connected != 1);
995		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
996			(ep->rep_connected == 1) ? "still " : "dis");
997	} else {
998		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
999		ep->rep_connected = rc;
1000	}
1001	return rc;
1002}
1003
1004/*
1005 * Initialize buffer memory
1006 */
1007int
1008rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1009	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1010{
1011	char *p;
1012	size_t len, rlen, wlen;
1013	int i, rc;
1014	struct rpcrdma_mw *r;
1015
1016	buf->rb_max_requests = cdata->max_requests;
1017	spin_lock_init(&buf->rb_lock);
1018	atomic_set(&buf->rb_credits, 1);
1019
1020	/* Need to allocate:
1021	 *   1.  arrays for send and recv pointers
1022	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1023	 *   3.  array of struct rpcrdma_rep for replies
1024	 *   4.  padding, if any
1025	 *   5.  mw's, fmr's or frmr's, if any
1026	 * Send/recv buffers in req/rep need to be registered
1027	 */
1028
1029	len = buf->rb_max_requests *
1030		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1031	len += cdata->padding;
1032	switch (ia->ri_memreg_strategy) {
1033	case RPCRDMA_FRMR:
1034		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1035				sizeof(struct rpcrdma_mw);
1036		break;
1037	case RPCRDMA_MTHCAFMR:
1038		/* TBD we are perhaps overallocating here */
1039		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1040				sizeof(struct rpcrdma_mw);
1041		break;
1042	default:
1043		break;
1044	}
1045
1046	/* allocate 1, 4 and 5 in one shot */
1047	p = kzalloc(len, GFP_KERNEL);
1048	if (p == NULL) {
1049		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1050			__func__, len);
1051		rc = -ENOMEM;
1052		goto out;
1053	}
1054	buf->rb_pool = p;	/* for freeing it later */
1055
1056	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1057	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1058	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1059	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1060
1061	/*
1062	 * Register the zeroed pad buffer, if any.
1063	 */
1064	if (cdata->padding) {
1065		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1066					    &ep->rep_pad_mr, &ep->rep_pad);
1067		if (rc)
1068			goto out;
1069	}
1070	p += cdata->padding;
1071
1072	INIT_LIST_HEAD(&buf->rb_mws);
1073	r = (struct rpcrdma_mw *)p;
1074	switch (ia->ri_memreg_strategy) {
1075	case RPCRDMA_FRMR:
1076		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1077			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1078						ia->ri_max_frmr_depth);
1079			if (IS_ERR(r->r.frmr.fr_mr)) {
1080				rc = PTR_ERR(r->r.frmr.fr_mr);
1081				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1082					" failed %i\n", __func__, rc);
1083				goto out;
1084			}
1085			r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1086						ia->ri_id->device,
1087						ia->ri_max_frmr_depth);
1088			if (IS_ERR(r->r.frmr.fr_pgl)) {
1089				rc = PTR_ERR(r->r.frmr.fr_pgl);
1090				dprintk("RPC:       %s: "
1091					"ib_alloc_fast_reg_page_list "
1092					"failed %i\n", __func__, rc);
1093
1094				ib_dereg_mr(r->r.frmr.fr_mr);
1095				goto out;
1096			}
1097			list_add(&r->mw_list, &buf->rb_mws);
1098			++r;
1099		}
1100		break;
1101	case RPCRDMA_MTHCAFMR:
1102		/* TBD we are perhaps overallocating here */
1103		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1104			static struct ib_fmr_attr fa =
1105				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1106			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1107				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1108				&fa);
1109			if (IS_ERR(r->r.fmr)) {
1110				rc = PTR_ERR(r->r.fmr);
1111				dprintk("RPC:       %s: ib_alloc_fmr"
1112					" failed %i\n", __func__, rc);
1113				goto out;
1114			}
1115			list_add(&r->mw_list, &buf->rb_mws);
1116			++r;
1117		}
1118		break;
1119	default:
1120		break;
1121	}
1122
1123	/*
1124	 * Allocate/init the request/reply buffers. Doing this
1125	 * using kmalloc for now -- one for each buf.
1126	 */
1127	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1128	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1129	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1130		__func__, wlen, rlen);
1131
1132	for (i = 0; i < buf->rb_max_requests; i++) {
1133		struct rpcrdma_req *req;
1134		struct rpcrdma_rep *rep;
1135
1136		req = kmalloc(wlen, GFP_KERNEL);
1137		if (req == NULL) {
1138			dprintk("RPC:       %s: request buffer %d alloc"
1139				" failed\n", __func__, i);
1140			rc = -ENOMEM;
1141			goto out;
1142		}
1143		memset(req, 0, sizeof(struct rpcrdma_req));
1144		buf->rb_send_bufs[i] = req;
1145		buf->rb_send_bufs[i]->rl_buffer = buf;
1146
1147		rc = rpcrdma_register_internal(ia, req->rl_base,
1148				wlen - offsetof(struct rpcrdma_req, rl_base),
1149				&buf->rb_send_bufs[i]->rl_handle,
1150				&buf->rb_send_bufs[i]->rl_iov);
1151		if (rc)
1152			goto out;
1153
1154		buf->rb_send_bufs[i]->rl_size = wlen -
1155						sizeof(struct rpcrdma_req);
1156
1157		rep = kmalloc(rlen, GFP_KERNEL);
1158		if (rep == NULL) {
1159			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1160				__func__, i);
1161			rc = -ENOMEM;
1162			goto out;
1163		}
1164		memset(rep, 0, sizeof(struct rpcrdma_rep));
1165		buf->rb_recv_bufs[i] = rep;
1166		buf->rb_recv_bufs[i]->rr_buffer = buf;
1167
1168		rc = rpcrdma_register_internal(ia, rep->rr_base,
1169				rlen - offsetof(struct rpcrdma_rep, rr_base),
1170				&buf->rb_recv_bufs[i]->rr_handle,
1171				&buf->rb_recv_bufs[i]->rr_iov);
1172		if (rc)
1173			goto out;
1174
1175	}
1176	dprintk("RPC:       %s: max_requests %d\n",
1177		__func__, buf->rb_max_requests);
1178	/* done */
1179	return 0;
1180out:
1181	rpcrdma_buffer_destroy(buf);
1182	return rc;
1183}
1184
1185/*
1186 * Unregister and destroy buffer memory. Need to deal with
1187 * partial initialization, so it's callable from failed create.
1188 * Must be called before destroying endpoint, as registrations
1189 * reference it.
1190 */
1191void
1192rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1193{
1194	int rc, i;
1195	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1196	struct rpcrdma_mw *r;
1197
1198	/* clean up in reverse order from create
1199	 *   1.  recv mr memory (mr free, then kfree)
1200	 *   2.  send mr memory (mr free, then kfree)
1201	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1202	 *   4.  arrays
1203	 */
1204	dprintk("RPC:       %s: entering\n", __func__);
1205
1206	for (i = 0; i < buf->rb_max_requests; i++) {
1207		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1208			rpcrdma_deregister_internal(ia,
1209					buf->rb_recv_bufs[i]->rr_handle,
1210					&buf->rb_recv_bufs[i]->rr_iov);
1211			kfree(buf->rb_recv_bufs[i]);
1212		}
1213		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1214			rpcrdma_deregister_internal(ia,
1215					buf->rb_send_bufs[i]->rl_handle,
1216					&buf->rb_send_bufs[i]->rl_iov);
1217			kfree(buf->rb_send_bufs[i]);
1218		}
1219	}
1220
1221	while (!list_empty(&buf->rb_mws)) {
1222		r = list_entry(buf->rb_mws.next,
1223			struct rpcrdma_mw, mw_list);
1224		list_del(&r->mw_list);
1225		switch (ia->ri_memreg_strategy) {
1226		case RPCRDMA_FRMR:
1227			rc = ib_dereg_mr(r->r.frmr.fr_mr);
1228			if (rc)
1229				dprintk("RPC:       %s:"
1230					" ib_dereg_mr"
1231					" failed %i\n",
1232					__func__, rc);
1233			ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1234			break;
1235		case RPCRDMA_MTHCAFMR:
1236			rc = ib_dealloc_fmr(r->r.fmr);
1237			if (rc)
1238				dprintk("RPC:       %s:"
1239					" ib_dealloc_fmr"
1240					" failed %i\n",
1241					__func__, rc);
1242			break;
1243		default:
1244			break;
1245		}
1246	}
1247
1248	kfree(buf->rb_pool);
1249}
1250
1251/*
1252 * Get a set of request/reply buffers.
1253 *
1254 * Reply buffer (if needed) is attached to send buffer upon return.
1255 * Rule:
1256 *    rb_send_index and rb_recv_index MUST always be pointing to the
1257 *    *next* available buffer (non-NULL). They are incremented after
1258 *    removing buffers, and decremented *before* returning them.
1259 */
1260struct rpcrdma_req *
1261rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1262{
1263	struct rpcrdma_req *req;
1264	unsigned long flags;
1265	int i;
1266	struct rpcrdma_mw *r;
1267
1268	spin_lock_irqsave(&buffers->rb_lock, flags);
1269	if (buffers->rb_send_index == buffers->rb_max_requests) {
1270		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1271		dprintk("RPC:       %s: out of request buffers\n", __func__);
1272		return ((struct rpcrdma_req *)NULL);
1273	}
1274
1275	req = buffers->rb_send_bufs[buffers->rb_send_index];
1276	if (buffers->rb_send_index < buffers->rb_recv_index) {
1277		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1278			__func__,
1279			buffers->rb_recv_index - buffers->rb_send_index);
1280		req->rl_reply = NULL;
1281	} else {
1282		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1283		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1284	}
1285	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1286	if (!list_empty(&buffers->rb_mws)) {
1287		i = RPCRDMA_MAX_SEGS - 1;
1288		do {
1289			r = list_entry(buffers->rb_mws.next,
1290					struct rpcrdma_mw, mw_list);
1291			list_del(&r->mw_list);
1292			req->rl_segments[i].mr_chunk.rl_mw = r;
1293		} while (--i >= 0);
1294	}
1295	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1296	return req;
1297}
1298
1299/*
1300 * Put request/reply buffers back into pool.
1301 * Pre-decrement counter/array index.
1302 */
1303void
1304rpcrdma_buffer_put(struct rpcrdma_req *req)
1305{
1306	struct rpcrdma_buffer *buffers = req->rl_buffer;
1307	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1308	int i;
1309	unsigned long flags;
1310
1311	spin_lock_irqsave(&buffers->rb_lock, flags);
1312	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1313	req->rl_niovs = 0;
1314	if (req->rl_reply) {
1315		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1316		req->rl_reply->rr_func = NULL;
1317		req->rl_reply = NULL;
1318	}
1319	switch (ia->ri_memreg_strategy) {
1320	case RPCRDMA_FRMR:
1321	case RPCRDMA_MTHCAFMR:
1322		/*
1323		 * Cycle mw's back in reverse order, and "spin" them.
1324		 * This delays and scrambles reuse as much as possible.
1325		 */
1326		i = 1;
1327		do {
1328			struct rpcrdma_mw **mw;
1329			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1330			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1331			*mw = NULL;
1332		} while (++i < RPCRDMA_MAX_SEGS);
1333		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1334					&buffers->rb_mws);
1335		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1336		break;
1337	default:
1338		break;
1339	}
1340	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1341}
1342
1343/*
1344 * Recover reply buffers from pool.
1345 * This happens when recovering from error conditions.
1346 * Post-increment counter/array index.
1347 */
1348void
1349rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1350{
1351	struct rpcrdma_buffer *buffers = req->rl_buffer;
1352	unsigned long flags;
1353
1354	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1355		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1356	spin_lock_irqsave(&buffers->rb_lock, flags);
1357	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1358		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1359		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1360	}
1361	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1362}
1363
1364/*
1365 * Put reply buffers back into pool when not attached to
1366 * request. This happens in error conditions.
1367 */
1368void
1369rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1370{
1371	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1372	unsigned long flags;
1373
1374	rep->rr_func = NULL;
1375	spin_lock_irqsave(&buffers->rb_lock, flags);
1376	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1377	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1378}
1379
1380/*
1381 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1382 */
1383
1384int
1385rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1386				struct ib_mr **mrp, struct ib_sge *iov)
1387{
1388	struct ib_phys_buf ipb;
1389	struct ib_mr *mr;
1390	int rc;
1391
1392	/*
1393	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1394	 */
1395	iov->addr = ib_dma_map_single(ia->ri_id->device,
1396			va, len, DMA_BIDIRECTIONAL);
1397	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1398		return -ENOMEM;
1399
1400	iov->length = len;
1401
1402	if (ia->ri_have_dma_lkey) {
1403		*mrp = NULL;
1404		iov->lkey = ia->ri_dma_lkey;
1405		return 0;
1406	} else if (ia->ri_bind_mem != NULL) {
1407		*mrp = NULL;
1408		iov->lkey = ia->ri_bind_mem->lkey;
1409		return 0;
1410	}
1411
1412	ipb.addr = iov->addr;
1413	ipb.size = iov->length;
1414	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1415			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1416
1417	dprintk("RPC:       %s: phys convert: 0x%llx "
1418			"registered 0x%llx length %d\n",
1419			__func__, (unsigned long long)ipb.addr,
1420			(unsigned long long)iov->addr, len);
1421
1422	if (IS_ERR(mr)) {
1423		*mrp = NULL;
1424		rc = PTR_ERR(mr);
1425		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1426	} else {
1427		*mrp = mr;
1428		iov->lkey = mr->lkey;
1429		rc = 0;
1430	}
1431
1432	return rc;
1433}
1434
1435int
1436rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1437				struct ib_mr *mr, struct ib_sge *iov)
1438{
1439	int rc;
1440
1441	ib_dma_unmap_single(ia->ri_id->device,
1442			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1443
1444	if (NULL == mr)
1445		return 0;
1446
1447	rc = ib_dereg_mr(mr);
1448	if (rc)
1449		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1450	return rc;
1451}
1452
1453/*
1454 * Wrappers for chunk registration, shared by read/write chunk code.
1455 */
1456
1457static void
1458rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1459{
1460	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1461	seg->mr_dmalen = seg->mr_len;
1462	if (seg->mr_page)
1463		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1464				seg->mr_page, offset_in_page(seg->mr_offset),
1465				seg->mr_dmalen, seg->mr_dir);
1466	else
1467		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1468				seg->mr_offset,
1469				seg->mr_dmalen, seg->mr_dir);
1470	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1471		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1472			__func__,
1473			(unsigned long long)seg->mr_dma,
1474			seg->mr_offset, seg->mr_dmalen);
1475	}
1476}
1477
1478static void
1479rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1480{
1481	if (seg->mr_page)
1482		ib_dma_unmap_page(ia->ri_id->device,
1483				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1484	else
1485		ib_dma_unmap_single(ia->ri_id->device,
1486				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1487}
1488
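/*
 * Register a chunk via a fast-register MR: build the page list from the
 * DMA-mapped segments (stopping at the first hole), bump the rkey, and
 * post a FAST_REG_MR WR -- preceded by a LOCAL_INV if the previous
 * registration is somehow still valid.
 */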
1489static int
1490rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1491			int *nsegs, int writing, struct rpcrdma_ia *ia,
1492			struct rpcrdma_xprt *r_xprt)
1493{
1494	struct rpcrdma_mr_seg *seg1 = seg;
1495	struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1496
1497	u8 key;
1498	int len, pageoff;
1499	int i, rc;
1500	int seg_len;
1501	u64 pa;
1502	int page_no;
1503
1504	pageoff = offset_in_page(seg1->mr_offset);
1505	seg1->mr_offset -= pageoff;	/* start of page */
1506	seg1->mr_len += pageoff;
1507	len = -pageoff;
1508	if (*nsegs > ia->ri_max_frmr_depth)
1509		*nsegs = ia->ri_max_frmr_depth;
1510	for (page_no = i = 0; i < *nsegs;) {
1511		rpcrdma_map_one(ia, seg, writing);
1512		pa = seg->mr_dma;
1513		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1514			seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1515				page_list[page_no++] = pa;
1516			pa += PAGE_SIZE;
1517		}
1518		len += seg->mr_len;
1519		++seg;
1520		++i;
1521		/* Check for holes */
1522		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1523		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1524			break;
1525	}
1526	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1527		__func__, seg1->mr_chunk.rl_mw, i);
1528
1529	if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1530		dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1531			__func__,
1532			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1533		/* Invalidate before using. */
1534		memset(&invalidate_wr, 0, sizeof invalidate_wr);
1535		invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1536		invalidate_wr.next = &frmr_wr;
1537		invalidate_wr.opcode = IB_WR_LOCAL_INV;
1538		invalidate_wr.send_flags = IB_SEND_SIGNALED;
1539		invalidate_wr.ex.invalidate_rkey =
1540			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1541		DECR_CQCOUNT(&r_xprt->rx_ep);
1542		post_wr = &invalidate_wr;
1543	} else
1544		post_wr = &frmr_wr;
1545
1546	/* Prepare FRMR WR */
1547	memset(&frmr_wr, 0, sizeof frmr_wr);
1548	frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1549	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1550	frmr_wr.send_flags = IB_SEND_SIGNALED;
1551	frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1552	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1553	frmr_wr.wr.fast_reg.page_list_len = page_no;
1554	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1555	frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1556	if (frmr_wr.wr.fast_reg.length < len) {
1557		rc = -EIO;
1558		goto out_err;
1559	}
1560
1561	/* Bump the key */
1562	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1563	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1564
1565	frmr_wr.wr.fast_reg.access_flags = (writing ?
1566				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1567				IB_ACCESS_REMOTE_READ);
1568	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1569	DECR_CQCOUNT(&r_xprt->rx_ep);
1570
1571	rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1572
1573	if (rc) {
1574		dprintk("RPC:       %s: failed ib_post_send for register,"
1575			" status %i\n", __func__, rc);
1576		goto out_err;
1577	} else {
1578		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1579		seg1->mr_base = seg1->mr_dma + pageoff;
1580		seg1->mr_nsegs = i;
1581		seg1->mr_len = len;
1582	}
1583	*nsegs = i;
1584	return 0;
1585out_err:
1586	while (i--)
1587		rpcrdma_unmap_one(ia, --seg);
1588	return rc;
1589}
1590
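/*
 * Invalidate a fast-register MR: unmap its segments and post a
 * LOCAL_INV WR, under ri_qplock so the QP cannot change underneath us.
 */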
1591static int
1592rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1593			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1594{
1595	struct rpcrdma_mr_seg *seg1 = seg;
1596	struct ib_send_wr invalidate_wr, *bad_wr;
1597	int rc;
1598
1599	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1600	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1601	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1602	invalidate_wr.send_flags = IB_SEND_SIGNALED;
1603	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1604	DECR_CQCOUNT(&r_xprt->rx_ep);
1605
1606	read_lock(&ia->ri_qplock);
1607	while (seg1->mr_nsegs--)
1608		rpcrdma_unmap_one(ia, seg++);
1609	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1610	read_unlock(&ia->ri_qplock);
1611	if (rc)
1612		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1613			" status %i\n", __func__, rc);
1614	return rc;
1615}
1616
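/*
 * Register a chunk through an FMR: gather the segments' DMA addresses
 * (stopping at the first hole) and map them with one ib_map_phys_fmr call.
 */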
1617static int
1618rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1619			int *nsegs, int writing, struct rpcrdma_ia *ia)
1620{
1621	struct rpcrdma_mr_seg *seg1 = seg;
1622	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1623	int len, pageoff, i, rc;
1624
1625	pageoff = offset_in_page(seg1->mr_offset);
1626	seg1->mr_offset -= pageoff;	/* start of page */
1627	seg1->mr_len += pageoff;
1628	len = -pageoff;
1629	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1630		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1631	for (i = 0; i < *nsegs;) {
1632		rpcrdma_map_one(ia, seg, writing);
1633		physaddrs[i] = seg->mr_dma;
1634		len += seg->mr_len;
1635		++seg;
1636		++i;
1637		/* Check for holes */
1638		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1639		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1640			break;
1641	}
1642	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1643				physaddrs, i, seg1->mr_dma);
1644	if (rc) {
1645		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1646			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1647			len, (unsigned long long)seg1->mr_dma,
1648			pageoff, i, rc);
1649		while (i--)
1650			rpcrdma_unmap_one(ia, --seg);
1651	} else {
1652		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1653		seg1->mr_base = seg1->mr_dma + pageoff;
1654		seg1->mr_nsegs = i;
1655		seg1->mr_len = len;
1656	}
1657	*nsegs = i;
1658	return rc;
1659}
1660
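/*
 * Unmap an FMR-registered chunk and release its DMA mappings.
 */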
1661static int
1662rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1663			struct rpcrdma_ia *ia)
1664{
1665	struct rpcrdma_mr_seg *seg1 = seg;
1666	LIST_HEAD(l);
1667	int rc;
1668
1669	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1670	rc = ib_unmap_fmr(&l);
1671	read_lock(&ia->ri_qplock);
1672	while (seg1->mr_nsegs--)
1673		rpcrdma_unmap_one(ia, seg++);
1674	read_unlock(&ia->ri_qplock);
1675	if (rc)
1676		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1677			" status %i\n", __func__, rc);
1678	return rc;
1679}
1680
1681int
1682rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1683			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1684{
1685	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1686	int rc = 0;
1687
1688	switch (ia->ri_memreg_strategy) {
1689
1690#if RPCRDMA_PERSISTENT_REGISTRATION
1691	case RPCRDMA_ALLPHYSICAL:
1692		rpcrdma_map_one(ia, seg, writing);
1693		seg->mr_rkey = ia->ri_bind_mem->rkey;
1694		seg->mr_base = seg->mr_dma;
1695		seg->mr_nsegs = 1;
1696		nsegs = 1;
1697		break;
1698#endif
1699
1700	/* Registration using frmr registration */
1701	case RPCRDMA_FRMR:
1702		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1703		break;
1704
1705	/* Registration using fmr memory registration */
1706	case RPCRDMA_MTHCAFMR:
1707		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1708		break;
1709
1710	default:
1711		return -1;
1712	}
1713	if (rc)
1714		return -1;
1715
1716	return nsegs;
1717}
1718
1719int
1720rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1721		struct rpcrdma_xprt *r_xprt)
1722{
1723	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1724	int nsegs = seg->mr_nsegs, rc;
1725
1726	switch (ia->ri_memreg_strategy) {
1727
1728#if RPCRDMA_PERSISTENT_REGISTRATION
1729	case RPCRDMA_ALLPHYSICAL:
1730		read_lock(&ia->ri_qplock);
1731		rpcrdma_unmap_one(ia, seg);
1732		read_unlock(&ia->ri_qplock);
1733		break;
1734#endif
1735
1736	case RPCRDMA_FRMR:
1737		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1738		break;
1739
1740	case RPCRDMA_MTHCAFMR:
1741		rc = rpcrdma_deregister_fmr_external(seg, ia);
1742		break;
1743
1744	default:
1745		break;
1746	}
1747	return nsegs;
1748}
1749
1750/*
1751 * Prepost any receive buffer, then post send.
1752 *
1753 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1754 */
1755int
1756rpcrdma_ep_post(struct rpcrdma_ia *ia,
1757		struct rpcrdma_ep *ep,
1758		struct rpcrdma_req *req)
1759{
1760	struct ib_send_wr send_wr, *send_wr_fail;
1761	struct rpcrdma_rep *rep = req->rl_reply;
1762	int rc;
1763
1764	if (rep) {
1765		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1766		if (rc)
1767			goto out;
1768		req->rl_reply = NULL;
1769	}
1770
1771	send_wr.next = NULL;
1772	send_wr.wr_id = 0ULL;	/* no send cookie */
1773	send_wr.sg_list = req->rl_send_iov;
1774	send_wr.num_sge = req->rl_niovs;
1775	send_wr.opcode = IB_WR_SEND;
1776	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1777		ib_dma_sync_single_for_device(ia->ri_id->device,
1778			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1779			DMA_TO_DEVICE);
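	/* sync the remaining send SGEs (RPC call message, then the RPC/RDMA header) for device access */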
1780	ib_dma_sync_single_for_device(ia->ri_id->device,
1781		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1782		DMA_TO_DEVICE);
1783	ib_dma_sync_single_for_device(ia->ri_id->device,
1784		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1785		DMA_TO_DEVICE);
1786
1787	if (DECR_CQCOUNT(ep) > 0)
1788		send_wr.send_flags = 0;
1789	else { /* Provider must take a send completion every now and then */
1790		INIT_CQCOUNT(ep);
1791		send_wr.send_flags = IB_SEND_SIGNALED;
1792	}
1793
1794	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1795	if (rc)
1796		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1797			rc);
1798out:
1799	return rc;
1800}
1801
1802/*
1803 * (Re)post a receive buffer.
1804 */
1805int
1806rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1807		     struct rpcrdma_ep *ep,
1808		     struct rpcrdma_rep *rep)
1809{
1810	struct ib_recv_wr recv_wr, *recv_wr_fail;
1811	int rc;
1812
1813	recv_wr.next = NULL;
1814	recv_wr.wr_id = (u64) (unsigned long) rep;
1815	recv_wr.sg_list = &rep->rr_iov;
1816	recv_wr.num_sge = 1;
1817
1818	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1819		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1820
1821	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1822
1823	if (rc)
1824		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1825			rc);
1826	return rc;
1827}
1828