1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "rcom.h"
17 #include "config.h"
18 #include "memory.h"
19 #include "recover.h"
20 #include "util.h"
21 #include "lock.h"
22 #include "dir.h"
23 
24 /*
25  * We use the upper 16 bits of the hash value to select the directory node.
26  * Low bits are used for distribution of rsb's among hash buckets on each node.
27  *
28  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29  * num_nodes to the hash value.  This value in the desired range is used as an
30  * offset into the sorted list of nodeid's to give the particular nodeid.
31  */
32 
dlm_hash2nodeid(struct dlm_ls * ls,uint32_t hash)33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34 {
35 	uint32_t node;
36 
37 	if (ls->ls_num_nodes == 1)
38 		return dlm_our_nodeid();
39 	else {
40 		node = (hash >> 16) % ls->ls_total_weight;
41 		return ls->ls_node_array[node];
42 	}
43 }
44 
dlm_dir_nodeid(struct dlm_rsb * r)45 int dlm_dir_nodeid(struct dlm_rsb *r)
46 {
47 	return r->res_dir_nodeid;
48 }
49 
dlm_recover_dir_nodeid(struct dlm_ls * ls)50 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
51 {
52 	struct dlm_rsb *r;
53 
54 	down_read(&ls->ls_root_sem);
55 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
56 		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
57 	}
58 	up_read(&ls->ls_root_sem);
59 }
60 
dlm_recover_directory(struct dlm_ls * ls,uint64_t seq)61 int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
62 {
63 	struct dlm_member *memb;
64 	char *b, *last_name = NULL;
65 	int error = -ENOMEM, last_len, nodeid, result;
66 	uint16_t namelen;
67 	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
68 
69 	log_rinfo(ls, "dlm_recover_directory");
70 
71 	if (dlm_no_directory(ls))
72 		goto out_status;
73 
74 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
75 	if (!last_name)
76 		goto out;
77 
78 	list_for_each_entry(memb, &ls->ls_nodes, list) {
79 		if (memb->nodeid == dlm_our_nodeid())
80 			continue;
81 
82 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
83 		last_len = 0;
84 
85 		for (;;) {
86 			int left;
87 			if (dlm_recovery_stopped(ls)) {
88 				error = -EINTR;
89 				goto out_free;
90 			}
91 
92 			error = dlm_rcom_names(ls, memb->nodeid,
93 					       last_name, last_len, seq);
94 			if (error)
95 				goto out_free;
96 
97 			cond_resched();
98 
99 			/*
100 			 * pick namelen/name pairs out of received buffer
101 			 */
102 
103 			b = ls->ls_recover_buf->rc_buf;
104 			left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
105 			left -= sizeof(struct dlm_rcom);
106 
107 			for (;;) {
108 				__be16 v;
109 
110 				error = -EINVAL;
111 				if (left < sizeof(__be16))
112 					goto out_free;
113 
114 				memcpy(&v, b, sizeof(__be16));
115 				namelen = be16_to_cpu(v);
116 				b += sizeof(__be16);
117 				left -= sizeof(__be16);
118 
119 				/* namelen of 0xFFFFF marks end of names for
120 				   this node; namelen of 0 marks end of the
121 				   buffer */
122 
123 				if (namelen == 0xFFFF)
124 					goto done;
125 				if (!namelen)
126 					break;
127 
128 				if (namelen > left)
129 					goto out_free;
130 
131 				if (namelen > DLM_RESNAME_MAXLEN)
132 					goto out_free;
133 
134 				error = dlm_master_lookup(ls, memb->nodeid,
135 							  b, namelen,
136 							  DLM_LU_RECOVER_DIR,
137 							  &nodeid, &result);
138 				if (error) {
139 					log_error(ls, "recover_dir lookup %d",
140 						  error);
141 					goto out_free;
142 				}
143 
144 				/* The name was found in rsbtbl, but the
145 				 * master nodeid is different from
146 				 * memb->nodeid which says it is the master.
147 				 * This should not happen. */
148 
149 				if (result == DLM_LU_MATCH &&
150 				    nodeid != memb->nodeid) {
151 					count_bad++;
152 					log_error(ls, "recover_dir lookup %d "
153 						  "nodeid %d memb %d bad %u",
154 						  result, nodeid, memb->nodeid,
155 						  count_bad);
156 					print_hex_dump_bytes("dlm_recover_dir ",
157 							     DUMP_PREFIX_NONE,
158 							     b, namelen);
159 				}
160 
161 				/* The name was found in rsbtbl, and the
162 				 * master nodeid matches memb->nodeid. */
163 
164 				if (result == DLM_LU_MATCH &&
165 				    nodeid == memb->nodeid) {
166 					count_match++;
167 				}
168 
169 				/* The name was not found in rsbtbl and was
170 				 * added with memb->nodeid as the master. */
171 
172 				if (result == DLM_LU_ADD) {
173 					count_add++;
174 				}
175 
176 				last_len = namelen;
177 				memcpy(last_name, b, namelen);
178 				b += namelen;
179 				left -= namelen;
180 				count++;
181 			}
182 		}
183 	 done:
184 		;
185 	}
186 
187  out_status:
188 	error = 0;
189 	dlm_set_recover_status(ls, DLM_RS_DIR);
190 
191 	log_rinfo(ls, "dlm_recover_directory %u in %u new",
192 		  count, count_add);
193  out_free:
194 	kfree(last_name);
195  out:
196 	return error;
197 }
198 
find_rsb_root(struct dlm_ls * ls,const char * name,int len)199 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
200 				     int len)
201 {
202 	struct dlm_rsb *r;
203 	uint32_t hash, bucket;
204 	int rv;
205 
206 	hash = jhash(name, len, 0);
207 	bucket = hash & (ls->ls_rsbtbl_size - 1);
208 
209 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
210 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211 	if (rv)
212 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213 					 name, len, &r);
214 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215 
216 	if (!rv)
217 		return r;
218 
219 	down_read(&ls->ls_root_sem);
220 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222 			up_read(&ls->ls_root_sem);
223 			log_debug(ls, "find_rsb_root revert to root_list %s",
224 				  r->res_name);
225 			return r;
226 		}
227 	}
228 	up_read(&ls->ls_root_sem);
229 	return NULL;
230 }
231 
232 /* Find the rsb where we left off (or start again), then send rsb names
233    for rsb's we're master of and whose directory node matches the requesting
234    node.  inbuf is the rsb name last sent, inlen is the name's length */
235 
dlm_copy_master_names(struct dlm_ls * ls,const char * inbuf,int inlen,char * outbuf,int outlen,int nodeid)236 void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
237  			   char *outbuf, int outlen, int nodeid)
238 {
239 	struct list_head *list;
240 	struct dlm_rsb *r;
241 	int offset = 0, dir_nodeid;
242 	__be16 be_namelen;
243 
244 	down_read(&ls->ls_root_sem);
245 
246 	if (inlen > 1) {
247 		r = find_rsb_root(ls, inbuf, inlen);
248 		if (!r) {
249 			log_error(ls, "copy_master_names from %d start %d %.*s",
250 				  nodeid, inlen, inlen, inbuf);
251 			goto out;
252 		}
253 		list = r->res_root_list.next;
254 	} else {
255 		list = ls->ls_root_list.next;
256 	}
257 
258 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
259 		r = list_entry(list, struct dlm_rsb, res_root_list);
260 		if (r->res_nodeid)
261 			continue;
262 
263 		dir_nodeid = dlm_dir_nodeid(r);
264 		if (dir_nodeid != nodeid)
265 			continue;
266 
267 		/*
268 		 * The block ends when we can't fit the following in the
269 		 * remaining buffer space:
270 		 * namelen (uint16_t) +
271 		 * name (r->res_length) +
272 		 * end-of-block record 0x0000 (uint16_t)
273 		 */
274 
275 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
276 			/* Write end-of-block record */
277 			be_namelen = cpu_to_be16(0);
278 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
279 			offset += sizeof(__be16);
280 			ls->ls_recover_dir_sent_msg++;
281 			goto out;
282 		}
283 
284 		be_namelen = cpu_to_be16(r->res_length);
285 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
286 		offset += sizeof(__be16);
287 		memcpy(outbuf + offset, r->res_name, r->res_length);
288 		offset += r->res_length;
289 		ls->ls_recover_dir_sent_res++;
290 	}
291 
292 	/*
293 	 * If we've reached the end of the list (and there's room) write a
294 	 * terminating record.
295 	 */
296 
297 	if ((list == &ls->ls_root_list) &&
298 	    (offset + sizeof(uint16_t) <= outlen)) {
299 		be_namelen = cpu_to_be16(0xFFFF);
300 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
301 		offset += sizeof(__be16);
302 		ls->ls_recover_dir_sent_msg++;
303 	}
304  out:
305 	up_read(&ls->ls_root_sem);
306 }
307 
308