1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses.  You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 *      Redistributions of source code must retain the above copyright
15 *      notice, this list of conditions and the following disclaimer.
16 *
17 *      Redistributions in binary form must reproduce the above
18 *      copyright notice, this list of conditions and the following
19 *      disclaimer in the documentation and/or other materials provided
20 *      with the distribution.
21 *
22 *      Neither the name of the Network Appliance, Inc. nor the names of
23 *      its contributors may be used to endorse or promote products
24 *      derived from this software without specific prior written
25 *      permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * verbs.c
42 *
43 * Encapsulates the major functions managing:
44 *  o adapters
45 *  o endpoints
46 *  o connections
47 *  o buffer memory
48 */
49
50#include <linux/interrupt.h>
51#include <linux/slab.h>
52#include <asm/bitops.h>
53
54#include "xprt_rdma.h"
55
56/*
57 * Globals/Macros
58 */
59
60#ifdef RPC_DEBUG
61# define RPCDBG_FACILITY	RPCDBG_TRANS
62#endif
63
64static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
65
66/*
67 * internal functions
68 */
69
70/*
71 * handle replies in tasklet context, using a single, global list
72 * rdma tasklet function -- just turn around and call the func
73 * for all replies on the list
74 */
75
76static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77static LIST_HEAD(rpcrdma_tasklets_g);
78
79static void
80rpcrdma_run_tasklet(unsigned long data)
81{
82	struct rpcrdma_rep *rep;
83	void (*func)(struct rpcrdma_rep *);
84	unsigned long flags;
85
86	data = data;
87	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88	while (!list_empty(&rpcrdma_tasklets_g)) {
89		rep = list_entry(rpcrdma_tasklets_g.next,
90				 struct rpcrdma_rep, rr_list);
91		list_del(&rep->rr_list);
92		func = rep->rr_func;
93		rep->rr_func = NULL;
94		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
95
96		if (func)
97			func(rep);
98		else
99			rpcrdma_recv_buffer_put(rep);
100
101		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
102	}
103	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
104}
105
/* Single global tasklet that dispatches all queued RPC/RDMA replies. */
static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
107
108static void
109rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
110{
111	struct rpcrdma_ep *ep = context;
112
113	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
114		__func__, event->event, event->device->name, context);
115	if (ep->rep_connected == 1) {
116		ep->rep_connected = -EIO;
117		ep->rep_func(ep);
118		wake_up_all(&ep->rep_connect_wait);
119	}
120}
121
122static void
123rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
124{
125	struct rpcrdma_ep *ep = context;
126
127	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
128		__func__, event->event, event->device->name, context);
129	if (ep->rep_connected == 1) {
130		ep->rep_connected = -EIO;
131		ep->rep_func(ep);
132		wake_up_all(&ep->rep_connect_wait);
133	}
134}
135
136static void
137rpcrdma_sendcq_process_wc(struct ib_wc *wc)
138{
139	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
140
141	dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
142		__func__, frmr, wc->status, wc->opcode);
143
144	if (wc->wr_id == 0ULL)
145		return;
146	if (wc->status != IB_WC_SUCCESS)
147		frmr->r.frmr.fr_state = FRMR_IS_STALE;
148}
149
/* Drain the send CQ, up to a fixed polling budget.
 *
 * Pulls up to RPCRDMA_POLLSIZE completions per ib_poll_cq() call into
 * the per-endpoint scratch array ep->rep_send_wcs, and processes each.
 * The budget (RPCRDMA_WC_BUDGET total completions) bounds the time
 * spent in a single upcall.
 *
 * Returns 0 when the CQ is drained or the budget is exhausted, or the
 * negative ib_poll_cq() result on failure.
 */
static int
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct ib_wc *wcs;
	int budget, count, rc;

	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_send_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			return rc;	/* 0: CQ empty; < 0: poll error */

		count = rc;
		while (count-- > 0)
			rpcrdma_sendcq_process_wc(wcs++);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	return 0;
}
170
171/*
172 * Handle send, fast_reg_mr, and local_inv completions.
173 *
174 * Send events are typically suppressed and thus do not result
175 * in an upcall. Occasionally one is signaled, however. This
176 * prevents the provider's completion queue from wrapping and
177 * losing a completion.
178 */
179static void
180rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
181{
182	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
183	int rc;
184
185	rc = rpcrdma_sendcq_poll(cq, ep);
186	if (rc) {
187		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
188			__func__, rc);
189		return;
190	}
191
192	rc = ib_req_notify_cq(cq,
193			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
194	if (rc == 0)
195		return;
196	if (rc < 0) {
197		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
198			__func__, rc);
199		return;
200	}
201
202	rpcrdma_sendcq_poll(cq, ep);
203}
204
/* Process one receive completion.
 *
 * On error, mark the rep's length invalid (~0U) and still schedule it
 * so the reply path can observe the failure. On a successful receive,
 * record the byte count, sync the DMA buffer for CPU access, and, if
 * the message is large enough to hold an RPC/RDMA header (>= 16
 * bytes), refresh the transport's credit count from rm_credit.
 * Scheduled reps are collected on the caller's sched_list for later
 * tasklet dispatch.
 */
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
{
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
		__func__, rep, wc->status, wc->opcode, wc->byte_len);

	if (wc->status != IB_WC_SUCCESS) {
		rep->rr_len = ~0U;	/* flag a failed receive */
		goto out_schedule;
	}
	if (wc->opcode != IB_WC_RECV)
		return;

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);

	/* Clamp the advertised credits to [1, rb_max_requests] so a
	 * zero or oversized advertisement cannot stall or overrun us. */
	if (rep->rr_len >= 16) {
		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
		unsigned int credits = ntohl(p->rm_credit);

		if (credits == 0)
			credits = 1;	/* don't deadlock */
		else if (credits > rep->rr_buffer->rb_max_requests)
			credits = rep->rr_buffer->rb_max_requests;
		atomic_set(&rep->rr_buffer->rb_credits, credits);
	}

out_schedule:
	list_add_tail(&rep->rr_list, sched_list);
}
239
/* Drain the receive CQ, up to a fixed polling budget.
 *
 * Completions are gathered onto a local sched_list, then spliced onto
 * the global tasklet list under rpcrdma_tk_lock_g and the tasklet is
 * scheduled — even when ib_poll_cq() fails, so that anything already
 * gathered is still dispatched.
 *
 * Returns 0 on success, or the negative ib_poll_cq() result.
 */
static int
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct list_head sched_list;
	struct ib_wc *wcs;
	int budget, count, rc;
	unsigned long flags;

	INIT_LIST_HEAD(&sched_list);
	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_recv_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			goto out_schedule;	/* drained, or poll error */

		count = rc;
		while (count-- > 0)
			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	rc = 0;

out_schedule:
	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	list_splice_tail(&sched_list, &rpcrdma_tasklets_g);
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
	tasklet_schedule(&rpcrdma_tasklet_g);
	return rc;
}
270
271/*
272 * Handle receive completions.
273 *
274 * It is reentrant but processes single events in order to maintain
275 * ordering of receives to keep server credits.
276 *
277 * It is the responsibility of the scheduled tasklet to return
278 * recv buffers to the pool. NOTE: this affects synchronization of
279 * connection shutdown. That is, the structures required for
280 * the completion of the reply handler must remain intact until
281 * all memory has been reclaimed.
282 */
283static void
284rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
285{
286	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
287	int rc;
288
289	rc = rpcrdma_recvcq_poll(cq, ep);
290	if (rc) {
291		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
292			__func__, rc);
293		return;
294	}
295
296	rc = ib_req_notify_cq(cq,
297			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
298	if (rc == 0)
299		return;
300	if (rc < 0) {
301		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
302			__func__, rc);
303		return;
304	}
305
306	rpcrdma_recvcq_poll(cq, ep);
307}
308
309static void
310rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
311{
312	rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
313	rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
314}
315
#ifdef RPC_DEBUG
/* Human-readable names for RDMA CM event codes; indexed by the event
 * value passed to rpcrdma_conn_upcall() (enum rdma_cm_event_type —
 * keep this table in sync with that enum's ordering). */
