#
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import scipy.signal as signal
import scipy.io.wavfile as wavfile
import struct
import argparse

import lc3
import tables as T, appendix_c as C

import attdet, ltpf
import mdct, energy, bwdet, sns, tns, spec
import bitstream


class Encoder:
    """Frame encoder that chains the analysis stages and the bitstream writer.

    Each stage object is created once in the constructor and reused for
    every frame passed to `run()`.
    """

    def __init__(self, dt_ms, sr_hz):
        """Create the per-stage analysis objects.

        Args:
            dt_ms: Frame duration selector, either 7.5 or 10.
            sr_hz: Sample rate selector, one of 8000, 16000, 24000, 32000
                or 48000.

        Raises:
            KeyError: If `dt_ms` or `sr_hz` is not one of the values above.
        """
        durations = {7.5: T.DT_7M5, 10: T.DT_10M}
        dt = durations[dt_ms]

        sample_rates = {
            8000: T.SRATE_8K,
            16000: T.SRATE_16K,
            24000: T.SRATE_24K,
            32000: T.SRATE_32K,
            48000: T.SRATE_48K,
        }
        sr = sample_rates[sr_hz]

        # Number of MDCT coefficients kept by `analyse()`.
        self.ne = T.NE[dt][sr]

        # Stages fed with the input frame.
        self.attdet = attdet.AttackDetector(dt, sr)
        self.ltpf = ltpf.LtpfAnalysis(dt, sr)

        # Stages fed with the transformed frame.
        self.mdct = mdct.MdctForward(dt, sr)
        self.energy = energy.EnergyBand(dt, sr)
        self.bwdet = bwdet.BandwidthDetector(dt, sr)
        self.sns = sns.SnsAnalysis(dt, sr)
        self.tns = tns.TnsAnalysis(dt)
        self.spec = spec.SpectrumAnalysis(dt, sr)

    def analyse(self, x, nbytes):
        """Run every analysis stage on one frame.

        The results are kept inside the stage objects, from where
        `encode()` stores them into the bitstream.

        Args:
            x: Input frame, handed to the attack detector, the LTPF
                analysis and the forward MDCT.
            nbytes: Size of the encoded frame, in bytes.

        Returns:
            The value returned by the LTPF analysis (`pitch_present`).
        """
        attack = self.attdet.run(nbytes, x)
        pitch_present = self.ltpf.run(x)

        # Only the first `self.ne` transform coefficients are processed.
        coeffs = self.mdct.run(x)[:self.ne]

        band_energy, nn_flag = self.energy.compute(coeffs)
        if nn_flag:
            self.ltpf.disable()

        bandwidth = self.bwdet.run(band_energy)

        coeffs = self.sns.run(band_energy, attack, coeffs)
        coeffs = self.tns.run(coeffs, bandwidth, nn_flag, nbytes)

        # Bit counts already taken by the other stages, queried in the same
        # order as they are passed to the spectrum analysis.
        nbits_bwdet = self.bwdet.get_nbits()
        nbits_ltpf = self.ltpf.get_nbits()
        nbits_sns = self.sns.get_nbits()
        nbits_tns = self.tns.get_nbits()

        # The returned triple is not used here; the spectrum stage is read
        # back through `store()` and `encode()` when the frame is written.
        _xq, _lastnz, _x = self.spec.run(
            bandwidth, nbytes,
            nbits_bwdet, nbits_ltpf, nbits_sns, nbits_tns,
            coeffs)

        return pitch_present

    def encode(self, pitch_present, nbytes):
        """Write the results of the last `analyse()` call into a frame.

        Args:
            pitch_present: Flag returned by `analyse()`; written as one bit,
                and the LTPF data is stored only when it is set.
            nbytes: Size of the encoded frame, in bytes.

        Returns:
            The value returned by `BitstreamWriter.terminate()`.
        """
        writer = bitstream.BitstreamWriter(nbytes)

        # The order of the writes below defines the frame layout.
        self.bwdet.store(writer)
        self.spec.store(writer)
        self.tns.store(writer)

        writer.write_bit(pitch_present)

        self.sns.store(writer)
        if pitch_present:
            self.ltpf.store(writer)

        self.spec.encode(writer)

        return writer.terminate()

    def run(self, x, nbytes):
        """Analyse then encode one frame.

        Args:
            x: Input frame.
            nbytes: Size of the encoded frame, in bytes.

        Returns:
            The encoded frame, as returned by `encode()`.
        """
        return self.encode(self.analyse(x, nbytes), nbytes)


def check_appendix_c(dt):
    """Compare `lc3.encode()` output with the Appendix C reference frames.

    The encoder from the `lc3` module is set up at 16000 Hz, and every PCM
    frame of `C.X_PCM` for the given frame duration is encoded. All frames
    are encoded even after a mismatch has been found.

    Args:
        dt: Frame duration index, `T.DT_7M5` or `T.DT_10M`.

    Returns:
        A true value when every encoded frame equals its reference in
        `C.BYTES_AC`.
    """
    # The Appendix C tables are indexed from the 7.5 ms entry.
    i0 = dt - T.DT_7M5

    # presumably the frame duration in microseconds (DT_MS * 1000) - TODO confirm
    enc_c = lc3.setup_encoder(int(T.DT_MS[dt] * 1000), 16000)

    ok = True
    for i, pcm in enumerate(C.X_PCM[i0]):
        data = lc3.encode(enc_c, pcm, C.NBYTES[i0])
        ok = ok and data == C.BYTES_AC[i0][i]

    return ok


def check():
    """Run the Appendix C comparison for both frame durations.

    Returns:
        A true value when `check_appendix_c()` succeeds for `T.DT_7M5` and
        `T.DT_10M`. A duration is not checked once a previous one failed.
    """
    ok = True
    for dt in (T.DT_7M5, T.DT_10M):
        ok = ok and check_appendix_c(dt)

    return ok