1#
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import numpy as np
18import scipy.signal as signal
19import scipy.io.wavfile as wavfile
20import struct
21import argparse
22
23import lc3
24import tables as T, appendix_c as C
25
26import attdet, ltpf
27import mdct, energy, bwdet, sns, tns, spec
28import bitstream
29
30class Encoder:
31
32    def __init__(self, dt_ms, sr_hz):
33
34        dt = { 7.5: T.DT_7M5, 10: T.DT_10M }[dt_ms]
35
36        sr = {  8000: T.SRATE_8K , 16000: T.SRATE_16K, 24000: T.SRATE_24K,
37               32000: T.SRATE_32K, 48000: T.SRATE_48K }[sr_hz]
38
39        self.ne = T.NE[dt][sr]
40
41        self.attdet = attdet.AttackDetector(dt, sr)
42        self.ltpf = ltpf.LtpfAnalysis(dt, sr)
43
44        self.mdct = mdct.MdctForward(dt, sr)
45        self.energy = energy.EnergyBand(dt, sr)
46        self.bwdet = bwdet.BandwidthDetector(dt, sr)
47        self.sns = sns.SnsAnalysis(dt, sr)
48        self.tns = tns.TnsAnalysis(dt)
49        self.spec = spec.SpectrumAnalysis(dt, sr)
50
51    def analyse(self, x, nbytes):
52
53        att = self.attdet.run(nbytes, x)
54
55        pitch_present = self.ltpf.run(x)
56
57        x = self.mdct.run(x)[:self.ne]
58
59        (e, nn_flag) = self.energy.compute(x)
60        if nn_flag:
61            self.ltpf.disable()
62
63        bw = self.bwdet.run(e)
64
65        x = self.sns.run(e, att, x)
66
67        x = self.tns.run(x, bw, nn_flag, nbytes)
68
69        (xq, lastnz, x) = self.spec.run(bw, nbytes,
70            self.bwdet.get_nbits(), self.ltpf.get_nbits(),
71            self.sns.get_nbits(), self.tns.get_nbits(), x)
72
73        return pitch_present
74
75    def encode(self, pitch_present, nbytes):
76
77        b = bitstream.BitstreamWriter(nbytes)
78
79        self.bwdet.store(b)
80
81        self.spec.store(b)
82
83        self.tns.store(b)
84
85        b.write_bit(pitch_present)
86
87        self.sns.store(b)
88
89        if pitch_present:
90            self.ltpf.store(b)
91
92        self.spec.encode(b)
93
94        return b.terminate()
95
96    def run(self, x, nbytes):
97
98        pitch_present = self.analyse(x, nbytes)
99
100        data = self.encode(pitch_present, nbytes)
101
102        return data
103
104def check_appendix_c(dt):
105
106    i0 = dt - T.DT_7M5
107
108    enc_c = lc3.setup_encoder(int(T.DT_MS[dt] * 1000), 16000)
109    ok = True
110
111    for i in range(len(C.X_PCM[i0])):
112
113        data = lc3.encode(enc_c, C.X_PCM[i0][i], C.NBYTES[i0])
114        ok = ok and data == C.BYTES_AC[i0][i]
115
116    return ok
117
118def check():
119
120    ok = True
121
122    for dt in ( T.DT_7M5, T.DT_10M ):
123        ok = ok and check_appendix_c(dt)
124
125    return ok
126