static const char * const conn[] = {
	"address resolved",
	"address error",
	"route resolved",
	"route error",
	"connect request",
	"connect response",
	"connect error",
	"unreachable",
	"rejected",
	"established",
	"disconnected",
	"device removal",
	"multicast join",
	"multicast error",
	"address change",
	"timewait exit",
};

/* Map a CM event code to a string for debug output; out-of-range
 * codes get a catch-all label instead of indexing past the table. */
#define CONNECTION_MSG(status)						\
	((status) < ARRAY_SIZE(conn) ?					\
		conn[(status)] : "unrecognized connection error")
#endif
340
/* CM event handler registered with rdma_create_id().
 *
 * Address/route resolution events record a status in ia->ri_async_rc
 * and complete ia->ri_done, unblocking rpcrdma_create_id().
 * Connection-state events set ep->rep_connected (1 = connected,
 * negative errno = failed/closed), run the endpoint callback, and
 * wake anyone in rep_connect_wait. Always returns 0.
 */
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#ifdef RPC_DEBUG
	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
#endif
	struct ib_qp_attr attr;
	struct ib_qp_init_attr iattr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		/* Query negotiated RD-atomic depths for the debug line. */
		ib_query_qp(ia->ri_id->qp, &attr,
			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			&iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		/* Reset to a single credit; the next reply received
		 * refreshes rb_credits from the server's rm_credit. */
		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		ep->rep_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pI4:%u (ep 0x%p): %s\n",
			__func__, &addr->sin_addr.s_addr,
			ntohs(addr->sin_port), ep,
			CONNECTION_MSG(event->event));
		break;
	}

#ifdef RPC_DEBUG
	/* Log connection establishment or teardown at KERN_INFO. */
	if (connstate == 1) {
		int ird = attr.max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
			"on %s, memreg %d slots %d ird %d%s\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			ia->ri_id->device->name,
			ia->ri_memreg_strategy,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			connstate);
	}
