1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lnet/klnds/o2iblnd/o2iblnd.c
37 *
38 * Author: Eric Barton <eric@bartonsoftware.com>
39 */
40
41#include "o2iblnd.h"
42#include <asm/div64.h>
43
44static lnd_t the_o2iblnd = {
45	.lnd_type       = O2IBLND,
46	.lnd_startup    = kiblnd_startup,
47	.lnd_shutdown   = kiblnd_shutdown,
48	.lnd_ctl	= kiblnd_ctl,
49	.lnd_query      = kiblnd_query,
50	.lnd_send       = kiblnd_send,
51	.lnd_recv       = kiblnd_recv,
52};
53
54kib_data_t	      kiblnd_data;
55
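/* Simple rotate-and-add checksum over 'nob' bytes at 'ptr'. */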
56static __u32
57kiblnd_cksum(void *ptr, int nob)
58{
59	char  *c  = ptr;
60	__u32  sum = 0;
61
62	while (nob-- > 0)
63		sum = ((sum << 1) | (sum >> 31)) + *c++;
64
65	/* ensure I don't return 0 (== no checksum) */
66	return (sum == 0) ? 1 : sum;
67}
68
69static char *
70kiblnd_msgtype2str(int type)
71{
72	switch (type) {
73	case IBLND_MSG_CONNREQ:
74		return "CONNREQ";
75
76	case IBLND_MSG_CONNACK:
77		return "CONNACK";
78
79	case IBLND_MSG_NOOP:
80		return "NOOP";
81
82	case IBLND_MSG_IMMEDIATE:
83		return "IMMEDIATE";
84
85	case IBLND_MSG_PUT_REQ:
86		return "PUT_REQ";
87
88	case IBLND_MSG_PUT_NAK:
89		return "PUT_NAK";
90
91	case IBLND_MSG_PUT_ACK:
92		return "PUT_ACK";
93
94	case IBLND_MSG_PUT_DONE:
95		return "PUT_DONE";
96
97	case IBLND_MSG_GET_REQ:
98		return "GET_REQ";
99
100	case IBLND_MSG_GET_DONE:
101		return "GET_DONE";
102
103	default:
104		return "???";
105	}
106}
107
108static int
109kiblnd_msgtype2size(int type)
110{
111	const int hdr_size = offsetof(kib_msg_t, ibm_u);
112
113	switch (type) {
114	case IBLND_MSG_CONNREQ:
115	case IBLND_MSG_CONNACK:
116		return hdr_size + sizeof(kib_connparams_t);
117
118	case IBLND_MSG_NOOP:
119		return hdr_size;
120
121	case IBLND_MSG_IMMEDIATE:
122		return offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]);
123
124	case IBLND_MSG_PUT_REQ:
125		return hdr_size + sizeof(kib_putreq_msg_t);
126
127	case IBLND_MSG_PUT_ACK:
128		return hdr_size + sizeof(kib_putack_msg_t);
129
130	case IBLND_MSG_GET_REQ:
131		return hdr_size + sizeof(kib_get_msg_t);
132
133	case IBLND_MSG_PUT_NAK:
134	case IBLND_MSG_PUT_DONE:
135	case IBLND_MSG_GET_DONE:
136		return hdr_size + sizeof(kib_completion_msg_t);
137	default:
138		return -1;
139	}
140}
141
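/* Validate the RDMA descriptor carried by a GET_REQ or PUT_ACK message and,
 * when the peer has opposite endianness, byte-swap it in place.  Returns
 * non-zero if the frag count or message size is bogus. */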
142static int
143kiblnd_unpack_rd(kib_msg_t *msg, int flip)
144{
145	kib_rdma_desc_t   *rd;
146	int		nob;
147	int		n;
148	int		i;
149
150	LASSERT (msg->ibm_type == IBLND_MSG_GET_REQ ||
151		 msg->ibm_type == IBLND_MSG_PUT_ACK);
152
153	rd = msg->ibm_type == IBLND_MSG_GET_REQ ?
154			      &msg->ibm_u.get.ibgm_rd :
155			      &msg->ibm_u.putack.ibpam_rd;
156
157	if (flip) {
158		__swab32s(&rd->rd_key);
159		__swab32s(&rd->rd_nfrags);
160	}
161
162	n = rd->rd_nfrags;
163
164	if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
165		CERROR("Bad nfrags: %d, should be 0 < n <= %d\n",
166		       n, IBLND_MAX_RDMA_FRAGS);
167		return 1;
168	}
169
170	nob = offsetof (kib_msg_t, ibm_u) +
171	      kiblnd_rd_msg_size(rd, msg->ibm_type, n);
172
173	if (msg->ibm_nob < nob) {
174		CERROR("Short %s: %d(%d)\n",
175		       kiblnd_msgtype2str(msg->ibm_type), msg->ibm_nob, nob);
176		return 1;
177	}
178
179	if (!flip)
180		return 0;
181
182	for (i = 0; i < n; i++) {
183		__swab32s(&rd->rd_frags[i].rf_nob);
184		__swab64s(&rd->rd_frags[i].rf_addr);
185	}
186
187	return 0;
188}
189
190void
191kiblnd_pack_msg (lnet_ni_t *ni, kib_msg_t *msg, int version,
192		 int credits, lnet_nid_t dstnid, __u64 dststamp)
193{
194	kib_net_t *net = ni->ni_data;
195
196	/* CAVEAT EMPTOR! all message fields not set here should have been
197	 * initialised previously. */
198	msg->ibm_magic    = IBLND_MSG_MAGIC;
199	msg->ibm_version  = version;
200	/*   ibm_type */
201	msg->ibm_credits  = credits;
202	/*   ibm_nob */
203	msg->ibm_cksum    = 0;
204	msg->ibm_srcnid   = ni->ni_nid;
205	msg->ibm_srcstamp = net->ibn_incarnation;
206	msg->ibm_dstnid   = dstnid;
207	msg->ibm_dststamp = dststamp;
208
209	if (*kiblnd_tunables.kib_cksum) {
210		/* NB ibm_cksum zero while computing cksum */
211		msg->ibm_cksum = kiblnd_cksum(msg, msg->ibm_nob);
212	}
213}
214
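/* Sanity-check an incoming message (magic, version, size, checksum) and
 * convert its fields to host byte order.  The magic is left unflipped as
 * a clue to the peer's endianness.  Returns 0 or -EPROTO. */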
215int
216kiblnd_unpack_msg(kib_msg_t *msg, int nob)
217{
218	const int hdr_size = offsetof(kib_msg_t, ibm_u);
219	__u32     msg_cksum;
220	__u16     version;
221	int       msg_nob;
222	int       flip;
223
224	/* 6 bytes are enough to have received magic + version */
225	if (nob < 6) {
226		CERROR("Short message: %d\n", nob);
227		return -EPROTO;
228	}
229
230	if (msg->ibm_magic == IBLND_MSG_MAGIC) {
231		flip = 0;
232	} else if (msg->ibm_magic == __swab32(IBLND_MSG_MAGIC)) {
233		flip = 1;
234	} else {
235		CERROR("Bad magic: %08x\n", msg->ibm_magic);
236		return -EPROTO;
237	}
238
239	version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
240	if (version != IBLND_MSG_VERSION &&
241	    version != IBLND_MSG_VERSION_1) {
242		CERROR("Bad version: %x\n", version);
243		return -EPROTO;
244	}
245
246	if (nob < hdr_size) {
247		CERROR("Short message: %d\n", nob);
248		return -EPROTO;
249	}
250
251	msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
252	if (msg_nob > nob) {
253		CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
254		return -EPROTO;
255	}
256
257	/* checksum must be computed with ibm_cksum zero and BEFORE anything
258	 * gets flipped */
259	msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
260	msg->ibm_cksum = 0;
261	if (msg_cksum != 0 &&
262	    msg_cksum != kiblnd_cksum(msg, msg_nob)) {
263		CERROR("Bad checksum\n");
264		return -EPROTO;
265	}
266
267	msg->ibm_cksum = msg_cksum;
268
269	if (flip) {
270		/* leave magic unflipped as a clue to peer endianness */
271		msg->ibm_version = version;
272		CLASSERT (sizeof(msg->ibm_type) == 1);
273		CLASSERT (sizeof(msg->ibm_credits) == 1);
274		msg->ibm_nob     = msg_nob;
275		__swab64s(&msg->ibm_srcnid);
276		__swab64s(&msg->ibm_srcstamp);
277		__swab64s(&msg->ibm_dstnid);
278		__swab64s(&msg->ibm_dststamp);
279	}
280
281	if (msg->ibm_srcnid == LNET_NID_ANY) {
282		CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
283		return -EPROTO;
284	}
285
286	if (msg_nob < kiblnd_msgtype2size(msg->ibm_type)) {
287		CERROR("Short %s: %d(%d)\n", kiblnd_msgtype2str(msg->ibm_type),
288		       msg_nob, kiblnd_msgtype2size(msg->ibm_type));
289		return -EPROTO;
290	}
291
292	switch (msg->ibm_type) {
293	default:
294		CERROR("Unknown message type %x\n", msg->ibm_type);
295		return -EPROTO;
296
297	case IBLND_MSG_NOOP:
298	case IBLND_MSG_IMMEDIATE:
299	case IBLND_MSG_PUT_REQ:
300		break;
301
302	case IBLND_MSG_PUT_ACK:
303	case IBLND_MSG_GET_REQ:
304		if (kiblnd_unpack_rd(msg, flip))
305			return -EPROTO;
306		break;
307
308	case IBLND_MSG_PUT_NAK:
309	case IBLND_MSG_PUT_DONE:
310	case IBLND_MSG_GET_DONE:
311		if (flip)
312			__swab32s(&msg->ibm_u.completion.ibcm_status);
313		break;
314
315	case IBLND_MSG_CONNREQ:
316	case IBLND_MSG_CONNACK:
317		if (flip) {
318			__swab16s(&msg->ibm_u.connparams.ibcp_queue_depth);
319			__swab16s(&msg->ibm_u.connparams.ibcp_max_frags);
320			__swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
321		}
322		break;
323	}
324	return 0;
325}
326
327int
328kiblnd_create_peer(lnet_ni_t *ni, kib_peer_t **peerp, lnet_nid_t nid)
329{
330	kib_peer_t	*peer;
331	kib_net_t	*net = ni->ni_data;
332	int		cpt = lnet_cpt_of_nid(nid);
333	unsigned long   flags;
334
335	LASSERT(net != NULL);
336	LASSERT(nid != LNET_NID_ANY);
337
338	LIBCFS_CPT_ALLOC(peer, lnet_cpt_table(), cpt, sizeof(*peer));
339	if (peer == NULL) {
340		CERROR("Cannot allocate peer\n");
341		return -ENOMEM;
342	}
343
344	memset(peer, 0, sizeof(*peer));	 /* zero flags etc */
345
346	peer->ibp_ni = ni;
347	peer->ibp_nid = nid;
348	peer->ibp_error = 0;
349	peer->ibp_last_alive = 0;
350	atomic_set(&peer->ibp_refcount, 1);  /* 1 ref for caller */
351
352	INIT_LIST_HEAD(&peer->ibp_list);     /* not in the peer table yet */
353	INIT_LIST_HEAD(&peer->ibp_conns);
354	INIT_LIST_HEAD(&peer->ibp_tx_queue);
355
356	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
357
358	/* always called with a ref on ni, which prevents ni from being shut down */
359	LASSERT (net->ibn_shutdown == 0);
360
361	/* npeers only grows with the global lock held */
362	atomic_inc(&net->ibn_npeers);
363
364	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
365
366	*peerp = peer;
367	return 0;
368}
369
370void
371kiblnd_destroy_peer (kib_peer_t *peer)
372{
373	kib_net_t *net = peer->ibp_ni->ni_data;
374
375	LASSERT (net != NULL);
376	LASSERT (atomic_read(&peer->ibp_refcount) == 0);
377	LASSERT (!kiblnd_peer_active(peer));
378	LASSERT (peer->ibp_connecting == 0);
379	LASSERT (peer->ibp_accepting == 0);
380	LASSERT (list_empty(&peer->ibp_conns));
381	LASSERT (list_empty(&peer->ibp_tx_queue));
382
383	LIBCFS_FREE(peer, sizeof(*peer));
384
385	/* NB a peer's connections keep a reference on their peer until
386	 * they are destroyed, so we can be assured that _all_ state to do
387	 * with this peer has been cleaned up when its refcount drops to
388	 * zero. */
389	atomic_dec(&net->ibn_npeers);
390}
391
392kib_peer_t *
393kiblnd_find_peer_locked (lnet_nid_t nid)
394{
395	/* the caller is responsible for accounting the additional reference
396	 * that this creates */
397	struct list_head       *peer_list = kiblnd_nid2peerlist(nid);
398	struct list_head       *tmp;
399	kib_peer_t       *peer;
400
401	list_for_each (tmp, peer_list) {
402
403		peer = list_entry(tmp, kib_peer_t, ibp_list);
404
405		LASSERT (peer->ibp_connecting > 0 || /* creating conns */
406			 peer->ibp_accepting > 0 ||
407			 !list_empty(&peer->ibp_conns));  /* active conn */
408
409		if (peer->ibp_nid != nid)
410			continue;
411
412		CDEBUG(D_NET, "got peer [%p] -> %s (%d) version: %x\n",
413		       peer, libcfs_nid2str(nid),
414		       atomic_read(&peer->ibp_refcount),
415		       peer->ibp_version);
416		return peer;
417	}
418	return NULL;
419}
420
421void
422kiblnd_unlink_peer_locked (kib_peer_t *peer)
423{
424	LASSERT (list_empty(&peer->ibp_conns));
425
426	LASSERT (kiblnd_peer_active(peer));
427	list_del_init(&peer->ibp_list);
428	/* lose peerlist's ref */
429	kiblnd_peer_decref(peer);
430}
431
432static int
433kiblnd_get_peer_info(lnet_ni_t *ni, int index,
434		      lnet_nid_t *nidp, int *count)
435{
436	kib_peer_t	    *peer;
437	struct list_head	    *ptmp;
438	int		    i;
439	unsigned long	  flags;
440
441	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
442
443	for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
444
445		list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {
446
447			peer = list_entry(ptmp, kib_peer_t, ibp_list);
448			LASSERT (peer->ibp_connecting > 0 ||
449				 peer->ibp_accepting > 0 ||
450				 !list_empty(&peer->ibp_conns));
451
452			if (peer->ibp_ni != ni)
453				continue;
454
455			if (index-- > 0)
456				continue;
457
458			*nidp = peer->ibp_nid;
459			*count = atomic_read(&peer->ibp_refcount);
460
461			read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
462					       flags);
463			return 0;
464		}
465	}
466
467	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
468	return -ENOENT;
469}
470
471static void
472kiblnd_del_peer_locked(kib_peer_t *peer)
473{
474	struct list_head	   *ctmp;
475	struct list_head	   *cnxt;
476	kib_conn_t	   *conn;
477
478	if (list_empty(&peer->ibp_conns)) {
479		kiblnd_unlink_peer_locked(peer);
480	} else {
481		list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
482			conn = list_entry(ctmp, kib_conn_t, ibc_list);
483
484			kiblnd_close_conn_locked(conn, 0);
485		}
486		/* NB closing peer's last conn unlinked it. */
487	}
488	/* NB peer now unlinked; might even be freed if the peer table had the
489	 * last ref on it. */
490}
491
492static int
493kiblnd_del_peer(lnet_ni_t *ni, lnet_nid_t nid)
494{
495	LIST_HEAD	 (zombies);
496	struct list_head	    *ptmp;
497	struct list_head	    *pnxt;
498	kib_peer_t	    *peer;
499	int		    lo;
500	int		    hi;
501	int		    i;
502	unsigned long	  flags;
503	int		    rc = -ENOENT;
504
505	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
506
507	if (nid != LNET_NID_ANY) {
508		lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
509	} else {
510		lo = 0;
511		hi = kiblnd_data.kib_peer_hash_size - 1;
512	}
513
514	for (i = lo; i <= hi; i++) {
515		list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
516			peer = list_entry(ptmp, kib_peer_t, ibp_list);
517			LASSERT (peer->ibp_connecting > 0 ||
518				 peer->ibp_accepting > 0 ||
519				 !list_empty(&peer->ibp_conns));
520
521			if (peer->ibp_ni != ni)
522				continue;
523
524			if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
525				continue;
526
527			if (!list_empty(&peer->ibp_tx_queue)) {
528				LASSERT (list_empty(&peer->ibp_conns));
529
530				list_splice_init(&peer->ibp_tx_queue,
531						     &zombies);
532			}
533
534			kiblnd_del_peer_locked(peer);
535			rc = 0;	 /* matched something */
536		}
537	}
538
539	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
540
541	kiblnd_txlist_done(ni, &zombies, -EIO);
542
543	return rc;
544}
545
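/* Return the index'th connection in the peer table with a reference held
 * for the caller, or NULL if there are fewer connections than that. */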
546static kib_conn_t *
547kiblnd_get_conn_by_idx(lnet_ni_t *ni, int index)
548{
549	kib_peer_t	    *peer;
550	struct list_head	    *ptmp;
551	kib_conn_t	    *conn;
552	struct list_head	    *ctmp;
553	int		    i;
554	unsigned long	  flags;
555
556	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
557
558	for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
559		list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {
560
561			peer = list_entry(ptmp, kib_peer_t, ibp_list);
562			LASSERT (peer->ibp_connecting > 0 ||
563				 peer->ibp_accepting > 0 ||
564				 !list_empty(&peer->ibp_conns));
565
566			if (peer->ibp_ni != ni)
567				continue;
568
569			list_for_each (ctmp, &peer->ibp_conns) {
570				if (index-- > 0)
571					continue;
572
573				conn = list_entry(ctmp, kib_conn_t,
574						      ibc_list);
575				kiblnd_conn_addref(conn);
576				read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
577						       flags);
578				return conn;
579			}
580		}
581	}
582
583	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
584	return NULL;
585}
586
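/* Translate an MTU in bytes into the corresponding IB_MTU_* enum value;
 * 0 means "use the default" and -1 flags an unsupported size. */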
587int
588kiblnd_translate_mtu(int value)
589{
590	switch (value) {
591	default:
592		return -1;
593	case 0:
594		return 0;
595	case 256:
596		return IB_MTU_256;
597	case 512:
598		return IB_MTU_512;
599	case 1024:
600		return IB_MTU_1024;
601	case 2048:
602		return IB_MTU_2048;
603	case 4096:
604		return IB_MTU_4096;
605	}
606}
607
608static void
609kiblnd_setup_mtu_locked(struct rdma_cm_id *cmid)
610{
611	int	   mtu;
612
613	/* XXX There is no path record for iWARP, set by netdev->change_mtu? */
614	if (cmid->route.path_rec == NULL)
615		return;
616
617	mtu = kiblnd_translate_mtu(*kiblnd_tunables.kib_ib_mtu);
618	LASSERT (mtu >= 0);
619	if (mtu != 0)
620		cmid->route.path_rec->mtu = mtu;
621}
622
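/* Spread CQ processing over the HCA's completion vectors by hashing the
 * peer's NID onto one of the CPUs in this CPT's cpumask. */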
623static int
624kiblnd_get_completion_vector(kib_conn_t *conn, int cpt)
625{
626	cpumask_t	*mask;
627	int		vectors;
628	int		off;
629	int		i;
630	lnet_nid_t	nid = conn->ibc_peer->ibp_nid;
631
632	vectors = conn->ibc_cmid->device->num_comp_vectors;
633	if (vectors <= 1)
634		return 0;
635
636	mask = cfs_cpt_cpumask(lnet_cpt_table(), cpt);
637	if (mask == NULL)
638		return 0;
639
640	/* hash NID to CPU id in this partition... */
641	off = do_div(nid, cpus_weight(*mask));
642	for_each_cpu_mask(i, *mask) {
643		if (off-- == 0)
644			return i % vectors;
645	}
646
647	LBUG();
648	return 1;
649}
650
651kib_conn_t *
652kiblnd_create_conn(kib_peer_t *peer, struct rdma_cm_id *cmid,
653		   int state, int version)
654{
655	/* CAVEAT EMPTOR:
656	 * If the new conn is created successfully it takes over the caller's
657	 * ref on 'peer'.  It also "owns" 'cmid' and destroys it when it itself
658	 * is destroyed.  On failure, the caller's ref on 'peer' remains and
659	 * she must dispose of 'cmid'.  (Actually I'd block forever if I tried
660	 * to destroy 'cmid' here since I'm called from the CM which still has
661	 * its ref on 'cmid'). */
662	rwlock_t		*glock = &kiblnd_data.kib_global_lock;
663	kib_net_t	      *net = peer->ibp_ni->ni_data;
664	kib_dev_t	      *dev;
665	struct ib_qp_init_attr *init_qp_attr;
666	struct kib_sched_info	*sched;
667	kib_conn_t		*conn;
668	struct ib_cq		*cq;
669	unsigned long		flags;
670	int			cpt;
671	int			rc;
672	int			i;
673
674	LASSERT(net != NULL);
675	LASSERT(!in_interrupt());
676
677	dev = net->ibn_dev;
678
679	cpt = lnet_cpt_of_nid(peer->ibp_nid);
680	sched = kiblnd_data.kib_scheds[cpt];
681
682	LASSERT(sched->ibs_nthreads > 0);
683
684	LIBCFS_CPT_ALLOC(init_qp_attr, lnet_cpt_table(), cpt,
685			 sizeof(*init_qp_attr));
686	if (init_qp_attr == NULL) {
687		CERROR("Can't allocate qp_attr for %s\n",
688		       libcfs_nid2str(peer->ibp_nid));
689		goto failed_0;
690	}
691
692	LIBCFS_CPT_ALLOC(conn, lnet_cpt_table(), cpt, sizeof(*conn));
693	if (conn == NULL) {
694		CERROR("Can't allocate connection for %s\n",
695		       libcfs_nid2str(peer->ibp_nid));
696		goto failed_1;
697	}
698
699	conn->ibc_state = IBLND_CONN_INIT;
700	conn->ibc_version = version;
701	conn->ibc_peer = peer;		  /* I take the caller's ref */
702	cmid->context = conn;		   /* for future CM callbacks */
703	conn->ibc_cmid = cmid;
704
705	INIT_LIST_HEAD(&conn->ibc_early_rxs);
706	INIT_LIST_HEAD(&conn->ibc_tx_noops);
707	INIT_LIST_HEAD(&conn->ibc_tx_queue);
708	INIT_LIST_HEAD(&conn->ibc_tx_queue_rsrvd);
709	INIT_LIST_HEAD(&conn->ibc_tx_queue_nocred);
710	INIT_LIST_HEAD(&conn->ibc_active_txs);
711	spin_lock_init(&conn->ibc_lock);
712
713	LIBCFS_CPT_ALLOC(conn->ibc_connvars, lnet_cpt_table(), cpt,
714			 sizeof(*conn->ibc_connvars));
715	if (conn->ibc_connvars == NULL) {
716		CERROR("Can't allocate in-progress connection state\n");
717		goto failed_2;
718	}
719
720	write_lock_irqsave(glock, flags);
721	if (dev->ibd_failover) {
722		write_unlock_irqrestore(glock, flags);
723		CERROR("%s: failover in progress\n", dev->ibd_ifname);
724		goto failed_2;
725	}
726
727	if (dev->ibd_hdev->ibh_ibdev != cmid->device) {
728		/* wakeup failover thread and teardown connection */
729		if (kiblnd_dev_can_failover(dev)) {
730			list_add_tail(&dev->ibd_fail_list,
731				      &kiblnd_data.kib_failed_devs);
732			wake_up(&kiblnd_data.kib_failover_waitq);
733		}
734
735		write_unlock_irqrestore(glock, flags);
736		CERROR("cmid HCA(%s), kib_dev(%s) need failover\n",
737		       cmid->device->name, dev->ibd_ifname);
738		goto failed_2;
739	}
740
741	kiblnd_hdev_addref_locked(dev->ibd_hdev);
742	conn->ibc_hdev = dev->ibd_hdev;
743
744	kiblnd_setup_mtu_locked(cmid);
745
746	write_unlock_irqrestore(glock, flags);
747
748	LIBCFS_CPT_ALLOC(conn->ibc_rxs, lnet_cpt_table(), cpt,
749			 IBLND_RX_MSGS(version) * sizeof(kib_rx_t));
750	if (conn->ibc_rxs == NULL) {
751		CERROR("Cannot allocate RX buffers\n");
752		goto failed_2;
753	}
754
755	rc = kiblnd_alloc_pages(&conn->ibc_rx_pages, cpt,
756				IBLND_RX_MSG_PAGES(version));
757	if (rc != 0)
758		goto failed_2;
759
760	kiblnd_map_rx_descs(conn);
761
762	cq = ib_create_cq(cmid->device,
763			  kiblnd_cq_completion, kiblnd_cq_event, conn,
764			  IBLND_CQ_ENTRIES(version),
765			  kiblnd_get_completion_vector(conn, cpt));
766	if (IS_ERR(cq)) {
767		CERROR("Can't create CQ: %ld, cqe: %d\n",
768		       PTR_ERR(cq), IBLND_CQ_ENTRIES(version));
769		goto failed_2;
770	}
771
772	conn->ibc_cq = cq;
773
774	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
775	if (rc != 0) {
776		CERROR("Can't request completion notification: %d\n", rc);
777		goto failed_2;
778	}
779
780	init_qp_attr->event_handler = kiblnd_qp_event;
781	init_qp_attr->qp_context = conn;
782	init_qp_attr->cap.max_send_wr = IBLND_SEND_WRS(version);
783	init_qp_attr->cap.max_recv_wr = IBLND_RECV_WRS(version);
784	init_qp_attr->cap.max_send_sge = 1;
785	init_qp_attr->cap.max_recv_sge = 1;
786	init_qp_attr->sq_sig_type = IB_SIGNAL_REQ_WR;
787	init_qp_attr->qp_type = IB_QPT_RC;
788	init_qp_attr->send_cq = cq;
789	init_qp_attr->recv_cq = cq;
790
791	conn->ibc_sched = sched;
792
793	rc = rdma_create_qp(cmid, conn->ibc_hdev->ibh_pd, init_qp_attr);
794	if (rc != 0) {
795		CERROR("Can't create QP: %d, send_wr: %d, recv_wr: %d\n",
796		       rc, init_qp_attr->cap.max_send_wr,
797		       init_qp_attr->cap.max_recv_wr);
798		goto failed_2;
799	}
800
801	LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
802
803	/* 1 ref for caller and each rxmsg */
804	atomic_set(&conn->ibc_refcount, 1 + IBLND_RX_MSGS(version));
805	conn->ibc_nrx = IBLND_RX_MSGS(version);
806
807	/* post receives */
808	for (i = 0; i < IBLND_RX_MSGS(version); i++) {
809		rc = kiblnd_post_rx(&conn->ibc_rxs[i],
810				    IBLND_POSTRX_NO_CREDIT);
811		if (rc != 0) {
812			CERROR("Can't post rxmsg: %d\n", rc);
813
814			/* Make posted receives complete */
815			kiblnd_abort_receives(conn);
816
817			/* correct # of posted buffers
818			 * NB locking needed now I'm racing with completion */
819			spin_lock_irqsave(&sched->ibs_lock, flags);
820			conn->ibc_nrx -= IBLND_RX_MSGS(version) - i;
821			spin_unlock_irqrestore(&sched->ibs_lock, flags);
822
823			/* cmid will be destroyed by CM(ofed) after cm_callback
824			 * returns, so we can't refer to it anymore
825			 * (by kiblnd_connd()->kiblnd_destroy_conn) */
826			rdma_destroy_qp(conn->ibc_cmid);
827			conn->ibc_cmid = NULL;
828
829			/* Drop my own and unused rxbuffer refcounts */
830			while (i++ <= IBLND_RX_MSGS(version))
831				kiblnd_conn_decref(conn);
832
833			return NULL;
834		}
835	}
836
837	/* Init successful! */
838	LASSERT (state == IBLND_CONN_ACTIVE_CONNECT ||
839		 state == IBLND_CONN_PASSIVE_WAIT);
840	conn->ibc_state = state;
841
842	/* 1 more conn */
843	atomic_inc(&net->ibn_nconns);
844	return conn;
845
846 failed_2:
847	kiblnd_destroy_conn(conn);
848 failed_1:
849	LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
850 failed_0:
851	return NULL;
852}
853
854void
855kiblnd_destroy_conn (kib_conn_t *conn)
856{
857	struct rdma_cm_id *cmid = conn->ibc_cmid;
858	kib_peer_t	*peer = conn->ibc_peer;
859	int		rc;
860
861	LASSERT (!in_interrupt());
862	LASSERT (atomic_read(&conn->ibc_refcount) == 0);
863	LASSERT (list_empty(&conn->ibc_early_rxs));
864	LASSERT (list_empty(&conn->ibc_tx_noops));
865	LASSERT (list_empty(&conn->ibc_tx_queue));
866	LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
867	LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
868	LASSERT (list_empty(&conn->ibc_active_txs));
869	LASSERT (conn->ibc_noops_posted == 0);
870	LASSERT (conn->ibc_nsends_posted == 0);
871
872	switch (conn->ibc_state) {
873	default:
874		/* conn must be completely disengaged from the network */
875		LBUG();
876
877	case IBLND_CONN_DISCONNECTED:
878		/* connvars should have been freed already */
879		LASSERT (conn->ibc_connvars == NULL);
880		break;
881
882	case IBLND_CONN_INIT:
883		break;
884	}
885
886	/* conn->ibc_cmid might be destroyed by CM already */
887	if (cmid != NULL && cmid->qp != NULL)
888		rdma_destroy_qp(cmid);
889
890	if (conn->ibc_cq != NULL) {
891		rc = ib_destroy_cq(conn->ibc_cq);
892		if (rc != 0)
893			CWARN("Error destroying CQ: %d\n", rc);
894	}
895
896	if (conn->ibc_rx_pages != NULL)
897		kiblnd_unmap_rx_descs(conn);
898
899	if (conn->ibc_rxs != NULL) {
900		LIBCFS_FREE(conn->ibc_rxs,
901			    IBLND_RX_MSGS(conn->ibc_version) * sizeof(kib_rx_t));
902	}
903
904	if (conn->ibc_connvars != NULL)
905		LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
906
907	if (conn->ibc_hdev != NULL)
908		kiblnd_hdev_decref(conn->ibc_hdev);
909
910	/* See CAVEAT EMPTOR above in kiblnd_create_conn */
911	if (conn->ibc_state != IBLND_CONN_INIT) {
912		kib_net_t *net = peer->ibp_ni->ni_data;
913
914		kiblnd_peer_decref(peer);
915		rdma_destroy_id(cmid);
916		atomic_dec(&net->ibn_nconns);
917	}
918
919	LIBCFS_FREE(conn, sizeof(*conn));
920}
921
922int
923kiblnd_close_peer_conns_locked (kib_peer_t *peer, int why)
924{
925	kib_conn_t	     *conn;
926	struct list_head	     *ctmp;
927	struct list_head	     *cnxt;
928	int		     count = 0;
929
930	list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
931		conn = list_entry(ctmp, kib_conn_t, ibc_list);
932
933		CDEBUG(D_NET, "Closing conn -> %s, "
934			      "version: %x, reason: %d\n",
935		       libcfs_nid2str(peer->ibp_nid),
936		       conn->ibc_version, why);
937
938		kiblnd_close_conn_locked(conn, why);
939		count++;
940	}
941
942	return count;
943}
944
945int
946kiblnd_close_stale_conns_locked (kib_peer_t *peer,
947				 int version, __u64 incarnation)
948{
949	kib_conn_t	     *conn;
950	struct list_head	     *ctmp;
951	struct list_head	     *cnxt;
952	int		     count = 0;
953
954	list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
955		conn = list_entry(ctmp, kib_conn_t, ibc_list);
956
957		if (conn->ibc_version     == version &&
958		    conn->ibc_incarnation == incarnation)
959			continue;
960
961		CDEBUG(D_NET, "Closing stale conn -> %s version: %x, "
962			      "incarnation:%#llx(%x, %#llx)\n",
963		       libcfs_nid2str(peer->ibp_nid),
964		       conn->ibc_version, conn->ibc_incarnation,
965		       version, incarnation);
966
967		kiblnd_close_conn_locked(conn, -ESTALE);
968		count++;
969	}
970
971	return count;
972}
973
974static int
975kiblnd_close_matching_conns(lnet_ni_t *ni, lnet_nid_t nid)
976{
977	kib_peer_t	     *peer;
978	struct list_head	     *ptmp;
979	struct list_head	     *pnxt;
980	int		     lo;
981	int		     hi;
982	int		     i;
983	unsigned long	   flags;
984	int		     count = 0;
985
986	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
987
988	if (nid != LNET_NID_ANY)
989		lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
990	else {
991		lo = 0;
992		hi = kiblnd_data.kib_peer_hash_size - 1;
993	}
994
995	for (i = lo; i <= hi; i++) {
996		list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
997
998			peer = list_entry(ptmp, kib_peer_t, ibp_list);
999			LASSERT (peer->ibp_connecting > 0 ||
1000				 peer->ibp_accepting > 0 ||
1001				 !list_empty(&peer->ibp_conns));
1002
1003			if (peer->ibp_ni != ni)
1004				continue;
1005
1006			if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1007				continue;
1008
1009			count += kiblnd_close_peer_conns_locked(peer, 0);
1010		}
1011	}
1012
1013	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1014
1015	/* wildcards always succeed */
1016	if (nid == LNET_NID_ANY)
1017		return 0;
1018
1019	return (count == 0) ? -ENOENT : 0;
1020}
1021
1022int
1023kiblnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1024{
1025	struct libcfs_ioctl_data *data = arg;
1026	int		       rc = -EINVAL;
1027
1028	switch (cmd) {
1029	case IOC_LIBCFS_GET_PEER: {
1030		lnet_nid_t   nid = 0;
1031		int	  count = 0;
1032
1033		rc = kiblnd_get_peer_info(ni, data->ioc_count,
1034					  &nid, &count);
1035		data->ioc_nid    = nid;
1036		data->ioc_count  = count;
1037		break;
1038	}
1039
1040	case IOC_LIBCFS_DEL_PEER: {
1041		rc = kiblnd_del_peer(ni, data->ioc_nid);
1042		break;
1043	}
1044	case IOC_LIBCFS_GET_CONN: {
1045		kib_conn_t *conn;
1046
1047		rc = 0;
1048		conn = kiblnd_get_conn_by_idx(ni, data->ioc_count);
1049		if (conn == NULL) {
1050			rc = -ENOENT;
1051			break;
1052		}
1053
1054		LASSERT (conn->ibc_cmid != NULL);
1055		data->ioc_nid = conn->ibc_peer->ibp_nid;
1056		if (conn->ibc_cmid->route.path_rec == NULL)
1057			data->ioc_u32[0] = 0; /* iWarp has no path MTU */
1058		else
1059			data->ioc_u32[0] =
1060			ib_mtu_enum_to_int(conn->ibc_cmid->route.path_rec->mtu);
1061		kiblnd_conn_decref(conn);
1062		break;
1063	}
1064	case IOC_LIBCFS_CLOSE_CONNECTION: {
1065		rc = kiblnd_close_matching_conns(ni, data->ioc_nid);
1066		break;
1067	}
1068
1069	default:
1070		break;
1071	}
1072
1073	return rc;
1074}
1075
1076void
1077kiblnd_query (lnet_ni_t *ni, lnet_nid_t nid, unsigned long *when)
1078{
1079	unsigned long	last_alive = 0;
1080	unsigned long	now = cfs_time_current();
1081	rwlock_t	*glock = &kiblnd_data.kib_global_lock;
1082	kib_peer_t	*peer;
1083	unsigned long	flags;
1084
1085	read_lock_irqsave(glock, flags);
1086
1087	peer = kiblnd_find_peer_locked(nid);
1088	if (peer != NULL) {
1089		LASSERT (peer->ibp_connecting > 0 || /* creating conns */
1090			 peer->ibp_accepting > 0 ||
1091			 !list_empty(&peer->ibp_conns));  /* active conn */
1092		last_alive = peer->ibp_last_alive;
1093	}
1094
1095	read_unlock_irqrestore(glock, flags);
1096
1097	if (last_alive != 0)
1098		*when = last_alive;
1099
1100	/* peer is not persistent in hash, trigger peer creation
1101	 * and connection establishment with a NULL tx */
1102	if (peer == NULL)
1103		kiblnd_launch_tx(ni, NULL, nid);
1104
1105	CDEBUG(D_NET, "Peer %s %p, alive %ld secs ago\n",
1106	       libcfs_nid2str(nid), peer,
1107	       last_alive ? cfs_duration_sec(now - last_alive) : -1);
1108	return;
1109}
1110
1111void
1112kiblnd_free_pages(kib_pages_t *p)
1113{
1114	int	npages = p->ibp_npages;
1115	int	i;
1116
1117	for (i = 0; i < npages; i++) {
1118		if (p->ibp_pages[i] != NULL)
1119			__free_page(p->ibp_pages[i]);
1120	}
1121
1122	LIBCFS_FREE(p, offsetof(kib_pages_t, ibp_pages[npages]));
1123}
1124
1125int
1126kiblnd_alloc_pages(kib_pages_t **pp, int cpt, int npages)
1127{
1128	kib_pages_t	*p;
1129	int		i;
1130
1131	LIBCFS_CPT_ALLOC(p, lnet_cpt_table(), cpt,
1132			 offsetof(kib_pages_t, ibp_pages[npages]));
1133	if (p == NULL) {
1134		CERROR("Can't allocate descriptor for %d pages\n", npages);
1135		return -ENOMEM;
1136	}
1137
1138	memset(p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1139	p->ibp_npages = npages;
1140
1141	for (i = 0; i < npages; i++) {
1142		p->ibp_pages[i] = alloc_pages_node(
1143				    cfs_cpt_spread_node(lnet_cpt_table(), cpt),
1144				    GFP_NOFS, 0);
1145		if (p->ibp_pages[i] == NULL) {
1146			CERROR("Can't allocate page %d of %d\n", i, npages);
1147			kiblnd_free_pages(p);
1148			return -ENOMEM;
1149		}
1150	}
1151
1152	*pp = p;
1153	return 0;
1154}
1155
1156void
1157kiblnd_unmap_rx_descs(kib_conn_t *conn)
1158{
1159	kib_rx_t *rx;
1160	int       i;
1161
1162	LASSERT (conn->ibc_rxs != NULL);
1163	LASSERT (conn->ibc_hdev != NULL);
1164
1165	for (i = 0; i < IBLND_RX_MSGS(conn->ibc_version); i++) {
1166		rx = &conn->ibc_rxs[i];
1167
1168		LASSERT (rx->rx_nob >= 0); /* not posted */
1169
1170		kiblnd_dma_unmap_single(conn->ibc_hdev->ibh_ibdev,
1171					KIBLND_UNMAP_ADDR(rx, rx_msgunmap,
1172							  rx->rx_msgaddr),
1173					IBLND_MSG_SIZE, DMA_FROM_DEVICE);
1174	}
1175
1176	kiblnd_free_pages(conn->ibc_rx_pages);
1177
1178	conn->ibc_rx_pages = NULL;
1179}
1180
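/* Carve the connection's RX message buffers out of the pre-allocated
 * pages and DMA-map each one for the device. */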
1181void
1182kiblnd_map_rx_descs(kib_conn_t *conn)
1183{
1184	kib_rx_t       *rx;
1185	struct page    *pg;
1186	int	     pg_off;
1187	int	     ipg;
1188	int	     i;
1189
1190	for (pg_off = ipg = i = 0;
1191	     i < IBLND_RX_MSGS(conn->ibc_version); i++) {
1192		pg = conn->ibc_rx_pages->ibp_pages[ipg];
1193		rx = &conn->ibc_rxs[i];
1194
1195		rx->rx_conn = conn;
1196		rx->rx_msg = (kib_msg_t *)(((char *)page_address(pg)) + pg_off);
1197
1198		rx->rx_msgaddr = kiblnd_dma_map_single(conn->ibc_hdev->ibh_ibdev,
1199						       rx->rx_msg, IBLND_MSG_SIZE,
1200						       DMA_FROM_DEVICE);
1201		LASSERT (!kiblnd_dma_mapping_error(conn->ibc_hdev->ibh_ibdev,
1202						   rx->rx_msgaddr));
1203		KIBLND_UNMAP_ADDR_SET(rx, rx_msgunmap, rx->rx_msgaddr);
1204
1205		CDEBUG(D_NET, "rx %d: %p %#llx(%#llx)\n",
1206		       i, rx->rx_msg, rx->rx_msgaddr,
1207		       lnet_page2phys(pg) + pg_off);
1208
1209		pg_off += IBLND_MSG_SIZE;
1210		LASSERT (pg_off <= PAGE_SIZE);
1211
1212		if (pg_off == PAGE_SIZE) {
1213			pg_off = 0;
1214			ipg++;
1215			LASSERT (ipg <= IBLND_RX_MSG_PAGES(conn->ibc_version));
1216		}
1217	}
1218}
1219
1220static void
1221kiblnd_unmap_tx_pool(kib_tx_pool_t *tpo)
1222{
1223	kib_hca_dev_t  *hdev = tpo->tpo_hdev;
1224	kib_tx_t       *tx;
1225	int	     i;
1226
1227	LASSERT (tpo->tpo_pool.po_allocated == 0);
1228
1229	if (hdev == NULL)
1230		return;
1231
1232	for (i = 0; i < tpo->tpo_pool.po_size; i++) {
1233		tx = &tpo->tpo_tx_descs[i];
1234		kiblnd_dma_unmap_single(hdev->ibh_ibdev,
1235					KIBLND_UNMAP_ADDR(tx, tx_msgunmap,
1236							  tx->tx_msgaddr),
1237					IBLND_MSG_SIZE, DMA_TO_DEVICE);
1238	}
1239
1240	kiblnd_hdev_decref(hdev);
1241	tpo->tpo_hdev = NULL;
1242}
1243
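/* Take a reference on the device's current HCA descriptor, waiting first
 * for any failover in progress to complete. */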
1244static kib_hca_dev_t *
1245kiblnd_current_hdev(kib_dev_t *dev)
1246{
1247	kib_hca_dev_t *hdev;
1248	unsigned long  flags;
1249	int	    i = 0;
1250
1251	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1252	while (dev->ibd_failover) {
1253		read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1254		if (i++ % 50 == 0)
1255			CDEBUG(D_NET, "%s: Wait for failover\n",
1256			       dev->ibd_ifname);
1257		schedule_timeout(cfs_time_seconds(1) / 100);
1258
1259		read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1260	}
1261
1262	kiblnd_hdev_addref_locked(dev->ibd_hdev);
1263	hdev = dev->ibd_hdev;
1264
1265	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1266
1267	return hdev;
1268}
1269
1270static void
1271kiblnd_map_tx_pool(kib_tx_pool_t *tpo)
1272{
1273	kib_pages_t    *txpgs = tpo->tpo_tx_pages;
1274	kib_pool_t     *pool  = &tpo->tpo_pool;
1275	kib_net_t      *net   = pool->po_owner->ps_net;
1276	kib_dev_t      *dev;
1277	struct page    *page;
1278	kib_tx_t       *tx;
1279	int	     page_offset;
1280	int	     ipage;
1281	int	     i;
1282
1283	LASSERT (net != NULL);
1284
1285	dev = net->ibn_dev;
1286
1287	/* pre-mapped messages are not bigger than 1 page */
1288	CLASSERT (IBLND_MSG_SIZE <= PAGE_SIZE);
1289
1290	/* No fancy arithmetic when we do the buffer calculations */
1291	CLASSERT (PAGE_SIZE % IBLND_MSG_SIZE == 0);
1292
1293	tpo->tpo_hdev = kiblnd_current_hdev(dev);
1294
1295	for (ipage = page_offset = i = 0; i < pool->po_size; i++) {
1296		page = txpgs->ibp_pages[ipage];
1297		tx = &tpo->tpo_tx_descs[i];
1298
1299		tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) +
1300					   page_offset);
1301
1302		tx->tx_msgaddr = kiblnd_dma_map_single(
1303			tpo->tpo_hdev->ibh_ibdev, tx->tx_msg,
1304			IBLND_MSG_SIZE, DMA_TO_DEVICE);
1305		LASSERT (!kiblnd_dma_mapping_error(tpo->tpo_hdev->ibh_ibdev,
1306						   tx->tx_msgaddr));
1307		KIBLND_UNMAP_ADDR_SET(tx, tx_msgunmap, tx->tx_msgaddr);
1308
1309		list_add(&tx->tx_list, &pool->po_free_list);
1310
1311		page_offset += IBLND_MSG_SIZE;
1312		LASSERT (page_offset <= PAGE_SIZE);
1313
1314		if (page_offset == PAGE_SIZE) {
1315			page_offset = 0;
1316			ipage++;
1317			LASSERT (ipage <= txpgs->ibp_npages);
1318		}
1319	}
1320}
1321
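/* Return the pre-registered DMA MR covering [addr, addr + size), or NULL
 * if no single MR spans the whole range. */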
1322struct ib_mr *
1323kiblnd_find_dma_mr(kib_hca_dev_t *hdev, __u64 addr, __u64 size)
1324{
1325	__u64   index;
1326
1327	LASSERT (hdev->ibh_mrs[0] != NULL);
1328
1329	if (hdev->ibh_nmrs == 1)
1330		return hdev->ibh_mrs[0];
1331
1332	index = addr >> hdev->ibh_mr_shift;
1333
1334	if (index <  hdev->ibh_nmrs &&
1335	    index == ((addr + size - 1) >> hdev->ibh_mr_shift))
1336		return hdev->ibh_mrs[index];
1337
1338	return NULL;
1339}
1340
1341struct ib_mr *
1342kiblnd_find_rd_dma_mr(kib_hca_dev_t *hdev, kib_rdma_desc_t *rd)
1343{
1344	struct ib_mr *prev_mr;
1345	struct ib_mr *mr;
1346	int	   i;
1347
1348	LASSERT (hdev->ibh_mrs[0] != NULL);
1349
1350	if (*kiblnd_tunables.kib_map_on_demand > 0 &&
1351	    *kiblnd_tunables.kib_map_on_demand <= rd->rd_nfrags)
1352		return NULL;
1353
1354	if (hdev->ibh_nmrs == 1)
1355		return hdev->ibh_mrs[0];
1356
1357	for (i = 0, mr = prev_mr = NULL;
1358	     i < rd->rd_nfrags; i++) {
1359		mr = kiblnd_find_dma_mr(hdev,
1360					rd->rd_frags[i].rf_addr,
1361					rd->rd_frags[i].rf_nob);
1362		if (prev_mr == NULL)
1363			prev_mr = mr;
1364
1365		if (mr == NULL || prev_mr != mr) {
1366			/* Can't be covered by a single MR */
1367			mr = NULL;
1368			break;
1369		}
1370	}
1371
1372	return mr;
1373}
1374
1375static void
1376kiblnd_destroy_fmr_pool(kib_fmr_pool_t *pool)
1377{
1378	LASSERT (pool->fpo_map_count == 0);
1379
1380	if (pool->fpo_fmr_pool != NULL)
1381		ib_destroy_fmr_pool(pool->fpo_fmr_pool);
1382
1383	if (pool->fpo_hdev != NULL)
1384		kiblnd_hdev_decref(pool->fpo_hdev);
1385
1386	LIBCFS_FREE(pool, sizeof(kib_fmr_pool_t));
1387}
1388
1389static void
1390kiblnd_destroy_fmr_pool_list(struct list_head *head)
1391{
1392	kib_fmr_pool_t *pool;
1393
1394	while (!list_empty(head)) {
1395		pool = list_entry(head->next, kib_fmr_pool_t, fpo_list);
1396		list_del(&pool->fpo_list);
1397		kiblnd_destroy_fmr_pool(pool);
1398	}
1399}
1400
1401static int kiblnd_fmr_pool_size(int ncpts)
1402{
1403	int size = *kiblnd_tunables.kib_fmr_pool_size / ncpts;
1404
1405	return max(IBLND_FMR_POOL, size);
1406}
1407
1408static int kiblnd_fmr_flush_trigger(int ncpts)
1409{
1410	int size = *kiblnd_tunables.kib_fmr_flush_trigger / ncpts;
1411
1412	return max(IBLND_FMR_POOL_FLUSH, size);
1413}
1414
1415static int
1416kiblnd_create_fmr_pool(kib_fmr_poolset_t *fps, kib_fmr_pool_t **pp_fpo)
1417{
1418	/* FMR pool for RDMA */
1419	kib_dev_t	       *dev = fps->fps_net->ibn_dev;
1420	kib_fmr_pool_t	  *fpo;
1421	struct ib_fmr_pool_param param = {
1422		.max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
1423		.page_shift	= PAGE_SHIFT,
1424		.access	    = (IB_ACCESS_LOCAL_WRITE |
1425				      IB_ACCESS_REMOTE_WRITE),
1426		.pool_size	   = fps->fps_pool_size,
1427		.dirty_watermark   = fps->fps_flush_trigger,
1428		.flush_function    = NULL,
1429		.flush_arg	 = NULL,
1430		.cache	     = !!*kiblnd_tunables.kib_fmr_cache};
1431	int rc;
1432
1433	LIBCFS_CPT_ALLOC(fpo, lnet_cpt_table(), fps->fps_cpt, sizeof(*fpo));
1434	if (fpo == NULL)
1435		return -ENOMEM;
1436
1437	fpo->fpo_hdev = kiblnd_current_hdev(dev);
1438
1439	fpo->fpo_fmr_pool = ib_create_fmr_pool(fpo->fpo_hdev->ibh_pd, &param);
1440	if (IS_ERR(fpo->fpo_fmr_pool)) {
1441		rc = PTR_ERR(fpo->fpo_fmr_pool);
1442		CERROR("Failed to create FMR pool: %d\n", rc);
1443
1444		kiblnd_hdev_decref(fpo->fpo_hdev);
1445		LIBCFS_FREE(fpo, sizeof(kib_fmr_pool_t));
1446		return rc;
1447	}
1448
1449	fpo->fpo_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1450	fpo->fpo_owner    = fps;
1451	*pp_fpo = fpo;
1452
1453	return 0;
1454}
1455
1456static void
1457kiblnd_fail_fmr_poolset(kib_fmr_poolset_t *fps, struct list_head *zombies)
1458{
1459	if (fps->fps_net == NULL) /* initialized? */
1460		return;
1461
1462	spin_lock(&fps->fps_lock);
1463
1464	while (!list_empty(&fps->fps_pool_list)) {
1465		kib_fmr_pool_t *fpo = list_entry(fps->fps_pool_list.next,
1466						 kib_fmr_pool_t, fpo_list);
1467		fpo->fpo_failed = 1;
1468		list_del(&fpo->fpo_list);
1469		if (fpo->fpo_map_count == 0)
1470			list_add(&fpo->fpo_list, zombies);
1471		else
1472			list_add(&fpo->fpo_list, &fps->fps_failed_pool_list);
1473	}
1474
1475	spin_unlock(&fps->fps_lock);
1476}
1477
1478static void
1479kiblnd_fini_fmr_poolset(kib_fmr_poolset_t *fps)
1480{
1481	if (fps->fps_net != NULL) { /* initialized? */
1482		kiblnd_destroy_fmr_pool_list(&fps->fps_failed_pool_list);
1483		kiblnd_destroy_fmr_pool_list(&fps->fps_pool_list);
1484	}
1485}
1486
1487static int
1488kiblnd_init_fmr_poolset(kib_fmr_poolset_t *fps, int cpt, kib_net_t *net,
1489			int pool_size, int flush_trigger)
1490{
1491	kib_fmr_pool_t *fpo;
1492	int	     rc;
1493
1494	memset(fps, 0, sizeof(kib_fmr_poolset_t));
1495
1496	fps->fps_net = net;
1497	fps->fps_cpt = cpt;
1498	fps->fps_pool_size = pool_size;
1499	fps->fps_flush_trigger = flush_trigger;
1500	spin_lock_init(&fps->fps_lock);
1501	INIT_LIST_HEAD(&fps->fps_pool_list);
1502	INIT_LIST_HEAD(&fps->fps_failed_pool_list);
1503
1504	rc = kiblnd_create_fmr_pool(fps, &fpo);
1505	if (rc == 0)
1506		list_add_tail(&fpo->fpo_list, &fps->fps_pool_list);
1507
1508	return rc;
1509}
1510
1511static int
1512kiblnd_fmr_pool_is_idle(kib_fmr_pool_t *fpo, unsigned long now)
1513{
1514	if (fpo->fpo_map_count != 0) /* still in use */
1515		return 0;
1516	if (fpo->fpo_failed)
1517		return 1;
1518	return cfs_time_aftereq(now, fpo->fpo_deadline);
1519}
1520
1521void
1522kiblnd_fmr_pool_unmap(kib_fmr_t *fmr, int status)
1523{
1524	LIST_HEAD     (zombies);
1525	kib_fmr_pool_t    *fpo = fmr->fmr_pool;
1526	kib_fmr_poolset_t *fps = fpo->fpo_owner;
1527	unsigned long	 now = cfs_time_current();
1528	kib_fmr_pool_t    *tmp;
1529	int		rc;
1530
1531	rc = ib_fmr_pool_unmap(fmr->fmr_pfmr);
1532	LASSERT (rc == 0);
1533
1534	if (status != 0) {
1535		rc = ib_flush_fmr_pool(fpo->fpo_fmr_pool);
1536		LASSERT (rc == 0);
1537	}
1538
1539	fmr->fmr_pool = NULL;
1540	fmr->fmr_pfmr = NULL;
1541
1542	spin_lock(&fps->fps_lock);
1543	fpo->fpo_map_count --;  /* decref the pool */
1544
1545	list_for_each_entry_safe(fpo, tmp, &fps->fps_pool_list, fpo_list) {
1546		/* the first pool is persistent */
1547		if (fps->fps_pool_list.next == &fpo->fpo_list)
1548			continue;
1549
1550		if (kiblnd_fmr_pool_is_idle(fpo, now)) {
1551			list_move(&fpo->fpo_list, &zombies);
1552			fps->fps_version ++;
1553		}
1554	}
1555	spin_unlock(&fps->fps_lock);
1556
1557	if (!list_empty(&zombies))
1558		kiblnd_destroy_fmr_pool_list(&zombies);
1559}
1560
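/* Map an array of page addresses through an FMR drawn from this poolset,
 * growing the poolset on demand when every existing pool is busy.
 * Returns -EAGAIN if a recent attempt to create a pool failed. */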
1561int
1562kiblnd_fmr_pool_map(kib_fmr_poolset_t *fps, __u64 *pages, int npages,
1563		    __u64 iov, kib_fmr_t *fmr)
1564{
1565	struct ib_pool_fmr *pfmr;
1566	kib_fmr_pool_t     *fpo;
1567	__u64	       version;
1568	int		 rc;
1569
1570 again:
1571	spin_lock(&fps->fps_lock);
1572	version = fps->fps_version;
1573	list_for_each_entry(fpo, &fps->fps_pool_list, fpo_list) {
1574		fpo->fpo_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1575		fpo->fpo_map_count++;
1576		spin_unlock(&fps->fps_lock);
1577
1578		pfmr = ib_fmr_pool_map_phys(fpo->fpo_fmr_pool,
1579					    pages, npages, iov);
1580		if (likely(!IS_ERR(pfmr))) {
1581			fmr->fmr_pool = fpo;
1582			fmr->fmr_pfmr = pfmr;
1583			return 0;
1584		}
1585
1586		spin_lock(&fps->fps_lock);
1587		fpo->fpo_map_count--;
1588		if (PTR_ERR(pfmr) != -EAGAIN) {
1589			spin_unlock(&fps->fps_lock);
1590			return PTR_ERR(pfmr);
1591		}
1592
1593		/* EAGAIN and ... */
1594		if (version != fps->fps_version) {
1595			spin_unlock(&fps->fps_lock);
1596			goto again;
1597		}
1598	}
1599
1600	if (fps->fps_increasing) {
1601		spin_unlock(&fps->fps_lock);
1602		CDEBUG(D_NET, "Another thread is allocating new "
1603		       "FMR pool, waiting for her to complete\n");
1604		schedule();
1605		goto again;
1606
1607	}
1608
1609	if (time_before(cfs_time_current(), fps->fps_next_retry)) {
1610		/* someone failed recently */
1611		spin_unlock(&fps->fps_lock);
1612		return -EAGAIN;
1613	}
1614
1615	fps->fps_increasing = 1;
1616	spin_unlock(&fps->fps_lock);
1617
1618	CDEBUG(D_NET, "Allocate new FMR pool\n");
1619	rc = kiblnd_create_fmr_pool(fps, &fpo);
1620	spin_lock(&fps->fps_lock);
1621	fps->fps_increasing = 0;
1622	if (rc == 0) {
1623		fps->fps_version++;
1624		list_add_tail(&fpo->fpo_list, &fps->fps_pool_list);
1625	} else {
1626		fps->fps_next_retry = cfs_time_shift(IBLND_POOL_RETRY);
1627	}
1628	spin_unlock(&fps->fps_lock);
1629
1630	goto again;
1631}
1632
1633static void
1634kiblnd_fini_pool(kib_pool_t *pool)
1635{
1636	LASSERT (list_empty(&pool->po_free_list));
1637	LASSERT (pool->po_allocated == 0);
1638
1639	CDEBUG(D_NET, "Finalize %s pool\n", pool->po_owner->ps_name);
1640}
1641
1642static void
1643kiblnd_init_pool(kib_poolset_t *ps, kib_pool_t *pool, int size)
1644{
1645	CDEBUG(D_NET, "Initialize %s pool\n", ps->ps_name);
1646
1647	memset(pool, 0, sizeof(kib_pool_t));
1648	INIT_LIST_HEAD(&pool->po_free_list);
1649	pool->po_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1650	pool->po_owner    = ps;
1651	pool->po_size     = size;
1652}
1653
1654static void
1655kiblnd_destroy_pool_list(struct list_head *head)
1656{
1657	kib_pool_t *pool;
1658
1659	while (!list_empty(head)) {
1660		pool = list_entry(head->next, kib_pool_t, po_list);
1661		list_del(&pool->po_list);
1662
1663		LASSERT (pool->po_owner != NULL);
1664		pool->po_owner->ps_pool_destroy(pool);
1665	}
1666}
1667
1668static void
1669kiblnd_fail_poolset(kib_poolset_t *ps, struct list_head *zombies)
1670{
1671	if (ps->ps_net == NULL) /* initialized? */
1672		return;
1673
1674	spin_lock(&ps->ps_lock);
1675	while (!list_empty(&ps->ps_pool_list)) {
1676		kib_pool_t *po = list_entry(ps->ps_pool_list.next,
1677					    kib_pool_t, po_list);
1678		po->po_failed = 1;
1679		list_del(&po->po_list);
1680		if (po->po_allocated == 0)
1681			list_add(&po->po_list, zombies);
1682		else
1683			list_add(&po->po_list, &ps->ps_failed_pool_list);
1684	}
1685	spin_unlock(&ps->ps_lock);
1686}
1687
1688static void
1689kiblnd_fini_poolset(kib_poolset_t *ps)
1690{
1691	if (ps->ps_net != NULL) { /* initialized? */
1692		kiblnd_destroy_pool_list(&ps->ps_failed_pool_list);
1693		kiblnd_destroy_pool_list(&ps->ps_pool_list);
1694	}
1695}
1696
1697static int
1698kiblnd_init_poolset(kib_poolset_t *ps, int cpt,
1699		    kib_net_t *net, char *name, int size,
1700		    kib_ps_pool_create_t po_create,
1701		    kib_ps_pool_destroy_t po_destroy,
1702		    kib_ps_node_init_t nd_init,
1703		    kib_ps_node_fini_t nd_fini)
1704{
1705	kib_pool_t	*pool;
1706	int		rc;
1707
1708	memset(ps, 0, sizeof(kib_poolset_t));
1709
1710	ps->ps_cpt	    = cpt;
1711	ps->ps_net	  = net;
1712	ps->ps_pool_create  = po_create;
1713	ps->ps_pool_destroy = po_destroy;
1714	ps->ps_node_init    = nd_init;
1715	ps->ps_node_fini    = nd_fini;
1716	ps->ps_pool_size    = size;
1717	if (strlcpy(ps->ps_name, name, sizeof(ps->ps_name))
1718	    >= sizeof(ps->ps_name))
1719		return -E2BIG;
1720	spin_lock_init(&ps->ps_lock);
1721	INIT_LIST_HEAD(&ps->ps_pool_list);
1722	INIT_LIST_HEAD(&ps->ps_failed_pool_list);
1723
1724	rc = ps->ps_pool_create(ps, size, &pool);
1725	if (rc == 0)
1726		list_add(&pool->po_list, &ps->ps_pool_list);
1727	else
1728		CERROR("Failed to create the first pool for %s\n", ps->ps_name);
1729
1730	return rc;
1731}
1732
1733static int
1734kiblnd_pool_is_idle(kib_pool_t *pool, unsigned long now)
1735{
1736	if (pool->po_allocated != 0) /* still in use */
1737		return 0;
1738	if (pool->po_failed)
1739		return 1;
1740	return cfs_time_aftereq(now, pool->po_deadline);
1741}
1742
1743void
1744kiblnd_pool_free_node(kib_pool_t *pool, struct list_head *node)
1745{
1746	LIST_HEAD  (zombies);
1747	kib_poolset_t  *ps = pool->po_owner;
1748	kib_pool_t     *tmp;
1749	unsigned long      now = cfs_time_current();
1750
1751	spin_lock(&ps->ps_lock);
1752
1753	if (ps->ps_node_fini != NULL)
1754		ps->ps_node_fini(pool, node);
1755
1756	LASSERT (pool->po_allocated > 0);
1757	list_add(node, &pool->po_free_list);
1758	pool->po_allocated --;
1759
1760	list_for_each_entry_safe(pool, tmp, &ps->ps_pool_list, po_list) {
1761		/* the first pool is persistent */
1762		if (ps->ps_pool_list.next == &pool->po_list)
1763			continue;
1764
1765		if (kiblnd_pool_is_idle(pool, now))
1766			list_move(&pool->po_list, &zombies);
1767	}
1768	spin_unlock(&ps->ps_lock);
1769
1770	if (!list_empty(&zombies))
1771		kiblnd_destroy_pool_list(&zombies);
1772}
1773
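/* Take a free node from the first pool that has one, creating a new pool
 * when all existing pools are exhausted.  Returns NULL only if a new pool
 * cannot be allocated right now. */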
1774struct list_head *
1775kiblnd_pool_alloc_node(kib_poolset_t *ps)
1776{
1777	struct list_head	    *node;
1778	kib_pool_t	    *pool;
1779	int		    rc;
1780
1781 again:
1782	spin_lock(&ps->ps_lock);
1783	list_for_each_entry(pool, &ps->ps_pool_list, po_list) {
1784		if (list_empty(&pool->po_free_list))
1785			continue;
1786
1787		pool->po_allocated ++;
1788		pool->po_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1789		node = pool->po_free_list.next;
1790		list_del(node);
1791
1792		if (ps->ps_node_init != NULL) {
1793			/* still hold the lock */
1794			ps->ps_node_init(pool, node);
1795		}
1796		spin_unlock(&ps->ps_lock);
1797		return node;
1798	}
1799
1800	/* no available tx pool and ... */
1801	if (ps->ps_increasing) {
1802		/* another thread is allocating a new pool */
1803		spin_unlock(&ps->ps_lock);
1804		CDEBUG(D_NET, "Another thread is allocating new "
1805		       "%s pool, waiting for her to complete\n",
1806		       ps->ps_name);
1807		schedule();
1808		goto again;
1809	}
1810
1811	if (time_before(cfs_time_current(), ps->ps_next_retry)) {
1812		/* someone failed recently */
1813		spin_unlock(&ps->ps_lock);
1814		return NULL;
1815	}
1816
1817	ps->ps_increasing = 1;
1818	spin_unlock(&ps->ps_lock);
1819
1820	CDEBUG(D_NET, "%s pool exhausted, allocate new pool\n", ps->ps_name);
1821
1822	rc = ps->ps_pool_create(ps, ps->ps_pool_size, &pool);
1823
1824	spin_lock(&ps->ps_lock);
1825	ps->ps_increasing = 0;
1826	if (rc == 0) {
1827		list_add_tail(&pool->po_list, &ps->ps_pool_list);
1828	} else {
1829		ps->ps_next_retry = cfs_time_shift(IBLND_POOL_RETRY);
1830		CERROR("Can't allocate new %s pool because out of memory\n",
1831		       ps->ps_name);
1832	}
1833	spin_unlock(&ps->ps_lock);
1834
1835	goto again;
1836}
1837
1838void
1839kiblnd_pmr_pool_unmap(kib_phys_mr_t *pmr)
1840{
1841	kib_pmr_pool_t      *ppo = pmr->pmr_pool;
1842	struct ib_mr	*mr  = pmr->pmr_mr;
1843
1844	pmr->pmr_mr = NULL;
1845	kiblnd_pool_free_node(&ppo->ppo_pool, &pmr->pmr_list);
1846	if (mr != NULL)
1847		ib_dereg_mr(mr);
1848}
1849
1850int
1851kiblnd_pmr_pool_map(kib_pmr_poolset_t *pps, kib_hca_dev_t *hdev,
1852		    kib_rdma_desc_t *rd, __u64 *iova, kib_phys_mr_t **pp_pmr)
1853{
1854	kib_phys_mr_t *pmr;
1855	struct list_head    *node;
1856	int	    rc;
1857	int	    i;
1858
1859	node = kiblnd_pool_alloc_node(&pps->pps_poolset);
1860	if (node == NULL) {
1861		CERROR("Failed to allocate PMR descriptor\n");
1862		return -ENOMEM;
1863	}
1864
1865	pmr = container_of(node, kib_phys_mr_t, pmr_list);
1866	if (pmr->pmr_pool->ppo_hdev != hdev) {
1867		kiblnd_pool_free_node(&pmr->pmr_pool->ppo_pool, node);
1868		return -EAGAIN;
1869	}
1870
1871	for (i = 0; i < rd->rd_nfrags; i ++) {
1872		pmr->pmr_ipb[i].addr = rd->rd_frags[i].rf_addr;
1873		pmr->pmr_ipb[i].size = rd->rd_frags[i].rf_nob;
1874	}
1875
1876	pmr->pmr_mr = ib_reg_phys_mr(hdev->ibh_pd,
1877				     pmr->pmr_ipb, rd->rd_nfrags,
1878				     IB_ACCESS_LOCAL_WRITE |
1879				     IB_ACCESS_REMOTE_WRITE,
1880				     iova);
1881	if (!IS_ERR(pmr->pmr_mr)) {
1882		pmr->pmr_iova = *iova;
1883		*pp_pmr = pmr;
1884		return 0;
1885	}
1886
1887	rc = PTR_ERR(pmr->pmr_mr);
1888	CERROR("Failed ib_reg_phys_mr: %d\n", rc);
1889
1890	pmr->pmr_mr = NULL;
1891	kiblnd_pool_free_node(&pmr->pmr_pool->ppo_pool, node);
1892
1893	return rc;
1894}
1895
1896static void
1897kiblnd_destroy_pmr_pool(kib_pool_t *pool)
1898{
1899	kib_pmr_pool_t *ppo = container_of(pool, kib_pmr_pool_t, ppo_pool);
1900	kib_phys_mr_t  *pmr;
1901
1902	LASSERT (pool->po_allocated == 0);
1903
1904	while (!list_empty(&pool->po_free_list)) {
1905		pmr = list_entry(pool->po_free_list.next,
1906				     kib_phys_mr_t, pmr_list);
1907
1908		LASSERT (pmr->pmr_mr == NULL);
1909		list_del(&pmr->pmr_list);
1910
1911		if (pmr->pmr_ipb != NULL) {
1912			LIBCFS_FREE(pmr->pmr_ipb,
1913				    IBLND_MAX_RDMA_FRAGS *
1914				    sizeof(struct ib_phys_buf));
1915		}
1916
1917		LIBCFS_FREE(pmr, sizeof(kib_phys_mr_t));
1918	}
1919
1920	kiblnd_fini_pool(pool);
1921	if (ppo->ppo_hdev != NULL)
1922		kiblnd_hdev_decref(ppo->ppo_hdev);
1923
1924	LIBCFS_FREE(ppo, sizeof(kib_pmr_pool_t));
1925}
1926
1927static inline int kiblnd_pmr_pool_size(int ncpts)
1928{
1929	int size = *kiblnd_tunables.kib_pmr_pool_size / ncpts;
1930
1931	return max(IBLND_PMR_POOL, size);
1932}
1933
1934static int
1935kiblnd_create_pmr_pool(kib_poolset_t *ps, int size, kib_pool_t **pp_po)
1936{
1937	struct kib_pmr_pool	*ppo;
1938	struct kib_pool		*pool;
1939	kib_phys_mr_t		*pmr;
1940	int			i;
1941
1942	LIBCFS_CPT_ALLOC(ppo, lnet_cpt_table(),
1943			 ps->ps_cpt, sizeof(kib_pmr_pool_t));
1944	if (ppo == NULL) {
1945		CERROR("Failed to allocate PMR pool\n");
1946		return -ENOMEM;
1947	}
1948
1949	pool = &ppo->ppo_pool;
1950	kiblnd_init_pool(ps, pool, size);
1951
1952	for (i = 0; i < size; i++) {
1953		LIBCFS_CPT_ALLOC(pmr, lnet_cpt_table(),
1954				 ps->ps_cpt, sizeof(kib_phys_mr_t));
1955		if (pmr == NULL)
1956			break;
1957
1958		pmr->pmr_pool = ppo;
1959		LIBCFS_CPT_ALLOC(pmr->pmr_ipb, lnet_cpt_table(), ps->ps_cpt,
1960				 IBLND_MAX_RDMA_FRAGS * sizeof(*pmr->pmr_ipb));
1961		if (pmr->pmr_ipb == NULL)
1962			break;
1963
1964		list_add(&pmr->pmr_list, &pool->po_free_list);
1965	}
1966
1967	if (i < size) {
1968		ps->ps_pool_destroy(pool);
1969		return -ENOMEM;
1970	}
1971
1972	ppo->ppo_hdev = kiblnd_current_hdev(ps->ps_net->ibn_dev);
1973	*pp_po = pool;
1974	return 0;
1975}
1976
1977static void
1978kiblnd_destroy_tx_pool(kib_pool_t *pool)
1979{
1980	kib_tx_pool_t  *tpo = container_of(pool, kib_tx_pool_t, tpo_pool);
1981	int	     i;
1982
1983	LASSERT (pool->po_allocated == 0);
1984
1985	if (tpo->tpo_tx_pages != NULL) {
1986		kiblnd_unmap_tx_pool(tpo);
1987		kiblnd_free_pages(tpo->tpo_tx_pages);
1988	}
1989
1990	if (tpo->tpo_tx_descs == NULL)
1991		goto out;
1992
1993	for (i = 0; i < pool->po_size; i++) {
1994		kib_tx_t *tx = &tpo->tpo_tx_descs[i];
1995
1996		list_del(&tx->tx_list);
1997		if (tx->tx_pages != NULL)
1998			LIBCFS_FREE(tx->tx_pages,
1999				    LNET_MAX_IOV *
2000				    sizeof(*tx->tx_pages));
2001		if (tx->tx_frags != NULL)
2002			LIBCFS_FREE(tx->tx_frags,
2003				    IBLND_MAX_RDMA_FRAGS *
2004					    sizeof(*tx->tx_frags));
2005		if (tx->tx_wrq != NULL)
2006			LIBCFS_FREE(tx->tx_wrq,
2007				    (1 + IBLND_MAX_RDMA_FRAGS) *
2008				    sizeof(*tx->tx_wrq));
2009		if (tx->tx_sge != NULL)
2010			LIBCFS_FREE(tx->tx_sge,
2011				    (1 + IBLND_MAX_RDMA_FRAGS) *
2012				    sizeof(*tx->tx_sge));
2013		if (tx->tx_rd != NULL)
2014			LIBCFS_FREE(tx->tx_rd,
2015				    offsetof(kib_rdma_desc_t,
2016					     rd_frags[IBLND_MAX_RDMA_FRAGS]));
2017	}
2018
2019	LIBCFS_FREE(tpo->tpo_tx_descs,
2020		    pool->po_size * sizeof(kib_tx_t));
2021out:
2022	kiblnd_fini_pool(pool);
2023	LIBCFS_FREE(tpo, sizeof(kib_tx_pool_t));
2024}
2025
2026static int kiblnd_tx_pool_size(int ncpts)
2027{
2028	int ntx = *kiblnd_tunables.kib_ntx / ncpts;
2029
2030	return max(IBLND_TX_POOL, ntx);
2031}
2032
2033static int
2034kiblnd_create_tx_pool(kib_poolset_t *ps, int size, kib_pool_t **pp_po)
2035{
2036	int	    i;
2037	int	    npg;
2038	kib_pool_t    *pool;
2039	kib_tx_pool_t *tpo;
2040
2041	LIBCFS_CPT_ALLOC(tpo, lnet_cpt_table(), ps->ps_cpt, sizeof(*tpo));
2042	if (tpo == NULL) {
2043		CERROR("Failed to allocate TX pool\n");
2044		return -ENOMEM;
2045	}
2046
2047	pool = &tpo->tpo_pool;
2048	kiblnd_init_pool(ps, pool, size);
2049	tpo->tpo_tx_descs = NULL;
2050	tpo->tpo_tx_pages = NULL;
2051
2052	npg = (size * IBLND_MSG_SIZE + PAGE_SIZE - 1) / PAGE_SIZE;
2053	if (kiblnd_alloc_pages(&tpo->tpo_tx_pages, ps->ps_cpt, npg) != 0) {
2054		CERROR("Can't allocate tx pages: %d\n", npg);
2055		LIBCFS_FREE(tpo, sizeof(kib_tx_pool_t));
2056		return -ENOMEM;
2057	}
2058
2059	LIBCFS_CPT_ALLOC(tpo->tpo_tx_descs, lnet_cpt_table(), ps->ps_cpt,
2060			 size * sizeof(kib_tx_t));
2061	if (tpo->tpo_tx_descs == NULL) {
2062		CERROR("Can't allocate %d tx descriptors\n", size);
2063		ps->ps_pool_destroy(pool);
2064		return -ENOMEM;
2065	}
2066
2067	memset(tpo->tpo_tx_descs, 0, size * sizeof(kib_tx_t));
2068
2069	for (i = 0; i < size; i++) {
2070		kib_tx_t *tx = &tpo->tpo_tx_descs[i];
2071
2072		tx->tx_pool = tpo;
2073		if (ps->ps_net->ibn_fmr_ps != NULL) {
2074			LIBCFS_CPT_ALLOC(tx->tx_pages,
2075					 lnet_cpt_table(), ps->ps_cpt,
2076					 LNET_MAX_IOV * sizeof(*tx->tx_pages));
2077			if (tx->tx_pages == NULL)
2078				break;
2079		}
2080
2081		LIBCFS_CPT_ALLOC(tx->tx_frags, lnet_cpt_table(), ps->ps_cpt,
2082				 IBLND_MAX_RDMA_FRAGS * sizeof(*tx->tx_frags));
2083		if (tx->tx_frags == NULL)
2084			break;
2085
2086		sg_init_table(tx->tx_frags, IBLND_MAX_RDMA_FRAGS);
2087
2088		LIBCFS_CPT_ALLOC(tx->tx_wrq, lnet_cpt_table(), ps->ps_cpt,
2089				 (1 + IBLND_MAX_RDMA_FRAGS) *
2090				 sizeof(*tx->tx_wrq));
2091		if (tx->tx_wrq == NULL)
2092			break;
2093
2094		LIBCFS_CPT_ALLOC(tx->tx_sge, lnet_cpt_table(), ps->ps_cpt,
2095				 (1 + IBLND_MAX_RDMA_FRAGS) *
2096				 sizeof(*tx->tx_sge));
2097		if (tx->tx_sge == NULL)
2098			break;
2099
2100		LIBCFS_CPT_ALLOC(tx->tx_rd, lnet_cpt_table(), ps->ps_cpt,
2101				 offsetof(kib_rdma_desc_t,
2102					  rd_frags[IBLND_MAX_RDMA_FRAGS]));
2103		if (tx->tx_rd == NULL)
2104			break;
2105	}
2106
2107	if (i == size) {
2108		kiblnd_map_tx_pool(tpo);
2109		*pp_po = pool;
2110		return 0;
2111	}
2112
2113	ps->ps_pool_destroy(pool);
2114	return -ENOMEM;
2115}
2116
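/* Per-node init callback for the TX poolset: assign each TX the next
 * cookie from its owning poolset. */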
2117static void
2118kiblnd_tx_init(kib_pool_t *pool, struct list_head *node)
2119{
2120	kib_tx_poolset_t *tps = container_of(pool->po_owner, kib_tx_poolset_t,
2121					     tps_poolset);
2122	kib_tx_t	 *tx  = list_entry(node, kib_tx_t, tx_list);
2123
2124	tx->tx_cookie = tps->tps_next_tx_cookie++;
2125}
2126
2127static void
2128kiblnd_net_fini_pools(kib_net_t *net)
2129{
2130	int	i;
2131
2132	cfs_cpt_for_each(i, lnet_cpt_table()) {
2133		kib_tx_poolset_t	*tps;
2134		kib_fmr_poolset_t	*fps;
2135		kib_pmr_poolset_t	*pps;
2136
2137		if (net->ibn_tx_ps != NULL) {
2138			tps = net->ibn_tx_ps[i];
2139			kiblnd_fini_poolset(&tps->tps_poolset);
2140		}
2141
2142		if (net->ibn_fmr_ps != NULL) {
2143			fps = net->ibn_fmr_ps[i];
2144			kiblnd_fini_fmr_poolset(fps);
2145		}
2146
2147		if (net->ibn_pmr_ps != NULL) {
2148			pps = net->ibn_pmr_ps[i];
2149			kiblnd_fini_poolset(&pps->pps_poolset);
2150		}
2151	}
2152
2153	if (net->ibn_tx_ps != NULL) {
2154		cfs_percpt_free(net->ibn_tx_ps);
2155		net->ibn_tx_ps = NULL;
2156	}
2157
2158	if (net->ibn_fmr_ps != NULL) {
2159		cfs_percpt_free(net->ibn_fmr_ps);
2160		net->ibn_fmr_ps = NULL;
2161	}
2162
2163	if (net->ibn_pmr_ps != NULL) {
2164		cfs_percpt_free(net->ibn_pmr_ps);
2165		net->ibn_pmr_ps = NULL;
2166	}
2167}
2168
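/* Set up the per-CPT pools for a net.  If map-on-demand is disabled and the
 * device premapped all memory with a single MR, only TX pools are needed.
 * Otherwise create an FMR poolset per CPT, fall back to PMR if the device
 * reports -ENOSYS for FMR, and create the TX pools last (see LU-2268). */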
2169static int
2170kiblnd_net_init_pools(kib_net_t *net, __u32 *cpts, int ncpts)
2171{
2172	unsigned long	flags;
2173	int		cpt;
2174	int		rc;
2175	int		i;
2176
2177	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2178	if (*kiblnd_tunables.kib_map_on_demand == 0 &&
2179	    net->ibn_dev->ibd_hdev->ibh_nmrs == 1) {
2180		read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2181					   flags);
2182		goto create_tx_pool;
2183	}
2184
2185	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2186
2187	if (*kiblnd_tunables.kib_fmr_pool_size <
2188	    *kiblnd_tunables.kib_ntx / 4) {
2189		CERROR("Can't set fmr pool size (%d) < ntx / 4 (%d)\n",
2190		       *kiblnd_tunables.kib_fmr_pool_size,
2191		       *kiblnd_tunables.kib_ntx / 4);
2192		rc = -EINVAL;
2193		goto failed;
2194	}
2195
2196	/* The TX pool must be created after the FMR/PMR pools; see LU-2268
2197	 * for details */
2198	LASSERT(net->ibn_tx_ps == NULL);
2199
2200	/* premapping can fail if ibd_nmr > 1, so we always create an
2201	 * FMR/PMR pool and use map-on-demand if premapping failed */
2202
2203	net->ibn_fmr_ps = cfs_percpt_alloc(lnet_cpt_table(),
2204					   sizeof(kib_fmr_poolset_t));
2205	if (net->ibn_fmr_ps == NULL) {
2206		CERROR("Failed to allocate FMR pool array\n");
2207		rc = -ENOMEM;
2208		goto failed;
2209	}
2210
2211	for (i = 0; i < ncpts; i++) {
2212		cpt = (cpts == NULL) ? i : cpts[i];
2213		rc = kiblnd_init_fmr_poolset(net->ibn_fmr_ps[cpt], cpt, net,
2214					     kiblnd_fmr_pool_size(ncpts),
2215					     kiblnd_fmr_flush_trigger(ncpts));
2216		if (rc == -ENOSYS && i == 0) /* no FMR */
2217			break; /* create PMR pool */
2218
2219		if (rc != 0) { /* a real error */
2220			CERROR("Can't initialize FMR pool for CPT %d: %d\n",
2221			       cpt, rc);
2222			goto failed;
2223		}
2224	}
2225
2226	if (i > 0) {
2227		LASSERT(i == ncpts);
2228		goto create_tx_pool;
2229	}
2230
2231	cfs_percpt_free(net->ibn_fmr_ps);
2232	net->ibn_fmr_ps = NULL;
2233
2234	CWARN("Device does not support FMR, falling back to PMR\n");
2235
2236	if (*kiblnd_tunables.kib_pmr_pool_size <
2237	    *kiblnd_tunables.kib_ntx / 4) {
2238		CERROR("Can't set pmr pool size (%d) < ntx / 4 (%d)\n",
2239		       *kiblnd_tunables.kib_pmr_pool_size,
2240		       *kiblnd_tunables.kib_ntx / 4);
2241		rc = -EINVAL;
2242		goto failed;
2243	}
2244
2245	net->ibn_pmr_ps = cfs_percpt_alloc(lnet_cpt_table(),
2246					   sizeof(kib_pmr_poolset_t));
2247	if (net->ibn_pmr_ps == NULL) {
2248		CERROR("Failed to allocate PMR pool array\n");
2249		rc = -ENOMEM;
2250		goto failed;
2251	}
2252
2253	for (i = 0; i < ncpts; i++) {
2254		cpt = (cpts == NULL) ? i : cpts[i];
2255		rc = kiblnd_init_poolset(&net->ibn_pmr_ps[cpt]->pps_poolset,
2256					 cpt, net, "PMR",
2257					 kiblnd_pmr_pool_size(ncpts),
2258					 kiblnd_create_pmr_pool,
2259					 kiblnd_destroy_pmr_pool, NULL, NULL);
2260		if (rc != 0) {
2261			CERROR("Can't initialize PMR pool for CPT %d: %d\n",
2262			       cpt, rc);
2263			goto failed;
2264		}
2265	}
2266
2267 create_tx_pool:
2268	net->ibn_tx_ps = cfs_percpt_alloc(lnet_cpt_table(),
2269					  sizeof(kib_tx_poolset_t));
2270	if (net->ibn_tx_ps == NULL) {
2271		CERROR("Failed to allocate tx pool array\n");
2272		rc = -ENOMEM;
2273		goto failed;
2274	}
2275
2276	for (i = 0; i < ncpts; i++) {
2277		cpt = (cpts == NULL) ? i : cpts[i];
2278		rc = kiblnd_init_poolset(&net->ibn_tx_ps[cpt]->tps_poolset,
2279					 cpt, net, "TX",
2280					 kiblnd_tx_pool_size(ncpts),
2281					 kiblnd_create_tx_pool,
2282					 kiblnd_destroy_tx_pool,
2283					 kiblnd_tx_init, NULL);
2284		if (rc != 0) {
2285			CERROR("Can't initialize TX pool for CPT %d: %d\n",
2286			       cpt, rc);
2287			goto failed;
2288		}
2289	}
2290
2291	return 0;
2292 failed:
2293	kiblnd_net_fini_pools(net);
2294	LASSERT(rc != 0);
2295	return rc;
2296}
2297
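/* Query the HCA attributes to find the largest MR it supports and derive
 * ibh_mr_shift from it; a shift of 64 means a single MR can cover the whole
 * address space.  The page size/shift/mask are fixed to the native values. */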
2298static int
2299kiblnd_hdev_get_attr(kib_hca_dev_t *hdev)
2300{
2301	struct ib_device_attr *attr;
2302	int		    rc;
2303
2304	/* It's safe to assume an HCA can handle a page size
2305	 * matching that of the native system */
2306	hdev->ibh_page_shift = PAGE_SHIFT;
2307	hdev->ibh_page_size  = 1 << PAGE_SHIFT;
2308	hdev->ibh_page_mask  = ~((__u64)hdev->ibh_page_size - 1);
2309
2310	LIBCFS_ALLOC(attr, sizeof(*attr));
2311	if (attr == NULL) {
2312		CERROR("Out of memory\n");
2313		return -ENOMEM;
2314	}
2315
2316	rc = ib_query_device(hdev->ibh_ibdev, attr);
2317	if (rc == 0)
2318		hdev->ibh_mr_size = attr->max_mr_size;
2319
2320	LIBCFS_FREE(attr, sizeof(*attr));
2321
2322	if (rc != 0) {
2323		CERROR("Failed to query IB device: %d\n", rc);
2324		return rc;
2325	}
2326
2327	if (hdev->ibh_mr_size == ~0ULL) {
2328		hdev->ibh_mr_shift = 64;
2329		return 0;
2330	}
2331
2332	for (hdev->ibh_mr_shift = 0;
2333	     hdev->ibh_mr_shift < 64; hdev->ibh_mr_shift++) {
2334		if (hdev->ibh_mr_size == (1ULL << hdev->ibh_mr_shift) ||
2335		    hdev->ibh_mr_size == (1ULL << hdev->ibh_mr_shift) - 1)
2336			return 0;
2337	}
2338
2339	CERROR("Invalid mr size: %#llx\n", hdev->ibh_mr_size);
2340	return -EINVAL;
2341}
2342
2343static void
2344kiblnd_hdev_cleanup_mrs(kib_hca_dev_t *hdev)
2345{
2346	int     i;
2347
2348	if (hdev->ibh_nmrs == 0 || hdev->ibh_mrs == NULL)
2349		return;
2350
2351	for (i = 0; i < hdev->ibh_nmrs; i++) {
2352		if (hdev->ibh_mrs[i] == NULL)
2353			break;
2354
2355		ib_dereg_mr(hdev->ibh_mrs[i]);
2356	}
2357
2358	LIBCFS_FREE(hdev->ibh_mrs, sizeof(*hdev->ibh_mrs) * hdev->ibh_nmrs);
2359	hdev->ibh_mrs  = NULL;
2360	hdev->ibh_nmrs = 0;
2361}
2362
2363void
2364kiblnd_hdev_destroy(kib_hca_dev_t *hdev)
2365{
2366	kiblnd_hdev_cleanup_mrs(hdev);
2367
2368	if (hdev->ibh_pd != NULL)
2369		ib_dealloc_pd(hdev->ibh_pd);
2370
2371	if (hdev->ibh_cmid != NULL)
2372		rdma_destroy_id(hdev->ibh_cmid);
2373
2374	LIBCFS_FREE(hdev, sizeof(*hdev));
2375}
2376
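/* Register the global memory regions for this HCA: a single DMA MR when one
 * MR can cover all memory (ibh_mr_shift == 64), otherwise an array of
 * physical MRs, each covering an mr_size chunk, spanning all of memory. */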
2377static int
2378kiblnd_hdev_setup_mrs(kib_hca_dev_t *hdev)
2379{
2380	struct ib_mr *mr;
2381	int	   i;
2382	int	   rc;
2383	__u64	 mm_size;
2384	__u64	 mr_size;
2385	int	   acflags = IB_ACCESS_LOCAL_WRITE |
2386				IB_ACCESS_REMOTE_WRITE;
2387
2388	rc = kiblnd_hdev_get_attr(hdev);
2389	if (rc != 0)
2390		return rc;
2391
2392	if (hdev->ibh_mr_shift == 64) {
2393		LIBCFS_ALLOC(hdev->ibh_mrs, 1 * sizeof(*hdev->ibh_mrs));
2394		if (hdev->ibh_mrs == NULL) {
2395			CERROR("Failed to allocate MRs table\n");
2396			return -ENOMEM;
2397		}
2398
2399		hdev->ibh_mrs[0] = NULL;
2400		hdev->ibh_nmrs   = 1;
2401
2402		mr = ib_get_dma_mr(hdev->ibh_pd, acflags);
2403		if (IS_ERR(mr)) {
2404			CERROR("Failed ib_get_dma_mr : %ld\n", PTR_ERR(mr));
2405			kiblnd_hdev_cleanup_mrs(hdev);
2406			return PTR_ERR(mr);
2407		}
2408
2409		hdev->ibh_mrs[0] = mr;
2410
2411		goto out;
2412	}
2413
2414	mr_size = (1ULL << hdev->ibh_mr_shift);
2415	mm_size = (unsigned long)high_memory - PAGE_OFFSET;
2416
2417	hdev->ibh_nmrs = (int)((mm_size + mr_size - 1) >> hdev->ibh_mr_shift);
2418
2419	if (hdev->ibh_mr_shift < 32 || hdev->ibh_nmrs > 1024) {
2420		/* it's 4T..., assume we will re-code at that time */
2421		CERROR("Can't support memory size: %#llx with MR size: %#llx\n",
2422		       mm_size, mr_size);
2423		return -EINVAL;
2424	}
2425
2426	/* create an array of MRs to cover all memory */
2427	LIBCFS_ALLOC(hdev->ibh_mrs, sizeof(*hdev->ibh_mrs) * hdev->ibh_nmrs);
2428	if (hdev->ibh_mrs == NULL) {
2429		CERROR("Failed to allocate MRs' table\n");
2430		return -ENOMEM;
2431	}
2432
2433	for (i = 0; i < hdev->ibh_nmrs; i++) {
2434		struct ib_phys_buf ipb;
2435		__u64	      iova;
2436
2437		ipb.size = hdev->ibh_mr_size;
2438		ipb.addr = i * mr_size;
2439		iova     = ipb.addr;
2440
2441		mr = ib_reg_phys_mr(hdev->ibh_pd, &ipb, 1, acflags, &iova);
2442		if (IS_ERR(mr)) {
2443			CERROR("Failed ib_reg_phys_mr addr %#llx size %#llx : %ld\n",
2444			       ipb.addr, ipb.size, PTR_ERR(mr));
2445			kiblnd_hdev_cleanup_mrs(hdev);
2446			return PTR_ERR(mr);
2447		}
2448
2449		LASSERT (iova == ipb.addr);
2450
2451		hdev->ibh_mrs[i] = mr;
2452	}
2453
2454out:
2455	if (hdev->ibh_mr_size != ~0ULL || hdev->ibh_nmrs != 1)
2456		LCONSOLE_INFO("Register global MR array, MR size: %#llx, array size: %d\n",
2457			      hdev->ibh_mr_size, hdev->ibh_nmrs);
2458	return 0;
2459}
2460
2461static int
2462kiblnd_dummy_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2463{       /* DUMMY */
2464	return 0;
2465}
2466
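/* Decide whether the device needs a failover: resolve a zero address from
 * our interface IP and compare the ib_device the CM picks with the one we
 * are currently using.  Returns > 0 if failover is needed, 0 if not, or an
 * error from creating or resolving the cmid. */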
2467static int
2468kiblnd_dev_need_failover(kib_dev_t *dev)
2469{
2470	struct rdma_cm_id  *cmid;
2471	struct sockaddr_in  srcaddr;
2472	struct sockaddr_in  dstaddr;
2473	int		 rc;
2474
2475	if (dev->ibd_hdev == NULL || /* initializing */
2476	    dev->ibd_hdev->ibh_cmid == NULL || /* listener is dead */
2477	    *kiblnd_tunables.kib_dev_failover > 1) /* debugging */
2478		return 1;
2479
2480	/* XXX: it's UGLY, but there is no better way to detect
2481	 * ib-bonding HCA failover because:
2482	 *
2483	 * a. there is no reliable CM event for HCA failover...
2484	 * b. there is no OFED API to get the ib_device for the current net_device...
2485	 *
2486	 * We have only two choices at this point:
2487	 *
2488	 * a. rdma_bind_addr(), which would conflict with the listener cmid
2489	 * b. rdma_resolve_addr() to a zero address */
2490	cmid = kiblnd_rdma_create_id(kiblnd_dummy_callback, dev, RDMA_PS_TCP,
2491				     IB_QPT_RC);
2492	if (IS_ERR(cmid)) {
2493		rc = PTR_ERR(cmid);
2494		CERROR("Failed to create cmid for failover: %d\n", rc);
2495		return rc;
2496	}
2497
2498	memset(&srcaddr, 0, sizeof(srcaddr));
2499	srcaddr.sin_family      = AF_INET;
2500	srcaddr.sin_addr.s_addr = (__force u32)htonl(dev->ibd_ifip);
2501
2502	memset(&dstaddr, 0, sizeof(dstaddr));
2503	dstaddr.sin_family = AF_INET;
2504	rc = rdma_resolve_addr(cmid, (struct sockaddr *)&srcaddr,
2505			       (struct sockaddr *)&dstaddr, 1);
2506	if (rc != 0 || cmid->device == NULL) {
2507		CERROR("Failed to bind %s:%pI4h to device(%p): %d\n",
2508		       dev->ibd_ifname, &dev->ibd_ifip,
2509		       cmid->device, rc);
2510		rdma_destroy_id(cmid);
2511		return rc;
2512	}
2513
2514	if (dev->ibd_hdev->ibh_ibdev == cmid->device) {
2515		/* don't need device failover */
2516		rdma_destroy_id(cmid);
2517		return 0;
2518	}
2519
2520	return 1;
2521}
2522
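/* Fail the device over to whatever ib_device its IP now lives on: destroy
 * the old listener, create a new cmid/PD/MR set bound to the interface
 * address, swap it in as ibd_hdev under the global lock, then fail every
 * pool on every net of the device so TXs get remapped against the new HCA.
 * Zombie pools are destroyed outside the lock. */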
2523int
2524kiblnd_dev_failover(kib_dev_t *dev)
2525{
2526	LIST_HEAD(zombie_tpo);
2527	LIST_HEAD(zombie_ppo);
2528	LIST_HEAD(zombie_fpo);
2529	struct rdma_cm_id  *cmid  = NULL;
2530	kib_hca_dev_t      *hdev  = NULL;
2531	kib_hca_dev_t      *old;
2532	struct ib_pd       *pd;
2533	kib_net_t	  *net;
2534	struct sockaddr_in  addr;
2535	unsigned long       flags;
2536	int		 rc = 0;
2537	int		    i;
2538
2539	LASSERT (*kiblnd_tunables.kib_dev_failover > 1 ||
2540		 dev->ibd_can_failover ||
2541		 dev->ibd_hdev == NULL);
2542
2543	rc = kiblnd_dev_need_failover(dev);
2544	if (rc <= 0)
2545		goto out;
2546
2547	if (dev->ibd_hdev != NULL &&
2548	    dev->ibd_hdev->ibh_cmid != NULL) {
2549		/* XXX it's not good to close the old listener here,
2550		 * because we may fail to create a new one.
2551		 * But we have to close it now, otherwise rdma_bind_addr()
2552		 * will return EADDRINUSE... */
2553		write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2554
2555		cmid = dev->ibd_hdev->ibh_cmid;
2556		/* make the next call to kiblnd_dev_need_failover()
2557		 * return 1 */
2558		dev->ibd_hdev->ibh_cmid  = NULL;
2559		write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2560
2561		rdma_destroy_id(cmid);
2562	}
2563
2564	cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, dev, RDMA_PS_TCP,
2565				     IB_QPT_RC);
2566	if (IS_ERR(cmid)) {
2567		rc = PTR_ERR(cmid);
2568		CERROR("Failed to create cmid for failover: %d\n", rc);
2569		goto out;
2570	}
2571
2572	memset(&addr, 0, sizeof(addr));
2573	addr.sin_family      = AF_INET;
2574	addr.sin_addr.s_addr = (__force u32)htonl(dev->ibd_ifip);
2575	addr.sin_port	= htons(*kiblnd_tunables.kib_service);
2576
2577	/* Bind to failover device or port */
2578	rc = rdma_bind_addr(cmid, (struct sockaddr *)&addr);
2579	if (rc != 0 || cmid->device == NULL) {
2580		CERROR("Failed to bind %s:%pI4h to device(%p): %d\n",
2581		       dev->ibd_ifname, &dev->ibd_ifip,
2582		       cmid->device, rc);
2583		rdma_destroy_id(cmid);
2584		goto out;
2585	}
2586
2587	LIBCFS_ALLOC(hdev, sizeof(*hdev));
2588	if (hdev == NULL) {
2589		CERROR("Failed to allocate kib_hca_dev\n");
2590		rdma_destroy_id(cmid);
2591		rc = -ENOMEM;
2592		goto out;
2593	}
2594
2595	atomic_set(&hdev->ibh_ref, 1);
2596	hdev->ibh_dev   = dev;
2597	hdev->ibh_cmid  = cmid;
2598	hdev->ibh_ibdev = cmid->device;
2599
2600	pd = ib_alloc_pd(cmid->device);
2601	if (IS_ERR(pd)) {
2602		rc = PTR_ERR(pd);
2603		CERROR("Can't allocate PD: %d\n", rc);
2604		goto out;
2605	}
2606
2607	hdev->ibh_pd = pd;
2608
2609	rc = rdma_listen(cmid, 0);
2610	if (rc != 0) {
2611		CERROR("Can't start new listener: %d\n", rc);
2612		goto out;
2613	}
2614
2615	rc = kiblnd_hdev_setup_mrs(hdev);
2616	if (rc != 0) {
2617		CERROR("Can't setup device: %d\n", rc);
2618		goto out;
2619	}
2620
2621	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2622
2623	old = dev->ibd_hdev;
2624	dev->ibd_hdev = hdev; /* take over the refcount */
2625	hdev = old;
2626
2627	list_for_each_entry(net, &dev->ibd_nets, ibn_list) {
2628		cfs_cpt_for_each(i, lnet_cpt_table()) {
2629			kiblnd_fail_poolset(&net->ibn_tx_ps[i]->tps_poolset,
2630					    &zombie_tpo);
2631
2632			if (net->ibn_fmr_ps != NULL) {
2633				kiblnd_fail_fmr_poolset(net->ibn_fmr_ps[i],
2634							&zombie_fpo);
2635
2636			} else if (net->ibn_pmr_ps != NULL) {
2637				kiblnd_fail_poolset(&net->ibn_pmr_ps[i]->
2638						    pps_poolset, &zombie_ppo);
2639			}
2640		}
2641	}
2642
2643	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2644 out:
2645	if (!list_empty(&zombie_tpo))
2646		kiblnd_destroy_pool_list(&zombie_tpo);
2647	if (!list_empty(&zombie_ppo))
2648		kiblnd_destroy_pool_list(&zombie_ppo);
2649	if (!list_empty(&zombie_fpo))
2650		kiblnd_destroy_fmr_pool_list(&zombie_fpo);
2651	if (hdev != NULL)
2652		kiblnd_hdev_decref(hdev);
2653
2654	if (rc != 0)
2655		dev->ibd_failed_failover++;
2656	else
2657		dev->ibd_failed_failover = 0;
2658
2659	return rc;
2660}
2661
2662void
2663kiblnd_destroy_dev (kib_dev_t *dev)
2664{
2665	LASSERT (dev->ibd_nnets == 0);
2666	LASSERT (list_empty(&dev->ibd_nets));
2667
2668	list_del(&dev->ibd_fail_list);
2669	list_del(&dev->ibd_list);
2670
2671	if (dev->ibd_hdev != NULL)
2672		kiblnd_hdev_decref(dev->ibd_hdev);
2673
2674	LIBCFS_FREE(dev, sizeof(*dev));
2675}
2676
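/* Create a kib_dev for an IPoIB interface: query its IP and state, note
 * whether it can fail over (IFF_MASTER, i.e. an ib-bonding master), and run
 * an initial kiblnd_dev_failover() to bind the listener and set up the HCA
 * before linking the device into kib_devs. */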
2677static kib_dev_t *
2678kiblnd_create_dev(char *ifname)
2679{
2680	struct net_device *netdev;
2681	kib_dev_t	 *dev;
2682	__u32	      netmask;
2683	__u32	      ip;
2684	int		up;
2685	int		rc;
2686
2687	rc = libcfs_ipif_query(ifname, &up, &ip, &netmask);
2688	if (rc != 0) {
2689		CERROR("Can't query IPoIB interface %s: %d\n",
2690		       ifname, rc);
2691		return NULL;
2692	}
2693
2694	if (!up) {
2695		CERROR("Can't query IPoIB interface %s: it's down\n", ifname);
2696		return NULL;
2697	}
2698
2699	LIBCFS_ALLOC(dev, sizeof(*dev));
2700	if (dev == NULL)
2701		return NULL;
2702
2703	netdev = dev_get_by_name(&init_net, ifname);
2704	if (netdev == NULL) {
2705		dev->ibd_can_failover = 0;
2706	} else {
2707		dev->ibd_can_failover = !!(netdev->flags & IFF_MASTER);
2708		dev_put(netdev);
2709	}
2710
2711	INIT_LIST_HEAD(&dev->ibd_nets);
2712	INIT_LIST_HEAD(&dev->ibd_list); /* not yet in kib_devs */
2713	INIT_LIST_HEAD(&dev->ibd_fail_list);
2714	dev->ibd_ifip = ip;
2715	strcpy(&dev->ibd_ifname[0], ifname);
2716
2717	/* initialize the device */
2718	rc = kiblnd_dev_failover(dev);
2719	if (rc != 0) {
2720		CERROR("Can't initialize device: %d\n", rc);
2721		LIBCFS_FREE(dev, sizeof(*dev));
2722		return NULL;
2723	}
2724
2725	list_add_tail(&dev->ibd_list, &kiblnd_data.kib_devs);
2727	return dev;
2728}
2729
2730static void
2731kiblnd_base_shutdown(void)
2732{
2733	struct kib_sched_info	*sched;
2734	int			i;
2735
2736	LASSERT (list_empty(&kiblnd_data.kib_devs));
2737
2738	CDEBUG(D_MALLOC, "before LND base cleanup: kmem %d\n",
2739	       atomic_read(&libcfs_kmemory));
2740
2741	switch (kiblnd_data.kib_init) {
2742	default:
2743		LBUG();
2744
2745	case IBLND_INIT_ALL:
2746	case IBLND_INIT_DATA:
2747		LASSERT (kiblnd_data.kib_peers != NULL);
2748		for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
2749			LASSERT (list_empty(&kiblnd_data.kib_peers[i]));
2750		}
2751		LASSERT (list_empty(&kiblnd_data.kib_connd_zombies));
2752		LASSERT (list_empty(&kiblnd_data.kib_connd_conns));
2753
2754		/* flag threads to terminate; wake and wait for them to die */
2755		kiblnd_data.kib_shutdown = 1;
2756
2757		/* NB: we really want to stop scheduler threads net by net
2758		 * instead of for the whole module; this should be improved
2759		 * once LNet supports dynamic configuration */
2760		cfs_percpt_for_each(sched, i, kiblnd_data.kib_scheds)
2761			wake_up_all(&sched->ibs_waitq);
2762
2763		wake_up_all(&kiblnd_data.kib_connd_waitq);
2764		wake_up_all(&kiblnd_data.kib_failover_waitq);
2765
2766		i = 2;
2767		while (atomic_read(&kiblnd_data.kib_nthreads) != 0) {
2768			i++;
2769			CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2770			       "Waiting for %d threads to terminate\n",
2771			       atomic_read(&kiblnd_data.kib_nthreads));
2772			set_current_state(TASK_UNINTERRUPTIBLE);
2773			schedule_timeout(cfs_time_seconds(1));
2774		}
2775
2776		/* fall through */
2777
2778	case IBLND_INIT_NOTHING:
2779		break;
2780	}
2781
2782	if (kiblnd_data.kib_peers != NULL) {
2783		LIBCFS_FREE(kiblnd_data.kib_peers,
2784			    sizeof(struct list_head) *
2785			    kiblnd_data.kib_peer_hash_size);
2786	}
2787
2788	if (kiblnd_data.kib_scheds != NULL)
2789		cfs_percpt_free(kiblnd_data.kib_scheds);
2790
2791	CDEBUG(D_MALLOC, "after LND base cleanup: kmem %d\n",
2792	       atomic_read(&libcfs_kmemory));
2793
2794	kiblnd_data.kib_init = IBLND_INIT_NOTHING;
2795	module_put(THIS_MODULE);
2796}
2797
2798void
2799kiblnd_shutdown (lnet_ni_t *ni)
2800{
2801	kib_net_t	*net = ni->ni_data;
2802	rwlock_t     *g_lock = &kiblnd_data.kib_global_lock;
2803	int	       i;
2804	unsigned long     flags;
2805
2806	LASSERT(kiblnd_data.kib_init == IBLND_INIT_ALL);
2807
2808	if (net == NULL)
2809		goto out;
2810
2811	CDEBUG(D_MALLOC, "before LND net cleanup: kmem %d\n",
2812	       atomic_read(&libcfs_kmemory));
2813
2814	write_lock_irqsave(g_lock, flags);
2815	net->ibn_shutdown = 1;
2816	write_unlock_irqrestore(g_lock, flags);
2817
2818	switch (net->ibn_init) {
2819	default:
2820		LBUG();
2821
2822	case IBLND_INIT_ALL:
2823		/* nuke all existing peers within this net */
2824		kiblnd_del_peer(ni, LNET_NID_ANY);
2825
2826		/* Wait for all peer state to clean up */
2827		i = 2;
2828		while (atomic_read(&net->ibn_npeers) != 0) {
2829			i++;
2830			CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
2831			       "%s: waiting for %d peers to disconnect\n",
2832			       libcfs_nid2str(ni->ni_nid),
2833			       atomic_read(&net->ibn_npeers));
2834			set_current_state(TASK_UNINTERRUPTIBLE);
2835			schedule_timeout(cfs_time_seconds(1));
2836		}
2837
2838		kiblnd_net_fini_pools(net);
2839
2840		write_lock_irqsave(g_lock, flags);
2841		LASSERT(net->ibn_dev->ibd_nnets > 0);
2842		net->ibn_dev->ibd_nnets--;
2843		list_del(&net->ibn_list);
2844		write_unlock_irqrestore(g_lock, flags);
2845
2846		/* fall through */
2847
2848	case IBLND_INIT_NOTHING:
2849		LASSERT (atomic_read(&net->ibn_nconns) == 0);
2850
2851		if (net->ibn_dev != NULL &&
2852		    net->ibn_dev->ibd_nnets == 0)
2853			kiblnd_destroy_dev(net->ibn_dev);
2854
2855		break;
2856	}
2857
2858	CDEBUG(D_MALLOC, "after LND net cleanup: kmem %d\n",
2859	       atomic_read(&libcfs_kmemory));
2860
2861	net->ibn_init = IBLND_INIT_NOTHING;
2862	ni->ni_data = NULL;
2863
2864	LIBCFS_FREE(net, sizeof(*net));
2865
2866out:
2867	if (list_empty(&kiblnd_data.kib_devs))
2868		kiblnd_base_shutdown();
2869	return;
2870}
2871
2872static int
2873kiblnd_base_startup(void)
2874{
2875	struct kib_sched_info	*sched;
2876	int			rc;
2877	int			i;
2878
2879	LASSERT (kiblnd_data.kib_init == IBLND_INIT_NOTHING);
2880
2881	try_module_get(THIS_MODULE);
2882	memset(&kiblnd_data, 0, sizeof(kiblnd_data)); /* zero pointers, flags etc */
2883
2884	rwlock_init(&kiblnd_data.kib_global_lock);
2885
2886	INIT_LIST_HEAD(&kiblnd_data.kib_devs);
2887	INIT_LIST_HEAD(&kiblnd_data.kib_failed_devs);
2888
2889	kiblnd_data.kib_peer_hash_size = IBLND_PEER_HASH_SIZE;
2890	LIBCFS_ALLOC(kiblnd_data.kib_peers,
2891		     sizeof(struct list_head) *
2892			    kiblnd_data.kib_peer_hash_size);
2893	if (kiblnd_data.kib_peers == NULL)
2894		goto failed;
2896	for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++)
2897		INIT_LIST_HEAD(&kiblnd_data.kib_peers[i]);
2898
2899	spin_lock_init(&kiblnd_data.kib_connd_lock);
2900	INIT_LIST_HEAD(&kiblnd_data.kib_connd_conns);
2901	INIT_LIST_HEAD(&kiblnd_data.kib_connd_zombies);
2902	init_waitqueue_head(&kiblnd_data.kib_connd_waitq);
2903	init_waitqueue_head(&kiblnd_data.kib_failover_waitq);
2904
2905	kiblnd_data.kib_scheds = cfs_percpt_alloc(lnet_cpt_table(),
2906						  sizeof(*sched));
2907	if (kiblnd_data.kib_scheds == NULL)
2908		goto failed;
2909
2910	cfs_percpt_for_each(sched, i, kiblnd_data.kib_scheds) {
2911		int	nthrs;
2912
2913		spin_lock_init(&sched->ibs_lock);
2914		INIT_LIST_HEAD(&sched->ibs_conns);
2915		init_waitqueue_head(&sched->ibs_waitq);
2916
2917		nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2918		if (*kiblnd_tunables.kib_nscheds > 0) {
2919			nthrs = min(nthrs, *kiblnd_tunables.kib_nscheds);
2920		} else {
2921			/* use at most half of the CPUs; the other half is
2922			 * reserved for upper-layer modules */
2923			nthrs = min(max(IBLND_N_SCHED, nthrs >> 1), nthrs);
2924		}
2925
2926		sched->ibs_nthreads_max = nthrs;
2927		sched->ibs_cpt = i;
2928	}
2929
2930	kiblnd_data.kib_error_qpa.qp_state = IB_QPS_ERR;
2931
2932	/* lists/ptrs/locks initialised */
2933	kiblnd_data.kib_init = IBLND_INIT_DATA;
2934	/*****************************************************/
2935
2936	rc = kiblnd_thread_start(kiblnd_connd, NULL, "kiblnd_connd");
2937	if (rc != 0) {
2938		CERROR("Can't spawn o2iblnd connd: %d\n", rc);
2939		goto failed;
2940	}
2941
2942	if (*kiblnd_tunables.kib_dev_failover != 0)
2943		rc = kiblnd_thread_start(kiblnd_failover_thread, NULL,
2944					 "kiblnd_failover");
2945
2946	if (rc != 0) {
2947		CERROR("Can't spawn o2iblnd failover thread: %d\n", rc);
2948		goto failed;
2949	}
2950
2951	/* flag everything initialised */
2952	kiblnd_data.kib_init = IBLND_INIT_ALL;
2953	/*****************************************************/
2954
2955	return 0;
2956
2957 failed:
2958	kiblnd_base_shutdown();
2959	return -ENETDOWN;
2960}
2961
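/* Start scheduler threads for one CPT.  On first use the thread count is
 * ibs_nthreads_max when the nscheds tunable is set, otherwise roughly half
 * of the CPT weight capped at IBLND_N_SCHED_HIGH; later calls add at most
 * one more thread. */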
2962static int
2963kiblnd_start_schedulers(struct kib_sched_info *sched)
2964{
2965	int	rc = 0;
2966	int	nthrs;
2967	int	i;
2968
2969	if (sched->ibs_nthreads == 0) {
2970		if (*kiblnd_tunables.kib_nscheds > 0) {
2971			nthrs = sched->ibs_nthreads_max;
2972		} else {
2973			nthrs = cfs_cpt_weight(lnet_cpt_table(),
2974					       sched->ibs_cpt);
2975			nthrs = min(max(IBLND_N_SCHED, nthrs >> 1), nthrs);
2976			nthrs = min(IBLND_N_SCHED_HIGH, nthrs);
2977		}
2978	} else {
2979		LASSERT(sched->ibs_nthreads <= sched->ibs_nthreads_max);
2980		/* add one thread for the new interface, unless already at the max */
2981		nthrs = (sched->ibs_nthreads < sched->ibs_nthreads_max);
2982	}
2983
2984	for (i = 0; i < nthrs; i++) {
2985		long	id;
2986		char	name[20];
2987		id = KIB_THREAD_ID(sched->ibs_cpt, sched->ibs_nthreads + i);
2988		snprintf(name, sizeof(name), "kiblnd_sd_%02ld_%02ld",
2989			 KIB_THREAD_CPT(id), KIB_THREAD_TID(id));
2990		rc = kiblnd_thread_start(kiblnd_scheduler, (void *)id, name);
2991		if (rc == 0)
2992			continue;
2993
2994		CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2995		       sched->ibs_cpt, sched->ibs_nthreads + i, rc);
2996		break;
2997	}
2998
2999	sched->ibs_nthreads += i;
3000	return rc;
3001}
3002
3003static int
3004kiblnd_dev_start_threads(kib_dev_t *dev, int newdev, __u32 *cpts, int ncpts)
3005{
3006	int	cpt;
3007	int	rc;
3008	int	i;
3009
3010	for (i = 0; i < ncpts; i++) {
3011		struct kib_sched_info *sched;
3012
3013		cpt = (cpts == NULL) ? i : cpts[i];
3014		sched = kiblnd_data.kib_scheds[cpt];
3015
3016		if (!newdev && sched->ibs_nthreads > 0)
3017			continue;
3018
3019		rc = kiblnd_start_schedulers(kiblnd_data.kib_scheds[cpt]);
3020		if (rc != 0) {
3021			CERROR("Failed to start scheduler threads for %s\n",
3022			       dev->ibd_ifname);
3023			return rc;
3024		}
3025	}
3026	return 0;
3027}
3028
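/* Find an existing kib_dev by interface name.  An exact match wins;
 * otherwise remember the first device whose base name (the part before any
 * ':' alias suffix) matches and return it as an alias. */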
3029static kib_dev_t *
3030kiblnd_dev_search(char *ifname)
3031{
3032	kib_dev_t	*alias = NULL;
3033	kib_dev_t	*dev;
3034	char		*colon;
3035	char		*colon2;
3036
3037	colon = strchr(ifname, ':');
3038	list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3039		if (strcmp(&dev->ibd_ifname[0], ifname) == 0)
3040			return dev;
3041
3042		if (alias != NULL)
3043			continue;
3044
3045		colon2 = strchr(dev->ibd_ifname, ':');
3046		if (colon != NULL)
3047			*colon = 0;
3048		if (colon2 != NULL)
3049			*colon2 = 0;
3050
3051		if (strcmp(&dev->ibd_ifname[0], ifname) == 0)
3052			alias = dev;
3053
3054		if (colon != NULL)
3055			*colon = ':';
3056		if (colon2 != NULL)
3057			*colon2 = ':';
3058	}
3059	return alias;
3060}
3061
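/* Bring up an o2iblnd NI: do the one-time base startup if needed, pick the
 * IPoIB interface from 'networks=' or the default tunable, find or create
 * the kib_dev for it, start scheduler threads on the NI's CPTs, create the
 * per-CPT pools and finally link the net to the device. */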
3062int
3063kiblnd_startup (lnet_ni_t *ni)
3064{
3065	char		     *ifname;
3066	kib_dev_t		*ibdev = NULL;
3067	kib_net_t		*net;
3068	struct timeval	    tv;
3069	unsigned long	     flags;
3070	int		       rc;
3071	int			  newdev;
3072
3073	LASSERT (ni->ni_lnd == &the_o2iblnd);
3074
3075	if (kiblnd_data.kib_init == IBLND_INIT_NOTHING) {
3076		rc = kiblnd_base_startup();
3077		if (rc != 0)
3078			return rc;
3079	}
3080
3081	LIBCFS_ALLOC(net, sizeof(*net));
3082	ni->ni_data = net;
3083	if (net == NULL)
3084		goto failed;
3085
3086	do_gettimeofday(&tv);
3087	net->ibn_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
3088
3089	ni->ni_peertimeout    = *kiblnd_tunables.kib_peertimeout;
3090	ni->ni_maxtxcredits   = *kiblnd_tunables.kib_credits;
3091	ni->ni_peertxcredits  = *kiblnd_tunables.kib_peertxcredits;
3092	ni->ni_peerrtrcredits = *kiblnd_tunables.kib_peerrtrcredits;
3093
3094	if (ni->ni_interfaces[0] != NULL) {
3095		/* Use the IPoIB interface specified in 'networks=' */
3096
3097		CLASSERT (LNET_MAX_INTERFACES > 1);
3098		if (ni->ni_interfaces[1] != NULL) {
3099			CERROR("Multiple interfaces not supported\n");
3100			goto failed;
3101		}
3102
3103		ifname = ni->ni_interfaces[0];
3104	} else {
3105		ifname = *kiblnd_tunables.kib_default_ipif;
3106	}
3107
3108	if (strlen(ifname) >= sizeof(ibdev->ibd_ifname)) {
3109		CERROR("IPoIB interface name too long: %s\n", ifname);
3110		goto failed;
3111	}
3112
3113	ibdev = kiblnd_dev_search(ifname);
3114
3115	newdev = ibdev == NULL;
3116	/* hmm... create a kib_dev even for an alias */
3117	if (ibdev == NULL || strcmp(&ibdev->ibd_ifname[0], ifname) != 0)
3118		ibdev = kiblnd_create_dev(ifname);
3119
3120	if (ibdev == NULL)
3121		goto failed;
3122
3123	net->ibn_dev = ibdev;
3124	ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ibdev->ibd_ifip);
3125
3126	rc = kiblnd_dev_start_threads(ibdev, newdev,
3127				      ni->ni_cpts, ni->ni_ncpts);
3128	if (rc != 0)
3129		goto failed;
3130
3131	rc = kiblnd_net_init_pools(net, ni->ni_cpts, ni->ni_ncpts);
3132	if (rc != 0) {
3133		CERROR("Failed to initialize NI pools: %d\n", rc);
3134		goto failed;
3135	}
3136
3137	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3138	ibdev->ibd_nnets++;
3139	list_add_tail(&net->ibn_list, &ibdev->ibd_nets);
3140	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3141
3142	net->ibn_init = IBLND_INIT_ALL;
3143
3144	return 0;
3145
3146failed:
3147	if (net != NULL && net->ibn_dev == NULL && ibdev != NULL)
3148		kiblnd_destroy_dev(ibdev);
3149
3150	kiblnd_shutdown(ni);
3151
3152	CDEBUG(D_NET, "kiblnd_startup failed\n");
3153	return -ENETDOWN;
3154}
3155
3156static void __exit
3157kiblnd_module_fini (void)
3158{
3159	lnet_unregister_lnd(&the_o2iblnd);
3160}
3161
3162static int __init
3163kiblnd_module_init (void)
3164{
3165	int    rc;
3166
3167	CLASSERT (sizeof(kib_msg_t) <= IBLND_MSG_SIZE);
3168	CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
3169		  <= IBLND_MSG_SIZE);
3170	CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
3171		  <= IBLND_MSG_SIZE);
3172
3173	rc = kiblnd_tunables_init();
3174	if (rc != 0)
3175		return rc;
3176
3177	lnet_register_lnd(&the_o2iblnd);
3178
3179	return 0;
3180}
3181
3182MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3183MODULE_DESCRIPTION("Kernel OpenIB gen2 LND v2.00");
3184MODULE_LICENSE("GPL");
3185
3186module_init(kiblnd_module_init);
3187module_exit(kiblnd_module_fini);
3188