/*
 * Copyright 2019-2022 Arm Limited and/or its affiliates <open-source-office@arm.com>
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/****************************************************************************
 * Includes
 ****************************************************************************/

#include "inference_process.hpp"

#include <inttypes.h>
#include <string>
#include <stdio.h>
#include <stdlib.h>
#include <vector>
#include <zephyr/kernel.h>

/* Model data */
#include "input.h"
#include "model.h"
#include "output.h"

using namespace std;
using namespace InferenceProcess;

/****************************************************************************
 * Defines
 ****************************************************************************/

/* Number of worker threads running the inferences. There should typically be
 * one worker thread per NPU. */
#ifndef NUM_INFERENCE_TASKS
#define NUM_INFERENCE_TASKS 1
#endif

/* Number of sender tasks that post inference requests to the worker threads. */
#ifndef NUM_JOB_TASKS
#define NUM_JOB_TASKS 2
#endif

/* Number of inferences per sender task. */
#ifndef NUM_JOBS_PER_TASK
#define NUM_JOBS_PER_TASK 2
#endif

/* Tensor arena size */
#ifdef TENSOR_ARENA_SIZE /* If defined in model.h */
#define TENSOR_ARENA_SIZE_PER_INFERENCE TENSOR_ARENA_SIZE
#else /* If not defined, split a default 2 MB budget across the worker threads */
#define TENSOR_ARENA_SIZE_PER_INFERENCE (2000000 / NUM_INFERENCE_TASKS)
#endif
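
/* Note: the size above is per worker thread; the total static arena footprint
 * allocated below is NUM_INFERENCE_TASKS * TENSOR_ARENA_SIZE_PER_INFERENCE bytes. */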

/****************************************************************************
 * InferenceJob
 ****************************************************************************/

namespace
{
struct InferenceProcessParams {
	InferenceProcessParams() : queueHandle(nullptr), tensorArena(nullptr), arenaSize(0)
	{
	}

	InferenceProcessParams(k_queue *_queue, uint8_t *_tensorArena, size_t _arenaSize)
		: queueHandle(_queue), tensorArena(_tensorArena), arenaSize(_arenaSize)
	{
	}

	k_queue *queueHandle;
	uint8_t *tensorArena;
	size_t arenaSize;
};

/* Wrapper around InferenceProcess::InferenceJob. Adds responseQueue and status
 * for Zephyr multi-tasking purposes. */
struct xInferenceJob : public InferenceJob {
	xInferenceJob() : InferenceJob(), responseQueue(nullptr), status(false)
	{
	}

	xInferenceJob(const string &_name, const DataPtr &_networkModel,
		      const vector<DataPtr> &_input, const vector<DataPtr> &_output,
		      const vector<DataPtr> &_expectedOutput, k_queue *_queue)
		: InferenceJob(_name, _networkModel, _input, _output, _expectedOutput),
		  responseQueue(_queue), status(false)
	{
	}

	k_queue *responseQueue;
	bool status;
};

/* Total number of completed jobs, needed to exit the application correctly if
 * NUM_JOB_TASKS > 1 */
volatile int totalCompletedJobs = 0;

/* TensorArena static initialisation */
const size_t arenaSize = TENSOR_ARENA_SIZE_PER_INFERENCE;

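/* Each worker thread owns one statically allocated arena. The arenas are placed
 * in the dedicated "tflm_arena" linker section and aligned to 16 bytes, matching
 * TensorFlow Lite Micro's default tensor alignment. */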
__attribute__((section("tflm_arena"), aligned(16)))
uint8_t inferenceProcessTensorArena[NUM_INFERENCE_TASKS][arenaSize];

/* Allocate and initialize heap */
void *allocateHeap(const size_t size)
{
	k_heap *heap = static_cast<k_heap *>(k_malloc(sizeof(k_heap)));
	uint8_t *buf = static_cast<uint8_t *>(k_malloc(size));

	if ((buf == nullptr) || (heap == nullptr)) {
		printk("Heap allocation failed. heap=%p, buf=%p, size=%zu\n", heap, buf, size);
		exit(1);
	}

	k_heap_init(heap, buf, size);

	return static_cast<void *>(heap);
}
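
/* Note: the heap returned above is assigned to every sender and worker thread
 * via k_thread_heap_assign(), so the k_queue_alloc_append() calls in those
 * threads draw their small bookkeeping allocations from this shared pool. */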

/* inferenceProcessTask - Run jobs from queue with available driver */
void inferenceProcessTask(void *_name, void *heap, void *_params)
{
	string *name = static_cast<string *>(_name);
	InferenceProcessParams *params = static_cast<InferenceProcessParams *>(_params);

	/* Assign the pre-allocated heap - used in the k_queue_alloc_append */
	k_thread_heap_assign(k_current_get(), static_cast<k_heap *>(heap));

	class InferenceProcess inferenceProcess(params->tensorArena, params->arenaSize);

	for (;;) {
		/* Receive inference job */
		xInferenceJob *job =
			static_cast<xInferenceJob *>(k_queue_get(params->queueHandle, K_FOREVER));

		printk("%s: Received inference job. job=%p\n", name->c_str(), job);

		/* Run inference */
		job->status = inferenceProcess.runJob(*job);

		printk("%s: Sending inference response. job=%p\n", name->c_str(), job);

		/* Return inference message */
		int ret = k_queue_alloc_append(job->responseQueue, job);
		if (0 != ret) {
			printk("%s: Failed to send message\n", name->c_str());
			exit(1);
		}
	}

	k_thread_abort(k_current_get());

	CODE_UNREACHABLE;
}

/* inferenceSenderTask - Creates NUM_JOBS_PER_TASK jobs, queues them, and then
 * listens for completion status */
void inferenceSenderTask(void *_name, void *heap, void *_queue)
{
	string *name = static_cast<string *>(_name);
	k_queue *inferenceQueue = static_cast<k_queue *>(_queue);
	int ret = 0;

	/* Assign the pre-allocated heap - used in the k_queue_alloc_append */
	k_thread_heap_assign(k_current_get(), static_cast<k_heap *>(heap));

	/* Create queue for response messages */
	k_queue senderQueue;
	k_queue_init(&senderQueue);
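	/* The response queue is private to this sender, so completions coming back
	 * from the shared worker pool are routed to the task that posted the job. */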

	/* Loop over all jobs and push them to inference queue */
	xInferenceJob jobs[NUM_JOBS_PER_TASK];
	for (int n = 0; n < NUM_JOBS_PER_TASK; n++) {
		auto &job = jobs[n];
		job = xInferenceJob(modelName, DataPtr(networkModelData, sizeof(networkModelData)),
				    { DataPtr(inputData, sizeof(inputData)) }, {},
				    { DataPtr(expectedOutputData, sizeof(expectedOutputData)) },
				    &senderQueue);

		printk("%s: Sending inference. job=%p, name=%s\n", name->c_str(), &job,
		       job.name.c_str());

		/* Queue job */
		ret = k_queue_alloc_append(inferenceQueue, &job);
		if (0 != ret) {
			printk("%s: Failed to send message\n", name->c_str());
			exit(1);
		}
	}

	/* Listen for completion status */
	do {
		xInferenceJob *job =
			static_cast<xInferenceJob *>(k_queue_get(&senderQueue, K_FOREVER));

		printk("%s: Received job response. job=%p, status=%u\n", name->c_str(), job,
		       job->status);

		totalCompletedJobs++;

		ret += job->status;
		if (job->status != 0) {
			break;
		}
	} while (totalCompletedJobs < NUM_JOBS_PER_TASK * NUM_JOB_TASKS);

	exit(ret);
}

} /* namespace */

/* Zephyr application. NOTE: Additional tasks may require increased heap size. */
int main()
{
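	/* Bookkeeping for every thread that will be created: the thread object
	 * itself plus the id returned by k_thread_create(). */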
	struct {
		k_thread thread;
		k_tid_t id;
	} threads[NUM_JOB_TASKS + NUM_INFERENCE_TASKS];
	size_t nthreads = 0;

	/* Allocate one global heap for all threads */
	void *heapPtr = allocateHeap(256);

	k_queue inferenceQueue;
	k_queue_init(&inferenceQueue);

	/* Create the inferenceSender tasks that build and queue the jobs */
	for (int n = 0; n < NUM_JOB_TASKS; n++) {
		const size_t stackSize = 2048;
		k_thread_stack_t *stack = static_cast<k_thread_stack_t *>(k_malloc(stackSize));
		if (stack == nullptr) {
			printk("Failed to allocate stack for 'inferenceSenderTask%i'\n", n);
			exit(1);
		}

		auto &thread = threads[nthreads];
		string *name = new string("sender " + to_string(n));

		thread.id = k_thread_create(&thread.thread, stack, stackSize, inferenceSenderTask,
					    name, heapPtr, &inferenceQueue, 3, 0, K_FOREVER);
		if (thread.id == 0) {
			printk("Failed to create 'inferenceSenderTask%i'\n", n);
			exit(1);
		}

		nthreads++;
	}

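	/* Note on priorities: the workers below run at priority 2, i.e. higher
	 * priority than the senders at 3 (lower number = higher priority in
	 * Zephyr), so queued jobs are picked up as soon as they arrive. */
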
	/* Create inferenceProcess tasks to process the queued jobs */
	InferenceProcessParams taskParams[NUM_INFERENCE_TASKS];
	for (int n = 0; n < NUM_INFERENCE_TASKS; n++) {
		const size_t stackSize = 8192;
		k_thread_stack_t *stack = static_cast<k_thread_stack_t *>(k_malloc(stackSize));
		if (stack == nullptr) {
			printk("Failed to allocate stack for 'inferenceProcessTask%i'\n", n);
			exit(1);
		}

		auto &thread = threads[nthreads];
		auto &taskParam = taskParams[n];
		taskParam = InferenceProcessParams(&inferenceQueue, inferenceProcessTensorArena[n],
						   arenaSize);
		string *name = new string("runner " + to_string(n));

		thread.id = k_thread_create(&thread.thread, stack, stackSize, inferenceProcessTask,
					    name, heapPtr, &taskParam, 2, 0, K_FOREVER);
		if (thread.id == 0) {
			printk("Failed to create 'inferenceProcessTask%i'\n", n);
			exit(1);
		}

		nthreads++;
	}

	/* Start the sender and worker threads */
	for (size_t n = 0; n < nthreads; n++) {
		k_thread_start(threads[n].id);
	}

	/* Drop the main thread to the lowest priority */
	k_thread_priority_set(k_current_get(), 4);

	/* Safety belt */
	k_thread_suspend(k_current_get());

	printk("Zephyr application failed to initialise\n");

	return 1;
}