#endif

	return 0;
}
433
/* Create an rdma_cm_id and synchronously resolve the server's address
 * and route.
 *
 * Each resolution step is asynchronous: rpcrdma_conn_upcall() records
 * the outcome in ia->ri_async_rc and completes ia->ri_done. Before
 * each step, ri_async_rc is preset to -ETIMEDOUT so that a wait that
 * expires without an upcall reads as a timeout.
 *
 * Returns the new id on success, or an ERR_PTR; any partially set up
 * id is destroyed on failure.
 */
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}
483
484/*
485 * Drain any cq, prior to teardown.
486 */
487static void
488rpcrdma_clean_cq(struct ib_cq *cq)
489{
490	struct ib_wc wc;
491	int count = 0;
492
493	while (1 == ib_poll_cq(cq, 1, &wc))
494		++count;
495
496	if (count)
497		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
498			__func__, count, wc.opcode);
499}
500
501/*
502 * Exported functions.
503 */
504
505/*
506 * Open and initialize an Interface Adapter.
507 *  o initializes fields of struct rpcrdma_ia, including
508 *    interface and provider attributes and protection zone.
509 */
510int
511rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
512{
513	int rc, mem_priv;
514	struct ib_device_attr devattr;
515	struct rpcrdma_ia *ia = &xprt->rx_ia;
516
517	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
518	if (IS_ERR(ia->ri_id)) {
519		rc = PTR_ERR(ia->ri_id);
520		goto out1;
521	}
522
523	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
524	if (IS_ERR(ia->ri_pd)) {
525		rc = PTR_ERR(ia->ri_pd);
526		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
527			__func__, rc);
528		goto out2;
529	}
530
531	/*
532	 * Query the device to determine if the requested memory
533	 * registration strategy is supported. If it isn't, set the
534	 * strategy to a globally supported model.
535	 */
536	rc = ib_query_device(ia->ri_id->device, &devattr);
537	if (rc) {
538		dprintk("RPC:       %s: ib_query_device failed %d\n",
539			__func__, rc);
540		goto out2;
541	}
542
543	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
544		ia->ri_have_dma_lkey = 1;
545		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
546	}
547
548	if (memreg == RPCRDMA_FRMR) {
549		/* Requires both frmr reg and local dma lkey */
550		if ((devattr.device_cap_flags &
551		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
552		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
553			dprintk("RPC:       %s: FRMR registration "
554				"not supported by HCA\n", __func__);
555			memreg = RPCRDMA_MTHCAFMR;
556		} else {
557			/* Mind the ia limit on FRMR page list depth */
558			ia->ri_max_frmr_depth = min_t(unsigned int,
559				RPCRDMA_MAX_DATA_SEGS,
560				devattr.max_fast_reg_page_list_len);
561		}
562	}
563	if (memreg == RPCRDMA_MTHCAFMR) {
564		if (!ia->ri_id->device->alloc_fmr) {
565			dprintk("RPC:       %s: MTHCAFMR registration "
566				"not supported by HCA\n", __func__);
567			memreg = RPCRDMA_ALLPHYSICAL;
568		}
569	}
570
571	/*
572	 * Optionally obtain an underlying physical identity mapping in
573	 * order to do a memory window-based bind. This base registration
574	 * is protected from remote access - that is enabled only by binding
575	 * for the specific bytes targeted during each RPC operation, and
576	 * revoked after the corresponding completion similar to a storage
577	 * adapter.
578	 */
579	switch (memreg) {
580	case RPCRDMA_FRMR:
581		break;
582	case RPCRDMA_ALLPHYSICAL:
583		mem_priv = IB_ACCESS_LOCAL_WRITE |
584				IB_ACCESS_REMOTE_WRITE |
585				IB_ACCESS_REMOTE_READ;
586		goto register_setup;
587	case RPCRDMA_MTHCAFMR:
588		if (ia->ri_have_dma_lkey)
589			break;
590		mem_priv = IB_ACCESS_LOCAL_WRITE;
591	register_setup:
592		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
593		if (IS_ERR(ia->ri_bind_mem)) {
594			printk(KERN_ALERT "%s: ib_get_dma_mr for "
595				"phys register failed with %lX\n",
596				__func__, PTR_ERR(ia->ri_bind_mem));
597			rc = -ENOMEM;
598			goto out2;
599		}
600		break;
601	default:
602		printk(KERN_ERR "RPC: Unsupported memory "
603				"registration mode: %d\n", memreg);
604		rc = -ENOMEM;
605		goto out2;
606	}
607	dprintk("RPC:       %s: memory registration strategy is %d\n",
608		__func__, memreg);
609
610	/* Else will do memory reg/dereg for each chunk */
611	ia->ri_memreg_strategy = memreg;
612
613	rwlock_init(&ia->ri_qplock);
614	return 0;
615out2:
616	rdma_destroy_id(ia->ri_id);
617	ia->ri_id = NULL;
618out1:
619	return rc;
620}
621
622/*
623 * Clean up/close an IA.
624 *   o if event handles and PD have been initialized, free them.
625 *   o close the IA
626 */
627void
628rpcrdma_ia_close(struct rpcrdma_ia *ia)
629{
630	int rc;
631
632	dprintk("RPC:       %s: entering\n", __func__);
633	if (ia->ri_bind_mem != NULL) {
634		rc = ib_dereg_mr(ia->ri_bind_mem);
635		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
636			__func__, rc);
637	}
638	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
639		if (ia->ri_id->qp)
640			rdma_destroy_qp(ia->ri_id);
641		rdma_destroy_id(ia->ri_id);
642		ia->ri_id = NULL;
643	}
644	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
645		rc = ib_dealloc_pd(ia->ri_pd);
646		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
647			__func__, rc);
648	}
649}
650
651/*
652 * Create unconnected endpoint.
653 */
654int
655rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
656				struct rpcrdma_create_data_internal *cdata)
657{
658	struct ib_device_attr devattr;
659	struct ib_cq *sendcq, *recvcq;
660	int rc, err;
661
662	rc = ib_query_device(ia->ri_id->device, &devattr);
663	if (rc) {
664		dprintk("RPC:       %s: ib_query_device failed %d\n",
665			__func__, rc);
666		return rc;
667	}
668
669	/* check provider's send/recv wr limits */
670	if (cdata->max_requests > devattr.max_qp_wr)
671		cdata->max_requests = devattr.max_qp_wr;
672
673	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
674	ep->rep_attr.qp_context = ep;
675	/* send_cq and recv_cq initialized below */
676	ep->rep_attr.srq = NULL;
677	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
678	switch (ia->ri_memreg_strategy) {
679	case RPCRDMA_FRMR: {
680		int depth = 7;
681
682		/* Add room for frmr register and invalidate WRs.
683		 * 1. FRMR reg WR for head
684		 * 2. FRMR invalidate WR for head
685		 * 3. N FRMR reg WRs for pagelist
686		 * 4. N FRMR invalidate WRs for pagelist
687		 * 5. FRMR reg WR for tail
688		 * 6. FRMR invalidate WR for tail
689		 * 7. The RDMA_SEND WR
690		 */
691
692		/* Calculate N if the device max FRMR depth is smaller than
693		 * RPCRDMA_MAX_DATA_SEGS.
694		 */
695		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
696			int delta = RPCRDMA_MAX_DATA_SEGS -
697				    ia->ri_max_frmr_depth;
698
699			do {
700				depth += 2; /* FRMR reg + invalidate */
701				delta -= ia->ri_max_frmr_depth;
702			} while (delta > 0);
703
704		}
705		ep->rep_attr.cap.max_send_wr *= depth;
706		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
707			cdata->max_requests = devattr.max_qp_wr / depth;
708			if (!cdata->max_requests)
709				return -EINVAL;
710			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
711						       depth;
712		}
713		break;
714	}
715	default:
716		break;
717	}
718	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
719	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
720	ep->rep_attr.cap.max_recv_sge = 1;
721	ep->rep_attr.cap.max_inline_data = 0;
722	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
723	ep->rep_attr.qp_type = IB_QPT_RC;
724	ep->rep_attr.port_num = ~0;
725
726	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
727		"iovs: send %d recv %d\n",
728		__func__,
729		ep->rep_attr.cap.max_send_wr,
730		ep->rep_attr.cap.max_recv_wr,
731		ep->rep_attr.cap.max_send_sge,
732		ep->rep_attr.cap.max_recv_sge);
733
734	/* set trigger for requesting send completion */
735	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
736	if (ep->rep_cqinit <= 2)
737		ep->rep_cqinit = 0;
738	INIT_CQCOUNT(ep);
739	ep->rep_ia = ia;
740	init_waitqueue_head(&ep->rep_connect_wait);
741	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
742
743	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
744				  rpcrdma_cq_async_error_upcall, ep,
745				  ep->rep_attr.cap.max_send_wr + 1, 0);
746	if (IS_ERR(sendcq)) {
747		rc = PTR_ERR(sendcq);
748		dprintk("RPC:       %s: failed to create send CQ: %i\n",
749			__func__, rc);
750		goto out1;
751	}
752
753	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
754	if (rc) {
755		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
756			__func__, rc);
757		goto out2;
758	}
759
760	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
761				  rpcrdma_cq_async_error_upcall, ep,
762				  ep->rep_attr.cap.max_recv_wr + 1, 0);
763	if (IS_ERR(recvcq)) {
764		rc = PTR_ERR(recvcq);
765		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
766			__func__, rc);
767		goto out2;
768	}
769
770	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
771	if (rc) {
772		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
773			__func__, rc);
774		ib_destroy_cq(recvcq);
775		goto out2;
776	}
777
778	ep->rep_attr.send_cq = sendcq;
779	ep->rep_attr.recv_cq = recvcq;
780
781	/* Initialize cma parameters */
782
783	/* RPC/RDMA does not use private data */
784	ep->rep_remote_cma.private_data = NULL;
785	ep->rep_remote_cma.private_data_len = 0;
786
787	/* Client offers RDMA Read but does not initiate */
788	ep->rep_remote_cma.initiator_depth = 0;
789	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
790		ep->rep_remote_cma.responder_resources = 32;
791	else
792		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
793
794	ep->rep_remote_cma.retry_count = 7;
795	ep->rep_remote_cma.flow_control = 0;
796	ep->rep_remote_cma.rnr_retry_count = 0;
797
798	return 0;
799
800out2:
801	err = ib_destroy_cq(sendcq);
802	if (err)
803		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
804			__func__, err);
805out1:
806	return rc;
807}
808
809/*
810 * rpcrdma_ep_destroy
811 *
812 * Disconnect and destroy endpoint. After this, the only
813 * valid operations on the ep are to free it (if dynamically
814 * allocated) or re-create it.
815 */
816void
817rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
818{
819	int rc;
820
821	dprintk("RPC:       %s: entering, connected is %d\n",
822		__func__, ep->rep_connected);
823
824	cancel_delayed_work_sync(&ep->rep_connect_worker);
825
826	if (ia->ri_id->qp) {
827		rpcrdma_ep_disconnect(ep, ia);
828		rdma_destroy_qp(ia->ri_id);
829		ia->ri_id->qp = NULL;
830	}
831
832	/* padding - could be done in rpcrdma_buffer_destroy... */
833	if (ep->rep_pad_mr) {
834		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
835		ep->rep_pad_mr = NULL;
836	}
837
838	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
839	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
840	if (rc)
841		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
842			__func__, rc);
843
844	rpcrdma_clean_cq(ep->rep_attr.send_cq);
845	rc = ib_destroy_cq(ep->rep_attr.send_cq);
846	if (rc)
847		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
848			__func__, rc);
849}
850
851/*
852 * Connect unconnected endpoint.
853 */
854int
855rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
856{
857	struct rdma_cm_id *id, *old;
858	int rc = 0;
859	int retry_count = 0;
860
861	if (ep->rep_connected != 0) {
862		struct rpcrdma_xprt *xprt;
863retry:
864		dprintk("RPC:       %s: reconnecting...\n", __func__);
865
866		rpcrdma_ep_disconnect(ep, ia);
867		rpcrdma_flush_cqs(ep);
868
869		if (ia->ri_memreg_strategy == RPCRDMA_FRMR)
870			rpcrdma_reset_frmrs(ia);
871
872		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
873		id = rpcrdma_create_id(xprt, ia,
874				(struct sockaddr *)&xprt->rx_data.addr);
875		if (IS_ERR(id)) {
876			rc = -EHOSTUNREACH;
877			goto out;
878		}
879		/* TEMP TEMP TEMP - fail if new device:
880		 * Deregister/remarshal *all* requests!
881		 * Close and recreate adapter, pd, etc!
882		 * Re-determine all attributes still sane!
883		 * More stuff I haven't thought of!
884		 * Rrrgh!
885		 */
886		if (ia->ri_id->device != id->device) {
887			printk("RPC:       %s: can't reconnect on "
888				"different device!\n", __func__);
889			rdma_destroy_id(id);
890			rc = -ENETUNREACH;
891			goto out;
892		}
893		/* END TEMP */
894		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
895		if (rc) {
896			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
897				__func__, rc);
898			rdma_destroy_id(id);
899			rc = -ENETUNREACH;
900			goto out;
901		}
902
903		write_lock(&ia->ri_qplock);
904		old = ia->ri_id;
905		ia->ri_id = id;
906		write_unlock(&ia->ri_qplock);
907
908		rdma_destroy_qp(old);
909		rdma_destroy_id(old);
910	} else {
911		dprintk("RPC:       %s: connecting...\n", __func__);
912		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
913		if (rc) {
914			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
915				__func__, rc);
916			/* do not update ep->rep_connected */
917			return -ENETUNREACH;
918		}
919	}
920
921	ep->rep_connected = 0;
922
923	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
924	if (rc) {
925		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
926				__func__, rc);
927		goto out;
928	}
929
930	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
931
932	/*
933	 * Check state. A non-peer reject indicates no listener
934	 * (ECONNREFUSED), which may be a transient state. All
935	 * others indicate a transport condition which has already
936	 * undergone a best-effort.
937	 */
938	if (ep->rep_connected == -ECONNREFUSED &&
939	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
940		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
941		goto retry;
942	}
943	if (ep->rep_connected <= 0) {
944		/* Sometimes, the only way to reliably connect to remote
945		 * CMs is to use same nonzero values for ORD and IRD. */
946		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
947		    (ep->rep_remote_cma.responder_resources == 0 ||
948		     ep->rep_remote_cma.initiator_depth !=
949				ep->rep_remote_cma.responder_resources)) {
950			if (ep->rep_remote_cma.responder_resources == 0)
951				ep->rep_remote_cma.responder_resources = 1;
952			ep->rep_remote_cma.initiator_depth =
953				ep->rep_remote_cma.responder_resources;
954			goto retry;
955		}
956		rc = ep->rep_connected;
957	} else {
958		dprintk("RPC:       %s: connected\n", __func__);
959	}
960
961out:
962	if (rc)
963		ep->rep_connected = rc;
964	return rc;
965}
966
967/*
968 * rpcrdma_ep_disconnect
969 *
970 * This is separate from destroy to facilitate the ability
971 * to reconnect without recreating the endpoint.
972 *
973 * This call is not reentrant, and must not be made in parallel
974 * on the same endpoint.
975 */
976void
977rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
978{
979	int rc;
980
981	rpcrdma_flush_cqs(ep);
982	rc = rdma_disconnect(ia->ri_id);
983	if (!rc) {
984		/* returns without wait if not connected */
985		wait_event_interruptible(ep->rep_connect_wait,
986							ep->rep_connected != 1);
987		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
988			(ep->rep_connected == 1) ? "still " : "dis");
989	} else {
990		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
991		ep->rep_connected = rc;
992	}
993}
994
995static int
996rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
997{
998	int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
999	struct ib_fmr_attr fmr_attr = {
1000		.max_pages	= RPCRDMA_MAX_DATA_SEGS,
1001		.max_maps	= 1,
1002		.page_shift	= PAGE_SHIFT
1003	};
1004	struct rpcrdma_mw *r;
1005	int i, rc;
1006
1007	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1008	dprintk("RPC:       %s: initalizing %d FMRs\n", __func__, i);
1009
1010	while (i--) {
1011		r = kzalloc(sizeof(*r), GFP_KERNEL);
1012		if (r == NULL)
1013			return -ENOMEM;
1014
1015		r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1016		if (IS_ERR(r->r.fmr)) {
1017			rc = PTR_ERR(r->r.fmr);
1018			dprintk("RPC:       %s: ib_alloc_fmr failed %i\n",
1019				__func__, rc);
1020			goto out_free;
1021		}
1022
1023		list_add(&r->mw_list, &buf->rb_mws);
1024		list_add(&r->mw_all, &buf->rb_all);
1025	}
1026	return 0;
1027
1028out_free:
1029	kfree(r);
1030	return rc;
1031}
1032
1033static int
1034rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1035{
1036	struct rpcrdma_frmr *f;
1037	struct rpcrdma_mw *r;
1038	int i, rc;
1039
1040	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1041	dprintk("RPC:       %s: initalizing %d FRMRs\n", __func__, i);
1042
1043	while (i--) {
1044		r = kzalloc(sizeof(*r), GFP_KERNEL);
1045		if (r == NULL)
1046			return -ENOMEM;
1047		f = &r->r.frmr;
1048
1049		f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1050						ia->ri_max_frmr_depth);
1051		if (IS_ERR(f->fr_mr)) {
1052			rc = PTR_ERR(f->fr_mr);
1053			dprintk("RPC:       %s: ib_alloc_fast_reg_mr "
1054				"failed %i\n", __func__, rc);
1055			goto out_free;
1056		}
1057
1058		f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1059							ia->ri_max_frmr_depth);
1060		if (IS_ERR(f->fr_pgl)) {
1061			rc = PTR_ERR(f->fr_pgl);
1062			dprintk("RPC:       %s: ib_alloc_fast_reg_page_list "
1063				"failed %i\n", __func__, rc);
1064
1065			ib_dereg_mr(f->fr_mr);
1066			goto out_free;
1067		}
1068
1069		list_add(&r->mw_list, &buf->rb_mws);
1070		list_add(&r->mw_all, &buf->rb_all);
1071	}
1072
1073	return 0;
1074
1075out_free:
1076	kfree(r);
1077	return rc;
1078}
1079
1080int
1081rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1082	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1083{
1084	char *p;
1085	size_t len, rlen, wlen;
1086	int i, rc;
1087
1088	buf->rb_max_requests = cdata->max_requests;
1089	spin_lock_init(&buf->rb_lock);
1090	atomic_set(&buf->rb_credits, 1);
1091
1092	/* Need to allocate:
1093	 *   1.  arrays for send and recv pointers
1094	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1095	 *   3.  array of struct rpcrdma_rep for replies
1096	 *   4.  padding, if any
1097	 * Send/recv buffers in req/rep need to be registered
1098	 */
1099	len = buf->rb_max_requests *
1100		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1101	len += cdata->padding;
1102
1103	p = kzalloc(len, GFP_KERNEL);
1104	if (p == NULL) {
1105		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1106			__func__, len);
1107		rc = -ENOMEM;
1108		goto out;
1109	}
1110	buf->rb_pool = p;	/* for freeing it later */
1111
1112	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1113	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1114	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1115	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1116
1117	/*
1118	 * Register the zeroed pad buffer, if any.
1119	 */
1120	if (cdata->padding) {
1121		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1122					    &ep->rep_pad_mr, &ep->rep_pad);
1123		if (rc)
1124			goto out;
1125	}
1126	p += cdata->padding;
1127
1128	INIT_LIST_HEAD(&buf->rb_mws);
1129	INIT_LIST_HEAD(&buf->rb_all);
1130	switch (ia->ri_memreg_strategy) {
1131	case RPCRDMA_FRMR:
1132		rc = rpcrdma_init_frmrs(ia, buf);
1133		if (rc)
1134			goto out;
1135		break;
1136	case RPCRDMA_MTHCAFMR:
1137		rc = rpcrdma_init_fmrs(ia, buf);
1138		if (rc)
1139			goto out;
1140		break;
1141	default:
1142		break;
1143	}
1144
1145	/*
1146	 * Allocate/init the request/reply buffers. Doing this
1147	 * using kmalloc for now -- one for each buf.
1148	 */
1149	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1150	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1151	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1152		__func__, wlen, rlen);
1153
1154	for (i = 0; i < buf->rb_max_requests; i++) {
1155		struct rpcrdma_req *req;
1156		struct rpcrdma_rep *rep;
1157
1158		req = kmalloc(wlen, GFP_KERNEL);
1159		if (req == NULL) {
1160			dprintk("RPC:       %s: request buffer %d alloc"
1161				" failed\n", __func__, i);
1162			rc = -ENOMEM;
1163			goto out;
1164		}
1165		memset(req, 0, sizeof(struct rpcrdma_req));
1166		buf->rb_send_bufs[i] = req;
1167		buf->rb_send_bufs[i]->rl_buffer = buf;
1168
1169		rc = rpcrdma_register_internal(ia, req->rl_base,
1170				wlen - offsetof(struct rpcrdma_req, rl_base),
1171				&buf->rb_send_bufs[i]->rl_handle,
1172				&buf->rb_send_bufs[i]->rl_iov);
1173		if (rc)
1174			goto out;
1175
1176		buf->rb_send_bufs[i]->rl_size = wlen -
1177						sizeof(struct rpcrdma_req);
1178
1179		rep = kmalloc(rlen, GFP_KERNEL);
1180		if (rep == NULL) {
1181			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1182				__func__, i);
1183			rc = -ENOMEM;
1184			goto out;
1185		}
1186		memset(rep, 0, sizeof(struct rpcrdma_rep));
1187		buf->rb_recv_bufs[i] = rep;
1188		buf->rb_recv_bufs[i]->rr_buffer = buf;
1189
1190		rc = rpcrdma_register_internal(ia, rep->rr_base,
1191				rlen - offsetof(struct rpcrdma_rep, rr_base),
1192				&buf->rb_recv_bufs[i]->rr_handle,
1193				&buf->rb_recv_bufs[i]->rr_iov);
1194		if (rc)
1195			goto out;
1196
1197	}
1198	dprintk("RPC:       %s: max_requests %d\n",
1199		__func__, buf->rb_max_requests);
1200	/* done */
1201	return 0;
1202out:
1203	rpcrdma_buffer_destroy(buf);
1204	return rc;
1205}
1206
1207static void
1208rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1209{
1210	struct rpcrdma_mw *r;
1211	int rc;
1212
1213	while (!list_empty(&buf->rb_all)) {
1214		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1215		list_del(&r->mw_all);
1216		list_del(&r->mw_list);
1217
1218		rc = ib_dealloc_fmr(r->r.fmr);
1219		if (rc)
1220			dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
1221				__func__, rc);
1222
1223		kfree(r);
1224	}
1225}
1226
1227static void
1228rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1229{
1230	struct rpcrdma_mw *r;
1231	int rc;
1232
1233	while (!list_empty(&buf->rb_all)) {
1234		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1235		list_del(&r->mw_all);
1236		list_del(&r->mw_list);
1237
1238		rc = ib_dereg_mr(r->r.frmr.fr_mr);
1239		if (rc)
1240			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1241				__func__, rc);
1242		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1243
1244		kfree(r);
1245	}
1246}
1247
1248void
1249rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1250{
1251	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1252	int i;
1253
1254	/* clean up in reverse order from create
1255	 *   1.  recv mr memory (mr free, then kfree)
1256	 *   2.  send mr memory (mr free, then kfree)
1257	 *   3.  MWs
1258	 */
1259	dprintk("RPC:       %s: entering\n", __func__);
1260
1261	for (i = 0; i < buf->rb_max_requests; i++) {
1262		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1263			rpcrdma_deregister_internal(ia,
1264					buf->rb_recv_bufs[i]->rr_handle,
1265					&buf->rb_recv_bufs[i]->rr_iov);
1266			kfree(buf->rb_recv_bufs[i]);
1267		}
1268		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1269			rpcrdma_deregister_internal(ia,
1270					buf->rb_send_bufs[i]->rl_handle,
1271					&buf->rb_send_bufs[i]->rl_iov);
1272			kfree(buf->rb_send_bufs[i]);
1273		}
1274	}
1275
1276	switch (ia->ri_memreg_strategy) {
1277	case RPCRDMA_FRMR:
1278		rpcrdma_destroy_frmrs(buf);
1279		break;
1280	case RPCRDMA_MTHCAFMR:
1281		rpcrdma_destroy_fmrs(buf);
1282		break;
1283	default:
1284		break;
1285	}
1286
1287	kfree(buf->rb_pool);
1288}
1289
/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
 * an unusable state. Find FRMRs in this state and dereg / reg
 * each.  FRMRs that are VALID and attached to an rpcrdma_req are
 * also torn down.
 *
 * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
 *
 * This is invoked only in the transport connect worker in order
 * to serialize with rpcrdma_register_frmr_external().
 */
