1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
20 *
21 * GPL HEADER END
22 */
23/*
24 * Copyright (c) 2012, Intel Corporation.
25 */
26/*
27 * This file is part of Lustre, http://www.lustre.org/
28 * Lustre is a trademark of Sun Microsystems, Inc.
29 *
30 * lnet/lnet/lib-ptl.c
31 *
32 * portal & match routines
33 *
34 * Author: liang@whamcloud.com
35 */
36
37#define DEBUG_SUBSYSTEM S_LNET
38
39#include "../../include/linux/lnet/lib-lnet.h"
40
/* NB: add /proc interfaces in upcoming patches */
/*
 * Policy for spreading incoming PUTs over CPU partitions; consulted by
 * lnet_mt_of_match().  Defaults to hashing only routed messages by source
 * NID (LNET_PTL_ROTOR_HASH_RT).  Writable module parameter (mode 0644).
 */
int	portal_rotor	= LNET_PTL_ROTOR_HASH_RT;
module_param(portal_rotor, int, 0644);
MODULE_PARM_DESC(portal_rotor, "redirect PUTs to different cpu-partitions");
45
46static int
47lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
48		    __u64 mbits, __u64 ignore_bits)
49{
50	struct lnet_portal	*ptl = the_lnet.ln_portals[index];
51	int			unique;
52
53	unique = ignore_bits == 0 &&
54		 match_id.nid != LNET_NID_ANY &&
55		 match_id.pid != LNET_PID_ANY;
56
57	LASSERT(!lnet_ptl_is_unique(ptl) || !lnet_ptl_is_wildcard(ptl));
58
59	/* prefer to check w/o any lock */
60	if (likely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl)))
61		goto match;
62
63	/* unset, new portal */
64	lnet_ptl_lock(ptl);
65	/* check again with lock */
66	if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
67		lnet_ptl_unlock(ptl);
68		goto match;
69	}
70
71	/* still not set */
72	if (unique)
73		lnet_ptl_setopt(ptl, LNET_PTL_MATCH_UNIQUE);
74	else
75		lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);
76
77	lnet_ptl_unlock(ptl);
78
79	return 1;
80
81 match:
82	if ((lnet_ptl_is_unique(ptl) && !unique) ||
83	    (lnet_ptl_is_wildcard(ptl) && unique))
84		return 0;
85	return 1;
86}
87
/*
 * Mark the match-table of CPU partition @cpt as enabled and insert @cpt
 * into ptl->ptl_mt_maps[], keeping that array sorted in ascending order
 * so round-robin dispatch walks active CPTs deterministically.
 */
static void
lnet_ptl_enable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table	*mtable = ptl->ptl_mtables[cpt];
	int			i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	mtable->mt_enabled = 1;

	/* append @cpt, then bubble it left until the array is sorted;
	 * @cpt must not already be present (see LASSERT below) */
	ptl->ptl_mt_maps[ptl->ptl_mt_nmaps] = cpt;
	for (i = ptl->ptl_mt_nmaps - 1; i >= 0; i--) {
		LASSERT(ptl->ptl_mt_maps[i] != cpt);
		if (ptl->ptl_mt_maps[i] < cpt)
			break;

		/* swap to order */
		ptl->ptl_mt_maps[i + 1] = ptl->ptl_mt_maps[i];
		ptl->ptl_mt_maps[i] = cpt;
	}

	ptl->ptl_mt_nmaps++;
}
112
/*
 * Mark the match-table of CPU partition @cpt as disabled (its buffers are
 * exhausted) and remove @cpt from the sorted ptl->ptl_mt_maps[] array by
 * shifting the tail left one slot.
 */
static void
lnet_ptl_disable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table	*mtable = ptl->ptl_mtables[cpt];
	int			i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	if (LNET_CPT_NUMBER == 1)
		return; /* never disable the only match-table */

	mtable->mt_enabled = 0;

	LASSERT(ptl->ptl_mt_nmaps > 0 &&
		ptl->ptl_mt_nmaps <= LNET_CPT_NUMBER);

	/* remove it from mt_maps */
	ptl->ptl_mt_nmaps--;
	/* maps[] is sorted, so every entry >= cpt is overwritten by its
	 * right neighbour; maps[i + 1] is within bounds because the loop
	 * limit was just decremented */
	for (i = 0; i < ptl->ptl_mt_nmaps; i++) {
		if (ptl->ptl_mt_maps[i] >= cpt) /* overwrite it */
			ptl->ptl_mt_maps[i] = ptl->ptl_mt_maps[i + 1];
	}
}
137
/*
 * Try to match message @msg (described by @info) against a single MD.
 *
 * Returns a bitmask of LNET_MATCHMD_* flags:
 *   - LNET_MATCHMD_NONE when this MD doesn't match (wrong op, id, bits),
 *     possibly OR'd with LNET_MATCHMD_EXHAUSTED if the MD has no space;
 *   - LNET_MATCHMD_DROP when it matches but the payload doesn't fit and
 *     truncation is not allowed;
 *   - LNET_MATCHMD_OK when the message is committed to this MD, possibly
 *     OR'd with LNET_MATCHMD_EXHAUSTED if this consumed the MD.
 */
