verbs.c revision f64f9e719261a87818dd192a3a2352e5b20fbd0f
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses.  You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 *      Redistributions of source code must retain the above copyright
15 *      notice, this list of conditions and the following disclaimer.
16 *
17 *      Redistributions in binary form must reproduce the above
18 *      copyright notice, this list of conditions and the following
19 *      disclaimer in the documentation and/or other materials provided
20 *      with the distribution.
21 *
22 *      Neither the name of the Network Appliance, Inc. nor the names of
23 *      its contributors may be used to endorse or promote products
24 *      derived from this software without specific prior written
25 *      permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * verbs.c
42 *
43 * Encapsulates the major functions managing:
44 *  o adapters
45 *  o endpoints
46 *  o connections
47 *  o buffer memory
48 */
49
50#include <linux/pci.h>	/* for Tavor hack below */
51
52#include "xprt_rdma.h"
53
54/*
55 * Globals/Macros
56 */
57
58#ifdef RPC_DEBUG
59# define RPCDBG_FACILITY	RPCDBG_TRANS
60#endif
61
62/*
63 * internal functions
64 */
65
66/*
67 * handle replies in tasklet context, using a single, global list
68 * rdma tasklet function -- just turn around and call the func
69 * for all replies on the list
70 */
71
72static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73static LIST_HEAD(rpcrdma_tasklets_g);
74
75static void
76rpcrdma_run_tasklet(unsigned long data)
77{
78	struct rpcrdma_rep *rep;
79	void (*func)(struct rpcrdma_rep *);
80	unsigned long flags;
81
82	data = data;
83	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84	while (!list_empty(&rpcrdma_tasklets_g)) {
85		rep = list_entry(rpcrdma_tasklets_g.next,
86				 struct rpcrdma_rep, rr_list);
87		list_del(&rep->rr_list);
88		func = rep->rr_func;
89		rep->rr_func = NULL;
90		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92		if (func)
93			func(rep);
94		else
95			rpcrdma_recv_buffer_put(rep);
96
97		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98	}
99	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100}
101
102static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104static inline void
105rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106{
107	unsigned long flags;
108
109	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112	tasklet_schedule(&rpcrdma_tasklet_g);
113}
114
115static void
116rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117{
118	struct rpcrdma_ep *ep = context;
119
120	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121		__func__, event->event, event->device->name, context);
122	if (ep->rep_connected == 1) {
123		ep->rep_connected = -EIO;
124		ep->rep_func(ep);
125		wake_up_all(&ep->rep_connect_wait);
126	}
127}
128
129static void
130rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131{
132	struct rpcrdma_ep *ep = context;
133
134	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135		__func__, event->event, event->device->name, context);
136	if (ep->rep_connected == 1) {
137		ep->rep_connected = -EIO;
138		ep->rep_func(ep);
139		wake_up_all(&ep->rep_connect_wait);
140	}
141}
142
143static inline
144void rpcrdma_event_process(struct ib_wc *wc)
145{
146	struct rpcrdma_rep *rep =
147			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150		__func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152	if (!rep) /* send or bind completion that we don't care about */
153		return;
154
155	if (IB_WC_SUCCESS != wc->status) {
156		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158			 wc->status);
159		rep->rr_len = ~0U;
160		rpcrdma_schedule_tasklet(rep);
161		return;
162	}
163
164	switch (wc->opcode) {
165	case IB_WC_RECV:
166		rep->rr_len = wc->byte_len;
167		ib_dma_sync_single_for_cpu(
168			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170		/* Keep (only) the most recent credits, after check validity */
171		if (rep->rr_len >= 16) {
172			struct rpcrdma_msg *p =
173					(struct rpcrdma_msg *) rep->rr_base;
174			unsigned int credits = ntohl(p->rm_credit);
175			if (credits == 0) {
176				dprintk("RPC:       %s: server"
177					" dropped credits to 0!\n", __func__);
178				/* don't deadlock */
179				credits = 1;
180			} else if (credits > rep->rr_buffer->rb_max_requests) {
181				dprintk("RPC:       %s: server"
182					" over-crediting: %d (%d)\n",
183					__func__, credits,
184					rep->rr_buffer->rb_max_requests);
185				credits = rep->rr_buffer->rb_max_requests;
186			}
187			atomic_set(&rep->rr_buffer->rb_credits, credits);
188		}
189		/* fall through */
190	case IB_WC_BIND_MW:
191		rpcrdma_schedule_tasklet(rep);
192		break;
193	default:
194		dprintk("RPC:       %s: unexpected WC event %X\n",
195			__func__, wc->opcode);
196		break;
197	}
198}
199
200static inline int
201rpcrdma_cq_poll(struct ib_cq *cq)
202{
203	struct ib_wc wc;
204	int rc;
205
206	for (;;) {
207		rc = ib_poll_cq(cq, 1, &wc);
208		if (rc < 0) {
209			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210				__func__, rc);
211			return rc;
212		}
213		if (rc == 0)
214			break;
215
216		rpcrdma_event_process(&wc);
217	}
218
219	return 0;
220}
221
222/*
223 * rpcrdma_cq_event_upcall
224 *
225 * This upcall handles recv, send, bind and unbind events.
226 * It is reentrant but processes single events in order to maintain
227 * ordering of receives to keep server credits.
228 *
229 * It is the responsibility of the scheduled tasklet to return
230 * recv buffers to the pool. NOTE: this affects synchronization of
231 * connection shutdown. That is, the structures required for
232 * the completion of the reply handler must remain intact until
233 * all memory has been reclaimed.
234 *
235 * Note that send events are suppressed and do not result in an upcall.
236 */
237static void
238rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239{
240	int rc;
241
242	rc = rpcrdma_cq_poll(cq);
243	if (rc)
244		return;
245
246	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247	if (rc) {
248		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249			__func__, rc);
250		return;
251	}
252
253	rpcrdma_cq_poll(cq);
254}
255
256#ifdef RPC_DEBUG
257static const char * const conn[] = {
258	"address resolved",
259	"address error",
260	"route resolved",
261	"route error",
262	"connect request",
263	"connect response",
264	"connect error",
265	"unreachable",
266	"rejected",
267	"established",
268	"disconnected",
269	"device removal"
270};
271#endif
272
273static int
274rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275{
276	struct rpcrdma_xprt *xprt = id->context;
277	struct rpcrdma_ia *ia = &xprt->rx_ia;
278	struct rpcrdma_ep *ep = &xprt->rx_ep;
279#ifdef RPC_DEBUG
280	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
281#endif
282	struct ib_qp_attr attr;
283	struct ib_qp_init_attr iattr;
284	int connstate = 0;
285
286	switch (event->event) {
287	case RDMA_CM_EVENT_ADDR_RESOLVED:
288	case RDMA_CM_EVENT_ROUTE_RESOLVED:
289		ia->ri_async_rc = 0;
290		complete(&ia->ri_done);
291		break;
292	case RDMA_CM_EVENT_ADDR_ERROR:
293		ia->ri_async_rc = -EHOSTUNREACH;
294		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
295			__func__, ep);
296		complete(&ia->ri_done);
297		break;
298	case RDMA_CM_EVENT_ROUTE_ERROR:
299		ia->ri_async_rc = -ENETUNREACH;
300		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
301			__func__, ep);
302		complete(&ia->ri_done);
303		break;
304	case RDMA_CM_EVENT_ESTABLISHED:
305		connstate = 1;
306		ib_query_qp(ia->ri_id->qp, &attr,
307			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
308			&iattr);
309		dprintk("RPC:       %s: %d responder resources"
310			" (%d initiator)\n",
311			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
312		goto connected;
313	case RDMA_CM_EVENT_CONNECT_ERROR:
314		connstate = -ENOTCONN;
315		goto connected;
316	case RDMA_CM_EVENT_UNREACHABLE:
317		connstate = -ENETDOWN;
318		goto connected;
319	case RDMA_CM_EVENT_REJECTED:
320		connstate = -ECONNREFUSED;
321		goto connected;
322	case RDMA_CM_EVENT_DISCONNECTED:
323		connstate = -ECONNABORTED;
324		goto connected;
325	case RDMA_CM_EVENT_DEVICE_REMOVAL:
326		connstate = -ENODEV;
327connected:
328		dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
329			__func__,
330			(event->event <= 11) ? conn[event->event] :
331						"unknown connection error",
332			&addr->sin_addr.s_addr,
333			ntohs(addr->sin_port),
334			ep, event->event);
335		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
336		dprintk("RPC:       %s: %sconnected\n",
337					__func__, connstate > 0 ? "" : "dis");
338		ep->rep_connected = connstate;
339		ep->rep_func(ep);
340		wake_up_all(&ep->rep_connect_wait);
341		break;
342	default:
343		dprintk("RPC:       %s: unexpected CM event %d\n",
344			__func__, event->event);
345		break;
346	}
347
348#ifdef RPC_DEBUG
349	if (connstate == 1) {
350		int ird = attr.max_dest_rd_atomic;
351		int tird = ep->rep_remote_cma.responder_resources;
352		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
353			"on %s, memreg %d slots %d ird %d%s\n",
354			&addr->sin_addr.s_addr,
355			ntohs(addr->sin_port),
356			ia->ri_id->device->name,
357			ia->ri_memreg_strategy,
358			xprt->rx_buf.rb_max_requests,
359			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
360	} else if (connstate < 0) {
361		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
362			&addr->sin_addr.s_addr,
363			ntohs(addr->sin_port),
364			connstate);
365	}
366#endif
367
368	return 0;
369}
370
371static struct rdma_cm_id *
372rpcrdma_create_id(struct rpcrdma_xprt *xprt,
373			struct rpcrdma_ia *ia, struct sockaddr *addr)
374{
375	struct rdma_cm_id *id;
376	int rc;
377
378	init_completion(&ia->ri_done);
379
380	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
381	if (IS_ERR(id)) {
382		rc = PTR_ERR(id);
383		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
384			__func__, rc);
385		return id;
386	}
387
388	ia->ri_async_rc = -ETIMEDOUT;
389	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
390	if (rc) {
391		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
392			__func__, rc);
393		goto out;
394	}
395	wait_for_completion_interruptible_timeout(&ia->ri_done,
396				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
397	rc = ia->ri_async_rc;
398	if (rc)
399		goto out;
400
401	ia->ri_async_rc = -ETIMEDOUT;
402	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
403	if (rc) {
404		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
405			__func__, rc);
406		goto out;
407	}
408	wait_for_completion_interruptible_timeout(&ia->ri_done,
409				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
410	rc = ia->ri_async_rc;
411	if (rc)
412		goto out;
413
414	return id;
415
416out:
417	rdma_destroy_id(id);
418	return ERR_PTR(rc);
419}
420
421/*
422 * Drain any cq, prior to teardown.
423 */
424static void
425rpcrdma_clean_cq(struct ib_cq *cq)
426{
427	struct ib_wc wc;
428	int count = 0;
429
430	while (1 == ib_poll_cq(cq, 1, &wc))
431		++count;
432
433	if (count)
434		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
435			__func__, count, wc.opcode);
436}
437
438/*
439 * Exported functions.
440 */
441
442/*
443 * Open and initialize an Interface Adapter.
444 *  o initializes fields of struct rpcrdma_ia, including
445 *    interface and provider attributes and protection zone.
446 */
447int
448rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
449{
450	int rc, mem_priv;
451	struct ib_device_attr devattr;
452	struct rpcrdma_ia *ia = &xprt->rx_ia;
453
454	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
455	if (IS_ERR(ia->ri_id)) {
456		rc = PTR_ERR(ia->ri_id);
457		goto out1;
458	}
459
460	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
461	if (IS_ERR(ia->ri_pd)) {
462		rc = PTR_ERR(ia->ri_pd);
463		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
464			__func__, rc);
465		goto out2;
466	}
467
468	/*
469	 * Query the device to determine if the requested memory
470	 * registration strategy is supported. If it isn't, set the
471	 * strategy to a globally supported model.
472	 */
473	rc = ib_query_device(ia->ri_id->device, &devattr);
474	if (rc) {
475		dprintk("RPC:       %s: ib_query_device failed %d\n",
476			__func__, rc);
477		goto out2;
478	}
479
480	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
481		ia->ri_have_dma_lkey = 1;
482		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
483	}
484
485	switch (memreg) {
486	case RPCRDMA_MEMWINDOWS:
487	case RPCRDMA_MEMWINDOWS_ASYNC:
488		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
489			dprintk("RPC:       %s: MEMWINDOWS registration "
490				"specified but not supported by adapter, "
491				"using slower RPCRDMA_REGISTER\n",
492				__func__);
493			memreg = RPCRDMA_REGISTER;
494		}
495		break;
496	case RPCRDMA_MTHCAFMR:
497		if (!ia->ri_id->device->alloc_fmr) {
498#if RPCRDMA_PERSISTENT_REGISTRATION
499			dprintk("RPC:       %s: MTHCAFMR registration "
500				"specified but not supported by adapter, "
501				"using riskier RPCRDMA_ALLPHYSICAL\n",
502				__func__);
503			memreg = RPCRDMA_ALLPHYSICAL;
504#else
505			dprintk("RPC:       %s: MTHCAFMR registration "
506				"specified but not supported by adapter, "
507				"using slower RPCRDMA_REGISTER\n",
508				__func__);
509			memreg = RPCRDMA_REGISTER;
510#endif
511		}
512		break;
513	case RPCRDMA_FRMR:
514		/* Requires both frmr reg and local dma lkey */
515		if ((devattr.device_cap_flags &
516		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
517		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
518#if RPCRDMA_PERSISTENT_REGISTRATION
519			dprintk("RPC:       %s: FRMR registration "
520				"specified but not supported by adapter, "
521				"using riskier RPCRDMA_ALLPHYSICAL\n",
522				__func__);
523			memreg = RPCRDMA_ALLPHYSICAL;
524#else
525			dprintk("RPC:       %s: FRMR registration "
526				"specified but not supported by adapter, "
527				"using slower RPCRDMA_REGISTER\n",
528				__func__);
529			memreg = RPCRDMA_REGISTER;
530#endif
531		}
532		break;
533	}
534
535	/*
536	 * Optionally obtain an underlying physical identity mapping in
537	 * order to do a memory window-based bind. This base registration
538	 * is protected from remote access - that is enabled only by binding
539	 * for the specific bytes targeted during each RPC operation, and
540	 * revoked after the corresponding completion similar to a storage
541	 * adapter.
542	 */
543	switch (memreg) {
544	case RPCRDMA_BOUNCEBUFFERS:
545	case RPCRDMA_REGISTER:
546	case RPCRDMA_FRMR:
547		break;
548#if RPCRDMA_PERSISTENT_REGISTRATION
549	case RPCRDMA_ALLPHYSICAL:
550		mem_priv = IB_ACCESS_LOCAL_WRITE |
551				IB_ACCESS_REMOTE_WRITE |
552				IB_ACCESS_REMOTE_READ;
553		goto register_setup;
554#endif
555	case RPCRDMA_MEMWINDOWS_ASYNC:
556	case RPCRDMA_MEMWINDOWS:
557		mem_priv = IB_ACCESS_LOCAL_WRITE |
558				IB_ACCESS_MW_BIND;
559		goto register_setup;
560	case RPCRDMA_MTHCAFMR:
561		if (ia->ri_have_dma_lkey)
562			break;
563		mem_priv = IB_ACCESS_LOCAL_WRITE;
564	register_setup:
565		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
566		if (IS_ERR(ia->ri_bind_mem)) {
567			printk(KERN_ALERT "%s: ib_get_dma_mr for "
568				"phys register failed with %lX\n\t"
569				"Will continue with degraded performance\n",
570				__func__, PTR_ERR(ia->ri_bind_mem));
571			memreg = RPCRDMA_REGISTER;
572			ia->ri_bind_mem = NULL;
573		}
574		break;
575	default:
576		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
577				__func__, memreg);
578		rc = -EINVAL;
579		goto out2;
580	}
581	dprintk("RPC:       %s: memory registration strategy is %d\n",
582		__func__, memreg);
583
584	/* Else will do memory reg/dereg for each chunk */
585	ia->ri_memreg_strategy = memreg;
586
587	return 0;
588out2:
589	rdma_destroy_id(ia->ri_id);
590	ia->ri_id = NULL;
591out1:
592	return rc;
593}
594
595/*
596 * Clean up/close an IA.
597 *   o if event handles and PD have been initialized, free them.
598 *   o close the IA
599 */
600void
601rpcrdma_ia_close(struct rpcrdma_ia *ia)
602{
603	int rc;
604
605	dprintk("RPC:       %s: entering\n", __func__);
606	if (ia->ri_bind_mem != NULL) {
607		rc = ib_dereg_mr(ia->ri_bind_mem);
608		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
609			__func__, rc);
610	}
611	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
612		if (ia->ri_id->qp)
613			rdma_destroy_qp(ia->ri_id);
614		rdma_destroy_id(ia->ri_id);
615		ia->ri_id = NULL;
616	}
617	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
618		rc = ib_dealloc_pd(ia->ri_pd);
619		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
620			__func__, rc);
621	}
622}
623
624/*
625 * Create unconnected endpoint.
626 */
627int
628rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
629				struct rpcrdma_create_data_internal *cdata)
630{
631	struct ib_device_attr devattr;
632	int rc, err;
633
634	rc = ib_query_device(ia->ri_id->device, &devattr);
635	if (rc) {
636		dprintk("RPC:       %s: ib_query_device failed %d\n",
637			__func__, rc);
638		return rc;
639	}
640
641	/* check provider's send/recv wr limits */
642	if (cdata->max_requests > devattr.max_qp_wr)
643		cdata->max_requests = devattr.max_qp_wr;
644
645	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
646	ep->rep_attr.qp_context = ep;
647	/* send_cq and recv_cq initialized below */
648	ep->rep_attr.srq = NULL;
649	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
650	switch (ia->ri_memreg_strategy) {
651	case RPCRDMA_FRMR:
652		/* Add room for frmr register and invalidate WRs */
653		ep->rep_attr.cap.max_send_wr *= 3;
654		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
655			return -EINVAL;
656		break;
657	case RPCRDMA_MEMWINDOWS_ASYNC:
658	case RPCRDMA_MEMWINDOWS:
659		/* Add room for mw_binds+unbinds - overkill! */
660		ep->rep_attr.cap.max_send_wr++;
661		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
662		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
663			return -EINVAL;
664		break;
665	default:
666		break;
667	}
668	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
669	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
670	ep->rep_attr.cap.max_recv_sge = 1;
671	ep->rep_attr.cap.max_inline_data = 0;
672	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
673	ep->rep_attr.qp_type = IB_QPT_RC;
674	ep->rep_attr.port_num = ~0;
675
676	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
677		"iovs: send %d recv %d\n",
678		__func__,
679		ep->rep_attr.cap.max_send_wr,
680		ep->rep_attr.cap.max_recv_wr,
681		ep->rep_attr.cap.max_send_sge,
682		ep->rep_attr.cap.max_recv_sge);
683
684	/* set trigger for requesting send completion */
685	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
686	switch (ia->ri_memreg_strategy) {
687	case RPCRDMA_MEMWINDOWS_ASYNC:
688	case RPCRDMA_MEMWINDOWS:
689		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
690		break;
691	default:
692		break;
693	}
694	if (ep->rep_cqinit <= 2)
695		ep->rep_cqinit = 0;
696	INIT_CQCOUNT(ep);
697	ep->rep_ia = ia;
698	init_waitqueue_head(&ep->rep_connect_wait);
699
700	/*
701	 * Create a single cq for receive dto and mw_bind (only ever
702	 * care about unbind, really). Send completions are suppressed.
703	 * Use single threaded tasklet upcalls to maintain ordering.
704	 */
705	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
706				  rpcrdma_cq_async_error_upcall, NULL,
707				  ep->rep_attr.cap.max_recv_wr +
708				  ep->rep_attr.cap.max_send_wr + 1, 0);
709	if (IS_ERR(ep->rep_cq)) {
710		rc = PTR_ERR(ep->rep_cq);
711		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
712			__func__, rc);
713		goto out1;
714	}
715
716	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
717	if (rc) {
718		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
719			__func__, rc);
720		goto out2;
721	}
722
723	ep->rep_attr.send_cq = ep->rep_cq;
724	ep->rep_attr.recv_cq = ep->rep_cq;
725
726	/* Initialize cma parameters */
727
728	/* RPC/RDMA does not use private data */
729	ep->rep_remote_cma.private_data = NULL;
730	ep->rep_remote_cma.private_data_len = 0;
731
732	/* Client offers RDMA Read but does not initiate */
733	ep->rep_remote_cma.initiator_depth = 0;
734	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
735		ep->rep_remote_cma.responder_resources = 0;
736	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
737		ep->rep_remote_cma.responder_resources = 32;
738	else
739		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
740
741	ep->rep_remote_cma.retry_count = 7;
742	ep->rep_remote_cma.flow_control = 0;
743	ep->rep_remote_cma.rnr_retry_count = 0;
744
745	return 0;
746
747out2:
748	err = ib_destroy_cq(ep->rep_cq);
749	if (err)
750		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
751			__func__, err);
752out1:
753	return rc;
754}
755
756/*
757 * rpcrdma_ep_destroy
758 *
759 * Disconnect and destroy endpoint. After this, the only
760 * valid operations on the ep are to free it (if dynamically
761 * allocated) or re-create it.
762 *
763 * The caller's error handling must be sure to not leak the endpoint
764 * if this function fails.
765 */
766int
767rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
768{
769	int rc;
770
771	dprintk("RPC:       %s: entering, connected is %d\n",
772		__func__, ep->rep_connected);
773
774	if (ia->ri_id->qp) {
775		rc = rpcrdma_ep_disconnect(ep, ia);
776		if (rc)
777			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
778				" returned %i\n", __func__, rc);
779		rdma_destroy_qp(ia->ri_id);
780		ia->ri_id->qp = NULL;
781	}
782
783	/* padding - could be done in rpcrdma_buffer_destroy... */
784	if (ep->rep_pad_mr) {
785		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
786		ep->rep_pad_mr = NULL;
787	}
788
789	rpcrdma_clean_cq(ep->rep_cq);
790	rc = ib_destroy_cq(ep->rep_cq);
791	if (rc)
792		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
793			__func__, rc);
794
795	return rc;
796}
797
798/*
799 * Connect unconnected endpoint.
800 */
801int
802rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
803{
804	struct rdma_cm_id *id;
805	int rc = 0;
806	int retry_count = 0;
807
808	if (ep->rep_connected != 0) {
809		struct rpcrdma_xprt *xprt;
810retry:
811		rc = rpcrdma_ep_disconnect(ep, ia);
812		if (rc && rc != -ENOTCONN)
813			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
814				" status %i\n", __func__, rc);
815		rpcrdma_clean_cq(ep->rep_cq);
816
817		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
818		id = rpcrdma_create_id(xprt, ia,
819				(struct sockaddr *)&xprt->rx_data.addr);
820		if (IS_ERR(id)) {
821			rc = PTR_ERR(id);
822			goto out;
823		}
824		/* TEMP TEMP TEMP - fail if new device:
825		 * Deregister/remarshal *all* requests!
826		 * Close and recreate adapter, pd, etc!
827		 * Re-determine all attributes still sane!
828		 * More stuff I haven't thought of!
829		 * Rrrgh!
830		 */
831		if (ia->ri_id->device != id->device) {
832			printk("RPC:       %s: can't reconnect on "
833				"different device!\n", __func__);
834			rdma_destroy_id(id);
835			rc = -ENETDOWN;
836			goto out;
837		}
838		/* END TEMP */
839		rdma_destroy_qp(ia->ri_id);
840		rdma_destroy_id(ia->ri_id);
841		ia->ri_id = id;
842	}
843
844	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
845	if (rc) {
846		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
847			__func__, rc);
848		goto out;
849	}
850
851/* XXX Tavor device performs badly with 2K MTU! */
852if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
853	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
854	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
855	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
856	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
857		struct ib_qp_attr attr = {
858			.path_mtu = IB_MTU_1024
859		};
860		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
861	}
862}
863
864	ep->rep_connected = 0;
865
866	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
867	if (rc) {
868		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
869				__func__, rc);
870		goto out;
871	}
872
873	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
874
875	/*
876	 * Check state. A non-peer reject indicates no listener
877	 * (ECONNREFUSED), which may be a transient state. All
878	 * others indicate a transport condition which has already
879	 * undergone a best-effort.
880	 */
881	if (ep->rep_connected == -ECONNREFUSED &&
882	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
883		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
884		goto retry;
885	}
886	if (ep->rep_connected <= 0) {
887		/* Sometimes, the only way to reliably connect to remote
888		 * CMs is to use same nonzero values for ORD and IRD. */
889		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
890		    (ep->rep_remote_cma.responder_resources == 0 ||
891		     ep->rep_remote_cma.initiator_depth !=
892				ep->rep_remote_cma.responder_resources)) {
893			if (ep->rep_remote_cma.responder_resources == 0)
894				ep->rep_remote_cma.responder_resources = 1;
895			ep->rep_remote_cma.initiator_depth =
896				ep->rep_remote_cma.responder_resources;
897			goto retry;
898		}
899		rc = ep->rep_connected;
900	} else {
901		dprintk("RPC:       %s: connected\n", __func__);
902	}
903
904out:
905	if (rc)
906		ep->rep_connected = rc;
907	return rc;
908}
909
910/*
911 * rpcrdma_ep_disconnect
912 *
913 * This is separate from destroy to facilitate the ability
914 * to reconnect without recreating the endpoint.
915 *
916 * This call is not reentrant, and must not be made in parallel
917 * on the same endpoint.
918 */
919int
920rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
921{
922	int rc;
923
924	rpcrdma_clean_cq(ep->rep_cq);
925	rc = rdma_disconnect(ia->ri_id);
926	if (!rc) {
927		/* returns without wait if not connected */
928		wait_event_interruptible(ep->rep_connect_wait,
929							ep->rep_connected != 1);
930		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
931			(ep->rep_connected == 1) ? "still " : "dis");
932	} else {
933		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
934		ep->rep_connected = rc;
935	}
936	return rc;
937}
938
939/*
940 * Initialize buffer memory
941 */
942int
943rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
944	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
945{
946	char *p;
947	size_t len;
948	int i, rc;
949	struct rpcrdma_mw *r;
950
951	buf->rb_max_requests = cdata->max_requests;
952	spin_lock_init(&buf->rb_lock);
953	atomic_set(&buf->rb_credits, 1);
954
955	/* Need to allocate:
956	 *   1.  arrays for send and recv pointers
957	 *   2.  arrays of struct rpcrdma_req to fill in pointers
958	 *   3.  array of struct rpcrdma_rep for replies
959	 *   4.  padding, if any
960	 *   5.  mw's, fmr's or frmr's, if any
961	 * Send/recv buffers in req/rep need to be registered
962	 */
963
964	len = buf->rb_max_requests *
965		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
966	len += cdata->padding;
967	switch (ia->ri_memreg_strategy) {
968	case RPCRDMA_FRMR:
969		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
970				sizeof(struct rpcrdma_mw);
971		break;
972	case RPCRDMA_MTHCAFMR:
973		/* TBD we are perhaps overallocating here */
974		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
975				sizeof(struct rpcrdma_mw);
976		break;
977	case RPCRDMA_MEMWINDOWS_ASYNC:
978	case RPCRDMA_MEMWINDOWS:
979		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
980				sizeof(struct rpcrdma_mw);
981		break;
982	default:
983		break;
984	}
985
986	/* allocate 1, 4 and 5 in one shot */
987	p = kzalloc(len, GFP_KERNEL);
988	if (p == NULL) {
989		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
990			__func__, len);
991		rc = -ENOMEM;
992		goto out;
993	}
994	buf->rb_pool = p;	/* for freeing it later */
995
996	buf->rb_send_bufs = (struct rpcrdma_req **) p;
997	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
998	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
999	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1000
1001	/*
1002	 * Register the zeroed pad buffer, if any.
1003	 */
1004	if (cdata->padding) {
1005		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1006					    &ep->rep_pad_mr, &ep->rep_pad);
1007		if (rc)
1008			goto out;
1009	}
1010	p += cdata->padding;
1011
1012	/*
1013	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1014	 * We "cycle" the mw's in order to minimize rkey reuse,
1015	 * and also reduce unbind-to-bind collision.
1016	 */
1017	INIT_LIST_HEAD(&buf->rb_mws);
1018	r = (struct rpcrdma_mw *)p;
1019	switch (ia->ri_memreg_strategy) {
1020	case RPCRDMA_FRMR:
1021		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1022			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1023							 RPCRDMA_MAX_SEGS);
1024			if (IS_ERR(r->r.frmr.fr_mr)) {
1025				rc = PTR_ERR(r->r.frmr.fr_mr);
1026				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1027					" failed %i\n", __func__, rc);
1028				goto out;
1029			}
1030			r->r.frmr.fr_pgl =
1031				ib_alloc_fast_reg_page_list(ia->ri_id->device,
1032							    RPCRDMA_MAX_SEGS);
1033			if (IS_ERR(r->r.frmr.fr_pgl)) {
1034				rc = PTR_ERR(r->r.frmr.fr_pgl);
1035				dprintk("RPC:       %s: "
1036					"ib_alloc_fast_reg_page_list "
1037					"failed %i\n", __func__, rc);
1038				goto out;
1039			}
1040			list_add(&r->mw_list, &buf->rb_mws);
1041			++r;
1042		}
1043		break;
1044	case RPCRDMA_MTHCAFMR:
1045		/* TBD we are perhaps overallocating here */
1046		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1047			static struct ib_fmr_attr fa =
1048				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1049			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1050				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1051				&fa);
1052			if (IS_ERR(r->r.fmr)) {
1053				rc = PTR_ERR(r->r.fmr);
1054				dprintk("RPC:       %s: ib_alloc_fmr"
1055					" failed %i\n", __func__, rc);
1056				goto out;
1057			}
1058			list_add(&r->mw_list, &buf->rb_mws);
1059			++r;
1060		}
1061		break;
1062	case RPCRDMA_MEMWINDOWS_ASYNC:
1063	case RPCRDMA_MEMWINDOWS:
1064		/* Allocate one extra request's worth, for full cycling */
1065		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1066			r->r.mw = ib_alloc_mw(ia->ri_pd);
1067			if (IS_ERR(r->r.mw)) {
1068				rc = PTR_ERR(r->r.mw);
1069				dprintk("RPC:       %s: ib_alloc_mw"
1070					" failed %i\n", __func__, rc);
1071				goto out;
1072			}
1073			list_add(&r->mw_list, &buf->rb_mws);
1074			++r;
1075		}
1076		break;
1077	default:
1078		break;
1079	}
1080
1081	/*
1082	 * Allocate/init the request/reply buffers. Doing this
1083	 * using kmalloc for now -- one for each buf.
1084	 */
1085	for (i = 0; i < buf->rb_max_requests; i++) {
1086		struct rpcrdma_req *req;
1087		struct rpcrdma_rep *rep;
1088
1089		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1090		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1091		/* Typical ~2400b, so rounding up saves work later */
1092		if (len < 4096)
1093			len = 4096;
1094		req = kmalloc(len, GFP_KERNEL);
1095		if (req == NULL) {
1096			dprintk("RPC:       %s: request buffer %d alloc"
1097				" failed\n", __func__, i);
1098			rc = -ENOMEM;
1099			goto out;
1100		}
1101		memset(req, 0, sizeof(struct rpcrdma_req));
1102		buf->rb_send_bufs[i] = req;
1103		buf->rb_send_bufs[i]->rl_buffer = buf;
1104
1105		rc = rpcrdma_register_internal(ia, req->rl_base,
1106				len - offsetof(struct rpcrdma_req, rl_base),
1107				&buf->rb_send_bufs[i]->rl_handle,
1108				&buf->rb_send_bufs[i]->rl_iov);
1109		if (rc)
1110			goto out;
1111
1112		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1113
1114		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1115		rep = kmalloc(len, GFP_KERNEL);
1116		if (rep == NULL) {
1117			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1118				__func__, i);
1119			rc = -ENOMEM;
1120			goto out;
1121		}
1122		memset(rep, 0, sizeof(struct rpcrdma_rep));
1123		buf->rb_recv_bufs[i] = rep;
1124		buf->rb_recv_bufs[i]->rr_buffer = buf;
1125		init_waitqueue_head(&rep->rr_unbind);
1126
1127		rc = rpcrdma_register_internal(ia, rep->rr_base,
1128				len - offsetof(struct rpcrdma_rep, rr_base),
1129				&buf->rb_recv_bufs[i]->rr_handle,
1130				&buf->rb_recv_bufs[i]->rr_iov);
1131		if (rc)
1132			goto out;
1133
1134	}
1135	dprintk("RPC:       %s: max_requests %d\n",
1136		__func__, buf->rb_max_requests);
1137	/* done */
1138	return 0;
1139out:
1140	rpcrdma_buffer_destroy(buf);
1141	return rc;
1142}
1143
1144/*
1145 * Unregister and destroy buffer memory. Need to deal with
1146 * partial initialization, so it's callable from failed create.
1147 * Must be called before destroying endpoint, as registrations
1148 * reference it.
1149 */
1150void
1151rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1152{
1153	int rc, i;
1154	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1155	struct rpcrdma_mw *r;
1156
1157	/* clean up in reverse order from create
1158	 *   1.  recv mr memory (mr free, then kfree)
1159	 *   1a. bind mw memory
1160	 *   2.  send mr memory (mr free, then kfree)
1161	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1162	 *   4.  arrays
1163	 */
1164	dprintk("RPC:       %s: entering\n", __func__);
1165
1166	for (i = 0; i < buf->rb_max_requests; i++) {
1167		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1168			rpcrdma_deregister_internal(ia,
1169					buf->rb_recv_bufs[i]->rr_handle,
1170					&buf->rb_recv_bufs[i]->rr_iov);
1171			kfree(buf->rb_recv_bufs[i]);
1172		}
1173		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1174			while (!list_empty(&buf->rb_mws)) {
1175				r = list_entry(buf->rb_mws.next,
1176					struct rpcrdma_mw, mw_list);
1177				list_del(&r->mw_list);
1178				switch (ia->ri_memreg_strategy) {
1179				case RPCRDMA_FRMR:
1180					rc = ib_dereg_mr(r->r.frmr.fr_mr);
1181					if (rc)
1182						dprintk("RPC:       %s:"
1183							" ib_dereg_mr"
1184							" failed %i\n",
1185							__func__, rc);
1186					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1187					break;
1188				case RPCRDMA_MTHCAFMR:
1189					rc = ib_dealloc_fmr(r->r.fmr);
1190					if (rc)
1191						dprintk("RPC:       %s:"
1192							" ib_dealloc_fmr"
1193							" failed %i\n",
1194							__func__, rc);
1195					break;
1196				case RPCRDMA_MEMWINDOWS_ASYNC:
1197				case RPCRDMA_MEMWINDOWS:
1198					rc = ib_dealloc_mw(r->r.mw);
1199					if (rc)
1200						dprintk("RPC:       %s:"
1201							" ib_dealloc_mw"
1202							" failed %i\n",
1203							__func__, rc);
1204					break;
1205				default:
1206					break;
1207				}
1208			}
1209			rpcrdma_deregister_internal(ia,
1210					buf->rb_send_bufs[i]->rl_handle,
1211					&buf->rb_send_bufs[i]->rl_iov);
1212			kfree(buf->rb_send_bufs[i]);
1213		}
1214	}
1215
1216	kfree(buf->rb_pool);
1217}
1218
1219/*
1220 * Get a set of request/reply buffers.
1221 *
1222 * Reply buffer (if needed) is attached to send buffer upon return.
1223 * Rule:
1224 *    rb_send_index and rb_recv_index MUST always be pointing to the
1225 *    *next* available buffer (non-NULL). They are incremented after
1226 *    removing buffers, and decremented *before* returning them.
1227 */
1228struct rpcrdma_req *
1229rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1230{
1231	struct rpcrdma_req *req;
1232	unsigned long flags;
1233	int i;
1234	struct rpcrdma_mw *r;
1235
1236	spin_lock_irqsave(&buffers->rb_lock, flags);
1237	if (buffers->rb_send_index == buffers->rb_max_requests) {
1238		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1239		dprintk("RPC:       %s: out of request buffers\n", __func__);
1240		return ((struct rpcrdma_req *)NULL);
1241	}
1242
1243	req = buffers->rb_send_bufs[buffers->rb_send_index];
1244	if (buffers->rb_send_index < buffers->rb_recv_index) {
1245		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1246			__func__,
1247			buffers->rb_recv_index - buffers->rb_send_index);
1248		req->rl_reply = NULL;
1249	} else {
1250		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1251		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1252	}
1253	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1254	if (!list_empty(&buffers->rb_mws)) {
1255		i = RPCRDMA_MAX_SEGS - 1;
1256		do {
1257			r = list_entry(buffers->rb_mws.next,
1258					struct rpcrdma_mw, mw_list);
1259			list_del(&r->mw_list);
1260			req->rl_segments[i].mr_chunk.rl_mw = r;
1261		} while (--i >= 0);
1262	}
1263	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1264	return req;
1265}
1266
1267/*
1268 * Put request/reply buffers back into pool.
1269 * Pre-decrement counter/array index.
1270 */
1271void
1272rpcrdma_buffer_put(struct rpcrdma_req *req)
1273{
1274	struct rpcrdma_buffer *buffers = req->rl_buffer;
1275	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1276	int i;
1277	unsigned long flags;
1278
1279	BUG_ON(req->rl_nchunks != 0);
1280	spin_lock_irqsave(&buffers->rb_lock, flags);
1281	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1282	req->rl_niovs = 0;
1283	if (req->rl_reply) {
1284		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1285		init_waitqueue_head(&req->rl_reply->rr_unbind);
1286		req->rl_reply->rr_func = NULL;
1287		req->rl_reply = NULL;
1288	}
1289	switch (ia->ri_memreg_strategy) {
1290	case RPCRDMA_FRMR:
1291	case RPCRDMA_MTHCAFMR:
1292	case RPCRDMA_MEMWINDOWS_ASYNC:
1293	case RPCRDMA_MEMWINDOWS:
1294		/*
1295		 * Cycle mw's back in reverse order, and "spin" them.
1296		 * This delays and scrambles reuse as much as possible.
1297		 */
1298		i = 1;
1299		do {
1300			struct rpcrdma_mw **mw;
1301			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1302			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1303			*mw = NULL;
1304		} while (++i < RPCRDMA_MAX_SEGS);
1305		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1306					&buffers->rb_mws);
1307		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1308		break;
1309	default:
1310		break;
1311	}
1312	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1313}
1314
1315/*
1316 * Recover reply buffers from pool.
1317 * This happens when recovering from error conditions.
1318 * Post-increment counter/array index.
1319 */
1320void
1321rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1322{
1323	struct rpcrdma_buffer *buffers = req->rl_buffer;
1324	unsigned long flags;
1325
1326	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1327		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1328	spin_lock_irqsave(&buffers->rb_lock, flags);
1329	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1330		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1331		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1332	}
1333	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1334}
1335
1336/*
1337 * Put reply buffers back into pool when not attached to
1338 * request. This happens in error conditions, and when
1339 * aborting unbinds. Pre-decrement counter/array index.
1340 */
1341void
1342rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1343{
1344	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1345	unsigned long flags;
1346
1347	rep->rr_func = NULL;
1348	spin_lock_irqsave(&buffers->rb_lock, flags);
1349	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1350	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1351}
1352
1353/*
1354 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1355 */
1356
1357int
1358rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1359				struct ib_mr **mrp, struct ib_sge *iov)
1360{
1361	struct ib_phys_buf ipb;
1362	struct ib_mr *mr;
1363	int rc;
1364
1365	/*
1366	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1367	 */
1368	iov->addr = ib_dma_map_single(ia->ri_id->device,
1369			va, len, DMA_BIDIRECTIONAL);
1370	iov->length = len;
1371
1372	if (ia->ri_have_dma_lkey) {
1373		*mrp = NULL;
1374		iov->lkey = ia->ri_dma_lkey;
1375		return 0;
1376	} else if (ia->ri_bind_mem != NULL) {
1377		*mrp = NULL;
1378		iov->lkey = ia->ri_bind_mem->lkey;
1379		return 0;
1380	}
1381
1382	ipb.addr = iov->addr;
1383	ipb.size = iov->length;
1384	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1385			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1386
1387	dprintk("RPC:       %s: phys convert: 0x%llx "
1388			"registered 0x%llx length %d\n",
1389			__func__, (unsigned long long)ipb.addr,
1390			(unsigned long long)iov->addr, len);
1391
1392	if (IS_ERR(mr)) {
1393		*mrp = NULL;
1394		rc = PTR_ERR(mr);
1395		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1396	} else {
1397		*mrp = mr;
1398		iov->lkey = mr->lkey;
1399		rc = 0;
1400	}
1401
1402	return rc;
1403}
1404
1405int
1406rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1407				struct ib_mr *mr, struct ib_sge *iov)
1408{
1409	int rc;
1410
1411	ib_dma_unmap_single(ia->ri_id->device,
1412			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1413
1414	if (NULL == mr)
1415		return 0;
1416
1417	rc = ib_dereg_mr(mr);
1418	if (rc)
1419		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1420	return rc;
1421}
1422
1423/*
1424 * Wrappers for chunk registration, shared by read/write chunk code.
1425 */
1426
1427static void
1428rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1429{
1430	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1431	seg->mr_dmalen = seg->mr_len;
1432	if (seg->mr_page)
1433		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1434				seg->mr_page, offset_in_page(seg->mr_offset),
1435				seg->mr_dmalen, seg->mr_dir);
1436	else
1437		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1438				seg->mr_offset,
1439				seg->mr_dmalen, seg->mr_dir);
1440}
1441
1442static void
1443rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1444{
1445	if (seg->mr_page)
1446		ib_dma_unmap_page(ia->ri_id->device,
1447				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1448	else
1449		ib_dma_unmap_single(ia->ri_id->device,
1450				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1451}
1452
1453static int
1454rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1455			int *nsegs, int writing, struct rpcrdma_ia *ia,
1456			struct rpcrdma_xprt *r_xprt)
1457{
1458	struct rpcrdma_mr_seg *seg1 = seg;
1459	struct ib_send_wr frmr_wr, *bad_wr;
1460	u8 key;
1461	int len, pageoff;
1462	int i, rc;
1463
1464	pageoff = offset_in_page(seg1->mr_offset);
1465	seg1->mr_offset -= pageoff;	/* start of page */
1466	seg1->mr_len += pageoff;
1467	len = -pageoff;
1468	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1469		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1470	for (i = 0; i < *nsegs;) {
1471		rpcrdma_map_one(ia, seg, writing);
1472		seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1473		len += seg->mr_len;
1474		++seg;
1475		++i;
1476		/* Check for holes */
1477		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1478		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1479			break;
1480	}
1481	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1482		__func__, seg1->mr_chunk.rl_mw, i);
1483
1484	/* Bump the key */
1485	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1486	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1487
1488	/* Prepare FRMR WR */
1489	memset(&frmr_wr, 0, sizeof frmr_wr);
1490	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1491	frmr_wr.send_flags = 0;			/* unsignaled */
1492	frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1493	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1494	frmr_wr.wr.fast_reg.page_list_len = i;
1495	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1496	frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1497	frmr_wr.wr.fast_reg.access_flags = (writing ?
1498				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1499				IB_ACCESS_REMOTE_READ);
1500	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1501	DECR_CQCOUNT(&r_xprt->rx_ep);
1502
1503	rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1504
1505	if (rc) {
1506		dprintk("RPC:       %s: failed ib_post_send for register,"
1507			" status %i\n", __func__, rc);
1508		while (i--)
1509			rpcrdma_unmap_one(ia, --seg);
1510	} else {
1511		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1512		seg1->mr_base = seg1->mr_dma + pageoff;
1513		seg1->mr_nsegs = i;
1514		seg1->mr_len = len;
1515	}
1516	*nsegs = i;
1517	return rc;
1518}
1519
1520static int
1521rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1522			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1523{
1524	struct rpcrdma_mr_seg *seg1 = seg;
1525	struct ib_send_wr invalidate_wr, *bad_wr;
1526	int rc;
1527
1528	while (seg1->mr_nsegs--)
1529		rpcrdma_unmap_one(ia, seg++);
1530
1531	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1532	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1533	invalidate_wr.send_flags = 0;			/* unsignaled */
1534	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1535	DECR_CQCOUNT(&r_xprt->rx_ep);
1536
1537	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1538	if (rc)
1539		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1540			" status %i\n", __func__, rc);
1541	return rc;
1542}
1543
1544static int
1545rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1546			int *nsegs, int writing, struct rpcrdma_ia *ia)
1547{
1548	struct rpcrdma_mr_seg *seg1 = seg;
1549	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1550	int len, pageoff, i, rc;
1551
1552	pageoff = offset_in_page(seg1->mr_offset);
1553	seg1->mr_offset -= pageoff;	/* start of page */
1554	seg1->mr_len += pageoff;
1555	len = -pageoff;
1556	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1557		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1558	for (i = 0; i < *nsegs;) {
1559		rpcrdma_map_one(ia, seg, writing);
1560		physaddrs[i] = seg->mr_dma;
1561		len += seg->mr_len;
1562		++seg;
1563		++i;
1564		/* Check for holes */
1565		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1566		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1567			break;
1568	}
1569	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1570				physaddrs, i, seg1->mr_dma);
1571	if (rc) {
1572		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1573			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1574			len, (unsigned long long)seg1->mr_dma,
1575			pageoff, i, rc);
1576		while (i--)
1577			rpcrdma_unmap_one(ia, --seg);
1578	} else {
1579		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1580		seg1->mr_base = seg1->mr_dma + pageoff;
1581		seg1->mr_nsegs = i;
1582		seg1->mr_len = len;
1583	}
1584	*nsegs = i;
1585	return rc;
1586}
1587
1588static int
1589rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1590			struct rpcrdma_ia *ia)
1591{
1592	struct rpcrdma_mr_seg *seg1 = seg;
1593	LIST_HEAD(l);
1594	int rc;
1595
1596	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1597	rc = ib_unmap_fmr(&l);
1598	while (seg1->mr_nsegs--)
1599		rpcrdma_unmap_one(ia, seg++);
1600	if (rc)
1601		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1602			" status %i\n", __func__, rc);
1603	return rc;
1604}
1605
1606static int
1607rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1608			int *nsegs, int writing, struct rpcrdma_ia *ia,
1609			struct rpcrdma_xprt *r_xprt)
1610{
1611	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1612				  IB_ACCESS_REMOTE_READ);
1613	struct ib_mw_bind param;
1614	int rc;
1615
1616	*nsegs = 1;
1617	rpcrdma_map_one(ia, seg, writing);
1618	param.mr = ia->ri_bind_mem;
1619	param.wr_id = 0ULL;	/* no send cookie */
1620	param.addr = seg->mr_dma;
1621	param.length = seg->mr_len;
1622	param.send_flags = 0;
1623	param.mw_access_flags = mem_priv;
1624
1625	DECR_CQCOUNT(&r_xprt->rx_ep);
1626	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1627	if (rc) {
1628		dprintk("RPC:       %s: failed ib_bind_mw "
1629			"%u@0x%llx status %i\n",
1630			__func__, seg->mr_len,
1631			(unsigned long long)seg->mr_dma, rc);
1632		rpcrdma_unmap_one(ia, seg);
1633	} else {
1634		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1635		seg->mr_base = param.addr;
1636		seg->mr_nsegs = 1;
1637	}
1638	return rc;
1639}
1640
1641static int
1642rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1643			struct rpcrdma_ia *ia,
1644			struct rpcrdma_xprt *r_xprt, void **r)
1645{
1646	struct ib_mw_bind param;
1647	LIST_HEAD(l);
1648	int rc;
1649
1650	BUG_ON(seg->mr_nsegs != 1);
1651	param.mr = ia->ri_bind_mem;
1652	param.addr = 0ULL;	/* unbind */
1653	param.length = 0;
1654	param.mw_access_flags = 0;
1655	if (*r) {
1656		param.wr_id = (u64) (unsigned long) *r;
1657		param.send_flags = IB_SEND_SIGNALED;
1658		INIT_CQCOUNT(&r_xprt->rx_ep);
1659	} else {
1660		param.wr_id = 0ULL;
1661		param.send_flags = 0;
1662		DECR_CQCOUNT(&r_xprt->rx_ep);
1663	}
1664	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1665	rpcrdma_unmap_one(ia, seg);
1666	if (rc)
1667		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1668			" status %i\n", __func__, rc);
1669	else
1670		*r = NULL;	/* will upcall on completion */
1671	return rc;
1672}
1673
1674static int
1675rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1676			int *nsegs, int writing, struct rpcrdma_ia *ia)
1677{
1678	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1679				  IB_ACCESS_REMOTE_READ);
1680	struct rpcrdma_mr_seg *seg1 = seg;
1681	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1682	int len, i, rc = 0;
1683
1684	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1685		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1686	for (len = 0, i = 0; i < *nsegs;) {
1687		rpcrdma_map_one(ia, seg, writing);
1688		ipb[i].addr = seg->mr_dma;
1689		ipb[i].size = seg->mr_len;
1690		len += seg->mr_len;
1691		++seg;
1692		++i;
1693		/* Check for holes */
1694		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1695		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1696			break;
1697	}
1698	seg1->mr_base = seg1->mr_dma;
1699	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1700				ipb, i, mem_priv, &seg1->mr_base);
1701	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1702		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1703		dprintk("RPC:       %s: failed ib_reg_phys_mr "
1704			"%u@0x%llx (%d)... status %i\n",
1705			__func__, len,
1706			(unsigned long long)seg1->mr_dma, i, rc);
1707		while (i--)
1708			rpcrdma_unmap_one(ia, --seg);
1709	} else {
1710		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1711		seg1->mr_nsegs = i;
1712		seg1->mr_len = len;
1713	}
1714	*nsegs = i;
1715	return rc;
1716}
1717
1718static int
1719rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1720			struct rpcrdma_ia *ia)
1721{
1722	struct rpcrdma_mr_seg *seg1 = seg;
1723	int rc;
1724
1725	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1726	seg1->mr_chunk.rl_mr = NULL;
1727	while (seg1->mr_nsegs--)
1728		rpcrdma_unmap_one(ia, seg++);
1729	if (rc)
1730		dprintk("RPC:       %s: failed ib_dereg_mr,"
1731			" status %i\n", __func__, rc);
1732	return rc;
1733}
1734
1735int
1736rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1737			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1738{
1739	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1740	int rc = 0;
1741
1742	switch (ia->ri_memreg_strategy) {
1743
1744#if RPCRDMA_PERSISTENT_REGISTRATION
1745	case RPCRDMA_ALLPHYSICAL:
1746		rpcrdma_map_one(ia, seg, writing);
1747		seg->mr_rkey = ia->ri_bind_mem->rkey;
1748		seg->mr_base = seg->mr_dma;
1749		seg->mr_nsegs = 1;
1750		nsegs = 1;
1751		break;
1752#endif
1753
1754	/* Registration using frmr registration */
1755	case RPCRDMA_FRMR:
1756		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1757		break;
1758
1759	/* Registration using fmr memory registration */
1760	case RPCRDMA_MTHCAFMR:
1761		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1762		break;
1763
1764	/* Registration using memory windows */
1765	case RPCRDMA_MEMWINDOWS_ASYNC:
1766	case RPCRDMA_MEMWINDOWS:
1767		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1768		break;
1769
1770	/* Default registration each time */
1771	default:
1772		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1773		break;
1774	}
1775	if (rc)
1776		return -1;
1777
1778	return nsegs;
1779}
1780
1781int
1782rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1783		struct rpcrdma_xprt *r_xprt, void *r)
1784{
1785	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1786	int nsegs = seg->mr_nsegs, rc;
1787
1788	switch (ia->ri_memreg_strategy) {
1789
1790#if RPCRDMA_PERSISTENT_REGISTRATION
1791	case RPCRDMA_ALLPHYSICAL:
1792		BUG_ON(nsegs != 1);
1793		rpcrdma_unmap_one(ia, seg);
1794		rc = 0;
1795		break;
1796#endif
1797
1798	case RPCRDMA_FRMR:
1799		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1800		break;
1801
1802	case RPCRDMA_MTHCAFMR:
1803		rc = rpcrdma_deregister_fmr_external(seg, ia);
1804		break;
1805
1806	case RPCRDMA_MEMWINDOWS_ASYNC:
1807	case RPCRDMA_MEMWINDOWS:
1808		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1809		break;
1810
1811	default:
1812		rc = rpcrdma_deregister_default_external(seg, ia);
1813		break;
1814	}
1815	if (r) {
1816		struct rpcrdma_rep *rep = r;
1817		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1818		rep->rr_func = NULL;
1819		func(rep);	/* dereg done, callback now */
1820	}
1821	return nsegs;
1822}
1823
1824/*
1825 * Prepost any receive buffer, then post send.
1826 *
1827 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1828 */
1829int
1830rpcrdma_ep_post(struct rpcrdma_ia *ia,
1831		struct rpcrdma_ep *ep,
1832		struct rpcrdma_req *req)
1833{
1834	struct ib_send_wr send_wr, *send_wr_fail;
1835	struct rpcrdma_rep *rep = req->rl_reply;
1836	int rc;
1837
1838	if (rep) {
1839		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1840		if (rc)
1841			goto out;
1842		req->rl_reply = NULL;
1843	}
1844
1845	send_wr.next = NULL;
1846	send_wr.wr_id = 0ULL;	/* no send cookie */
1847	send_wr.sg_list = req->rl_send_iov;
1848	send_wr.num_sge = req->rl_niovs;
1849	send_wr.opcode = IB_WR_SEND;
1850	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1851		ib_dma_sync_single_for_device(ia->ri_id->device,
1852			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1853			DMA_TO_DEVICE);
1854	ib_dma_sync_single_for_device(ia->ri_id->device,
1855		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1856		DMA_TO_DEVICE);
1857	ib_dma_sync_single_for_device(ia->ri_id->device,
1858		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1859		DMA_TO_DEVICE);
1860
1861	if (DECR_CQCOUNT(ep) > 0)
1862		send_wr.send_flags = 0;
1863	else { /* Provider must take a send completion every now and then */
1864		INIT_CQCOUNT(ep);
1865		send_wr.send_flags = IB_SEND_SIGNALED;
1866	}
1867
1868	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1869	if (rc)
1870		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1871			rc);
1872out:
1873	return rc;
1874}
1875
1876/*
1877 * (Re)post a receive buffer.
1878 */
1879int
1880rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1881		     struct rpcrdma_ep *ep,
1882		     struct rpcrdma_rep *rep)
1883{
1884	struct ib_recv_wr recv_wr, *recv_wr_fail;
1885	int rc;
1886
1887	recv_wr.next = NULL;
1888	recv_wr.wr_id = (u64) (unsigned long) rep;
1889	recv_wr.sg_list = &rep->rr_iov;
1890	recv_wr.num_sge = 1;
1891
1892	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1893		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1894
1895	DECR_CQCOUNT(ep);
1896	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1897
1898	if (rc)
1899		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1900			rc);
1901	return rc;
1902}
1903