import threading
import pydub
import time
from transitions.extensions import HierarchicalMachine as Machine

import pyaudio as pa
import sounddevice as sd
import os.path

from .lock import Lock
from . import gain

# Lock shared by every MusicFile while it decodes its file (see
# MusicFile.on_enter_loading).
file_lock = Lock("file")

pyaudio = pa.PyAudio()


class MusicFile(Machine):
    """One audio file driven by a hierarchical state machine.

    The file is decoded with pydub on a background thread and played through
    a ``sounddevice.RawOutputStream`` whose callback pulls raw frames out of
    the current audio segment.

    States: ``initial``, ``loading``, ``failed`` and ``loaded`` with the
    children ``stopped``, ``playing``, ``paused`` and ``stopping`` (addressed
    as ``loaded_<child>``).  The ``on_enter_<state>`` methods below follow the
    naming convention of the ``transitions`` library for state-entry
    callbacks.
    """

    def __init__(self, filename, mapping, name = None, gain = 1):
        """Set up the state machine and start loading in the background.

        Args:
            filename: path handed to ``pydub.AudioSegment.from_file``.
            mapping: object whose ``master_gain`` attribute is read by
                ``play``.
            name: label used in the log messages; falls back to ``filename``.
            gain: per-file volume factor; it is multiplied by 100 and passed
                to the package-level ``gain`` helper when the file is loaded.
                (This parameter shadows that helper inside ``__init__`` only.)
        """
        states = [
            'initial',
            'loading',
            'failed',
            {
                'name': 'loaded',
                'children': ['stopped', 'playing', 'paused', 'stopping'],
            },
        ]

        # (trigger, source, dest) triples, expanded into the dictionaries
        # expected by the Machine constructor.
        edges = (
            ('load', 'initial', 'loading'),
            ('fail', 'loading', 'failed'),
            ('success', 'loading', 'loaded_stopped'),
            ('start_playing', 'loaded_stopped', 'loaded_playing'),
            ('pause', 'loaded_playing', 'loaded_paused'),
            ('unpause', 'loaded_paused', 'loaded_playing'),
            ('stop_playing', ['loaded_playing', 'loaded_paused'],
                'loaded_stopping'),
            ('stopped', 'loaded_stopping', 'loaded_stopped'),
        )
        transitions = [
            {'trigger': trigger, 'source': source, 'dest': dest}
            for trigger, source, dest in edges
        ]

        Machine.__init__(
            self,
            states=states,
            transitions=transitions,
            initial='initial',
        )

        self.volume = 100
        self.mapping = mapping
        self.filename = filename
        self.stream = None
        self.name = name or filename
        self.audio_segment = None
        self.volume_factor = gain
        self.music_lock = Lock("music__" + filename)
        self.wait_event = threading.Event()

        # Firing the "load" trigger from a worker thread runs
        # on_enter_loading there instead of in the caller's thread.
        loader = threading.Thread(name="MSMusicLoad", target=self.load)
        loader.start()

    def on_enter_loading(self):
        """Decode the file, then fire ``success`` or ``fail``.

        The whole operation runs while holding the module-level
        ``file_lock``.  On error the exception is kept in
        ``self.loading_error``.
        """
        with file_lock:
            try:
                print("Loading « {} »".format(self.name))
                db_gain = gain(self.volume_factor * 100)
                decoded = pydub.AudioSegment.from_file(self.filename)
                resampled = decoded.set_frame_rate(44100)
                self.audio_segment = resampled.apply_gain(db_gain)
                self.sound_duration = self.audio_segment.duration_seconds
            except Exception as error:
                print("failed to load « {} »: {}".format(self.name, error))
                self.loading_error = error
                self.fail()
                return

            self.success()
            print("Loaded « {} »".format(self.name))

    def check_is_loaded(self):
        """Return True when the current state is one of ``loaded_*``."""
        return self.state.startswith('loaded_')

    def is_not_stopped(self):
        """Return True when loaded and in any child state but ``stopped``."""
        if not self.check_is_loaded():
            return False
        return not self.is_loaded_stopped()

    def is_paused(self):
        """Return True when the state is ``loaded_paused``."""
        return self.is_loaded_paused()

    @property
    def sound_position(self):
        """Position in seconds of the next frame to hand out (0 if stopped).

        Computed from ``current_frame``, i.e. from what the stream callback
        has already consumed.
        """
        if not self.is_not_stopped():
            return 0
        return self.current_frame / self.current_audio_segment.frame_rate

    def play(self, fade_in = 0, volume = 100, start_at = 0):
        """Build the segment to play, open the stream and start playing.

        Args:
            fade_in: fade-in length in seconds (at least 1 ms is applied).
            volume: value passed to the package-level ``gain`` helper; the
                mapping's ``master_gain`` is added to the result.
            start_at: start offset in seconds.
        """
        total_gain = gain(volume) + self.mapping.master_gain
        self.volume = volume

        start_ms = int(start_at * 1000)
        fade_ms = max(1, int(fade_in * 1000))
        with self.music_lock:
            amplified = self.audio_segment + total_gain
            self.current_audio_segment = amplified.fade(
                from_gain=-120,
                duration=fade_ms,
                start=start_ms,
            )

        first_frame = int(start_at * self.audio_segment.frame_rate)
        self.before_loaded_playing(initial_frame=first_frame)
        self.start_playing()

    def before_loaded_playing(self, initial_frame = 0):
        """Reset the read position and create the output stream.

        Called explicitly by ``play``; the stream is created here but only
        started in ``on_enter_loaded_playing``.
        """
        self.current_frame = initial_frame
        with self.music_lock:
            segment = self.current_audio_segment

        # FIXME (kept from the original, which only said "?"): presumably a
        # doubt about whether every sample width yields a usable dtype name.
        sample_dtype = 'int' + str(8 * segment.sample_width)

        self.stream = sd.RawOutputStream(
            samplerate=segment.frame_rate,
            channels=segment.channels,
            dtype=sample_dtype,
            latency=1.,
            callback=self.play_callback,
            finished_callback=self.finished_callback,
        )

    def on_enter_loaded_playing(self):
        """Start (or restart, after a pause) the output stream."""
        self.stream.start()

    def on_enter_loaded_paused(self):
        """Stop the output stream; the read position is left untouched."""
        self.stream.stop()

    def finished_callback(self):
        """Stream ``finished_callback``: walk the machine back to stopped.

        The second test is deliberately not an ``elif``: it must also see the
        ``stopping`` state reached by the first branch.
        """
        if self.is_loaded_playing():
            self.stop_playing()
        if self.is_loaded_stopping():
            self.stopped()

    def on_enter_loaded_stopped(self):
        """Wake up any thread blocked in ``wait_end``."""
        self.wait_event.set()

    def play_callback(self, out_data, frame_count, time_info, status_flags):
        """Stream callback: copy the next ``frame_count`` frames to out_data.

        Raises:
            sd.CallbackStop: when the segment has no data left.
        """
        with self.music_lock:
            first = self.current_frame
            chunk = self.current_audio_segment.get_sample_slice_data(
                start_sample=first,
                end_sample=first + frame_count,
            )
            self.current_frame += frame_count

        if not chunk:
            raise sd.CallbackStop

        # A short final chunk is padded with zero bytes up to the buffer size.
        out_data[:] = chunk.ljust(len(out_data), b'\0')

    def stop(self, fade_out = 0, wait = False):
        """Stop the music, optionally fading out and waiting for the end.

        Args:
            fade_out: fade-out length in seconds (at least 1 ms is applied).
            wait: when playing, block until the ``stopped`` state is entered.
        """
        if not self.is_loaded_playing():
            # NOTE(review): both triggers are fired whatever the current
            # state is; whether that is valid outside ``loaded_paused``
            # depends on the machine configuration - confirm.
            self.stop_playing()
            self.stopped()
            return

        position_ms = int(self.sound_position * 1000)
        fade_ms = max(1, int(fade_out * 1000))

        # Cut the segment right after the fade-out window so that the stream
        # callback runs out of data there.
        with self.music_lock:
            remaining = self.current_audio_segment[:position_ms + fade_ms]
            self.current_audio_segment = remaining.fade_out(fade_ms)

        self.stop_playing()
        if wait:
            self.wait_end()

    def set_gain(self, db_gain):
        """Add ``db_gain`` to the current segment; no-op when stopped."""
        if self.is_not_stopped():
            adjusted = self.current_audio_segment + db_gain
            with self.music_lock:
                self.current_audio_segment = adjusted

    def set_volume(self, value, delta = False):
        """Change the volume to ``value``, or by ``value`` when ``delta``.

        The package-level ``gain`` helper is called with the target and the
        previous volume; the pair it returns is unpacked as (gain to apply,
        new volume).
        """
        previous = self.volume
        target = value + int(delta) * previous
        db_gain, self.volume = gain(target, previous)

        self.set_gain(db_gain)

    def wait_end(self):
        """Block until ``on_enter_loaded_stopped`` sets the event.

        NOTE(review): the event is cleared first, so a signal emitted just
        before this call is discarded.
        """
        self.wait_event.clear()
        self.wait_event.wait()


# Extra helper attached to pydub's AudioSegment (see the assignment below).
def get_sample_slice_data(self, start_sample=0, end_sample=float('inf')):
    """Return the raw bytes of frames ``[start_sample, end_sample)``.

    Bounds are clamped to ``[0, frame_count]`` before being converted to
    byte offsets with ``frame_width``.
    """
    total_frames = int(self.frame_count())

    first = max(start_sample, 0)
    last = min(end_sample, total_frames)

    return self._data[first * self.frame_width:last * self.frame_width]


pydub.AudioSegment.get_sample_slice_data = get_sample_slice_data