1  // SPDX-License-Identifier: GPL-2.0-only
2  /*
3   * stack_user.c
4   *
5   * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
6   *
7   * Copyright (C) 2007 Oracle.  All rights reserved.
8   */
9  
10  #include <linux/module.h>
11  #include <linux/fs.h>
12  #include <linux/filelock.h>
13  #include <linux/miscdevice.h>
14  #include <linux/mutex.h>
15  #include <linux/slab.h>
16  #include <linux/reboot.h>
17  #include <linux/sched.h>
18  #include <linux/uaccess.h>
19  
20  #include "stackglue.h"
21  
22  #include <linux/dlm_plock.h>
23  
24  /*
25   * The control protocol starts with a handshake.  Until the handshake
26   * is complete, the control device will fail all write(2)s.
27   *
28   * The handshake is simple.  First, the client reads until EOF.  Each line
29   * of output is a supported protocol tag.  All protocol tags are a single
30   * character followed by a two hex digit version number.  Currently the
 * only thing supported is T01, for "Text-based version 0x01".  Next, the
32   * client writes the version they would like to use, including the newline.
33   * Thus, the protocol tag is 'T01\n'.  If the version tag written is
34   * unknown, -EINVAL is returned.  Once the negotiation is complete, the
35   * client can start sending messages.
36   *
37   * The T01 protocol has three messages.  First is the "SETN" message.
38   * It has the following syntax:
39   *
40   *  SETN<space><8-char-hex-nodenum><newline>
41   *
42   * This is 14 characters.
43   *
44   * The "SETN" message must be the first message following the protocol.
45   * It tells ocfs2_control the local node number.
46   *
47   * Next comes the "SETV" message.  It has the following syntax:
48   *
49   *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
50   *
51   * This is 11 characters.
52   *
53   * The "SETV" message sets the filesystem locking protocol version as
54   * negotiated by the client.  The client negotiates based on the maximum
55   * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
56   * number from the "SETV" message must match
57   * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
 * must be less than or equal to ...sp_max_proto.pv_minor.
59   *
60   * Once this information has been set, mounts will be allowed.  From this
61   * point on, the "DOWN" message can be sent for node down notification.
62   * It has the following syntax:
63   *
64   *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
65   *
66   * eg:
67   *
68   *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
69   *
70   * This is 47 characters.
71   */
72  
/*
 * The protocol tag handed to the client by read(2) and expected back,
 * byte for byte, in the handshake write(2).
 * For now, we have just one protocol version.
 */
#define OCFS2_CONTROL_PROTO			"T01\n"
#define OCFS2_CONTROL_PROTO_LEN			4

/*
 * Handshake states, kept per open file in ocfs2_control_private.op_state.
 * A zero-filled private structure therefore starts out as INVALID.
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
#define OCFS2_CONTROL_HANDSHAKE_READ		(1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
#define OCFS2_CONTROL_HANDSHAKE_VALID		(3)

/*
 * Messages: the four-character opcode tags, the exact byte length of each
 * complete message (including the trailing newline), and the widths of
 * the fixed-size hex fields inside them.
 */
#define OCFS2_CONTROL_MESSAGE_OP_LEN		4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
#define OCFS2_TEXT_UUID_LEN			32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
/* Name of the DLM lock used by version_lock()/version_unlock() */
#define VERSION_LOCK				"version_lock"
98  
/*
 * Kind of live connection, stored in ocfs2_live_connection.oc_type.
 * A NO_CONTROLD connection may attach even when nobody holds a valid
 * open of the control device (see ocfs2_live_connection_attach()).
 */
enum ocfs2_connection_type {
	WITH_CONTROLD = 0,
	NO_CONTROLD = 1,
};
103  
/*
 * Per-mount state tying an ocfs2_cluster_connection to this stack.
 * The filesystem and miscdevice sides can detach in different order.
 *
 * NOTE(review): an earlier version of this comment described the
 * structure as refcounted, but it carries no reference count here and
 * ocfs2_live_connection_drop() frees it directly.
 */
struct ocfs2_live_connection {
	/* Entry on ocfs2_live_connection_list */
	struct list_head		oc_list;
	/* Owning connection; set on attach, cleared on drop */
	struct ocfs2_cluster_connection	*oc_conn;
	enum ocfs2_connection_type	oc_type;
	/* Node id recorded by user_recover_done() */
	atomic_t                        oc_this_node;
	/* Slot number recorded by user_recover_done() */
	int                             oc_our_slot;
	/* Lock status block of the version lock */
	struct dlm_lksb                 oc_version_lksb;
	/* LVB storage that oc_version_lksb.sb_lvbptr is pointed at */
	char                            oc_lvb[DLM_LVB_LEN];
	/* Completed by sync_wait_cb(); waited on by sync_lock()/sync_unlock() */
	struct completion               oc_sync_wait;
	/* Woken by user_recover_done() */
	wait_queue_head_t		oc_wait;
};
119  
/*
 * Per-open state of the control device, allocated in
 * ocfs2_control_open() and freed in ocfs2_control_release().
 */
