]> git.immae.eu Git - perso/Immae/Projets/Python/MusicSampler.git/blob - helpers/music_file.py
Coding styles
[perso/Immae/Projets/Python/MusicSampler.git] / helpers / music_file.py
1 import threading
2 import pydub
3 import time
4 from transitions.extensions import HierarchicalMachine as Machine
5
6 import os.path
7
8 import audioop
9
10 from .lock import Lock
11 from . import Config, gain, debug_print, error_print
12 from .mixer import Mixer
13
# Module-level lock shared by every MusicFile instance: it is held for the
# whole body of MusicFile.on_enter_loading, so files are decoded one at a time.
# NOTE(review): Lock comes from the project-local .lock module; presumably a
# named wrapper around a mutex usable as a context manager — confirm there.
file_lock = Lock("file")
15
class MusicFile(Machine):
    """State machine wrapping a single audio file.

    The file is decoded with pydub in a background thread started by the
    constructor. Once loaded, it can be played, paused, sought and stopped;
    raw audio frames are handed out through :meth:`play_callback`.

    States: ``initial`` -> ``loading`` -> ``failed`` or ``loaded``, the
    latter having the children ``stopped``, ``playing``, ``paused`` and
    ``stopping`` (addressed as ``loaded_<child>``).
    """

    def __init__(self, filename, mapping, name=None, gain=1):
        """Set up the state machine and start loading the file.

        :param filename: path of the audio file handed to pydub.
        :param mapping: owner object; its ``mixer`` and ``master_gain``
            attributes are read by this class.
        :param name: display name used in log messages (defaults to
            ``filename``).
        :param gain: initial volume factor applied once at load time
            (multiplied by 100 and converted through the ``gain`` helper).
        """
        states = [
            'initial',
            'loading',
            'failed',
            {
                'name': 'loaded',
                'children': ['stopped', 'playing', 'paused', 'stopping']
            }
        ]
        transitions = [
            {
                'trigger': 'load',
                'source': 'initial',
                'dest': 'loading'
            },
            {
                'trigger': 'fail',
                'source': 'loading',
                'dest': 'failed'
            },
            {
                'trigger': 'success',
                'source': 'loading',
                'dest': 'loaded_stopped'
            },
            {
                'trigger': 'start_playing',
                'source': 'loaded_stopped',
                'dest': 'loaded_playing'
            },
            {
                'trigger': 'pause',
                'source': 'loaded_playing',
                'dest': 'loaded_paused'
            },
            {
                'trigger': 'unpause',
                'source': 'loaded_paused',
                'dest': 'loaded_playing'
            },
            {
                'trigger': 'stop_playing',
                'source': ['loaded_playing', 'loaded_paused'],
                'dest': 'loaded_stopping'
            },
            {
                'trigger': 'stopped',
                'source': 'loaded_stopping',
                'dest': 'loaded_stopped',
                'after': 'trigger_stopped_events'
            }
        ]

        Machine.__init__(self, states=states,
                         transitions=transitions, initial='initial')

        self.volume = 100
        self.mapping = mapping
        self.filename = filename
        self.name = name or filename
        self.audio_segment = None
        self.audio_segment_frame_width = 0
        self.initial_volume_factor = gain
        self.music_lock = Lock("music__" + filename)
        self.wait_event = threading.Event()
        self.db_gain = 0
        # Linear factor matching db_gain; initialised here so that
        # get_next_sample never meets a missing attribute.
        self.volume_factor = 10 ** (self.db_gain / 20)

        # The 'load' trigger moves the machine to 'loading', which runs
        # on_enter_loading in that background thread.
        threading.Thread(name="MSMusicLoad", target=self.load).start()

    def on_enter_loading(self):
        """Decode the file and move to ``loaded_stopped`` or ``failed``.

        Runs while holding the module-level ``file_lock``. Any exception
        raised while decoding is stored in ``self.loading_error`` and
        reported through ``error_print``.
        """
        with file_lock:
            try:
                debug_print("Loading « {} »".format(self.name))
                self.mixer = self.mapping.mixer or Mixer()
                initial_db_gain = gain(self.initial_volume_factor * 100)
                self.audio_segment = pydub.AudioSegment \
                    .from_file(self.filename) \
                    .set_frame_rate(Config.frame_rate) \
                    .set_channels(Config.channels) \
                    .set_sample_width(Config.sample_width) \
                    .apply_gain(initial_db_gain)
                self.audio_segment_frame_width = \
                    self.audio_segment.frame_width
                self.sound_duration = self.audio_segment.duration_seconds
            except Exception as e:
                error_print("failed to load « {} »: {}".format(self.name, e))
                self.loading_error = e
                self.fail()
            else:
                self.success()
                debug_print("Loaded « {} »".format(self.name))

    def check_is_loaded(self):
        """Return True when the machine is in any ``loaded_*`` state."""
        return self.state.startswith('loaded_')

    def is_not_stopped(self):
        """Return True when loaded and in a state other than stopped."""
        return self.check_is_loaded() and not self.is_loaded_stopped()

    def is_paused(self):
        """Return True when the file is paused."""
        return self.is_loaded_paused()

    @property
    def sound_position(self):
        """Current playback position in seconds (0 when stopped)."""
        if self.is_not_stopped():
            return self.current_frame / self.current_audio_segment.frame_rate
        else:
            return 0

    def play(self, fade_in=0, volume=100, loop=0, start_at=0):
        """Start playing the file.

        :param fade_in: fade-in duration in seconds (0 disables it).
        :param volume: volume passed to the ``gain`` helper.
        :param loop: number of extra repetitions; any non-zero value makes
            play_callback restart from frame 0 at the end of the segment,
            decrementing the counter each time.
        :param start_at: start offset in seconds.
        """
        self.db_gain = gain(volume) + self.mapping.master_gain
        # Keep the linear factor in sync with db_gain: get_next_sample
        # multiplies the samples by volume_factor, not by db_gain.
        self.volume_factor = 10 ** (self.db_gain / 20)
        self.volume = volume
        self.loop = loop

        ms = int(start_at * 1000)
        ms_fi = int(fade_in * 1000)
        with self.music_lock:
            self.current_audio_segment = self.audio_segment
            self.current_frame = int(start_at * self.audio_segment.frame_rate)
            if ms_fi > 0:
                # FIXME: apply it to repeated when looping?
                self.a_s_with_effect = self \
                    .current_audio_segment[ms: ms + ms_fi] \
                    .fade_in(ms_fi)
                self.current_frame_with_effect = 0
            else:
                self.a_s_with_effect = None

        self.start_playing()

    def on_enter_loaded_playing(self):
        """Register this file with the mixer when playback (re)starts."""
        self.mixer.add_file(self)

    def finished_callback(self):
        """Drive the machine to ``loaded_stopped`` once data has run out."""
        if self.is_loaded_playing():
            self.stop_playing()
        if self.is_loaded_stopping():
            self.stopped()

    def trigger_stopped_events(self):
        """Unregister from the mixer and wake up threads in wait_end."""
        self.mixer.remove_file(self)
        self.wait_event.set()

    def play_callback(self, out_data_length, frame_count):
        """Return the next chunk of audio, padded with zero bytes.

        :param out_data_length: length in bytes the returned buffer is
            padded to.
        :param frame_count: number of frames requested.
        :returns: bytes of at least ``out_data_length`` bytes; silence
            while paused.
        """
        if self.is_loaded_paused():
            return b'\0' * out_data_length

        with self.music_lock:
            [data, nb_frames] = self.get_next_sample(frame_count)
            if nb_frames < frame_count:
                if self.is_loaded_playing() and self.loop != 0:
                    # End of segment reached while looping: restart from
                    # the beginning and fill up the rest of the chunk.
                    self.loop -= 1
                    self.current_frame = 0
                    [new_data, new_nb_frames] = self.get_next_sample(
                        frame_count - nb_frames)
                    data += new_data
                    nb_frames += new_nb_frames
                elif nb_frames == 0:
                    # FIXME: too slow
                    threading.Thread(
                        name="MSFinishedCallback",
                        target=self.finished_callback).start()

        return data.ljust(out_data_length, b'\0')

    def get_next_sample(self, frame_count):
        """Read up to ``frame_count`` frames at the current position.

        Frames are taken first from the fade-in segment (if any), then
        from the current audio segment, and scaled by ``volume_factor``.
        The caller is expected to hold ``music_lock``.

        :returns: ``[data, nb_frames]`` where ``nb_frames`` is the number
            of frames actually read (never negative).
        """
        fw = self.audio_segment_frame_width

        data = b""
        nb_frames = 0
        if self.a_s_with_effect is not None:
            segment = self.a_s_with_effect
            max_val = int(segment.frame_count())

            start_i = max(self.current_frame_with_effect, 0)
            end_i = min(self.current_frame_with_effect + frame_count, max_val)

            data += segment._data[start_i*fw: end_i*fw]

            # Frames still to be read from the plain segment.
            frame_count = max(
                0,
                self.current_frame_with_effect + frame_count - max_val)

            self.current_frame_with_effect += end_i - start_i
            self.current_frame += end_i - start_i
            nb_frames += end_i - start_i

            if frame_count > 0:
                # Effect segment exhausted: drop it.
                self.a_s_with_effect = None

        segment = self.current_audio_segment
        max_val = int(segment.frame_count())

        start_i = max(self.current_frame, 0)
        # Never let end_i fall below start_i: when current_frame is past
        # the end of the segment (e.g. after a far seek) the read must
        # yield zero frames, not a negative count.
        end_i = max(start_i, min(self.current_frame + frame_count, max_val))
        data += segment._data[start_i*fw: end_i*fw]
        nb_frames += end_i - start_i
        self.current_frame += end_i - start_i

        data = audioop.mul(data, Config.sample_width, self.volume_factor)

        return [data, nb_frames]

    def seek(self, value=0, delta=False):
        """Move the playback position.

        :param value: position in seconds, or offset when ``delta`` is
            true.
        :param delta: when true, ``value`` is relative to the current
            position.
        """
        # We don't want to do that while stopping
        if not (self.is_loaded_playing() or self.is_loaded_paused()):
            return
        with self.music_lock:
            self.a_s_with_effect = None
            self.current_frame = max(
                0,
                int(delta) * self.current_frame
                + int(value * self.audio_segment.frame_rate))
        # FIXME: when doing a seek with delta, adapt the "loop"

    def stop(self, fade_out=0, wait=False):
        """Stop playback.

        A playing file is faded out over ``fade_out`` seconds (at least
        1 ms); a paused file is stopped immediately. In any other state
        the call does nothing.

        :param fade_out: fade-out duration in seconds.
        :param wait: when true and the file was playing, block until it
            has reached the stopped state.
        """
        if self.is_loaded_playing():
            ms = int(self.sound_position * 1000)
            ms_fo = max(1, int(fade_out * 1000))

            new_audio_segment = self.current_audio_segment[: ms + ms_fo] \
                .fade_out(ms_fo)
            with self.music_lock:
                self.current_audio_segment = new_audio_segment
            if wait:
                # Clear before triggering the stop, so that a stop which
                # completes right away cannot have its set() erased.
                self.wait_event.clear()
            self.stop_playing()
            if wait:
                self.wait_event.wait()
        elif self.is_loaded_paused():
            self.stop_playing()
            self.stopped()
        # Otherwise (not loaded, already stopped or stopping) neither
        # trigger is valid from the current state: nothing to do.

    def set_gain(self, db_gain):
        """Add ``db_gain`` decibels to the current gain."""
        self.db_gain += db_gain
        self.volume_factor = 10 ** (self.db_gain / 20)

    def set_volume(self, value, delta=False):
        """Change the volume.

        :param value: new volume, or volume offset when ``delta`` is true.
        :param delta: when true, ``value`` is added to the current volume.
        """
        [db_gain, self.volume] = gain(
            value + int(delta) * self.volume,
            self.volume)

        self.set_gain(db_gain)

    def wait_end(self):
        """Block until the next transition to the stopped state.

        The event is cleared first, so a stop that completed before this
        call is not taken into account.
        """
        self.wait_event.clear()
        self.wait_event.wait()