static int
lnet_try_match_md(lnet_libmd_t *md,
		  struct lnet_match_info *info, struct lnet_msg *msg)
{
	/* ALWAYS called holding the lnet_res_lock, and can't lnet_res_unlock;
	 * lnet_match_blocked_msg() relies on this to avoid races */
	unsigned int	offset;
	unsigned int	mlength;
	lnet_me_t	*me = md->md_me;

	/* MD exhausted */
	if (lnet_md_exhausted(md))
		return LNET_MATCHMD_NONE | LNET_MATCHMD_EXHAUSTED;

	/* mismatched MD op */
	if ((md->md_options & info->mi_opc) == 0)
		return LNET_MATCHMD_NONE;

	/* mismatched ME nid/pid? */
	if (me->me_match_id.nid != LNET_NID_ANY &&
	    me->me_match_id.nid != info->mi_id.nid)
		return LNET_MATCHMD_NONE;

	if (me->me_match_id.pid != LNET_PID_ANY &&
	    me->me_match_id.pid != info->mi_id.pid)
		return LNET_MATCHMD_NONE;

	/* mismatched ME matchbits? */
	if (((me->me_match_bits ^ info->mi_mbits) & ~me->me_ignore_bits) != 0)
		return LNET_MATCHMD_NONE;

	/* Hurrah! This _is_ a match; check it out... */

	/* remote-managed MDs take the offset from the message header,
	 * locally-managed MDs use the MD's running offset */
	if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
		offset = md->md_offset;
	else
		offset = info->mi_roffset;

	if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
		mlength = md->md_max_size;
		LASSERT(md->md_offset + mlength <= md->md_length);
	} else {
		mlength = md->md_length - offset;
	}

	if (info->mi_rlength <= mlength) {	/* fits in allowed space */
		mlength = info->mi_rlength;
	} else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
		/* this packet _really_ is too big */
		CERROR("Matching packet from %s, match %llu length %d too big: %d left, %d allowed\n",
		       libcfs_id2str(info->mi_id), info->mi_mbits,
		       info->mi_rlength, md->md_length - offset, mlength);

		return LNET_MATCHMD_DROP;
	}

	/* Commit to this ME/MD */
	CDEBUG(D_NET, "Incoming %s index %x from %s of "
	       "length %d/%d into md %#llx [%d] + %d\n",
	       (info->mi_opc == LNET_MD_OP_PUT) ? "put" : "get",
	       info->mi_portal, libcfs_id2str(info->mi_id), mlength,
	       info->mi_rlength, md->md_lh.lh_cookie, md->md_niov, offset);

	lnet_msg_attach_md(msg, md, offset, mlength);
	md->md_offset = offset + mlength;

	if (!lnet_md_exhausted(md))
		return LNET_MATCHMD_OK;

	/* Auto-unlink NOW, so the ME gets unlinked if required.
	 * We bumped md->md_refcount above so the MD just gets flagged
	 * for unlink when it is finalized. */
	if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0)
		lnet_md_unlink(md);

	return LNET_MATCHMD_OK | LNET_MATCHMD_EXHAUSTED;
}
215
216static struct lnet_match_table *
217lnet_match2mt(struct lnet_portal *ptl, lnet_process_id_t id, __u64 mbits)
218{
219	if (LNET_CPT_NUMBER == 1)
220		return ptl->ptl_mtables[0]; /* the only one */
221
222	/* if it's a unique portal, return match-table hashed by NID */
223	return lnet_ptl_is_unique(ptl) ?
224	       ptl->ptl_mtables[lnet_cpt_of_nid(id.nid)] : NULL;
225}
226
227struct lnet_match_table *
228lnet_mt_of_attach(unsigned int index, lnet_process_id_t id,
229		  __u64 mbits, __u64 ignore_bits, lnet_ins_pos_t pos)
230{
231	struct lnet_portal	*ptl;
232	struct lnet_match_table	*mtable;
233
234	/* NB: called w/o lock */
235	LASSERT(index < the_lnet.ln_nportals);
236
237	if (!lnet_ptl_match_type(index, id, mbits, ignore_bits))
238		return NULL;
239
240	ptl = the_lnet.ln_portals[index];
241
242	mtable = lnet_match2mt(ptl, id, mbits);
243	if (mtable != NULL) /* unique portal or only one match-table */
244		return mtable;
245
246	/* it's a wildcard portal */
247	switch (pos) {
248	default:
249		return NULL;
250	case LNET_INS_BEFORE:
251	case LNET_INS_AFTER:
252		/* posted by no affinity thread, always hash to specific
253		 * match-table to avoid buffer stealing which is heavy */
254		return ptl->ptl_mtables[ptl->ptl_index % LNET_CPT_NUMBER];
255	case LNET_INS_LOCAL:
256		/* posted by cpu-affinity thread */
257		return ptl->ptl_mtables[lnet_cpt_current()];
258	}
259}
260
/*
 * Select the match-table an incoming message should be matched against.
 * For unique portals (or single-CPT nodes) the choice is fixed; for
 * wildcard portals the portal_rotor policy decides between the current
 * CPT, a round-robin CPT, or a hash of the source NID for routed
 * messages.
 * NB: called without any lock held; reads of ptl_rotor/ptl_mt_nmaps are
 * deliberately unlocked (benign races, see comment below).
 */