struct ocfs2_control_private {
	/* Entry on ocfs2_control_private_list */
	struct list_head op_list;
	/* One of the OCFS2_CONTROL_HANDSHAKE_* states */
	int op_state;
	/* Node number received via SETN; -1 until one has been set */
	int op_this_node;
	/* Locking protocol version received via SETV; major 0 means unset */
	struct ocfs2_protocol_version op_proto;
};
126  
/*
 * SETN<space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte overlay of the message.  The handler overwrites the
 * separator bytes with NUL so that nodestr can be parsed as a string.
 */
struct ocfs2_control_message_setn {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
134  
/*
 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * Byte-for-byte overlay of the message.  The handler overwrites the
 * separator bytes with NUL so that major and minor each parse as a
 * terminated string.
 */
struct ocfs2_control_message_setv {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	space2;
	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	newline;
};
144  
/*
 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte overlay of the message.  The handler overwrites the
 * separator bytes with NUL so that uuid and nodestr are each terminated
 * strings.
 */
struct ocfs2_control_message_down {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	uuid[OCFS2_TEXT_UUID_LEN];
	char	space2;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
154  
/*
 * Receive buffer for one control message.  Every member starts with the
 * four-character tag, so the opcode can be inspected through .tag before
 * the message-specific view is chosen.  Its size is also the upper bound
 * enforced on write(2) lengths by ocfs2_control_cfu().
 */
union ocfs2_control_message {
	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	struct ocfs2_control_message_setn	u_setn;
	struct ocfs2_control_message_setv	u_setv;
	struct ocfs2_control_message_down	u_down;
};
161  
/* Forward declaration; its sp_max_proto bounds the accepted versions */
static struct ocfs2_stack_plugin ocfs2_user_plugin;

/* Number of control-device opens that reached HANDSHAKE_VALID */
static atomic_t ocfs2_control_opened;
/* Local node number set via SETN; -1 while unset */
static int ocfs2_control_this_node = -1;
/* Locking protocol version currently in effect */
static struct ocfs2_protocol_version running_proto;

/* All attached ocfs2_live_connection structures */
static LIST_HEAD(ocfs2_live_connection_list);
/* All ocfs2_control_private structures, one per open of the device */
static LIST_HEAD(ocfs2_control_private_list);
/*
 * Held around changes to the two lists above and, in the control-device
 * paths, around updates of ocfs2_control_this_node and running_proto.
 */
static DEFINE_MUTEX(ocfs2_control_lock);
171  
ocfs2_control_set_handshake_state(struct file * file,int state)172  static inline void ocfs2_control_set_handshake_state(struct file *file,
173  						     int state)
174  {
175  	struct ocfs2_control_private *p = file->private_data;
176  	p->op_state = state;
177  }
178  
ocfs2_control_get_handshake_state(struct file * file)179  static inline int ocfs2_control_get_handshake_state(struct file *file)
180  {
181  	struct ocfs2_control_private *p = file->private_data;
182  	return p->op_state;
183  }
184  
ocfs2_connection_find(const char * name)185  static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
186  {
187  	size_t len = strlen(name);
188  	struct ocfs2_live_connection *c;
189  
190  	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
191  
192  	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
193  		if ((c->oc_conn->cc_namelen == len) &&
194  		    !strncmp(c->oc_conn->cc_name, name, len))
195  			return c;
196  	}
197  
198  	return NULL;
199  }
200  
201  /*
202   * ocfs2_live_connection structures are created underneath the ocfs2
203   * mount path.  Since the VFS prevents multiple calls to
204   * fill_super(), we can't get dupes here.
205   */
ocfs2_live_connection_attach(struct ocfs2_cluster_connection * conn,struct ocfs2_live_connection * c)206  static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
207  				     struct ocfs2_live_connection *c)
208  {
209  	int rc = 0;
210  
211  	mutex_lock(&ocfs2_control_lock);
212  	c->oc_conn = conn;
213  
214  	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
215  		list_add(&c->oc_list, &ocfs2_live_connection_list);
216  	else {
217  		printk(KERN_ERR
218  		       "ocfs2: Userspace control daemon is not present\n");
219  		rc = -ESRCH;
220  	}
221  
222  	mutex_unlock(&ocfs2_control_lock);
223  	return rc;
224  }
225  
226  /*
227   * This function disconnects the cluster connection from ocfs2_control.
228   * Afterwards, userspace can't affect the cluster connection.
229   */
ocfs2_live_connection_drop(struct ocfs2_live_connection * c)230  static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
231  {
232  	mutex_lock(&ocfs2_control_lock);
233  	list_del_init(&c->oc_list);
234  	c->oc_conn = NULL;
235  	mutex_unlock(&ocfs2_control_lock);
236  
237  	kfree(c);
238  }
239  
/*
 * Copy one complete command of @target_len bytes from the user buffer
 * @buf into @target.
 *
 * Returns 0 on success, -EINVAL when @count differs from @target_len or
 * exceeds the size of union ocfs2_control_message, and -EFAULT when the
 * user copy fails.
 */
