1#
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import numpy as np
18import scipy.signal as signal
19import scipy.io.wavfile as wavfile
20import struct
21import argparse
22
23import lc3
24import tables as T, appendix_c as C
25
26import mdct, energy, bwdet, sns, tns, spec, ltpf
27import bitstream
28
29class Decoder:
30
31    def __init__(self, dt_ms, sr_hz):
32
33        dt = { 7.5: T.DT_7M5, 10: T.DT_10M }[dt_ms]
34
35        sr = {  8000: T.SRATE_8K , 16000: T.SRATE_16K, 24000: T.SRATE_24K,
36               32000: T.SRATE_32K, 48000: T.SRATE_48K }[sr_hz]
37
38        self.sr = sr
39        self.ne = T.NE[dt][sr]
40        self.ns = T.NS[dt][sr]
41
42        self.mdct = mdct.MdctInverse(dt, sr)
43
44        self.bwdet = bwdet.BandwidthDetector(dt, sr)
45        self.spec = spec.SpectrumSynthesis(dt, sr)
46        self.tns = tns.TnsSynthesis(dt)
47        self.sns = sns.SnsSynthesis(dt, sr)
48        self.ltpf = ltpf.LtpfSynthesis(dt, sr)
49
50    def decode(self, data):
51
52        b = bitstream.BitstreamReader(data)
53
54        bw = self.bwdet.get(b)
55        if bw > self.sr:
56            raise ValueError('Invalid bandwidth indication')
57
58        self.spec.load(b)
59
60        self.tns.load(b, bw, len(data))
61
62        pitch = b.read_bit()
63
64        self.sns.load(b)
65
66        if pitch:
67            self.ltpf.load(b)
68        else:
69            self.ltpf.disable()
70
71        x = self.spec.decode(b, bw, len(data))
72
73        return (x, bw, pitch)
74
75    def synthesize(self, x, bw, pitch, nbytes):
76
77        x = self.tns.run(x, bw)
78
79        x = self.sns.run(x)
80
81        x = np.append(x, np.zeros(self.ns - self.ne))
82        x = self.mdct.run(x)
83
84        x = self.ltpf.run(x)
85
86        return x
87
88    def run(self, data):
89
90        (x, bw, pitch) = self.decode(data)
91
92        x = self.synthesize(x, bw, pitch, len(data))
93
94        return x
95
96def check_appendix_c(dt):
97
98    i0 = dt - T.DT_7M5
99
100    dec_c = lc3.setup_decoder(int(T.DT_MS[dt] * 1000), 16000)
101    ok = True
102
103    for i in range(len(C.BYTES_AC[i0])):
104
105        pcm = lc3.decode(dec_c, bytes(C.BYTES_AC[i0][i]))
106        ok = ok and np.max(np.abs(pcm - C.X_HAT_CLIP[i0][i])) < 1
107
108    return ok
109
110def check():
111
112    ok = True
113
114    for dt in range(T.DT_7M5, T.NUM_DT):
115        ok = ok and check_appendix_c(dt)
116
117    return ok
118