static struct lnet_match_table *
lnet_mt_of_match(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table	*mtable;
	struct lnet_portal	*ptl;
	int			nmaps;
	int			rotor;
	int			routed;
	int			cpt;

	/* NB: called w/o lock */
	LASSERT(info->mi_portal < the_lnet.ln_nportals);
	ptl = the_lnet.ln_portals[info->mi_portal];

	LASSERT(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl));

	mtable = lnet_match2mt(ptl, info->mi_id, info->mi_mbits);
	if (mtable != NULL)
		return mtable;

	/* it's a wildcard portal */
	/* a message is "routed" when it crossed a network boundary */
	routed = LNET_NIDNET(msg->msg_hdr.src_nid) !=
		 LNET_NIDNET(msg->msg_hdr.dest_nid);

	/* rotor disabled (or only enabled for routed messages): prefer the
	 * current CPT's table if it is still active */
	if (portal_rotor == LNET_PTL_ROTOR_OFF ||
	    (portal_rotor != LNET_PTL_ROTOR_ON && !routed)) {
		cpt = lnet_cpt_current();
		if (ptl->ptl_mtables[cpt]->mt_enabled)
			return ptl->ptl_mtables[cpt];
	}

	rotor = ptl->ptl_rotor++; /* get round-robin factor */
	if (portal_rotor == LNET_PTL_ROTOR_HASH_RT && routed)
		cpt = lnet_cpt_of_nid(msg->msg_hdr.src_nid);
	else
		cpt = rotor % LNET_CPT_NUMBER;

	if (!ptl->ptl_mtables[cpt]->mt_enabled) {
		/* is there any active entry for this portal? */
		nmaps = ptl->ptl_mt_nmaps;
		/* map to an active mtable to avoid heavy "stealing" */
		if (nmaps != 0) {
			/* NB: there is possibility that ptl_mt_maps is being
			 * changed because we are not under protection of
			 * lnet_ptl_lock, but it shouldn't hurt anything */
			cpt = ptl->ptl_mt_maps[rotor % nmaps];
		}
	}

	return ptl->ptl_mtables[cpt];
}
312
/*
 * Test the "exhausted" bitmap of a wildcard portal's match-table.
 * @pos >= 0 tests the bit for hash bucket mt_mhash[pos]; @pos < 0 tests
 * whether ALL buckets are exhausted.  Always 0 for non-wildcard portals,
 * which don't track exhaustion.
 */