static int ocfs2_control_cfu(void *target, size_t target_len,
			     const char __user *buf, size_t count)
{
	/* The T01 expects write(2) calls to have exactly one command */
	if (count != target_len)
		return -EINVAL;
	if (count > sizeof(union ocfs2_control_message))
		return -EINVAL;

	return copy_from_user(target, buf, target_len) ? -EFAULT : 0;
}
253  
ocfs2_control_validate_protocol(struct file * file,const char __user * buf,size_t count)254  static ssize_t ocfs2_control_validate_protocol(struct file *file,
255  					       const char __user *buf,
256  					       size_t count)
257  {
258  	ssize_t ret;
259  	char kbuf[OCFS2_CONTROL_PROTO_LEN];
260  
261  	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
262  				buf, count);
263  	if (ret)
264  		return ret;
265  
266  	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
267  		return -EINVAL;
268  
269  	ocfs2_control_set_handshake_state(file,
270  					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
271  
272  	return count;
273  }
274  
ocfs2_control_send_down(const char * uuid,int nodenum)275  static void ocfs2_control_send_down(const char *uuid,
276  				    int nodenum)
277  {
278  	struct ocfs2_live_connection *c;
279  
280  	mutex_lock(&ocfs2_control_lock);
281  
282  	c = ocfs2_connection_find(uuid);
283  	if (c) {
284  		BUG_ON(c->oc_conn == NULL);
285  		c->oc_conn->cc_recovery_handler(nodenum,
286  						c->oc_conn->cc_recovery_data);
287  	}
288  
289  	mutex_unlock(&ocfs2_control_lock);
290  }
291  
292  /*
293   * Called whenever configuration elements are sent to /dev/ocfs2_control.
294   * If all configuration elements are present, try to set the global
295   * values.  If there is a problem, return an error.  Skip any missing
296   * elements, and only bump ocfs2_control_opened when we have all elements
297   * and are successful.
298   */
ocfs2_control_install_private(struct file * file)299  static int ocfs2_control_install_private(struct file *file)
300  {
301  	int rc = 0;
302  	int set_p = 1;
303  	struct ocfs2_control_private *p = file->private_data;
304  
305  	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
306  
307  	mutex_lock(&ocfs2_control_lock);
308  
309  	if (p->op_this_node < 0) {
310  		set_p = 0;
311  	} else if ((ocfs2_control_this_node >= 0) &&
312  		   (ocfs2_control_this_node != p->op_this_node)) {
313  		rc = -EINVAL;
314  		goto out_unlock;
315  	}
316  
317  	if (!p->op_proto.pv_major) {
318  		set_p = 0;
319  	} else if (!list_empty(&ocfs2_live_connection_list) &&
320  		   ((running_proto.pv_major != p->op_proto.pv_major) ||
321  		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
322  		rc = -EINVAL;
323  		goto out_unlock;
324  	}
325  
326  	if (set_p) {
327  		ocfs2_control_this_node = p->op_this_node;
328  		running_proto.pv_major = p->op_proto.pv_major;
329  		running_proto.pv_minor = p->op_proto.pv_minor;
330  	}
331  
332  out_unlock:
333  	mutex_unlock(&ocfs2_control_lock);
334  
335  	if (!rc && set_p) {
336  		/* We set the global values successfully */
337  		atomic_inc(&ocfs2_control_opened);
338  		ocfs2_control_set_handshake_state(file,
339  					OCFS2_CONTROL_HANDSHAKE_VALID);
340  	}
341  
342  	return rc;
343  }
344  
ocfs2_control_get_this_node(void)345  static int ocfs2_control_get_this_node(void)
346  {
347  	int rc;
348  
349  	mutex_lock(&ocfs2_control_lock);
350  	if (ocfs2_control_this_node < 0)
351  		rc = -EINVAL;
352  	else
353  		rc = ocfs2_control_this_node;
354  	mutex_unlock(&ocfs2_control_lock);
355  
356  	return rc;
357  }
358  
ocfs2_control_do_setnode_msg(struct file * file,struct ocfs2_control_message_setn * msg)359  static int ocfs2_control_do_setnode_msg(struct file *file,
360  					struct ocfs2_control_message_setn *msg)
361  {
362  	long nodenum;
363  	char *ptr = NULL;
364  	struct ocfs2_control_private *p = file->private_data;
365  
366  	if (ocfs2_control_get_handshake_state(file) !=
367  	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
368  		return -EINVAL;
369  
370  	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
371  		    OCFS2_CONTROL_MESSAGE_OP_LEN))
372  		return -EINVAL;
373  
374  	if ((msg->space != ' ') || (msg->newline != '\n'))
375  		return -EINVAL;
376  	msg->space = msg->newline = '\0';
377  
378  	nodenum = simple_strtol(msg->nodestr, &ptr, 16);
379  	if (!ptr || *ptr)
380  		return -EINVAL;
381  
382  	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
383  	    (nodenum > INT_MAX) || (nodenum < 0))
384  		return -ERANGE;
385  	p->op_this_node = nodenum;
386  
387  	return ocfs2_control_install_private(file);
388  }
389  
ocfs2_control_do_setversion_msg(struct file * file,struct ocfs2_control_message_setv * msg)390  static int ocfs2_control_do_setversion_msg(struct file *file,
391  					   struct ocfs2_control_message_setv *msg)
392  {
393  	long major, minor;
394  	char *ptr = NULL;
395  	struct ocfs2_control_private *p = file->private_data;
396  	struct ocfs2_protocol_version *max =
397  		&ocfs2_user_plugin.sp_max_proto;
398  
399  	if (ocfs2_control_get_handshake_state(file) !=
400  	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
401  		return -EINVAL;
402  
403  	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
404  		    OCFS2_CONTROL_MESSAGE_OP_LEN))
405  		return -EINVAL;
406  
407  	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
408  	    (msg->newline != '\n'))
409  		return -EINVAL;
410  	msg->space1 = msg->space2 = msg->newline = '\0';
411  
412  	major = simple_strtol(msg->major, &ptr, 16);
413  	if (!ptr || *ptr)
414  		return -EINVAL;
415  	minor = simple_strtol(msg->minor, &ptr, 16);
416  	if (!ptr || *ptr)
417  		return -EINVAL;
418  
419  	/*
420  	 * The major must be between 1 and 255, inclusive.  The minor
421  	 * must be between 0 and 255, inclusive.  The version passed in
422  	 * must be within the maximum version supported by the filesystem.
423  	 */
424  	if ((major == LONG_MIN) || (major == LONG_MAX) ||
425  	    (major > (u8)-1) || (major < 1))
426  		return -ERANGE;
427  	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
428  	    (minor > (u8)-1) || (minor < 0))
429  		return -ERANGE;
430  	if ((major != max->pv_major) ||
431  	    (minor > max->pv_minor))
432  		return -EINVAL;
433  
434  	p->op_proto.pv_major = major;
435  	p->op_proto.pv_minor = minor;
436  
437  	return ocfs2_control_install_private(file);
438  }
439  
ocfs2_control_do_down_msg(struct file * file,struct ocfs2_control_message_down * msg)440  static int ocfs2_control_do_down_msg(struct file *file,
441  				     struct ocfs2_control_message_down *msg)
442  {
443  	long nodenum;
444  	char *p = NULL;
445  
446  	if (ocfs2_control_get_handshake_state(file) !=
447  	    OCFS2_CONTROL_HANDSHAKE_VALID)
448  		return -EINVAL;
449  
450  	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
451  		    OCFS2_CONTROL_MESSAGE_OP_LEN))
452  		return -EINVAL;
453  
454  	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
455  	    (msg->newline != '\n'))
456  		return -EINVAL;
457  	msg->space1 = msg->space2 = msg->newline = '\0';
458  
459  	nodenum = simple_strtol(msg->nodestr, &p, 16);
460  	if (!p || *p)
461  		return -EINVAL;
462  
463  	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
464  	    (nodenum > INT_MAX) || (nodenum < 0))
465  		return -ERANGE;
466  
467  	ocfs2_control_send_down(msg->uuid, nodenum);
468  
469  	return 0;
470  }
471  
ocfs2_control_message(struct file * file,const char __user * buf,size_t count)472  static ssize_t ocfs2_control_message(struct file *file,
473  				     const char __user *buf,
474  				     size_t count)
475  {
476  	ssize_t ret;
477  	union ocfs2_control_message msg;
478  
479  	/* Try to catch padding issues */
480  	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
481  		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
482  
483  	memset(&msg, 0, sizeof(union ocfs2_control_message));
484  	ret = ocfs2_control_cfu(&msg, count, buf, count);
485  	if (ret)
486  		goto out;
487  
488  	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
489  	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
490  		     OCFS2_CONTROL_MESSAGE_OP_LEN))
491  		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
492  	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
493  		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
494  			  OCFS2_CONTROL_MESSAGE_OP_LEN))
495  		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
496  	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
497  		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
498  			  OCFS2_CONTROL_MESSAGE_OP_LEN))
499  		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
500  	else
501  		ret = -EINVAL;
502  
503  out:
504  	return ret ? ret : count;
505  }
506  
ocfs2_control_write(struct file * file,const char __user * buf,size_t count,loff_t * ppos)507  static ssize_t ocfs2_control_write(struct file *file,
508  				   const char __user *buf,
509  				   size_t count,
510  				   loff_t *ppos)
511  {
512  	ssize_t ret;
513  
514  	switch (ocfs2_control_get_handshake_state(file)) {
515  		case OCFS2_CONTROL_HANDSHAKE_INVALID:
516  			ret = -EINVAL;
517  			break;
518  
519  		case OCFS2_CONTROL_HANDSHAKE_READ:
520  			ret = ocfs2_control_validate_protocol(file, buf,
521  							      count);
522  			break;
523  
524  		case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
525  		case OCFS2_CONTROL_HANDSHAKE_VALID:
526  			ret = ocfs2_control_message(file, buf, count);
527  			break;
528  
529  		default:
530  			BUG();
531  			ret = -EIO;
532  			break;
533  	}
534  
535  	return ret;
536  }
537  
538  /*
539   * This is a naive version.  If we ever have a new protocol, we'll expand
540   * it.  Probably using seq_file.
541   */
ocfs2_control_read(struct file * file,char __user * buf,size_t count,loff_t * ppos)542  static ssize_t ocfs2_control_read(struct file *file,
543  				  char __user *buf,
544  				  size_t count,
545  				  loff_t *ppos)
546  {
547  	ssize_t ret;
548  
549  	ret = simple_read_from_buffer(buf, count, ppos,
550  			OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
551  
552  	/* Have we read the whole protocol list? */
553  	if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
554  		ocfs2_control_set_handshake_state(file,
555  						  OCFS2_CONTROL_HANDSHAKE_READ);
556  
557  	return ret;
558  }
559  
ocfs2_control_release(struct inode * inode,struct file * file)560  static int ocfs2_control_release(struct inode *inode, struct file *file)
561  {
562  	struct ocfs2_control_private *p = file->private_data;
563  
564  	mutex_lock(&ocfs2_control_lock);
565  
566  	if (ocfs2_control_get_handshake_state(file) !=
567  	    OCFS2_CONTROL_HANDSHAKE_VALID)
568  		goto out;
569  
570  	if (atomic_dec_and_test(&ocfs2_control_opened)) {
571  		if (!list_empty(&ocfs2_live_connection_list)) {
572  			/* XXX: Do bad things! */
573  			printk(KERN_ERR
574  			       "ocfs2: Unexpected release of ocfs2_control!\n"
575  			       "       Loss of cluster connection requires "
576  			       "an emergency restart!\n");
577  			emergency_restart();
578  		}
579  		/*
580  		 * Last valid close clears the node number and resets
581  		 * the locking protocol version
582  		 */
583  		ocfs2_control_this_node = -1;
584  		running_proto.pv_major = 0;
585  		running_proto.pv_minor = 0;
586  	}
587  
588  out:
589  	list_del_init(&p->op_list);
590  	file->private_data = NULL;
591  
592  	mutex_unlock(&ocfs2_control_lock);
593  
594  	kfree(p);
595  
596  	return 0;
597  }
598  
ocfs2_control_open(struct inode * inode,struct file * file)599  static int ocfs2_control_open(struct inode *inode, struct file *file)
600  {
601  	struct ocfs2_control_private *p;
602  
603  	p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
604  	if (!p)
605  		return -ENOMEM;
606  	p->op_this_node = -1;
607  
608  	mutex_lock(&ocfs2_control_lock);
609  	file->private_data = p;
610  	list_add(&p->op_list, &ocfs2_control_private_list);
611  	mutex_unlock(&ocfs2_control_lock);
612  
613  	return 0;
614  }
615  
616  static const struct file_operations ocfs2_control_fops = {
617  	.open    = ocfs2_control_open,
618  	.release = ocfs2_control_release,
619  	.read    = ocfs2_control_read,
620  	.write   = ocfs2_control_write,
621  	.owner   = THIS_MODULE,
622  	.llseek  = default_llseek,
623  };
624  
625  static struct miscdevice ocfs2_control_device = {
626  	.minor		= MISC_DYNAMIC_MINOR,
627  	.name		= "ocfs2_control",
628  	.fops		= &ocfs2_control_fops,
629  };
630  
ocfs2_control_init(void)631  static int ocfs2_control_init(void)
632  {
633  	int rc;
634  
635  	atomic_set(&ocfs2_control_opened, 0);
636  
637  	rc = misc_register(&ocfs2_control_device);
638  	if (rc)
639  		printk(KERN_ERR
640  		       "ocfs2: Unable to register ocfs2_control device "
641  		       "(errno %d)\n",
642  		       -rc);
643  
644  	return rc;
645  }
646  
/* Unregister the control misc device registered by ocfs2_control_init(). */
static void ocfs2_control_exit(void)
{
	misc_deregister(&ocfs2_control_device);
}
651  
fsdlm_lock_ast_wrapper(void * astarg)652  static void fsdlm_lock_ast_wrapper(void *astarg)
653  {
654  	struct ocfs2_dlm_lksb *lksb = astarg;
655  	int status = lksb->lksb_fsdlm.sb_status;
656  
657  	/*
658  	 * For now we're punting on the issue of other non-standard errors
659  	 * where we can't tell if the unlock_ast or lock_ast should be called.
660  	 * The main "other error" that's possible is EINVAL which means the
661  	 * function was called with invalid args, which shouldn't be possible
662  	 * since the caller here is under our control.  Other non-standard
663  	 * errors probably fall into the same category, or otherwise are fatal
664  	 * which means we can't carry on anyway.
665  	 */
666  
667  	if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
668  		lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
669  	else
670  		lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
671  }
672  
/*
 * Blocking callback handed to dlm_lock() by user_dlm_lock().  @astarg is
 * the ocfs2_dlm_lksb of the lock; the call is forwarded, together with
 * @level, to the locking protocol's blocking AST.
 */
static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
{
	struct ocfs2_dlm_lksb *lksb = astarg;

	lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
}
679  
/*
 * Request lock @name (@namelen bytes) in @mode on the connection's
 * lockspace.  DLM_LKF_NODLCKWT is always added to @flags, and the
 * fsdlm_*_ast_wrapper() callbacks are registered with @lksb as their
 * argument.
 *
 * Returns whatever dlm_lock() returns.
 */
static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
			 int mode,
			 struct ocfs2_dlm_lksb *lksb,
			 u32 flags,
			 void *name,
			 unsigned int namelen)
{
	/* Default the LVB pointer to the bytes right behind the dlm_lksb */
	if (lksb->lksb_fsdlm.sb_lvbptr == NULL)
		lksb->lksb_fsdlm.sb_lvbptr =
			(char *)lksb + sizeof(struct dlm_lksb);

	return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
			flags | DLM_LKF_NODLCKWT,
			name, namelen, 0,
			fsdlm_lock_ast_wrapper, lksb,
			fsdlm_blocking_ast_wrapper);
}
696  
/*
 * Release the lock described by @lksb on the connection's lockspace.
 * @lksb is passed along as the AST argument.
 *
 * Returns whatever dlm_unlock() returns.
 */
