verbs.c revision 1a954051b0cf79bd67e5f9db40333e3a9b1d05d2
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses.  You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 *      Redistributions of source code must retain the above copyright
15 *      notice, this list of conditions and the following disclaimer.
16 *
17 *      Redistributions in binary form must reproduce the above
18 *      copyright notice, this list of conditions and the following
19 *      disclaimer in the documentation and/or other materials provided
20 *      with the distribution.
21 *
22 *      Neither the name of the Network Appliance, Inc. nor the names of
23 *      its contributors may be used to endorse or promote products
24 *      derived from this software without specific prior written
25 *      permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * verbs.c
42 *
43 * Encapsulates the major functions managing:
44 *  o adapters
45 *  o endpoints
46 *  o connections
47 *  o buffer memory
48 */
49
50#include <linux/pci.h>	/* for Tavor hack below */
51
52#include "xprt_rdma.h"
53
54/*
55 * Globals/Macros
56 */
57
58#ifdef RPC_DEBUG
59# define RPCDBG_FACILITY	RPCDBG_TRANS
60#endif
61
62/*
63 * internal functions
64 */
65
66/*
67 * Handle replies in tasklet context, using a single, global list.
68 * The rdma tasklet function simply turns around and calls the reply
69 * handler for each reply on the list.
70 */
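/*
 * In outline: the CQ completion upcall calls rpcrdma_schedule_tasklet(),
 * which queues the rpcrdma_rep on rpcrdma_tasklets_g and schedules
 * rpcrdma_tasklet_g; rpcrdma_run_tasklet() then pops each rep and either
 * invokes its rr_func handler or returns the buffer via
 * rpcrdma_recv_buffer_put().
 */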
71
72static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73static LIST_HEAD(rpcrdma_tasklets_g);
74
75static void
76rpcrdma_run_tasklet(unsigned long data)
77{
78	struct rpcrdma_rep *rep;
79	void (*func)(struct rpcrdma_rep *);
80	unsigned long flags;
81
82	data = data;	/* tasklet argument is unused */
83	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84	while (!list_empty(&rpcrdma_tasklets_g)) {
85		rep = list_entry(rpcrdma_tasklets_g.next,
86				 struct rpcrdma_rep, rr_list);
87		list_del(&rep->rr_list);
88		func = rep->rr_func;
89		rep->rr_func = NULL;
90		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92		if (func)
93			func(rep);
94		else
95			rpcrdma_recv_buffer_put(rep);
96
97		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98	}
99	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100}
101
102static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104static inline void
105rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106{
107	unsigned long flags;
108
109	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112	tasklet_schedule(&rpcrdma_tasklet_g);
113}
114
115static void
116rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117{
118	struct rpcrdma_ep *ep = context;
119
120	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121		__func__, event->event, event->device->name, context);
122	if (ep->rep_connected == 1) {
123		ep->rep_connected = -EIO;
124		ep->rep_func(ep);
125		wake_up_all(&ep->rep_connect_wait);
126	}
127}
128
129static void
130rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131{
132	struct rpcrdma_ep *ep = context;
133
134	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135		__func__, event->event, event->device->name, context);
136	if (ep->rep_connected == 1) {
137		ep->rep_connected = -EIO;
138		ep->rep_func(ep);
139		wake_up_all(&ep->rep_connect_wait);
140	}
141}
142
143static inline
144void rpcrdma_event_process(struct ib_wc *wc)
145{
146	struct rpcrdma_rep *rep =
147			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150		__func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152	if (!rep) /* send or bind completion that we don't care about */
153		return;
154
155	if (IB_WC_SUCCESS != wc->status) {
156		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158			 wc->status);
159		rep->rr_len = ~0U;
160		rpcrdma_schedule_tasklet(rep);
161		return;
162	}
163
164	switch (wc->opcode) {
165	case IB_WC_RECV:
166		rep->rr_len = wc->byte_len;
167		ib_dma_sync_single_for_cpu(
168			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170		/* Keep (only) the most recent credits, after checking validity */
171		if (rep->rr_len >= 16) {
172			struct rpcrdma_msg *p =
173					(struct rpcrdma_msg *) rep->rr_base;
174			unsigned int credits = ntohl(p->rm_credit);
175			if (credits == 0) {
176				dprintk("RPC:       %s: server"
177					" dropped credits to 0!\n", __func__);
178				/* don't deadlock */
179				credits = 1;
180			} else if (credits > rep->rr_buffer->rb_max_requests) {
181				dprintk("RPC:       %s: server"
182					" over-crediting: %d (%d)\n",
183					__func__, credits,
184					rep->rr_buffer->rb_max_requests);
185				credits = rep->rr_buffer->rb_max_requests;
186			}
187			atomic_set(&rep->rr_buffer->rb_credits, credits);
188		}
189		/* fall through */
190	case IB_WC_BIND_MW:
191		rpcrdma_schedule_tasklet(rep);
192		break;
193	default:
194		dprintk("RPC:       %s: unexpected WC event %X\n",
195			__func__, wc->opcode);
196		break;
197	}
198}
199
200static inline int
201rpcrdma_cq_poll(struct ib_cq *cq)
202{
203	struct ib_wc wc;
204	int rc;
205
206	for (;;) {
207		rc = ib_poll_cq(cq, 1, &wc);
208		if (rc < 0) {
209			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210				__func__, rc);
211			return rc;
212		}
213		if (rc == 0)
214			break;
215
216		rpcrdma_event_process(&wc);
217	}
218
219	return 0;
220}
221
222/*
223 * rpcrdma_cq_event_upcall
224 *
225 * This upcall handles recv, send, bind and unbind events.
226 * It is reentrant but processes events one at a time, in order to
227 * maintain the ordering of receives and thus the server credit accounting.
228 *
229 * It is the responsibility of the scheduled tasklet to return
230 * recv buffers to the pool. NOTE: this affects synchronization of
231 * connection shutdown. That is, the structures required for
232 * the completion of the reply handler must remain intact until
233 * all memory has been reclaimed.
234 *
235 * Note that send events are suppressed and do not result in an upcall.
236 */
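/*
 * The poll/re-arm/poll sequence below is deliberate: completions that
 * arrive between draining the CQ and re-arming notification via
 * ib_req_notify_cq() would otherwise sit unprocessed until the next
 * event, so the CQ is polled a second time after re-arming.
 */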
237static void
238rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239{
240	int rc;
241
242	rc = rpcrdma_cq_poll(cq);
243	if (rc)
244		return;
245
246	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247	if (rc) {
248		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249			__func__, rc);
250		return;
251	}
252
253	rpcrdma_cq_poll(cq);
254}
255
256#ifdef RPC_DEBUG
257static const char * const conn[] = {
258	"address resolved",
259	"address error",
260	"route resolved",
261	"route error",
262	"connect request",
263	"connect response",
264	"connect error",
265	"unreachable",
266	"rejected",
267	"established",
268	"disconnected",
269	"device removal"
270};
271#endif
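/*
 * conn[] is indexed directly by the RDMA CM event code; rpcrdma_conn_upcall()
 * below range-checks the code against 11 (the last entry) before using it.
 */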
272
273static int
274rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275{
276	struct rpcrdma_xprt *xprt = id->context;
277	struct rpcrdma_ia *ia = &xprt->rx_ia;
278	struct rpcrdma_ep *ep = &xprt->rx_ep;
279	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280	struct ib_qp_attr attr;
281	struct ib_qp_init_attr iattr;
282	int connstate = 0;
283
284	switch (event->event) {
285	case RDMA_CM_EVENT_ADDR_RESOLVED:
286	case RDMA_CM_EVENT_ROUTE_RESOLVED:
287		complete(&ia->ri_done);
288		break;
289	case RDMA_CM_EVENT_ADDR_ERROR:
290		ia->ri_async_rc = -EHOSTUNREACH;
291		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
292			__func__, ep);
293		complete(&ia->ri_done);
294		break;
295	case RDMA_CM_EVENT_ROUTE_ERROR:
296		ia->ri_async_rc = -ENETUNREACH;
297		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
298			__func__, ep);
299		complete(&ia->ri_done);
300		break;
301	case RDMA_CM_EVENT_ESTABLISHED:
302		connstate = 1;
303		ib_query_qp(ia->ri_id->qp, &attr,
304			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305			&iattr);
306		dprintk("RPC:       %s: %d responder resources"
307			" (%d initiator)\n",
308			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309		goto connected;
310	case RDMA_CM_EVENT_CONNECT_ERROR:
311		connstate = -ENOTCONN;
312		goto connected;
313	case RDMA_CM_EVENT_UNREACHABLE:
314		connstate = -ENETDOWN;
315		goto connected;
316	case RDMA_CM_EVENT_REJECTED:
317		connstate = -ECONNREFUSED;
318		goto connected;
319	case RDMA_CM_EVENT_DISCONNECTED:
320		connstate = -ECONNABORTED;
321		goto connected;
322	case RDMA_CM_EVENT_DEVICE_REMOVAL:
323		connstate = -ENODEV;
324connected:
325		dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
326			" (ep 0x%p event 0x%x)\n",
327			__func__,
328			(event->event <= 11) ? conn[event->event] :
329						"unknown connection error",
330			NIPQUAD(addr->sin_addr.s_addr),
331			ntohs(addr->sin_port),
332			ep, event->event);
333		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334		dprintk("RPC:       %s: %sconnected\n",
335					__func__, connstate > 0 ? "" : "dis");
336		ep->rep_connected = connstate;
337		ep->rep_func(ep);
338		wake_up_all(&ep->rep_connect_wait);
339		break;
340	default:
341		dprintk("RPC:       %s: unexpected CM event %d\n",
342			__func__, event->event);
343		break;
344	}
345
346	return 0;
347}
348
349static struct rdma_cm_id *
350rpcrdma_create_id(struct rpcrdma_xprt *xprt,
351			struct rpcrdma_ia *ia, struct sockaddr *addr)
352{
353	struct rdma_cm_id *id;
354	int rc;
355
356	init_completion(&ia->ri_done);
357
358	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359	if (IS_ERR(id)) {
360		rc = PTR_ERR(id);
361		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
362			__func__, rc);
363		return id;
364	}
365
366	ia->ri_async_rc = 0;
367	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368	if (rc) {
369		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
370			__func__, rc);
371		goto out;
372	}
373	wait_for_completion(&ia->ri_done);
374	rc = ia->ri_async_rc;
375	if (rc)
376		goto out;
377
378	ia->ri_async_rc = 0;
379	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380	if (rc) {
381		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
382			__func__, rc);
383		goto out;
384	}
385	wait_for_completion(&ia->ri_done);
386	rc = ia->ri_async_rc;
387	if (rc)
388		goto out;
389
390	return id;
391
392out:
393	rdma_destroy_id(id);
394	return ERR_PTR(rc);
395}
396
397/*
398 * Drain any CQ prior to teardown.
399 */
400static void
401rpcrdma_clean_cq(struct ib_cq *cq)
402{
403	struct ib_wc wc;
404	int count = 0;
405
406	while (1 == ib_poll_cq(cq, 1, &wc))
407		++count;
408
409	if (count)
410		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
411			__func__, count, wc.opcode);
412}
413
414/*
415 * Exported functions.
416 */
417
418/*
419 * Open and initialize an Interface Adapter.
420 *  o initializes fields of struct rpcrdma_ia, including
421 *    interface and provider attributes and protection domain (PD).
422 */
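/*
 * Note on memory registration fallback: the strategy requested by the
 * caller is checked against the device capabilities below. The MEMWINDOWS
 * modes fall back to RPCRDMA_REGISTER; MTHCAFMR and FRMR fall back to
 * RPCRDMA_ALLPHYSICAL when RPCRDMA_PERSISTENT_REGISTRATION is built in,
 * otherwise to RPCRDMA_REGISTER.
 */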
423int
424rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425{
426	int rc, mem_priv;
427	struct ib_device_attr devattr;
428	struct rpcrdma_ia *ia = &xprt->rx_ia;
429
430	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
431	if (IS_ERR(ia->ri_id)) {
432		rc = PTR_ERR(ia->ri_id);
433		goto out1;
434	}
435
436	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
437	if (IS_ERR(ia->ri_pd)) {
438		rc = PTR_ERR(ia->ri_pd);
439		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
440			__func__, rc);
441		goto out2;
442	}
443
444	/*
445	 * Query the device to determine if the requested memory
446	 * registration strategy is supported. If it isn't, set the
447	 * strategy to a globally supported model.
448	 */
449	rc = ib_query_device(ia->ri_id->device, &devattr);
450	if (rc) {
451		dprintk("RPC:       %s: ib_query_device failed %d\n",
452			__func__, rc);
453		goto out2;
454	}
455
456	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
457		ia->ri_have_dma_lkey = 1;
458		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
459	}
460
461	switch (memreg) {
462	case RPCRDMA_MEMWINDOWS:
463	case RPCRDMA_MEMWINDOWS_ASYNC:
464		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
465			dprintk("RPC:       %s: MEMWINDOWS registration "
466				"specified but not supported by adapter, "
467				"using slower RPCRDMA_REGISTER\n",
468				__func__);
469			memreg = RPCRDMA_REGISTER;
470		}
471		break;
472	case RPCRDMA_MTHCAFMR:
473		if (!ia->ri_id->device->alloc_fmr) {
474#if RPCRDMA_PERSISTENT_REGISTRATION
475			dprintk("RPC:       %s: MTHCAFMR registration "
476				"specified but not supported by adapter, "
477				"using riskier RPCRDMA_ALLPHYSICAL\n",
478				__func__);
479			memreg = RPCRDMA_ALLPHYSICAL;
480#else
481			dprintk("RPC:       %s: MTHCAFMR registration "
482				"specified but not supported by adapter, "
483				"using slower RPCRDMA_REGISTER\n",
484				__func__);
485			memreg = RPCRDMA_REGISTER;
486#endif
487		}
488		break;
489	case RPCRDMA_FRMR:
490		/* Requires both frmr reg and local dma lkey */
491		if ((devattr.device_cap_flags &
492		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
493		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
494#if RPCRDMA_PERSISTENT_REGISTRATION
495			dprintk("RPC:       %s: FRMR registration "
496				"specified but not supported by adapter, "
497				"using riskier RPCRDMA_ALLPHYSICAL\n",
498				__func__);
499			memreg = RPCRDMA_ALLPHYSICAL;
500#else
501			dprintk("RPC:       %s: FRMR registration "
502				"specified but not supported by adapter, "
503				"using slower RPCRDMA_REGISTER\n",
504				__func__);
505			memreg = RPCRDMA_REGISTER;
506#endif
507		}
508		break;
509	}
510
511	/*
512	 * Optionally obtain an underlying physical identity mapping in
513	 * order to do a memory window-based bind. This base registration
514	 * is protected from remote access - that is enabled only by binding
515	 * for the specific bytes targeted during each RPC operation, and
516	 * revoked after the corresponding completion, much as a storage
517	 * adapter would do.
518	 */
519	switch (memreg) {
520	case RPCRDMA_BOUNCEBUFFERS:
521	case RPCRDMA_REGISTER:
522	case RPCRDMA_FRMR:
523		break;
524#if RPCRDMA_PERSISTENT_REGISTRATION
525	case RPCRDMA_ALLPHYSICAL:
526		mem_priv = IB_ACCESS_LOCAL_WRITE |
527				IB_ACCESS_REMOTE_WRITE |
528				IB_ACCESS_REMOTE_READ;
529		goto register_setup;
530#endif
531	case RPCRDMA_MEMWINDOWS_ASYNC:
532	case RPCRDMA_MEMWINDOWS:
533		mem_priv = IB_ACCESS_LOCAL_WRITE |
534				IB_ACCESS_MW_BIND;
535		goto register_setup;
536	case RPCRDMA_MTHCAFMR:
537		if (ia->ri_have_dma_lkey)
538			break;
539		mem_priv = IB_ACCESS_LOCAL_WRITE;
540	register_setup:
541		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
542		if (IS_ERR(ia->ri_bind_mem)) {
543			printk(KERN_ALERT "%s: ib_get_dma_mr for "
544				"phys register failed with %lX\n\t"
545				"Will continue with degraded performance\n",
546				__func__, PTR_ERR(ia->ri_bind_mem));
547			memreg = RPCRDMA_REGISTER;
548			ia->ri_bind_mem = NULL;
549		}
550		break;
551	default:
552		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
553				__func__, memreg);
554		rc = -EINVAL;
555		goto out2;
556	}
557	dprintk("RPC:       %s: memory registration strategy is %d\n",
558		__func__, memreg);
559
560	/* Else will do memory reg/dereg for each chunk */
561	ia->ri_memreg_strategy = memreg;
562
563	return 0;
564out2:
565	rdma_destroy_id(ia->ri_id);
566	ia->ri_id = NULL;
567out1:
568	return rc;
569}
570
571/*
572 * Clean up/close an IA.
573 *   o release the bind memory region, QP/CM id and PD, if initialized.
574 *   o close the IA
575 */
576void
577rpcrdma_ia_close(struct rpcrdma_ia *ia)
578{
579	int rc;
580
581	dprintk("RPC:       %s: entering\n", __func__);
582	if (ia->ri_bind_mem != NULL) {
583		rc = ib_dereg_mr(ia->ri_bind_mem);
584		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
585			__func__, rc);
586	}
587	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
588		if (ia->ri_id->qp)
589			rdma_destroy_qp(ia->ri_id);
590		rdma_destroy_id(ia->ri_id);
591		ia->ri_id = NULL;
592	}
593	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
594		rc = ib_dealloc_pd(ia->ri_pd);
595		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
596			__func__, rc);
597	}
598}
599
600/*
601 * Create unconnected endpoint.
602 */
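/*
 * Note on send queue sizing: with FRMR each RPC may consume up to three
 * send WRs (fast-register, send, local-invalidate), hence the *= 3 below;
 * the memory-window strategies budget even more generously for bind and
 * unbind WRs, as the inline comments note.
 */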
603int
604rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
605				struct rpcrdma_create_data_internal *cdata)
606{
607	struct ib_device_attr devattr;
608	int rc, err;
609
610	rc = ib_query_device(ia->ri_id->device, &devattr);
611	if (rc) {
612		dprintk("RPC:       %s: ib_query_device failed %d\n",
613			__func__, rc);
614		return rc;
615	}
616
617	/* check provider's send/recv wr limits */
618	if (cdata->max_requests > devattr.max_qp_wr)
619		cdata->max_requests = devattr.max_qp_wr;
620
621	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
622	ep->rep_attr.qp_context = ep;
623	/* send_cq and recv_cq initialized below */
624	ep->rep_attr.srq = NULL;
625	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
626	switch (ia->ri_memreg_strategy) {
627	case RPCRDMA_FRMR:
628		/* Add room for frmr register and invalidate WRs */
629		ep->rep_attr.cap.max_send_wr *= 3;
630		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
631			return -EINVAL;
632		break;
633	case RPCRDMA_MEMWINDOWS_ASYNC:
634	case RPCRDMA_MEMWINDOWS:
635		/* Add room for mw_binds+unbinds - overkill! */
636		ep->rep_attr.cap.max_send_wr++;
637		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
638		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
639			return -EINVAL;
640		break;
641	default:
642		break;
643	}
644	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
645	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
646	ep->rep_attr.cap.max_recv_sge = 1;
647	ep->rep_attr.cap.max_inline_data = 0;
648	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
649	ep->rep_attr.qp_type = IB_QPT_RC;
650	ep->rep_attr.port_num = ~0;
651
652	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
653		"iovs: send %d recv %d\n",
654		__func__,
655		ep->rep_attr.cap.max_send_wr,
656		ep->rep_attr.cap.max_recv_wr,
657		ep->rep_attr.cap.max_send_sge,
658		ep->rep_attr.cap.max_recv_sge);
659
660	/* set trigger for requesting send completion */
661	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
662	switch (ia->ri_memreg_strategy) {
663	case RPCRDMA_MEMWINDOWS_ASYNC:
664	case RPCRDMA_MEMWINDOWS:
665		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
666		break;
667	default:
668		break;
669	}
670	if (ep->rep_cqinit <= 2)
671		ep->rep_cqinit = 0;
672	INIT_CQCOUNT(ep);
673	ep->rep_ia = ia;
674	init_waitqueue_head(&ep->rep_connect_wait);
675
676	/*
677	 * Create a single cq for receive dto and mw_bind (only ever
678	 * care about unbind, really). Send completions are suppressed.
679	 * Use single-threaded tasklet upcalls to maintain ordering.
680	 */
681	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
682				  rpcrdma_cq_async_error_upcall, NULL,
683				  ep->rep_attr.cap.max_recv_wr +
684				  ep->rep_attr.cap.max_send_wr + 1, 0);
685	if (IS_ERR(ep->rep_cq)) {
686		rc = PTR_ERR(ep->rep_cq);
687		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
688			__func__, rc);
689		goto out1;
690	}
691
692	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
693	if (rc) {
694		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
695			__func__, rc);
696		goto out2;
697	}
698
699	ep->rep_attr.send_cq = ep->rep_cq;
700	ep->rep_attr.recv_cq = ep->rep_cq;
701
702	/* Initialize cma parameters */
703
704	/* RPC/RDMA does not use private data */
705	ep->rep_remote_cma.private_data = NULL;
706	ep->rep_remote_cma.private_data_len = 0;
707
708	/* Client offers RDMA Read but does not initiate */
709	ep->rep_remote_cma.initiator_depth = 0;
710	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
711		ep->rep_remote_cma.responder_resources = 0;
712	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
713		ep->rep_remote_cma.responder_resources = 32;
714	else
715		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
716
717	ep->rep_remote_cma.retry_count = 7;
718	ep->rep_remote_cma.flow_control = 0;
719	ep->rep_remote_cma.rnr_retry_count = 0;
720
721	return 0;
722
723out2:
724	err = ib_destroy_cq(ep->rep_cq);
725	if (err)
726		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
727			__func__, err);
728out1:
729	return rc;
730}
731
732/*
733 * rpcrdma_ep_destroy
734 *
735 * Disconnect and destroy endpoint. After this, the only
736 * valid operations on the ep are to free it (if dynamically
737 * allocated) or re-create it.
738 *
739 * The caller's error handling must be sure to not leak the endpoint
740 * if this function fails.
741 */
742int
743rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
744{
745	int rc;
746
747	dprintk("RPC:       %s: entering, connected is %d\n",
748		__func__, ep->rep_connected);
749
750	if (ia->ri_id->qp) {
751		rc = rpcrdma_ep_disconnect(ep, ia);
752		if (rc)
753			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
754				" returned %i\n", __func__, rc);
755		rdma_destroy_qp(ia->ri_id);
756		ia->ri_id->qp = NULL;
757	}
758
759	/* padding - could be done in rpcrdma_buffer_destroy... */
760	if (ep->rep_pad_mr) {
761		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
762		ep->rep_pad_mr = NULL;
763	}
764
765	rpcrdma_clean_cq(ep->rep_cq);
766	rc = ib_destroy_cq(ep->rep_cq);
767	if (rc)
768		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
769			__func__, rc);
770
771	return rc;
772}
773
774/*
775 * Connect unconnected endpoint.
776 */
777int
778rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
779{
780	struct rdma_cm_id *id;
781	int rc = 0;
782	int retry_count = 0;
783	int reconnect = (ep->rep_connected != 0);
784
785	if (reconnect) {
786		struct rpcrdma_xprt *xprt;
787retry:
788		rc = rpcrdma_ep_disconnect(ep, ia);
789		if (rc && rc != -ENOTCONN)
790			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
791				" status %i\n", __func__, rc);
792		rpcrdma_clean_cq(ep->rep_cq);
793
794		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
795		id = rpcrdma_create_id(xprt, ia,
796				(struct sockaddr *)&xprt->rx_data.addr);
797		if (IS_ERR(id)) {
798			rc = PTR_ERR(id);
799			goto out;
800		}
801		/* TEMP TEMP TEMP - fail if new device:
802		 * Deregister/remarshal *all* requests!
803		 * Close and recreate adapter, pd, etc!
804		 * Re-determine all attributes still sane!
805		 * More stuff I haven't thought of!
806		 * Rrrgh!
807		 */
808		if (ia->ri_id->device != id->device) {
809			printk("RPC:       %s: can't reconnect on "
810				"different device!\n", __func__);
811			rdma_destroy_id(id);
812			rc = -ENETDOWN;
813			goto out;
814		}
815		/* END TEMP */
816		rdma_destroy_qp(ia->ri_id);
817		rdma_destroy_id(ia->ri_id);
818		ia->ri_id = id;
819	}
820
821	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
822	if (rc) {
823		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
824			__func__, rc);
825		goto out;
826	}
827
828/* XXX Tavor device performs badly with 2K MTU! */
829if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
830	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
831	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
832	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
833	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
834		struct ib_qp_attr attr = {
835			.path_mtu = IB_MTU_1024
836		};
837		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
838	}
839}
840
841	ep->rep_connected = 0;
842
843	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
844	if (rc) {
845		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
846				__func__, rc);
847		goto out;
848	}
849
850	if (reconnect)
851		return 0;
852
853	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
854
855	/*
856	 * Check state. A non-peer reject indicates no listener
857	 * (ECONNREFUSED), which may be a transient state. All
858	 * others indicate a transport condition for which a best-effort
859	 * recovery attempt has already been made.
860	 */
861	if (ep->rep_connected == -ECONNREFUSED
862	    && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
863		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
864		goto retry;
865	}
866	if (ep->rep_connected <= 0) {
867		/* Sometimes, the only way to reliably connect to remote
868		 * CMs is to use the same nonzero values for ORD and IRD. */
869		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
870		    (ep->rep_remote_cma.responder_resources == 0 ||
871		     ep->rep_remote_cma.initiator_depth !=
872				ep->rep_remote_cma.responder_resources)) {
873			if (ep->rep_remote_cma.responder_resources == 0)
874				ep->rep_remote_cma.responder_resources = 1;
875			ep->rep_remote_cma.initiator_depth =
876				ep->rep_remote_cma.responder_resources;
877			goto retry;
878		}
879		rc = ep->rep_connected;
880	} else {
881		dprintk("RPC:       %s: connected\n", __func__);
882	}
883
884out:
885	if (rc)
886		ep->rep_connected = rc;
887	return rc;
888}
889
890/*
891 * rpcrdma_ep_disconnect
892 *
893 * This is separate from destroy to facilitate the ability
894 * to reconnect without recreating the endpoint.
895 *
896 * This call is not reentrant, and must not be made in parallel
897 * on the same endpoint.
898 */
899int
900rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
901{
902	int rc;
903
904	rpcrdma_clean_cq(ep->rep_cq);
905	rc = rdma_disconnect(ia->ri_id);
906	if (!rc) {
907		/* returns without wait if not connected */
908		wait_event_interruptible(ep->rep_connect_wait,
909							ep->rep_connected != 1);
910		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
911			(ep->rep_connected == 1) ? "still " : "dis");
912	} else {
913		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
914		ep->rep_connected = rc;
915	}
916	return rc;
917}
918
919/*
920 * Initialize buffer memory
921 */
922int
923rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
924	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
925{
926	char *p;
927	size_t len;
928	int i, rc;
929	struct rpcrdma_mw *r;
930
931	buf->rb_max_requests = cdata->max_requests;
932	spin_lock_init(&buf->rb_lock);
933	atomic_set(&buf->rb_credits, 1);
934
935	/* Need to allocate:
936	 *   1.  arrays for send and recv pointers
937	 *   2.  arrays of struct rpcrdma_req to fill in pointers
938	 *   3.  array of struct rpcrdma_rep for replies
939	 *   4.  padding, if any
940	 *   5.  mw's, fmr's or frmr's, if any
941	 * Send/recv buffers in req/rep need to be registered
942	 */
943
944	len = buf->rb_max_requests *
945		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
946	len += cdata->padding;
947	switch (ia->ri_memreg_strategy) {
948	case RPCRDMA_FRMR:
949		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
950				sizeof(struct rpcrdma_mw);
951		break;
952	case RPCRDMA_MTHCAFMR:
953		/* TBD we are perhaps overallocating here */
954		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
955				sizeof(struct rpcrdma_mw);
956		break;
957	case RPCRDMA_MEMWINDOWS_ASYNC:
958	case RPCRDMA_MEMWINDOWS:
959		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
960				sizeof(struct rpcrdma_mw);
961		break;
962	default:
963		break;
964	}
965
966	/* allocate 1, 4 and 5 in one shot */
967	p = kzalloc(len, GFP_KERNEL);
968	if (p == NULL) {
969		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
970			__func__, len);
971		rc = -ENOMEM;
972		goto out;
973	}
974	buf->rb_pool = p;	/* for freeing it later */
975
976	buf->rb_send_bufs = (struct rpcrdma_req **) p;
977	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
978	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
979	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
980
981	/*
982	 * Register the zeroed pad buffer, if any.
983	 */
984	if (cdata->padding) {
985		rc = rpcrdma_register_internal(ia, p, cdata->padding,
986					    &ep->rep_pad_mr, &ep->rep_pad);
987		if (rc)
988			goto out;
989	}
990	p += cdata->padding;
991
992	/*
993	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
994	 * We "cycle" the mw's in order to minimize rkey reuse,
995	 * and also reduce unbind-to-bind collision.
996	 */
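	/*
	 * The rb_mws list is consumed by rpcrdma_buffer_get(), which pulls
	 * RPCRDMA_MAX_SEGS entries per request (filling rl_segments from the
	 * highest index down), and replenished by rpcrdma_buffer_put(), which
	 * returns them in reverse order to delay rkey reuse.
	 */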
997	INIT_LIST_HEAD(&buf->rb_mws);
998	r = (struct rpcrdma_mw *)p;
999	switch (ia->ri_memreg_strategy) {
1000	case RPCRDMA_FRMR:
1001		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1002			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1003							 RPCRDMA_MAX_SEGS);
1004			if (IS_ERR(r->r.frmr.fr_mr)) {
1005				rc = PTR_ERR(r->r.frmr.fr_mr);
1006				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1007					" failed %i\n", __func__, rc);
1008				goto out;
1009			}
1010			r->r.frmr.fr_pgl =
1011				ib_alloc_fast_reg_page_list(ia->ri_id->device,
1012							    RPCRDMA_MAX_SEGS);
1013			if (IS_ERR(r->r.frmr.fr_pgl)) {
1014				rc = PTR_ERR(r->r.frmr.fr_pgl);
1015				dprintk("RPC:       %s: "
1016					"ib_alloc_fast_reg_page_list "
1017					"failed %i\n", __func__, rc);
1018				goto out;
1019			}
1020			list_add(&r->mw_list, &buf->rb_mws);
1021			++r;
1022		}
1023		break;
1024	case RPCRDMA_MTHCAFMR:
1025		/* TBD we are perhaps overallocating here */
1026		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1027			static struct ib_fmr_attr fa =
1028				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1029			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1030				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1031				&fa);
1032			if (IS_ERR(r->r.fmr)) {
1033				rc = PTR_ERR(r->r.fmr);
1034				dprintk("RPC:       %s: ib_alloc_fmr"
1035					" failed %i\n", __func__, rc);
1036				goto out;
1037			}
1038			list_add(&r->mw_list, &buf->rb_mws);
1039			++r;
1040		}
1041		break;
1042	case RPCRDMA_MEMWINDOWS_ASYNC:
1043	case RPCRDMA_MEMWINDOWS:
1044		/* Allocate one extra request's worth, for full cycling */
1045		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1046			r->r.mw = ib_alloc_mw(ia->ri_pd);
1047			if (IS_ERR(r->r.mw)) {
1048				rc = PTR_ERR(r->r.mw);
1049				dprintk("RPC:       %s: ib_alloc_mw"
1050					" failed %i\n", __func__, rc);
1051				goto out;
1052			}
1053			list_add(&r->mw_list, &buf->rb_mws);
1054			++r;
1055		}
1056		break;
1057	default:
1058		break;
1059	}
1060
1061	/*
1062	 * Allocate/init the request/reply buffers. Doing this
1063	 * using kmalloc for now -- one for each buf.
1064	 */
1065	for (i = 0; i < buf->rb_max_requests; i++) {
1066		struct rpcrdma_req *req;
1067		struct rpcrdma_rep *rep;
1068
1069		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1070		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1071		/* Typical ~2400b, so rounding up saves work later */
1072		if (len < 4096)
1073			len = 4096;
1074		req = kmalloc(len, GFP_KERNEL);
1075		if (req == NULL) {
1076			dprintk("RPC:       %s: request buffer %d alloc"
1077				" failed\n", __func__, i);
1078			rc = -ENOMEM;
1079			goto out;
1080		}
1081		memset(req, 0, sizeof(struct rpcrdma_req));
1082		buf->rb_send_bufs[i] = req;
1083		buf->rb_send_bufs[i]->rl_buffer = buf;
1084
1085		rc = rpcrdma_register_internal(ia, req->rl_base,
1086				len - offsetof(struct rpcrdma_req, rl_base),
1087				&buf->rb_send_bufs[i]->rl_handle,
1088				&buf->rb_send_bufs[i]->rl_iov);
1089		if (rc)
1090			goto out;
1091
1092		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1093
1094		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1095		rep = kmalloc(len, GFP_KERNEL);
1096		if (rep == NULL) {
1097			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1098				__func__, i);
1099			rc = -ENOMEM;
1100			goto out;
1101		}
1102		memset(rep, 0, sizeof(struct rpcrdma_rep));
1103		buf->rb_recv_bufs[i] = rep;
1104		buf->rb_recv_bufs[i]->rr_buffer = buf;
1105		init_waitqueue_head(&rep->rr_unbind);
1106
1107		rc = rpcrdma_register_internal(ia, rep->rr_base,
1108				len - offsetof(struct rpcrdma_rep, rr_base),
1109				&buf->rb_recv_bufs[i]->rr_handle,
1110				&buf->rb_recv_bufs[i]->rr_iov);
1111		if (rc)
1112			goto out;
1113
1114	}
1115	dprintk("RPC:       %s: max_requests %d\n",
1116		__func__, buf->rb_max_requests);
1117	/* done */
1118	return 0;
1119out:
1120	rpcrdma_buffer_destroy(buf);
1121	return rc;
1122}
1123
1124/*
1125 * Unregister and destroy buffer memory. Need to deal with
1126 * partial initialization, so it's callable from failed create.
1127 * Must be called before destroying endpoint, as registrations
1128 * reference it.
1129 */
1130void
1131rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1132{
1133	int rc, i;
1134	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1135	struct rpcrdma_mw *r;
1136
1137	/* clean up in reverse order from create
1138	 *   1.  recv mr memory (mr free, then kfree)
1139	 *   1a. bind mw memory
1140	 *   2.  send mr memory (mr free, then kfree)
1141	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1142	 *   4.  arrays
1143	 */
1144	dprintk("RPC:       %s: entering\n", __func__);
1145
1146	for (i = 0; i < buf->rb_max_requests; i++) {
1147		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1148			rpcrdma_deregister_internal(ia,
1149					buf->rb_recv_bufs[i]->rr_handle,
1150					&buf->rb_recv_bufs[i]->rr_iov);
1151			kfree(buf->rb_recv_bufs[i]);
1152		}
1153		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1154			while (!list_empty(&buf->rb_mws)) {
1155				r = list_entry(buf->rb_mws.next,
1156					struct rpcrdma_mw, mw_list);
1157				list_del(&r->mw_list);
1158				switch (ia->ri_memreg_strategy) {
1159				case RPCRDMA_FRMR:
1160					rc = ib_dereg_mr(r->r.frmr.fr_mr);
1161					if (rc)
1162						dprintk("RPC:       %s:"
1163							" ib_dereg_mr"
1164							" failed %i\n",
1165							__func__, rc);
1166					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1167					break;
1168				case RPCRDMA_MTHCAFMR:
1169					rc = ib_dealloc_fmr(r->r.fmr);
1170					if (rc)
1171						dprintk("RPC:       %s:"
1172							" ib_dealloc_fmr"
1173							" failed %i\n",
1174							__func__, rc);
1175					break;
1176				case RPCRDMA_MEMWINDOWS_ASYNC:
1177				case RPCRDMA_MEMWINDOWS:
1178					rc = ib_dealloc_mw(r->r.mw);
1179					if (rc)
1180						dprintk("RPC:       %s:"
1181							" ib_dealloc_mw"
1182							" failed %i\n",
1183							__func__, rc);
1184					break;
1185				default:
1186					break;
1187				}
1188			}
1189			rpcrdma_deregister_internal(ia,
1190					buf->rb_send_bufs[i]->rl_handle,
1191					&buf->rb_send_bufs[i]->rl_iov);
1192			kfree(buf->rb_send_bufs[i]);
1193		}
1194	}
1195
1196	kfree(buf->rb_pool);
1197}
1198
1199/*
1200 * Get a set of request/reply buffers.
1201 *
1202 * Reply buffer (if needed) is attached to send buffer upon return.
1203 * Rule:
1204 *    rb_send_index and rb_recv_index MUST always be pointing to the
1205 *    *next* available buffer (non-NULL). They are incremented after
1206 *    removing buffers, and decremented *before* returning them.
1207 */
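/*
 * For example, with rb_max_requests == 4: three successive gets leave
 * rb_send_index == 3 with rb_send_bufs[0..2] NULL'd out; a put first
 * decrements the index and then stores the request back into the slot it
 * just freed, restoring the invariant above.
 */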
1208struct rpcrdma_req *
1209rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1210{
1211	struct rpcrdma_req *req;
1212	unsigned long flags;
1213	int i;
1214	struct rpcrdma_mw *r;
1215
1216	spin_lock_irqsave(&buffers->rb_lock, flags);
1217	if (buffers->rb_send_index == buffers->rb_max_requests) {
1218		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1219		dprintk("RPC:       %s: out of request buffers\n", __func__);
1220		return ((struct rpcrdma_req *)NULL);
1221	}
1222
1223	req = buffers->rb_send_bufs[buffers->rb_send_index];
1224	if (buffers->rb_send_index < buffers->rb_recv_index) {
1225		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1226			__func__,
1227			buffers->rb_recv_index - buffers->rb_send_index);
1228		req->rl_reply = NULL;
1229	} else {
1230		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1231		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1232	}
1233	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1234	if (!list_empty(&buffers->rb_mws)) {
1235		i = RPCRDMA_MAX_SEGS - 1;
1236		do {
1237			r = list_entry(buffers->rb_mws.next,
1238					struct rpcrdma_mw, mw_list);
1239			list_del(&r->mw_list);
1240			req->rl_segments[i].mr_chunk.rl_mw = r;
1241		} while (--i >= 0);
1242	}
1243	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1244	return req;
1245}
1246
1247/*
1248 * Put request/reply buffers back into pool.
1249 * Pre-decrement counter/array index.
1250 */
1251void
1252rpcrdma_buffer_put(struct rpcrdma_req *req)
1253{
1254	struct rpcrdma_buffer *buffers = req->rl_buffer;
1255	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1256	int i;
1257	unsigned long flags;
1258
1259	BUG_ON(req->rl_nchunks != 0);
1260	spin_lock_irqsave(&buffers->rb_lock, flags);
1261	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1262	req->rl_niovs = 0;
1263	if (req->rl_reply) {
1264		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1265		init_waitqueue_head(&req->rl_reply->rr_unbind);
1266		req->rl_reply->rr_func = NULL;
1267		req->rl_reply = NULL;
1268	}
1269	switch (ia->ri_memreg_strategy) {
1270	case RPCRDMA_FRMR:
1271	case RPCRDMA_MTHCAFMR:
1272	case RPCRDMA_MEMWINDOWS_ASYNC:
1273	case RPCRDMA_MEMWINDOWS:
1274		/*
1275		 * Cycle mw's back in reverse order, and "spin" them.
1276		 * This delays and scrambles reuse as much as possible.
1277		 */
1278		i = 1;
1279		do {
1280			struct rpcrdma_mw **mw;
1281			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1282			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1283			*mw = NULL;
1284		} while (++i < RPCRDMA_MAX_SEGS);
1285		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1286					&buffers->rb_mws);
1287		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1288		break;
1289	default:
1290		break;
1291	}
1292	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1293}
1294
1295/*
1296 * Recover reply buffers from pool.
1297 * This happens when recovering from error conditions.
1298 * Post-increment counter/array index.
1299 */
1300void
1301rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1302{
1303	struct rpcrdma_buffer *buffers = req->rl_buffer;
1304	unsigned long flags;
1305
1306	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1307		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1308	spin_lock_irqsave(&buffers->rb_lock, flags);
1309	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1310		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1311		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1312	}
1313	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1314}
1315
1316/*
1317 * Put reply buffers back into pool when not attached to
1318 * request. This happens in error conditions, and when
1319 * aborting unbinds. Pre-decrement counter/array index.
1320 */
1321void
1322rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1323{
1324	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1325	unsigned long flags;
1326
1327	rep->rr_func = NULL;
1328	spin_lock_irqsave(&buffers->rb_lock, flags);
1329	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1330	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1331}
1332
1333/*
1334 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1335 */
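/*
 * Lkey selection in rpcrdma_register_internal(): the device's local DMA
 * lkey is used when available, otherwise the persistent "bind" MR's lkey;
 * only if neither exists is a one-off ib_reg_phys_mr() performed (and
 * later released by rpcrdma_deregister_internal()).
 */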
1336
1337int
1338rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1339				struct ib_mr **mrp, struct ib_sge *iov)
1340{
1341	struct ib_phys_buf ipb;
1342	struct ib_mr *mr;
1343	int rc;
1344
1345	/*
1346	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1347	 */
1348	iov->addr = ib_dma_map_single(ia->ri_id->device,
1349			va, len, DMA_BIDIRECTIONAL);
1350	iov->length = len;
1351
1352	if (ia->ri_have_dma_lkey) {
1353		*mrp = NULL;
1354		iov->lkey = ia->ri_dma_lkey;
1355		return 0;
1356	} else if (ia->ri_bind_mem != NULL) {
1357		*mrp = NULL;
1358		iov->lkey = ia->ri_bind_mem->lkey;
1359		return 0;
1360	}
1361
1362	ipb.addr = iov->addr;
1363	ipb.size = iov->length;
1364	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1365			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1366
1367	dprintk("RPC:       %s: phys convert: 0x%llx "
1368			"registered 0x%llx length %d\n",
1369			__func__, (unsigned long long)ipb.addr,
1370			(unsigned long long)iov->addr, len);
1371
1372	if (IS_ERR(mr)) {
1373		*mrp = NULL;
1374		rc = PTR_ERR(mr);
1375		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1376	} else {
1377		*mrp = mr;
1378		iov->lkey = mr->lkey;
1379		rc = 0;
1380	}
1381
1382	return rc;
1383}
1384
1385int
1386rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1387				struct ib_mr *mr, struct ib_sge *iov)
1388{
1389	int rc;
1390
1391	ib_dma_unmap_single(ia->ri_id->device,
1392			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1393
1394	if (NULL == mr)
1395		return 0;
1396
1397	rc = ib_dereg_mr(mr);
1398	if (rc)
1399		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1400	return rc;
1401}
1402
1403/*
1404 * Wrappers for chunk registration, shared by read/write chunk code.
1405 */
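/*
 * Each memory registration strategy pairs a register/deregister helper:
 * FRMR uses rpcrdma_{register,deregister}_frmr_external(), MTHCAFMR uses
 * the *_fmr_external() pair, the memory-window modes use the
 * *_memwin_external() pair, and everything else falls back to the
 * *_default_external() pair built on ib_reg_phys_mr().
 */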
1406
1407static void
1408rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1409{
1410	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1411	seg->mr_dmalen = seg->mr_len;
1412	if (seg->mr_page)
1413		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1414				seg->mr_page, offset_in_page(seg->mr_offset),
1415				seg->mr_dmalen, seg->mr_dir);
1416	else
1417		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1418				seg->mr_offset,
1419				seg->mr_dmalen, seg->mr_dir);
1420}
1421
1422static void
1423rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1424{
1425	if (seg->mr_page)
1426		ib_dma_unmap_page(ia->ri_id->device,
1427				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1428	else
1429		ib_dma_unmap_single(ia->ri_id->device,
1430				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1431}
1432
1433static int
1434rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1435			int *nsegs, int writing, struct rpcrdma_ia *ia,
1436			struct rpcrdma_xprt *r_xprt)
1437{
1438	struct rpcrdma_mr_seg *seg1 = seg;
1439	struct ib_send_wr frmr_wr, *bad_wr;
1440	u8 key;
1441	int len, pageoff;
1442	int i, rc;
1443
1444	pageoff = offset_in_page(seg1->mr_offset);
1445	seg1->mr_offset -= pageoff;	/* start of page */
1446	seg1->mr_len += pageoff;
1447	len = -pageoff;
1448	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1449		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1450	for (i = 0; i < *nsegs;) {
1451		rpcrdma_map_one(ia, seg, writing);
1452		seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1453		len += seg->mr_len;
1454		++seg;
1455		++i;
1456		/* Check for holes */
1457		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1458		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1459			break;
1460	}
1461	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1462		__func__, seg1->mr_chunk.rl_mw, i);
1463
1464	/* Bump the key */
1465	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1466	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1467
1468	/* Prepare FRMR WR */
1469	memset(&frmr_wr, 0, sizeof frmr_wr);
1470	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1471	frmr_wr.send_flags = 0;			/* unsignaled */
1472	frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1473	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1474	frmr_wr.wr.fast_reg.page_list_len = i;
1475	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1476	frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1477	frmr_wr.wr.fast_reg.access_flags = (writing ?
1478				IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
1479	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1480	DECR_CQCOUNT(&r_xprt->rx_ep);
1481
1482	rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1483
1484	if (rc) {
1485		dprintk("RPC:       %s: failed ib_post_send for register,"
1486			" status %i\n", __func__, rc);
1487		while (i--)
1488			rpcrdma_unmap_one(ia, --seg);
1489	} else {
1490		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1491		seg1->mr_base = seg1->mr_dma + pageoff;
1492		seg1->mr_nsegs = i;
1493		seg1->mr_len = len;
1494	}
1495	*nsegs = i;
1496	return rc;
1497}
1498
1499static int
1500rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1501			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1502{
1503	struct rpcrdma_mr_seg *seg1 = seg;
1504	struct ib_send_wr invalidate_wr, *bad_wr;
1505	int rc;
1506
1507	while (seg1->mr_nsegs--)
1508		rpcrdma_unmap_one(ia, seg++);
1509
1510	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1511	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1512	invalidate_wr.send_flags = 0;			/* unsignaled */
1513	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1514	DECR_CQCOUNT(&r_xprt->rx_ep);
1515
1516	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1517	if (rc)
1518		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1519			" status %i\n", __func__, rc);
1520	return rc;
1521}
1522
1523static int
1524rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1525			int *nsegs, int writing, struct rpcrdma_ia *ia)
1526{
1527	struct rpcrdma_mr_seg *seg1 = seg;
1528	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1529	int len, pageoff, i, rc;
1530
1531	pageoff = offset_in_page(seg1->mr_offset);
1532	seg1->mr_offset -= pageoff;	/* start of page */
1533	seg1->mr_len += pageoff;
1534	len = -pageoff;
1535	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1536		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1537	for (i = 0; i < *nsegs;) {
1538		rpcrdma_map_one(ia, seg, writing);
1539		physaddrs[i] = seg->mr_dma;
1540		len += seg->mr_len;
1541		++seg;
1542		++i;
1543		/* Check for holes */
1544		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1545		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1546			break;
1547	}
1548	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1549				physaddrs, i, seg1->mr_dma);
1550	if (rc) {
1551		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1552			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1553			len, (unsigned long long)seg1->mr_dma,
1554			pageoff, i, rc);
1555		while (i--)
1556			rpcrdma_unmap_one(ia, --seg);
1557	} else {
1558		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1559		seg1->mr_base = seg1->mr_dma + pageoff;
1560		seg1->mr_nsegs = i;
1561		seg1->mr_len = len;
1562	}
1563	*nsegs = i;
1564	return rc;
1565}
1566
1567static int
1568rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1569			struct rpcrdma_ia *ia)
1570{
1571	struct rpcrdma_mr_seg *seg1 = seg;
1572	LIST_HEAD(l);
1573	int rc;
1574
1575	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1576	rc = ib_unmap_fmr(&l);
1577	while (seg1->mr_nsegs--)
1578		rpcrdma_unmap_one(ia, seg++);
1579	if (rc)
1580		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1581			" status %i\n", __func__, rc);
1582	return rc;
1583}
1584
1585static int
1586rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1587			int *nsegs, int writing, struct rpcrdma_ia *ia,
1588			struct rpcrdma_xprt *r_xprt)
1589{
1590	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1591				  IB_ACCESS_REMOTE_READ);
1592	struct ib_mw_bind param;
1593	int rc;
1594
1595	*nsegs = 1;
1596	rpcrdma_map_one(ia, seg, writing);
1597	param.mr = ia->ri_bind_mem;
1598	param.wr_id = 0ULL;	/* no send cookie */
1599	param.addr = seg->mr_dma;
1600	param.length = seg->mr_len;
1601	param.send_flags = 0;
1602	param.mw_access_flags = mem_priv;
1603
1604	DECR_CQCOUNT(&r_xprt->rx_ep);
1605	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1606	if (rc) {
1607		dprintk("RPC:       %s: failed ib_bind_mw "
1608			"%u@0x%llx status %i\n",
1609			__func__, seg->mr_len,
1610			(unsigned long long)seg->mr_dma, rc);
1611		rpcrdma_unmap_one(ia, seg);
1612	} else {
1613		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1614		seg->mr_base = param.addr;
1615		seg->mr_nsegs = 1;
1616	}
1617	return rc;
1618}
1619
1620static int
1621rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1622			struct rpcrdma_ia *ia,
1623			struct rpcrdma_xprt *r_xprt, void **r)
1624{
1625	struct ib_mw_bind param;
1626	LIST_HEAD(l);
1627	int rc;
1628
1629	BUG_ON(seg->mr_nsegs != 1);
1630	param.mr = ia->ri_bind_mem;
1631	param.addr = 0ULL;	/* unbind */
1632	param.length = 0;
1633	param.mw_access_flags = 0;
1634	if (*r) {
1635		param.wr_id = (u64) (unsigned long) *r;
1636		param.send_flags = IB_SEND_SIGNALED;
1637		INIT_CQCOUNT(&r_xprt->rx_ep);
1638	} else {
1639		param.wr_id = 0ULL;
1640		param.send_flags = 0;
1641		DECR_CQCOUNT(&r_xprt->rx_ep);
1642	}
1643	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1644	rpcrdma_unmap_one(ia, seg);
1645	if (rc)
1646		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1647			" status %i\n", __func__, rc);
1648	else
1649		*r = NULL;	/* will upcall on completion */
1650	return rc;
1651}
1652
1653static int
1654rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1655			int *nsegs, int writing, struct rpcrdma_ia *ia)
1656{
1657	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1658				  IB_ACCESS_REMOTE_READ);
1659	struct rpcrdma_mr_seg *seg1 = seg;
1660	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1661	int len, i, rc = 0;
1662
1663	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1664		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1665	for (len = 0, i = 0; i < *nsegs;) {
1666		rpcrdma_map_one(ia, seg, writing);
1667		ipb[i].addr = seg->mr_dma;
1668		ipb[i].size = seg->mr_len;
1669		len += seg->mr_len;
1670		++seg;
1671		++i;
1672		/* Check for holes */
1673		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1674		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1675			break;
1676	}
1677	seg1->mr_base = seg1->mr_dma;
1678	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1679				ipb, i, mem_priv, &seg1->mr_base);
1680	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1681		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1682		dprintk("RPC:       %s: failed ib_reg_phys_mr "
1683			"%u@0x%llx (%d)... status %i\n",
1684			__func__, len,
1685			(unsigned long long)seg1->mr_dma, i, rc);
1686		while (i--)
1687			rpcrdma_unmap_one(ia, --seg);
1688	} else {
1689		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1690		seg1->mr_nsegs = i;
1691		seg1->mr_len = len;
1692	}
1693	*nsegs = i;
1694	return rc;
1695}
1696
1697static int
1698rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1699			struct rpcrdma_ia *ia)
1700{
1701	struct rpcrdma_mr_seg *seg1 = seg;
1702	int rc;
1703
1704	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1705	seg1->mr_chunk.rl_mr = NULL;
1706	while (seg1->mr_nsegs--)
1707		rpcrdma_unmap_one(ia, seg++);
1708	if (rc)
1709		dprintk("RPC:       %s: failed ib_dereg_mr,"
1710			" status %i\n", __func__, rc);
1711	return rc;
1712}
1713
1714int
1715rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1716			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1717{
1718	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1719	int rc = 0;
1720
1721	switch (ia->ri_memreg_strategy) {
1722
1723#if RPCRDMA_PERSISTENT_REGISTRATION
1724	case RPCRDMA_ALLPHYSICAL:
1725		rpcrdma_map_one(ia, seg, writing);
1726		seg->mr_rkey = ia->ri_bind_mem->rkey;
1727		seg->mr_base = seg->mr_dma;
1728		seg->mr_nsegs = 1;
1729		nsegs = 1;
1730		break;
1731#endif
1732
1733	/* Registration using fast-register memory regions (FRMR) */
1734	case RPCRDMA_FRMR:
1735		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1736		break;
1737
1738	/* Registration using fmr memory registration */
1739	case RPCRDMA_MTHCAFMR:
1740		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1741		break;
1742
1743	/* Registration using memory windows */
1744	case RPCRDMA_MEMWINDOWS_ASYNC:
1745	case RPCRDMA_MEMWINDOWS:
1746		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1747		break;
1748
1749	/* Default registration each time */
1750	default:
1751		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1752		break;
1753	}
1754	if (rc)
1755		return -1;
1756
1757	return nsegs;
1758}
1759
1760int
1761rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1762		struct rpcrdma_xprt *r_xprt, void *r)
1763{
1764	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1765	int nsegs = seg->mr_nsegs, rc;
1766
1767	switch (ia->ri_memreg_strategy) {
1768
1769#if RPCRDMA_PERSISTENT_REGISTRATION
1770	case RPCRDMA_ALLPHYSICAL:
1771		BUG_ON(nsegs != 1);
1772		rpcrdma_unmap_one(ia, seg);
1773		rc = 0;
1774		break;
1775#endif
1776
1777	case RPCRDMA_FRMR:
1778		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1779		break;
1780
1781	case RPCRDMA_MTHCAFMR:
1782		rc = rpcrdma_deregister_fmr_external(seg, ia);
1783		break;
1784
1785	case RPCRDMA_MEMWINDOWS_ASYNC:
1786	case RPCRDMA_MEMWINDOWS:
1787		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1788		break;
1789
1790	default:
1791		rc = rpcrdma_deregister_default_external(seg, ia);
1792		break;
1793	}
1794	if (r) {
1795		struct rpcrdma_rep *rep = r;
1796		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1797		rep->rr_func = NULL;
1798		func(rep);	/* dereg done, callback now */
1799	}
1800	return nsegs;
1801}
1802
1803/*
1804 * Prepost any receive buffer, then post send.
1805 *
1806 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1807 */
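/*
 * Send completions are mostly unsignaled: rep_cqinit is set to half of
 * max_send_wr in rpcrdma_ep_create() (possibly reduced or zeroed for small
 * queues), and DECR_CQCOUNT()/INIT_CQCOUNT() below arrange for roughly
 * every rep_cqinit-th send WR to carry IB_SEND_SIGNALED. For example
 * (values assumed for illustration), with max_send_wr == 128 rep_cqinit is
 * 64, so about one send in 64 is signaled, keeping the provider's
 * completion bookkeeping bounded.
 */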
1808int
1809rpcrdma_ep_post(struct rpcrdma_ia *ia,
1810		struct rpcrdma_ep *ep,
1811		struct rpcrdma_req *req)
1812{
1813	struct ib_send_wr send_wr, *send_wr_fail;
1814	struct rpcrdma_rep *rep = req->rl_reply;
1815	int rc;
1816
1817	if (rep) {
1818		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1819		if (rc)
1820			goto out;
1821		req->rl_reply = NULL;
1822	}
1823
1824	send_wr.next = NULL;
1825	send_wr.wr_id = 0ULL;	/* no send cookie */
1826	send_wr.sg_list = req->rl_send_iov;
1827	send_wr.num_sge = req->rl_niovs;
1828	send_wr.opcode = IB_WR_SEND;
1829	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1830		ib_dma_sync_single_for_device(ia->ri_id->device,
1831			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1832			DMA_TO_DEVICE);
1833	ib_dma_sync_single_for_device(ia->ri_id->device,
1834		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1835		DMA_TO_DEVICE);
1836	ib_dma_sync_single_for_device(ia->ri_id->device,
1837		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1838		DMA_TO_DEVICE);
1839
1840	if (DECR_CQCOUNT(ep) > 0)
1841		send_wr.send_flags = 0;
1842	else { /* Provider must take a send completion every now and then */
1843		INIT_CQCOUNT(ep);
1844		send_wr.send_flags = IB_SEND_SIGNALED;
1845	}
1846
1847	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1848	if (rc)
1849		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1850			rc);
1851out:
1852	return rc;
1853}
1854
1855/*
1856 * (Re)post a receive buffer.
1857 */
1858int
1859rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1860		     struct rpcrdma_ep *ep,
1861		     struct rpcrdma_rep *rep)
1862{
1863	struct ib_recv_wr recv_wr, *recv_wr_fail;
1864	int rc;
1865
1866	recv_wr.next = NULL;
1867	recv_wr.wr_id = (u64) (unsigned long) rep;
1868	recv_wr.sg_list = &rep->rr_iov;
1869	recv_wr.num_sge = 1;
1870
1871	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1872		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1873
1874	DECR_CQCOUNT(ep);
1875	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1876
1877	if (rc)
1878		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1879			rc);
1880	return rc;
1881}
1882