#
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np

import lc3
import tables as T, appendix_c as C


### ------------------------------------------------------------------------ ###

class AttackDetector:
    """NumPy reference model of the attack detector, compared against `lc3`.

    The instance carries the state kept from one frame to the next:

    - `xn1`, `xn2`: last and second-to-last downsampled input values,
    - `en1`: energy of the last block of the previous frame,
    - `an1`: smoothed energy of the last block of the previous frame,
    - `p_att`: index of the last flagged block of the previous frame, plus
      one (0 means no block was flagged).
    """

    def __init__(self, dt, sr):
        """Create a detector with cleared state.

        Args:
            dt: Frame duration index, used to look up `T.DT_MS`.
            sr: Sample rate index, compared against the `T.SRATE_*` constants.
        """

        self.dt = dt
        self.sr = sr
        self.ms = T.DT_MS[dt]

        self.xn1 = 0
        self.xn2 = 0
        self.en1 = 0
        self.an1 = 0
        self.p_att = 0

    def is_enabled(self, nbytes):
        """Return whether the detector is active for this configuration.

        Args:
            nbytes: Frame size, tested against per-duration and per-rate
                bounds.

        Returns:
            Truthy when one of the four (duration, rate, size) conditions
            below holds, falsy otherwise.
        """

        c1 = self.dt == T.DT_10M and \
            self.sr == T.SRATE_32K and nbytes > 80

        c2 = self.dt == T.DT_10M and \
            self.sr >= T.SRATE_48K and nbytes >= 100

        c3 = self.dt == T.DT_7M5 and \
            self.sr == T.SRATE_32K and nbytes >= 61 and nbytes < 150

        c4 = self.dt == T.DT_7M5 and \
            self.sr >= T.SRATE_48K and nbytes >= 75 and nbytes < 150

        return c1 or c2 or c3 or c4

    def run(self, nbytes, x):
        """Process one frame and update the inter-frame state.

        The state is updated on every call, even when the detector is not
        enabled for `nbytes`.

        Args:
            nbytes: Frame size, forwarded to `is_enabled()`.
            x: Input samples of the frame. Samples are summed in groups of
                `len(x) // mf`; any samples beyond `mf` full groups are
                ignored.

        Returns:
            Truthy when the detector is enabled and an attack is flagged,
            either in this frame or in the second half of the previous one.
        """

        ### 3.3.6.2 Downsampling and filtering input

        # Number of downsampled values per frame: 16 per unit of `self.ms`.
        mf = int(16 * self.ms)

        r = len(x) // mf
        x_att = np.array([ np.sum(x[i*r:(i+1)*r]) for i in range(mf) ])

        # 3-tap filter; the first two outputs need the tail of the previous
        # frame, kept in `xn1` (most recent) and `xn2`.
        x_hp = np.empty(mf)
        x_hp[0 ] = 0.375 * x_att[0 ] - 0.5 * self.xn1    + 0.125 * self.xn2
        x_hp[1 ] = 0.375 * x_att[1 ] - 0.5 * x_att[0   ] + 0.125 * self.xn1
        x_hp[2:] = 0.375 * x_att[2:] - 0.5 * x_att[1:-1] + 0.125 * x_att[0:-2]
        self.xn2 = x_att[-2]
        self.xn1 = x_att[-1]

        ### 3.3.6.3 Energy calculation

        # Blocks of 40 filtered values, one per 2.5 units of `self.ms`.
        nb = int(self.ms / 2.5)

        e_att = np.array([ np.sum(np.square(x_hp[40*i:40*(i+1)]))
                           for i in range(nb) ])

        # Smoothed energy: each entry depends on the previous block only,
        # the first one on the state saved from the previous frame.
        a_att = np.empty(nb)
        a_att[0] = np.maximum(0.25 * self.an1, self.en1)
        for i in range(1, nb):
            a_att[i] = np.maximum(0.25 * a_att[i-1], e_att[i-1])
        self.en1 = e_att[-1]
        self.an1 = a_att[-1]

        ### 3.3.6.4 Attack Detection

        # Index of the last block whose energy exceeds 8.5 times the smoothed
        # energy, or -1 when no block does.
        p_att = -1
        for i in range(nb):
            if e_att[i] > 8.5 * a_att[i]:
                p_att = i

        # `self.p_att` is stored with a +1 offset, hence the `- 1` to recover
        # the position found in the previous frame.
        f_att = p_att >= 0 or self.p_att - 1 >= nb // 2
        self.p_att = 1 + p_att

        return self.is_enabled(nbytes) and f_att


