###########################################
# Project:      CMSIS DSP Library
# Title:        appnodes.py
# Description:  Application nodes for kws example
#
# $Date:        16 March 2022
# $Revision:    V1.10.0
#
# Target Processor: Cortex-M and Cortex-A cores
# -------------------------------------------------------------------- */
#
# Copyright (C) 2010-2022 ARM Limited or its affiliates. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the License); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
############################################
from cmsisdsp.sdf.nodes.simu import *
from custom import *
import cmsisdsp.fixedpoint as fix
import cmsisdsp as dsp
# Explicit import so this module does not depend on `np` leaking out of the
# star imports above.
import numpy as np


# Sink node displaying the recognized word
class Sink(GenericSink):
    """Sink node printing the decision found in its input buffer."""

    def __init__(self, inputSize, fifoin):
        GenericSink.__init__(self, inputSize, fifoin)

    def run(self):
        """Print the label matching the first input sample.

        -1 prints "Unknown", 0 prints "Yes"; any other value prints nothing.

        Returns:
            0 in all cases.
        """
        label = self.getReadBuffer()[0]

        if label == -1:
            print("Unknown")
        elif label == 0:
            print("Yes")

        return 0


# Source node getting sample from NumPy buffer
# At the end of the buffer we generate 0 samples
class Source(GenericSource):
    """Source node streaming blocks of samples out of an in-memory buffer."""

    def __init__(self, outputSize, fifoout, buffer):
        GenericSource.__init__(self, outputSize, fifoout)
        # Index of the next sample of `_buffer` to emit.
        self._offset = 0
        self._buffer = np.array(buffer)

    def run(self):
        """Write the next block of `_outputSize` samples to the output buffer.

        Once fewer than `_outputSize` samples remain, a block of zeros is
        written instead and the read position stops advancing.

        Returns:
            0 in all cases.
        """
        out = self.getWriteBuffer()
        block_end = self._offset + self._outputSize

        # Strict comparison: a block ending exactly at the end of the buffer
        # is still a complete block and must be emitted (the previous `>=`
        # silently dropped it).
        if block_end > len(self._buffer):
            out[0:self._outputSize] = np.zeros(self._outputSize, dtype=np.int16)
        else:
            out[0:self._outputSize] = self._buffer[self._offset:block_end]
            self._offset = block_end

        return 0


# Same implementation as the one in the python notebook
def dsp_zcr_q15(w):
    """Return a zero-crossing rate of `w` computed with CMSIS-DSP q15 kernels.

    The mean of `w` is subtracted, then consecutive sample pairs whose signs
    differ and whose second sample is larger than the first are counted. The
    count is divided by the number of pairs with `arm_divide_q15`.

    Args:
        w: array of q15 samples.

    Returns:
        The quotient returned by `arm_divide_q15`, saturated to q15 when the
        division reports a shift of 1.
    """
    mean = dsp.arm_mean_q15(w)
    # Negate can saturate so we use CMSIS-DSP function which is working on
    # array (and we have a scalar)
    neg_mean = dsp.arm_negate_q15(np.array([mean]))[0]
    centered = dsp.arm_offset_q15(w, neg_mean)

    prev = centered[:-1]
    nxt = centered[1:]
    sign_change = np.logical_or(
        np.logical_and(prev > 0, nxt < 0),
        np.logical_and(prev < 0, nxt > 0),
    )
    crossings = np.count_nonzero(np.logical_and(sign_change, nxt > prev))

    # crossings < len(prev) so shift should be 0 except when
    # crossings == len(prev). In that case the quotient is normally 0x4000
    # with shift 1 and we convert this to 0x7FFF.
    status, quotient, shift_val = dsp.arm_divide_q15(crossings, len(prev))
    if shift_val == 1:
        # Fix: the original referenced an undefined name `shift` (NameError)
        # and used the q31 shift, which would give 0x8000 instead of the
        # saturated q15 value 0x7FFF described above.
        return dsp.arm_shift_q15(np.array([quotient], dtype=np.int16), shift_val)[0]
    return quotient


# Same implementation as the one in the notebook
class FIR(GenericNode):
    """Node applying a 10-tap q15 FIR with all coefficients equal to 1/10."""

    def __init__(self, inputSize, outSize, fifoin, fifoout):
        GenericNode.__init__(self, inputSize, outSize, fifoin, fifoout)
        self._firq15 = dsp.arm_fir_instance_q15()

    def run(self):
        """Filter the input block and write the result to the output buffer.

        Returns:
            0 in all cases.
        """
        src = self.getReadBuffer()
        dst = self.getWriteBuffer()

        numTaps = 10
        stateLength = numTaps + self._inputSize - 1
        # NOTE(review): the instance is re-initialised with an all-zero state
        # on every run, as in the notebook; presumably this means no filter
        # history is carried from one block to the next - confirm this is
        # intended before moving the init into __init__.
        dsp.arm_fir_init_q15(
            self._firq15,
            numTaps,
            fix.toQ15(np.ones(numTaps) / 10.0),
            np.zeros(stateLength, dtype=np.int16),
        )

        dst[:] = dsp.arm_fir_q15(self._firq15, src)

        return 0


class Feature(GenericNode):
    """Node computing the zero-crossing-rate feature of a windowed block."""

    def __init__(self, inputSize, outSize, fifoin, fifoout, window):
        GenericNode.__init__(self, inputSize, outSize, fifoin, fifoout)
        # Window multiplied element-wise with each input block.
        self._window = window

    def run(self):
        """Fill the output buffer with the feature value of the input block.

        Returns:
            0 in all cases.
        """
        src = self.getReadBuffer()
        dst = self.getWriteBuffer()

        windowed = dsp.arm_mult_q15(src, self._window)
        dst[:] = dsp_zcr_q15(windowed)

        return 0


class KWS(GenericNode):
    """Node turning a feature block into a -1 / 0 decision.

    The decision is the sign of `dot(coef_q15, input) + intercept`, with the
    intercept rescaled before being added to the dot product.
    """

    def __init__(self, inputSize, outSize, fifoin, fifoout,
                 coef_q15, coef_shift, intercept_q15, intercept_shift):
        GenericNode.__init__(self, inputSize, outSize, fifoin, fifoout)
        self._coef_q15 = coef_q15
        self._coef_shift = coef_shift
        self._intercept_q15 = intercept_q15
        self._intercept_shift = intercept_shift

    def run(self):
        """Write -1 to the first output sample if the score is negative, else 0.

        Returns:
            0 in all cases.
        """
        features = self.getReadBuffer()
        out = self.getWriteBuffer()

        score = dsp.arm_dot_prod_q15(self._coef_q15, features)

        # Bring the intercept to the same scaling as the coefficients.
        intercept = dsp.arm_shift_q15(
            np.array([self._intercept_q15]),
            self._intercept_shift - self._coef_shift,
        )[0]
        # Because dot prod output is in Q34.30
        # and ret is on 64 bits
        intercept = np.int64(intercept) << 15

        score = score + intercept

        out[0] = -1 if score < 0 else 0

        return 0