static void
rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt =
				container_of(ia, struct rpcrdma_xprt, rx_ia);
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct list_head *pos;
	struct rpcrdma_mw *r;
	int rc;

	list_for_each(pos, &buf->rb_all) {
		r = list_entry(pos, struct rpcrdma_mw, mw_all);

		/* Already INVALID: usable as-is, nothing to refresh. */
		if (r->r.frmr.fr_state == FRMR_IS_INVALID)
			continue;

		/* Destroy the old MR and page list, then allocate fresh
		 * ones so the FRMR gets a brand-new rkey. */
		rc = ib_dereg_mr(r->r.frmr.fr_mr);
		if (rc)
			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
				__func__, rc);
		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);

		r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
					ia->ri_max_frmr_depth);
		if (IS_ERR(r->r.frmr.fr_mr)) {
			rc = PTR_ERR(r->r.frmr.fr_mr);
			dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
				" failed %i\n", __func__, rc);
			/* NOTE(review): on this path fr_mr is left holding
			 * an ERR_PTR and fr_state stays non-INVALID; a
			 * later rpcrdma_destroy_frmrs() would pass that
			 * ERR_PTR to ib_dereg_mr(). Worth confirming and
			 * hardening. */
			continue;
		}
		r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
					ia->ri_id->device,
					ia->ri_max_frmr_depth);
		if (IS_ERR(r->r.frmr.fr_pgl)) {
			rc = PTR_ERR(r->r.frmr.fr_pgl);
			dprintk("RPC:       %s: "
				"ib_alloc_fast_reg_page_list "
				"failed %i\n", __func__, rc);

			/* Back out the MR allocated just above; see the
			 * NOTE above — fr_pgl is likewise left as an
			 * ERR_PTR here. */
			ib_dereg_mr(r->r.frmr.fr_mr);
			continue;
		}
		r->r.frmr.fr_state = FRMR_IS_INVALID;
	}
}
1345
1346/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1347 * some req segments uninitialized.
1348 */
1349static void
1350rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1351{
1352	if (*mw) {
1353		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1354		*mw = NULL;
1355	}
1356}
1357
1358/* Cycle mw's back in reverse order, and "spin" them.
1359 * This delays and scrambles reuse as much as possible.
1360 */
1361static void
1362rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1363{
1364	struct rpcrdma_mr_seg *seg = req->rl_segments;
1365	struct rpcrdma_mr_seg *seg1 = seg;
1366	int i;
1367
1368	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1369		rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
1370	rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
1371}
1372
1373static void
1374rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1375{
1376	buf->rb_send_bufs[--buf->rb_send_index] = req;
1377	req->rl_niovs = 0;
1378	if (req->rl_reply) {
1379		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1380		req->rl_reply->rr_func = NULL;
1381		req->rl_reply = NULL;
1382	}
1383}
1384
/* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
 * Redo only the ib_post_send().
 *
 * Posts a LOCAL_INV work request for a stale FRMR so its rkey is
 * invalidated before the MW is returned to the free list.
 */
