1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* -*- mode: c; c-basic-offset: 8; -*-
3  * vim: noexpandtab sw=8 ts=8 sts=0:
4  *
5  * dlmconvert.c
6  *
7  * underlying calls for lock conversion
8  *
9  * Copyright (C) 2004 Oracle.  All rights reserved.
10  */
11 
12 
13 #include <linux/module.h>
14 #include <linux/fs.h>
15 #include <linux/types.h>
16 #include <linux/highmem.h>
17 #include <linux/init.h>
18 #include <linux/sysctl.h>
19 #include <linux/random.h>
20 #include <linux/blkdev.h>
21 #include <linux/socket.h>
22 #include <linux/inet.h>
23 #include <linux/spinlock.h>
24 
25 
26 #include "../cluster/heartbeat.h"
27 #include "../cluster/nodemanager.h"
28 #include "../cluster/tcp.h"
29 
30 #include "dlmapi.h"
31 #include "dlmcommon.h"
32 
33 #include "dlmconvert.h"
34 
35 #define MLOG_MASK_PREFIX ML_DLM
36 #include "../cluster/masklog.h"
37 
38 /* NOTE: __dlmconvert_master is the only function in here that
39  * needs a spinlock held on entry (res->spinlock) and it is the
40  * only one that holds a lock on exit (res->spinlock).
41  * All other functions in here need no locks and drop all of
42  * the locks that they acquire. */
43 static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
44 					   struct dlm_lock_resource *res,
45 					   struct dlm_lock *lock, int flags,
46 					   int type, int *call_ast,
47 					   int *kick_thread);
48 static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
49 					   struct dlm_lock_resource *res,
50 					   struct dlm_lock *lock, int flags, int type);
51 
52 /*
53  * this is only called directly by dlmlock(), and only when the
54  * local node is the owner of the lockres
55  * locking:
56  *   caller needs:  none
57  *   taken:         takes and drops res->spinlock
58  *   held on exit:  none
59  * returns: see __dlmconvert_master
60  */
dlmconvert_master(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type)61 enum dlm_status dlmconvert_master(struct dlm_ctxt *dlm,
62 				  struct dlm_lock_resource *res,
63 				  struct dlm_lock *lock, int flags, int type)
64 {
65 	int call_ast = 0, kick_thread = 0;
66 	enum dlm_status status;
67 
68 	spin_lock(&res->spinlock);
69 	/* we are not in a network handler, this is fine */
70 	__dlm_wait_on_lockres(res);
71 	__dlm_lockres_reserve_ast(res);
72 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
73 
74 	status = __dlmconvert_master(dlm, res, lock, flags, type,
75 				     &call_ast, &kick_thread);
76 
77 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
78 	spin_unlock(&res->spinlock);
79 	wake_up(&res->wq);
80 	if (status != DLM_NORMAL && status != DLM_NOTQUEUED)
81 		dlm_error(status);
82 
83 	/* either queue the ast or release it */
84 	if (call_ast)
85 		dlm_queue_ast(dlm, lock);
86 	else
87 		dlm_lockres_release_ast(dlm, res);
88 
89 	if (kick_thread)
90 		dlm_kick_thread(dlm, res);
91 
92 	return status;
93 }
94 
95 /* performs lock conversion at the lockres master site
96  * locking:
97  *   caller needs:  res->spinlock
98  *   taken:         takes and drops lock->spinlock
99  *   held on exit:  res->spinlock
100  * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
101  *   call_ast: whether ast should be called for this lock
102  *   kick_thread: whether dlm_kick_thread should be called
103  */
__dlmconvert_master(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type,int * call_ast,int * kick_thread)104 static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
105 					   struct dlm_lock_resource *res,
106 					   struct dlm_lock *lock, int flags,
107 					   int type, int *call_ast,
108 					   int *kick_thread)
109 {
110 	enum dlm_status status = DLM_NORMAL;
111 	struct dlm_lock *tmplock=NULL;
112 
113 	assert_spin_locked(&res->spinlock);
114 
115 	mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
116 	     lock->ml.type, lock->ml.convert_type, type);
117 
118 	spin_lock(&lock->spinlock);
119 
120 	/* already converting? */
121 	if (lock->ml.convert_type != LKM_IVMODE) {
122 		mlog(ML_ERROR, "attempted to convert a lock with a lock "
123 		     "conversion pending\n");
124 		status = DLM_DENIED;
125 		goto unlock_exit;
126 	}
127 
128 	/* must be on grant queue to convert */
129 	if (!dlm_lock_on_list(&res->granted, lock)) {
130 		mlog(ML_ERROR, "attempted to convert a lock not on grant "
131 		     "queue\n");
132 		status = DLM_DENIED;
133 		goto unlock_exit;
134 	}
135 
136 	if (flags & LKM_VALBLK) {
137 		switch (lock->ml.type) {
138 			case LKM_EXMODE:
139 				/* EX + LKM_VALBLK + convert == set lvb */
140 				mlog(0, "will set lvb: converting %s->%s\n",
141 				     dlm_lock_mode_name(lock->ml.type),
142 				     dlm_lock_mode_name(type));
143 				lock->lksb->flags |= DLM_LKSB_PUT_LVB;
144 				break;
145 			case LKM_PRMODE:
146 			case LKM_NLMODE:
147 				/* refetch if new level is not NL */
148 				if (type > LKM_NLMODE) {
149 					mlog(0, "will fetch new value into "
150 					     "lvb: converting %s->%s\n",
151 					     dlm_lock_mode_name(lock->ml.type),
152 					     dlm_lock_mode_name(type));
153 					lock->lksb->flags |= DLM_LKSB_GET_LVB;
154 				} else {
155 					mlog(0, "will NOT fetch new value "
156 					     "into lvb: converting %s->%s\n",
157 					     dlm_lock_mode_name(lock->ml.type),
158 					     dlm_lock_mode_name(type));
159 					flags &= ~(LKM_VALBLK);
160 				}
161 				break;
162 		}
163 	}
164 
165 
166 	/* in-place downconvert? */
167 	if (type <= lock->ml.type)
168 		goto grant;
169 
170 	/* upconvert from here on */
171 	status = DLM_NORMAL;
172 	list_for_each_entry(tmplock, &res->granted, list) {
173 		if (tmplock == lock)
174 			continue;
175 		if (!dlm_lock_compatible(tmplock->ml.type, type))
176 			goto switch_queues;
177 	}
178 
179 	list_for_each_entry(tmplock, &res->converting, list) {
180 		if (!dlm_lock_compatible(tmplock->ml.type, type))
181 			goto switch_queues;
182 		/* existing conversion requests take precedence */
183 		if (!dlm_lock_compatible(tmplock->ml.convert_type, type))
184 			goto switch_queues;
185 	}
186 
187 	/* fall thru to grant */
188 
189 grant:
190 	mlog(0, "res %.*s, granting %s lock\n", res->lockname.len,
191 	     res->lockname.name, dlm_lock_mode_name(type));
192 	/* immediately grant the new lock type */
193 	lock->lksb->status = DLM_NORMAL;
194 	if (lock->ml.node == dlm->node_num)
195 		mlog(0, "doing in-place convert for nonlocal lock\n");
196 	lock->ml.type = type;
197 	if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
198 		memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
199 
200 	/*
201 	 * Move the lock to the tail because it may be the only lock which has
202 	 * an invalid lvb.
203 	 */
204 	list_move_tail(&lock->list, &res->granted);
205 
206 	status = DLM_NORMAL;
207 	*call_ast = 1;
208 	goto unlock_exit;
209 
210 switch_queues:
211 	if (flags & LKM_NOQUEUE) {
212 		mlog(0, "failed to convert NOQUEUE lock %.*s from "
213 		     "%d to %d...\n", res->lockname.len, res->lockname.name,
214 		     lock->ml.type, type);
215 		status = DLM_NOTQUEUED;
216 		goto unlock_exit;
217 	}
218 	mlog(0, "res %.*s, queueing...\n", res->lockname.len,
219 	     res->lockname.name);
220 
221 	lock->ml.convert_type = type;
222 	/* do not alter lock refcount.  switching lists. */
223 	list_move_tail(&lock->list, &res->converting);
224 
225 unlock_exit:
226 	spin_unlock(&lock->spinlock);
227 	if (status == DLM_DENIED) {
228 		__dlm_print_one_lock_resource(res);
229 	}
230 	if (status == DLM_NORMAL)
231 		*kick_thread = 1;
232 	return status;
233 }
234 
dlm_revert_pending_convert(struct dlm_lock_resource * res,struct dlm_lock * lock)235 void dlm_revert_pending_convert(struct dlm_lock_resource *res,
236 				struct dlm_lock *lock)
237 {
238 	/* do not alter lock refcount.  switching lists. */
239 	list_move_tail(&lock->list, &res->granted);
240 	lock->ml.convert_type = LKM_IVMODE;
241 	lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
242 }
243 
244 /* messages the master site to do lock conversion
245  * locking:
246  *   caller needs:  none
247  *   taken:         takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
248  *   held on exit:  none
249  * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
250  */
dlmconvert_remote(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type)251 enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
252 				  struct dlm_lock_resource *res,
253 				  struct dlm_lock *lock, int flags, int type)
254 {
255 	enum dlm_status status;
256 
257 	mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock->ml.type,
258 	     lock->ml.convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
259 
260 	spin_lock(&res->spinlock);
261 	if (res->state & DLM_LOCK_RES_RECOVERING) {
262 		mlog(0, "bailing out early since res is RECOVERING "
263 		     "on secondary queue\n");
264 		/* __dlm_print_one_lock_resource(res); */
265 		status = DLM_RECOVERING;
266 		goto bail;
267 	}
268 	/* will exit this call with spinlock held */
269 	__dlm_wait_on_lockres(res);
270 
271 	if (lock->ml.convert_type != LKM_IVMODE) {
272 		__dlm_print_one_lock_resource(res);
273 		mlog(ML_ERROR, "converting a remote lock that is already "
274 		     "converting! (cookie=%u:%llu, conv=%d)\n",
275 		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
276 		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
277 		     lock->ml.convert_type);
278 		status = DLM_DENIED;
279 		goto bail;
280 	}
281 
282 	if (lock->ml.type == type && lock->ml.convert_type == LKM_IVMODE) {
283 		mlog(0, "last convert request returned DLM_RECOVERING, but "
284 		     "owner has already queued and sent ast to me. res %.*s, "
285 		     "(cookie=%u:%llu, type=%d, conv=%d)\n",
286 		     res->lockname.len, res->lockname.name,
287 		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
288 		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
289 		     lock->ml.type, lock->ml.convert_type);
290 		status = DLM_NORMAL;
291 		goto bail;
292 	}
293 
294 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
295 	/* move lock to local convert queue */
296 	/* do not alter lock refcount.  switching lists. */
297 	list_move_tail(&lock->list, &res->converting);
298 	lock->convert_pending = 1;
299 	lock->ml.convert_type = type;
300 
301 	if (flags & LKM_VALBLK) {
302 		if (lock->ml.type == LKM_EXMODE) {
303 			flags |= LKM_PUT_LVB;
304 			lock->lksb->flags |= DLM_LKSB_PUT_LVB;
305 		} else {
306 			if (lock->ml.convert_type == LKM_NLMODE)
307 				flags &= ~LKM_VALBLK;
308 			else {
309 				flags |= LKM_GET_LVB;
310 				lock->lksb->flags |= DLM_LKSB_GET_LVB;
311 			}
312 		}
313 	}
314 	spin_unlock(&res->spinlock);
315 
316 	/* no locks held here.
317 	 * need to wait for a reply as to whether it got queued or not. */
318 	status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
319 
320 	spin_lock(&res->spinlock);
321 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
322 	/* if it failed, move it back to granted queue.
323 	 * if master returns DLM_NORMAL and then down before sending ast,
324 	 * it may have already been moved to granted queue, reset to
325 	 * DLM_RECOVERING and retry convert */
326 	if (status != DLM_NORMAL) {
327 		if (status != DLM_NOTQUEUED)
328 			dlm_error(status);
329 		dlm_revert_pending_convert(res, lock);
330 	} else if (!lock->convert_pending) {
331 		mlog(0, "%s: res %.*s, owner died and lock has been moved back "
332 				"to granted list, retry convert.\n",
333 				dlm->name, res->lockname.len, res->lockname.name);
334 		status = DLM_RECOVERING;
335 	}
336 
337 	lock->convert_pending = 0;
338 bail:
339 	spin_unlock(&res->spinlock);
340 
341 	/* TODO: should this be a wake_one? */
342 	/* wake up any IN_PROGRESS waiters */
343 	wake_up(&res->wq);
344 
345 	return status;
346 }
347 
348 /* sends DLM_CONVERT_LOCK_MSG to master site
349  * locking:
350  *   caller needs:  none
351  *   taken:         none
352  *   held on exit:  none
353  * returns: DLM_NOLOCKMGR, status from remote node
354  */
dlm_send_remote_convert_request(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type)355 static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
356 					   struct dlm_lock_resource *res,
357 					   struct dlm_lock *lock, int flags, int type)
358 {
359 	struct dlm_convert_lock convert;
360 	int tmpret;
361 	enum dlm_status ret;
362 	int status = 0;
363 	struct kvec vec[2];
364 	size_t veclen = 1;
365 
366 	mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);
367 
368 	memset(&convert, 0, sizeof(struct dlm_convert_lock));
369 	convert.node_idx = dlm->node_num;
370 	convert.requested_type = type;
371 	convert.cookie = lock->ml.cookie;
372 	convert.namelen = res->lockname.len;
373 	convert.flags = cpu_to_be32(flags);
374 	memcpy(convert.name, res->lockname.name, convert.namelen);
375 
376 	vec[0].iov_len = sizeof(struct dlm_convert_lock);
377 	vec[0].iov_base = &convert;
378 
379 	if (flags & LKM_PUT_LVB) {
380 		/* extra data to send if we are updating lvb */
381 		vec[1].iov_len = DLM_LVB_LEN;
382 		vec[1].iov_base = lock->lksb->lvb;
383 		veclen++;
384 	}
385 
386 	tmpret = o2net_send_message_vec(DLM_CONVERT_LOCK_MSG, dlm->key,
387 					vec, veclen, res->owner, &status);
388 	if (tmpret >= 0) {
389 		// successfully sent and received
390 		ret = status;  // this is already a dlm_status
391 		if (ret == DLM_RECOVERING) {
392 			mlog(0, "node %u returned DLM_RECOVERING from convert "
393 			     "message!\n", res->owner);
394 		} else if (ret == DLM_MIGRATING) {
395 			mlog(0, "node %u returned DLM_MIGRATING from convert "
396 			     "message!\n", res->owner);
397 		} else if (ret == DLM_FORWARD) {
398 			mlog(0, "node %u returned DLM_FORWARD from convert "
399 			     "message!\n", res->owner);
400 		} else if (ret != DLM_NORMAL && ret != DLM_NOTQUEUED)
401 			dlm_error(ret);
402 	} else {
403 		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
404 		     "node %u\n", tmpret, DLM_CONVERT_LOCK_MSG, dlm->key,
405 		     res->owner);
406 		if (dlm_is_host_down(tmpret)) {
407 			/* instead of logging the same network error over
408 			 * and over, sleep here and wait for the heartbeat
409 			 * to notice the node is dead.  times out after 5s. */
410 			dlm_wait_for_node_death(dlm, res->owner,
411 						DLM_NODE_DEATH_WAIT_MAX);
412 			ret = DLM_RECOVERING;
413 			mlog(0, "node %u died so returning DLM_RECOVERING "
414 			     "from convert message!\n", res->owner);
415 		} else {
416 			ret = dlm_err_to_dlm_status(tmpret);
417 		}
418 	}
419 
420 	return ret;
421 }
422 
423 /* handler for DLM_CONVERT_LOCK_MSG on master site
424  * locking:
425  *   caller needs:  none
426  *   taken:         takes and drop res->spinlock
427  *   held on exit:  none
428  * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
429  *          status from __dlmconvert_master
430  */
dlm_convert_lock_handler(struct o2net_msg * msg,u32 len,void * data,void ** ret_data)431 int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
432 			     void **ret_data)
433 {
434 	struct dlm_ctxt *dlm = data;
435 	struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
436 	struct dlm_lock_resource *res = NULL;
437 	struct dlm_lock *lock = NULL;
438 	struct dlm_lock *tmp_lock;
439 	struct dlm_lockstatus *lksb;
440 	enum dlm_status status = DLM_NORMAL;
441 	u32 flags;
442 	int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
443 
444 	if (!dlm_grab(dlm)) {
445 		dlm_error(DLM_REJECTED);
446 		return DLM_REJECTED;
447 	}
448 
449 	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
450 			"Domain %s not fully joined!\n", dlm->name);
451 
452 	if (cnv->namelen > DLM_LOCKID_NAME_MAX) {
453 		status = DLM_IVBUFLEN;
454 		dlm_error(status);
455 		goto leave;
456 	}
457 
458 	flags = be32_to_cpu(cnv->flags);
459 
460 	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
461 	     (LKM_PUT_LVB|LKM_GET_LVB)) {
462 		mlog(ML_ERROR, "both PUT and GET lvb specified\n");
463 		status = DLM_BADARGS;
464 		goto leave;
465 	}
466 
467 	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
468 	     (flags & LKM_GET_LVB ? "get lvb" : "none"));
469 
470 	status = DLM_IVLOCKID;
471 	res = dlm_lookup_lockres(dlm, cnv->name, cnv->namelen);
472 	if (!res) {
473 		dlm_error(status);
474 		goto leave;
475 	}
476 
477 	spin_lock(&res->spinlock);
478 	status = __dlm_lockres_state_to_status(res);
479 	if (status != DLM_NORMAL) {
480 		spin_unlock(&res->spinlock);
481 		dlm_error(status);
482 		goto leave;
483 	}
484 	list_for_each_entry(tmp_lock, &res->granted, list) {
485 		if (tmp_lock->ml.cookie == cnv->cookie &&
486 		    tmp_lock->ml.node == cnv->node_idx) {
487 			lock = tmp_lock;
488 			dlm_lock_get(lock);
489 			break;
490 		}
491 	}
492 	spin_unlock(&res->spinlock);
493 	if (!lock) {
494 		status = DLM_IVLOCKID;
495 		mlog(ML_ERROR, "did not find lock to convert on grant queue! "
496 			       "cookie=%u:%llu\n",
497 		     dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
498 		     dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
499 		dlm_print_one_lock_resource(res);
500 		goto leave;
501 	}
502 
503 	/* found the lock */
504 	lksb = lock->lksb;
505 
506 	/* see if caller needed to get/put lvb */
507 	if (flags & LKM_PUT_LVB) {
508 		BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
509 		lksb->flags |= DLM_LKSB_PUT_LVB;
510 		memcpy(&lksb->lvb[0], &cnv->lvb[0], DLM_LVB_LEN);
511 	} else if (flags & LKM_GET_LVB) {
512 		BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
513 		lksb->flags |= DLM_LKSB_GET_LVB;
514 	}
515 
516 	spin_lock(&res->spinlock);
517 	status = __dlm_lockres_state_to_status(res);
518 	if (status == DLM_NORMAL) {
519 		__dlm_lockres_reserve_ast(res);
520 		ast_reserved = 1;
521 		res->state |= DLM_LOCK_RES_IN_PROGRESS;
522 		status = __dlmconvert_master(dlm, res, lock, flags,
523 					     cnv->requested_type,
524 					     &call_ast, &kick_thread);
525 		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
526 		wake = 1;
527 	}
528 	spin_unlock(&res->spinlock);
529 	if (wake)
530 		wake_up(&res->wq);
531 
532 	if (status != DLM_NORMAL) {
533 		if (status != DLM_NOTQUEUED)
534 			dlm_error(status);
535 		lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
536 	}
537 
538 leave:
539 	if (lock)
540 		dlm_lock_put(lock);
541 
542 	/* either queue the ast or release it, if reserved */
543 	if (call_ast)
544 		dlm_queue_ast(dlm, lock);
545 	else if (ast_reserved)
546 		dlm_lockres_release_ast(dlm, res);
547 
548 	if (kick_thread)
549 		dlm_kick_thread(dlm, res);
550 
551 	if (res)
552 		dlm_lockres_put(res);
553 
554 	dlm_put(dlm);
555 
556 	return status;
557 }
558