static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
			   struct ocfs2_dlm_lksb *lksb,
			   u32 flags)
{
	return dlm_unlock(conn->cc_lockspace,
			  lksb->lksb_fsdlm.sb_lkid,
			  flags,
			  &lksb->lksb_fsdlm,
			  lksb);
}
704  
/* Return the sb_status field of the fs/dlm lock status block in @lksb. */
static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
	return lksb->lksb_fsdlm.sb_status;
}
709  
user_dlm_lvb_valid(struct ocfs2_dlm_lksb * lksb)710  static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
711  {
712  	int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
713  
714  	return !invalid;
715  }
716  
user_dlm_lvb(struct ocfs2_dlm_lksb * lksb)717  static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
718  {
719  	if (!lksb->lksb_fsdlm.sb_lvbptr)
720  		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
721  					     sizeof(struct dlm_lksb);
722  	return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
723  }
724  
/* Intentionally empty: nothing is dumped for @lksb in this stack. */
static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}
728  
/*
 * POSIX lock entry point.  Picks one of the dlm_posix_*() calls from
 * @cmd and the lock type in @fl, and returns its result.
 */
static int user_plock(struct ocfs2_cluster_connection *conn,
		      u64 ino,
		      struct file *file,
		      int cmd,
		      struct file_lock *fl)
{
	/*
	 * This more or less just demuxes the plock request into any
	 * one of three dlm calls.
	 *
	 * Internally, fs/dlm will pass these to a misc device, which
	 * a userspace daemon will read and write to.
	 */