static int
lnet_mt_test_exhausted(struct lnet_match_table *mtable, int pos)
{
	__u64	*bmap;
	int	i;

	if (!lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
		return 0;

	if (pos < 0) { /* check all bits */
		for (i = 0; i < LNET_MT_EXHAUSTED_BMAP; i++) {
			if (mtable->mt_exhausted[i] != (__u64)(-1))
				return 0;
		}
		return 1;
	}

	LASSERT(pos <= LNET_MT_HASH_IGNORE);
	/* mtable::mt_mhash[pos] is marked as exhausted or not */
	/* word index is pos / 64, bit index is pos % 64 */
	bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
	pos &= (1 << LNET_MT_BITS_U64) - 1;

	return ((*bmap) & (1ULL << pos)) != 0;
}
337
338static void
339lnet_mt_set_exhausted(struct lnet_match_table *mtable, int pos, int exhausted)
340{
341	__u64	*bmap;
342
343	LASSERT(lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]));
344	LASSERT(pos <= LNET_MT_HASH_IGNORE);
345
346	/* set mtable::mt_mhash[pos] as exhausted/non-exhausted */
347	bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
348	pos &= (1 << LNET_MT_BITS_U64) - 1;
349
350	if (!exhausted)
351		*bmap &= ~(1ULL << pos);
352	else
353		*bmap |= 1ULL << pos;
354}
355
356struct list_head *
357lnet_mt_match_head(struct lnet_match_table *mtable,
358		   lnet_process_id_t id, __u64 mbits)
359{
360	struct lnet_portal *ptl = the_lnet.ln_portals[mtable->mt_portal];
361
362	if (lnet_ptl_is_wildcard(ptl)) {
363		return &mtable->mt_mhash[mbits & LNET_MT_HASH_MASK];
364	} else {
365		unsigned long hash = mbits + id.nid + id.pid;
366
367		LASSERT(lnet_ptl_is_unique(ptl));
368		hash = hash_long(hash, LNET_MT_HASH_BITS);
369		return &mtable->mt_mhash[hash];
370	}
371}
372
/*
 * Walk the MEs of @mtable looking for a match for @msg.  MEs with ignore
 * bits (bucket LNET_MT_HASH_IGNORE) are scanned first, then the regular
 * hash bucket.  Tracks per-bucket exhaustion for wildcard portals.
 *
 * Returns LNET_MATCHMD_OK/DROP (possibly OR'd with EXHAUSTED when the
 * whole table ran out of buffers) on a decision, or LNET_MATCHMD_NONE
 * when the message should be delayed on a lazy portal.
 * NB: called with lnet_res_lock held for mtable->mt_cpt.
 */
int
lnet_mt_match_md(struct lnet_match_table *mtable,
		 struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct list_head		*head;
	lnet_me_t		*me;
	lnet_me_t		*tmp;
	int			exhausted = 0;
	int			rc;

	/* any ME with ignore bits? */
	if (!list_empty(&mtable->mt_mhash[LNET_MT_HASH_IGNORE]))
		head = &mtable->mt_mhash[LNET_MT_HASH_IGNORE];
	else
		head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
 again:
	/* NB: only wildcard portal needs to return LNET_MATCHMD_EXHAUSTED */
	if (lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
		exhausted = LNET_MATCHMD_EXHAUSTED;

	list_for_each_entry_safe(me, tmp, head, me_list) {
		/* ME attached but MD not attached yet */
		if (me->me_md == NULL)
			continue;

		LASSERT(me == me->me_md->md_me);

		rc = lnet_try_match_md(me->me_md, info, msg);
		if ((rc & LNET_MATCHMD_EXHAUSTED) == 0)
			exhausted = 0; /* mlist is not empty */

		if ((rc & LNET_MATCHMD_FINISH) != 0) {
			/* don't return EXHAUSTED bit because we don't know
			 * whether the mlist is empty or not */
			return rc & ~LNET_MATCHMD_EXHAUSTED;
		}
	}

	if (exhausted == LNET_MATCHMD_EXHAUSTED) { /* @head is exhausted */
		/* bucket index recovered by pointer arithmetic on mt_mhash */
		lnet_mt_set_exhausted(mtable, head - mtable->mt_mhash, 1);
		if (!lnet_mt_test_exhausted(mtable, -1))
			exhausted = 0;
	}

	if (exhausted == 0 && head == &mtable->mt_mhash[LNET_MT_HASH_IGNORE]) {
		head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
		goto again; /* re-check MEs w/o ignore-bits */
	}

	/* no match: GETs are always dropped, PUTs are only delayed on a
	 * lazy portal */
	if (info->mi_opc == LNET_MD_OP_GET ||
	    !lnet_ptl_is_lazy(the_lnet.ln_portals[info->mi_portal]))
		return LNET_MATCHMD_DROP | exhausted;

	return LNET_MATCHMD_NONE | exhausted;
}
428
429static int
430lnet_ptl_match_early(struct lnet_portal *ptl, struct lnet_msg *msg)
431{
432	int	rc;
433
434	/* message arrived before any buffer posting on this portal,
435	 * simply delay or drop this message */
436	if (likely(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)))
437		return 0;
438
439	lnet_ptl_lock(ptl);
440	/* check it again with hold of lock */
441	if (lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)) {
442		lnet_ptl_unlock(ptl);
443		return 0;
444	}
445
446	if (lnet_ptl_is_lazy(ptl)) {
447		if (msg->msg_rx_ready_delay) {
448			msg->msg_rx_delayed = 1;
449			list_add_tail(&msg->msg_list,
450					  &ptl->ptl_msg_delayed);
451		}
452		rc = LNET_MATCHMD_NONE;
453	} else {
454		rc = LNET_MATCHMD_DROP;
455	}
456
457	lnet_ptl_unlock(ptl);
458	return rc;
459}
460
/*
 * Buffer-stealing slow path for a wildcard portal: walk every CPT's
 * match-table trying to find an MD for @msg, and if nothing is found
 * either delay the message (lazy portal) or drop it.  While on the
 * stealing list the message may be matched concurrently by
 * lnet_ptl_attach_md(), which is why msg_list membership is re-checked
 * under the locks on every iteration.
 *
 * Returns an LNET_MATCHMD_* code (OK/DROP/NONE).
 */