def initial_state():
    """Return a cleared state dictionary, as passed to `lc3.attdet_run()`."""
    return { 'en1': 0.0, 'an1': 0.0, 'p_att': 0 }

### ------------------------------------------------------------------------ ###

def check_enabling(rng, dt):
    """Compare the `lc3` result with `is_enabled()` around each size bound.

    Every call to `lc3.attdet_run()` gets a fresh state and uniform noise in
    [-1, 1), and its result is expected to equal `is_enabled(nbytes)`.

    Args:
        rng: NumPy random generator providing the noise.
        dt: Frame duration index.

    Returns:
        Truthy when all comparisons agree.
    """

    ok = True

    for sr in range(T.SRATE_16K, T.NUM_SRATE):

        attdet = AttackDetector(dt, sr)

        # Values on both sides of each bound used by `is_enabled()`.
        for nbytes in [ 61, 61-1, 75-1, 75, 80, 80+1, 100-1, 100, 150-1, 150 ]:

            f_att = lc3.attdet_run(dt, sr, nbytes,
                initial_state(), 2 * rng.random(T.NS[dt][sr]+6) - 1)

            ok = ok and f_att == attdet.is_enabled(nbytes)

    return ok

def check_unit(rng, dt, sr):
    """Run 100 consecutive frames through both implementations and compare.

    Args:
        rng: NumPy random generator providing the input.
        dt: Frame duration index.
        sr: Sample rate index.

    Returns:
        Truthy when the flags, energies and attack positions agree on every
        frame.
    """

    ns = T.NS[dt][sr]
    ok = True

    attdet = AttackDetector(dt, sr)

    state_c = initial_state()
    x_c = np.zeros(ns+6)

    for run in range(100):

        ### Generate noise, and an attack at random point

        x = ((2 * rng.random(ns)) - 1) * (2 ** 8 - 1)
        # `int()` truncates like the former `.astype(int)`, and also works
        # when the product is a plain Python float.
        x[int(ns * rng.random())] *= 2 ** 7

        ### Check Implementation

        f_att = attdet.run(100, x)

        # The buffer given to `lc3` is the last 6 samples of the previous
        # buffer followed by the new frame.
        x_c = np.append(x_c[-6:], x)
        f_att_c = lc3.attdet_run(dt, sr, 100, state_c, x_c)

        ok = ok and f_att_c == f_att
        # NOTE(review): a bound of 2 on the relative error is very
        # permissive -- confirm this is the intended tolerance.
        ok = ok and np.amax(np.abs(1 - state_c['en1']/attdet.en1)) < 2
        ok = ok and np.amax(np.abs(1 - state_c['an1']/attdet.an1)) < 2
        ok = ok and state_c['p_att'] == attdet.p_att

    return ok

def check_appendix_c(dt):
    """Check `lc3` against the two reference frames of `appendix_c`.

    Both frames are processed with the same state, at `T.SRATE_48K`.

    Args:
        dt: Frame duration index.

    Returns:
        Truthy when the flag of both frames matches `C.F_ATT[dt]`.
    """

    sr = T.SRATE_48K

    state = initial_state()

    x = np.append(np.zeros(6), C.X_PCM_ATT[dt][0])
    f_att = lc3.attdet_run(dt, sr, C.NBYTES_ATT[dt], state, x)
    ok = f_att == C.F_ATT[dt][0]

    x = np.append(x[-6:], C.X_PCM_ATT[dt][1])
    f_att = lc3.attdet_run(dt, sr, C.NBYTES_ATT[dt], state, x)
    # Accumulate, so that a mismatch on the first frame is not overwritten.
    ok = ok and f_att == C.F_ATT[dt][1]

    return ok

def check():
    """Run every attack detector check with a fixed random seed.

    Returns:
        Truthy when all checks pass. Checks following a failure are skipped.
    """

    rng = np.random.default_rng(1234)
    ok = True

    for dt in range(T.NUM_DT):
        # The result must be assigned, otherwise failures are ignored.
        ok = ok and check_enabling(rng, dt)

    for dt in range(T.NUM_DT):
        for sr in range(T.SRATE_32K, T.NUM_SRATE):
            ok = ok and check_unit(rng, dt, sr)

    for dt in range(T.NUM_DT):
        ok = ok and check_appendix_c(dt)

    return ok

### ------------------------------------------------------------------------ ###