1  // SPDX-License-Identifier: GPL-2.0-or-later
2  /*
3     drbd.c
4  
5     This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6  
7     Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8     Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9     Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10  
11     Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
12     from Logicworks, Inc. for making SDP replication support possible.
13  
14  
15   */
16  
17  #define pr_fmt(fmt)	KBUILD_MODNAME ": " fmt
18  
19  #include <linux/module.h>
20  #include <linux/jiffies.h>
21  #include <linux/drbd.h>
22  #include <linux/uaccess.h>
23  #include <asm/types.h>
24  #include <net/sock.h>
25  #include <linux/ctype.h>
26  #include <linux/mutex.h>
27  #include <linux/fs.h>
28  #include <linux/file.h>
29  #include <linux/proc_fs.h>
30  #include <linux/init.h>
31  #include <linux/mm.h>
32  #include <linux/memcontrol.h>
33  #include <linux/mm_inline.h>
34  #include <linux/slab.h>
35  #include <linux/random.h>
36  #include <linux/reboot.h>
37  #include <linux/notifier.h>
38  #include <linux/kthread.h>
39  #include <linux/workqueue.h>
40  #define __KERNEL_SYSCALLS__
41  #include <linux/unistd.h>
42  #include <linux/vmalloc.h>
43  #include <linux/sched/signal.h>
44  
45  #include <linux/drbd_limits.h>
46  #include "drbd_int.h"
47  #include "drbd_protocol.h"
48  #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
49  #include "drbd_vli.h"
50  #include "drbd_debugfs.h"
51  
52  static DEFINE_MUTEX(drbd_main_mutex);
53  static int drbd_open(struct block_device *bdev, fmode_t mode);
54  static void drbd_release(struct gendisk *gd, fmode_t mode);
55  static void md_sync_timer_fn(struct timer_list *t);
56  static int w_bitmap_io(struct drbd_work *w, int unused);
57  
58  MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
59  	      "Lars Ellenberg <lars@linbit.com>");
60  MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
61  MODULE_VERSION(REL_VERSION);
62  MODULE_LICENSE("GPL");
63  MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
64  		 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
65  MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
66  
67  #include <linux/moduleparam.h>
68  /* thanks to these macros, if compiled into the kernel (not as a module),
69   * these become boot parameters (e.g., drbd.minor_count) */
70  
71  #ifdef CONFIG_DRBD_FAULT_INJECTION
72  int drbd_enable_faults;
73  int drbd_fault_rate;
74  static int drbd_fault_count;
75  static int drbd_fault_devs;
76  /* bitmap of enabled faults */
77  module_param_named(enable_faults, drbd_enable_faults, int, 0664);
78  /* fault rate % value - applies to all enabled faults */
79  module_param_named(fault_rate, drbd_fault_rate, int, 0664);
80  /* count of faults inserted */
81  module_param_named(fault_count, drbd_fault_count, int, 0664);
82  /* bitmap of devices to insert faults on */
83  module_param_named(fault_devs, drbd_fault_devs, int, 0644);
84  #endif
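
/*
 * Illustrative only: when CONFIG_DRBD_FAULT_INJECTION is enabled, these knobs
 * are usually tweaked at run time through sysfs, e.g. (values are examples):
 *
 *	echo 16 > /sys/module/drbd/parameters/enable_faults	(fault type bitmap)
 *	echo 1  > /sys/module/drbd/parameters/fault_rate	(roughly 1% of candidate requests)
 *	cat /sys/module/drbd/parameters/fault_count		(faults inserted so far)
 */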
85  
86  /* module parameters we can keep static */
87  static bool drbd_allow_oos; /* allow_open_on_secondary */
88  static bool drbd_disable_sendpage;
89  MODULE_PARM_DESC(allow_oos, "DONT USE!");
90  module_param_named(allow_oos, drbd_allow_oos, bool, 0);
91  module_param_named(disable_sendpage, drbd_disable_sendpage, bool, 0644);
92  
93  /* module parameters we share */
94  int drbd_proc_details; /* Detail level in proc drbd */
95  module_param_named(proc_details, drbd_proc_details, int, 0644);
96  /* module parameters shared with defaults */
97  unsigned int drbd_minor_count = DRBD_MINOR_COUNT_DEF;
98  /* Module parameter for setting the user mode helper program
99   * to run. Default is /sbin/drbdadm */
100  char drbd_usermode_helper[80] = "/sbin/drbdadm";
101  module_param_named(minor_count, drbd_minor_count, uint, 0444);
102  module_param_string(usermode_helper, drbd_usermode_helper, sizeof(drbd_usermode_helper), 0644);
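
/*
 * Illustrative only: when built as a module these are given at load time, e.g.
 *
 *	modprobe drbd minor_count=8 usermode_helper=/sbin/drbdadm
 *
 * and when built into the kernel they become boot parameters, e.g.
 * "drbd.minor_count=8" on the kernel command line (see the note above).
 */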
103  
104  /* in 2.6.x, our device mapping and config info contains our virtual gendisks
105   * as member "struct gendisk *vdisk;"
106   */
107  struct idr drbd_devices;
108  struct list_head drbd_resources;
109  struct mutex resources_mutex;
110  
111  struct kmem_cache *drbd_request_cache;
112  struct kmem_cache *drbd_ee_cache;	/* peer requests */
113  struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
114  struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
115  mempool_t drbd_request_mempool;
116  mempool_t drbd_ee_mempool;
117  mempool_t drbd_md_io_page_pool;
118  struct bio_set drbd_md_io_bio_set;
119  struct bio_set drbd_io_bio_set;
120  
121  /* I do not use a standard mempool, because:
122     1) I want to hand out the pre-allocated objects first.
123     2) I want to be able to interrupt sleeping allocation with a signal.
124     Note: This is a singly linked list; the next pointer is the private
125  	 member of struct page.  (An illustrative sketch follows the declarations below.)
126   */
127  struct page *drbd_pp_pool;
128  DEFINE_SPINLOCK(drbd_pp_lock);
129  int          drbd_pp_vacant;
130  wait_queue_head_t drbd_pp_wait;
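
/*
 * Rough sketch only (the real helpers live elsewhere in the driver, in the
 * receiver code): popping a page from this pool amounts to, under drbd_pp_lock,
 *
 *	struct page *page = drbd_pp_pool;
 *	drbd_pp_pool = (struct page *)page_private(page);
 *	set_page_private(page, 0);
 *	drbd_pp_vacant--;
 *
 * i.e. the chain is threaded through the page's "private" field, as noted above.
 */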
131  
132  DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
133  
134  static const struct block_device_operations drbd_ops = {
135  	.owner		= THIS_MODULE,
136  	.submit_bio	= drbd_submit_bio,
137  	.open		= drbd_open,
138  	.release	= drbd_release,
139  };
140  
141  #ifdef __CHECKER__
142  /* When checking with sparse, and this is an inline function, sparse will
143     give tons of false positives. When this is a real function, sparse works.
144   */
145  int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins)
146  {
147  	int io_allowed;
148  
149  	atomic_inc(&device->local_cnt);
150  	io_allowed = (device->state.disk >= mins);
151  	if (!io_allowed) {
152  		if (atomic_dec_and_test(&device->local_cnt))
153  			wake_up(&device->misc_wait);
154  	}
155  	return io_allowed;
156  }
157  
158  #endif
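
/*
 * Typical use of the reference obtained via get_ldev()/get_ldev_if_state(),
 * mirroring the callers further down in this file:
 *
 *	if (get_ldev_if_state(device, D_NEGOTIATING)) {
 *		... it is now safe to dereference device->ldev ...
 *		put_ldev(device);
 *	}
 */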
159  
160  /**
161   * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
162   * @connection:	DRBD connection.
163   * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
164   * @set_size:	Expected number of requests before that barrier.
165   *
166   * In case the passed barrier_nr or set_size does not match the oldest
167   * epoch of not yet barrier-acked requests, this function will cause a
168   * termination of the connection.
169   */
170  void tl_release(struct drbd_connection *connection, unsigned int barrier_nr,
171  		unsigned int set_size)
172  {
173  	struct drbd_request *r;
174  	struct drbd_request *req = NULL, *tmp = NULL;
175  	int expect_epoch = 0;
176  	int expect_size = 0;
177  
178  	spin_lock_irq(&connection->resource->req_lock);
179  
180  	/* find oldest not yet barrier-acked write request,
181  	 * count writes in its epoch. */
182  	list_for_each_entry(r, &connection->transfer_log, tl_requests) {
183  		const unsigned s = r->rq_state;
184  		if (!req) {
185  			if (!(s & RQ_WRITE))
186  				continue;
187  			if (!(s & RQ_NET_MASK))
188  				continue;
189  			if (s & RQ_NET_DONE)
190  				continue;
191  			req = r;
192  			expect_epoch = req->epoch;
193  			expect_size++;
194  		} else {
195  			if (r->epoch != expect_epoch)
196  				break;
197  			if (!(s & RQ_WRITE))
198  				continue;
199  			/* if (s & RQ_DONE): not expected */
200  			/* if (!(s & RQ_NET_MASK)): not expected */
201  			expect_size++;
202  		}
203  	}
204  
205  	/* first some paranoia code */
206  	if (req == NULL) {
207  		drbd_err(connection, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
208  			 barrier_nr);
209  		goto bail;
210  	}
211  	if (expect_epoch != barrier_nr) {
212  		drbd_err(connection, "BAD! BarrierAck #%u received, expected #%u!\n",
213  			 barrier_nr, expect_epoch);
214  		goto bail;
215  	}
216  
217  	if (expect_size != set_size) {
218  		drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
219  			 barrier_nr, set_size, expect_size);
220  		goto bail;
221  	}
222  
223  	/* Clean up list of requests processed during current epoch. */
224  	/* this extra list walk restart is paranoia,
225  	 * to catch requests being barrier-acked "unexpectedly".
226  	 * It usually should find the same req again, or some READ preceding it. */
227  	list_for_each_entry(req, &connection->transfer_log, tl_requests)
228  		if (req->epoch == expect_epoch) {
229  			tmp = req;
230  			break;
231  		}
232  	req = list_prepare_entry(tmp, &connection->transfer_log, tl_requests);
233  	list_for_each_entry_safe_from(req, r, &connection->transfer_log, tl_requests) {
234  		if (req->epoch != expect_epoch)
235  			break;
236  		_req_mod(req, BARRIER_ACKED);
237  	}
238  	spin_unlock_irq(&connection->resource->req_lock);
239  
240  	return;
241  
242  bail:
243  	spin_unlock_irq(&connection->resource->req_lock);
244  	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
245  }
246  
247  
248  /**
249   * _tl_restart() - Walks the transfer log, and applies an action to all requests
250   * @connection:	DRBD connection to operate on.
251   * @what:       The action/event to perform with all request objects
252   *
253   * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
254   * RESTART_FROZEN_DISK_IO.
255   */
256  /* must hold resource->req_lock */
257  void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
258  {
259  	struct drbd_request *req, *r;
260  
261  	list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests)
262  		_req_mod(req, what);
263  }
264  
265  void tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
266  {
267  	spin_lock_irq(&connection->resource->req_lock);
268  	_tl_restart(connection, what);
269  	spin_unlock_irq(&connection->resource->req_lock);
270  }
271  
272  /**
273   * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
274   * @connection:	DRBD connection.
275   *
276   * This is called after the connection to the peer was lost. The storage covered
277   * by the requests on the transfer log gets marked as out of sync. Called from the
278   * receiver thread and the worker thread.
279   */
280  void tl_clear(struct drbd_connection *connection)
281  {
282  	tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
283  }
284  
285  /**
286   * tl_abort_disk_io() - Abort disk I/O for all requests for a certain device in the TL
287   * @device:	DRBD device.
288   */
289  void tl_abort_disk_io(struct drbd_device *device)
290  {
291  	struct drbd_connection *connection = first_peer_device(device)->connection;
292  	struct drbd_request *req, *r;
293  
294  	spin_lock_irq(&connection->resource->req_lock);
295  	list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
296  		if (!(req->rq_state & RQ_LOCAL_PENDING))
297  			continue;
298  		if (req->device != device)
299  			continue;
300  		_req_mod(req, ABORT_DISK_IO);
301  	}
302  	spin_unlock_irq(&connection->resource->req_lock);
303  }
304  
305  static int drbd_thread_setup(void *arg)
306  {
307  	struct drbd_thread *thi = (struct drbd_thread *) arg;
308  	struct drbd_resource *resource = thi->resource;
309  	unsigned long flags;
310  	int retval;
311  
312  	snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
313  		 thi->name[0],
314  		 resource->name);
315  
316  	allow_kernel_signal(DRBD_SIGKILL);
317  	allow_kernel_signal(SIGXCPU);
318  restart:
319  	retval = thi->function(thi);
320  
321  	spin_lock_irqsave(&thi->t_lock, flags);
322  
323  	/* if the receiver has been "EXITING", the last thing it did
324  	 * was set the conn state to "StandAlone",
325  	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
326  	 * and receiver thread will be "started".
327  	 * drbd_thread_start needs to set "RESTARTING" in that case.
328  	 * t_state check and assignment needs to be within the same spinlock,
329  	 * so either thread_start sees EXITING, and can remap to RESTARTING,
330  	 * or thread_start sees NONE, and can proceed as normal.
331  	 */
332  
333  	if (thi->t_state == RESTARTING) {
334  		drbd_info(resource, "Restarting %s thread\n", thi->name);
335  		thi->t_state = RUNNING;
336  		spin_unlock_irqrestore(&thi->t_lock, flags);
337  		goto restart;
338  	}
339  
340  	thi->task = NULL;
341  	thi->t_state = NONE;
342  	smp_mb();
343  	complete_all(&thi->stop);
344  	spin_unlock_irqrestore(&thi->t_lock, flags);
345  
346  	drbd_info(resource, "Terminating %s\n", current->comm);
347  
348  	/* Release mod reference taken when thread was started */
349  
350  	if (thi->connection)
351  		kref_put(&thi->connection->kref, drbd_destroy_connection);
352  	kref_put(&resource->kref, drbd_destroy_resource);
353  	module_put(THIS_MODULE);
354  	return retval;
355  }
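
/*
 * Informal summary of the thread state machine implemented by
 * drbd_thread_setup() above and drbd_thread_start()/_drbd_thread_stop() below:
 *
 *	NONE    --drbd_thread_start()-->  RUNNING
 *	RUNNING --stop-->                 EXITING    --thread exits-->  NONE
 *	RUNNING --stop(restart)-->        RESTARTING --setup loop-->    RUNNING
 *	EXITING --start while exiting-->  RESTARTING
 */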
356  
357  static void drbd_thread_init(struct drbd_resource *resource, struct drbd_thread *thi,
358  			     int (*func) (struct drbd_thread *), const char *name)
359  {
360  	spin_lock_init(&thi->t_lock);
361  	thi->task    = NULL;
362  	thi->t_state = NONE;
363  	thi->function = func;
364  	thi->resource = resource;
365  	thi->connection = NULL;
366  	thi->name = name;
367  }
368  
369  int drbd_thread_start(struct drbd_thread *thi)
370  {
371  	struct drbd_resource *resource = thi->resource;
372  	struct task_struct *nt;
373  	unsigned long flags;
374  
375  	/* is used from state engine doing drbd_thread_stop_nowait,
376  	 * while holding the req lock irqsave */
377  	spin_lock_irqsave(&thi->t_lock, flags);
378  
379  	switch (thi->t_state) {
380  	case NONE:
381  		drbd_info(resource, "Starting %s thread (from %s [%d])\n",
382  			 thi->name, current->comm, current->pid);
383  
384  		/* Get ref on module for thread - this is released when thread exits */
385  		if (!try_module_get(THIS_MODULE)) {
386  			drbd_err(resource, "Failed to get module reference in drbd_thread_start\n");
387  			spin_unlock_irqrestore(&thi->t_lock, flags);
388  			return false;
389  		}
390  
391  		kref_get(&resource->kref);
392  		if (thi->connection)
393  			kref_get(&thi->connection->kref);
394  
395  		init_completion(&thi->stop);
396  		thi->reset_cpu_mask = 1;
397  		thi->t_state = RUNNING;
398  		spin_unlock_irqrestore(&thi->t_lock, flags);
399  		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
400  
401  		nt = kthread_create(drbd_thread_setup, (void *) thi,
402  				    "drbd_%c_%s", thi->name[0], thi->resource->name);
403  
404  		if (IS_ERR(nt)) {
405  			drbd_err(resource, "Couldn't start thread\n");
406  
407  			if (thi->connection)
408  				kref_put(&thi->connection->kref, drbd_destroy_connection);
409  			kref_put(&resource->kref, drbd_destroy_resource);
410  			module_put(THIS_MODULE);
411  			return false;
412  		}
413  		spin_lock_irqsave(&thi->t_lock, flags);
414  		thi->task = nt;
415  		thi->t_state = RUNNING;
416  		spin_unlock_irqrestore(&thi->t_lock, flags);
417  		wake_up_process(nt);
418  		break;
419  	case EXITING:
420  		thi->t_state = RESTARTING;
421  		drbd_info(resource, "Restarting %s thread (from %s [%d])\n",
422  				thi->name, current->comm, current->pid);
423  		fallthrough;
424  	case RUNNING:
425  	case RESTARTING:
426  	default:
427  		spin_unlock_irqrestore(&thi->t_lock, flags);
428  		break;
429  	}
430  
431  	return true;
432  }
433  
434  
435  void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
436  {
437  	unsigned long flags;
438  
439  	enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
440  
441  	/* may be called from state engine, holding the req lock irqsave */
442  	spin_lock_irqsave(&thi->t_lock, flags);
443  
444  	if (thi->t_state == NONE) {
445  		spin_unlock_irqrestore(&thi->t_lock, flags);
446  		if (restart)
447  			drbd_thread_start(thi);
448  		return;
449  	}
450  
451  	if (thi->t_state != ns) {
452  		if (thi->task == NULL) {
453  			spin_unlock_irqrestore(&thi->t_lock, flags);
454  			return;
455  		}
456  
457  		thi->t_state = ns;
458  		smp_mb();
459  		init_completion(&thi->stop);
460  		if (thi->task != current)
461  			send_sig(DRBD_SIGKILL, thi->task, 1);
462  	}
463  
464  	spin_unlock_irqrestore(&thi->t_lock, flags);
465  
466  	if (wait)
467  		wait_for_completion(&thi->stop);
468  }
469  
470  int conn_lowest_minor(struct drbd_connection *connection)
471  {
472  	struct drbd_peer_device *peer_device;
473  	int vnr = 0, minor = -1;
474  
475  	rcu_read_lock();
476  	peer_device = idr_get_next(&connection->peer_devices, &vnr);
477  	if (peer_device)
478  		minor = device_to_minor(peer_device->device);
479  	rcu_read_unlock();
480  
481  	return minor;
482  }
483  
484  #ifdef CONFIG_SMP
485  /*
486   * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
487   *
488   * Forces all threads of a resource onto the same CPU. This is beneficial for
489   * DRBD's performance. May be overridden by the user's configuration.
490   */
491  static void drbd_calc_cpu_mask(cpumask_var_t *cpu_mask)
492  {
493  	unsigned int *resources_per_cpu, min_index = ~0;
494  
495  	resources_per_cpu = kcalloc(nr_cpu_ids, sizeof(*resources_per_cpu),
496  				    GFP_KERNEL);
497  	if (resources_per_cpu) {
498  		struct drbd_resource *resource;
499  		unsigned int cpu, min = ~0;
500  
501  		rcu_read_lock();
502  		for_each_resource_rcu(resource, &drbd_resources) {
503  			for_each_cpu(cpu, resource->cpu_mask)
504  				resources_per_cpu[cpu]++;
505  		}
506  		rcu_read_unlock();
507  		for_each_online_cpu(cpu) {
508  			if (resources_per_cpu[cpu] < min) {
509  				min = resources_per_cpu[cpu];
510  				min_index = cpu;
511  			}
512  		}
513  		kfree(resources_per_cpu);
514  	}
515  	if (min_index == ~0) {
516  		cpumask_setall(*cpu_mask);
517  		return;
518  	}
519  	cpumask_set_cpu(min_index, *cpu_mask);
520  }
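
/*
 * The choice above is only a heuristic default; if the user configured an
 * explicit CPU mask for the resource (for example via a "cpu-mask" resource
 * option in the DRBD configuration, if set up), that mask ends up in
 * resource->cpu_mask instead and is what drbd_thread_current_set_cpu()
 * below applies.
 */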
521  
522  /**
523   * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
524   * @thi:	drbd_thread object
525   *
526   * Call this in the "main loop" of _all_ threads; no mutex is needed, current
527   * won't die prematurely.
528   */
529  void drbd_thread_current_set_cpu(struct drbd_thread *thi)
530  {
531  	struct drbd_resource *resource = thi->resource;
532  	struct task_struct *p = current;
533  
534  	if (!thi->reset_cpu_mask)
535  		return;
536  	thi->reset_cpu_mask = 0;
537  	set_cpus_allowed_ptr(p, resource->cpu_mask);
538  }
539  #else
540  #define drbd_calc_cpu_mask(A) ({})
541  #endif
542  
543  /*
544   * drbd_header_size  -  size of a packet header
545   *
546   * The header size is a multiple of 8, so any payload following the header is
547   * word aligned on 64-bit architectures.  (The bitmap send and receive code
548   * relies on this.)
549   */
550  unsigned int drbd_header_size(struct drbd_connection *connection)
551  {
552  	if (connection->agreed_pro_version >= 100) {
553  		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
554  		return sizeof(struct p_header100);
555  	} else {
556  		BUILD_BUG_ON(sizeof(struct p_header80) !=
557  			     sizeof(struct p_header95));
558  		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
559  		return sizeof(struct p_header80);
560  	}
561  }
562  
563  static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
564  {
565  	h->magic   = cpu_to_be32(DRBD_MAGIC);
566  	h->command = cpu_to_be16(cmd);
567  	h->length  = cpu_to_be16(size);
568  	return sizeof(struct p_header80);
569  }
570  
571  static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
572  {
573  	h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
574  	h->command = cpu_to_be16(cmd);
575  	h->length = cpu_to_be32(size);
576  	return sizeof(struct p_header95);
577  }
578  
579  static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
580  				      int size, int vnr)
581  {
582  	h->magic = cpu_to_be32(DRBD_MAGIC_100);
583  	h->volume = cpu_to_be16(vnr);
584  	h->command = cpu_to_be16(cmd);
585  	h->length = cpu_to_be32(size);
586  	h->pad = 0;
587  	return sizeof(struct p_header100);
588  }
589  
590  static unsigned int prepare_header(struct drbd_connection *connection, int vnr,
591  				   void *buffer, enum drbd_packet cmd, int size)
592  {
593  	if (connection->agreed_pro_version >= 100)
594  		return prepare_header100(buffer, cmd, size, vnr);
595  	else if (connection->agreed_pro_version >= 95 &&
596  		 size > DRBD_MAX_SIZE_H80_PACKET)
597  		return prepare_header95(buffer, cmd, size);
598  	else
599  		return prepare_header80(buffer, cmd, size);
600  }
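
/*
 * Resulting on-the-wire layouts (all fields big endian), as assembled above:
 *
 *	p_header80  (apv < 95, and small packets up to apv 99):
 *		u32 magic, u16 command, u16 length			-> 8 bytes
 *	p_header95  (apv 95..99, payload > DRBD_MAX_SIZE_H80_PACKET):
 *		u16 magic, u16 command, u32 length			-> 8 bytes
 *	p_header100 (apv >= 100):
 *		u32 magic, u16 volume, u16 command, u32 length, u32 pad	-> 16 bytes
 *
 * which is why drbd_header_size() reports either 8 or 16 bytes.
 */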
601  
602  static void *__conn_prepare_command(struct drbd_connection *connection,
603  				    struct drbd_socket *sock)
604  {
605  	if (!sock->socket)
606  		return NULL;
607  	return sock->sbuf + drbd_header_size(connection);
608  }
609  
610  void *conn_prepare_command(struct drbd_connection *connection, struct drbd_socket *sock)
611  {
612  	void *p;
613  
614  	mutex_lock(&sock->mutex);
615  	p = __conn_prepare_command(connection, sock);
616  	if (!p)
617  		mutex_unlock(&sock->mutex);
618  
619  	return p;
620  }
621  
622  void *drbd_prepare_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock)
623  {
624  	return conn_prepare_command(peer_device->connection, sock);
625  }
626  
627  static int __send_command(struct drbd_connection *connection, int vnr,
628  			  struct drbd_socket *sock, enum drbd_packet cmd,
629  			  unsigned int header_size, void *data,
630  			  unsigned int size)
631  {
632  	int msg_flags;
633  	int err;
634  
635  	/*
636  	 * Called with @data == NULL and the size of the data blocks in @size
637  	 * for commands that send data blocks.  For those commands, omit the
638  	 * MSG_MORE flag: this will increase the likelihood that data blocks
639  	 * which are page aligned on the sender will end up page aligned on the
640  	 * receiver.
641  	 */
642  	msg_flags = data ? MSG_MORE : 0;
643  
644  	header_size += prepare_header(connection, vnr, sock->sbuf, cmd,
645  				      header_size + size);
646  	err = drbd_send_all(connection, sock->socket, sock->sbuf, header_size,
647  			    msg_flags);
648  	if (data && !err)
649  		err = drbd_send_all(connection, sock->socket, data, size, 0);
650  	/* DRBD protocol "pings" are latency critical.
651  	 * This is supposed to trigger tcp_push_pending_frames() */
652  	if (!err && (cmd == P_PING || cmd == P_PING_ACK))
653  		tcp_sock_set_nodelay(sock->socket->sk);
654  
655  	return err;
656  }
657  
658  static int __conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
659  			       enum drbd_packet cmd, unsigned int header_size,
660  			       void *data, unsigned int size)
661  {
662  	return __send_command(connection, 0, sock, cmd, header_size, data, size);
663  }
664  
665  int conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
666  		      enum drbd_packet cmd, unsigned int header_size,
667  		      void *data, unsigned int size)
668  {
669  	int err;
670  
671  	err = __conn_send_command(connection, sock, cmd, header_size, data, size);
672  	mutex_unlock(&sock->mutex);
673  	return err;
674  }
675  
676  int drbd_send_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock,
677  		      enum drbd_packet cmd, unsigned int header_size,
678  		      void *data, unsigned int size)
679  {
680  	int err;
681  
682  	err = __send_command(peer_device->connection, peer_device->device->vnr,
683  			     sock, cmd, header_size, data, size);
684  	mutex_unlock(&sock->mutex);
685  	return err;
686  }
687  
688  int drbd_send_ping(struct drbd_connection *connection)
689  {
690  	struct drbd_socket *sock;
691  
692  	sock = &connection->meta;
693  	if (!conn_prepare_command(connection, sock))
694  		return -EIO;
695  	return conn_send_command(connection, sock, P_PING, 0, NULL, 0);
696  }
697  
698  int drbd_send_ping_ack(struct drbd_connection *connection)
699  {
700  	struct drbd_socket *sock;
701  
702  	sock = &connection->meta;
703  	if (!conn_prepare_command(connection, sock))
704  		return -EIO;
705  	return conn_send_command(connection, sock, P_PING_ACK, 0, NULL, 0);
706  }
707  
708  int drbd_send_sync_param(struct drbd_peer_device *peer_device)
709  {
710  	struct drbd_socket *sock;
711  	struct p_rs_param_95 *p;
712  	int size;
713  	const int apv = peer_device->connection->agreed_pro_version;
714  	enum drbd_packet cmd;
715  	struct net_conf *nc;
716  	struct disk_conf *dc;
717  
718  	sock = &peer_device->connection->data;
719  	p = drbd_prepare_command(peer_device, sock);
720  	if (!p)
721  		return -EIO;
722  
723  	rcu_read_lock();
724  	nc = rcu_dereference(peer_device->connection->net_conf);
725  
726  	size = apv <= 87 ? sizeof(struct p_rs_param)
727  		: apv == 88 ? sizeof(struct p_rs_param)
728  			+ strlen(nc->verify_alg) + 1
729  		: apv <= 94 ? sizeof(struct p_rs_param_89)
730  		: /* apv >= 95 */ sizeof(struct p_rs_param_95);
731  
732  	cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
733  
734  	/* initialize verify_alg and csums_alg */
735  	BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
736  	memset(&p->algs, 0, sizeof(p->algs));
737  
738  	if (get_ldev(peer_device->device)) {
739  		dc = rcu_dereference(peer_device->device->ldev->disk_conf);
740  		p->resync_rate = cpu_to_be32(dc->resync_rate);
741  		p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
742  		p->c_delay_target = cpu_to_be32(dc->c_delay_target);
743  		p->c_fill_target = cpu_to_be32(dc->c_fill_target);
744  		p->c_max_rate = cpu_to_be32(dc->c_max_rate);
745  		put_ldev(peer_device->device);
746  	} else {
747  		p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
748  		p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
749  		p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
750  		p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
751  		p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
752  	}
753  
754  	if (apv >= 88)
755  		strcpy(p->verify_alg, nc->verify_alg);
756  	if (apv >= 89)
757  		strcpy(p->csums_alg, nc->csums_alg);
758  	rcu_read_unlock();
759  
760  	return drbd_send_command(peer_device, sock, cmd, size, NULL, 0);
761  }
762  
763  int __drbd_send_protocol(struct drbd_connection *connection, enum drbd_packet cmd)
764  {
765  	struct drbd_socket *sock;
766  	struct p_protocol *p;
767  	struct net_conf *nc;
768  	int size, cf;
769  
770  	sock = &connection->data;
771  	p = __conn_prepare_command(connection, sock);
772  	if (!p)
773  		return -EIO;
774  
775  	rcu_read_lock();
776  	nc = rcu_dereference(connection->net_conf);
777  
778  	if (nc->tentative && connection->agreed_pro_version < 92) {
779  		rcu_read_unlock();
780  		drbd_err(connection, "--dry-run is not supported by peer");
781  		return -EOPNOTSUPP;
782  	}
783  
784  	size = sizeof(*p);
785  	if (connection->agreed_pro_version >= 87)
786  		size += strlen(nc->integrity_alg) + 1;
787  
788  	p->protocol      = cpu_to_be32(nc->wire_protocol);
789  	p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
790  	p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
791  	p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
792  	p->two_primaries = cpu_to_be32(nc->two_primaries);
793  	cf = 0;
794  	if (nc->discard_my_data)
795  		cf |= CF_DISCARD_MY_DATA;
796  	if (nc->tentative)
797  		cf |= CF_DRY_RUN;
798  	p->conn_flags    = cpu_to_be32(cf);
799  
800  	if (connection->agreed_pro_version >= 87)
801  		strcpy(p->integrity_alg, nc->integrity_alg);
802  	rcu_read_unlock();
803  
804  	return __conn_send_command(connection, sock, cmd, size, NULL, 0);
805  }
806  
807  int drbd_send_protocol(struct drbd_connection *connection)
808  {
809  	int err;
810  
811  	mutex_lock(&connection->data.mutex);
812  	err = __drbd_send_protocol(connection, P_PROTOCOL);
813  	mutex_unlock(&connection->data.mutex);
814  
815  	return err;
816  }
817  
818  static int _drbd_send_uuids(struct drbd_peer_device *peer_device, u64 uuid_flags)
819  {
820  	struct drbd_device *device = peer_device->device;
821  	struct drbd_socket *sock;
822  	struct p_uuids *p;
823  	int i;
824  
825  	if (!get_ldev_if_state(device, D_NEGOTIATING))
826  		return 0;
827  
828  	sock = &peer_device->connection->data;
829  	p = drbd_prepare_command(peer_device, sock);
830  	if (!p) {
831  		put_ldev(device);
832  		return -EIO;
833  	}
834  	spin_lock_irq(&device->ldev->md.uuid_lock);
835  	for (i = UI_CURRENT; i < UI_SIZE; i++)
836  		p->uuid[i] = cpu_to_be64(device->ldev->md.uuid[i]);
837  	spin_unlock_irq(&device->ldev->md.uuid_lock);
838  
839  	device->comm_bm_set = drbd_bm_total_weight(device);
840  	p->uuid[UI_SIZE] = cpu_to_be64(device->comm_bm_set);
841  	rcu_read_lock();
842  	uuid_flags |= rcu_dereference(peer_device->connection->net_conf)->discard_my_data ? 1 : 0;
843  	rcu_read_unlock();
844  	uuid_flags |= test_bit(CRASHED_PRIMARY, &device->flags) ? 2 : 0;
845  	uuid_flags |= device->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
846  	p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
847  
848  	put_ldev(device);
849  	return drbd_send_command(peer_device, sock, P_UUIDS, sizeof(*p), NULL, 0);
850  }
851  
852  int drbd_send_uuids(struct drbd_peer_device *peer_device)
853  {
854  	return _drbd_send_uuids(peer_device, 0);
855  }
856  
857  int drbd_send_uuids_skip_initial_sync(struct drbd_peer_device *peer_device)
858  {
859  	return _drbd_send_uuids(peer_device, 8);
860  }
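
/*
 * Bits in p->uuid[UI_FLAGS] as assembled by _drbd_send_uuids() above:
 *	1 - discard-my-data is set on this connection
 *	2 - we are (or were) a crashed primary (CRASHED_PRIMARY)
 *	4 - our disk is about to become D_INCONSISTENT (new_state_tmp)
 *	8 - skip the initial sync (only via drbd_send_uuids_skip_initial_sync())
 */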
861  
862  void drbd_print_uuids(struct drbd_device *device, const char *text)
863  {
864  	if (get_ldev_if_state(device, D_NEGOTIATING)) {
865  		u64 *uuid = device->ldev->md.uuid;
866  		drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX\n",
867  		     text,
868  		     (unsigned long long)uuid[UI_CURRENT],
869  		     (unsigned long long)uuid[UI_BITMAP],
870  		     (unsigned long long)uuid[UI_HISTORY_START],
871  		     (unsigned long long)uuid[UI_HISTORY_END]);
872  		put_ldev(device);
873  	} else {
874  		drbd_info(device, "%s effective data uuid: %016llX\n",
875  				text,
876  				(unsigned long long)device->ed_uuid);
877  	}
878  }
879  
880  void drbd_gen_and_send_sync_uuid(struct drbd_peer_device *peer_device)
881  {
882  	struct drbd_device *device = peer_device->device;
883  	struct drbd_socket *sock;
884  	struct p_rs_uuid *p;
885  	u64 uuid;
886  
887  	D_ASSERT(device, device->state.disk == D_UP_TO_DATE);
888  
889  	uuid = device->ldev->md.uuid[UI_BITMAP];
890  	if (uuid && uuid != UUID_JUST_CREATED)
891  		uuid = uuid + UUID_NEW_BM_OFFSET;
892  	else
893  		get_random_bytes(&uuid, sizeof(u64));
894  	drbd_uuid_set(device, UI_BITMAP, uuid);
895  	drbd_print_uuids(device, "updated sync UUID");
896  	drbd_md_sync(device);
897  
898  	sock = &peer_device->connection->data;
899  	p = drbd_prepare_command(peer_device, sock);
900  	if (p) {
901  		p->uuid = cpu_to_be64(uuid);
902  		drbd_send_command(peer_device, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
903  	}
904  }
905  
906  int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enum dds_flags flags)
907  {
908  	struct drbd_device *device = peer_device->device;
909  	struct drbd_socket *sock;
910  	struct p_sizes *p;
911  	sector_t d_size, u_size;
912  	int q_order_type;
913  	unsigned int max_bio_size;
914  	unsigned int packet_size;
915  
916  	sock = &peer_device->connection->data;
917  	p = drbd_prepare_command(peer_device, sock);
918  	if (!p)
919  		return -EIO;
920  
921  	packet_size = sizeof(*p);
922  	if (peer_device->connection->agreed_features & DRBD_FF_WSAME)
923  		packet_size += sizeof(p->qlim[0]);
924  
925  	memset(p, 0, packet_size);
926  	if (get_ldev_if_state(device, D_NEGOTIATING)) {
927  		struct block_device *bdev = device->ldev->backing_bdev;
928  		struct request_queue *q = bdev_get_queue(bdev);
929  
930  		d_size = drbd_get_max_capacity(device->ldev);
931  		rcu_read_lock();
932  		u_size = rcu_dereference(device->ldev->disk_conf)->disk_size;
933  		rcu_read_unlock();
934  		q_order_type = drbd_queue_order_type(device);
935  		max_bio_size = queue_max_hw_sectors(q) << 9;
936  		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
937  		p->qlim->physical_block_size =
938  			cpu_to_be32(bdev_physical_block_size(bdev));
939  		p->qlim->logical_block_size =
940  			cpu_to_be32(bdev_logical_block_size(bdev));
941  		p->qlim->alignment_offset =
942  			cpu_to_be32(bdev_alignment_offset(bdev));
943  		p->qlim->io_min = cpu_to_be32(bdev_io_min(bdev));
944  		p->qlim->io_opt = cpu_to_be32(bdev_io_opt(bdev));
945  		p->qlim->discard_enabled = !!bdev_max_discard_sectors(bdev);
946  		put_ldev(device);
947  	} else {
948  		struct request_queue *q = device->rq_queue;
949  
950  		p->qlim->physical_block_size =
951  			cpu_to_be32(queue_physical_block_size(q));
952  		p->qlim->logical_block_size =
953  			cpu_to_be32(queue_logical_block_size(q));
954  		p->qlim->alignment_offset = 0;
955  		p->qlim->io_min = cpu_to_be32(queue_io_min(q));
956  		p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
957  		p->qlim->discard_enabled = 0;
958  
959  		d_size = 0;
960  		u_size = 0;
961  		q_order_type = QUEUE_ORDERED_NONE;
962  		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
963  	}
964  
965  	if (peer_device->connection->agreed_pro_version <= 94)
966  		max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
967  	else if (peer_device->connection->agreed_pro_version < 100)
968  		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE_P95);
969  
970  	p->d_size = cpu_to_be64(d_size);
971  	p->u_size = cpu_to_be64(u_size);
972  	if (trigger_reply)
973  		p->c_size = 0;
974  	else
975  		p->c_size = cpu_to_be64(get_capacity(device->vdisk));
976  	p->max_bio_size = cpu_to_be32(max_bio_size);
977  	p->queue_order_type = cpu_to_be16(q_order_type);
978  	p->dds_flags = cpu_to_be16(flags);
979  
980  	return drbd_send_command(peer_device, sock, P_SIZES, packet_size, NULL, 0);
981  }
982  
983  /**
984   * drbd_send_current_state() - Sends the drbd state to the peer
985   * @peer_device:	DRBD peer device.
986   */
987  int drbd_send_current_state(struct drbd_peer_device *peer_device)
988  {
989  	struct drbd_socket *sock;
990  	struct p_state *p;
991  
992  	sock = &peer_device->connection->data;
993  	p = drbd_prepare_command(peer_device, sock);
994  	if (!p)
995  		return -EIO;
996  	p->state = cpu_to_be32(peer_device->device->state.i); /* Within the send mutex */
997  	return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
998  }
999  
1000  /**
1001   * drbd_send_state() - After a state change, sends the new state to the peer
1002   * @peer_device:      DRBD peer device.
1003   * @state:     the state to send, not necessarily the current state.
1004   *
1005   * Each state change queues an "after_state_ch" work, which will eventually
1006   * send the resulting new state to the peer. If more state changes happen
1007   * between queuing and processing of the after_state_ch work, we still
1008   * want to send each intermediary state in the order it occurred.
1009   */
1010  int drbd_send_state(struct drbd_peer_device *peer_device, union drbd_state state)
1011  {
1012  	struct drbd_socket *sock;
1013  	struct p_state *p;
1014  
1015  	sock = &peer_device->connection->data;
1016  	p = drbd_prepare_command(peer_device, sock);
1017  	if (!p)
1018  		return -EIO;
1019  	p->state = cpu_to_be32(state.i); /* Within the send mutex */
1020  	return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
1021  }
1022  
1023  int drbd_send_state_req(struct drbd_peer_device *peer_device, union drbd_state mask, union drbd_state val)
1024  {
1025  	struct drbd_socket *sock;
1026  	struct p_req_state *p;
1027  
1028  	sock = &peer_device->connection->data;
1029  	p = drbd_prepare_command(peer_device, sock);
1030  	if (!p)
1031  		return -EIO;
1032  	p->mask = cpu_to_be32(mask.i);
1033  	p->val = cpu_to_be32(val.i);
1034  	return drbd_send_command(peer_device, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1035  }
1036  
1037  int conn_send_state_req(struct drbd_connection *connection, union drbd_state mask, union drbd_state val)
1038  {
1039  	enum drbd_packet cmd;
1040  	struct drbd_socket *sock;
1041  	struct p_req_state *p;
1042  
1043  	cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1044  	sock = &connection->data;
1045  	p = conn_prepare_command(connection, sock);
1046  	if (!p)
1047  		return -EIO;
1048  	p->mask = cpu_to_be32(mask.i);
1049  	p->val = cpu_to_be32(val.i);
1050  	return conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
1051  }
1052  
1053  void drbd_send_sr_reply(struct drbd_peer_device *peer_device, enum drbd_state_rv retcode)
1054  {
1055  	struct drbd_socket *sock;
1056  	struct p_req_state_reply *p;
1057  
1058  	sock = &peer_device->connection->meta;
1059  	p = drbd_prepare_command(peer_device, sock);
1060  	if (p) {
1061  		p->retcode = cpu_to_be32(retcode);
1062  		drbd_send_command(peer_device, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1063  	}
1064  }
1065  
1066  void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode)
1067  {
1068  	struct drbd_socket *sock;
1069  	struct p_req_state_reply *p;
1070  	enum drbd_packet cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1071  
1072  	sock = &connection->meta;
1073  	p = conn_prepare_command(connection, sock);
1074  	if (p) {
1075  		p->retcode = cpu_to_be32(retcode);
1076  		conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
1077  	}
1078  }
1079  
1080  static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1081  {
1082  	BUG_ON(code & ~0xf);
1083  	p->encoding = (p->encoding & ~0xf) | code;
1084  }
1085  
1086  static void dcbp_set_start(struct p_compressed_bm *p, int set)
1087  {
1088  	p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1089  }
1090  
1091  static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1092  {
1093  	BUG_ON(n & ~0x7);
1094  	p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1095  }
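
/*
 * Resulting layout of p_compressed_bm->encoding, per the helpers above:
 *	bit  7   - the first encoded run describes set bits (dcbp_set_start)
 *	bits 6:4 - number of unused pad bits in the last code byte (dcbp_set_pad_bits)
 *	bits 3:0 - encoding code, e.g. RLE_VLI_Bits (dcbp_set_code)
 */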
1096  
1097  static int fill_bitmap_rle_bits(struct drbd_device *device,
1098  			 struct p_compressed_bm *p,
1099  			 unsigned int size,
1100  			 struct bm_xfer_ctx *c)
1101  {
1102  	struct bitstream bs;
1103  	unsigned long plain_bits;
1104  	unsigned long tmp;
1105  	unsigned long rl;
1106  	unsigned len;
1107  	unsigned toggle;
1108  	int bits, use_rle;
1109  
1110  	/* may we use this feature? */
1111  	rcu_read_lock();
1112  	use_rle = rcu_dereference(first_peer_device(device)->connection->net_conf)->use_rle;
1113  	rcu_read_unlock();
1114  	if (!use_rle || first_peer_device(device)->connection->agreed_pro_version < 90)
1115  		return 0;
1116  
1117  	if (c->bit_offset >= c->bm_bits)
1118  		return 0; /* nothing to do. */
1119  
1120  	/* use at most this many bytes */
1121  	bitstream_init(&bs, p->code, size, 0);
1122  	memset(p->code, 0, size);
1123  	/* plain bits covered in this code string */
1124  	plain_bits = 0;
1125  
1126  	/* p->encoding & 0x80 stores whether the first run length is set.
1127  	 * bit offset is implicit.
1128  	 * start with toggle == 2 to be able to tell the first iteration */
1129  	toggle = 2;
1130  
1131  	/* see how many plain bits we can stuff into one packet
1132  	 * using RLE and VLI. */
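	/* Worked example (illustrative): for bitmap contents 0,0,1,1,1,0,...
	 * starting at bit_offset 0, the runs are "2 clear, 3 set, ...".
	 * The start flag stays clear (the first run describes clear bits) and
	 * the VLI code string carries the run lengths 2, 3, ... rather than
	 * the individual bits. */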
1133  	do {
1134  		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(device, c->bit_offset)
1135  				    : _drbd_bm_find_next(device, c->bit_offset);
1136  		if (tmp == -1UL)
1137  			tmp = c->bm_bits;
1138  		rl = tmp - c->bit_offset;
1139  
1140  		if (toggle == 2) { /* first iteration */
1141  			if (rl == 0) {
1142  				/* the first checked bit was set,
1143  				 * store start value, */
1144  				dcbp_set_start(p, 1);
1145  				/* but skip encoding of zero run length */
1146  				toggle = !toggle;
1147  				continue;
1148  			}
1149  			dcbp_set_start(p, 0);
1150  		}
1151  
1152  		/* paranoia: catch zero runlength.
1153  		 * can only happen if bitmap is modified while we scan it. */
1154  		if (rl == 0) {
1155  			drbd_err(device, "unexpected zero runlength while encoding bitmap "
1156  			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1157  			return -1;
1158  		}
1159  
1160  		bits = vli_encode_bits(&bs, rl);
1161  		if (bits == -ENOBUFS) /* buffer full */
1162  			break;
1163  		if (bits <= 0) {
1164  			drbd_err(device, "error while encoding bitmap: %d\n", bits);
1165  			return 0;
1166  		}
1167  
1168  		toggle = !toggle;
1169  		plain_bits += rl;
1170  		c->bit_offset = tmp;
1171  	} while (c->bit_offset < c->bm_bits);
1172  
1173  	len = bs.cur.b - p->code + !!bs.cur.bit;
1174  
1175  	if (plain_bits < (len << 3)) {
1176  		/* incompressible with this method.
1177  		 * we need to rewind both word and bit position. */
1178  		c->bit_offset -= plain_bits;
1179  		bm_xfer_ctx_bit_to_word_offset(c);
1180  		c->bit_offset = c->word_offset * BITS_PER_LONG;
1181  		return 0;
1182  	}
1183  
1184  	/* RLE + VLI was able to compress it just fine.
1185  	 * update c->word_offset. */
1186  	bm_xfer_ctx_bit_to_word_offset(c);
1187  
1188  	/* store pad_bits */
1189  	dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1190  
1191  	return len;
1192  }
1193  
1194  /*
1195   * send_bitmap_rle_or_plain
1196   *
1197   * Return 0 when done, 1 when another iteration is needed, and a negative error
1198   * code upon failure.
1199   */
1200  static int
1201  send_bitmap_rle_or_plain(struct drbd_device *device, struct bm_xfer_ctx *c)
1202  {
1203  	struct drbd_socket *sock = &first_peer_device(device)->connection->data;
1204  	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
1205  	struct p_compressed_bm *p = sock->sbuf + header_size;
1206  	int len, err;
1207  
1208  	len = fill_bitmap_rle_bits(device, p,
1209  			DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1210  	if (len < 0)
1211  		return -EIO;
1212  
1213  	if (len) {
1214  		dcbp_set_code(p, RLE_VLI_Bits);
1215  		err = __send_command(first_peer_device(device)->connection, device->vnr, sock,
1216  				     P_COMPRESSED_BITMAP, sizeof(*p) + len,
1217  				     NULL, 0);
1218  		c->packets[0]++;
1219  		c->bytes[0] += header_size + sizeof(*p) + len;
1220  
1221  		if (c->bit_offset >= c->bm_bits)
1222  			len = 0; /* DONE */
1223  	} else {
1224  		/* was not compressible.
1225  		 * send a buffer full of plain text bits instead. */
1226  		unsigned int data_size;
1227  		unsigned long num_words;
1228  		unsigned long *p = sock->sbuf + header_size;
1229  
1230  		data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1231  		num_words = min_t(size_t, data_size / sizeof(*p),
1232  				  c->bm_words - c->word_offset);
1233  		len = num_words * sizeof(*p);
1234  		if (len)
1235  			drbd_bm_get_lel(device, c->word_offset, num_words, p);
1236  		err = __send_command(first_peer_device(device)->connection, device->vnr, sock, P_BITMAP, len, NULL, 0);
1237  		c->word_offset += num_words;
1238  		c->bit_offset = c->word_offset * BITS_PER_LONG;
1239  
1240  		c->packets[1]++;
1241  		c->bytes[1] += header_size + len;
1242  
1243  		if (c->bit_offset > c->bm_bits)
1244  			c->bit_offset = c->bm_bits;
1245  	}
1246  	if (!err) {
1247  		if (len == 0) {
1248  			INFO_bm_xfer_stats(device, "send", c);
1249  			return 0;
1250  		} else
1251  			return 1;
1252  	}
1253  	return -EIO;
1254  }
1255  
1256  /* See the comment at receive_bitmap() */
1257  static int _drbd_send_bitmap(struct drbd_device *device)
1258  {
1259  	struct bm_xfer_ctx c;
1260  	int err;
1261  
1262  	if (!expect(device->bitmap))
1263  		return false;
1264  
1265  	if (get_ldev(device)) {
1266  		if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC)) {
1267  			drbd_info(device, "Writing the whole bitmap, MDF_FullSync was set.\n");
1268  			drbd_bm_set_all(device);
1269  			if (drbd_bm_write(device)) {
1270  				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
1271  				 * but otherwise process as per normal - need to tell other
1272  				 * side that a full resync is required! */
1273  				drbd_err(device, "Failed to write bitmap to disk!\n");
1274  			} else {
1275  				drbd_md_clear_flag(device, MDF_FULL_SYNC);
1276  				drbd_md_sync(device);
1277  			}
1278  		}
1279  		put_ldev(device);
1280  	}
1281  
1282  	c = (struct bm_xfer_ctx) {
1283  		.bm_bits = drbd_bm_bits(device),
1284  		.bm_words = drbd_bm_words(device),
1285  	};
1286  
1287  	do {
1288  		err = send_bitmap_rle_or_plain(device, &c);
1289  	} while (err > 0);
1290  
1291  	return err == 0;
1292  }
1293  
1294  int drbd_send_bitmap(struct drbd_device *device)
1295  {
1296  	struct drbd_socket *sock = &first_peer_device(device)->connection->data;
1297  	int err = -1;
1298  
1299  	mutex_lock(&sock->mutex);
1300  	if (sock->socket)
1301  		err = !_drbd_send_bitmap(device);
1302  	mutex_unlock(&sock->mutex);
1303  	return err;
1304  }
1305  
1306  void drbd_send_b_ack(struct drbd_connection *connection, u32 barrier_nr, u32 set_size)
1307  {
1308  	struct drbd_socket *sock;
1309  	struct p_barrier_ack *p;
1310  
1311  	if (connection->cstate < C_WF_REPORT_PARAMS)
1312  		return;
1313  
1314  	sock = &connection->meta;
1315  	p = conn_prepare_command(connection, sock);
1316  	if (!p)
1317  		return;
1318  	p->barrier = barrier_nr;
1319  	p->set_size = cpu_to_be32(set_size);
1320  	conn_send_command(connection, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1321  }
1322  
1323  /**
1324   * _drbd_send_ack() - Sends an ack packet
1325   * @peer_device:	DRBD peer device.
1326   * @cmd:		Packet command code.
1327   * @sector:		sector, needs to be in big endian byte order
1328   * @blksize:		size in byte, needs to be in big endian byte order
1329   * @block_id:		Id, big endian byte order
1330   */
1331  static int _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1332  			  u64 sector, u32 blksize, u64 block_id)
1333  {
1334  	struct drbd_socket *sock;
1335  	struct p_block_ack *p;
1336  
1337  	if (peer_device->device->state.conn < C_CONNECTED)
1338  		return -EIO;
1339  
1340  	sock = &peer_device->connection->meta;
1341  	p = drbd_prepare_command(peer_device, sock);
1342  	if (!p)
1343  		return -EIO;
1344  	p->sector = sector;
1345  	p->block_id = block_id;
1346  	p->blksize = blksize;
1347  	p->seq_num = cpu_to_be32(atomic_inc_return(&peer_device->device->packet_seq));
1348  	return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
1349  }
1350  
1351  /* dp->sector and dp->block_id already/still in network byte order,
1352   * data_size is payload size according to dp->head,
1353   * and may need to be corrected for digest size. */
1354  void drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1355  		      struct p_data *dp, int data_size)
1356  {
1357  	if (peer_device->connection->peer_integrity_tfm)
1358  		data_size -= crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1359  	_drbd_send_ack(peer_device, cmd, dp->sector, cpu_to_be32(data_size),
1360  		       dp->block_id);
1361  }
1362  
1363  void drbd_send_ack_rp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1364  		      struct p_block_req *rp)
1365  {
1366  	_drbd_send_ack(peer_device, cmd, rp->sector, rp->blksize, rp->block_id);
1367  }
1368  
1369  /**
1370   * drbd_send_ack() - Sends an ack packet
1371   * @peer_device:	DRBD peer device
1372   * @cmd:		packet command code
1373   * @peer_req:		peer request
1374   */
1375  int drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1376  		  struct drbd_peer_request *peer_req)
1377  {
1378  	return _drbd_send_ack(peer_device, cmd,
1379  			      cpu_to_be64(peer_req->i.sector),
1380  			      cpu_to_be32(peer_req->i.size),
1381  			      peer_req->block_id);
1382  }
1383  
1384  /* This function misuses the block_id field to signal if the blocks
1385   * are in sync or not. */
1386  int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1387  		     sector_t sector, int blksize, u64 block_id)
1388  {
1389  	return _drbd_send_ack(peer_device, cmd,
1390  			      cpu_to_be64(sector),
1391  			      cpu_to_be32(blksize),
1392  			      cpu_to_be64(block_id));
1393  }
1394  
1395  int drbd_send_rs_deallocated(struct drbd_peer_device *peer_device,
1396  			     struct drbd_peer_request *peer_req)
1397  {
1398  	struct drbd_socket *sock;
1399  	struct p_block_desc *p;
1400  
1401  	sock = &peer_device->connection->data;
1402  	p = drbd_prepare_command(peer_device, sock);
1403  	if (!p)
1404  		return -EIO;
1405  	p->sector = cpu_to_be64(peer_req->i.sector);
1406  	p->blksize = cpu_to_be32(peer_req->i.size);
1407  	p->pad = 0;
1408  	return drbd_send_command(peer_device, sock, P_RS_DEALLOCATED, sizeof(*p), NULL, 0);
1409  }
1410  
1411  int drbd_send_drequest(struct drbd_peer_device *peer_device, int cmd,
1412  		       sector_t sector, int size, u64 block_id)
1413  {
1414  	struct drbd_socket *sock;
1415  	struct p_block_req *p;
1416  
1417  	sock = &peer_device->connection->data;
1418  	p = drbd_prepare_command(peer_device, sock);
1419  	if (!p)
1420  		return -EIO;
1421  	p->sector = cpu_to_be64(sector);
1422  	p->block_id = block_id;
1423  	p->blksize = cpu_to_be32(size);
1424  	return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
1425  }
1426  
1427  int drbd_send_drequest_csum(struct drbd_peer_device *peer_device, sector_t sector, int size,
1428  			    void *digest, int digest_size, enum drbd_packet cmd)
1429  {
1430  	struct drbd_socket *sock;
1431  	struct p_block_req *p;
1432  
1433  	/* FIXME: Put the digest into the preallocated socket buffer.  */
1434  
1435  	sock = &peer_device->connection->data;
1436  	p = drbd_prepare_command(peer_device, sock);
1437  	if (!p)
1438  		return -EIO;
1439  	p->sector = cpu_to_be64(sector);
1440  	p->block_id = ID_SYNCER /* unused */;
1441  	p->blksize = cpu_to_be32(size);
1442  	return drbd_send_command(peer_device, sock, cmd, sizeof(*p), digest, digest_size);
1443  }
1444  
1445  int drbd_send_ov_request(struct drbd_peer_device *peer_device, sector_t sector, int size)
1446  {
1447  	struct drbd_socket *sock;
1448  	struct p_block_req *p;
1449  
1450  	sock = &peer_device->connection->data;
1451  	p = drbd_prepare_command(peer_device, sock);
1452  	if (!p)
1453  		return -EIO;
1454  	p->sector = cpu_to_be64(sector);
1455  	p->block_id = ID_SYNCER /* unused */;
1456  	p->blksize = cpu_to_be32(size);
1457  	return drbd_send_command(peer_device, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1458  }
1459  
1460  /* called on sndtimeo
1461   * returns false if we should retry,
1462   * true if we think the connection is dead
1463   */
1464  static int we_should_drop_the_connection(struct drbd_connection *connection, struct socket *sock)
1465  {
1466  	int drop_it;
1467  	/* long elapsed = (long)(jiffies - device->last_received); */
1468  
1469  	drop_it =   connection->meta.socket == sock
1470  		|| !connection->ack_receiver.task
1471  		|| get_t_state(&connection->ack_receiver) != RUNNING
1472  		|| connection->cstate < C_WF_REPORT_PARAMS;
1473  
1474  	if (drop_it)
1475  		return true;
1476  
1477  	drop_it = !--connection->ko_count;
1478  	if (!drop_it) {
1479  		drbd_err(connection, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1480  			 current->comm, current->pid, connection->ko_count);
1481  		request_ping(connection);
1482  	}
1483  
1484  	return drop_it; /* && (device->state == R_PRIMARY) */
1485  }
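
/*
 * Illustrative arithmetic (the real values come from the net configuration):
 * with a send timeout of 6 seconds and a ko-count of 7, a peer that never
 * drains our socket is declared dead after roughly 7 * 6 = 42 seconds of
 * blocked sends, and the connection is dropped instead of retried forever.
 */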
1486  
1487  static void drbd_update_congested(struct drbd_connection *connection)
1488  {
1489  	struct sock *sk = connection->data.socket->sk;
1490  	if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1491  		set_bit(NET_CONGESTED, &connection->flags);
1492  }
1493  
1494  /* The idea of sendpage seems to be to put some kind of reference
1495   * to the page into the skb, and to hand it over to the NIC. In
1496   * this process get_page() gets called.
1497   *
1498   * As soon as the page has actually been sent over the network, put_page()
1499   * gets called by some part of the network layer. [ NIC driver? ]
1500   *
1501   * [ get_page() / put_page() increment/decrement the count. If count
1502   *   reaches 0 the page will be freed. ]
1503   *
1504   * This works nicely with pages from FSs.
1505   * But this means that in protocol A we might signal IO completion too early!
1506   *
1507   * In order not to corrupt data during a resync we must make sure
1508   * that we do not reuse our own buffer pages (EEs) too early, therefore
1509   * we have the net_ee list.
1510   *
1511   * XFS seems to have problems, still, it submits pages with page_count == 0!
1512   * As a workaround, we disable sendpage on pages
1513   * with page_count == 0 or PageSlab.
1514   */
1515  static int _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page,
1516  			      int offset, size_t size, unsigned msg_flags)
1517  {
1518  	struct socket *socket;
1519  	void *addr;
1520  	int err;
1521  
1522  	socket = peer_device->connection->data.socket;
1523  	addr = kmap(page) + offset;
1524  	err = drbd_send_all(peer_device->connection, socket, addr, size, msg_flags);
1525  	kunmap(page);
1526  	if (!err)
1527  		peer_device->device->send_cnt += size >> 9;
1528  	return err;
1529  }
1530  
1531  static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page,
1532  		    int offset, size_t size, unsigned msg_flags)
1533  {
1534  	struct socket *socket = peer_device->connection->data.socket;
1535  	int len = size;
1536  	int err = -EIO;
1537  
1538  	/* e.g. XFS meta- & log-data is in slab pages, which have a
1539  	 * page_count of 0 and/or have PageSlab() set.
1540  	 * We cannot use sendpage for those, as it does get_page();
1541  	 * put_page(); that would either trigger a VM_BUG directly, or let
1542  	 * __page_cache_release() free a page that is actually still referenced
1543  	 * by someone, leading to some obscure delayed Oops somewhere else. */
1544  	if (drbd_disable_sendpage || !sendpage_ok(page))
1545  		return _drbd_no_send_page(peer_device, page, offset, size, msg_flags);
1546  
1547  	msg_flags |= MSG_NOSIGNAL;
1548  	drbd_update_congested(peer_device->connection);
1549  	do {
1550  		int sent;
1551  
1552  		sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1553  		if (sent <= 0) {
1554  			if (sent == -EAGAIN) {
1555  				if (we_should_drop_the_connection(peer_device->connection, socket))
1556  					break;
1557  				continue;
1558  			}
1559  			drbd_warn(peer_device->device, "%s: size=%d len=%d sent=%d\n",
1560  			     __func__, (int)size, len, sent);
1561  			if (sent < 0)
1562  				err = sent;
1563  			break;
1564  		}
1565  		len    -= sent;
1566  		offset += sent;
1567  	} while (len > 0 /* THINK && device->cstate >= C_CONNECTED*/);
1568  	clear_bit(NET_CONGESTED, &peer_device->connection->flags);
1569  
1570  	if (len == 0) {
1571  		err = 0;
1572  		peer_device->device->send_cnt += size >> 9;
1573  	}
1574  	return err;
1575  }
1576  
1577  static int _drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio)
1578  {
1579  	struct bio_vec bvec;
1580  	struct bvec_iter iter;
1581  
1582  	/* hint all but last page with MSG_MORE */
1583  	bio_for_each_segment(bvec, bio, iter) {
1584  		int err;
1585  
1586  		err = _drbd_no_send_page(peer_device, bvec.bv_page,
1587  					 bvec.bv_offset, bvec.bv_len,
1588  					 bio_iter_last(bvec, iter)
1589  					 ? 0 : MSG_MORE);
1590  		if (err)
1591  			return err;
1592  	}
1593  	return 0;
1594  }
1595  
1596  static int _drbd_send_zc_bio(struct drbd_peer_device *peer_device, struct bio *bio)
1597  {
1598  	struct bio_vec bvec;
1599  	struct bvec_iter iter;
1600  
1601  	/* hint all but last page with MSG_MORE */
1602  	bio_for_each_segment(bvec, bio, iter) {
1603  		int err;
1604  
1605  		err = _drbd_send_page(peer_device, bvec.bv_page,
1606  				      bvec.bv_offset, bvec.bv_len,
1607  				      bio_iter_last(bvec, iter) ? 0 : MSG_MORE);
1608  		if (err)
1609  			return err;
1610  	}
1611  	return 0;
1612  }
1613  
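/* Send the payload of a peer request (EE) by walking its page chain;
 * each page goes out through _drbd_send_page(), with MSG_MORE set for
 * all but the last page. */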
1614  static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
1615  			    struct drbd_peer_request *peer_req)
1616  {
1617  	struct page *page = peer_req->pages;
1618  	unsigned len = peer_req->i.size;
1619  	int err;
1620  
1621  	/* hint all but last page with MSG_MORE */
1622  	page_chain_for_each(page) {
1623  		unsigned l = min_t(unsigned, len, PAGE_SIZE);
1624  
1625  		err = _drbd_send_page(peer_device, page, 0, l,
1626  				      page_chain_next(page) ? MSG_MORE : 0);
1627  		if (err)
1628  			return err;
1629  		len -= l;
1630  	}
1631  	return 0;
1632  }
1633  
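/* Translate the bio's operation and flags into on-wire DP_* flags.
 * Peers with an agreed protocol version older than 95 only understand
 * DP_RW_SYNC; write-zeroes is expressed via DP_ZEROES and/or DP_DISCARD,
 * depending on the agreed DRBD_FF_WZEROES feature. */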
1634  static u32 bio_flags_to_wire(struct drbd_connection *connection,
1635  			     struct bio *bio)
1636  {
1637  	if (connection->agreed_pro_version >= 95)
1638  		return  (bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0) |
1639  			(bio->bi_opf & REQ_FUA ? DP_FUA : 0) |
1640  			(bio->bi_opf & REQ_PREFLUSH ? DP_FLUSH : 0) |
1641  			(bio_op(bio) == REQ_OP_DISCARD ? DP_DISCARD : 0) |
1642  			(bio_op(bio) == REQ_OP_WRITE_ZEROES ?
1643  			  ((connection->agreed_features & DRBD_FF_WZEROES) ?
1644  			   (DP_ZEROES |(!(bio->bi_opf & REQ_NOUNMAP) ? DP_DISCARD : 0))
1645  			   : DP_DISCARD)
1646  			: 0);
1647  	else
1648  		return bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0;
1649  }
1650  
1651  /* Used to send write or TRIM aka REQ_OP_DISCARD requests
1652   * R_PRIMARY -> Peer	(P_DATA, P_TRIM)
1653   */
1654  int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *req)
1655  {
1656  	struct drbd_device *device = peer_device->device;
1657  	struct drbd_socket *sock;
1658  	struct p_data *p;
1659  	void *digest_out;
1660  	unsigned int dp_flags = 0;
1661  	int digest_size;
1662  	int err;
1663  
1664  	sock = &peer_device->connection->data;
1665  	p = drbd_prepare_command(peer_device, sock);
1666  	digest_size = peer_device->connection->integrity_tfm ?
1667  		      crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
1668  
1669  	if (!p)
1670  		return -EIO;
1671  	p->sector = cpu_to_be64(req->i.sector);
1672  	p->block_id = (unsigned long)req;
1673  	p->seq_num = cpu_to_be32(atomic_inc_return(&device->packet_seq));
1674  	dp_flags = bio_flags_to_wire(peer_device->connection, req->master_bio);
1675  	if (device->state.conn >= C_SYNC_SOURCE &&
1676  	    device->state.conn <= C_PAUSED_SYNC_T)
1677  		dp_flags |= DP_MAY_SET_IN_SYNC;
1678  	if (peer_device->connection->agreed_pro_version >= 100) {
1679  		if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1680  			dp_flags |= DP_SEND_RECEIVE_ACK;
1681  		/* During resync, request an explicit write ack,
1682  		 * even in protocol != C */
1683  		if (req->rq_state & RQ_EXP_WRITE_ACK
1684  		|| (dp_flags & DP_MAY_SET_IN_SYNC))
1685  			dp_flags |= DP_SEND_WRITE_ACK;
1686  	}
1687  	p->dp_flags = cpu_to_be32(dp_flags);
1688  
1689  	if (dp_flags & (DP_DISCARD|DP_ZEROES)) {
1690  		enum drbd_packet cmd = (dp_flags & DP_ZEROES) ? P_ZEROES : P_TRIM;
1691  		struct p_trim *t = (struct p_trim*)p;
1692  		t->size = cpu_to_be32(req->i.size);
1693  		err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*t), NULL, 0);
1694  		goto out;
1695  	}
1696  	digest_out = p + 1;
1697  
1698  	/* our digest is still only over the payload.
1699  	 * TRIM does not carry any payload. */
1700  	if (digest_size)
1701  		drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
1702  	err = __send_command(peer_device->connection, device->vnr, sock, P_DATA,
1703  			     sizeof(*p) + digest_size, NULL, req->i.size);
1704  	if (!err) {
1705  		/* For protocol A, we have to memcpy the payload into
1706  		 * socket buffers, as the request may complete as soon as we
1707  		 * have handed the data over to TCP, at which point the data
1708  		 * pages may become invalid.
1709  		 *
1710  		 * For data-integrity enabled, we copy it as well, so we can be
1711  		 * sure that even if the bio pages may still be modified, it
1712  		 * won't change the data on the wire; thus if the digest checks
1713  		 * out OK after sending on this side, but does not match on the
1714  		 * receiving side, we have certainly detected corruption elsewhere.
1715  		 */
1716  		if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || digest_size)
1717  			err = _drbd_send_bio(peer_device, req->master_bio);
1718  		else
1719  			err = _drbd_send_zc_bio(peer_device, req->master_bio);
1720  
1721  		/* double check digest, sometimes buffers have been modified in flight. */
1722  		if (digest_size > 0 && digest_size <= 64) {
1723  			/* 64 byte, 512 bit, is the largest digest size
1724  			 * currently supported in kernel crypto. */
1725  			unsigned char digest[64];
1726  			drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest);
1727  			if (memcmp(p + 1, digest, digest_size)) {
1728  				drbd_warn(device,
1729  					"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1730  					(unsigned long long)req->i.sector, req->i.size);
1731  			}
1732  		} /* else if (digest_size > 64) {
1733  		     ... Be noisy about digest too large ...
1734  		} */
1735  	}
1736  out:
1737  	mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1738  
1739  	return err;
1740  }
1741  
1742  /* answer packet, used to send data back for read requests:
1743   *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1744   *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1745   */
1746  int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1747  		    struct drbd_peer_request *peer_req)
1748  {
1749  	struct drbd_device *device = peer_device->device;
1750  	struct drbd_socket *sock;
1751  	struct p_data *p;
1752  	int err;
1753  	int digest_size;
1754  
1755  	sock = &peer_device->connection->data;
1756  	p = drbd_prepare_command(peer_device, sock);
1757  
1758  	digest_size = peer_device->connection->integrity_tfm ?
1759  		      crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
1760  
1761  	if (!p)
1762  		return -EIO;
1763  	p->sector = cpu_to_be64(peer_req->i.sector);
1764  	p->block_id = peer_req->block_id;
1765  	p->seq_num = 0;  /* unused */
1766  	p->dp_flags = 0;
1767  	if (digest_size)
1768  		drbd_csum_ee(peer_device->connection->integrity_tfm, peer_req, p + 1);
1769  	err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*p) + digest_size, NULL, peer_req->i.size);
1770  	if (!err)
1771  		err = _drbd_send_zc_ee(peer_device, peer_req);
1772  	mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1773  
1774  	return err;
1775  }
1776  
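/* Tell the peer which block (sector, size) we consider out of sync,
 * using a P_OUT_OF_SYNC packet on the data socket. */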
1777  int drbd_send_out_of_sync(struct drbd_peer_device *peer_device, struct drbd_request *req)
1778  {
1779  	struct drbd_socket *sock;
1780  	struct p_block_desc *p;
1781  
1782  	sock = &peer_device->connection->data;
1783  	p = drbd_prepare_command(peer_device, sock);
1784  	if (!p)
1785  		return -EIO;
1786  	p->sector = cpu_to_be64(req->i.sector);
1787  	p->blksize = cpu_to_be32(req->i.size);
1788  	return drbd_send_command(peer_device, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1789  }
1790  
1791  /*
1792    drbd_send distinguishes two cases:
1793  
1794    Packets sent via the data socket "sock"
1795    and packets sent via the meta data socket "msock"
1796  
1797  		    sock                      msock
1798    -----------------+-------------------------+------------------------------
1799    timeout           conf.timeout / 2          conf.timeout / 2
1800    timeout action    send a ping via msock     Abort communication
1801  					      and close all sockets
1802  */
1803  
1804  /*
1805   * the caller must already hold the appropriate [m]sock mutex!
1806   */
1807  int drbd_send(struct drbd_connection *connection, struct socket *sock,
1808  	      void *buf, size_t size, unsigned msg_flags)
1809  {
1810  	struct kvec iov = {.iov_base = buf, .iov_len = size};
1811  	struct msghdr msg = {.msg_flags = msg_flags | MSG_NOSIGNAL};
1812  	int rv, sent = 0;
1813  
1814  	if (!sock)
1815  		return -EBADR;
1816  
1817  	/* THINK  if (signal_pending) return ... ? */
1818  
1819  	iov_iter_kvec(&msg.msg_iter, WRITE, &iov, 1, size);
1820  
1821  	if (sock == connection->data.socket) {
1822  		rcu_read_lock();
1823  		connection->ko_count = rcu_dereference(connection->net_conf)->ko_count;
1824  		rcu_read_unlock();
1825  		drbd_update_congested(connection);
1826  	}
1827  	do {
1828  		rv = sock_sendmsg(sock, &msg);
1829  		if (rv == -EAGAIN) {
1830  			if (we_should_drop_the_connection(connection, sock))
1831  				break;
1832  			else
1833  				continue;
1834  		}
1835  		if (rv == -EINTR) {
1836  			flush_signals(current);
1837  			rv = 0;
1838  		}
1839  		if (rv < 0)
1840  			break;
1841  		sent += rv;
1842  	} while (sent < size);
1843  
1844  	if (sock == connection->data.socket)
1845  		clear_bit(NET_CONGESTED, &connection->flags);
1846  
1847  	if (rv <= 0) {
1848  		if (rv != -EAGAIN) {
1849  			drbd_err(connection, "%s_sendmsg returned %d\n",
1850  				 sock == connection->meta.socket ? "msock" : "sock",
1851  				 rv);
1852  			conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
1853  		} else
1854  			conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
1855  	}
1856  
1857  	return sent;
1858  }
1859  
1860  /*
1861   * drbd_send_all  -  Send an entire buffer
1862   *
1863   * Returns 0 upon success and a negative error value otherwise.
1864   */
1865  int drbd_send_all(struct drbd_connection *connection, struct socket *sock, void *buffer,
1866  		  size_t size, unsigned msg_flags)
1867  {
1868  	int err;
1869  
1870  	err = drbd_send(connection, sock, buffer, size, msg_flags);
1871  	if (err < 0)
1872  		return err;
1873  	if (err != size)
1874  		return -EIO;
1875  	return 0;
1876  }
1877  
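/* Block device open: only a Primary may be opened for writing (-EROFS
 * otherwise); read-only opens on a Secondary are refused with -EMEDIUMTYPE
 * unless the allow_oos module parameter is set.  open_cnt is updated under
 * req_lock so it cannot race with role changes. */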
1878  static int drbd_open(struct block_device *bdev, fmode_t mode)
1879  {
1880  	struct drbd_device *device = bdev->bd_disk->private_data;
1881  	unsigned long flags;
1882  	int rv = 0;
1883  
1884  	mutex_lock(&drbd_main_mutex);
1885  	spin_lock_irqsave(&device->resource->req_lock, flags);
1886  	/* to have a stable device->state.role
1887  	 * and no race with updating open_cnt */
1888  
1889  	if (device->state.role != R_PRIMARY) {
1890  		if (mode & FMODE_WRITE)
1891  			rv = -EROFS;
1892  		else if (!drbd_allow_oos)
1893  			rv = -EMEDIUMTYPE;
1894  	}
1895  
1896  	if (!rv)
1897  		device->open_cnt++;
1898  	spin_unlock_irqrestore(&device->resource->req_lock, flags);
1899  	mutex_unlock(&drbd_main_mutex);
1900  
1901  	return rv;
1902  }
1903  
1904  static void drbd_release(struct gendisk *gd, fmode_t mode)
1905  {
1906  	struct drbd_device *device = gd->private_data;
1907  	mutex_lock(&drbd_main_mutex);
1908  	device->open_cnt--;
1909  	mutex_unlock(&drbd_main_mutex);
1910  }
1911  
1912  /* need to hold resource->req_lock */
1913  void drbd_queue_unplug(struct drbd_device *device)
1914  {
1915  	if (device->state.pdsk >= D_INCONSISTENT && device->state.conn >= C_CONNECTED) {
1916  		D_ASSERT(device, device->state.role == R_PRIMARY);
1917  		if (test_and_clear_bit(UNPLUG_REMOTE, &device->flags)) {
1918  			drbd_queue_work_if_unqueued(
1919  				&first_peer_device(device)->connection->sender_work,
1920  				&device->unplug_work);
1921  		}
1922  	}
1923  }
1924  
1925  static void drbd_set_defaults(struct drbd_device *device)
1926  {
1927  	/* Beware! The actual layout differs
1928  	 * between big endian and little endian */
1929  	device->state = (union drbd_dev_state) {
1930  		{ .role = R_SECONDARY,
1931  		  .peer = R_UNKNOWN,
1932  		  .conn = C_STANDALONE,
1933  		  .disk = D_DISKLESS,
1934  		  .pdsk = D_UNKNOWN,
1935  		} };
1936  }
1937  
1938  void drbd_init_set_defaults(struct drbd_device *device)
1939  {
1940  	/* the memset(,0,) did most of this.
1941  	 * note: only assignments, no allocation in here */
1942  
1943  	drbd_set_defaults(device);
1944  
1945  	atomic_set(&device->ap_bio_cnt, 0);
1946  	atomic_set(&device->ap_actlog_cnt, 0);
1947  	atomic_set(&device->ap_pending_cnt, 0);
1948  	atomic_set(&device->rs_pending_cnt, 0);
1949  	atomic_set(&device->unacked_cnt, 0);
1950  	atomic_set(&device->local_cnt, 0);
1951  	atomic_set(&device->pp_in_use_by_net, 0);
1952  	atomic_set(&device->rs_sect_in, 0);
1953  	atomic_set(&device->rs_sect_ev, 0);
1954  	atomic_set(&device->ap_in_flight, 0);
1955  	atomic_set(&device->md_io.in_use, 0);
1956  
1957  	mutex_init(&device->own_state_mutex);
1958  	device->state_mutex = &device->own_state_mutex;
1959  
1960  	spin_lock_init(&device->al_lock);
1961  	spin_lock_init(&device->peer_seq_lock);
1962  
1963  	INIT_LIST_HEAD(&device->active_ee);
1964  	INIT_LIST_HEAD(&device->sync_ee);
1965  	INIT_LIST_HEAD(&device->done_ee);
1966  	INIT_LIST_HEAD(&device->read_ee);
1967  	INIT_LIST_HEAD(&device->net_ee);
1968  	INIT_LIST_HEAD(&device->resync_reads);
1969  	INIT_LIST_HEAD(&device->resync_work.list);
1970  	INIT_LIST_HEAD(&device->unplug_work.list);
1971  	INIT_LIST_HEAD(&device->bm_io_work.w.list);
1972  	INIT_LIST_HEAD(&device->pending_master_completion[0]);
1973  	INIT_LIST_HEAD(&device->pending_master_completion[1]);
1974  	INIT_LIST_HEAD(&device->pending_completion[0]);
1975  	INIT_LIST_HEAD(&device->pending_completion[1]);
1976  
1977  	device->resync_work.cb  = w_resync_timer;
1978  	device->unplug_work.cb  = w_send_write_hint;
1979  	device->bm_io_work.w.cb = w_bitmap_io;
1980  
1981  	timer_setup(&device->resync_timer, resync_timer_fn, 0);
1982  	timer_setup(&device->md_sync_timer, md_sync_timer_fn, 0);
1983  	timer_setup(&device->start_resync_timer, start_resync_timer_fn, 0);
1984  	timer_setup(&device->request_timer, request_timer_fn, 0);
1985  
1986  	init_waitqueue_head(&device->misc_wait);
1987  	init_waitqueue_head(&device->state_wait);
1988  	init_waitqueue_head(&device->ee_wait);
1989  	init_waitqueue_head(&device->al_wait);
1990  	init_waitqueue_head(&device->seq_wait);
1991  
1992  	device->resync_wenr = LC_FREE;
1993  	device->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1994  	device->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1995  }
1996  
1997  void drbd_set_my_capacity(struct drbd_device *device, sector_t size)
1998  {
1999  	char ppb[10];
2000  
2001  	set_capacity_and_notify(device->vdisk, size);
2002  
2003  	drbd_info(device, "size = %s (%llu KB)\n",
2004  		ppsize(ppb, size>>1), (unsigned long long)size>>1);
2005  }
2006  
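/* Reset a device back to its unconfigured defaults: clear the I/O and
 * resync counters, shrink and clean up the bitmap, release the backing
 * device, and restore the initial Secondary/StandAlone/Diskless state.
 * All EE lists and work items are expected to be empty at this point. */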
2007  void drbd_device_cleanup(struct drbd_device *device)
2008  {
2009  	int i;
2010  	if (first_peer_device(device)->connection->receiver.t_state != NONE)
2011  		drbd_err(device, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2012  				first_peer_device(device)->connection->receiver.t_state);
2013  
2014  	device->al_writ_cnt  =
2015  	device->bm_writ_cnt  =
2016  	device->read_cnt     =
2017  	device->recv_cnt     =
2018  	device->send_cnt     =
2019  	device->writ_cnt     =
2020  	device->p_size       =
2021  	device->rs_start     =
2022  	device->rs_total     =
2023  	device->rs_failed    = 0;
2024  	device->rs_last_events = 0;
2025  	device->rs_last_sect_ev = 0;
2026  	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2027  		device->rs_mark_left[i] = 0;
2028  		device->rs_mark_time[i] = 0;
2029  	}
2030  	D_ASSERT(device, first_peer_device(device)->connection->net_conf == NULL);
2031  
2032  	set_capacity_and_notify(device->vdisk, 0);
2033  	if (device->bitmap) {
2034  		/* maybe never allocated. */
2035  		drbd_bm_resize(device, 0, 1);
2036  		drbd_bm_cleanup(device);
2037  	}
2038  
2039  	drbd_backing_dev_free(device, device->ldev);
2040  	device->ldev = NULL;
2041  
2042  	clear_bit(AL_SUSPENDED, &device->flags);
2043  
2044  	D_ASSERT(device, list_empty(&device->active_ee));
2045  	D_ASSERT(device, list_empty(&device->sync_ee));
2046  	D_ASSERT(device, list_empty(&device->done_ee));
2047  	D_ASSERT(device, list_empty(&device->read_ee));
2048  	D_ASSERT(device, list_empty(&device->net_ee));
2049  	D_ASSERT(device, list_empty(&device->resync_reads));
2050  	D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
2051  	D_ASSERT(device, list_empty(&device->resync_work.list));
2052  	D_ASSERT(device, list_empty(&device->unplug_work.list));
2053  
2054  	drbd_set_defaults(device);
2055  }
2056  
2057  
2058  static void drbd_destroy_mempools(void)
2059  {
2060  	struct page *page;
2061  
2062  	while (drbd_pp_pool) {
2063  		page = drbd_pp_pool;
2064  		drbd_pp_pool = (struct page *)page_private(page);
2065  		__free_page(page);
2066  		drbd_pp_vacant--;
2067  	}
2068  
2069  	/* D_ASSERT(device, atomic_read(&drbd_pp_vacant)==0); */
2070  
2071  	bioset_exit(&drbd_io_bio_set);
2072  	bioset_exit(&drbd_md_io_bio_set);
2073  	mempool_exit(&drbd_md_io_page_pool);
2074  	mempool_exit(&drbd_ee_mempool);
2075  	mempool_exit(&drbd_request_mempool);
2076  	kmem_cache_destroy(drbd_ee_cache);
2077  	kmem_cache_destroy(drbd_request_cache);
2078  	kmem_cache_destroy(drbd_bm_ext_cache);
2079  	kmem_cache_destroy(drbd_al_ext_cache);
2080  
2081  	drbd_ee_cache        = NULL;
2082  	drbd_request_cache   = NULL;
2083  	drbd_bm_ext_cache    = NULL;
2084  	drbd_al_ext_cache    = NULL;
2085  
2086  	return;
2087  }
2088  
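/* Allocate all slab caches, bio sets and mempools.  The page pool and the
 * request/EE mempools are sized to DRBD_MAX_BIO_SIZE/PAGE_SIZE elements per
 * configured minor; on any failure, everything allocated so far is torn
 * down again via drbd_destroy_mempools(). */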
2089  static int drbd_create_mempools(void)
2090  {
2091  	struct page *page;
2092  	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count;
2093  	int i, ret;
2094  
2095  	/* caches */
2096  	drbd_request_cache = kmem_cache_create(
2097  		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2098  	if (drbd_request_cache == NULL)
2099  		goto Enomem;
2100  
2101  	drbd_ee_cache = kmem_cache_create(
2102  		"drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2103  	if (drbd_ee_cache == NULL)
2104  		goto Enomem;
2105  
2106  	drbd_bm_ext_cache = kmem_cache_create(
2107  		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2108  	if (drbd_bm_ext_cache == NULL)
2109  		goto Enomem;
2110  
2111  	drbd_al_ext_cache = kmem_cache_create(
2112  		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2113  	if (drbd_al_ext_cache == NULL)
2114  		goto Enomem;
2115  
2116  	/* mempools */
2117  	ret = bioset_init(&drbd_io_bio_set, BIO_POOL_SIZE, 0, 0);
2118  	if (ret)
2119  		goto Enomem;
2120  
2121  	ret = bioset_init(&drbd_md_io_bio_set, DRBD_MIN_POOL_PAGES, 0,
2122  			  BIOSET_NEED_BVECS);
2123  	if (ret)
2124  		goto Enomem;
2125  
2126  	ret = mempool_init_page_pool(&drbd_md_io_page_pool, DRBD_MIN_POOL_PAGES, 0);
2127  	if (ret)
2128  		goto Enomem;
2129  
2130  	ret = mempool_init_slab_pool(&drbd_request_mempool, number,
2131  				     drbd_request_cache);
2132  	if (ret)
2133  		goto Enomem;
2134  
2135  	ret = mempool_init_slab_pool(&drbd_ee_mempool, number, drbd_ee_cache);
2136  	if (ret)
2137  		goto Enomem;
2138  
2139  	for (i = 0; i < number; i++) {
2140  		page = alloc_page(GFP_HIGHUSER);
2141  		if (!page)
2142  			goto Enomem;
2143  		set_page_private(page, (unsigned long)drbd_pp_pool);
2144  		drbd_pp_pool = page;
2145  	}
2146  	drbd_pp_vacant = number;
2147  
2148  	return 0;
2149  
2150  Enomem:
2151  	drbd_destroy_mempools(); /* in case we allocated some */
2152  	return -ENOMEM;
2153  }
2154  
2155  static void drbd_release_all_peer_reqs(struct drbd_device *device)
2156  {
2157  	int rr;
2158  
2159  	rr = drbd_free_peer_reqs(device, &device->active_ee);
2160  	if (rr)
2161  		drbd_err(device, "%d EEs in active list found!\n", rr);
2162  
2163  	rr = drbd_free_peer_reqs(device, &device->sync_ee);
2164  	if (rr)
2165  		drbd_err(device, "%d EEs in sync list found!\n", rr);
2166  
2167  	rr = drbd_free_peer_reqs(device, &device->read_ee);
2168  	if (rr)
2169  		drbd_err(device, "%d EEs in read list found!\n", rr);
2170  
2171  	rr = drbd_free_peer_reqs(device, &device->done_ee);
2172  	if (rr)
2173  		drbd_err(device, "%d EEs in done list found!\n", rr);
2174  
2175  	rr = drbd_free_peer_reqs(device, &device->net_ee);
2176  	if (rr)
2177  		drbd_err(device, "%d EEs in net list found!\n", rr);
2178  }
2179  
2180  /* caution. no locking. */
2181  void drbd_destroy_device(struct kref *kref)
2182  {
2183  	struct drbd_device *device = container_of(kref, struct drbd_device, kref);
2184  	struct drbd_resource *resource = device->resource;
2185  	struct drbd_peer_device *peer_device, *tmp_peer_device;
2186  
2187  	del_timer_sync(&device->request_timer);
2188  
2189  	/* paranoia asserts */
2190  	D_ASSERT(device, device->open_cnt == 0);
2191  	/* end paranoia asserts */
2192  
2193  	/* cleanup stuff that may have been allocated during
2194  	 * device (re-)configuration or state changes */
2195  
2196  	drbd_backing_dev_free(device, device->ldev);
2197  	device->ldev = NULL;
2198  
2199  	drbd_release_all_peer_reqs(device);
2200  
2201  	lc_destroy(device->act_log);
2202  	lc_destroy(device->resync);
2203  
2204  	kfree(device->p_uuid);
2205  	/* device->p_uuid = NULL; */
2206  
2207  	if (device->bitmap) /* should no longer be there. */
2208  		drbd_bm_cleanup(device);
2209  	__free_page(device->md_io.page);
2210  	put_disk(device->vdisk);
2211  	kfree(device->rs_plan_s);
2212  
2213  	/* not for_each_connection(connection, resource):
2214  	 * those may have been cleaned up and disassociated already.
2215  	 */
2216  	for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
2217  		kref_put(&peer_device->connection->kref, drbd_destroy_connection);
2218  		kfree(peer_device);
2219  	}
2220  	memset(device, 0xfd, sizeof(*device));
2221  	kfree(device);
2222  	kref_put(&resource->kref, drbd_destroy_resource);
2223  }
2224  
2225  /* One global retry thread, if we need to push back some bio and have it
2226   * reinserted through our make request function.
2227   */
2228  static struct retry_worker {
2229  	struct workqueue_struct *wq;
2230  	struct work_struct worker;
2231  
2232  	spinlock_t lock;
2233  	struct list_head writes;
2234  } retry;
2235  
2236  static void do_retry(struct work_struct *ws)
2237  {
2238  	struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
2239  	LIST_HEAD(writes);
2240  	struct drbd_request *req, *tmp;
2241  
2242  	spin_lock_irq(&retry->lock);
2243  	list_splice_init(&retry->writes, &writes);
2244  	spin_unlock_irq(&retry->lock);
2245  
2246  	list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
2247  		struct drbd_device *device = req->device;
2248  		struct bio *bio = req->master_bio;
2249  		bool expected;
2250  
2251  		expected =
2252  			expect(atomic_read(&req->completion_ref) == 0) &&
2253  			expect(req->rq_state & RQ_POSTPONED) &&
2254  			expect((req->rq_state & RQ_LOCAL_PENDING) == 0 ||
2255  				(req->rq_state & RQ_LOCAL_ABORTED) != 0);
2256  
2257  		if (!expected)
2258  			drbd_err(device, "req=%p completion_ref=%d rq_state=%x\n",
2259  				req, atomic_read(&req->completion_ref),
2260  				req->rq_state);
2261  
2262  		/* We still need to put one kref associated with the
2263  		 * "completion_ref" going zero in the code path that queued it
2264  		 * here.  The request object may still be referenced by a
2265  		 * frozen local req->private_bio, in case we force-detached.
2266  		 */
2267  		kref_put(&req->kref, drbd_req_destroy);
2268  
2269  		/* A single suspended or otherwise blocking device may stall
2270  		 * all others as well.  Fortunately, this code path is to
2271  		 * recover from a situation that "should not happen":
2272  		 * concurrent writes in multi-primary setup.
2273  		 * In a "normal" lifecycle, this workqueue is supposed to be
2274  		 * destroyed without ever doing anything.
2275  		 * If it turns out to be an issue anyways, we can do per
2276  		 * resource (replication group) or per device (minor) retry
2277  		 * workqueues instead.
2278  		 */
2279  
2280  		/* We are not just doing submit_bio_noacct(),
2281  		 * as we want to keep the start_time information. */
2282  		inc_ap_bio(device);
2283  		__drbd_make_request(device, bio);
2284  	}
2285  }
2286  
2287  /* called via drbd_req_put_completion_ref(),
2288   * holds resource->req_lock */
2289  void drbd_restart_request(struct drbd_request *req)
2290  {
2291  	unsigned long flags;
2292  	spin_lock_irqsave(&retry.lock, flags);
2293  	list_move_tail(&req->tl_requests, &retry.writes);
2294  	spin_unlock_irqrestore(&retry.lock, flags);
2295  
2296  	/* Drop the extra reference that would otherwise
2297  	 * have been dropped by complete_master_bio.
2298  	 * do_retry() needs to grab a new one. */
2299  	dec_ap_bio(req->device);
2300  
2301  	queue_work(retry.wq, &retry.worker);
2302  }
2303  
2304  void drbd_destroy_resource(struct kref *kref)
2305  {
2306  	struct drbd_resource *resource =
2307  		container_of(kref, struct drbd_resource, kref);
2308  
2309  	idr_destroy(&resource->devices);
2310  	free_cpumask_var(resource->cpu_mask);
2311  	kfree(resource->name);
2312  	memset(resource, 0xf2, sizeof(*resource));
2313  	kfree(resource);
2314  }
2315  
2316  void drbd_free_resource(struct drbd_resource *resource)
2317  {
2318  	struct drbd_connection *connection, *tmp;
2319  
2320  	for_each_connection_safe(connection, tmp, resource) {
2321  		list_del(&connection->connections);
2322  		drbd_debugfs_connection_cleanup(connection);
2323  		kref_put(&connection->kref, drbd_destroy_connection);
2324  	}
2325  	drbd_debugfs_resource_cleanup(resource);
2326  	kref_put(&resource->kref, drbd_destroy_resource);
2327  }
2328  
2329  static void drbd_cleanup(void)
2330  {
2331  	unsigned int i;
2332  	struct drbd_device *device;
2333  	struct drbd_resource *resource, *tmp;
2334  
2335  	/* first remove proc,
2336  	 * drbdsetup uses its presence to detect
2337  	 * whether DRBD is loaded.
2338  	 * If we got stuck in proc removal while
2339  	 * netlink was already deregistered,
2340  	 * some drbdsetup commands might wait forever
2341  	 * for an answer.
2342  	 */
2343  	if (drbd_proc)
2344  		remove_proc_entry("drbd", NULL);
2345  
2346  	if (retry.wq)
2347  		destroy_workqueue(retry.wq);
2348  
2349  	drbd_genl_unregister();
2350  
2351  	idr_for_each_entry(&drbd_devices, device, i)
2352  		drbd_delete_device(device);
2353  
2354  	/* not _rcu, since there is no other updater anymore; genl is already unregistered */
2355  	for_each_resource_safe(resource, tmp, &drbd_resources) {
2356  		list_del(&resource->resources);
2357  		drbd_free_resource(resource);
2358  	}
2359  
2360  	drbd_debugfs_cleanup();
2361  
2362  	drbd_destroy_mempools();
2363  	unregister_blkdev(DRBD_MAJOR, "drbd");
2364  
2365  	idr_destroy(&drbd_devices);
2366  
2367  	pr_info("module cleanup done.\n");
2368  }
2369  
2370  static void drbd_init_workqueue(struct drbd_work_queue *wq)
2371  {
2372  	spin_lock_init(&wq->q_lock);
2373  	INIT_LIST_HEAD(&wq->q);
2374  	init_waitqueue_head(&wq->q_wait);
2375  }
2376  
2377  struct completion_work {
2378  	struct drbd_work w;
2379  	struct completion done;
2380  };
2381  
2382  static int w_complete(struct drbd_work *w, int cancel)
2383  {
2384  	struct completion_work *completion_work =
2385  		container_of(w, struct completion_work, w);
2386  
2387  	complete(&completion_work->done);
2388  	return 0;
2389  }
2390  
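/* Queue a w_complete() work item and wait for it, thereby guaranteeing
 * that all work queued on this work queue before the call has been
 * processed. */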
2391  void drbd_flush_workqueue(struct drbd_work_queue *work_queue)
2392  {
2393  	struct completion_work completion_work;
2394  
2395  	completion_work.w.cb = w_complete;
2396  	init_completion(&completion_work.done);
2397  	drbd_queue_work(work_queue, &completion_work.w);
2398  	wait_for_completion(&completion_work.done);
2399  }
2400  
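/* Look up a resource by name under RCU.  On success a reference is taken
 * (kref_get); the caller is responsible for the matching kref_put(). */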
2401  struct drbd_resource *drbd_find_resource(const char *name)
2402  {
2403  	struct drbd_resource *resource;
2404  
2405  	if (!name || !name[0])
2406  		return NULL;
2407  
2408  	rcu_read_lock();
2409  	for_each_resource_rcu(resource, &drbd_resources) {
2410  		if (!strcmp(resource->name, name)) {
2411  			kref_get(&resource->kref);
2412  			goto found;
2413  		}
2414  	}
2415  	resource = NULL;
2416  found:
2417  	rcu_read_unlock();
2418  	return resource;
2419  }
2420  
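/* Find the connection matching the given local and peer addresses.  On
 * success a reference on the connection is taken; the caller must drop it
 * with kref_put(..., drbd_destroy_connection). */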
2421  struct drbd_connection *conn_get_by_addrs(void *my_addr, int my_addr_len,
2422  				     void *peer_addr, int peer_addr_len)
2423  {
2424  	struct drbd_resource *resource;
2425  	struct drbd_connection *connection;
2426  
2427  	rcu_read_lock();
2428  	for_each_resource_rcu(resource, &drbd_resources) {
2429  		for_each_connection_rcu(connection, resource) {
2430  			if (connection->my_addr_len == my_addr_len &&
2431  			    connection->peer_addr_len == peer_addr_len &&
2432  			    !memcmp(&connection->my_addr, my_addr, my_addr_len) &&
2433  			    !memcmp(&connection->peer_addr, peer_addr, peer_addr_len)) {
2434  				kref_get(&connection->kref);
2435  				goto found;
2436  			}
2437  		}
2438  	}
2439  	connection = NULL;
2440  found:
2441  	rcu_read_unlock();
2442  	return connection;
2443  }
2444  
2445  static int drbd_alloc_socket(struct drbd_socket *socket)
2446  {
2447  	socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2448  	if (!socket->rbuf)
2449  		return -ENOMEM;
2450  	socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2451  	if (!socket->sbuf)
2452  		return -ENOMEM;
2453  	return 0;
2454  }
2455  
2456  static void drbd_free_socket(struct drbd_socket *socket)
2457  {
2458  	free_page((unsigned long) socket->sbuf);
2459  	free_page((unsigned long) socket->rbuf);
2460  }
2461  
2462  void conn_free_crypto(struct drbd_connection *connection)
2463  {
2464  	drbd_free_sock(connection);
2465  
2466  	crypto_free_shash(connection->csums_tfm);
2467  	crypto_free_shash(connection->verify_tfm);
2468  	crypto_free_shash(connection->cram_hmac_tfm);
2469  	crypto_free_shash(connection->integrity_tfm);
2470  	crypto_free_shash(connection->peer_integrity_tfm);
2471  	kfree(connection->int_dig_in);
2472  	kfree(connection->int_dig_vv);
2473  
2474  	connection->csums_tfm = NULL;
2475  	connection->verify_tfm = NULL;
2476  	connection->cram_hmac_tfm = NULL;
2477  	connection->integrity_tfm = NULL;
2478  	connection->peer_integrity_tfm = NULL;
2479  	connection->int_dig_in = NULL;
2480  	connection->int_dig_vv = NULL;
2481  }
2482  
2483  int set_resource_options(struct drbd_resource *resource, struct res_opts *res_opts)
2484  {
2485  	struct drbd_connection *connection;
2486  	cpumask_var_t new_cpu_mask;
2487  	int err;
2488  
2489  	if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2490  		return -ENOMEM;
2491  
2492  	/* silently ignore cpu mask on UP kernel */
2493  	if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2494  		err = bitmap_parse(res_opts->cpu_mask, DRBD_CPU_MASK_SIZE,
2495  				   cpumask_bits(new_cpu_mask), nr_cpu_ids);
2496  		if (err == -EOVERFLOW) {
2497  			/* So what. mask it out. */
2498  			cpumask_var_t tmp_cpu_mask;
2499  			if (zalloc_cpumask_var(&tmp_cpu_mask, GFP_KERNEL)) {
2500  				cpumask_setall(tmp_cpu_mask);
2501  				cpumask_and(new_cpu_mask, new_cpu_mask, tmp_cpu_mask);
2502  				drbd_warn(resource, "Overflow in bitmap_parse(%.12s%s), truncating to %u bits\n",
2503  					res_opts->cpu_mask,
2504  					strlen(res_opts->cpu_mask) > 12 ? "..." : "",
2505  					nr_cpu_ids);
2506  				free_cpumask_var(tmp_cpu_mask);
2507  				err = 0;
2508  			}
2509  		}
2510  		if (err) {
2511  			drbd_warn(resource, "bitmap_parse() failed with %d\n", err);
2512  			/* retcode = ERR_CPU_MASK_PARSE; */
2513  			goto fail;
2514  		}
2515  	}
2516  	resource->res_opts = *res_opts;
2517  	if (cpumask_empty(new_cpu_mask))
2518  		drbd_calc_cpu_mask(&new_cpu_mask);
2519  	if (!cpumask_equal(resource->cpu_mask, new_cpu_mask)) {
2520  		cpumask_copy(resource->cpu_mask, new_cpu_mask);
2521  		for_each_connection_rcu(connection, resource) {
2522  			connection->receiver.reset_cpu_mask = 1;
2523  			connection->ack_receiver.reset_cpu_mask = 1;
2524  			connection->worker.reset_cpu_mask = 1;
2525  		}
2526  	}
2527  	err = 0;
2528  
2529  fail:
2530  	free_cpumask_var(new_cpu_mask);
2531  	return err;
2532  
2533  }
2534  
2535  struct drbd_resource *drbd_create_resource(const char *name)
2536  {
2537  	struct drbd_resource *resource;
2538  
2539  	resource = kzalloc(sizeof(struct drbd_resource), GFP_KERNEL);
2540  	if (!resource)
2541  		goto fail;
2542  	resource->name = kstrdup(name, GFP_KERNEL);
2543  	if (!resource->name)
2544  		goto fail_free_resource;
2545  	if (!zalloc_cpumask_var(&resource->cpu_mask, GFP_KERNEL))
2546  		goto fail_free_name;
2547  	kref_init(&resource->kref);
2548  	idr_init(&resource->devices);
2549  	INIT_LIST_HEAD(&resource->connections);
2550  	resource->write_ordering = WO_BDEV_FLUSH;
2551  	list_add_tail_rcu(&resource->resources, &drbd_resources);
2552  	mutex_init(&resource->conf_update);
2553  	mutex_init(&resource->adm_mutex);
2554  	spin_lock_init(&resource->req_lock);
2555  	drbd_debugfs_resource_add(resource);
2556  	return resource;
2557  
2558  fail_free_name:
2559  	kfree(resource->name);
2560  fail_free_resource:
2561  	kfree(resource);
2562  fail:
2563  	return NULL;
2564  }
2565  
2566  /* caller must be under adm_mutex */
2567  struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
2568  {
2569  	struct drbd_resource *resource;
2570  	struct drbd_connection *connection;
2571  
2572  	connection = kzalloc(sizeof(struct drbd_connection), GFP_KERNEL);
2573  	if (!connection)
2574  		return NULL;
2575  
2576  	if (drbd_alloc_socket(&connection->data))
2577  		goto fail;
2578  	if (drbd_alloc_socket(&connection->meta))
2579  		goto fail;
2580  
2581  	connection->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2582  	if (!connection->current_epoch)
2583  		goto fail;
2584  
2585  	INIT_LIST_HEAD(&connection->transfer_log);
2586  
2587  	INIT_LIST_HEAD(&connection->current_epoch->list);
2588  	connection->epochs = 1;
2589  	spin_lock_init(&connection->epoch_lock);
2590  
2591  	connection->send.seen_any_write_yet = false;
2592  	connection->send.current_epoch_nr = 0;
2593  	connection->send.current_epoch_writes = 0;
2594  
2595  	resource = drbd_create_resource(name);
2596  	if (!resource)
2597  		goto fail;
2598  
2599  	connection->cstate = C_STANDALONE;
2600  	mutex_init(&connection->cstate_mutex);
2601  	init_waitqueue_head(&connection->ping_wait);
2602  	idr_init(&connection->peer_devices);
2603  
2604  	drbd_init_workqueue(&connection->sender_work);
2605  	mutex_init(&connection->data.mutex);
2606  	mutex_init(&connection->meta.mutex);
2607  
2608  	drbd_thread_init(resource, &connection->receiver, drbd_receiver, "receiver");
2609  	connection->receiver.connection = connection;
2610  	drbd_thread_init(resource, &connection->worker, drbd_worker, "worker");
2611  	connection->worker.connection = connection;
2612  	drbd_thread_init(resource, &connection->ack_receiver, drbd_ack_receiver, "ack_recv");
2613  	connection->ack_receiver.connection = connection;
2614  
2615  	kref_init(&connection->kref);
2616  
2617  	connection->resource = resource;
2618  
2619  	if (set_resource_options(resource, res_opts))
2620  		goto fail_resource;
2621  
2622  	kref_get(&resource->kref);
2623  	list_add_tail_rcu(&connection->connections, &resource->connections);
2624  	drbd_debugfs_connection_add(connection);
2625  	return connection;
2626  
2627  fail_resource:
2628  	list_del(&resource->resources);
2629  	drbd_free_resource(resource);
2630  fail:
2631  	kfree(connection->current_epoch);
2632  	drbd_free_socket(&connection->meta);
2633  	drbd_free_socket(&connection->data);
2634  	kfree(connection);
2635  	return NULL;
2636  }
2637  
2638  void drbd_destroy_connection(struct kref *kref)
2639  {
2640  	struct drbd_connection *connection = container_of(kref, struct drbd_connection, kref);
2641  	struct drbd_resource *resource = connection->resource;
2642  
2643  	if (atomic_read(&connection->current_epoch->epoch_size) !=  0)
2644  		drbd_err(connection, "epoch_size:%d\n", atomic_read(&connection->current_epoch->epoch_size));
2645  	kfree(connection->current_epoch);
2646  
2647  	idr_destroy(&connection->peer_devices);
2648  
2649  	drbd_free_socket(&connection->meta);
2650  	drbd_free_socket(&connection->data);
2651  	kfree(connection->int_dig_in);
2652  	kfree(connection->int_dig_vv);
2653  	memset(connection, 0xfc, sizeof(*connection));
2654  	kfree(connection);
2655  	kref_put(&resource->kref, drbd_destroy_resource);
2656  }
2657  
2658  static int init_submitter(struct drbd_device *device)
2659  {
2660  	/* opencoded create_singlethread_workqueue(),
2661  	 * to be able to say "drbd%d", ..., minor */
2662  	device->submit.wq =
2663  		alloc_ordered_workqueue("drbd%u_submit", WQ_MEM_RECLAIM, device->minor);
2664  	if (!device->submit.wq)
2665  		return -ENOMEM;
2666  
2667  	INIT_WORK(&device->submit.worker, do_submit);
2668  	INIT_LIST_HEAD(&device->submit.writes);
2669  	return 0;
2670  }
2671  
2672  enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsigned int minor)
2673  {
2674  	struct drbd_resource *resource = adm_ctx->resource;
2675  	struct drbd_connection *connection, *n;
2676  	struct drbd_device *device;
2677  	struct drbd_peer_device *peer_device, *tmp_peer_device;
2678  	struct gendisk *disk;
2679  	int id;
2680  	int vnr = adm_ctx->volume;
2681  	enum drbd_ret_code err = ERR_NOMEM;
2682  
2683  	device = minor_to_device(minor);
2684  	if (device)
2685  		return ERR_MINOR_OR_VOLUME_EXISTS;
2686  
2687  	/* GFP_KERNEL, we are outside of all write-out paths */
2688  	device = kzalloc(sizeof(struct drbd_device), GFP_KERNEL);
2689  	if (!device)
2690  		return ERR_NOMEM;
2691  	kref_init(&device->kref);
2692  
2693  	kref_get(&resource->kref);
2694  	device->resource = resource;
2695  	device->minor = minor;
2696  	device->vnr = vnr;
2697  
2698  	drbd_init_set_defaults(device);
2699  
2700  	disk = blk_alloc_disk(NUMA_NO_NODE);
2701  	if (!disk)
2702  		goto out_no_disk;
2703  
2704  	device->vdisk = disk;
2705  	device->rq_queue = disk->queue;
2706  
2707  	set_disk_ro(disk, true);
2708  
2709  	disk->major = DRBD_MAJOR;
2710  	disk->first_minor = minor;
2711  	disk->minors = 1;
2712  	disk->fops = &drbd_ops;
2713  	disk->flags |= GENHD_FL_NO_PART;
2714  	sprintf(disk->disk_name, "drbd%d", minor);
2715  	disk->private_data = device;
2716  
2717  	blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, disk->queue);
2718  	blk_queue_write_cache(disk->queue, true, true);
2719  	/* Setting the max_hw_sectors to an odd value of 8 KiB here
2720  	   triggers a max_bio_size message upon first attach or connect. */
2721  	blk_queue_max_hw_sectors(disk->queue, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2722  
2723  	device->md_io.page = alloc_page(GFP_KERNEL);
2724  	if (!device->md_io.page)
2725  		goto out_no_io_page;
2726  
2727  	if (drbd_bm_init(device))
2728  		goto out_no_bitmap;
2729  	device->read_requests = RB_ROOT;
2730  	device->write_requests = RB_ROOT;
2731  
2732  	id = idr_alloc(&drbd_devices, device, minor, minor + 1, GFP_KERNEL);
2733  	if (id < 0) {
2734  		if (id == -ENOSPC)
2735  			err = ERR_MINOR_OR_VOLUME_EXISTS;
2736  		goto out_no_minor_idr;
2737  	}
2738  	kref_get(&device->kref);
2739  
2740  	id = idr_alloc(&resource->devices, device, vnr, vnr + 1, GFP_KERNEL);
2741  	if (id < 0) {
2742  		if (id == -ENOSPC)
2743  			err = ERR_MINOR_OR_VOLUME_EXISTS;
2744  		goto out_idr_remove_minor;
2745  	}
2746  	kref_get(&device->kref);
2747  
2748  	INIT_LIST_HEAD(&device->peer_devices);
2749  	INIT_LIST_HEAD(&device->pending_bitmap_io);
2750  	for_each_connection(connection, resource) {
2751  		peer_device = kzalloc(sizeof(struct drbd_peer_device), GFP_KERNEL);
2752  		if (!peer_device)
2753  			goto out_idr_remove_from_resource;
2754  		peer_device->connection = connection;
2755  		peer_device->device = device;
2756  
2757  		list_add(&peer_device->peer_devices, &device->peer_devices);
2758  		kref_get(&device->kref);
2759  
2760  		id = idr_alloc(&connection->peer_devices, peer_device, vnr, vnr + 1, GFP_KERNEL);
2761  		if (id < 0) {
2762  			if (id == -ENOSPC)
2763  				err = ERR_INVALID_REQUEST;
2764  			goto out_idr_remove_from_resource;
2765  		}
2766  		kref_get(&connection->kref);
2767  		INIT_WORK(&peer_device->send_acks_work, drbd_send_acks_wf);
2768  	}
2769  
2770  	if (init_submitter(device)) {
2771  		err = ERR_NOMEM;
2772  		goto out_idr_remove_from_resource;
2773  	}
2774  
2775  	err = add_disk(disk);
2776  	if (err)
2777  		goto out_idr_remove_from_resource;
2778  
2779  	/* inherit the connection state */
2780  	device->state.conn = first_connection(resource)->cstate;
2781  	if (device->state.conn == C_WF_REPORT_PARAMS) {
2782  		for_each_peer_device(peer_device, device)
2783  			drbd_connected(peer_device);
2784  	}
2785  	/* move to create_peer_device() */
2786  	for_each_peer_device(peer_device, device)
2787  		drbd_debugfs_peer_device_add(peer_device);
2788  	drbd_debugfs_device_add(device);
2789  	return NO_ERROR;
2790  
2791  out_idr_remove_from_resource:
2792  	for_each_connection_safe(connection, n, resource) {
2793  		peer_device = idr_remove(&connection->peer_devices, vnr);
2794  		if (peer_device)
2795  			kref_put(&connection->kref, drbd_destroy_connection);
2796  	}
2797  	for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
2798  		list_del(&peer_device->peer_devices);
2799  		kfree(peer_device);
2800  	}
2801  	idr_remove(&resource->devices, vnr);
2802  out_idr_remove_minor:
2803  	idr_remove(&drbd_devices, minor);
2804  	synchronize_rcu();
2805  out_no_minor_idr:
2806  	drbd_bm_cleanup(device);
2807  out_no_bitmap:
2808  	__free_page(device->md_io.page);
2809  out_no_io_page:
2810  	put_disk(disk);
2811  out_no_disk:
2812  	kref_put(&resource->kref, drbd_destroy_resource);
2813  	kfree(device);
2814  	return err;
2815  }
2816  
2817  void drbd_delete_device(struct drbd_device *device)
2818  {
2819  	struct drbd_resource *resource = device->resource;
2820  	struct drbd_connection *connection;
2821  	struct drbd_peer_device *peer_device;
2822  
2823  	/* move to free_peer_device() */
2824  	for_each_peer_device(peer_device, device)
2825  		drbd_debugfs_peer_device_cleanup(peer_device);
2826  	drbd_debugfs_device_cleanup(device);
2827  	for_each_connection(connection, resource) {
2828  		idr_remove(&connection->peer_devices, device->vnr);
2829  		kref_put(&device->kref, drbd_destroy_device);
2830  	}
2831  	idr_remove(&resource->devices, device->vnr);
2832  	kref_put(&device->kref, drbd_destroy_device);
2833  	idr_remove(&drbd_devices, device_to_minor(device));
2834  	kref_put(&device->kref, drbd_destroy_device);
2835  	del_gendisk(device->vdisk);
2836  	synchronize_rcu();
2837  	kref_put(&device->kref, drbd_destroy_device);
2838  }
2839  
2840  static int __init drbd_init(void)
2841  {
2842  	int err;
2843  
2844  	if (drbd_minor_count < DRBD_MINOR_COUNT_MIN || drbd_minor_count > DRBD_MINOR_COUNT_MAX) {
2845  		pr_err("invalid minor_count (%d)\n", drbd_minor_count);
2846  #ifdef MODULE
2847  		return -EINVAL;
2848  #else
2849  		drbd_minor_count = DRBD_MINOR_COUNT_DEF;
2850  #endif
2851  	}
2852  
2853  	err = register_blkdev(DRBD_MAJOR, "drbd");
2854  	if (err) {
2855  		pr_err("unable to register block device major %d\n",
2856  		       DRBD_MAJOR);
2857  		return err;
2858  	}
2859  
2860  	/*
2861  	 * allocate all necessary structs
2862  	 */
2863  	init_waitqueue_head(&drbd_pp_wait);
2864  
2865  	drbd_proc = NULL; /* play safe for drbd_cleanup */
2866  	idr_init(&drbd_devices);
2867  
2868  	mutex_init(&resources_mutex);
2869  	INIT_LIST_HEAD(&drbd_resources);
2870  
2871  	err = drbd_genl_register();
2872  	if (err) {
2873  		pr_err("unable to register generic netlink family\n");
2874  		goto fail;
2875  	}
2876  
2877  	err = drbd_create_mempools();
2878  	if (err)
2879  		goto fail;
2880  
2881  	err = -ENOMEM;
2882  	drbd_proc = proc_create_single("drbd", S_IFREG | 0444 , NULL, drbd_seq_show);
2883  	if (!drbd_proc)	{
2884  		pr_err("unable to register proc file\n");
2885  		goto fail;
2886  	}
2887  
2888  	retry.wq = create_singlethread_workqueue("drbd-reissue");
2889  	if (!retry.wq) {
2890  		pr_err("unable to create retry workqueue\n");
2891  		goto fail;
2892  	}
2893  	INIT_WORK(&retry.worker, do_retry);
2894  	spin_lock_init(&retry.lock);
2895  	INIT_LIST_HEAD(&retry.writes);
2896  
2897  	drbd_debugfs_init();
2898  
2899  	pr_info("initialized. "
2900  	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2901  	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2902  	pr_info("%s\n", drbd_buildtag());
2903  	pr_info("registered as block device major %d\n", DRBD_MAJOR);
2904  	return 0; /* Success! */
2905  
2906  fail:
2907  	drbd_cleanup();
2908  	if (err == -ENOMEM)
2909  		pr_err("ran out of memory\n");
2910  	else
2911  		pr_err("initialization failure\n");
2912  	return err;
2913  }
2914  
2915  static void drbd_free_one_sock(struct drbd_socket *ds)
2916  {
2917  	struct socket *s;
2918  	mutex_lock(&ds->mutex);
2919  	s = ds->socket;
2920  	ds->socket = NULL;
2921  	mutex_unlock(&ds->mutex);
2922  	if (s) {
2923  		/* so debugfs does not need to mutex_lock() */
2924  		synchronize_rcu();
2925  		kernel_sock_shutdown(s, SHUT_RDWR);
2926  		sock_release(s);
2927  	}
2928  }
2929  
2930  void drbd_free_sock(struct drbd_connection *connection)
2931  {
2932  	if (connection->data.socket)
2933  		drbd_free_one_sock(&connection->data);
2934  	if (connection->meta.socket)
2935  		drbd_free_one_sock(&connection->meta);
2936  }
2937  
2938  /* meta data management */
2939  
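/* Sync the meta data of all devices of this connection.  A device
 * reference is taken and the RCU read lock dropped around drbd_md_sync(),
 * which may sleep. */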
2940  void conn_md_sync(struct drbd_connection *connection)
2941  {
2942  	struct drbd_peer_device *peer_device;
2943  	int vnr;
2944  
2945  	rcu_read_lock();
2946  	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2947  		struct drbd_device *device = peer_device->device;
2948  
2949  		kref_get(&device->kref);
2950  		rcu_read_unlock();
2951  		drbd_md_sync(device);
2952  		kref_put(&device->kref, drbd_destroy_device);
2953  		rcu_read_lock();
2954  	}
2955  	rcu_read_unlock();
2956  }
2957  
2958  /* aligned 4kByte */
2959  struct meta_data_on_disk {
2960  	u64 la_size_sect;      /* last agreed size. */
2961  	u64 uuid[UI_SIZE];   /* UUIDs. */
2962  	u64 device_uuid;
2963  	u64 reserved_u64_1;
2964  	u32 flags;             /* MDF */
2965  	u32 magic;
2966  	u32 md_size_sect;
2967  	u32 al_offset;         /* offset to this block */
2968  	u32 al_nr_extents;     /* important for restoring the AL (userspace) */
2969  	      /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2970  	u32 bm_offset;         /* offset to the bitmap, from here */
2971  	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2972  	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2973  
2974  	/* see al_tr_number_to_on_disk_sector() */
2975  	u32 al_stripes;
2976  	u32 al_stripe_size_4k;
2977  
2978  	u8 reserved_u8[4096 - (7*8 + 10*4)];
2979  } __packed;
2980  
2981  
2982  
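/* Fill the caller-provided 4k meta data buffer with the on-disk
 * (big-endian) representation of the in-core meta data and write it
 * synchronously at the meta data offset of the backing device. */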
2983  void drbd_md_write(struct drbd_device *device, void *b)
2984  {
2985  	struct meta_data_on_disk *buffer = b;
2986  	sector_t sector;
2987  	int i;
2988  
2989  	memset(buffer, 0, sizeof(*buffer));
2990  
2991  	buffer->la_size_sect = cpu_to_be64(get_capacity(device->vdisk));
2992  	for (i = UI_CURRENT; i < UI_SIZE; i++)
2993  		buffer->uuid[i] = cpu_to_be64(device->ldev->md.uuid[i]);
2994  	buffer->flags = cpu_to_be32(device->ldev->md.flags);
2995  	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
2996  
2997  	buffer->md_size_sect  = cpu_to_be32(device->ldev->md.md_size_sect);
2998  	buffer->al_offset     = cpu_to_be32(device->ldev->md.al_offset);
2999  	buffer->al_nr_extents = cpu_to_be32(device->act_log->nr_elements);
3000  	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3001  	buffer->device_uuid = cpu_to_be64(device->ldev->md.device_uuid);
3002  
3003  	buffer->bm_offset = cpu_to_be32(device->ldev->md.bm_offset);
3004  	buffer->la_peer_max_bio_size = cpu_to_be32(device->peer_max_bio_size);
3005  
3006  	buffer->al_stripes = cpu_to_be32(device->ldev->md.al_stripes);
3007  	buffer->al_stripe_size_4k = cpu_to_be32(device->ldev->md.al_stripe_size_4k);
3008  
3009  	D_ASSERT(device, drbd_md_ss(device->ldev) == device->ldev->md.md_offset);
3010  	sector = device->ldev->md.md_offset;
3011  
3012  	if (drbd_md_sync_page_io(device, device->ldev, sector, REQ_OP_WRITE)) {
3013  		/* this was a try anyways ... */
3014  		drbd_err(device, "meta data update failed!\n");
3015  		drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR);
3016  	}
3017  }
3018  
3019  /**
3020   * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3021   * @device:	DRBD device.
3022   */
3023  void drbd_md_sync(struct drbd_device *device)
3024  {
3025  	struct meta_data_on_disk *buffer;
3026  
3027  	/* Don't accidentally change the DRBD meta data layout. */
3028  	BUILD_BUG_ON(UI_SIZE != 4);
3029  	BUILD_BUG_ON(sizeof(struct meta_data_on_disk) != 4096);
3030  
3031  	del_timer(&device->md_sync_timer);
3032  	/* timer may be rearmed by drbd_md_mark_dirty() now. */
3033  	if (!test_and_clear_bit(MD_DIRTY, &device->flags))
3034  		return;
3035  
3036  	/* We use here D_FAILED and not D_ATTACHING because we try to write
3037  	 * metadata even if we detach due to a disk failure! */
3038  	if (!get_ldev_if_state(device, D_FAILED))
3039  		return;
3040  
3041  	buffer = drbd_md_get_buffer(device, __func__);
3042  	if (!buffer)
3043  		goto out;
3044  
3045  	drbd_md_write(device, buffer);
3046  
3047  	/* Update device->ldev->md.la_size_sect,
3048  	 * since we updated it on metadata. */
3049  	device->ldev->md.la_size_sect = get_capacity(device->vdisk);
3050  
3051  	drbd_md_put_buffer(device);
3052  out:
3053  	put_ldev(device);
3054  }
3055  
3056  static int check_activity_log_stripe_size(struct drbd_device *device,
3057  		struct meta_data_on_disk *on_disk,
3058  		struct drbd_md *in_core)
3059  {
3060  	u32 al_stripes = be32_to_cpu(on_disk->al_stripes);
3061  	u32 al_stripe_size_4k = be32_to_cpu(on_disk->al_stripe_size_4k);
3062  	u64 al_size_4k;
3063  
3064  	/* both not set: default to old fixed size activity log */
3065  	if (al_stripes == 0 && al_stripe_size_4k == 0) {
3066  		al_stripes = 1;
3067  		al_stripe_size_4k = MD_32kB_SECT/8;
3068  	}
3069  
3070  	/* some paranoia plausibility checks */
3071  
3072  	/* we need both values to be set */
3073  	if (al_stripes == 0 || al_stripe_size_4k == 0)
3074  		goto err;
3075  
3076  	al_size_4k = (u64)al_stripes * al_stripe_size_4k;
3077  
3078  	/* Upper limit of the activity log area, to avoid potential overflow
3079  	 * problems in al_tr_number_to_on_disk_sector(). Right now, anything
3080  	 * beyond 72 * 4k blocks total only increases the amount of history, so
3081  	 * limiting this arbitrarily to 16 GB is not a real limitation ;-)  */
3082  	if (al_size_4k > (16 * 1024 * 1024/4))
3083  		goto err;
3084  
3085  	/* Lower limit: we need at least 8 transaction slots (32kB)
3086  	 * to not break existing setups */
3087  	if (al_size_4k < MD_32kB_SECT/8)
3088  		goto err;
3089  
3090  	in_core->al_stripe_size_4k = al_stripe_size_4k;
3091  	in_core->al_stripes = al_stripes;
3092  	in_core->al_size_4k = al_size_4k;
3093  
3094  	return 0;
3095  err:
3096  	drbd_err(device, "invalid activity log striping: al_stripes=%u, al_stripe_size_4k=%u\n",
3097  			al_stripes, al_stripe_size_4k);
3098  	return -EINVAL;
3099  }
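/* Worked example (a sketch, assuming MD_32kB_SECT is 32 kB expressed in
 * 512-byte sectors, i.e. 64): the legacy default of al_stripes = 1 and
 * al_stripe_size_4k = 64/8 = 8 gives al_size_4k = 8, i.e. 8 * 4 kB = 32 kB,
 * exactly the lower limit checked above.  The upper limit of
 * 16*1024*1024/4 = 4194304 4k blocks corresponds to the 16 GB mentioned in
 * the comment. */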
3100  
3101  static int check_offsets_and_sizes(struct drbd_device *device, struct drbd_backing_dev *bdev)
3102  {
3103  	sector_t capacity = drbd_get_capacity(bdev->md_bdev);
3104  	struct drbd_md *in_core = &bdev->md;
3105  	s32 on_disk_al_sect;
3106  	s32 on_disk_bm_sect;
3107  
3108  	/* The on-disk size of the activity log, calculated from offsets, and
3109  	 * the size of the activity log calculated from the stripe settings,
3110  	 * should match.
3111  	 * Though we could relax this a bit: it is ok if the striped activity log
3112  	 * fits in the available on-disk activity log size.
3113  	 * Right now, that would break how resize is implemented.
3114  	 * TODO: make drbd_determine_dev_size() (and the drbdmeta tool) aware
3115  	 * of possible unused padding space in the on disk layout. */
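	/* Rough on-disk layout as implied by the checks below (a sketch only;
	 * offsets are in 512-byte sectors, relative to md_offset):
	 *
	 *   internal / flex-internal (al_offset < 0, bm_offset <= al_offset):
	 *	[ bitmap | activity log | 4k superblock @ md_offset ]
	 *   external, indexed (al_offset == MD_4kB_SECT):
	 *	[ 4k superblock @ md_offset | activity log | bitmap ]
	 */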
3116  	if (in_core->al_offset < 0) {
3117  		if (in_core->bm_offset > in_core->al_offset)
3118  			goto err;
3119  		on_disk_al_sect = -in_core->al_offset;
3120  		on_disk_bm_sect = in_core->al_offset - in_core->bm_offset;
3121  	} else {
3122  		if (in_core->al_offset != MD_4kB_SECT)
3123  			goto err;
3124  		if (in_core->bm_offset < in_core->al_offset + in_core->al_size_4k * MD_4kB_SECT)
3125  			goto err;
3126  
3127  		on_disk_al_sect = in_core->bm_offset - MD_4kB_SECT;
3128  		on_disk_bm_sect = in_core->md_size_sect - in_core->bm_offset;
3129  	}
3130  
3131  	/* old fixed size meta data is exactly that: fixed. */
3132  	if (in_core->meta_dev_idx >= 0) {
3133  		if (in_core->md_size_sect != MD_128MB_SECT
3134  		||  in_core->al_offset != MD_4kB_SECT
3135  		||  in_core->bm_offset != MD_4kB_SECT + MD_32kB_SECT
3136  		||  in_core->al_stripes != 1
3137  		||  in_core->al_stripe_size_4k != MD_32kB_SECT/8)
3138  			goto err;
3139  	}
3140  
3141  	if (capacity < in_core->md_size_sect)
3142  		goto err;
3143  	if (capacity - in_core->md_size_sect < drbd_md_first_sector(bdev))
3144  		goto err;
3145  
3146  	/* should be aligned, and at least 32k */
3147  	if ((on_disk_al_sect & 7) || (on_disk_al_sect < MD_32kB_SECT))
3148  		goto err;
3149  
3150  	/* should fit (for now: exactly) into the available on-disk space;
3151  	 * overflow prevention is in check_activity_log_stripe_size() above. */
3152  	if (on_disk_al_sect != in_core->al_size_4k * MD_4kB_SECT)
3153  		goto err;
3154  
3155  	/* again, should be aligned */
3156  	if (in_core->bm_offset & 7)
3157  		goto err;
3158  
3159  	/* FIXME check for device grow with flex external meta data? */
3160  
3161  	/* can the available bitmap space cover the last agreed device size? */
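	/* Arithmetic sketch (assuming BM_BLOCK_SIZE is 4 kB): one bitmap bit
	 * covers MD_4kB_SECT = 8 sectors of data, so la_size_sect/8 bits are
	 * needed, which is la_size_sect/8/8 bytes, or la_size_sect/8/8/512
	 * on-disk 512-byte sectors; the +7 rounds up to whole 4k blocks. */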
3162  	if (on_disk_bm_sect < (in_core->la_size_sect+7)/MD_4kB_SECT/8/512)
3163  		goto err;
3164  
3165  	return 0;
3166  
3167  err:
3168  	drbd_err(device, "meta data offsets don't make sense: idx=%d "
3169  			"al_s=%u, al_sz4k=%u, al_offset=%d, bm_offset=%d, "
3170  			"md_size_sect=%u, la_size=%llu, md_capacity=%llu\n",
3171  			in_core->meta_dev_idx,
3172  			in_core->al_stripes, in_core->al_stripe_size_4k,
3173  			in_core->al_offset, in_core->bm_offset, in_core->md_size_sect,
3174  			(unsigned long long)in_core->la_size_sect,
3175  			(unsigned long long)capacity);
3176  
3177  	return -EINVAL;
3178  }
3179  
3180  
3181  /**
3182   * drbd_md_read() - Reads in the meta data super block
3183   * @device:	DRBD device.
3184   * @bdev:	Device from which the meta data should be read in.
3185   *
3186   * Return NO_ERROR on success, and an enum drbd_ret_code in case
3187   * something goes wrong.
3188   *
3189   * Called exactly once during drbd_adm_attach(), while still being D_DISKLESS,
3190   * even before @bdev is assigned to @device->ldev.
3191   */
3192  int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev)
3193  {
3194  	struct meta_data_on_disk *buffer;
3195  	u32 magic, flags;
3196  	int i, rv = NO_ERROR;
3197  
3198  	if (device->state.disk != D_DISKLESS)
3199  		return ERR_DISK_CONFIGURED;
3200  
3201  	buffer = drbd_md_get_buffer(device, __func__);
3202  	if (!buffer)
3203  		return ERR_NOMEM;
3204  
3205  	/* First, figure out where our meta data superblock is located,
3206  	 * and read it. */
3207  	bdev->md.meta_dev_idx = bdev->disk_conf->meta_dev_idx;
3208  	bdev->md.md_offset = drbd_md_ss(bdev);
3209  	/* Even for (flexible or indexed) external meta data,
3210  	 * restrict ourselves to the 4k superblock for now.
3211  	 * This affects the paranoia out-of-range access check in drbd_md_sync_page_io(). */
3212  	bdev->md.md_size_sect = 8;
3213  
3214  	if (drbd_md_sync_page_io(device, bdev, bdev->md.md_offset,
3215  				 REQ_OP_READ)) {
3216  		/* NOTE: can't do normal error processing here as this is
3217  		   called BEFORE disk is attached */
3218  		drbd_err(device, "Error while reading metadata.\n");
3219  		rv = ERR_IO_MD_DISK;
3220  		goto err;
3221  	}
3222  
3223  	magic = be32_to_cpu(buffer->magic);
3224  	flags = be32_to_cpu(buffer->flags);
3225  	if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
3226  	    (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
3227  			/* btw: that's Activity Log clean, not "all" clean. */
3228  		drbd_err(device, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
3229  		rv = ERR_MD_UNCLEAN;
3230  		goto err;
3231  	}
3232  
3233  	rv = ERR_MD_INVALID;
3234  	if (magic != DRBD_MD_MAGIC_08) {
3235  		if (magic == DRBD_MD_MAGIC_07)
3236  			drbd_err(device, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
3237  		else
3238  			drbd_err(device, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
3239  		goto err;
3240  	}
3241  
3242  	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3243  		drbd_err(device, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3244  		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3245  		goto err;
3246  	}
3247  
3248  
3249  	/* convert to in_core endian */
3250  	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size_sect);
3251  	for (i = UI_CURRENT; i < UI_SIZE; i++)
3252  		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3253  	bdev->md.flags = be32_to_cpu(buffer->flags);
3254  	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3255  
3256  	bdev->md.md_size_sect = be32_to_cpu(buffer->md_size_sect);
3257  	bdev->md.al_offset = be32_to_cpu(buffer->al_offset);
3258  	bdev->md.bm_offset = be32_to_cpu(buffer->bm_offset);
3259  
3260  	if (check_activity_log_stripe_size(device, buffer, &bdev->md))
3261  		goto err;
3262  	if (check_offsets_and_sizes(device, bdev))
3263  		goto err;
3264  
3265  	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3266  		drbd_err(device, "unexpected bm_offset: %d (expected %d)\n",
3267  		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3268  		goto err;
3269  	}
3270  	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3271  		drbd_err(device, "unexpected md_size: %u (expected %u)\n",
3272  		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3273  		goto err;
3274  	}
3275  
3276  	rv = NO_ERROR;
3277  
3278  	spin_lock_irq(&device->resource->req_lock);
3279  	if (device->state.conn < C_CONNECTED) {
3280  		unsigned int peer;
3281  		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3282  		peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
3283  		device->peer_max_bio_size = peer;
3284  	}
3285  	spin_unlock_irq(&device->resource->req_lock);
3286  
3287   err:
3288  	drbd_md_put_buffer(device);
3289  
3290  	return rv;
3291  }
3292  
3293  /**
3294   * drbd_md_mark_dirty() - Mark meta data super block as dirty
3295   * @device:	DRBD device.
3296   *
3297   * Call this function if you change anything that should be written to
3298   * the meta-data super block. This function sets MD_DIRTY and starts a
3299   * timer that ensures drbd_md_sync() gets called within five seconds.
3300   */
3301  void drbd_md_mark_dirty(struct drbd_device *device)
3302  {
3303  	if (!test_and_set_bit(MD_DIRTY, &device->flags))
3304  		mod_timer(&device->md_sync_timer, jiffies + 5*HZ);
3305  }
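/*
 * Illustrative sequence (a sketch; callers combine these as needed):
 *
 *	__drbd_uuid_set(device, UI_CURRENT, val);  // calls drbd_md_mark_dirty()
 *	...
 *	drbd_md_sync(device);  // or rely on md_sync_timer_fn() posting MD_SYNC
 *
 * drbd_md_set_flag() and __drbd_uuid_set() below call drbd_md_mark_dirty()
 * internally.
 */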
3306  
3307  void drbd_uuid_move_history(struct drbd_device *device) __must_hold(local)
3308  {
3309  	int i;
3310  
3311  	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3312  		device->ldev->md.uuid[i+1] = device->ldev->md.uuid[i];
3313  }
3314  
3315  void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
3316  {
3317  	if (idx == UI_CURRENT) {
3318  		if (device->state.role == R_PRIMARY)
3319  			val |= 1;
3320  		else
3321  			val &= ~((u64)1);
3322  
3323  		drbd_set_ed_uuid(device, val);
3324  	}
3325  
3326  	device->ldev->md.uuid[idx] = val;
3327  	drbd_md_mark_dirty(device);
3328  }
3329  
3330  void _drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
3331  {
3332  	unsigned long flags;
3333  	spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
3334  	__drbd_uuid_set(device, idx, val);
3335  	spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
3336  }
3337  
3338  void drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
3339  {
3340  	unsigned long flags;
3341  	spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
3342  	if (device->ldev->md.uuid[idx]) {
3343  		drbd_uuid_move_history(device);
3344  		device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[idx];
3345  	}
3346  	__drbd_uuid_set(device, idx, val);
3347  	spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
3348  }
3349  
3350  /**
3351   * drbd_uuid_new_current() - Creates a new current UUID
3352   * @device:	DRBD device.
3353   *
3354   * Creates a new current UUID, and rotates the old current UUID into
3355   * the bitmap slot. Causes an incremental resync upon next connect.
3356   */
3357  void drbd_uuid_new_current(struct drbd_device *device) __must_hold(local)
3358  {
3359  	u64 val;
3360  	unsigned long long bm_uuid;
3361  
3362  	get_random_bytes(&val, sizeof(u64));
3363  
3364  	spin_lock_irq(&device->ldev->md.uuid_lock);
3365  	bm_uuid = device->ldev->md.uuid[UI_BITMAP];
3366  
3367  	if (bm_uuid)
3368  		drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
3369  
3370  	device->ldev->md.uuid[UI_BITMAP] = device->ldev->md.uuid[UI_CURRENT];
3371  	__drbd_uuid_set(device, UI_CURRENT, val);
3372  	spin_unlock_irq(&device->ldev->md.uuid_lock);
3373  
3374  	drbd_print_uuids(device, "new current UUID");
3375  	/* get it to stable storage _now_ */
3376  	drbd_md_sync(device);
3377  }
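/* Net effect (sketch): UI_BITMAP takes the old UI_CURRENT value, UI_CURRENT
 * becomes a fresh random value, and bit 0 of UI_CURRENT ends up encoding the
 * R_PRIMARY role via __drbd_uuid_set() above. */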
3378  
3379  void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
3380  {
3381  	unsigned long flags;
3382  	if (device->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3383  		return;
3384  
3385  	spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
3386  	if (val == 0) {
3387  		drbd_uuid_move_history(device);
3388  		device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3389  		device->ldev->md.uuid[UI_BITMAP] = 0;
3390  	} else {
3391  		unsigned long long bm_uuid = device->ldev->md.uuid[UI_BITMAP];
3392  		if (bm_uuid)
3393  			drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
3394  
3395  		device->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3396  	}
3397  	spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
3398  
3399  	drbd_md_mark_dirty(device);
3400  }
3401  
3402  /**
3403   * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3404   * @device:	DRBD device.
3405   *
3406   * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3407   */
3408  int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local)
3409  {
3410  	int rv = -EIO;
3411  
3412  	drbd_md_set_flag(device, MDF_FULL_SYNC);
3413  	drbd_md_sync(device);
3414  	drbd_bm_set_all(device);
3415  
3416  	rv = drbd_bm_write(device);
3417  
3418  	if (!rv) {
3419  		drbd_md_clear_flag(device, MDF_FULL_SYNC);
3420  		drbd_md_sync(device);
3421  	}
3422  
3423  	return rv;
3424  }
3425  
3426  /**
3427   * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3428   * @device:	DRBD device.
3429   *
3430   * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3431   */
3432  int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local)
3433  {
3434  	drbd_resume_al(device);
3435  	drbd_bm_clear_all(device);
3436  	return drbd_bm_write(device);
3437  }
3438  
3439  static int w_bitmap_io(struct drbd_work *w, int unused)
3440  {
3441  	struct drbd_device *device =
3442  		container_of(w, struct drbd_device, bm_io_work.w);
3443  	struct bm_io_work *work = &device->bm_io_work;
3444  	int rv = -EIO;
3445  
3446  	if (work->flags != BM_LOCKED_CHANGE_ALLOWED) {
3447  		int cnt = atomic_read(&device->ap_bio_cnt);
3448  		if (cnt)
3449  			drbd_err(device, "FIXME: ap_bio_cnt %d, expected 0; queued for '%s'\n",
3450  					cnt, work->why);
3451  	}
3452  
3453  	if (get_ldev(device)) {
3454  		drbd_bm_lock(device, work->why, work->flags);
3455  		rv = work->io_fn(device);
3456  		drbd_bm_unlock(device);
3457  		put_ldev(device);
3458  	}
3459  
3460  	clear_bit_unlock(BITMAP_IO, &device->flags);
3461  	wake_up(&device->misc_wait);
3462  
3463  	if (work->done)
3464  		work->done(device, rv);
3465  
3466  	clear_bit(BITMAP_IO_QUEUED, &device->flags);
3467  	work->why = NULL;
3468  	work->flags = 0;
3469  
3470  	return 0;
3471  }
3472  
3473  /**
3474   * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3475   * @device:	DRBD device.
3476   * @io_fn:	IO callback to be called when bitmap IO is possible
3477   * @done:	callback to be called after the bitmap IO was performed
3478   * @why:	Descriptive text of the reason for doing the IO
3479   * @flags:	Bitmap flags
3480   *
3481   * While IO on the bitmap is in progress, application IO is frozen; this
3482   * ensures that drbd_set_out_of_sync() can not be called. This function MAY
3483   * ONLY be called from worker context. It MUST NOT be used while a previous
3484   * such work is still pending!
3485   *
3486   * Its worker function encloses the call of io_fn() by get_ldev() and
3487   * put_ldev().
3488   */
3489  void drbd_queue_bitmap_io(struct drbd_device *device,
3490  			  int (*io_fn)(struct drbd_device *),
3491  			  void (*done)(struct drbd_device *, int),
3492  			  char *why, enum bm_flag flags)
3493  {
3494  	D_ASSERT(device, current == first_peer_device(device)->connection->worker.task);
3495  
3496  	D_ASSERT(device, !test_bit(BITMAP_IO_QUEUED, &device->flags));
3497  	D_ASSERT(device, !test_bit(BITMAP_IO, &device->flags));
3498  	D_ASSERT(device, list_empty(&device->bm_io_work.w.list));
3499  	if (device->bm_io_work.why)
3500  		drbd_err(device, "FIXME going to queue '%s' but '%s' still pending?\n",
3501  			why, device->bm_io_work.why);
3502  
3503  	device->bm_io_work.io_fn = io_fn;
3504  	device->bm_io_work.done = done;
3505  	device->bm_io_work.why = why;
3506  	device->bm_io_work.flags = flags;
3507  
3508  	spin_lock_irq(&device->resource->req_lock);
3509  	set_bit(BITMAP_IO, &device->flags);
3510  	/* don't wait for pending application IO if the caller indicates that
3511  	 * application IO does not conflict anyways. */
3512  	if (flags == BM_LOCKED_CHANGE_ALLOWED || atomic_read(&device->ap_bio_cnt) == 0) {
3513  		if (!test_and_set_bit(BITMAP_IO_QUEUED, &device->flags))
3514  			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
3515  					&device->bm_io_work.w);
3516  	}
3517  	spin_unlock_irq(&device->resource->req_lock);
3518  }
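/* Illustrative call from worker context (a sketch only; the io_fn, the
 * descriptive string and the flags are examples, real callers choose flags,
 * e.g. the BM_DONT_* bits, to match what must be locked out):
 *
 *	drbd_queue_bitmap_io(device, &drbd_bmio_set_n_write, NULL,
 *			     "example: set_n_write", BM_LOCKED_CHANGE_ALLOWED);
 */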
3519  
3520  /**
3521   * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3522   * @device:	DRBD device.
3523   * @io_fn:	IO callback to be called when bitmap IO is possible
3524   * @why:	Descriptive text of the reason for doing the IO
3525   * @flags:	Bitmap flags
3526   *
3527   * Freezes application IO while the actual IO operation runs. This
3528   * function MAY NOT be called from worker context.
3529   */
3530  int drbd_bitmap_io(struct drbd_device *device, int (*io_fn)(struct drbd_device *),
3531  		char *why, enum bm_flag flags)
3532  {
3533  	/* Only suspend IO if some operation is supposed to be locked out */
3534  	const bool do_suspend_io = flags & (BM_DONT_CLEAR|BM_DONT_SET|BM_DONT_TEST);
3535  	int rv;
3536  
3537  	D_ASSERT(device, current != first_peer_device(device)->connection->worker.task);
3538  
3539  	if (do_suspend_io)
3540  		drbd_suspend_io(device);
3541  
3542  	drbd_bm_lock(device, why, flags);
3543  	rv = io_fn(device);
3544  	drbd_bm_unlock(device);
3545  
3546  	if (do_suspend_io)
3547  		drbd_resume_io(device);
3548  
3549  	return rv;
3550  }
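/* Illustrative call from non-worker context (again just a sketch; callers
 * that need application IO suspended pass one or more BM_DONT_* flags):
 *
 *	if (drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
 *			   "example: clear_n_write", BM_LOCKED_CHANGE_ALLOWED))
 *		drbd_err(device, "bitmap IO failed\n");
 */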
3551  
3552  void drbd_md_set_flag(struct drbd_device *device, int flag) __must_hold(local)
3553  {
3554  	if ((device->ldev->md.flags & flag) != flag) {
3555  		drbd_md_mark_dirty(device);
3556  		device->ldev->md.flags |= flag;
3557  	}
3558  }
3559  
3560  void drbd_md_clear_flag(struct drbd_device *device, int flag) __must_hold(local)
3561  {
3562  	if ((device->ldev->md.flags & flag) != 0) {
3563  		drbd_md_mark_dirty(device);
3564  		device->ldev->md.flags &= ~flag;
3565  	}
3566  }
3567  int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3568  {
3569  	return (bdev->md.flags & flag) != 0;
3570  }
3571  
3572  static void md_sync_timer_fn(struct timer_list *t)
3573  {
3574  	struct drbd_device *device = from_timer(device, t, md_sync_timer);
3575  	drbd_device_post_work(device, MD_SYNC);
3576  }
3577  
3578  const char *cmdname(enum drbd_packet cmd)
3579  {
3580  	/* THINK may need to become several global tables
3581  	 * when we want to support more than
3582  	 * one PRO_VERSION */
3583  	static const char *cmdnames[] = {
3584  
3585  		[P_DATA]	        = "Data",
3586  		[P_DATA_REPLY]	        = "DataReply",
3587  		[P_RS_DATA_REPLY]	= "RSDataReply",
3588  		[P_BARRIER]	        = "Barrier",
3589  		[P_BITMAP]	        = "ReportBitMap",
3590  		[P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3591  		[P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3592  		[P_UNPLUG_REMOTE]	= "UnplugRemote",
3593  		[P_DATA_REQUEST]	= "DataRequest",
3594  		[P_RS_DATA_REQUEST]     = "RSDataRequest",
3595  		[P_SYNC_PARAM]	        = "SyncParam",
3596  		[P_PROTOCOL]            = "ReportProtocol",
3597  		[P_UUIDS]	        = "ReportUUIDs",
3598  		[P_SIZES]	        = "ReportSizes",
3599  		[P_STATE]	        = "ReportState",
3600  		[P_SYNC_UUID]           = "ReportSyncUUID",
3601  		[P_AUTH_CHALLENGE]      = "AuthChallenge",
3602  		[P_AUTH_RESPONSE]	= "AuthResponse",
3603  		[P_STATE_CHG_REQ]       = "StateChgRequest",
3604  		[P_PING]		= "Ping",
3605  		[P_PING_ACK]	        = "PingAck",
3606  		[P_RECV_ACK]	        = "RecvAck",
3607  		[P_WRITE_ACK]	        = "WriteAck",
3608  		[P_RS_WRITE_ACK]	= "RSWriteAck",
3609  		[P_SUPERSEDED]          = "Superseded",
3610  		[P_NEG_ACK]	        = "NegAck",
3611  		[P_NEG_DREPLY]	        = "NegDReply",
3612  		[P_NEG_RS_DREPLY]	= "NegRSDReply",
3613  		[P_BARRIER_ACK]	        = "BarrierAck",
3614  		[P_STATE_CHG_REPLY]     = "StateChgReply",
3615  		[P_OV_REQUEST]          = "OVRequest",
3616  		[P_OV_REPLY]            = "OVReply",
3617  		[P_OV_RESULT]           = "OVResult",
3618  		[P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3619  		[P_RS_IS_IN_SYNC]	= "CsumRSIsInSync",
3620  		[P_SYNC_PARAM89]	= "SyncParam89",
3621  		[P_COMPRESSED_BITMAP]   = "CBitmap",
3622  		[P_DELAY_PROBE]         = "DelayProbe",
3623  		[P_OUT_OF_SYNC]		= "OutOfSync",
3624  		[P_RS_CANCEL]		= "RSCancel",
3625  		[P_CONN_ST_CHG_REQ]	= "conn_st_chg_req",
3626  		[P_CONN_ST_CHG_REPLY]	= "conn_st_chg_reply",
3627  		[P_PROTOCOL_UPDATE]	= "protocol_update",
3628  		[P_TRIM]	        = "Trim",
3629  		[P_RS_THIN_REQ]         = "rs_thin_req",
3630  		[P_RS_DEALLOCATED]      = "rs_deallocated",
3631  		[P_WSAME]	        = "WriteSame",
3632  		[P_ZEROES]		= "Zeroes",
3633  
3634  		/* enum drbd_packet, but not commands - obsoleted flags:
3635  		 *	P_MAY_IGNORE
3636  		 *	P_MAX_OPT_CMD
3637  		 */
3638  	};
3639  
3640  	/* too big for the array: 0xfffX */
3641  	if (cmd == P_INITIAL_META)
3642  		return "InitialMeta";
3643  	if (cmd == P_INITIAL_DATA)
3644  		return "InitialData";
3645  	if (cmd == P_CONNECTION_FEATURES)
3646  		return "ConnectionFeatures";
3647  	if (cmd >= ARRAY_SIZE(cmdnames))
3648  		return "Unknown";
3649  	return cmdnames[cmd];
3650  }
3651  
3652  /**
3653   * drbd_wait_misc  -  wait for a request to make progress
3654   * @device:	device associated with the request
3655   * @i:		the struct drbd_interval embedded in struct drbd_request or
3656   *		struct drbd_peer_request
3657   */
3658  int drbd_wait_misc(struct drbd_device *device, struct drbd_interval *i)
3659  {
3660  	struct net_conf *nc;
3661  	DEFINE_WAIT(wait);
3662  	long timeout;
3663  
3664  	rcu_read_lock();
3665  	nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
3666  	if (!nc) {
3667  		rcu_read_unlock();
3668  		return -ETIMEDOUT;
3669  	}
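	/* nc->timeout is presumably configured in 0.1 second units, hence the
	 * "* HZ / 10"; ko_count == 0 means wait without a timeout. */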
3670  	timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3671  	rcu_read_unlock();
3672  
3673  	/* Indicate to wake up device->misc_wait on progress.  */
3674  	i->waiting = true;
3675  	prepare_to_wait(&device->misc_wait, &wait, TASK_INTERRUPTIBLE);
3676  	spin_unlock_irq(&device->resource->req_lock);
3677  	timeout = schedule_timeout(timeout);
3678  	finish_wait(&device->misc_wait, &wait);
3679  	spin_lock_irq(&device->resource->req_lock);
3680  	if (!timeout || device->state.conn < C_CONNECTED)
3681  		return -ETIMEDOUT;
3682  	if (signal_pending(current))
3683  		return -ERESTARTSYS;
3684  	return 0;
3685  }
3686  
3687  void lock_all_resources(void)
3688  {
3689  	struct drbd_resource *resource;
3690  	int __maybe_unused i = 0;
3691  
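	/* Sketch of the reasoning: all resource req_locks share one lock class,
	 * so spin_lock_nested() with an increasing subclass keeps lockdep from
	 * flagging the nested acquisition; the counter is __maybe_unused because
	 * the subclass argument is dropped when lockdep is not configured. */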
3692  	mutex_lock(&resources_mutex);
3693  	local_irq_disable();
3694  	for_each_resource(resource, &drbd_resources)
3695  		spin_lock_nested(&resource->req_lock, i++);
3696  }
3697  
3698  void unlock_all_resources(void)
3699  {
3700  	struct drbd_resource *resource;
3701  
3702  	for_each_resource(resource, &drbd_resources)
3703  		spin_unlock(&resource->req_lock);
3704  	local_irq_enable();
3705  	mutex_unlock(&resources_mutex);
3706  }
3707  
3708  #ifdef CONFIG_DRBD_FAULT_INJECTION
3709  /* Fault insertion support including random number generator shamelessly
3710   * stolen from kernel/rcutorture.c */
3711  struct fault_random_state {
3712  	unsigned long state;
3713  	unsigned long count;
3714  };
3715  
3716  #define FAULT_RANDOM_MULT 39916801  /* prime */
3717  #define FAULT_RANDOM_ADD	479001701 /* prime */
3718  #define FAULT_RANDOM_REFRESH 10000
3719  
3720  /*
3721   * Crude but fast random-number generator.  Uses a linear congruential
3722   * generator, with occasional help from get_random_bytes().
3723   */
3724  static unsigned long
3725  _drbd_fault_random(struct fault_random_state *rsp)
3726  {
3727  	long refresh;
3728  
3729  	if (!rsp->count--) {
3730  		get_random_bytes(&refresh, sizeof(refresh));
3731  		rsp->state += refresh;
3732  		rsp->count = FAULT_RANDOM_REFRESH;
3733  	}
3734  	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3735  	return swahw32(rsp->state);
3736  }
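/* The recurrence above is state = state * 39916801 + 479001701 (mod word
 * size), reseeded from get_random_bytes() every FAULT_RANDOM_REFRESH calls;
 * swahw32() swaps the 16-bit halves of the low word, presumably so the
 * statistically weaker low-order LCG bits do not dominate the result. */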
3737  
3738  static char *
3739  _drbd_fault_str(unsigned int type) {
3740  	static char *_faults[] = {
3741  		[DRBD_FAULT_MD_WR] = "Meta-data write",
3742  		[DRBD_FAULT_MD_RD] = "Meta-data read",
3743  		[DRBD_FAULT_RS_WR] = "Resync write",
3744  		[DRBD_FAULT_RS_RD] = "Resync read",
3745  		[DRBD_FAULT_DT_WR] = "Data write",
3746  		[DRBD_FAULT_DT_RD] = "Data read",
3747  		[DRBD_FAULT_DT_RA] = "Data read ahead",
3748  		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3749  		[DRBD_FAULT_AL_EE] = "EE allocation",
3750  		[DRBD_FAULT_RECEIVE] = "receive data corruption",
3751  	};
3752  
3753  	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3754  }
3755  
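/* Decision logic (sketch of what the code below implements): a fault is
 * injected when the device's minor is selected by the drbd_fault_devs bitmask
 * (0 selects all devices) and a pseudo-random draw in 1..100 is less than or
 * equal to drbd_fault_rate, i.e. drbd_fault_rate acts as a percentage. */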
3756  unsigned int
3757  _drbd_insert_fault(struct drbd_device *device, unsigned int type)
3758  {
3759  	static struct fault_random_state rrs = {0, 0};
3760  
3761  	unsigned int ret = (
3762  		(drbd_fault_devs == 0 ||
3763  			((1 << device_to_minor(device)) & drbd_fault_devs) != 0) &&
3764  		(((_drbd_fault_random(&rrs) % 100) + 1) <= drbd_fault_rate));
3765  
3766  	if (ret) {
3767  		drbd_fault_count++;
3768  
3769  		if (__ratelimit(&drbd_ratelimit_state))
3770  			drbd_warn(device, "***Simulating %s failure\n",
3771  				_drbd_fault_str(type));
3772  	}
3773  
3774  	return ret;
3775  }
3776  #endif
3777  
3778  const char *drbd_buildtag(void)
3779  {
3780  	/* When DRBD is built from external sources, this holds a reference
3781  	   to the git hash of the source code. */
3782  
3783  	static char buildtag[38] = "\0uilt-in";
3784  
3785  	if (buildtag[0] == 0) {
3786  #ifdef MODULE
3787  		sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3788  #else
3789  		buildtag[0] = 'b';
3790  #endif
3791  	}
3792  
3793  	return buildtag;
3794  }
3795  
3796  module_init(drbd_init)
3797  module_exit(drbd_cleanup)
3798  
3799  EXPORT_SYMBOL(drbd_conn_str);
3800  EXPORT_SYMBOL(drbd_role_str);
3801  EXPORT_SYMBOL(drbd_disk_str);
3802  EXPORT_SYMBOL(drbd_set_st_err_str);
3803