Lines matching refs:dlm — cross-reference hits for the struct dlm_ctxt *dlm domain context in the OCFS2 DLM recovery code (dlmrecovery.c). Each entry gives the source line number, the matching line as truncated by the indexer, and the enclosing function; "argument" and "local" mark lines where dlm is declared rather than merely used.

40 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
43 static int dlm_do_recovery(struct dlm_ctxt *dlm);
45 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
46 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node);
47 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);
48 static int dlm_request_all_locks(struct dlm_ctxt *dlm,
50 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm);
57 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
62 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
65 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
66 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
68 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node);
69 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
71 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
79 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
102 static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, in dlm_set_reco_dead_node() argument
105 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_dead_node()
106 if (dlm->reco.dead_node != dead_node) in dlm_set_reco_dead_node()
108 dlm->name, dlm->reco.dead_node, dead_node); in dlm_set_reco_dead_node()
109 dlm->reco.dead_node = dead_node; in dlm_set_reco_dead_node()
112 static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, in dlm_set_reco_master() argument
115 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_master()
117 dlm->name, dlm->reco.new_master, master); in dlm_set_reco_master()
118 dlm->reco.new_master = master; in dlm_set_reco_master()
121 static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm) in __dlm_reset_recovery() argument
123 assert_spin_locked(&dlm->spinlock); in __dlm_reset_recovery()
124 clear_bit(dlm->reco.dead_node, dlm->recovery_map); in __dlm_reset_recovery()
125 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in __dlm_reset_recovery()
126 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); in __dlm_reset_recovery()
129 static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) in dlm_reset_recovery() argument
131 spin_lock(&dlm->spinlock); in dlm_reset_recovery()
132 __dlm_reset_recovery(dlm); in dlm_reset_recovery()
133 spin_unlock(&dlm->spinlock); in dlm_reset_recovery()
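
Source lines 102-135 above are the small helpers that keep the recovery bookkeeping consistent: every change to reco.dead_node and reco.new_master goes through a setter that asserts dlm->spinlock is held, and the reset helper clears the recovered node out of recovery_map. A minimal sketch of that pattern, reconstructed from the visible fragments (warning text abbreviated; this assumes the surrounding kernel context and is not standalone-buildable):

    static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, u8 dead_node)
    {
        assert_spin_locked(&dlm->spinlock);      /* caller holds dlm->spinlock */
        if (dlm->reco.dead_node != dead_node)
            mlog(0, "%s: changing dead_node from %u to %u\n",
                 dlm->name, dlm->reco.dead_node, dead_node);
        dlm->reco.dead_node = dead_node;
    }

    static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
    {
        assert_spin_locked(&dlm->spinlock);
        /* drop the just-recovered node from the map and clear both roles */
        clear_bit(dlm->reco.dead_node, dlm->recovery_map);
        dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
        dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
    }

    static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
    {
        spin_lock(&dlm->spinlock);
        __dlm_reset_recovery(dlm);
        spin_unlock(&dlm->spinlock);
    }
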
139 struct dlm_ctxt *dlm = in dlm_dispatch_work() local
146 spin_lock(&dlm->work_lock); in dlm_dispatch_work()
147 list_splice_init(&dlm->work_list, &tmp_list); in dlm_dispatch_work()
148 spin_unlock(&dlm->work_lock); in dlm_dispatch_work()
153 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); in dlm_dispatch_work()
161 BUG_ON(item->dlm != dlm); in dlm_dispatch_work()
167 dlm_put(dlm); in dlm_dispatch_work()
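
Source lines 139-167 are dlm_dispatch_work(), which drains the deferred-work queue: pending items are spliced off dlm->work_list under work_lock, then run with no lock held, and the reference taken when each item was queued is dropped with dlm_put(). A sketch of that splice-and-run pattern, assuming container_of() on dispatched_work and a trailing kfree() (both are inferences, not shown in the fragments):

    static void dlm_dispatch_work(struct work_struct *work)
    {
        struct dlm_ctxt *dlm =
            container_of(work, struct dlm_ctxt, dispatched_work);
        struct dlm_work_item *item, *next;
        LIST_HEAD(tmp_list);
        int tot = 0;

        /* move everything queued so far onto a private list */
        spin_lock(&dlm->work_lock);
        list_splice_init(&dlm->work_list, &tmp_list);
        spin_unlock(&dlm->work_lock);

        list_for_each_entry(item, &tmp_list, list)
            tot++;
        mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);

        list_for_each_entry_safe(item, next, &tmp_list, list) {
            list_del_init(&item->list);
            BUG_ON(item->dlm != dlm);
            item->func(item, item->data);   /* run outside work_lock */
            dlm_put(dlm);                   /* ref taken when item was queued */
            kfree(item);
        }
    }
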
176 void dlm_kick_recovery_thread(struct dlm_ctxt *dlm) in dlm_kick_recovery_thread() argument
184 wake_up(&dlm->dlm_reco_thread_wq); in dlm_kick_recovery_thread()
188 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm) in dlm_launch_recovery_thread() argument
192 dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm, in dlm_launch_recovery_thread()
193 "dlm_reco-%s", dlm->name); in dlm_launch_recovery_thread()
194 if (IS_ERR(dlm->dlm_reco_thread_task)) { in dlm_launch_recovery_thread()
195 mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task)); in dlm_launch_recovery_thread()
196 dlm->dlm_reco_thread_task = NULL; in dlm_launch_recovery_thread()
203 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm) in dlm_complete_recovery_thread() argument
205 if (dlm->dlm_reco_thread_task) { in dlm_complete_recovery_thread()
207 kthread_stop(dlm->dlm_reco_thread_task); in dlm_complete_recovery_thread()
208 dlm->dlm_reco_thread_task = NULL; in dlm_complete_recovery_thread()
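
Source lines 176-208 cover the recovery thread lifecycle: dlm_kick_recovery_thread() just wakes dlm_reco_thread_wq, launch uses kthread_run() with a per-domain thread name, and teardown uses kthread_stop(). Sketch of launch/teardown (the -EINVAL on failure is an assumption):

    int dlm_launch_recovery_thread(struct dlm_ctxt *dlm)
    {
        dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm,
                                                "dlm_reco-%s", dlm->name);
        if (IS_ERR(dlm->dlm_reco_thread_task)) {
            mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task));
            dlm->dlm_reco_thread_task = NULL;
            return -EINVAL;
        }
        return 0;
    }

    void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
    {
        if (dlm->dlm_reco_thread_task) {
            kthread_stop(dlm->dlm_reco_thread_task);
            dlm->dlm_reco_thread_task = NULL;
        }
    }
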
237 static void dlm_print_reco_node_status(struct dlm_ctxt *dlm) in dlm_print_reco_node_status() argument
243 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_print_reco_node_status()
244 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", in dlm_print_reco_node_status()
245 dlm->reco.dead_node, dlm->reco.new_master); in dlm_print_reco_node_status()
247 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_print_reco_node_status()
276 dlm->name, ndata->node_num, st); in dlm_print_reco_node_status()
278 list_for_each_entry(res, &dlm->reco.resources, recovering) { in dlm_print_reco_node_status()
280 dlm->name, res->lockname.len, res->lockname.name); in dlm_print_reco_node_status()
289 struct dlm_ctxt *dlm = data; in dlm_recovery_thread() local
292 mlog(0, "dlm thread running for %s...\n", dlm->name); in dlm_recovery_thread()
295 if (dlm_domain_fully_joined(dlm)) { in dlm_recovery_thread()
296 status = dlm_do_recovery(dlm); in dlm_recovery_thread()
305 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_recovery_thread()
315 static int dlm_reco_master_ready(struct dlm_ctxt *dlm) in dlm_reco_master_ready() argument
318 spin_lock(&dlm->spinlock); in dlm_reco_master_ready()
319 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM); in dlm_reco_master_ready()
320 spin_unlock(&dlm->spinlock); in dlm_reco_master_ready()
326 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) in dlm_is_node_dead() argument
329 spin_lock(&dlm->spinlock); in dlm_is_node_dead()
330 dead = !test_bit(node, dlm->domain_map); in dlm_is_node_dead()
331 spin_unlock(&dlm->spinlock); in dlm_is_node_dead()
337 static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) in dlm_is_node_recovered() argument
340 spin_lock(&dlm->spinlock); in dlm_is_node_recovered()
341 recovered = !test_bit(node, dlm->recovery_map); in dlm_is_node_recovered()
342 spin_unlock(&dlm->spinlock); in dlm_is_node_recovered()
347 void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) in dlm_wait_for_node_death() argument
349 if (dlm_is_node_dead(dlm, node)) in dlm_wait_for_node_death()
353 "domain %s\n", node, dlm->name); in dlm_wait_for_node_death()
356 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
357 dlm_is_node_dead(dlm, node), in dlm_wait_for_node_death()
360 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
361 dlm_is_node_dead(dlm, node)); in dlm_wait_for_node_death()
364 void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) in dlm_wait_for_node_recovery() argument
366 if (dlm_is_node_recovered(dlm, node)) in dlm_wait_for_node_recovery()
370 "domain %s\n", node, dlm->name); in dlm_wait_for_node_recovery()
373 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
374 dlm_is_node_recovered(dlm, node), in dlm_wait_for_node_recovery()
377 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
378 dlm_is_node_recovered(dlm, node)); in dlm_wait_for_node_recovery()
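
Source lines 315-378 repeat one idiom: a predicate that samples a node bitmap under dlm->spinlock, paired with a wait helper that blocks on dlm_reco_thread_wq, with or without a timeout, until the predicate flips. Sketch of the dead-node pair (the recovered-node pair at lines 337-378 is identical except that it tests recovery_map; the printk wording is partially reconstructed):

    int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
    {
        int dead;

        spin_lock(&dlm->spinlock);
        dead = !test_bit(node, dlm->domain_map);   /* gone from the domain */
        spin_unlock(&dlm->spinlock);
        return dead;
    }

    void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
    {
        if (dlm_is_node_dead(dlm, node))
            return;

        printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
               "domain %s\n", node, dlm->name);

        if (timeout)
            wait_event_timeout(dlm->dlm_reco_thread_wq,
                               dlm_is_node_dead(dlm, node),
                               msecs_to_jiffies(timeout));
        else
            wait_event(dlm->dlm_reco_thread_wq,
                       dlm_is_node_dead(dlm, node));
    }
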
387 static int dlm_in_recovery(struct dlm_ctxt *dlm) in dlm_in_recovery() argument
390 spin_lock(&dlm->spinlock); in dlm_in_recovery()
391 in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_in_recovery()
392 spin_unlock(&dlm->spinlock); in dlm_in_recovery()
397 void dlm_wait_for_recovery(struct dlm_ctxt *dlm) in dlm_wait_for_recovery() argument
399 if (dlm_in_recovery(dlm)) { in dlm_wait_for_recovery()
402 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_wait_for_recovery()
403 dlm->reco.state, dlm->reco.new_master, in dlm_wait_for_recovery()
404 dlm->reco.dead_node); in dlm_wait_for_recovery()
406 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); in dlm_wait_for_recovery()
409 static void dlm_begin_recovery(struct dlm_ctxt *dlm) in dlm_begin_recovery() argument
411 assert_spin_locked(&dlm->spinlock); in dlm_begin_recovery()
412 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_begin_recovery()
414 dlm->name, dlm->reco.dead_node); in dlm_begin_recovery()
415 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; in dlm_begin_recovery()
418 static void dlm_end_recovery(struct dlm_ctxt *dlm) in dlm_end_recovery() argument
420 spin_lock(&dlm->spinlock); in dlm_end_recovery()
421 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); in dlm_end_recovery()
422 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; in dlm_end_recovery()
423 spin_unlock(&dlm->spinlock); in dlm_end_recovery()
424 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); in dlm_end_recovery()
425 wake_up(&dlm->reco.event); in dlm_end_recovery()
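
Source lines 387-425 gate normal DLM activity behind recovery: DLM_RECO_STATE_ACTIVE is set by the recovery thread with dlm->spinlock already held, cleared again in dlm_end_recovery(), and other paths sleep in dlm_wait_for_recovery() on dlm->reco.event until the flag drops. Sketch of the three pieces (the begin-recovery printk text is inferred from the matching end-recovery message; extra logging trimmed):

    static void dlm_begin_recovery(struct dlm_ctxt *dlm)
    {
        assert_spin_locked(&dlm->spinlock);
        BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
        printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n",
               dlm->name, dlm->reco.dead_node);
        dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
    }

    static void dlm_end_recovery(struct dlm_ctxt *dlm)
    {
        spin_lock(&dlm->spinlock);
        BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
        dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
        spin_unlock(&dlm->spinlock);
        printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
        wake_up(&dlm->reco.event);          /* release dlm_wait_for_recovery() */
    }

    void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
    {
        wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
    }
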
428 static void dlm_print_recovery_master(struct dlm_ctxt *dlm) in dlm_print_recovery_master() argument
431 "dead node %u in domain %s\n", dlm->reco.new_master, in dlm_print_recovery_master()
432 (dlm->node_num == dlm->reco.new_master ? "me" : "he"), in dlm_print_recovery_master()
433 dlm->reco.dead_node, dlm->name); in dlm_print_recovery_master()
436 static int dlm_do_recovery(struct dlm_ctxt *dlm) in dlm_do_recovery() argument
441 spin_lock(&dlm->spinlock); in dlm_do_recovery()
443 if (dlm->migrate_done) { in dlm_do_recovery()
445 "lock resources\n", dlm->name); in dlm_do_recovery()
446 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
451 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM && in dlm_do_recovery()
452 test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_do_recovery()
454 dlm->reco.new_master, dlm->reco.dead_node); in dlm_do_recovery()
456 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
460 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
463 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES, 0); in dlm_do_recovery()
465 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
467 dlm_set_reco_dead_node(dlm, bit); in dlm_do_recovery()
468 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { in dlm_do_recovery()
471 dlm->reco.dead_node); in dlm_do_recovery()
472 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
475 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
477 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
482 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_do_recovery()
483 dlm->reco.dead_node); in dlm_do_recovery()
487 dlm_begin_recovery(dlm); in dlm_do_recovery()
489 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
491 if (dlm->reco.new_master == dlm->node_num) in dlm_do_recovery()
494 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
499 ret = dlm_pick_recovery_master(dlm); in dlm_do_recovery()
507 dlm_print_recovery_master(dlm); in dlm_do_recovery()
512 dlm_end_recovery(dlm); in dlm_do_recovery()
518 dlm_print_recovery_master(dlm); in dlm_do_recovery()
520 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); in dlm_do_recovery()
524 "retrying.\n", dlm->name, status, dlm->reco.dead_node); in dlm_do_recovery()
531 dlm->name, dlm->reco.dead_node, dlm->node_num); in dlm_do_recovery()
532 spin_lock(&dlm->spinlock); in dlm_do_recovery()
533 __dlm_reset_recovery(dlm); in dlm_do_recovery()
534 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_do_recovery()
535 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
537 dlm_end_recovery(dlm); in dlm_do_recovery()
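
Source lines 436-537 are dlm_do_recovery(), the recovery thread's main decision function: pick a dead node out of recovery_map, mark recovery active, race for (or defer to) a recovery master, and either remaster the dead node's locks or step back and let the winner drive. A condensed outline of the control flow visible in the fragments; the exact return values, retry logging and sleeps are simplified:

    static int dlm_do_recovery(struct dlm_ctxt *dlm)
    {
        int status = 0;
        int ret, bit;

        spin_lock(&dlm->spinlock);

        /* nothing to do if every lock resource was already migrated away */
        if (dlm->migrate_done) {
            spin_unlock(&dlm->spinlock);
            return 0;
        }

        /* a recovery master that itself died is discarded */
        if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM &&
            test_bit(dlm->reco.new_master, dlm->recovery_map))
            dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);

        /* choose a dead node from recovery_map, or validate the current one */
        if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
            bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
            dlm_set_reco_dead_node(dlm, bit >= O2NM_MAX_NODES ?
                                   O2NM_INVALID_NODE_NUM : bit);
        } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
            dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
        }
        if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
            spin_unlock(&dlm->spinlock);    /* nothing to recover */
            return 0;
        }

        dlm_begin_recovery(dlm);
        spin_unlock(&dlm->spinlock);

        if (dlm->reco.new_master == dlm->node_num)
            goto master_here;

        if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
            /* race the other live nodes for the $RECOVERY lock */
            ret = dlm_pick_recovery_master(dlm);
            if (!ret)
                goto master_here;       /* we won and already told everyone */
        }

        /* another node masters this recovery session */
        dlm_print_recovery_master(dlm);
        dlm_end_recovery(dlm);
        return 0;

    master_here:
        dlm_print_recovery_master(dlm);
        status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
        if (!status) {
            /* success: forget this dead node and drop any FINALIZE state */
            spin_lock(&dlm->spinlock);
            __dlm_reset_recovery(dlm);
            dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
            spin_unlock(&dlm->spinlock);
        }
        dlm_end_recovery(dlm);
        return -EAGAIN;                 /* go look for another dead node */
    }
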
543 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) in dlm_remaster_locks() argument
554 status = dlm_init_recovery_area(dlm, dead_node); in dlm_remaster_locks()
557 "retrying\n", dlm->name); in dlm_remaster_locks()
564 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
568 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, in dlm_remaster_locks()
571 if (ndata->node_num == dlm->node_num) { in dlm_remaster_locks()
577 status = dlm_request_all_locks(dlm, ndata->node_num, in dlm_remaster_locks()
587 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
588 dlm_is_node_dead(dlm, in dlm_remaster_locks()
593 dlm_is_node_dead(dlm, ndata->node_num) ? in dlm_remaster_locks()
600 dlm->name, ndata->node_num, in dlm_remaster_locks()
641 mlog(0, "%s: Done requesting all lock info\n", dlm->name); in dlm_remaster_locks()
651 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
671 dlm->name, ndata->node_num, in dlm_remaster_locks()
678 dlm->name, ndata->node_num); in dlm_remaster_locks()
682 dlm->name, ndata->node_num); in dlm_remaster_locks()
697 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
698 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_remaster_locks()
699 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
705 ret = dlm_send_finalize_reco_message(dlm); in dlm_remaster_locks()
709 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
710 dlm_finish_local_lockres_recovery(dlm, dead_node, in dlm_remaster_locks()
711 dlm->node_num); in dlm_remaster_locks()
712 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
716 "dead=%u, this=%u, new=%u\n", dlm->name, in dlm_remaster_locks()
717 jiffies, dlm->reco.dead_node, in dlm_remaster_locks()
718 dlm->node_num, dlm->reco.new_master); in dlm_remaster_locks()
722 dlm_kick_thread(dlm, NULL); in dlm_remaster_locks()
727 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
734 dlm_destroy_recovery_area(dlm); in dlm_remaster_locks()
739 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node) in dlm_init_recovery_area() argument
744 spin_lock(&dlm->spinlock); in dlm_init_recovery_area()
745 memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map)); in dlm_init_recovery_area()
748 spin_unlock(&dlm->spinlock); in dlm_init_recovery_area()
751 num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); in dlm_init_recovery_area()
759 dlm_destroy_recovery_area(dlm); in dlm_init_recovery_area()
765 list_add_tail(&ndata->list, &dlm->reco.node_data); in dlm_init_recovery_area()
773 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm) in dlm_destroy_recovery_area() argument
779 list_splice_init(&dlm->reco.node_data, &tmplist); in dlm_destroy_recovery_area()
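
Source lines 739-779 build and tear down the per-recovery node list: domain_map is snapshotted into reco.node_map under the spinlock, then one dlm_reco_node_data entry is allocated per remaining node and queued on reco.node_data. Sketch (the GFP_NOFS allocation, the DLM_RECO_NODE_DATA_INIT state constant and the BUG_ON are assumptions consistent with the node-data state machine used later in the file):

    static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
    {
        struct dlm_reco_node_data *ndata;
        int num = 0;

        spin_lock(&dlm->spinlock);
        memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map));
        spin_unlock(&dlm->spinlock);

        while (1) {
            num = find_next_bit(dlm->reco.node_map, O2NM_MAX_NODES, num);
            if (num >= O2NM_MAX_NODES)
                break;
            /* the dead node was already cleared from domain_map when it died */
            BUG_ON(num == dead_node);

            ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
            if (!ndata) {
                dlm_destroy_recovery_area(dlm);
                return -ENOMEM;
            }
            ndata->node_num = num;
            ndata->state = DLM_RECO_NODE_DATA_INIT;

            list_add_tail(&ndata->list, &dlm->reco.node_data);
            num++;
        }
        return 0;
    }

    static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm)
    {
        struct dlm_reco_node_data *ndata, *next;
        LIST_HEAD(tmplist);

        list_splice_init(&dlm->reco.node_data, &tmplist);

        list_for_each_entry_safe(ndata, next, &tmplist, list) {
            list_del_init(&ndata->list);
            kfree(ndata);
        }
    }
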
788 static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, in dlm_request_all_locks() argument
802 lr.node_idx = dlm->node_num; in dlm_request_all_locks()
806 ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, in dlm_request_all_locks()
812 "to recover dead node %u\n", dlm->name, ret, in dlm_request_all_locks()
825 struct dlm_ctxt *dlm = data; in dlm_request_all_locks_handler() local
830 if (!dlm_grab(dlm)) in dlm_request_all_locks_handler()
833 if (lr->dead_node != dlm->reco.dead_node) { in dlm_request_all_locks_handler()
835 "dead_node is %u\n", dlm->name, lr->node_idx, in dlm_request_all_locks_handler()
836 lr->dead_node, dlm->reco.dead_node); in dlm_request_all_locks_handler()
837 dlm_print_reco_node_status(dlm); in dlm_request_all_locks_handler()
839 dlm_put(dlm); in dlm_request_all_locks_handler()
842 BUG_ON(lr->dead_node != dlm->reco.dead_node); in dlm_request_all_locks_handler()
846 dlm_put(dlm); in dlm_request_all_locks_handler()
854 dlm_put(dlm); in dlm_request_all_locks_handler()
859 dlm_grab(dlm); /* get an extra ref for the work item */ in dlm_request_all_locks_handler()
860 dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf); in dlm_request_all_locks_handler()
863 spin_lock(&dlm->work_lock); in dlm_request_all_locks_handler()
864 list_add_tail(&item->list, &dlm->work_list); in dlm_request_all_locks_handler()
865 spin_unlock(&dlm->work_lock); in dlm_request_all_locks_handler()
866 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_request_all_locks_handler()
868 dlm_put(dlm); in dlm_request_all_locks_handler()
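
Source lines 825-868 show the o2net handler shape used throughout this file: take a reference with dlm_grab(), validate the request against the current recovery state, then hand the heavy lifting to dlm_worker through a dlm_work_item and drop the handler's reference. Sketch (the msg->buf cast and the per-item reco_master/dead_node bookkeeping are abbreviated assumptions; the handler signature follows the o2net convention):

    int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len,
                                      void *data, void **ret_data)
    {
        struct dlm_ctxt *dlm = data;
        struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
        struct dlm_work_item *item;
        char *buf;

        if (!dlm_grab(dlm))
            return -EINVAL;

        /* only honour requests for the node we are currently recovering */
        if (lr->dead_node != dlm->reco.dead_node) {
            dlm_print_reco_node_status(dlm);
            dlm_put(dlm);
            return -ENOMEM;
        }

        item = kzalloc(sizeof(*item), GFP_NOFS);
        buf = item ? (char *)__get_free_page(GFP_NOFS) : NULL;
        if (!buf) {
            kfree(item);
            dlm_put(dlm);
            return -ENOMEM;
        }

        dlm_grab(dlm);      /* extra ref, dropped by dlm_dispatch_work() */
        dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf);
        /* the item also records which node asked and which dead node it covers */

        spin_lock(&dlm->work_lock);
        list_add_tail(&item->list, &dlm->work_list);
        spin_unlock(&dlm->work_lock);
        queue_work(dlm->dlm_worker, &dlm->dispatched_work);

        dlm_put(dlm);
        return 0;
    }
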
876 struct dlm_ctxt *dlm; in dlm_request_all_locks_worker() local
882 dlm = item->dlm; in dlm_request_all_locks_worker()
888 dlm->name, dead_node, reco_master); in dlm_request_all_locks_worker()
890 if (dead_node != dlm->reco.dead_node || in dlm_request_all_locks_worker()
891 reco_master != dlm->reco.new_master) { in dlm_request_all_locks_worker()
894 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_request_all_locks_worker()
897 " current=(dead=%u,mas=%u)\n", dlm->name, in dlm_request_all_locks_worker()
899 dlm->reco.dead_node, dlm->reco.new_master); in dlm_request_all_locks_worker()
903 dlm->name, dlm->reco.dead_node, in dlm_request_all_locks_worker()
904 dlm->reco.new_master, dead_node, reco_master); in dlm_request_all_locks_worker()
915 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); in dlm_request_all_locks_worker()
922 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, in dlm_request_all_locks_worker()
926 "recovery state for dead node %u, ret=%d\n", dlm->name, in dlm_request_all_locks_worker()
934 spin_lock(&dlm->spinlock); in dlm_request_all_locks_worker()
935 list_splice_init(&resources, &dlm->reco.resources); in dlm_request_all_locks_worker()
936 spin_unlock(&dlm->spinlock); in dlm_request_all_locks_worker()
939 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); in dlm_request_all_locks_worker()
943 dlm->name, reco_master, dead_node, ret); in dlm_request_all_locks_worker()
951 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) in dlm_send_all_done_msg() argument
957 done_msg.node_idx = dlm->node_num; in dlm_send_all_done_msg()
963 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, in dlm_send_all_done_msg()
967 "to recover dead node %u\n", dlm->name, ret, send_to, in dlm_send_all_done_msg()
981 struct dlm_ctxt *dlm = data; in dlm_reco_data_done_handler() local
986 if (!dlm_grab(dlm)) in dlm_reco_data_done_handler()
991 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
993 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), in dlm_reco_data_done_handler()
996 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
999 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_reco_data_done_handler()
1032 dlm_kick_recovery_thread(dlm); in dlm_reco_data_done_handler()
1037 dlm_put(dlm); in dlm_reco_data_done_handler()
1043 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, in dlm_move_reco_locks_to_list() argument
1050 spin_lock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1051 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_move_reco_locks_to_list()
1062 dead_node, dlm->name); in dlm_move_reco_locks_to_list()
1086 spin_unlock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1104 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, in dlm_send_mig_lockres_msg() argument
1127 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_mig_lockres_msg()
1132 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, in dlm_send_mig_lockres_msg()
1139 "node %u (%s)\n", dlm->name, mres->lockname_len, in dlm_send_mig_lockres_msg()
1240 static void dlm_add_dummy_lock(struct dlm_ctxt *dlm, in dlm_add_dummy_lock() argument
1250 dummy.ml.node = dlm->node_num; in dlm_add_dummy_lock()
1254 static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm, in dlm_is_dummy_lock() argument
1269 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, in dlm_send_one_lockres() argument
1307 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, in dlm_send_one_lockres()
1316 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_one_lockres()
1319 dlm_add_dummy_lock(dlm, mres); in dlm_send_one_lockres()
1322 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); in dlm_send_one_lockres()
1329 dlm->name, ret); in dlm_send_one_lockres()
1333 "lockres %.*s\n", dlm->name, send_to, in dlm_send_one_lockres()
1357 struct dlm_ctxt *dlm = data; in dlm_mig_lockres_handler() local
1368 if (!dlm_grab(dlm)) in dlm_mig_lockres_handler()
1371 if (!dlm_joined(dlm)) { in dlm_mig_lockres_handler()
1374 dlm->name, mres->lockname_len, in dlm_mig_lockres_handler()
1376 dlm_put(dlm); in dlm_mig_lockres_handler()
1404 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1405 res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len, in dlm_mig_lockres_handler()
1414 " ref!\n", dlm->name, in dlm_mig_lockres_handler()
1418 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1439 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1446 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1448 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1451 res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len); in dlm_mig_lockres_handler()
1465 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1466 __dlm_insert_lockres(dlm, res); in dlm_mig_lockres_handler()
1467 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1500 dlm_lockres_grab_inflight_ref(dlm, res); in dlm_mig_lockres_handler()
1510 dlm_change_lockres_owner(dlm, res, dlm->node_num); in dlm_mig_lockres_handler()
1515 dlm_grab(dlm); /* get an extra ref for the work item */ in dlm_mig_lockres_handler()
1517 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf); in dlm_mig_lockres_handler()
1521 spin_lock(&dlm->work_lock); in dlm_mig_lockres_handler()
1522 list_add_tail(&item->list, &dlm->work_list); in dlm_mig_lockres_handler()
1523 spin_unlock(&dlm->work_lock); in dlm_mig_lockres_handler()
1524 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_mig_lockres_handler()
1531 dlm_put(dlm); in dlm_mig_lockres_handler()
1544 struct dlm_ctxt *dlm; in dlm_mig_lockres_worker() local
1551 dlm = item->dlm; in dlm_mig_lockres_worker()
1562 ret = dlm_lockres_master_requery(dlm, res, &real_master); in dlm_mig_lockres_worker()
1574 dlm_lockres_drop_inflight_ref(dlm, res); in dlm_mig_lockres_worker()
1585 ret = dlm_process_recovery_data(dlm, res, mres); in dlm_mig_lockres_worker()
1593 ret = dlm_finish_migration(dlm, res, mres->master); in dlm_mig_lockres_worker()
1610 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, in dlm_lockres_master_requery() argument
1643 spin_lock(&dlm->spinlock); in dlm_lockres_master_requery()
1644 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_lockres_master_requery()
1645 spin_unlock(&dlm->spinlock); in dlm_lockres_master_requery()
1649 if (nodenum == dlm->node_num) in dlm_lockres_master_requery()
1651 ret = dlm_do_master_requery(dlm, res, nodenum, real_master); in dlm_lockres_master_requery()
1668 int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, in dlm_do_master_requery() argument
1676 req.node_idx = dlm->node_num; in dlm_do_master_requery()
1681 ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key, in dlm_do_master_requery()
1686 dlm->key, nodenum); in dlm_do_master_requery()
1709 struct dlm_ctxt *dlm = data; in dlm_master_requery_handler() local
1717 if (!dlm_grab(dlm)) { in dlm_master_requery_handler()
1725 spin_lock(&dlm->spinlock); in dlm_master_requery_handler()
1726 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); in dlm_master_requery_handler()
1730 if (master == dlm->node_num) { in dlm_master_requery_handler()
1731 int ret = dlm_dispatch_assert_master(dlm, res, in dlm_master_requery_handler()
1737 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1738 dlm_put(dlm); in dlm_master_requery_handler()
1743 __dlm_lockres_grab_inflight_worker(dlm, res); in dlm_master_requery_handler()
1752 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1755 dlm_put(dlm); in dlm_master_requery_handler()
1796 static int dlm_process_recovery_data(struct dlm_ctxt *dlm, in dlm_process_recovery_data() argument
1815 if (dlm_is_dummy_lock(dlm, ml, &from)) { in dlm_process_recovery_data()
1819 dlm->name, mres->lockname_len, mres->lockname, in dlm_process_recovery_data()
1822 dlm_lockres_set_refmap_bit(dlm, res, from); in dlm_process_recovery_data()
1836 if (ml->node == dlm->node_num) { in dlm_process_recovery_data()
1964 "lvb! type=%d\n", dlm->name, in dlm_process_recovery_data()
2004 "exists on this lockres!\n", dlm->name, in dlm_process_recovery_data()
2033 "setting refmap bit\n", dlm->name, in dlm_process_recovery_data()
2035 dlm_lockres_set_refmap_bit(dlm, res, ml->node); in dlm_process_recovery_data()
2044 dlm_lockres_drop_inflight_ref(dlm, res); in dlm_process_recovery_data()
2053 void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, in dlm_move_lockres_to_recovery_list() argument
2060 assert_spin_locked(&dlm->spinlock); in dlm_move_lockres_to_recovery_list()
2066 dlm->name, res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2072 list_add_tail(&res->recovering, &dlm->reco.resources); in dlm_move_lockres_to_recovery_list()
2137 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, in dlm_finish_local_lockres_recovery() argument
2144 assert_spin_locked(&dlm->spinlock); in dlm_finish_local_lockres_recovery()
2146 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_finish_local_lockres_recovery()
2149 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2155 dlm_change_lockres_owner(dlm, res, new_master); in dlm_finish_local_lockres_recovery()
2158 __dlm_dirty_lockres(dlm, res); in dlm_finish_local_lockres_recovery()
2170 bucket = dlm_lockres_hash(dlm, i); in dlm_finish_local_lockres_recovery()
2183 res->owner != dlm->node_num) in dlm_finish_local_lockres_recovery()
2194 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2197 dlm_change_lockres_owner(dlm, res, new_master); in dlm_finish_local_lockres_recovery()
2200 __dlm_dirty_lockres(dlm, res); in dlm_finish_local_lockres_recovery()
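
Source lines 2137-2200 run once a recovery master is known: every lockres on reco.resources that the dead node owned is handed to the new master, taken off the recovering list and marked dirty, and a full hash scan (lines 2170-2200) repeats the same owner change for anything still flagged as recovering. Condensed sketch of the first pass (reference counting, the has-locks check and logging omitted; DLM_LOCK_RES_RECOVERING is assumed from the lockres state flags):

    static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                                                  u8 dead_node, u8 new_master)
    {
        struct dlm_lock_resource *res, *next;

        assert_spin_locked(&dlm->spinlock);

        list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
            if (res->owner != dead_node)
                continue;

            /* the new master now owns this lockres; requeue it so the
             * dlm thread flushes any pending work */
            list_del_init(&res->recovering);
            spin_lock(&res->spinlock);
            dlm_change_lockres_owner(dlm, res, new_master);
            res->state &= ~DLM_LOCK_RES_RECOVERING;
            __dlm_dirty_lockres(dlm, res);
            spin_unlock(&res->spinlock);
            wake_up(&res->wq);
        }
    }
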
2218 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm, in dlm_revalidate_lvb() argument
2227 assert_spin_locked(&dlm->spinlock); in dlm_revalidate_lvb()
2230 if (res->owner == dlm->node_num) in dlm_revalidate_lvb()
2237 search_node = dlm->node_num; in dlm_revalidate_lvb()
2261 static void dlm_free_dead_locks(struct dlm_ctxt *dlm, in dlm_free_dead_locks() argument
2271 assert_spin_locked(&dlm->spinlock); in dlm_free_dead_locks()
2308 "dropping ref from lockres\n", dlm->name, in dlm_free_dead_locks()
2312 "but ref was not set\n", dlm->name, in dlm_free_dead_locks()
2317 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_free_dead_locks()
2320 "no locks and had not purged before dying\n", dlm->name, in dlm_free_dead_locks()
2322 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_free_dead_locks()
2326 __dlm_dirty_lockres(dlm, res); in dlm_free_dead_locks()
2329 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) in dlm_do_local_recovery_cleanup() argument
2339 dlm_clean_master_list(dlm, dead_node); in dlm_do_local_recovery_cleanup()
2356 bucket = dlm_lockres_hash(dlm, i); in dlm_do_local_recovery_cleanup()
2368 dead_node, dlm->name); in dlm_do_local_recovery_cleanup()
2382 __dlm_do_purge_lockres(dlm, res); in dlm_do_local_recovery_cleanup()
2387 } else if (res->owner == dlm->node_num) in dlm_do_local_recovery_cleanup()
2388 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2394 dlm_revalidate_lvb(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2401 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2404 __dlm_do_purge_lockres(dlm, res); in dlm_do_local_recovery_cleanup()
2410 dlm_move_lockres_to_recovery_list(dlm, res); in dlm_do_local_recovery_cleanup()
2411 } else if (res->owner == dlm->node_num) { in dlm_do_local_recovery_cleanup()
2412 dlm_free_dead_locks(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2413 __dlm_lockres_calc_usage(dlm, res); in dlm_do_local_recovery_cleanup()
2418 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2420 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2429 static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) in __dlm_hb_node_down() argument
2431 assert_spin_locked(&dlm->spinlock); in __dlm_hb_node_down()
2433 if (dlm->reco.new_master == idx) { in __dlm_hb_node_down()
2435 dlm->name, idx); in __dlm_hb_node_down()
2436 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in __dlm_hb_node_down()
2441 "finalize1 state, clearing\n", dlm->name, idx); in __dlm_hb_node_down()
2442 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in __dlm_hb_node_down()
2443 __dlm_reset_recovery(dlm); in __dlm_hb_node_down()
2448 if (dlm->joining_node == idx) { in __dlm_hb_node_down()
2450 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); in __dlm_hb_node_down()
2454 if (!test_bit(idx, dlm->live_nodes_map)) { in __dlm_hb_node_down()
2457 dlm->name, idx); in __dlm_hb_node_down()
2462 if (!test_bit(idx, dlm->domain_map)) { in __dlm_hb_node_down()
2469 clear_bit(idx, dlm->live_nodes_map); in __dlm_hb_node_down()
2472 if (!test_bit(idx, dlm->recovery_map)) in __dlm_hb_node_down()
2473 dlm_do_local_recovery_cleanup(dlm, idx); in __dlm_hb_node_down()
2476 dlm_hb_event_notify_attached(dlm, idx, 0); in __dlm_hb_node_down()
2479 clear_bit(idx, dlm->domain_map); in __dlm_hb_node_down()
2480 clear_bit(idx, dlm->exit_domain_map); in __dlm_hb_node_down()
2483 wake_up(&dlm->migration_wq); in __dlm_hb_node_down()
2485 set_bit(idx, dlm->recovery_map); in __dlm_hb_node_down()
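
Source lines 2429-2485 are the node-down bookkeeping done with dlm->spinlock held: a dead recovery master may reset in-flight recovery state, join state is cleared, nodes that are already dead or never joined are ignored, and finally the node moves out of live_nodes_map and domain_map and into recovery_map. Condensed sketch (logging trimmed):

    static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
    {
        assert_spin_locked(&dlm->spinlock);

        if (dlm->reco.new_master == idx &&
            (dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
            /* recovery master died after finalize stage 1; that
             * recovery is effectively complete, so reset it */
            dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
            __dlm_reset_recovery(dlm);
        }

        if (dlm->joining_node == idx)
            __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);

        /* already considered dead, or never part of this domain */
        if (!test_bit(idx, dlm->live_nodes_map))
            return;
        if (!test_bit(idx, dlm->domain_map))
            return;

        clear_bit(idx, dlm->live_nodes_map);

        /* local cleanup must happen before the heartbeat callbacks fire */
        if (!test_bit(idx, dlm->recovery_map))
            dlm_do_local_recovery_cleanup(dlm, idx);

        dlm_hb_event_notify_attached(dlm, idx, 0);

        clear_bit(idx, dlm->domain_map);
        clear_bit(idx, dlm->exit_domain_map);
        wake_up(&dlm->migration_wq);    /* a migration may be waiting on this node */

        set_bit(idx, dlm->recovery_map);
    }
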
2490 struct dlm_ctxt *dlm = data; in dlm_hb_node_down_cb() local
2492 if (!dlm_grab(dlm)) in dlm_hb_node_down_cb()
2499 if (test_bit(idx, dlm->domain_map)) in dlm_hb_node_down_cb()
2500 dlm_fire_domain_eviction_callbacks(dlm, idx); in dlm_hb_node_down_cb()
2502 spin_lock(&dlm->spinlock); in dlm_hb_node_down_cb()
2503 __dlm_hb_node_down(dlm, idx); in dlm_hb_node_down_cb()
2504 spin_unlock(&dlm->spinlock); in dlm_hb_node_down_cb()
2506 dlm_put(dlm); in dlm_hb_node_down_cb()
2511 struct dlm_ctxt *dlm = data; in dlm_hb_node_up_cb() local
2513 if (!dlm_grab(dlm)) in dlm_hb_node_up_cb()
2516 spin_lock(&dlm->spinlock); in dlm_hb_node_up_cb()
2517 set_bit(idx, dlm->live_nodes_map); in dlm_hb_node_up_cb()
2520 spin_unlock(&dlm->spinlock); in dlm_hb_node_up_cb()
2522 dlm_put(dlm); in dlm_hb_node_up_cb()
2527 struct dlm_ctxt *dlm = astdata; in dlm_reco_ast() local
2529 dlm->node_num, dlm->name); in dlm_reco_ast()
2533 struct dlm_ctxt *dlm = astdata; in dlm_reco_bast() local
2535 dlm->node_num, dlm->name); in dlm_reco_bast()
2554 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) in dlm_pick_recovery_master() argument
2561 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2565 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, in dlm_pick_recovery_master()
2567 dlm_reco_ast, dlm, dlm_reco_bast); in dlm_pick_recovery_master()
2570 dlm->name, ret, lksb.status); in dlm_pick_recovery_master()
2574 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2578 if (dlm_reco_master_ready(dlm)) { in dlm_pick_recovery_master()
2580 "do the recovery\n", dlm->name, in dlm_pick_recovery_master()
2581 dlm->reco.new_master); in dlm_pick_recovery_master()
2587 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2588 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2591 "node got recovered already\n", dlm->name); in dlm_pick_recovery_master()
2592 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2595 dlm->name, dlm->reco.new_master); in dlm_pick_recovery_master()
2599 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2606 "begin_reco now\n", dlm->name, in dlm_pick_recovery_master()
2607 dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2608 status = dlm_send_begin_reco_message(dlm, in dlm_pick_recovery_master()
2609 dlm->reco.dead_node); in dlm_pick_recovery_master()
2614 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2615 dlm_set_reco_master(dlm, dlm->node_num); in dlm_pick_recovery_master()
2616 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2621 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); in dlm_pick_recovery_master()
2624 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm); in dlm_pick_recovery_master()
2637 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2641 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_pick_recovery_master()
2642 dlm_reco_master_ready(dlm), in dlm_pick_recovery_master()
2644 if (!dlm_reco_master_ready(dlm)) { in dlm_pick_recovery_master()
2646 dlm->name); in dlm_pick_recovery_master()
2651 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); in dlm_pick_recovery_master()
2655 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2662 "lksb.status=%s\n", dlm->name, dlm_errname(ret), in dlm_pick_recovery_master()
2664 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, in dlm_pick_recovery_master()
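
Source lines 2554-2664 decide who masters the recovery: every node that noticed the death races for an EX lock on the special $RECOVERY resource with LKM_NOQUEUE|LKM_RECOVERY, the winner broadcasts begin_reco and records itself as new_master, and the losers wait until the winner's message arrives. A simplified sketch of that race; the $RECOVERY name/length constants, the wait timeout and the handling of other dlmlock statuses are assumptions:

    static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
    {
        struct dlm_lockstatus lksb;
        enum dlm_status ret;
        int status = -EINVAL;

    again:
        memset(&lksb, 0, sizeof(lksb));

        /* LKM_NOQUEUE means the losers fail fast instead of queueing */
        ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
                      DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
                      dlm_reco_ast, dlm, dlm_reco_bast);

        if (ret == DLM_NORMAL) {
            if (dlm_reco_master_ready(dlm)) {
                /* got the lock, but another master already announced */
                status = -EEXIST;
            } else {
                /* we are the master: tell everyone, then record it */
                status = dlm_send_begin_reco_message(dlm,
                                                     dlm->reco.dead_node);
                spin_lock(&dlm->spinlock);
                dlm_set_reco_master(dlm, dlm->node_num);
                spin_unlock(&dlm->spinlock);
            }
            /* drop the lock so a future recovery can race for it again */
            dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
        } else if (ret == DLM_NOTQUEUED) {
            /* we lost: wait for the winner's begin_reco, then back off */
            wait_event_timeout(dlm->dlm_reco_thread_wq,
                               dlm_reco_master_ready(dlm),
                               msecs_to_jiffies(1000));
            if (!dlm_reco_master_ready(dlm))
                goto again;
            status = -EEXIST;
        }
        return status;
    }
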
2678 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) in dlm_send_begin_reco_message() argument
2686 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); in dlm_send_begin_reco_message()
2688 spin_lock(&dlm->spinlock); in dlm_send_begin_reco_message()
2689 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_begin_reco_message()
2690 spin_unlock(&dlm->spinlock); in dlm_send_begin_reco_message()
2695 br.node_idx = dlm->node_num; in dlm_send_begin_reco_message()
2705 if (nodenum == dlm->node_num) { in dlm_send_begin_reco_message()
2713 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key, in dlm_send_begin_reco_message()
2722 "begin reco msg (%d)\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2734 "to complete, backoff for a bit\n", dlm->name, in dlm_send_begin_reco_message()
2746 "returned %d\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2747 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, in dlm_send_begin_reco_message()
2768 struct dlm_ctxt *dlm = data; in dlm_begin_reco_handler() local
2772 if (!dlm_grab(dlm)) in dlm_begin_reco_handler()
2775 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2776 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_begin_reco_handler()
2779 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2780 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2781 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2782 dlm_put(dlm); in dlm_begin_reco_handler()
2785 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2788 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2789 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2791 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); in dlm_begin_reco_handler()
2793 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2794 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2795 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_begin_reco_handler()
2797 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2801 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2806 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2808 "node %u changing it to %u\n", dlm->name, in dlm_begin_reco_handler()
2809 dlm->reco.dead_node, br->node_idx, br->dead_node); in dlm_begin_reco_handler()
2811 dlm_set_reco_master(dlm, br->node_idx); in dlm_begin_reco_handler()
2812 dlm_set_reco_dead_node(dlm, br->dead_node); in dlm_begin_reco_handler()
2813 if (!test_bit(br->dead_node, dlm->recovery_map)) { in dlm_begin_reco_handler()
2817 if (!test_bit(br->dead_node, dlm->domain_map) || in dlm_begin_reco_handler()
2818 !test_bit(br->dead_node, dlm->live_nodes_map)) in dlm_begin_reco_handler()
2824 set_bit(br->dead_node, dlm->domain_map); in dlm_begin_reco_handler()
2825 set_bit(br->dead_node, dlm->live_nodes_map); in dlm_begin_reco_handler()
2826 __dlm_hb_node_down(dlm, br->dead_node); in dlm_begin_reco_handler()
2828 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2830 dlm_kick_recovery_thread(dlm); in dlm_begin_reco_handler()
2833 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2834 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2836 dlm_put(dlm); in dlm_begin_reco_handler()
2841 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) in dlm_send_finalize_reco_message() argument
2851 "stage %d\n", dlm->name, dlm->reco.dead_node, stage); in dlm_send_finalize_reco_message()
2853 spin_lock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2854 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_finalize_reco_message()
2855 spin_unlock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2859 fr.node_idx = dlm->node_num; in dlm_send_finalize_reco_message()
2860 fr.dead_node = dlm->reco.dead_node; in dlm_send_finalize_reco_message()
2865 if (nodenum == dlm->node_num) in dlm_send_finalize_reco_message()
2867 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, in dlm_send_finalize_reco_message()
2874 dlm->key, nodenum); in dlm_send_finalize_reco_message()
2900 struct dlm_ctxt *dlm = data; in dlm_finalize_reco_handler() local
2905 if (!dlm_grab(dlm)) in dlm_finalize_reco_handler()
2912 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, in dlm_finalize_reco_handler()
2913 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()
2915 spin_lock(&dlm->spinlock); in dlm_finalize_reco_handler()
2917 if (dlm->reco.new_master != fr->node_idx) { in dlm_finalize_reco_handler()
2920 fr->node_idx, dlm->reco.new_master, fr->dead_node); in dlm_finalize_reco_handler()
2923 if (dlm->reco.dead_node != fr->dead_node) { in dlm_finalize_reco_handler()
2926 fr->node_idx, fr->dead_node, dlm->reco.dead_node); in dlm_finalize_reco_handler()
2932 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); in dlm_finalize_reco_handler()
2933 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_finalize_reco_handler()
2937 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2938 dlm_print_reco_node_status(dlm); in dlm_finalize_reco_handler()
2941 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2942 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2945 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { in dlm_finalize_reco_handler()
2949 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2950 dlm_print_reco_node_status(dlm); in dlm_finalize_reco_handler()
2953 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2954 __dlm_reset_recovery(dlm); in dlm_finalize_reco_handler()
2955 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2956 dlm_kick_recovery_thread(dlm); in dlm_finalize_reco_handler()
2961 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()
2963 dlm_put(dlm); in dlm_finalize_reco_handler()
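
Source lines 2841-2963 close recovery with a two-stage FINALIZE broadcast from the new master: on stage 1 every node adopts the new ownership and sets DLM_RECO_STATE_FINALIZE, on stage 2 it clears the flag, resets its recovery state and kicks its own recovery thread to look for further dead nodes. Condensed sketch of the two branches inside dlm_finalize_reco_handler(), with dlm->spinlock held; decoding the stage from the message flags, the validation against reco.new_master/dead_node and the logging are omitted:

    /* inside dlm_finalize_reco_handler(): fr points at the message payload,
     * dlm->spinlock is held, and stage is 1 or 2 */
    switch (stage) {
    case 1:
        /* stage 1: ownership of everything the dead node mastered is final */
        dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
        BUG_ON(dlm->reco.state & DLM_RECO_STATE_FINALIZE);
        dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
        spin_unlock(&dlm->spinlock);
        break;
    case 2:
        /* stage 2: recovery of this dead node is completely done */
        BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_FINALIZE));
        dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
        __dlm_reset_recovery(dlm);
        spin_unlock(&dlm->spinlock);
        dlm_kick_recovery_thread(dlm);      /* look for more dead nodes */
        break;
    }
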