1 /*
2  * Copyright (c) 2020 Synopsys, Inc.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include "pktqueue.h"
8 
9 /* Amount of parallel processed sender/receiver queues of packet headers */
10 #define QUEUE_NUM 2
11 
12 /* Amount of execution threads per pair of queues*/
13 #define THREADS_NUM (CONFIG_MP_MAX_NUM_CPUS+1)
14 
15 /* Amount of packet headers in a queue */
16 #define SIZE_OF_QUEUE 5000
17 
18 /* Size of packet header (in bytes) */
19 #define SIZE_OF_HEADER 24
20 
21 /* CRC16 polynomial */
22 #define POLYNOMIAL 0x8005
23 
24 /* CRC bytes in the packet */
25 #define CRC_BYTE_1 10
26 #define CRC_BYTE_2 11
27 
28 #define STACK_SIZE	2048
29 
30 static struct k_thread tthread[THREADS_NUM*QUEUE_NUM];
31 static struct k_thread qthread[QUEUE_NUM];
32 
33 /* Each queue has its own mutex */
34 struct k_mutex sender_queue_mtx[QUEUE_NUM];
35 struct k_mutex receiver_queue_mtx[QUEUE_NUM];
36 
37 /* Variable which indicates the amount of processed queues */
38 int queues_remain = QUEUE_NUM;
39 /* Variable to define current queue in thread */
40 int current_queue;
41 
42 /* Array of packet header descriptors */
43 struct phdr_desc descriptors[QUEUE_NUM][SIZE_OF_QUEUE];
44 
45 /* Arrays of receiver and sender queues */
46 struct phdr_desc_queue sender[QUEUE_NUM], receiver[QUEUE_NUM];
47 
48 /* Array of packet headers */
49 uint8_t headers[QUEUE_NUM][SIZE_OF_QUEUE][SIZE_OF_HEADER];
50 
51 static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM*QUEUE_NUM, STACK_SIZE);
52 static K_THREAD_STACK_ARRAY_DEFINE(qstack, QUEUE_NUM, STACK_SIZE);
53 
54 K_MUTEX_DEFINE(fetch_queue_mtx);
55 
56 /* Function for initializing "sender" packet header queue */
init_datagram_queue(struct phdr_desc_queue * queue,int queue_num)57 void init_datagram_queue(struct phdr_desc_queue *queue, int queue_num)
58 {
59 	queue->head = descriptors[queue_num];
60 
61 	for (int i = 0; i < SIZE_OF_QUEUE; i++) {
62 		queue->tail = &descriptors[queue_num][i];
63 		descriptors[queue_num][i].ptr = (uint8_t *)&headers[queue_num][i];
64 		/* Fill packet header with random values */
65 		for (int j = 0; j < SIZE_OF_HEADER; j++) {
66 			/* leave crc field zeroed */
67 			if (j < CRC_BYTE_1 || j > CRC_BYTE_2) {
68 				descriptors[queue_num][i].ptr[j] = sys_rand8_get();
69 			} else {
70 				descriptors[queue_num][i].ptr[j] = 0;
71 			}
72 		}
73 		/* Compute crc for further comparison */
74 		uint16_t crc;
75 
76 		crc = crc16(POLYNOMIAL, 0x0000,
77 			    descriptors[queue_num][i].ptr, SIZE_OF_HEADER);
78 
79 		/* Save crc value in header[CRC_BYTE_1-CRC_BYTE_2] field */
80 		descriptors[queue_num][i].ptr[CRC_BYTE_1] = (uint8_t)(crc >> 8);
81 		descriptors[queue_num][i].ptr[CRC_BYTE_2] = (uint8_t)(crc);
82 		queue->count++;
83 		descriptors[queue_num][i].next = &descriptors[queue_num][i+1];
84 	}
85 }
86 
87 /* Thread takes packet from "sender" queue and puts it to "receiver" queue.
88  * Each queue can be accessed only by one thread in a time. */
test_thread(void * arg1,void * arg2,void * arg3)89 void test_thread(void *arg1, void *arg2, void *arg3)
90 {
91 	struct phdr_desc_queue *sender_queue = (struct phdr_desc_queue *)arg1;
92 	struct phdr_desc_queue *receiver_queue = (struct phdr_desc_queue *)arg2;
93 	struct phdr_desc *qin_ptr = NULL;
94 	int queue_num = *(int *)arg3;
95 
96 	/* Fetching one queue */
97 	uint16_t crc, crc_orig;
98 
99 	qin_ptr = phdr_desc_dequeue(sender_queue, &sender_queue_mtx[queue_num]);
100 	while (qin_ptr != NULL) {
101 		/* Store original crc value from header */
102 		crc_orig  =  qin_ptr->ptr[CRC_BYTE_1] << 8;
103 		crc_orig |=   qin_ptr->ptr[11];
104 
105 		/* Crc field should be zero before crc calculation */
106 		qin_ptr->ptr[CRC_BYTE_1] = 0;
107 		qin_ptr->ptr[CRC_BYTE_2] = 0;
108 		crc = crc16(POLYNOMIAL, 0x0000, qin_ptr->ptr, SIZE_OF_HEADER);
109 
110 		/* Compare computed crc with crc from phdr_desc->crc */
111 		if (crc == crc_orig) {
112 			phdr_desc_enqueue(receiver_queue, qin_ptr,
113 						 &receiver_queue_mtx[queue_num]);
114 		}
115 		/* Take next element from "sender queue" */
116 		qin_ptr = phdr_desc_dequeue(sender_queue,
117 						&sender_queue_mtx[queue_num]);
118 	}
119 }
120 
121 /* Thread that processes one pair of sender/receiver queue */
queue_thread(void * arg1,void * arg2,void * arg3)122 void queue_thread(void *arg1, void *arg2, void *arg3)
123 {
124 	ARG_UNUSED(arg1);
125 	ARG_UNUSED(arg2);
126 	ARG_UNUSED(arg3);
127 
128 	int queue_num;
129 
130 	/* Fetching one queue */
131 	k_mutex_lock(&fetch_queue_mtx, K_FOREVER);
132 	queue_num = current_queue;
133 	current_queue++;
134 	k_mutex_unlock(&fetch_queue_mtx);
135 
136 	for (int i = 0; i < THREADS_NUM; i++) {
137 		k_thread_create(&tthread[i+THREADS_NUM*queue_num],
138 			tstack[i+THREADS_NUM*queue_num], STACK_SIZE,
139 			test_thread,
140 			(void *)&sender[queue_num],
141 			(void *)&receiver[queue_num], (void *)&queue_num,
142 			K_PRIO_PREEMPT(10), 0, K_NO_WAIT);
143 	}
144 
145 	/* Wait until sender queue is not empty */
146 	while (sender[queue_num].count != 0) {
147 		k_sleep(K_MSEC(1));
148 	}
149 
150 	/* Decrementing queue counter */
151 	k_mutex_lock(&fetch_queue_mtx, K_FOREVER);
152 	queues_remain--;
153 	k_mutex_unlock(&fetch_queue_mtx);
154 }
155 
main(void)156 int main(void)
157 {
158 	uint32_t start_time, stop_time, cycles_spent, nanoseconds_spent;
159 
160 	current_queue = 0;
161 	printk("Simulating IP header validation on multiple cores.\n");
162 	printk("Each of %d parallel queues is processed by %d threads"
163 		" on %d cores and contain %d packet headers.\n",
164 		QUEUE_NUM, THREADS_NUM, arch_num_cpus(), SIZE_OF_QUEUE);
165 	printk("Bytes in packet header: %d\n\n", SIZE_OF_HEADER);
166 
167 	/* initializing "sender" queue */
168 	for (int i = 0; i < QUEUE_NUM; i++) {
169 		init_datagram_queue(&sender[i], i);
170 		k_mutex_init(&sender_queue_mtx[i]);
171 		k_mutex_init(&receiver_queue_mtx[i]);
172 	}
173 
174 	/* Capture initial time stamp */
175 	start_time = k_cycle_get_32();
176 
177 	for (int i = 0; i < QUEUE_NUM; i++) {
178 		k_thread_create(&qthread[i], qstack[i], STACK_SIZE,
179 				queue_thread,
180 				(void *)&sender[i], (void *)&receiver[i],
181 				(void *)&i, K_PRIO_PREEMPT(11), 0, K_NO_WAIT);
182 	}
183 
184 	/* Wait until all queues are not processed */
185 	while (queues_remain > 0) {
186 		k_sleep(K_MSEC(1));
187 	}
188 
189 	/* Capture final time stamp */
190 	stop_time = k_cycle_get_32();
191 	cycles_spent = stop_time - start_time;
192 	nanoseconds_spent = (uint32_t)k_cyc_to_ns_floor64(cycles_spent);
193 
194 	/* Verify result of packet transmission
195 	 * The counter of correct receiver queues */
196 	int correct = 0;
197 	struct phdr_desc *tmp;
198 	/* Iterate and count amount of packages in receiver queues */
199 	for (int i = 0; i < QUEUE_NUM; i++) {
200 		int count = 0;
201 
202 		tmp = receiver[i].head;
203 		while (tmp != NULL) {
204 			tmp = tmp->next;
205 			count++;
206 		}
207 		if (receiver[i].count == SIZE_OF_QUEUE && count == SIZE_OF_QUEUE) {
208 			correct++;
209 		}
210 	}
211 	if (correct == QUEUE_NUM) {
212 		printk("RESULT: OK\n"
213 			"Application ran successfully.\n"
214 			"All %d headers were processed in %d msec\n",
215 			SIZE_OF_QUEUE*QUEUE_NUM,
216 			nanoseconds_spent / 1000 / 1000);
217 	} else {
218 		printk("RESULT: FAIL\n"
219 			"Application failed.\n"
220 			"The amount of packets in receiver queue "
221 			"is less than expected.\n");
222 	}
223 
224 	k_sleep(K_MSEC(10));
225 	return 0;
226 }
227