#
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import scipy.signal as signal
import scipy.io.wavfile as wavfile
import struct
import argparse

import lc3
import tables as T, appendix_c as C

import mdct, energy, bwdet, sns, tns, spec, ltpf
import bitstream


class Decoder:
    """Frame decoder assembled from this project's per-stage modules.

    The instance owns one object per stage (bandwidth detector, spectrum,
    TNS, SNS and LTPF synthesis, inverse MDCT).  The same stage objects are
    reused for every frame passed to `run()`.
    """

    def __init__(self, dt_ms, sr_hz):
        """Create the stage objects for one frame duration and sample rate.

        Args:
            dt_ms: Frame duration in milliseconds; must be 7.5 or 10.
            sr_hz: Sample rate in Hz; must be one of 8000, 16000, 24000,
                32000 or 48000.

        Raises:
            KeyError: If `dt_ms` or `sr_hz` is not one of the values above.
        """

        # Map the user-facing values to the index constants of `tables`.
        dt = {7.5: T.DT_7M5, 10: T.DT_10M}[dt_ms]

        sr = {
            8000: T.SRATE_8K,
            16000: T.SRATE_16K,
            24000: T.SRATE_24K,
            32000: T.SRATE_32K,
            48000: T.SRATE_48K,
        }[sr_hz]

        self.sr = sr
        self.ne = T.NE[dt][sr]
        self.ns = T.NS[dt][sr]

        self.mdct = mdct.MdctInverse(dt, sr)

        self.bwdet = bwdet.BandwidthDetector(dt, sr)
        self.spec = spec.SpectrumSynthesis(dt, sr)
        self.tns = tns.TnsSynthesis(dt)
        self.sns = sns.SnsSynthesis(dt, sr)
        self.ltpf = ltpf.LtpfSynthesis(dt, sr)

    def decode(self, data):
        """Parse one encoded frame and decode its spectrum.

        Args:
            data: Encoded frame; it is handed to `BitstreamReader` and its
                length is passed on as the frame size in bytes.

        Returns:
            A tuple `(x, bw, pitch)`: the value returned by the spectrum
            decoder, the bandwidth index read from the frame, and the bit
            that selects whether LTPF parameters are present.

        Raises:
            ValueError: If the bandwidth read from the frame is greater
                than the sample rate index configured at construction.
        """

        # NOTE(review): every stage reads from the same reader, so the call
        # order below presumably mirrors the bitstream layout -- keep it.
        b = bitstream.BitstreamReader(data)

        bw = self.bwdet.get(b)
        if bw > self.sr:
            raise ValueError('Invalid bandwidth indication')

        self.spec.load(b)

        self.tns.load(b, bw, len(data))

        pitch = b.read_bit()

        self.sns.load(b)

        if pitch:
            self.ltpf.load(b)
        else:
            self.ltpf.disable()

        x = self.spec.decode(b, bw, len(data))

        return (x, bw, pitch)

    def synthesize(self, x, bw, pitch, nbytes):
        """Run the synthesis stages on a decoded spectrum.

        Args:
            x: Spectrum as returned by `decode()`.
            bw: Bandwidth index as returned by `decode()`.
            pitch: Accepted for interface compatibility; not used here.
            nbytes: Accepted for interface compatibility; not used here.

        Returns:
            The output of the LTPF stage.
        """

        x = self.tns.run(x, bw)

        x = self.sns.run(x)

        # Zero-pad by `ns - ne` coefficients before the inverse transform.
        x = np.append(x, np.zeros(self.ns - self.ne))
        x = self.mdct.run(x)

        x = self.ltpf.run(x)

        return x

    def run(self, data):
        """Decode one frame: `decode()` followed by `synthesize()`.

        Args:
            data: Encoded frame, as accepted by `decode()`.

        Returns:
            The value returned by `synthesize()`.

        Raises:
            ValueError: Propagated from `decode()`.
        """

        (x, bw, pitch) = self.decode(data)

        x = self.synthesize(x, bw, pitch, len(data))

        return x


def check_appendix_c(dt):
    """Check `lc3.decode` against the `appendix_c` reference frames.

    Every frame of `C.BYTES_AC` for the given duration is decoded with a
    16000 Hz decoder and compared with the matching `C.X_HAT_CLIP` entry.

    Args:
        dt: Frame duration index, as used by `tables` (e.g. `T.DT_7M5`).

    Returns:
        A truthy value if every frame differs from its reference by less
        than 1 in maximum absolute value, a falsy value otherwise.
    """

    # The appendix tables are indexed relative to the first duration.
    i0 = dt - T.DT_7M5

    # NOTE(review): `DT_MS * 1000` is presumably microseconds -- confirm
    # against `lc3.setup_decoder`.
    dec_c = lc3.setup_decoder(int(T.DT_MS[dt] * 1000), 16000)
    ok = True

    for i, frame in enumerate(C.BYTES_AC[i0]):

        # Decode every frame, even after a mismatch: only the comparison
        # below is short-circuited.
        pcm = lc3.decode(dec_c, bytes(frame))
        ok = ok and np.max(np.abs(pcm - C.X_HAT_CLIP[i0][i])) < 1

    return ok


def check():
    """Run `check_appendix_c` for each duration index.

    Covers indices `T.DT_7M5` up to, but not including, `T.NUM_DT`.
    Durations after the first failure are skipped.

    Returns:
        A truthy value if every duration checked passes, falsy otherwise.
    """

    ok = True

    for dt in range(T.DT_7M5, T.NUM_DT):
        ok = ok and check_appendix_c(dt)

    return ok