static int
lnet_ptl_match_delay(struct lnet_portal *ptl,
		     struct lnet_match_info *info, struct lnet_msg *msg)
{
	int	first = ptl->ptl_mt_maps[0]; /* read w/o lock */
	int	rc = 0;
	int	i;

	/* steal buffer from other CPTs, and delay it if nothing to steal,
	 * this function is more expensive than a regular match, but we
	 * don't expect it can happen a lot */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	for (i = 0; i < LNET_CPT_NUMBER; i++) {
		struct lnet_match_table *mtable;
		int			cpt;

		cpt = (first + i) % LNET_CPT_NUMBER;
		mtable = ptl->ptl_mtables[cpt];
		/* skip disabled tables, except on the first pass (where we
		 * must enqueue on the stealing list) and the last (where we
		 * must reach the delay-or-drop decision below) */
		if (i != 0 && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled)
			continue;

		lnet_res_lock(cpt);
		lnet_ptl_lock(ptl);

		if (i == 0) { /* the first try, attach on stealing list */
			list_add_tail(&msg->msg_list,
					  &ptl->ptl_msg_stealing);
		}

		if (!list_empty(&msg->msg_list)) { /* on stealing list */
			rc = lnet_mt_match_md(mtable, info, msg);

			if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 &&
			    mtable->mt_enabled)
				lnet_ptl_disable_mt(ptl, cpt);

			if ((rc & LNET_MATCHMD_FINISH) != 0)
				list_del_init(&msg->msg_list);

		} else {
			/* could be matched by lnet_ptl_attach_md()
			 * which is called by another thread */
			rc = msg->msg_md == NULL ?
			     LNET_MATCHMD_DROP : LNET_MATCHMD_OK;
		}

		if (!list_empty(&msg->msg_list) && /* not matched yet */
		    (i == LNET_CPT_NUMBER - 1 || /* the last CPT */
		     ptl->ptl_mt_nmaps == 0 ||   /* no active CPT */
		     (ptl->ptl_mt_nmaps == 1 &&  /* the only active CPT */
		      ptl->ptl_mt_maps[0] == cpt))) {
			/* nothing to steal, delay or drop */
			list_del_init(&msg->msg_list);

			if (lnet_ptl_is_lazy(ptl)) {
				msg->msg_rx_delayed = 1;
				list_add_tail(&msg->msg_list,
						  &ptl->ptl_msg_delayed);
				rc = LNET_MATCHMD_NONE;
			} else {
				rc = LNET_MATCHMD_DROP;
			}
		}

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(cpt);

		/* stop as soon as the message is matched, dropped or
		 * queued on the delayed list */
		if ((rc & LNET_MATCHMD_FINISH) != 0 || msg->msg_rx_delayed)
			break;
	}

	return rc;
}
535
/*
 * Top-level entry for matching an incoming message against a portal.
 * Picks a match-table, tries a regular match, and falls back to delaying
 * (lazy portal) or cross-CPT buffer stealing (wildcard portal) when no MD
 * is available.
 *
 * Returns LNET_MATCHMD_OK, LNET_MATCHMD_DROP or LNET_MATCHMD_NONE; the
 * internal EXHAUSTED bit is always stripped before returning.
 */