	if (cmd == F_CANCELLK)
		return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl);

	if (IS_GETLK(cmd))
		return dlm_posix_get(conn->cc_lockspace, ino, file, fl);

	if (fl->fl_type == F_UNLCK)
		return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);

	return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
}
752  
753  /*
754   * Compare a requested locking protocol version against the current one.
755   *
756   * If the major numbers are different, they are incompatible.
757   * If the current minor is greater than the request, they are incompatible.
758   * If the current minor is less than or equal to the request, they are
759   * compatible, and the requester should run at the current minor version.
760   */
fs_protocol_compare(struct ocfs2_protocol_version * existing,struct ocfs2_protocol_version * request)761  static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
762  			       struct ocfs2_protocol_version *request)
763  {
764  	if (existing->pv_major != request->pv_major)
765  		return 1;
766  
767  	if (existing->pv_minor > request->pv_minor)
768  		return 1;
769  
770  	if (existing->pv_minor < request->pv_minor)
771  		request->pv_minor = existing->pv_minor;
772  
773  	return 0;
774  }
775  
lvb_to_version(char * lvb,struct ocfs2_protocol_version * ver)776  static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
777  {
778  	struct ocfs2_protocol_version *pv =
779  		(struct ocfs2_protocol_version *)lvb;
780  	/*
781  	 * ocfs2_protocol_version has two u8 variables, so we don't
782  	 * need any endian conversion.
783  	 */
784  	ver->pv_major = pv->pv_major;
785  	ver->pv_minor = pv->pv_minor;
786  }
787  
version_to_lvb(struct ocfs2_protocol_version * ver,char * lvb)788  static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
789  {
790  	struct ocfs2_protocol_version *pv =
791  		(struct ocfs2_protocol_version *)lvb;
792  	/*
793  	 * ocfs2_protocol_version has two u8 variables, so we don't
794  	 * need any endian conversion.
795  	 */
796  	pv->pv_major = ver->pv_major;
797  	pv->pv_minor = ver->pv_minor;
798  }
799  
sync_wait_cb(void * arg)800  static void sync_wait_cb(void *arg)
801  {
802  	struct ocfs2_cluster_connection *conn = arg;
803  	struct ocfs2_live_connection *lc = conn->cc_private;
804  	complete(&lc->oc_sync_wait);
805  }
806  
sync_unlock(struct ocfs2_cluster_connection * conn,struct dlm_lksb * lksb,char * name)807  static int sync_unlock(struct ocfs2_cluster_connection *conn,
808  		struct dlm_lksb *lksb, char *name)
809  {
810  	int error;
811  	struct ocfs2_live_connection *lc = conn->cc_private;
812  
813  	error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
814  	if (error) {
815  		printk(KERN_ERR "%s lkid %x error %d\n",
816  				name, lksb->sb_lkid, error);
817  		return error;
818  	}
819  
820  	wait_for_completion(&lc->oc_sync_wait);
821  
822  	if (lksb->sb_status != -DLM_EUNLOCK) {
823  		printk(KERN_ERR "%s lkid %x status %d\n",
824  				name, lksb->sb_lkid, lksb->sb_status);
825  		return -1;
826  	}
827  	return 0;
828  }
829  
/*
 * Request lock @name in @mode with @flags and wait for the request to
 * complete via sync_wait_cb().
 *
 * Returns the dlm_lock() error if the request could not be issued,
 * otherwise the final sb_status of @lksb.  Every non-zero status except
 * -EAGAIN is logged.
 */
