1###########################################
2# Project:      CMSIS DSP Library
3# Title:        appnodes.py
4# Description:  Application nodes for kws example
5#
6# $Date:        16 March 2022
7# $Revision:    V1.10.0
8#
9# Target Processor: Cortex-M and Cortex-A cores
10# -------------------------------------------------------------------- */
11#
12# Copyright (C) 2010-2022 ARM Limited or its affiliates. All rights reserved.
13#
14# SPDX-License-Identifier: Apache-2.0
15#
16# Licensed under the Apache License, Version 2.0 (the License); you may
17# not use this file except in compliance with the License.
18# You may obtain a copy of the License at
19#
20# www.apache.org/licenses/LICENSE-2.0
21#
22# Unless required by applicable law or agreed to in writing, software
23# distributed under the License is distributed on an AS IS BASIS, WITHOUT
24# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25# See the License for the specific language governing permissions and
26# limitations under the License.
27############################################
28from cmsisdsp.sdf.nodes.simu import *
29from custom import *
30import cmsisdsp.fixedpoint as fix
31import cmsisdsp as dsp
32
33
34# Sink node displaying the recognized word
35class Sink(GenericSink):
36    def __init__(self,inputSize,fifoin):
37        GenericSink.__init__(self,inputSize,fifoin)
38
39
40    def run(self):
41        i=self.getReadBuffer()
42
43        if i[0] == -1:
44            print("Unknown")
45
46        if i[0] == 0:
47            print("Yes")
48
49        return(0)
50
51# Source node getting sample from NumPy buffer
52# At the end of the buffer we generate 0 samples
53class Source(GenericSource):
54    def __init__(self,outputSize,fifoout,buffer):
55        GenericSource.__init__(self,outputSize,fifoout)
56        self._offset=0
57        self._buffer=np.array(buffer)
58
59    def run(self):
60        a=self.getWriteBuffer()
61
62        if self._offset + self._outputSize >= len(self._buffer):
63            a[0:self._outputSize] = np.zeros(self._outputSize,dtype=np.int16)
64        else:
65           a[0:self._outputSize] = self._buffer[self._offset:self._offset+self._outputSize]
66           self._offset = self._offset + self._outputSize
67
68        return(0)
69
70
71# Same implementation as the one in the python notebook
72def dsp_zcr_q15(w):
73    m = dsp.arm_mean_q15(w)
74    # Negate can saturate so we use CMSIS-DSP function which is working on array (and we have a scalar)
75    m = dsp.arm_negate_q15(np.array([m]))[0]
76    w = dsp.arm_offset_q15(w,m)
77
78    f=w[:-1]
79    g=w[1:]
80    k=np.count_nonzero(np.logical_and(np.logical_or(np.logical_and(f>0,g<0), np.logical_and(f<0,g>0)),g>f))
81
82    # k < len(f) so shift should be 0 except when k == len(f)
83    # When k==len(f) normally quotient is 0x4000 and shift 1 and we convert
84    # this to 0x7FFF
85    status,quotient,shift_val=dsp.arm_divide_q15(k,len(f))
86    if shift_val==1:
87        return(dsp.arm_shift_q31(np.array([quotient]),shift)[0])
88    else:
89        return(quotient)
90
91# Same implementation as the one in the notebook
92class FIR(GenericNode):
93    def __init__(self,inputSize,outSize,fifoin,fifoout):
94        GenericNode.__init__(self,inputSize,outSize,fifoin,fifoout)
95        self._firq15=dsp.arm_fir_instance_q15()
96
97
98
99
100    def run(self):
101        a=self.getReadBuffer()
102        b=self.getWriteBuffer()
103        errorStatus = 0
104
105        blockSize=self._inputSize
106        numTaps=10
107        stateLength = numTaps + blockSize - 1
108        dsp.arm_fir_init_q15(self._firq15,10,fix.toQ15(np.ones(10)/10.0),np.zeros(stateLength,dtype=np.int16))
109
110        b[:] = dsp.arm_fir_q15(self._firq15,a)
111
112        return(errorStatus)
113
114class Feature(GenericNode):
115    def __init__(self,inputSize,outSize,fifoin,fifoout,window):
116        GenericNode.__init__(self,inputSize,outSize,fifoin,fifoout)
117        self._window=window
118
119
120    def run(self):
121        a=self.getReadBuffer()
122        b=self.getWriteBuffer()
123        errorStatus = 0
124
125        b[:] = dsp_zcr_q15(dsp.arm_mult_q15(a,self._window))
126
127        return(errorStatus)
128
129
130
131class KWS(GenericNode):
132    def __init__(self,inputSize,outSize,fifoin,fifoout,coef_q15,coef_shift,intercept_q15,intercept_shift):
133        GenericNode.__init__(self,inputSize,outSize,fifoin,fifoout)
134        self._coef_q15=coef_q15
135        self._coef_shift=coef_shift
136        self._intercept_q15=intercept_q15
137        self._intercept_shift=intercept_shift
138
139
140    def run(self):
141        a=self.getReadBuffer()
142        b=self.getWriteBuffer()
143        errorStatus = 0
144
145        res=dsp.arm_dot_prod_q15(self._coef_q15,a)
146
147        scaled=dsp.arm_shift_q15(np.array([self._intercept_q15]),self._intercept_shift-self._coef_shift)[0]
148        # Because dot prod output is in Q34.30
149        # and ret is on 64 bits
150        scaled = np.int64(scaled) << 15
151
152        res = res + scaled
153
154
155
156        if res<0:
157            b[0]=-1
158        else:
159            b[0]=0
160
161        return(errorStatus)
162
163