1 /*
2  * Copyright (c) 2018 CPqD Foundation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <dirent.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <glib.h>
11 #include <linux/if_ether.h>
12 #include <pcap/pcap.h>
13 #include <pthread.h>
14 #include <semaphore.h>
15 #include <signal.h>
16 #include <stdarg.h>
17 #include <stdbool.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/signalfd.h>
22 #include <sys/time.h>
23 #include <sys/uio.h>
24 #include <syslog.h>
25 #include <unistd.h>
26 
27 #include "graphs.h"
28 
29 #define PIPE_IN ".in"
30 #define PIPE_OUT ".out"
31 
32 #define PACKET_START 0
33 #define FRAME_SIZE 1
34 #define PACKET_PSDU 2
35 
36 static GMainLoop *main_loop;
37 static char **pipe_in;
38 static char **pipe_out;
39 static int *fd_in;
40 static int fdSize;
41 unsigned char **buffer;
42 int number_nodes;
43 float **m;
44 
45 /*semaphore variables */
46 sem_t *sem;
47 int *buffer_index;
48 int *packet_state;
49 int *packet_index;
50 int *packet_len;
51 bool **send;
52 float channel_statistic_send;
53 
54 /* allocate size for arrays */
alloc_size(void)55 void alloc_size(void)
56 {
57 	fd_in = malloc(number_nodes * sizeof(int));
58 	buffer_index = malloc(number_nodes * sizeof(int));
59 	packet_state = malloc(number_nodes * sizeof(int));
60 	packet_index = malloc(number_nodes * sizeof(int));
61 	packet_len = malloc(number_nodes * sizeof(int));
62 
63 	buffer = (unsigned char **) malloc(
64 		number_nodes * sizeof(unsigned char *));
65 
66 	for (int lines = 0; lines < number_nodes; lines++) {
67 		buffer[lines] = (unsigned char *) malloc(
68 			10000 * sizeof(unsigned char));
69 	}
70 
71 	send = (bool **) malloc(number_nodes * sizeof(bool *));
72 	for (int lines = 0; lines < number_nodes; lines++) {
73 		send[lines] = (bool *) malloc(number_nodes * sizeof(bool));
74 	}
75 
76 	pipe_in = (char **) malloc(number_nodes * sizeof(char *));
77 	pipe_out = (char **) malloc(number_nodes * sizeof(char *));
78 	for (int lines = 0; lines < number_nodes; lines++) {
79 		pipe_in[lines] = (char *) malloc(40 * sizeof(char));
80 		pipe_out[lines] = (char *) malloc(40 * sizeof(char));
81 	}
82 }
83 
84 /* free space */
dealloc_size(void)85 void dealloc_size(void)
86 {
87 	free(fd_in);
88 	free(buffer_index);
89 	free(packet_state);
90 	free(packet_index);
91 	free(packet_len);
92 
93 	for (int i = 0; i < number_nodes; i++) {
94 		free(buffer[i]);
95 		free(send[i]);
96 		free(pipe_in[i]);
97 		free(pipe_out[i]);
98 	}
99 	free(buffer);
100 	free(send);
101 	free(pipe_in);
102 	free(pipe_out);
103 }
104 
setup_reachable_nodes(int pos)105 void setup_reachable_nodes(int pos)
106 {
107 	/*set randomly probability to send the packet */
108 	channel_statistic_send = random_nodes();
109 
110 	for (int i = 0; i < number_nodes; i++) {
111 		if (m[pos][i] > channel_statistic_send) {
112 			send[pos][i] = TRUE;
113 		}
114 	}
115 }
116 
reset_reachable_nodes(int pos)117 void reset_reachable_nodes(int pos)
118 {
119 	for (int i = 0; i < number_nodes; i++) {
120 		send[pos][i] = FALSE;
121 	}
122 }
123 
broadcast_data(int pos,unsigned char * buf)124 void broadcast_data(int pos, unsigned char *buf)
125 {
126 	for (int i = 0; i < fdSize; i++) {
127 		if (send[pos][i] == TRUE) {
128 			write(fd_in[i], buf, 1);
129 		}
130 	}
131 }
132 
fifo_handler(GIOChannel * channel,GIOCondition cond,gpointer user_data)133 static gboolean fifo_handler(GIOChannel *channel,
134 			     GIOCondition cond,
135 			     gpointer user_data)
136 {
137 	unsigned char buf[1];
138 	ssize_t result;
139 	int fd;
140 	int pos = GPOINTER_TO_INT(user_data);
141 
142 	if (cond & (G_IO_NVAL | G_IO_ERR | G_IO_HUP)) {
143 		printf("Pipe closed");
144 		return FALSE;
145 	}
146 
147 	memset(buf, 0, sizeof(buf));
148 	fd = g_io_channel_unix_get_fd(channel);
149 
150 	result = read(fd, buf, 1);
151 
152 	if (result != 1) {
153 		printf("Failed to read %lu", result);
154 		return FALSE;
155 	}
156 
157 	if (packet_state[pos] == PACKET_START) {
158 		int ret = sem_trywait(sem);
159 
160 		if (ret != 0) {
161 			buffer[pos][buffer_index[pos]] = *buf;
162 			buffer_index[pos] += 1;
163 			return TRUE;
164 		}
165 	}
166 
167 	if (buffer_index[pos] == 0) {
168 		/* We have no buffer do the regular process */
169 
170 		if (packet_state[pos] == PACKET_START) {
171 			/* packet not started */
172 			if (*buf == 0xF0) {
173 				setup_reachable_nodes(pos);
174 
175 				/* Packet started */
176 				broadcast_data(pos, buf);
177 				packet_state[pos] = FRAME_SIZE;
178 
179 			} else {
180 				/* Wrong byte of start */
181 				printf("Wrong byte for pkt start %d\n", *buf);
182 				sem_post(sem);
183 				return TRUE;
184 			}
185 		} else if (packet_state[pos] == FRAME_SIZE) {
186 			packet_len[pos] = (int)*buf;
187 			broadcast_data(pos, buf);
188 			packet_state[pos] = PACKET_PSDU;
189 		} else if (packet_state[pos] == PACKET_PSDU) {
190 			broadcast_data(pos, buf);
191 			packet_index[pos] += 1;
192 
193 			if (packet_index[pos] == packet_len[pos]) {
194 				packet_state[pos] = 0;
195 				packet_index[pos] = 0;
196 				packet_len[pos] = 0;
197 				reset_reachable_nodes(pos);
198 				printf("Unlock packet %d\n", pos);
199 				sem_post(sem);
200 			}
201 		}
202 	} else {
203 		/* We do have buffer. Go for bulk send */
204 		buffer[pos][buffer_index[pos]] = *buf;
205 		buffer_index[pos] += 1;
206 
207 		for (int i = 0; i < buffer_index[pos]; ++i) {
208 			*buf = buffer[pos][i];
209 			buffer[pos][i] = 0xFF;
210 
211 			if (packet_state[pos] == PACKET_START) {
212 				/* packet not started */
213 				if (*buf == 0xF0) {
214 					setup_reachable_nodes(pos);
215 
216 					/* Packet started */
217 					broadcast_data(pos, buf);
218 					packet_state[pos] = FRAME_SIZE;
219 
220 				} else {
221 					/* Wrong byte of start */
222 					printf("Wrong byte for pkt start %d\n",
223 						*buf);
224 				}
225 			} else if (packet_state[pos] == FRAME_SIZE) {
226 				packet_len[pos] = (int)*buf;
227 				broadcast_data(pos, buf);
228 				packet_state[pos] = PACKET_PSDU;
229 			} else if (packet_state[pos] == PACKET_PSDU) {
230 				broadcast_data(pos, buf);
231 				packet_index[pos] += 1;
232 
233 				if (packet_index[pos] == packet_len[pos]) {
234 					packet_state[pos] = 0;
235 					packet_index[pos] = 0;
236 					packet_len[pos] = 0;
237 					reset_reachable_nodes(pos);
238 				}
239 			}
240 		}
241 
242 		buffer_index[pos] = 0;
243 		if (packet_state[pos] == PACKET_START) {
244 			printf("Unlock buffer %d\n", pos);
245 			sem_post(sem);
246 		}
247 	}
248 
249 finish:
250 	return TRUE;
251 }
252 
setup_fifofd(char * pipe,int pos)253 static int setup_fifofd(char *pipe, int pos)
254 {
255 	GIOChannel *channel;
256 	guint source;
257 	int fd;
258 	char *pipe_new = pipe;
259 
260 	printf("%s\n", pipe_new);
261 
262 	fd = open(pipe_new, O_RDONLY);
263 	if (fd < 0) {
264 		return fd;
265 	}
266 
267 	channel = g_io_channel_unix_new(fd);
268 	g_io_channel_set_close_on_unref(channel, TRUE);
269 
270 	source = g_io_add_watch(channel,
271 			G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
272 			fifo_handler, GINT_TO_POINTER(pos));
273 
274 	g_io_channel_unref(channel);
275 	return source;
276 }
277 
main(int argc,char * argv[])278 int main(int argc, char *argv[])
279 {
280 	int ret;
281 	char str_num[4];
282 	int currentIndex;
283 
284 	srand((unsigned int) time(NULL));
285 
286 	/* verify if the hub is using graph's */
287 	if (argc == 2) {
288 		m = read_csv(argv[1]);
289 
290 		if (m == NULL) {
291 			printf("Error reading csv file.\n");
292 			return 1;
293 		}
294 
295 		number_nodes = read_lines(argv[1]);
296 	} else {
297 		printf("Wrong input. Expecting: ./hub <graph.csv>");
298 	}
299 
300 	/* alocate the size */
301 	alloc_size();
302 
303 	int fifoTrain[number_nodes];
304 	int open_fifo;
305 
306 	/* initializing variables */
307 	for (int i = 0; i < number_nodes; i++) {
308 		buffer_index[i] = 0;
309 		packet_state[i] = 0;
310 		packet_index[i] = 0;
311 		packet_len[i] = 0;
312 	}
313 
314 	/* Initialize reachable nodes*/
315 	for (int i = 0; i < number_nodes; i++) {
316 		for (int j = 0; j < number_nodes; j++) {
317 			send[i][j] = FALSE;
318 		}
319 	}
320 
321 	sem = (sem_t *) malloc(sizeof(sem_t));
322 	sem_init(sem, 0, 1);
323 
324 	fdSize = number_nodes;
325 
326 	main_loop = g_main_loop_new(NULL, FALSE);
327 
328 	for (int i = 0; i < fdSize; i++) {
329 		snprintf(str_num, sizeof(str_num), "%d", (i + 1));
330 
331 		pipe_in[i] = g_strconcat("/tmp/hub/ip-stack-node",
332 			g_strconcat(str_num, PIPE_IN, NULL), NULL);
333 		pipe_out[i] = g_strconcat("/tmp/hub/ip-stack-node",
334 			g_strconcat(str_num, PIPE_OUT, NULL), NULL);
335 
336 		fifoTrain[i] = setup_fifofd(pipe_out[i], i);
337 
338 		if (fifoTrain[i] > 0) {
339 			open_fifo++;
340 		} else {
341 			printf("Failed to open the fifo node %d\n", (i + 1));
342 			printf("Probably inserted wrong number of nodes"
343 				" or this node doesn't exists\n");
344 			printf("******** Closing virtual-hub **************\n");
345 			ret = -EINVAL;
346 			goto exit;
347 		}
348 	}
349 
350 	if (open_fifo == fdSize) {
351 		printf("-------------------------------\n");
352 		printf("All fifo opened with sucess!!\n");
353 		printf("-------------------------------\n");
354 	}
355 
356 	for (int i = 0; i < fdSize; i++) {
357 		currentIndex = i;
358 		fd_in[i] = open(pipe_in[i], O_WRONLY);
359 
360 		if (fd_in[i] < 0) {
361 			ret = -EINVAL;
362 			goto exit;
363 		}
364 	}
365 
366 	g_main_loop_run(main_loop);
367 	ret = 0;
368 
369 exit:
370 	for (int i = 0; i < currentIndex; i++) {
371 		if (fifoTrain[i] >= 0) {
372 			g_source_remove(fifoTrain[i]);
373 		}
374 
375 		if (fd_in[i] >= 0) {
376 			close(fd_in[i]);
377 		}
378 
379 		g_free(pipe_in[i]);
380 		g_free(pipe_out[i]);
381 	}
382 
383 	sem_destroy(sem);
384 	free_memory(m, number_nodes);
385 	g_main_loop_unref(main_loop);
386 	dealloc_size();
387 
388 	exit(ret);
389 }
390