static int sync_lock(struct ocfs2_cluster_connection *conn,
		int mode, uint32_t flags,
		struct dlm_lksb *lksb, char *name)
{
	struct ocfs2_live_connection *live = conn->cc_private;
	int rc;

	rc = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
			name, strlen(name),
			0, sync_wait_cb, conn, NULL);
	if (rc != 0) {
		printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
				name, lksb->sb_lkid, flags, mode, rc);
		return rc;
	}

	wait_for_completion(&live->oc_sync_wait);

	rc = lksb->sb_status;
	if (rc == 0 || rc == -EAGAIN)
		return rc;

	printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
			name, lksb->sb_lkid, flags, mode, rc);
	return rc;
}
857  
858  
version_lock(struct ocfs2_cluster_connection * conn,int mode,int flags)859  static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
860  		int flags)
861  {
862  	struct ocfs2_live_connection *lc = conn->cc_private;
863  	return sync_lock(conn, mode, flags,
864  			&lc->oc_version_lksb, VERSION_LOCK);
865  }
866  
version_unlock(struct ocfs2_cluster_connection * conn)867  static int version_unlock(struct ocfs2_cluster_connection *conn)
868  {
869  	struct ocfs2_live_connection *lc = conn->cc_private;
870  	return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
871  }
872  
873  /* get_protocol_version()
874   *
875   * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
876   * The algorithm is:
877   * 1. Attempt to take the lock in EX mode (non-blocking).
878   * 2. If successful (which means it is the first mount), write the
879   *    version number and downconvert to PR lock.
880   * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
881   *    taking the PR lock.
882   */
883  
get_protocol_version(struct ocfs2_cluster_connection * conn)884  static int get_protocol_version(struct ocfs2_cluster_connection *conn)
885  {
886  	int ret;
887  	struct ocfs2_live_connection *lc = conn->cc_private;
888  	struct ocfs2_protocol_version pv;
889  
890  	running_proto.pv_major =
891  		ocfs2_user_plugin.sp_max_proto.pv_major;
892  	running_proto.pv_minor =
893  		ocfs2_user_plugin.sp_max_proto.pv_minor;
894  
895  	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
896  	ret = version_lock(conn, DLM_LOCK_EX,
897  			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
898  	if (!ret) {
899  		conn->cc_version.pv_major = running_proto.pv_major;
900  		conn->cc_version.pv_minor = running_proto.pv_minor;
901  		version_to_lvb(&running_proto, lc->oc_lvb);
902  		version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
903  	} else if (ret == -EAGAIN) {
904  		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
905  		if (ret)
906  			goto out;
907  		lvb_to_version(lc->oc_lvb, &pv);
908  
909  		if ((pv.pv_major != running_proto.pv_major) ||
910  				(pv.pv_minor > running_proto.pv_minor)) {
911  			ret = -EINVAL;
912  			goto out;
913  		}
914  
915  		conn->cc_version.pv_major = pv.pv_major;
916  		conn->cc_version.pv_minor = pv.pv_minor;
917  	}
918  out:
919  	return ret;
920  }
921  
/* recover_prep callback of ocfs2_ls_ops; intentionally does nothing. */
static void user_recover_prep(void *arg)
{
}
925  
user_recover_slot(void * arg,struct dlm_slot * slot)926  static void user_recover_slot(void *arg, struct dlm_slot *slot)
927  {
928  	struct ocfs2_cluster_connection *conn = arg;
929  	printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
930  			slot->nodeid, slot->slot);
931  	conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
932  
933  }
934  
/*
 * recover_done callback registered in ocfs2_ls_ops.
 *
 * Finds the first entry of @slots whose slot number equals @our_slot and
 * publishes its node id in lc->oc_this_node.  If no entry matches,
 * oc_this_node is left unchanged.  Then records @our_slot in
 * lc->oc_our_slot and wakes anyone sleeping on lc->oc_wait.
 * @generation is unused.
 */