static void
rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt =
				container_of(ia, struct rpcrdma_xprt, rx_ia);
	struct ib_send_wr invalidate_wr, *bad_wr;
	int rc;

	dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);

	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
	r->r.frmr.fr_state = FRMR_IS_INVALID;

	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
	invalidate_wr.wr_id = (unsigned long)(void *)r;
	invalidate_wr.opcode = IB_WR_LOCAL_INV;
	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
	DECR_CQCOUNT(&r_xprt->rx_ep);

	dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
		__func__, r, r->r.frmr.fr_mr->rkey);

	/* ri_qplock read-side: prevents the QP being replaced while
	 * the work request is posted. */
	read_lock(&ia->ri_qplock);
	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	read_unlock(&ia->ri_qplock);
	if (rc) {
		/* Force rpcrdma_buffer_get() to retry */
		r->r.frmr.fr_state = FRMR_IS_STALE;
		dprintk("RPC:       %s: ib_post_send failed, %i\n",
			__func__, rc);
	}
}
1420
1421static void
1422rpcrdma_retry_flushed_linv(struct list_head *stale,
1423			   struct rpcrdma_buffer *buf)
1424{
1425	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1426	struct list_head *pos;
1427	struct rpcrdma_mw *r;
1428	unsigned long flags;
1429
1430	list_for_each(pos, stale) {
1431		r = list_entry(pos, struct rpcrdma_mw, mw_list);
1432		rpcrdma_retry_local_inv(r, ia);
1433	}
1434
1435	spin_lock_irqsave(&buf->rb_lock, flags);
1436	list_splice_tail(stale, &buf->rb_mws);
1437	spin_unlock_irqrestore(&buf->rb_lock, flags);
1438}
1439
1440static struct rpcrdma_req *
1441rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1442			 struct list_head *stale)
1443{
1444	struct rpcrdma_mw *r;
1445	int i;
1446
1447	i = RPCRDMA_MAX_SEGS - 1;
1448	while (!list_empty(&buf->rb_mws)) {
1449		r = list_entry(buf->rb_mws.next,
1450			       struct rpcrdma_mw, mw_list);
1451		list_del(&r->mw_list);
1452		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1453			list_add(&r->mw_list, stale);
1454			continue;
1455		}
1456		req->rl_segments[i].mr_chunk.rl_mw = r;
1457		if (unlikely(i-- == 0))
1458			return req;	/* Success */
1459	}
1460
1461	/* Not enough entries on rb_mws for this req */
1462	rpcrdma_buffer_put_sendbuf(req, buf);
1463	rpcrdma_buffer_put_mrs(req, buf);
1464	return NULL;
1465}
1466
1467static struct rpcrdma_req *
1468rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1469{
1470	struct rpcrdma_mw *r;
1471	int i;
1472
1473	i = RPCRDMA_MAX_SEGS - 1;
1474	while (!list_empty(&buf->rb_mws)) {
1475		r = list_entry(buf->rb_mws.next,
1476			       struct rpcrdma_mw, mw_list);
1477		list_del(&r->mw_list);
1478		req->rl_segments[i].mr_chunk.rl_mw = r;
1479		if (unlikely(i-- == 0))
1480			return req;	/* Success */
1481	}
1482
1483	/* Not enough entries on rb_mws for this req */
1484	rpcrdma_buffer_put_sendbuf(req, buf);
1485	rpcrdma_buffer_put_mrs(req, buf);
1486	return NULL;
1487}
1488
1489/*
1490 * Get a set of request/reply buffers.
1491 *
1492 * Reply buffer (if needed) is attached to send buffer upon return.
1493 * Rule:
1494 *    rb_send_index and rb_recv_index MUST always be pointing to the
1495 *    *next* available buffer (non-NULL). They are incremented after
1496 *    removing buffers, and decremented *before* returning them.
1497 */
1498struct rpcrdma_req *
1499rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1500{
1501	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1502	struct list_head stale;
1503	struct rpcrdma_req *req;
1504	unsigned long flags;
1505
1506	spin_lock_irqsave(&buffers->rb_lock, flags);
1507	if (buffers->rb_send_index == buffers->rb_max_requests) {
1508		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1509		dprintk("RPC:       %s: out of request buffers\n", __func__);
1510		return ((struct rpcrdma_req *)NULL);
1511	}
1512
1513	req = buffers->rb_send_bufs[buffers->rb_send_index];
1514	if (buffers->rb_send_index < buffers->rb_recv_index) {
1515		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1516			__func__,
1517			buffers->rb_recv_index - buffers->rb_send_index);
1518		req->rl_reply = NULL;
1519	} else {
1520		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1521		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1522	}
1523	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1524
1525	INIT_LIST_HEAD(&stale);
1526	switch (ia->ri_memreg_strategy) {
1527	case RPCRDMA_FRMR:
1528		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1529		break;
1530	case RPCRDMA_MTHCAFMR:
1531		req = rpcrdma_buffer_get_fmrs(req, buffers);
1532		break;
1533	default:
1534		break;
1535	}
1536	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1537	if (!list_empty(&stale))
1538		rpcrdma_retry_flushed_linv(&stale, buffers);
1539	return req;
1540}
1541
1542/*
1543 * Put request/reply buffers back into pool.
1544 * Pre-decrement counter/array index.
1545 */
1546void
1547rpcrdma_buffer_put(struct rpcrdma_req *req)
1548{
1549	struct rpcrdma_buffer *buffers = req->rl_buffer;
1550	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1551	unsigned long flags;
1552
1553	spin_lock_irqsave(&buffers->rb_lock, flags);
1554	rpcrdma_buffer_put_sendbuf(req, buffers);
1555	switch (ia->ri_memreg_strategy) {
1556	case RPCRDMA_FRMR:
1557	case RPCRDMA_MTHCAFMR:
1558		rpcrdma_buffer_put_mrs(req, buffers);
1559		break;
1560	default:
1561		break;
1562	}
1563	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1564}
1565
1566/*
1567 * Recover reply buffers from pool.
1568 * This happens when recovering from error conditions.
1569 * Post-increment counter/array index.
1570 */
1571void
1572rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1573{
1574	struct rpcrdma_buffer *buffers = req->rl_buffer;
1575	unsigned long flags;
1576
1577	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1578		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1579	spin_lock_irqsave(&buffers->rb_lock, flags);
1580	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1581		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1582		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1583	}
1584	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1585}
1586
1587/*
1588 * Put reply buffers back into pool when not attached to
1589 * request. This happens in error conditions.
1590 */
1591void
1592rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1593{
1594	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1595	unsigned long flags;
1596
1597	rep->rr_func = NULL;
1598	spin_lock_irqsave(&buffers->rb_lock, flags);
1599	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1600	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1601}
1602
1603/*
1604 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1605 */
1606
1607int
1608rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1609				struct ib_mr **mrp, struct ib_sge *iov)
1610{
1611	struct ib_phys_buf ipb;
1612	struct ib_mr *mr;
1613	int rc;
1614
1615	/*
1616	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1617	 */
1618	iov->addr = ib_dma_map_single(ia->ri_id->device,
1619			va, len, DMA_BIDIRECTIONAL);
1620	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1621		return -ENOMEM;
1622
1623	iov->length = len;
1624
1625	if (ia->ri_have_dma_lkey) {
1626		*mrp = NULL;
1627		iov->lkey = ia->ri_dma_lkey;
1628		return 0;
1629	} else if (ia->ri_bind_mem != NULL) {
1630		*mrp = NULL;
1631		iov->lkey = ia->ri_bind_mem->lkey;
1632		return 0;
1633	}
1634
1635	ipb.addr = iov->addr;
1636	ipb.size = iov->length;
1637	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1638			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1639
1640	dprintk("RPC:       %s: phys convert: 0x%llx "
1641			"registered 0x%llx length %d\n",
1642			__func__, (unsigned long long)ipb.addr,
1643			(unsigned long long)iov->addr, len);
1644
1645	if (IS_ERR(mr)) {
1646		*mrp = NULL;
1647		rc = PTR_ERR(mr);
1648		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1649	} else {
1650		*mrp = mr;
1651		iov->lkey = mr->lkey;
1652		rc = 0;
1653	}
1654
1655	return rc;
1656}
1657
1658int
1659rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1660				struct ib_mr *mr, struct ib_sge *iov)
1661{
1662	int rc;
1663
1664	ib_dma_unmap_single(ia->ri_id->device,
1665			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1666
1667	if (NULL == mr)
1668		return 0;
1669
1670	rc = ib_dereg_mr(mr);
1671	if (rc)
1672		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1673	return rc;
1674}
1675
1676/*
1677 * Wrappers for chunk registration, shared by read/write chunk code.
1678 */
1679
1680static void
1681rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1682{
1683	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1684	seg->mr_dmalen = seg->mr_len;
1685	if (seg->mr_page)
1686		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1687				seg->mr_page, offset_in_page(seg->mr_offset),
1688				seg->mr_dmalen, seg->mr_dir);
1689	else
1690		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1691				seg->mr_offset,
1692				seg->mr_dmalen, seg->mr_dir);
1693	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1694		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1695			__func__,
1696			(unsigned long long)seg->mr_dma,
1697			seg->mr_offset, seg->mr_dmalen);
1698	}
1699}
1700
1701static void
1702rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1703{
1704	if (seg->mr_page)
1705		ib_dma_unmap_page(ia->ri_id->device,
1706				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1707	else
1708		ib_dma_unmap_single(ia->ri_id->device,
1709				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1710}
1711
/* Register up to *nsegs chunk segments using one FAST_REG_MR work
 * request on the FRMR attached to the first segment. On success,
 * seg1 carries the rkey/base/length describing the whole region and
 * *nsegs is updated to the number of segments actually covered
 * (registration stops early at a page-alignment "hole").
 * Returns 0, or a negative errno after unwinding all DMA mappings.
 */
static int
rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
			int *nsegs, int writing, struct rpcrdma_ia *ia,
			struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_mr_seg *seg1 = seg;
	struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
	struct rpcrdma_frmr *frmr = &mw->r.frmr;
	struct ib_mr *mr = frmr->fr_mr;
	struct ib_send_wr fastreg_wr, *bad_wr;
	u8 key;
	int len, pageoff;
	int i, rc;
	int seg_len;
	u64 pa;
	int page_no;

	/* Align the first segment to a page boundary; the offset is
	 * added back into mr_base at the end. */
	pageoff = offset_in_page(seg1->mr_offset);
	seg1->mr_offset -= pageoff;	/* start of page */
	seg1->mr_len += pageoff;
	len = -pageoff;
	if (*nsegs > ia->ri_max_frmr_depth)
		*nsegs = ia->ri_max_frmr_depth;
	for (page_no = i = 0; i < *nsegs;) {
		rpcrdma_map_one(ia, seg, writing);
		pa = seg->mr_dma;
		/* One page-list entry per page of the segment. */
		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
			frmr->fr_pgl->page_list[page_no++] = pa;
			pa += PAGE_SIZE;
		}
		len += seg->mr_len;
		++seg;
		++i;
		/* Check for holes */
		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
		__func__, mw, i);

	frmr->fr_state = FRMR_IS_VALID;

	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
	fastreg_wr.wr_id = (unsigned long)(void *)mw;
	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
	fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
	fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
	fastreg_wr.wr.fast_reg.page_list_len = page_no;
	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
	fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
	/* Sanity check: the page list must cover the mapped bytes. */
	if (fastreg_wr.wr.fast_reg.length < len) {
		rc = -EIO;
		goto out_err;
	}

	/* Bump the key */
	key = (u8)(mr->rkey & 0x000000FF);
	ib_update_fast_reg_key(mr, ++key);

	fastreg_wr.wr.fast_reg.access_flags = (writing ?
				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
				IB_ACCESS_REMOTE_READ);
	fastreg_wr.wr.fast_reg.rkey = mr->rkey;
	DECR_CQCOUNT(&r_xprt->rx_ep);

	rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
	if (rc) {
		dprintk("RPC:       %s: failed ib_post_send for register,"
			" status %i\n", __func__, rc);
		/* Restore the previous rkey since registration failed. */
		ib_update_fast_reg_key(mr, --key);
		goto out_err;
	} else {
		seg1->mr_rkey = mr->rkey;
		seg1->mr_base = seg1->mr_dma + pageoff;
		seg1->mr_nsegs = i;
		seg1->mr_len = len;
	}
	*nsegs = i;
	return 0;
out_err:
	frmr->fr_state = FRMR_IS_INVALID;
	/* Unwind every segment mapped above. */
	while (i--)
		rpcrdma_unmap_one(ia, --seg);
	return rc;
}
1798
/* Invalidate the FRMR covering this chunk (LOCAL_INV) and DMA-unmap
 * all of its segments. Returns the ib_post_send() result; on failure
 * the FRMR is marked STALE so rpcrdma_buffer_get() retries it.
 */