int
lnet_ptl_match_md(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table	*mtable;
	struct lnet_portal	*ptl;
	int			rc;

	CDEBUG(D_NET, "Request from %s of length %d into portal %d MB=%#llx\n",
	       libcfs_id2str(info->mi_id), info->mi_rlength, info->mi_portal,
	       info->mi_mbits);

	if (info->mi_portal >= the_lnet.ln_nportals) {
		CERROR("Invalid portal %d not in [0-%d]\n",
		       info->mi_portal, the_lnet.ln_nportals);
		return LNET_MATCHMD_DROP;
	}

	ptl = the_lnet.ln_portals[info->mi_portal];
	rc = lnet_ptl_match_early(ptl, msg);
	if (rc != 0) /* matched or delayed early message */
		return rc;

	mtable = lnet_mt_of_match(info, msg);
	lnet_res_lock(mtable->mt_cpt);

	if (the_lnet.ln_shutdown) {
		rc = LNET_MATCHMD_DROP;
		goto out1;
	}

	rc = lnet_mt_match_md(mtable, info, msg);
	if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 && mtable->mt_enabled) {
		/* this match-table ran out of buffers, take it out of the
		 * round-robin rotation */
		lnet_ptl_lock(ptl);
		lnet_ptl_disable_mt(ptl, mtable->mt_cpt);
		lnet_ptl_unlock(ptl);
	}

	if ((rc & LNET_MATCHMD_FINISH) != 0)	/* matched or dropping */
		goto out1;

	if (!msg->msg_rx_ready_delay)
		goto out1;

	LASSERT(lnet_ptl_is_lazy(ptl));
	LASSERT(!msg->msg_rx_delayed);

	/* NB: we don't expect "delay" can happen a lot */
	if (lnet_ptl_is_unique(ptl) || LNET_CPT_NUMBER == 1) {
		/* no stealing possible: queue on the delayed list now */
		lnet_ptl_lock(ptl);

		msg->msg_rx_delayed = 1;
		list_add_tail(&msg->msg_list, &ptl->ptl_msg_delayed);

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(mtable->mt_cpt);

	} else  {
		/* drop our res_lock before the cross-CPT stealing walk */
		lnet_res_unlock(mtable->mt_cpt);
		rc = lnet_ptl_match_delay(ptl, info, msg);
	}

	if (msg->msg_rx_delayed) {
		CDEBUG(D_NET,
		       "Delaying %s from %s ptl %d MB %#llx off %d len %d\n",
		       info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
		       libcfs_id2str(info->mi_id), info->mi_portal,
		       info->mi_mbits, info->mi_roffset, info->mi_rlength);
	}
	goto out0;
 out1:
	lnet_res_unlock(mtable->mt_cpt);
 out0:
	/* EXHAUSTED bit is only meaningful for internal functions */
	return rc & ~LNET_MATCHMD_EXHAUSTED;
}
611
612void
613lnet_ptl_detach_md(lnet_me_t *me, lnet_libmd_t *md)
614{
615	LASSERT(me->me_md == md && md->md_me == me);
616
617	me->me_md = NULL;
618	md->md_me = NULL;
619}
620
/* called with lnet_res_lock held */
/*
 * Attach a brand new MD to its ME and immediately try to satisfy blocked
 * messages with it: first threads currently on the stealing list, then
 * PUTs parked on the delayed list.  Messages that match successfully are
 * moved to @matches (to be resumed by the caller) and truncation failures
 * to @drops; stealing-list messages are left for their owning thread.
 * Finally clears the bucket's exhausted bit and re-enables this CPT's
 * match-table if the MD still has room.
 */
void
lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
		   struct list_head *matches, struct list_head *drops)
{
	struct lnet_portal	*ptl = the_lnet.ln_portals[me->me_portal];
	struct lnet_match_table	*mtable;
	struct list_head		*head;
	lnet_msg_t		*tmp;
	lnet_msg_t		*msg;
	int			exhausted = 0;
	int			cpt;

	LASSERT(md->md_refcount == 0); /* a brand new MD */

	me->me_md = md;
	md->md_me = me;

	cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
	mtable = ptl->ptl_mtables[cpt];

	/* fast path: nobody is waiting and the bucket isn't marked
	 * exhausted, so there is nothing to update */
	if (list_empty(&ptl->ptl_msg_stealing) &&
	    list_empty(&ptl->ptl_msg_delayed) &&
	    !lnet_mt_test_exhausted(mtable, me->me_pos))
		return;

	lnet_ptl_lock(ptl);
	/* scan the stealing list first, then (below) the delayed list */
	head = &ptl->ptl_msg_stealing;
 again:
	list_for_each_entry_safe(msg, tmp, head, msg_list) {
		struct lnet_match_info	info;
		lnet_hdr_t		*hdr;
		int			rc;

		LASSERT(msg->msg_rx_delayed || head == &ptl->ptl_msg_stealing);

		/* rebuild the match descriptor from the blocked PUT's
		 * message header */
		hdr   = &msg->msg_hdr;
		info.mi_id.nid	= hdr->src_nid;
		info.mi_id.pid	= hdr->src_pid;
		info.mi_opc	= LNET_MD_OP_PUT;
		info.mi_portal	= hdr->msg.put.ptl_index;
		info.mi_rlength	= hdr->payload_length;
		info.mi_roffset	= hdr->msg.put.offset;
		info.mi_mbits	= hdr->msg.put.match_bits;

		rc = lnet_try_match_md(md, &info, msg);

		exhausted = (rc & LNET_MATCHMD_EXHAUSTED) != 0;
		if ((rc & LNET_MATCHMD_NONE) != 0) {
			if (exhausted)
				break;
			continue;
		}

		/* Hurrah! This _is_ a match */
		LASSERT((rc & LNET_MATCHMD_FINISH) != 0);
		list_del_init(&msg->msg_list);

		if (head == &ptl->ptl_msg_stealing) {
			if (exhausted)
				break;
			/* stealing thread will handle the message */
			continue;
		}

		if ((rc & LNET_MATCHMD_OK) != 0) {
			list_add_tail(&msg->msg_list, matches);

			CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
			       libcfs_id2str(info.mi_id),
			       info.mi_portal, info.mi_mbits,
			       info.mi_roffset, info.mi_rlength);
		} else {
			list_add_tail(&msg->msg_list, drops);
		}