static void user_recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct ocfs2_cluster_connection *cc = arg;
	struct ocfs2_live_connection *live = cc->cc_private;
	int idx = 0;

	while (idx < num_slots) {
		if (slots[idx].slot == our_slot) {
			atomic_set(&live->oc_this_node, slots[idx].nodeid);
			break;
		}
		idx++;
	}

	live->oc_our_slot = our_slot;
	wake_up(&live->oc_wait);
}
952  
953  static const struct dlm_lockspace_ops ocfs2_ls_ops = {
954  	.recover_prep = user_recover_prep,
955  	.recover_slot = user_recover_slot,
956  	.recover_done = user_recover_done,
957  };
958  
user_cluster_disconnect(struct ocfs2_cluster_connection * conn)959  static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
960  {
961  	version_unlock(conn);
962  	dlm_release_lockspace(conn->cc_lockspace, 2);
963  	conn->cc_lockspace = NULL;
964  	ocfs2_live_connection_drop(conn->cc_private);
965  	conn->cc_private = NULL;
966  	return 0;
967  }
968  
user_cluster_connect(struct ocfs2_cluster_connection * conn)969  static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
970  {
971  	dlm_lockspace_t *fsdlm;
972  	struct ocfs2_live_connection *lc;
973  	int rc, ops_rv;
974  
975  	BUG_ON(conn == NULL);
976  
977  	lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
978  	if (!lc)
979  		return -ENOMEM;
980  
981  	init_waitqueue_head(&lc->oc_wait);
982  	init_completion(&lc->oc_sync_wait);
983  	atomic_set(&lc->oc_this_node, 0);
984  	conn->cc_private = lc;
985  	lc->oc_type = NO_CONTROLD;
986  
987  	rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
988  			       DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
989  			       &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
990  	if (rc) {
991  		if (rc == -EEXIST || rc == -EPROTO)
992  			printk(KERN_ERR "ocfs2: Unable to create the "
993  				"lockspace %s (%d), because a ocfs2-tools "
994  				"program is running on this file system "
995  				"with the same name lockspace\n",
996  				conn->cc_name, rc);
997  		goto out;
998  	}
999  
1000  	if (ops_rv == -EOPNOTSUPP) {
1001  		lc->oc_type = WITH_CONTROLD;
1002  		printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1003  				"version of dlm_controld and/or ocfs2-tools."
1004  				" Please consider upgrading.\n");
1005  	} else if (ops_rv) {
1006  		rc = ops_rv;
1007  		goto out;
1008  	}
1009  	conn->cc_lockspace = fsdlm;
1010  
1011  	rc = ocfs2_live_connection_attach(conn, lc);
1012  	if (rc)
1013  		goto out;
1014  
1015  	if (lc->oc_type == NO_CONTROLD) {
1016  		rc = get_protocol_version(conn);
1017  		if (rc) {
1018  			printk(KERN_ERR "ocfs2: Could not determine"
1019  					" locking version\n");
1020  			user_cluster_disconnect(conn);
1021  			goto out;
1022  		}
1023  		wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1024  	}
1025  
1026  	/*
1027  	 * running_proto must have been set before we allowed any mounts
1028  	 * to proceed.
1029  	 */
1030  	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1031  		printk(KERN_ERR
1032  		       "Unable to mount with fs locking protocol version "
1033  		       "%u.%u because negotiated protocol is %u.%u\n",
1034  		       conn->cc_version.pv_major, conn->cc_version.pv_minor,
1035  		       running_proto.pv_major, running_proto.pv_minor);
1036  		rc = -EPROTO;
1037  		ocfs2_live_connection_drop(lc);
1038  		lc = NULL;
1039  	}
1040  
1041  out:
1042  	if (rc)
1043  		kfree(lc);
1044  	return rc;
1045  }
1046  
1047  
user_cluster_this_node(struct ocfs2_cluster_connection * conn,unsigned int * this_node)1048  static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1049  				  unsigned int *this_node)
1050  {
1051  	int rc;
1052  	struct ocfs2_live_connection *lc = conn->cc_private;
1053  
1054  	if (lc->oc_type == WITH_CONTROLD)
1055  		rc = ocfs2_control_get_this_node();
1056  	else if (lc->oc_type == NO_CONTROLD)
1057  		rc = atomic_read(&lc->oc_this_node);
1058  	else
1059  		rc = -EINVAL;
1060  
1061  	if (rc < 0)
1062  		return rc;
1063  
1064  	*this_node = rc;
1065  	return 0;
1066  }
1067  
1068  static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
1069  	.connect	= user_cluster_connect,
1070  	.disconnect	= user_cluster_disconnect,
1071  	.this_node	= user_cluster_this_node,
1072  	.dlm_lock	= user_dlm_lock,
1073  	.dlm_unlock	= user_dlm_unlock,
1074  	.lock_status	= user_dlm_lock_status,
1075  	.lvb_valid	= user_dlm_lvb_valid,
1076  	.lock_lvb	= user_dlm_lvb,
1077  	.plock		= user_plock,
1078  	.dump_lksb	= user_dlm_dump_lksb,
1079  };
1080  
1081  static struct ocfs2_stack_plugin ocfs2_user_plugin = {
1082  	.sp_name	= "user",
1083  	.sp_ops		= &ocfs2_user_plugin_ops,
1084  	.sp_owner	= THIS_MODULE,
1085  };
1086  
1087  
ocfs2_user_plugin_init(void)1088  static int __init ocfs2_user_plugin_init(void)
1089  {
1090  	int rc;
1091  
1092  	rc = ocfs2_control_init();
1093  	if (!rc) {
1094  		rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1095  		if (rc)
1096  			ocfs2_control_exit();
1097  	}
1098  
1099  	return rc;
1100  }
1101  
ocfs2_user_plugin_exit(void)1102  static void __exit ocfs2_user_plugin_exit(void)
1103  {
1104  	ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
1105  	ocfs2_control_exit();
1106  }
1107  
1108  MODULE_AUTHOR("Oracle");
1109  MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
1110  MODULE_LICENSE("GPL");
1111  module_init(ocfs2_user_plugin_init);
1112  module_exit(ocfs2_user_plugin_exit);
1113