/*
 * Copyright 2019-2022 Arm Limited and/or its affiliates <open-source-office@arm.com>
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/****************************************************************************
 * Includes
 ****************************************************************************/

#include "inference_process.hpp"

#include <inttypes.h>
#include <string>
#include <stdio.h>
#include <stdlib.h>
#include <vector>
#include <zephyr/kernel.h>

/* Model data */
#include "input.h"
#include "model.h"
#include "output.h"

using namespace std;
using namespace InferenceProcess;

/****************************************************************************
 * Defines
 ****************************************************************************/

/* Number of worker threads running the inferences. There should typically be
 * one worker thread per NPU. */
#ifndef NUM_INFERENCE_TASKS
#define NUM_INFERENCE_TASKS 1
#endif

/* Number of sender tasks that post inference requests to the worker threads. */
#ifndef NUM_JOB_TASKS
#define NUM_JOB_TASKS 2
#endif

/* Number of inferences per sender task. */
#ifndef NUM_JOBS_PER_TASK
#define NUM_JOBS_PER_TASK 2
#endif

/* Tensor arena size */
#ifdef TENSOR_ARENA_SIZE /* If defined in model.h */
#define TENSOR_ARENA_SIZE_PER_INFERENCE TENSOR_ARENA_SIZE
#else /* If not defined, use maximum available */
#define TENSOR_ARENA_SIZE_PER_INFERENCE (2000000 / NUM_INFERENCE_TASKS)
#endif

/****************************************************************************
 * InferenceJob
 ****************************************************************************/

namespace
{
struct InferenceProcessParams {
	InferenceProcessParams() : queueHandle(nullptr), tensorArena(nullptr), arenaSize(0)
	{
	}

	InferenceProcessParams(k_queue *_queue, uint8_t *_tensorArena, size_t _arenaSize)
		: queueHandle(_queue), tensorArena(_tensorArena), arenaSize(_arenaSize)
	{
	}

	k_queue *queueHandle;
	uint8_t *tensorArena;
	size_t arenaSize;
};

/* Wrapper around InferenceProcess::InferenceJob. Adds responseQueue and status
 * for Zephyr multi-tasking purposes. */
struct xInferenceJob : public InferenceJob {
	xInferenceJob() : InferenceJob(), responseQueue(nullptr), status(false)
	{
	}

	xInferenceJob(const string &_name, const DataPtr &_networkModel,
		      const vector<DataPtr> &_input, const vector<DataPtr> &_output,
		      const vector<DataPtr> &_expectedOutput, k_queue *_queue)
		: InferenceJob(_name, _networkModel, _input, _output, _expectedOutput),
		  responseQueue(_queue), status(false)
	{
	}

	k_queue *responseQueue;
	bool status;
};

/* Total number of completed jobs. Needed to exit the application correctly if
 * NUM_JOB_TASKS > 1. */
volatile int totalCompletedJobs = 0;

/* TensorArena static initialisation */
const size_t arenaSize = TENSOR_ARENA_SIZE_PER_INFERENCE;

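/* One tensor arena per worker thread, placed in the "tflm_arena" linker section
 * and 16-byte aligned. */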
__attribute__((section("tflm_arena"), aligned(16)))
uint8_t inferenceProcessTensorArena[NUM_INFERENCE_TASKS][arenaSize];

/* Allocate and initialize heap */
void *allocateHeap(const size_t size)
{
	k_heap *heap = static_cast<k_heap *>(k_malloc(sizeof(k_heap)));
	uint8_t *buf = static_cast<uint8_t *>(k_malloc(size));

	if ((buf == nullptr) || (heap == nullptr)) {
		printk("Heap allocation failed. heap=%p, buf=%p, size=%zu\n", heap, buf, size);
		exit(1);
	}

	k_heap_init(heap, buf, size);

	return static_cast<void *>(heap);
}

/* inferenceProcessTask - Run jobs from queue with available driver */
void inferenceProcessTask(void *_name, void *heap, void *_params)
{
	string *name = static_cast<string *>(_name);
	InferenceProcessParams *params = static_cast<InferenceProcessParams *>(_params);

	/* Assign the pre-allocated heap - used in the k_queue_alloc_append */
	k_thread_heap_assign(k_current_get(), static_cast<k_heap *>(heap));

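	/* The elaborated type specifier ("class") disambiguates the InferenceProcess
	 * class from the InferenceProcess namespace pulled in by the using-directive
	 * above. */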
	class InferenceProcess inferenceProcess(params->tensorArena, params->arenaSize);

	for (;;) {
		/* Receive inference job */
		xInferenceJob *job =
			static_cast<xInferenceJob *>(k_queue_get(params->queueHandle, K_FOREVER));

		printk("%s: Received inference job. job=%p\n", name->c_str(), job);

		/* Run inference */
		job->status = inferenceProcess.runJob(*job);

		printk("%s: Sending inference response. job=%p\n", name->c_str(), job);

		/* Return inference message */
		int ret = k_queue_alloc_append(job->responseQueue, job);
		if (0 != ret) {
			printk("%s: Failed to send message\n", name->c_str());
			exit(1);
		}
	}

	k_thread_abort(k_current_get());
}

/* inferenceSenderTask - Creates NUM_JOBS_PER_TASK jobs, queues them, and then
 * listens for completion status */
void inferenceSenderTask(void *_name, void *heap, void *_queue)
{
	string *name = static_cast<string *>(_name);
	k_queue *inferenceQueue = static_cast<k_queue *>(_queue);
	int ret = 0;

	/* Assign the pre-allocated heap - used in the k_queue_alloc_append */
	k_thread_heap_assign(k_current_get(), static_cast<k_heap *>(heap));

	/* Create queue for response messages */
	k_queue senderQueue;
	k_queue_init(&senderQueue);

	/* Loop over all jobs and push them to inference queue */
	xInferenceJob jobs[NUM_JOBS_PER_TASK];
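	/* Note: the worker threads operate on pointers into this stack-allocated
	 * array, so it must stay valid until all responses have been received below. */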
	for (int n = 0; n < NUM_JOBS_PER_TASK; n++) {
		auto &job = jobs[n];
		job = xInferenceJob(modelName, DataPtr(networkModelData, sizeof(networkModelData)),
				    { DataPtr(inputData, sizeof(inputData)) }, {},
				    { DataPtr(expectedOutputData, sizeof(expectedOutputData)) },
				    &senderQueue);

		printk("%s: Sending inference. job=%p, name=%s\n", name->c_str(), &job,
		       job.name.c_str());

		/* Queue job */
		ret = k_queue_alloc_append(inferenceQueue, &job);
		if (0 != ret) {
			printk("%s: Failed to send message\n", name->c_str());
			exit(1);
		}
	}

	/* Listen for completion status */
	do {
		xInferenceJob *job =
			static_cast<xInferenceJob *>(k_queue_get(&senderQueue, K_FOREVER));

		printk("%s: Received job response. job=%p, status=%u\n", name->c_str(), job,
		       job->status);

		totalCompletedJobs++;

		ret += job->status;
		if (job->status != 0) {
			break;
		}
	} while (totalCompletedJobs < NUM_JOBS_PER_TASK * NUM_JOB_TASKS);

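	/* Terminate the application; a non-zero exit code means at least one job failed. */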
	exit(ret);
}

} /* namespace */

/* Zephyr application. NOTE: Additional tasks may require increased heap size. */
int main()
{
	struct {
		k_thread thread;
		k_tid_t id;
	} threads[NUM_JOB_TASKS + NUM_INFERENCE_TASKS];
	size_t nthreads = 0;

	/* Allocate one global heap for all threads */
	void *heapPtr = allocateHeap(256);

	k_queue inferenceQueue;
	k_queue_init(&inferenceQueue);

	/* inferenceSender tasks to create and queue the jobs */
	for (int n = 0; n < NUM_JOB_TASKS; n++) {
		const size_t stackSize = 2048;
		k_thread_stack_t *stack = static_cast<k_thread_stack_t *>(k_malloc(stackSize));
		if (stack == nullptr) {
			printk("Failed to allocate stack for 'inferenceSenderTask%i'\n", n);
			exit(1);
		}

		auto &thread = threads[nthreads];
		string *name = new string("sender " + to_string(n));

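		/* K_FOREVER as the start delay creates the thread in a suspended state;
		 * it is started later by k_thread_start(). */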
		thread.id = k_thread_create(&thread.thread, stack, stackSize, inferenceSenderTask,
					    name, heapPtr, &inferenceQueue, 3, 0, K_FOREVER);
		if (thread.id == 0) {
			printk("Failed to create 'inferenceSenderTask%i'\n", n);
			exit(1);
		}

		nthreads++;
	}

	/* Create inferenceProcess tasks to process the queued jobs */
	InferenceProcessParams taskParams[NUM_INFERENCE_TASKS];
	for (int n = 0; n < NUM_INFERENCE_TASKS; n++) {
		const size_t stackSize = 8192;
		k_thread_stack_t *stack = static_cast<k_thread_stack_t *>(k_malloc(stackSize));
		if (stack == nullptr) {
			printk("Failed to allocate stack for 'inferenceProcessTask%i'\n", n);
			exit(1);
		}

		auto &thread = threads[nthreads];
		auto &taskParam = taskParams[n];
		taskParam = InferenceProcessParams(&inferenceQueue, inferenceProcessTensorArena[n],
						   arenaSize);
		string *name = new string("runner " + to_string(n));

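		/* The worker threads run at priority 2, above the sender threads at
		 * priority 3, so queued jobs are picked up as soon as they arrive. */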
		thread.id = k_thread_create(&thread.thread, stack, stackSize, inferenceProcessTask,
					    name, heapPtr, &taskParam, 2, 0, K_FOREVER);
		if (thread.id == 0) {
			printk("Failed to create 'inferenceProcessTask%i'\n", n);
			exit(1);
		}

		nthreads++;
	}

	/* Start the sender and worker threads */
	for (size_t n = 0; n < nthreads; n++) {
		k_thread_start(threads[n].id);
	}

	/* Drop the main thread below the worker and sender threads in priority */
	k_thread_priority_set(k_current_get(), 4);

	/* Safety belt */
	k_thread_suspend(k_current_get());

	printk("Zephyr application failed to initialise\n");

	return 1;
}