1 /*
2 * Copyright (c) 2018 CPqD Foundation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <dirent.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <glib.h>
11 #include <linux/if_ether.h>
12 #include <pcap/pcap.h>
13 #include <pthread.h>
14 #include <semaphore.h>
15 #include <signal.h>
16 #include <stdarg.h>
17 #include <stdbool.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/signalfd.h>
22 #include <sys/time.h>
23 #include <sys/uio.h>
24 #include <syslog.h>
25 #include <unistd.h>
26
27 #include "graphs.h"
28
29 #define PIPE_IN ".in"
30 #define PIPE_OUT ".out"
31
32 #define PACKET_START 0
33 #define FRAME_SIZE 1
34 #define PACKET_PSDU 2
35
36 static GMainLoop *main_loop;
37 static char **pipe_in;
38 static char **pipe_out;
39 static int *fd_in;
40 static int fdSize;
41 unsigned char **buffer;
42 int number_nodes;
43 float **m;
44
45 /*semaphore variables */
46 sem_t *sem;
47 int *buffer_index;
48 int *packet_state;
49 int *packet_index;
50 int *packet_len;
51 bool **send;
52 float channel_statistic_send;
53
54 /* allocate size for arrays */
alloc_size(void)55 void alloc_size(void)
56 {
57 fd_in = malloc(number_nodes * sizeof(int));
58 buffer_index = malloc(number_nodes * sizeof(int));
59 packet_state = malloc(number_nodes * sizeof(int));
60 packet_index = malloc(number_nodes * sizeof(int));
61 packet_len = malloc(number_nodes * sizeof(int));
62
63 buffer = (unsigned char **) malloc(
64 number_nodes * sizeof(unsigned char *));
65
66 for (int lines = 0; lines < number_nodes; lines++) {
67 buffer[lines] = (unsigned char *) malloc(
68 10000 * sizeof(unsigned char));
69 }
70
71 send = (bool **) malloc(number_nodes * sizeof(bool *));
72 for (int lines = 0; lines < number_nodes; lines++) {
73 send[lines] = (bool *) malloc(number_nodes * sizeof(bool));
74 }
75
76 pipe_in = (char **) malloc(number_nodes * sizeof(char *));
77 pipe_out = (char **) malloc(number_nodes * sizeof(char *));
78 for (int lines = 0; lines < number_nodes; lines++) {
79 pipe_in[lines] = (char *) malloc(40 * sizeof(char));
80 pipe_out[lines] = (char *) malloc(40 * sizeof(char));
81 }
82 }
83
84 /* free space */
dealloc_size(void)85 void dealloc_size(void)
86 {
87 free(fd_in);
88 free(buffer_index);
89 free(packet_state);
90 free(packet_index);
91 free(packet_len);
92
93 for (int i = 0; i < number_nodes; i++) {
94 free(buffer[i]);
95 free(send[i]);
96 free(pipe_in[i]);
97 free(pipe_out[i]);
98 }
99 free(buffer);
100 free(send);
101 free(pipe_in);
102 free(pipe_out);
103 }
104
setup_reachable_nodes(int pos)105 void setup_reachable_nodes(int pos)
106 {
107 /*set randomly probability to send the packet */
108 channel_statistic_send = random_nodes();
109
110 for (int i = 0; i < number_nodes; i++) {
111 if (m[pos][i] > channel_statistic_send) {
112 send[pos][i] = TRUE;
113 }
114 }
115 }
116
reset_reachable_nodes(int pos)117 void reset_reachable_nodes(int pos)
118 {
119 for (int i = 0; i < number_nodes; i++) {
120 send[pos][i] = FALSE;
121 }
122 }
123
broadcast_data(int pos,unsigned char * buf)124 void broadcast_data(int pos, unsigned char *buf)
125 {
126 for (int i = 0; i < fdSize; i++) {
127 if (send[pos][i] == TRUE) {
128 write(fd_in[i], buf, 1);
129 }
130 }
131 }
132
fifo_handler(GIOChannel * channel,GIOCondition cond,gpointer user_data)133 static gboolean fifo_handler(GIOChannel *channel,
134 GIOCondition cond,
135 gpointer user_data)
136 {
137 unsigned char buf[1];
138 ssize_t result;
139 int fd;
140 int pos = GPOINTER_TO_INT(user_data);
141
142 if (cond & (G_IO_NVAL | G_IO_ERR | G_IO_HUP)) {
143 printf("Pipe closed");
144 return FALSE;
145 }
146
147 memset(buf, 0, sizeof(buf));
148 fd = g_io_channel_unix_get_fd(channel);
149
150 result = read(fd, buf, 1);
151
152 if (result != 1) {
153 printf("Failed to read %lu", result);
154 return FALSE;
155 }
156
157 if (packet_state[pos] == PACKET_START) {
158 int ret = sem_trywait(sem);
159
160 if (ret != 0) {
161 buffer[pos][buffer_index[pos]] = *buf;
162 buffer_index[pos] += 1;
163 return TRUE;
164 }
165 }
166
167 if (buffer_index[pos] == 0) {
168 /* We have no buffer do the regular process */
169
170 if (packet_state[pos] == PACKET_START) {
171 /* packet not started */
172 if (*buf == 0xF0) {
173 setup_reachable_nodes(pos);
174
175 /* Packet started */
176 broadcast_data(pos, buf);
177 packet_state[pos] = FRAME_SIZE;
178
179 } else {
180 /* Wrong byte of start */
181 printf("Wrong byte for pkt start %d\n", *buf);
182 sem_post(sem);
183 return TRUE;
184 }
185 } else if (packet_state[pos] == FRAME_SIZE) {
186 packet_len[pos] = (int)*buf;
187 broadcast_data(pos, buf);
188 packet_state[pos] = PACKET_PSDU;
189 } else if (packet_state[pos] == PACKET_PSDU) {
190 broadcast_data(pos, buf);
191 packet_index[pos] += 1;
192
193 if (packet_index[pos] == packet_len[pos]) {
194 packet_state[pos] = 0;
195 packet_index[pos] = 0;
196 packet_len[pos] = 0;
197 reset_reachable_nodes(pos);
198 printf("Unlock packet %d\n", pos);
199 sem_post(sem);
200 }
201 }
202 } else {
203 /* We do have buffer. Go for bulk send */
204 buffer[pos][buffer_index[pos]] = *buf;
205 buffer_index[pos] += 1;
206
207 for (int i = 0; i < buffer_index[pos]; ++i) {
208 *buf = buffer[pos][i];
209 buffer[pos][i] = 0xFF;
210
211 if (packet_state[pos] == PACKET_START) {
212 /* packet not started */
213 if (*buf == 0xF0) {
214 setup_reachable_nodes(pos);
215
216 /* Packet started */
217 broadcast_data(pos, buf);
218 packet_state[pos] = FRAME_SIZE;
219
220 } else {
221 /* Wrong byte of start */
222 printf("Wrong byte for pkt start %d\n",
223 *buf);
224 }
225 } else if (packet_state[pos] == FRAME_SIZE) {
226 packet_len[pos] = (int)*buf;
227 broadcast_data(pos, buf);
228 packet_state[pos] = PACKET_PSDU;
229 } else if (packet_state[pos] == PACKET_PSDU) {
230 broadcast_data(pos, buf);
231 packet_index[pos] += 1;
232
233 if (packet_index[pos] == packet_len[pos]) {
234 packet_state[pos] = 0;
235 packet_index[pos] = 0;
236 packet_len[pos] = 0;
237 reset_reachable_nodes(pos);
238 }
239 }
240 }
241
242 buffer_index[pos] = 0;
243 if (packet_state[pos] == PACKET_START) {
244 printf("Unlock buffer %d\n", pos);
245 sem_post(sem);
246 }
247 }
248
249 finish:
250 return TRUE;
251 }
252
setup_fifofd(char * pipe,int pos)253 static int setup_fifofd(char *pipe, int pos)
254 {
255 GIOChannel *channel;
256 guint source;
257 int fd;
258 char *pipe_new = pipe;
259
260 printf("%s\n", pipe_new);
261
262 fd = open(pipe_new, O_RDONLY);
263 if (fd < 0) {
264 return fd;
265 }
266
267 channel = g_io_channel_unix_new(fd);
268 g_io_channel_set_close_on_unref(channel, TRUE);
269
270 source = g_io_add_watch(channel,
271 G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
272 fifo_handler, GINT_TO_POINTER(pos));
273
274 g_io_channel_unref(channel);
275 return source;
276 }
277
main(int argc,char * argv[])278 int main(int argc, char *argv[])
279 {
280 int ret;
281 char str_num[4];
282 int currentIndex;
283
284 srand((unsigned int) time(NULL));
285
286 /* verify if the hub is using graph's */
287 if (argc == 2) {
288 m = read_csv(argv[1]);
289
290 if (m == NULL) {
291 printf("Error reading csv file.\n");
292 return 1;
293 }
294
295 number_nodes = read_lines(argv[1]);
296 } else {
297 printf("Wrong input. Expecting: ./hub <graph.csv>");
298 }
299
300 /* alocate the size */
301 alloc_size();
302
303 int fifoTrain[number_nodes];
304 int open_fifo;
305
306 /* initializing variables */
307 for (int i = 0; i < number_nodes; i++) {
308 buffer_index[i] = 0;
309 packet_state[i] = 0;
310 packet_index[i] = 0;
311 packet_len[i] = 0;
312 }
313
314 /* Initialize reachable nodes*/
315 for (int i = 0; i < number_nodes; i++) {
316 for (int j = 0; j < number_nodes; j++) {
317 send[i][j] = FALSE;
318 }
319 }
320
321 sem = (sem_t *) malloc(sizeof(sem_t));
322 sem_init(sem, 0, 1);
323
324 fdSize = number_nodes;
325
326 main_loop = g_main_loop_new(NULL, FALSE);
327
328 for (int i = 0; i < fdSize; i++) {
329 snprintf(str_num, sizeof(str_num), "%d", (i + 1));
330
331 pipe_in[i] = g_strconcat("/tmp/hub/ip-stack-node",
332 g_strconcat(str_num, PIPE_IN, NULL), NULL);
333 pipe_out[i] = g_strconcat("/tmp/hub/ip-stack-node",
334 g_strconcat(str_num, PIPE_OUT, NULL), NULL);
335
336 fifoTrain[i] = setup_fifofd(pipe_out[i], i);
337
338 if (fifoTrain[i] > 0) {
339 open_fifo++;
340 } else {
341 printf("Failed to open the fifo node %d\n", (i + 1));
342 printf("Probably inserted wrong number of nodes"
343 " or this node doesn't exists\n");
344 printf("******** Closing virtual-hub **************\n");
345 ret = -EINVAL;
346 goto exit;
347 }
348 }
349
350 if (open_fifo == fdSize) {
351 printf("-------------------------------\n");
352 printf("All fifo opened with sucess!!\n");
353 printf("-------------------------------\n");
354 }
355
356 for (int i = 0; i < fdSize; i++) {
357 currentIndex = i;
358 fd_in[i] = open(pipe_in[i], O_WRONLY);
359
360 if (fd_in[i] < 0) {
361 ret = -EINVAL;
362 goto exit;
363 }
364 }
365
366 g_main_loop_run(main_loop);
367 ret = 0;
368
369 exit:
370 for (int i = 0; i < currentIndex; i++) {
371 if (fifoTrain[i] >= 0) {
372 g_source_remove(fifoTrain[i]);
373 }
374
375 if (fd_in[i] >= 0) {
376 close(fd_in[i]);
377 }
378
379 g_free(pipe_in[i]);
380 g_free(pipe_out[i]);
381 }
382
383 sem_destroy(sem);
384 free_memory(m, number_nodes);
385 g_main_loop_unref(main_loop);
386 dealloc_size();
387
388 exit(ret);
389 }
390