1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 #include "dlm_internal.h"
12 #include "lockspace.h"
13 #include "member.h"
14 #include "recoverd.h"
15 #include "recover.h"
16 #include "rcom.h"
17 #include "config.h"
18 #include "lowcomms.h"
19 
dlm_slots_version(struct dlm_header * h)20 int dlm_slots_version(struct dlm_header *h)
21 {
22 	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
23 		return 0;
24 	return 1;
25 }
26 
dlm_slot_save(struct dlm_ls * ls,struct dlm_rcom * rc,struct dlm_member * memb)27 void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
28 		   struct dlm_member *memb)
29 {
30 	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
31 
32 	if (!dlm_slots_version(&rc->rc_header))
33 		return;
34 
35 	memb->slot = le16_to_cpu(rf->rf_our_slot);
36 	memb->generation = le32_to_cpu(rf->rf_generation);
37 }
38 
dlm_slots_copy_out(struct dlm_ls * ls,struct dlm_rcom * rc)39 void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
40 {
41 	struct dlm_slot *slot;
42 	struct rcom_slot *ro;
43 	int i;
44 
45 	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
46 
47 	/* ls_slots array is sparse, but not rcom_slots */
48 
49 	for (i = 0; i < ls->ls_slots_size; i++) {
50 		slot = &ls->ls_slots[i];
51 		if (!slot->nodeid)
52 			continue;
53 		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
54 		ro->ro_slot = cpu_to_le16(slot->slot);
55 		ro++;
56 	}
57 }
58 
59 #define SLOT_DEBUG_LINE 128
60 
log_slots(struct dlm_ls * ls,uint32_t gen,int num_slots,struct rcom_slot * ro0,struct dlm_slot * array,int array_size)61 static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
62 		      struct rcom_slot *ro0, struct dlm_slot *array,
63 		      int array_size)
64 {
65 	char line[SLOT_DEBUG_LINE];
66 	int len = SLOT_DEBUG_LINE - 1;
67 	int pos = 0;
68 	int ret, i;
69 
70 	memset(line, 0, sizeof(line));
71 
72 	if (array) {
73 		for (i = 0; i < array_size; i++) {
74 			if (!array[i].nodeid)
75 				continue;
76 
77 			ret = snprintf(line + pos, len - pos, " %d:%d",
78 				       array[i].slot, array[i].nodeid);
79 			if (ret >= len - pos)
80 				break;
81 			pos += ret;
82 		}
83 	} else if (ro0) {
84 		for (i = 0; i < num_slots; i++) {
85 			ret = snprintf(line + pos, len - pos, " %d:%d",
86 				       ro0[i].ro_slot, ro0[i].ro_nodeid);
87 			if (ret >= len - pos)
88 				break;
89 			pos += ret;
90 		}
91 	}
92 
93 	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
94 }
95 
dlm_slots_copy_in(struct dlm_ls * ls)96 int dlm_slots_copy_in(struct dlm_ls *ls)
97 {
98 	struct dlm_member *memb;
99 	struct dlm_rcom *rc = ls->ls_recover_buf;
100 	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
101 	struct rcom_slot *ro0, *ro;
102 	int our_nodeid = dlm_our_nodeid();
103 	int i, num_slots;
104 	uint32_t gen;
105 
106 	if (!dlm_slots_version(&rc->rc_header))
107 		return -1;
108 
109 	gen = le32_to_cpu(rf->rf_generation);
110 	if (gen <= ls->ls_generation) {
111 		log_error(ls, "dlm_slots_copy_in gen %u old %u",
112 			  gen, ls->ls_generation);
113 	}
114 	ls->ls_generation = gen;
115 
116 	num_slots = le16_to_cpu(rf->rf_num_slots);
117 	if (!num_slots)
118 		return -1;
119 
120 	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
121 
122 	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
123 		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
124 		ro->ro_slot = le16_to_cpu(ro->ro_slot);
125 	}
126 
127 	log_slots(ls, gen, num_slots, ro0, NULL, 0);
128 
129 	list_for_each_entry(memb, &ls->ls_nodes, list) {
130 		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
131 			if (ro->ro_nodeid != memb->nodeid)
132 				continue;
133 			memb->slot = ro->ro_slot;
134 			memb->slot_prev = memb->slot;
135 			break;
136 		}
137 
138 		if (memb->nodeid == our_nodeid) {
139 			if (ls->ls_slot && ls->ls_slot != memb->slot) {
140 				log_error(ls, "dlm_slots_copy_in our slot "
141 					  "changed %d %d", ls->ls_slot,
142 					  memb->slot);
143 				return -1;
144 			}
145 
146 			if (!ls->ls_slot)
147 				ls->ls_slot = memb->slot;
148 		}
149 
150 		if (!memb->slot) {
151 			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
152 				   memb->nodeid);
153 			return -1;
154 		}
155 	}
156 
157 	return 0;
158 }
159 
160 /* for any nodes that do not support slots, we will not have set memb->slot
161    in wait_status_all(), so memb->slot will remain -1, and we will not
162    assign slots or set ls_num_slots here */
163 
dlm_slots_assign(struct dlm_ls * ls,int * num_slots,int * slots_size,struct dlm_slot ** slots_out,uint32_t * gen_out)164 int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
165 		     struct dlm_slot **slots_out, uint32_t *gen_out)
166 {
167 	struct dlm_member *memb;
168 	struct dlm_slot *array;
169 	int our_nodeid = dlm_our_nodeid();
170 	int array_size, max_slots, i;
171 	int need = 0;
172 	int max = 0;
173 	int num = 0;
174 	uint32_t gen = 0;
175 
176 	/* our own memb struct will have slot -1 gen 0 */
177 
178 	list_for_each_entry(memb, &ls->ls_nodes, list) {
179 		if (memb->nodeid == our_nodeid) {
180 			memb->slot = ls->ls_slot;
181 			memb->generation = ls->ls_generation;
182 			break;
183 		}
184 	}
185 
186 	list_for_each_entry(memb, &ls->ls_nodes, list) {
187 		if (memb->generation > gen)
188 			gen = memb->generation;
189 
190 		/* node doesn't support slots */
191 
192 		if (memb->slot == -1)
193 			return -1;
194 
195 		/* node needs a slot assigned */
196 
197 		if (!memb->slot)
198 			need++;
199 
200 		/* node has a slot assigned */
201 
202 		num++;
203 
204 		if (!max || max < memb->slot)
205 			max = memb->slot;
206 
207 		/* sanity check, once slot is assigned it shouldn't change */
208 
209 		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
210 			log_error(ls, "nodeid %d slot changed %d %d",
211 				  memb->nodeid, memb->slot_prev, memb->slot);
212 			return -1;
213 		}
214 		memb->slot_prev = memb->slot;
215 	}
216 
217 	array_size = max + need;
218 	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
219 	if (!array)
220 		return -ENOMEM;
221 
222 	num = 0;
223 
224 	/* fill in slots (offsets) that are used */
225 
226 	list_for_each_entry(memb, &ls->ls_nodes, list) {
227 		if (!memb->slot)
228 			continue;
229 
230 		if (memb->slot > array_size) {
231 			log_error(ls, "invalid slot number %d", memb->slot);
232 			kfree(array);
233 			return -1;
234 		}
235 
236 		array[memb->slot - 1].nodeid = memb->nodeid;
237 		array[memb->slot - 1].slot = memb->slot;
238 		num++;
239 	}
240 
241 	/* assign new slots from unused offsets */
242 
243 	list_for_each_entry(memb, &ls->ls_nodes, list) {
244 		if (memb->slot)
245 			continue;
246 
247 		for (i = 0; i < array_size; i++) {
248 			if (array[i].nodeid)
249 				continue;
250 
251 			memb->slot = i + 1;
252 			memb->slot_prev = memb->slot;
253 			array[i].nodeid = memb->nodeid;
254 			array[i].slot = memb->slot;
255 			num++;
256 
257 			if (!ls->ls_slot && memb->nodeid == our_nodeid)
258 				ls->ls_slot = memb->slot;
259 			break;
260 		}
261 
262 		if (!memb->slot) {
263 			log_error(ls, "no free slot found");
264 			kfree(array);
265 			return -1;
266 		}
267 	}
268 
269 	gen++;
270 
271 	log_slots(ls, gen, num, NULL, array, array_size);
272 
273 	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
274 		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
275 
276 	if (num > max_slots) {
277 		log_error(ls, "num_slots %d exceeds max_slots %d",
278 			  num, max_slots);
279 		kfree(array);
280 		return -1;
281 	}
282 
283 	*gen_out = gen;
284 	*slots_out = array;
285 	*slots_size = array_size;
286 	*num_slots = num;
287 	return 0;
288 }
289 
add_ordered_member(struct dlm_ls * ls,struct dlm_member * new)290 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
291 {
292 	struct dlm_member *memb = NULL;
293 	struct list_head *tmp;
294 	struct list_head *newlist = &new->list;
295 	struct list_head *head = &ls->ls_nodes;
296 
297 	list_for_each(tmp, head) {
298 		memb = list_entry(tmp, struct dlm_member, list);
299 		if (new->nodeid < memb->nodeid)
300 			break;
301 	}
302 
303 	if (!memb)
304 		list_add_tail(newlist, head);
305 	else {
306 		/* FIXME: can use list macro here */
307 		newlist->prev = tmp->prev;
308 		newlist->next = tmp;
309 		tmp->prev->next = newlist;
310 		tmp->prev = newlist;
311 	}
312 }
313 
dlm_add_member(struct dlm_ls * ls,struct dlm_config_node * node)314 static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
315 {
316 	struct dlm_member *memb;
317 	int error;
318 
319 	memb = kzalloc(sizeof(*memb), GFP_NOFS);
320 	if (!memb)
321 		return -ENOMEM;
322 
323 	error = dlm_lowcomms_connect_node(node->nodeid);
324 	if (error < 0) {
325 		kfree(memb);
326 		return error;
327 	}
328 
329 	memb->nodeid = node->nodeid;
330 	memb->weight = node->weight;
331 	memb->comm_seq = node->comm_seq;
332 	add_ordered_member(ls, memb);
333 	ls->ls_num_nodes++;
334 	return 0;
335 }
336 
find_memb(struct list_head * head,int nodeid)337 static struct dlm_member *find_memb(struct list_head *head, int nodeid)
338 {
339 	struct dlm_member *memb;
340 
341 	list_for_each_entry(memb, head, list) {
342 		if (memb->nodeid == nodeid)
343 			return memb;
344 	}
345 	return NULL;
346 }
347 
dlm_is_member(struct dlm_ls * ls,int nodeid)348 int dlm_is_member(struct dlm_ls *ls, int nodeid)
349 {
350 	if (find_memb(&ls->ls_nodes, nodeid))
351 		return 1;
352 	return 0;
353 }
354 
dlm_is_removed(struct dlm_ls * ls,int nodeid)355 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
356 {
357 	if (find_memb(&ls->ls_nodes_gone, nodeid))
358 		return 1;
359 	return 0;
360 }
361 
clear_memb_list(struct list_head * head)362 static void clear_memb_list(struct list_head *head)
363 {
364 	struct dlm_member *memb;
365 
366 	while (!list_empty(head)) {
367 		memb = list_entry(head->next, struct dlm_member, list);
368 		list_del(&memb->list);
369 		kfree(memb);
370 	}
371 }
372 
dlm_clear_members(struct dlm_ls * ls)373 void dlm_clear_members(struct dlm_ls *ls)
374 {
375 	clear_memb_list(&ls->ls_nodes);
376 	ls->ls_num_nodes = 0;
377 }
378 
dlm_clear_members_gone(struct dlm_ls * ls)379 void dlm_clear_members_gone(struct dlm_ls *ls)
380 {
381 	clear_memb_list(&ls->ls_nodes_gone);
382 }
383 
make_member_array(struct dlm_ls * ls)384 static void make_member_array(struct dlm_ls *ls)
385 {
386 	struct dlm_member *memb;
387 	int i, w, x = 0, total = 0, all_zero = 0, *array;
388 
389 	kfree(ls->ls_node_array);
390 	ls->ls_node_array = NULL;
391 
392 	list_for_each_entry(memb, &ls->ls_nodes, list) {
393 		if (memb->weight)
394 			total += memb->weight;
395 	}
396 
397 	/* all nodes revert to weight of 1 if all have weight 0 */
398 
399 	if (!total) {
400 		total = ls->ls_num_nodes;
401 		all_zero = 1;
402 	}
403 
404 	ls->ls_total_weight = total;
405 	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
406 	if (!array)
407 		return;
408 
409 	list_for_each_entry(memb, &ls->ls_nodes, list) {
410 		if (!all_zero && !memb->weight)
411 			continue;
412 
413 		if (all_zero)
414 			w = 1;
415 		else
416 			w = memb->weight;
417 
418 		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
419 
420 		for (i = 0; i < w; i++)
421 			array[x++] = memb->nodeid;
422 	}
423 
424 	ls->ls_node_array = array;
425 }
426 
427 /* send a status request to all members just to establish comms connections */
428 
ping_members(struct dlm_ls * ls)429 static int ping_members(struct dlm_ls *ls)
430 {
431 	struct dlm_member *memb;
432 	int error = 0;
433 
434 	list_for_each_entry(memb, &ls->ls_nodes, list) {
435 		error = dlm_recovery_stopped(ls);
436 		if (error)
437 			break;
438 		error = dlm_rcom_status(ls, memb->nodeid, 0);
439 		if (error)
440 			break;
441 	}
442 	if (error)
443 		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
444 			  error, ls->ls_recover_nodeid);
445 	return error;
446 }
447 
dlm_lsop_recover_prep(struct dlm_ls * ls)448 static void dlm_lsop_recover_prep(struct dlm_ls *ls)
449 {
450 	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
451 		return;
452 	ls->ls_ops->recover_prep(ls->ls_ops_arg);
453 }
454 
dlm_lsop_recover_slot(struct dlm_ls * ls,struct dlm_member * memb)455 static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
456 {
457 	struct dlm_slot slot;
458 	uint32_t seq;
459 	int error;
460 
461 	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
462 		return;
463 
464 	/* if there is no comms connection with this node
465 	   or the present comms connection is newer
466 	   than the one when this member was added, then
467 	   we consider the node to have failed (versus
468 	   being removed due to dlm_release_lockspace) */
469 
470 	error = dlm_comm_seq(memb->nodeid, &seq);
471 
472 	if (!error && seq == memb->comm_seq)
473 		return;
474 
475 	slot.nodeid = memb->nodeid;
476 	slot.slot = memb->slot;
477 
478 	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
479 }
480 
dlm_lsop_recover_done(struct dlm_ls * ls)481 void dlm_lsop_recover_done(struct dlm_ls *ls)
482 {
483 	struct dlm_member *memb;
484 	struct dlm_slot *slots;
485 	int i, num;
486 
487 	if (!ls->ls_ops || !ls->ls_ops->recover_done)
488 		return;
489 
490 	num = ls->ls_num_nodes;
491 	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
492 	if (!slots)
493 		return;
494 
495 	i = 0;
496 	list_for_each_entry(memb, &ls->ls_nodes, list) {
497 		if (i == num) {
498 			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
499 			goto out;
500 		}
501 		slots[i].nodeid = memb->nodeid;
502 		slots[i].slot = memb->slot;
503 		i++;
504 	}
505 
506 	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
507 				 ls->ls_slot, ls->ls_generation);
508  out:
509 	kfree(slots);
510 }
511 
find_config_node(struct dlm_recover * rv,int nodeid)512 static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
513 						int nodeid)
514 {
515 	int i;
516 
517 	for (i = 0; i < rv->nodes_count; i++) {
518 		if (rv->nodes[i].nodeid == nodeid)
519 			return &rv->nodes[i];
520 	}
521 	return NULL;
522 }
523 
dlm_recover_members(struct dlm_ls * ls,struct dlm_recover * rv,int * neg_out)524 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
525 {
526 	struct dlm_member *memb, *safe;
527 	struct dlm_config_node *node;
528 	int i, error, neg = 0, low = -1;
529 
530 	/* previously removed members that we've not finished removing need to
531 	   count as a negative change so the "neg" recovery steps will happen */
532 
533 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
534 		log_rinfo(ls, "prev removed member %d", memb->nodeid);
535 		neg++;
536 	}
537 
538 	/* move departed members from ls_nodes to ls_nodes_gone */
539 
540 	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
541 		node = find_config_node(rv, memb->nodeid);
542 		if (node && !node->new)
543 			continue;
544 
545 		if (!node) {
546 			log_rinfo(ls, "remove member %d", memb->nodeid);
547 		} else {
548 			/* removed and re-added */
549 			log_rinfo(ls, "remove member %d comm_seq %u %u",
550 				  memb->nodeid, memb->comm_seq, node->comm_seq);
551 		}
552 
553 		neg++;
554 		list_move(&memb->list, &ls->ls_nodes_gone);
555 		ls->ls_num_nodes--;
556 		dlm_lsop_recover_slot(ls, memb);
557 	}
558 
559 	/* add new members to ls_nodes */
560 
561 	for (i = 0; i < rv->nodes_count; i++) {
562 		node = &rv->nodes[i];
563 		if (dlm_is_member(ls, node->nodeid))
564 			continue;
565 		dlm_add_member(ls, node);
566 		log_rinfo(ls, "add member %d", node->nodeid);
567 	}
568 
569 	list_for_each_entry(memb, &ls->ls_nodes, list) {
570 		if (low == -1 || memb->nodeid < low)
571 			low = memb->nodeid;
572 	}
573 	ls->ls_low_nodeid = low;
574 
575 	make_member_array(ls);
576 	*neg_out = neg;
577 
578 	error = ping_members(ls);
579 	if (!error || error == -EPROTO) {
580 		/* new_lockspace() may be waiting to know if the config
581 		   is good or bad */
582 		ls->ls_members_result = error;
583 		complete(&ls->ls_members_done);
584 	}
585 
586 	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
587 	return error;
588 }
589 
590 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
591    dlm_ls_start() is called on any of them to start the new recovery. */
592 
dlm_ls_stop(struct dlm_ls * ls)593 int dlm_ls_stop(struct dlm_ls *ls)
594 {
595 	int new;
596 
597 	/*
598 	 * Prevent dlm_recv from being in the middle of something when we do
599 	 * the stop.  This includes ensuring dlm_recv isn't processing a
600 	 * recovery message (rcom), while dlm_recoverd is aborting and
601 	 * resetting things from an in-progress recovery.  i.e. we want
602 	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
603 	 * processing an rcom at the same time.  Stopping dlm_recv also makes
604 	 * it easy for dlm_receive_message() to check locking stopped and add a
605 	 * message to the requestqueue without races.
606 	 */
607 
608 	down_write(&ls->ls_recv_active);
609 
610 	/*
611 	 * Abort any recovery that's in progress (see RECOVER_STOP,
612 	 * dlm_recovery_stopped()) and tell any other threads running in the
613 	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
614 	 */
615 
616 	spin_lock(&ls->ls_recover_lock);
617 	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
618 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
619 	ls->ls_recover_seq++;
620 	spin_unlock(&ls->ls_recover_lock);
621 
622 	/*
623 	 * Let dlm_recv run again, now any normal messages will be saved on the
624 	 * requestqueue for later.
625 	 */
626 
627 	up_write(&ls->ls_recv_active);
628 
629 	/*
630 	 * This in_recovery lock does two things:
631 	 * 1) Keeps this function from returning until all threads are out
632 	 *    of locking routines and locking is truly stopped.
633 	 * 2) Keeps any new requests from being processed until it's unlocked
634 	 *    when recovery is complete.
635 	 */
636 
637 	if (new) {
638 		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
639 		wake_up_process(ls->ls_recoverd_task);
640 		wait_event(ls->ls_recover_lock_wait,
641 			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
642 	}
643 
644 	/*
645 	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
646 	 * running) has noticed RECOVER_STOP above and quit processing the
647 	 * previous recovery.
648 	 */
649 
650 	dlm_recoverd_suspend(ls);
651 
652 	spin_lock(&ls->ls_recover_lock);
653 	kfree(ls->ls_slots);
654 	ls->ls_slots = NULL;
655 	ls->ls_num_slots = 0;
656 	ls->ls_slots_size = 0;
657 	ls->ls_recover_status = 0;
658 	spin_unlock(&ls->ls_recover_lock);
659 
660 	dlm_recoverd_resume(ls);
661 
662 	if (!ls->ls_recover_begin)
663 		ls->ls_recover_begin = jiffies;
664 
665 	dlm_lsop_recover_prep(ls);
666 	return 0;
667 }
668 
dlm_ls_start(struct dlm_ls * ls)669 int dlm_ls_start(struct dlm_ls *ls)
670 {
671 	struct dlm_recover *rv, *rv_old;
672 	struct dlm_config_node *nodes = NULL;
673 	int error, count;
674 
675 	rv = kzalloc(sizeof(*rv), GFP_NOFS);
676 	if (!rv)
677 		return -ENOMEM;
678 
679 	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
680 	if (error < 0)
681 		goto fail_rv;
682 
683 	spin_lock(&ls->ls_recover_lock);
684 
685 	/* the lockspace needs to be stopped before it can be started */
686 
687 	if (!dlm_locking_stopped(ls)) {
688 		spin_unlock(&ls->ls_recover_lock);
689 		log_error(ls, "start ignored: lockspace running");
690 		error = -EINVAL;
691 		goto fail;
692 	}
693 
694 	rv->nodes = nodes;
695 	rv->nodes_count = count;
696 	rv->seq = ++ls->ls_recover_seq;
697 	rv_old = ls->ls_recover_args;
698 	ls->ls_recover_args = rv;
699 	spin_unlock(&ls->ls_recover_lock);
700 
701 	if (rv_old) {
702 		log_error(ls, "unused recovery %llx %d",
703 			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
704 		kfree(rv_old->nodes);
705 		kfree(rv_old);
706 	}
707 
708 	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
709 	wake_up_process(ls->ls_recoverd_task);
710 	return 0;
711 
712  fail:
713 	kfree(nodes);
714  fail_rv:
715 	kfree(rv);
716 	return error;
717 }
718 
719