static int
rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_mr_seg *seg1 = seg;
	struct ib_send_wr invalidate_wr, *bad_wr;
	int rc;

	seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;

	memset(&invalidate_wr, 0, sizeof invalidate_wr);
	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
	invalidate_wr.opcode = IB_WR_LOCAL_INV;
	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
	DECR_CQCOUNT(&r_xprt->rx_ep);

	/* Unmap and post under ri_qplock so the QP is not replaced
	 * mid-operation during connection recovery. */
	read_lock(&ia->ri_qplock);
	while (seg1->mr_nsegs--)
		rpcrdma_unmap_one(ia, seg++);
	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	read_unlock(&ia->ri_qplock);
	if (rc) {
		/* Force rpcrdma_buffer_get() to retry */
		seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
			" status %i\n", __func__, rc);
	}
	return rc;
}
1828
/* Register up to *nsegs chunk segments through the FMR attached to
 * the first segment. Gathers each segment's DMA address, then maps
 * them all with one ib_map_phys_fmr() call. On success seg1 carries
 * the rkey/base/length for the region and *nsegs is updated to the
 * number of segments actually covered (stops early at a
 * page-alignment "hole"). On failure all DMA mappings are unwound.
 */
static int
rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
			int *nsegs, int writing, struct rpcrdma_ia *ia)
{
	struct rpcrdma_mr_seg *seg1 = seg;
	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
	int len, pageoff, i, rc;

	/* Align the first segment to a page boundary; the offset is
	 * added back into mr_base at the end. */
	pageoff = offset_in_page(seg1->mr_offset);
	seg1->mr_offset -= pageoff;	/* start of page */
	seg1->mr_len += pageoff;
	len = -pageoff;
	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
		*nsegs = RPCRDMA_MAX_DATA_SEGS;
	for (i = 0; i < *nsegs;) {
		rpcrdma_map_one(ia, seg, writing);
		physaddrs[i] = seg->mr_dma;
		len += seg->mr_len;
		++seg;
		++i;
		/* Check for holes */
		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
				physaddrs, i, seg1->mr_dma);
	if (rc) {
		dprintk("RPC:       %s: failed ib_map_phys_fmr "
			"%u@0x%llx+%i (%d)... status %i\n", __func__,
			len, (unsigned long long)seg1->mr_dma,
			pageoff, i, rc);
		/* Unwind every segment mapped above. */
		while (i--)
			rpcrdma_unmap_one(ia, --seg);
	} else {
		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
		seg1->mr_base = seg1->mr_dma + pageoff;
		seg1->mr_nsegs = i;
		seg1->mr_len = len;
	}
	*nsegs = i;
	return rc;
}
1872
1873static int
1874rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1875			struct rpcrdma_ia *ia)
1876{
1877	struct rpcrdma_mr_seg *seg1 = seg;
1878	LIST_HEAD(l);
1879	int rc;
1880
1881	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1882	rc = ib_unmap_fmr(&l);
1883	read_lock(&ia->ri_qplock);
1884	while (seg1->mr_nsegs--)
1885		rpcrdma_unmap_one(ia, seg++);
1886	read_unlock(&ia->ri_qplock);
1887	if (rc)
1888		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1889			" status %i\n", __func__, rc);
1890	return rc;
1891}
1892
1893int
1894rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1895			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1896{
1897	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1898	int rc = 0;
1899
1900	switch (ia->ri_memreg_strategy) {
1901
1902	case RPCRDMA_ALLPHYSICAL:
1903		rpcrdma_map_one(ia, seg, writing);
1904		seg->mr_rkey = ia->ri_bind_mem->rkey;
1905		seg->mr_base = seg->mr_dma;
1906		seg->mr_nsegs = 1;
1907		nsegs = 1;
1908		break;
1909
1910	/* Registration using frmr registration */
1911	case RPCRDMA_FRMR:
1912		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1913		break;
1914
1915	/* Registration using fmr memory registration */
1916	case RPCRDMA_MTHCAFMR:
1917		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1918		break;
1919
1920	default:
1921		return -1;
1922	}
1923	if (rc)
1924		return -1;
1925
1926	return nsegs;
1927}
1928
/* Deregister/unmap a previously registered chunk, dispatching on the
 * adapter's memory registration strategy. Returns the chunk's
 * original segment count (saved before the helpers consume
 * mr_nsegs).
 */