		if (exhausted)
			break;
	}

	if (!exhausted && head == &ptl->ptl_msg_stealing) {
		head = &ptl->ptl_msg_delayed;
		goto again;
	}

	if (lnet_ptl_is_wildcard(ptl) && !exhausted) {
		/* the MD still has room: unmark the bucket and bring this
		 * CPT back into the round-robin rotation if needed */
		lnet_mt_set_exhausted(mtable, me->me_pos, 0);
		if (!mtable->mt_enabled)
			lnet_ptl_enable_mt(ptl, cpt);
	}

	lnet_ptl_unlock(ptl);
}
714
715static void
716lnet_ptl_cleanup(struct lnet_portal *ptl)
717{
718	struct lnet_match_table	*mtable;
719	int			i;
720
721	if (ptl->ptl_mtables == NULL) /* uninitialized portal */
722		return;
723
724	LASSERT(list_empty(&ptl->ptl_msg_delayed));
725	LASSERT(list_empty(&ptl->ptl_msg_stealing));
726	cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
727		struct list_head	*mhash;
728		lnet_me_t	*me;
729		int		j;
730
731		if (mtable->mt_mhash == NULL) /* uninitialized match-table */
732			continue;
733
734		mhash = mtable->mt_mhash;
735		/* cleanup ME */
736		for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++) {
737			while (!list_empty(&mhash[j])) {
738				me = list_entry(mhash[j].next,
739						    lnet_me_t, me_list);
740				CERROR("Active ME %p on exit\n", me);
741				list_del(&me->me_list);
742				lnet_me_free(me);
743			}
744		}
745		/* the extra entry is for MEs with ignore bits */
746		LIBCFS_FREE(mhash, sizeof(*mhash) * (LNET_MT_HASH_SIZE + 1));
747	}
748
749	cfs_percpt_free(ptl->ptl_mtables);
750	ptl->ptl_mtables = NULL;
751}
752
753static int
754lnet_ptl_setup(struct lnet_portal *ptl, int index)
755{
756	struct lnet_match_table	*mtable;
757	struct list_head		*mhash;
758	int			i;
759	int			j;
760
761	ptl->ptl_mtables = cfs_percpt_alloc(lnet_cpt_table(),
762					    sizeof(struct lnet_match_table));
763	if (ptl->ptl_mtables == NULL) {
764		CERROR("Failed to create match table for portal %d\n", index);
765		return -ENOMEM;
766	}
767
768	ptl->ptl_index = index;
769	INIT_LIST_HEAD(&ptl->ptl_msg_delayed);
770	INIT_LIST_HEAD(&ptl->ptl_msg_stealing);
771	spin_lock_init(&ptl->ptl_lock);
772	cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
773		/* the extra entry is for MEs with ignore bits */
774		LIBCFS_CPT_ALLOC(mhash, lnet_cpt_table(), i,
775				 sizeof(*mhash) * (LNET_MT_HASH_SIZE + 1));
776		if (mhash == NULL) {
777			CERROR("Failed to create match hash for portal %d\n",
778			       index);
779			goto failed;
780		}
781
782		memset(&mtable->mt_exhausted[0], -1,
783		       sizeof(mtable->mt_exhausted[0]) *
784		       LNET_MT_EXHAUSTED_BMAP);
785		mtable->mt_mhash = mhash;
786		for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++)
787			INIT_LIST_HEAD(&mhash[j]);
788
789		mtable->mt_portal = index;
790		mtable->mt_cpt = i;
791	}
792
793	return 0;
794 failed:
795	lnet_ptl_cleanup(ptl);
796	return -ENOMEM;
797}
798
799void
800lnet_portals_destroy(void)
801{
802	int	i;
803
804	if (the_lnet.ln_portals == NULL)
805		return;
806
807	for (i = 0; i < the_lnet.ln_nportals; i++)
808		lnet_ptl_cleanup(the_lnet.ln_portals[i]);
809
810	cfs_array_free(the_lnet.ln_portals);
811	the_lnet.ln_portals = NULL;
812}
813
814int
815lnet_portals_create(void)
816{
817	int	size;
818	int	i;
819
820	size = offsetof(struct lnet_portal, ptl_mt_maps[LNET_CPT_NUMBER]);
821
822	the_lnet.ln_nportals = MAX_PORTALS;
823	the_lnet.ln_portals = cfs_array_alloc(the_lnet.ln_nportals, size);
824	if (the_lnet.ln_portals == NULL) {
825		CERROR("Failed to allocate portals table\n");
826		return -ENOMEM;
827	}
828
829	for (i = 0; i < the_lnet.ln_nportals; i++) {
830		if (lnet_ptl_setup(the_lnet.ln_portals[i], i)) {
831			lnet_portals_destroy();
832			return -ENOMEM;
833		}
834	}
835
836	return 0;
837}
838
839/**
840 * Turn on the lazy portal attribute. Use with caution!
841 *
842 * This portal attribute only affects incoming PUT requests to the portal,
843 * and is off by default. By default, if there's no matching MD for an
844 * incoming PUT request, it is simply dropped. With the lazy attribute on,
845 * such requests are queued indefinitely until either a matching MD is
846 * posted to the portal or the lazy attribute is turned off.
847 *
848 * It would prevent dropped requests, however it should be regarded as the
849 * last line of defense - i.e. users must keep a close watch on active
850 * buffers on a lazy portal and once it becomes too low post more buffers as
851 * soon as possible. This is because delayed requests usually have detrimental
852 * effects on underlying network connections. A few delayed requests often
853 * suffice to bring an underlying connection to a complete halt, due to flow
854 * control mechanisms.
855 *
856 * There's also a DOS attack risk. If users don't post match-all MDs on a
857 * lazy portal, a malicious peer can easily stop a service by sending some
858 * PUT requests with match bits that won't match any MD. A routed server is
859 * especially vulnerable since the connections to its neighbor routers are
860 * shared among all clients.
861 *
862 * \param portal Index of the portal to enable the lazy attribute on.
863 *
864 * \retval 0       On success.
865 * \retval -EINVAL If \a portal is not a valid index.
866 */
867int
868LNetSetLazyPortal(int portal)
869{
870	struct lnet_portal *ptl;
871
872	if (portal < 0 || portal >= the_lnet.ln_nportals)
873		return -EINVAL;
874
875	CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
876	ptl = the_lnet.ln_portals[portal];
877
878	lnet_res_lock(LNET_LOCK_EX);
879	lnet_ptl_lock(ptl);
880
881	lnet_ptl_setopt(ptl, LNET_PTL_LAZY);
882
883	lnet_ptl_unlock(ptl);
884	lnet_res_unlock(LNET_LOCK_EX);
885
886	return 0;
887}
888EXPORT_SYMBOL(LNetSetLazyPortal);
889
890/**
891 * Turn off the lazy portal attribute. Delayed requests on the portal,
892 * if any, will be all dropped when this function returns.
893 *
894 * \param portal Index of the portal to disable the lazy attribute on.
895 *
896 * \retval 0       On success.
897 * \retval -EINVAL If \a portal is not a valid index.
898 */
899int
900LNetClearLazyPortal(int portal)
901{
902	struct lnet_portal	*ptl;
903	LIST_HEAD		(zombies);
904
905	if (portal < 0 || portal >= the_lnet.ln_nportals)
906		return -EINVAL;
907
908	ptl = the_lnet.ln_portals[portal];
909
910	lnet_res_lock(LNET_LOCK_EX);
911	lnet_ptl_lock(ptl);
912
913	if (!lnet_ptl_is_lazy(ptl)) {
914		lnet_ptl_unlock(ptl);
915		lnet_res_unlock(LNET_LOCK_EX);
916		return 0;
917	}
918
919	if (the_lnet.ln_shutdown)
920		CWARN("Active lazy portal %d on exit\n", portal);
921	else
922		CDEBUG(D_NET, "clearing portal %d lazy\n", portal);
923
924	/* grab all the blocked messages atomically */
925	list_splice_init(&ptl->ptl_msg_delayed, &zombies);
926
927	lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
928
929	lnet_ptl_unlock(ptl);
930	lnet_res_unlock(LNET_LOCK_EX);
931
932	lnet_drop_delayed_msg_list(&zombies, "Clearing lazy portal attr");
933
934	return 0;
935}
936EXPORT_SYMBOL(LNetClearLazyPortal);
937