int
rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
		struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	/* NOTE(review): rc receives the helpers' error codes below but
	 * is never read — deregistration failures are silently dropped
	 * here (the helpers do log them via dprintk). */
	int nsegs = seg->mr_nsegs, rc;

	switch (ia->ri_memreg_strategy) {

	case RPCRDMA_ALLPHYSICAL:
		read_lock(&ia->ri_qplock);
		rpcrdma_unmap_one(ia, seg);
		read_unlock(&ia->ri_qplock);
		break;

	case RPCRDMA_FRMR:
		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
		break;

	case RPCRDMA_MTHCAFMR:
		rc = rpcrdma_deregister_fmr_external(seg, ia);
		break;

	default:
		break;
	}
	return nsegs;
}
1957
1958/*
1959 * Prepost any receive buffer, then post send.
1960 *
1961 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1962 */
1963int
1964rpcrdma_ep_post(struct rpcrdma_ia *ia,
1965		struct rpcrdma_ep *ep,
1966		struct rpcrdma_req *req)
1967{
1968	struct ib_send_wr send_wr, *send_wr_fail;
1969	struct rpcrdma_rep *rep = req->rl_reply;
1970	int rc;
1971
1972	if (rep) {
1973		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1974		if (rc)
1975			goto out;
1976		req->rl_reply = NULL;
1977	}
1978
1979	send_wr.next = NULL;
1980	send_wr.wr_id = 0ULL;	/* no send cookie */
1981	send_wr.sg_list = req->rl_send_iov;
1982	send_wr.num_sge = req->rl_niovs;
1983	send_wr.opcode = IB_WR_SEND;
1984	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1985		ib_dma_sync_single_for_device(ia->ri_id->device,
1986			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1987			DMA_TO_DEVICE);
1988	ib_dma_sync_single_for_device(ia->ri_id->device,
1989		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1990		DMA_TO_DEVICE);
1991	ib_dma_sync_single_for_device(ia->ri_id->device,
1992		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1993		DMA_TO_DEVICE);
1994
1995	if (DECR_CQCOUNT(ep) > 0)
1996		send_wr.send_flags = 0;
1997	else { /* Provider must take a send completion every now and then */
1998		INIT_CQCOUNT(ep);
1999		send_wr.send_flags = IB_SEND_SIGNALED;
2000	}
2001
2002	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2003	if (rc)
2004		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
2005			rc);
2006out:
2007	return rc;
2008}
2009
2010/*
2011 * (Re)post a receive buffer.
2012 */
2013int
2014rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2015		     struct rpcrdma_ep *ep,
2016		     struct rpcrdma_rep *rep)
2017{
2018	struct ib_recv_wr recv_wr, *recv_wr_fail;
2019	int rc;
2020
2021	recv_wr.next = NULL;
2022	recv_wr.wr_id = (u64) (unsigned long) rep;
2023	recv_wr.sg_list = &rep->rr_iov;
2024	recv_wr.num_sge = 1;
2025
2026	ib_dma_sync_single_for_cpu(ia->ri_id->device,
2027		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
2028
2029	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2030
2031	if (rc)
2032		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
2033			rc);
2034	return rc;
2035}
2036
2037/* Physical mapping means one Read/Write list entry per-page.
2038 * All list entries must fit within an inline buffer
2039 *
2040 * NB: The server must return a Write list for NFS READ,
2041 *     which has the same constraint. Factor in the inline
2042 *     rsize as well.
2043 */
2044static size_t
2045rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2046{
2047	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2048	unsigned int inline_size, pages;
2049
2050	inline_size = min_t(unsigned int,
2051			    cdata->inline_wsize, cdata->inline_rsize);
2052	inline_size -= RPCRDMA_HDRLEN_MIN;
2053	pages = inline_size / sizeof(struct rpcrdma_segment);
2054	return pages << PAGE_SHIFT;
2055}
2056
/* MR-based strategies can cover up to RPCRDMA_MAX_DATA_SEGS pages
 * per chunk, independent of the inline buffer sizes. */
static size_t
rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
{
	return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
}
2062
2063size_t
2064rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2065{
2066	size_t result;
2067
2068	switch (r_xprt->rx_ia.ri_memreg_strategy) {
2069	case RPCRDMA_ALLPHYSICAL:
2070		result = rpcrdma_physical_max_payload(r_xprt);
2071		break;
2072	default:
2073		result = rpcrdma_mr_max_payload(r_xprt);
2074	}
2075	return result;
2076}
2077