From 63ba5a8dc2aa4ec3e6f203b0ba4db249ecf0b00e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Bouya?= Date: Wed, 27 Jul 2016 21:33:09 +0200 Subject: Rename helpers to music_sampler --- music_sampler/__init__.py | 192 ++++++++++++++ music_sampler/action.py | 113 +++++++++ music_sampler/actions/__init__.py | 10 + music_sampler/actions/interrupt_wait.py | 5 + music_sampler/actions/pause.py | 10 + music_sampler/actions/play.py | 44 ++++ music_sampler/actions/run_command.py | 13 + music_sampler/actions/seek.py | 19 ++ music_sampler/actions/stop.py | 42 ++++ music_sampler/actions/stop_all_actions.py | 14 ++ music_sampler/actions/unpause.py | 10 + music_sampler/actions/volume.py | 28 +++ music_sampler/actions/wait.py | 40 +++ music_sampler/key.py | 280 +++++++++++++++++++++ music_sampler/lock.py | 23 ++ music_sampler/mapping.py | 399 ++++++++++++++++++++++++++++++ music_sampler/mixer.py | 63 +++++ music_sampler/music_effect.py | 62 +++++ music_sampler/music_file.py | 378 ++++++++++++++++++++++++++++ music_sampler/sysfont.py | 224 +++++++++++++++++ 20 files changed, 1969 insertions(+) create mode 100644 music_sampler/__init__.py create mode 100644 music_sampler/action.py create mode 100644 music_sampler/actions/__init__.py create mode 100644 music_sampler/actions/interrupt_wait.py create mode 100644 music_sampler/actions/pause.py create mode 100644 music_sampler/actions/play.py create mode 100644 music_sampler/actions/run_command.py create mode 100644 music_sampler/actions/seek.py create mode 100644 music_sampler/actions/stop.py create mode 100644 music_sampler/actions/stop_all_actions.py create mode 100644 music_sampler/actions/unpause.py create mode 100644 music_sampler/actions/volume.py create mode 100644 music_sampler/actions/wait.py create mode 100644 music_sampler/key.py create mode 100644 music_sampler/lock.py create mode 100644 music_sampler/mapping.py create mode 100644 music_sampler/mixer.py create mode 100644 music_sampler/music_effect.py create mode 100644 music_sampler/music_file.py create mode 100644 music_sampler/sysfont.py (limited to 'music_sampler') diff --git a/music_sampler/__init__.py b/music_sampler/__init__.py new file mode 100644 index 0000000..4827e6c --- /dev/null +++ b/music_sampler/__init__.py @@ -0,0 +1,192 @@ +# -*- coding: utf-8 -*- +import argparse +import sys +import os +import math +import sounddevice as sd +import logging + +from . import sysfont + +class Config: + pass + +def find_font(name, style=sysfont.STYLE_NONE): + if getattr(sys, 'frozen', False): + font = sys._MEIPASS + "/fonts/{}_{}.ttf".format(name, style) + else: + font = sysfont.get_font(name, style=style) + if font is not None: + font = font[4] + return font + +def register_fonts(): + from kivy.core.text import LabelBase + + ubuntu_regular = find_font("Ubuntu", style=sysfont.STYLE_NORMAL) + ubuntu_bold = find_font("Ubuntu", style=sysfont.STYLE_BOLD) + symbola = find_font("Symbola") + + if ubuntu_regular is None: + error_print("Font Ubuntu regular could not be found, please install it.") + sys.exit() + if symbola is None: + error_print("Font Symbola could not be found, please install it.") + sys.exit() + if ubuntu_bold is None: + warn_print("Font Ubuntu Bold could not be found.") + + LabelBase.register(name="Ubuntu", + fn_regular=ubuntu_regular, + fn_bold=ubuntu_bold) + LabelBase.register(name="Symbola", + fn_regular=symbola) + + +def path(): + if getattr(sys, 'frozen', False): + return sys._MEIPASS + "/" + else: + path = os.path.dirname(os.path.realpath(__file__)) + return path + "/../" + +def parse_args(): + argv = sys.argv[1 :] + sys.argv = sys.argv[: 1] + if "--" in argv: + index = argv.index("--") + kivy_args = argv[index+1 :] + argv = argv[: index] + + sys.argv.extend(kivy_args) + + parser = argparse.ArgumentParser( + description="A Music Sampler application.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("-c", "--config", + default="config.yml", + required=False, + help="Config file to load") + parser.add_argument("-p", "--music-path", + default=".", + required=False, + help="Folder in which to find the music files") + parser.add_argument("-d", "--debug", + nargs=0, + action=DebugModeAction, + help="Print messages in console") + parser.add_argument("-m", "--builtin-mixing", + action="store_true", + help="Make the mixing of sounds manually\ + (do it if the system cannot handle it correctly)") + parser.add_argument("-l", "--latency", + default="high", + required=False, + help="Latency: low, high or number of seconds") + parser.add_argument("-b", "--blocksize", + default=0, + type=int, + required=False, + help="Blocksize: If not 0, the number of frames to take\ + at each step for the mixer") + parser.add_argument("-f", "--frame-rate", + default=44100, + type=int, + required=False, + help="Frame rate to play the musics") + parser.add_argument("-x", "--channels", + default=2, + type=int, + required=False, + help="Number of channels to use") + parser.add_argument("-s", "--sample-width", + default=2, + type=int, + required=False, + help="Sample width (number of bytes for each frame)") + parser.add_argument("-V", "--version", + action="version", + help="Displays the current version and exits. Only use\ + in bundled package", + version=show_version()) + parser.add_argument("--device", + action=SelectDeviceAction, + help="Select this sound device" + ) + parser.add_argument("--list-devices", + nargs=0, + action=ListDevicesAction, + help="List available sound devices" + ) + parser.add_argument('--', + dest="args", + help="Kivy arguments. All arguments after this are interpreted\ + by Kivy. Pass \"-- --help\" to get Kivy's usage.") + + from kivy.logger import Logger + Logger.setLevel(logging.WARN) + + args = parser.parse_args(argv) + + Config.yml_file = args.config + + Config.latency = args.latency + Config.blocksize = args.blocksize + Config.frame_rate = args.frame_rate + Config.channels = args.channels + Config.sample_width = args.sample_width + Config.builtin_mixing = args.builtin_mixing + if args.music_path.endswith("/"): + Config.music_path = args.music_path + else: + Config.music_path = args.music_path + "/" + +class DebugModeAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + from kivy.logger import Logger + Logger.setLevel(logging.DEBUG) + +class SelectDeviceAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + sd.default.device = values + +class ListDevicesAction(argparse.Action): + nargs = 0 + def __call__(self, parser, namespace, values, option_string=None): + print(sd.query_devices()) + sys.exit() + +def show_version(): + if getattr(sys, 'frozen', False): + with open(path() + ".pyinstaller_commit", "r") as f: + return f.read() + else: + return "option '-v' can only be used in bundled package" + +def duration_to_min_sec(duration): + minutes = int(duration / 60) + seconds = int(duration) % 60 + if minutes < 100: + return "{:2}:{:0>2}".format(minutes, seconds) + else: + return "{}:{:0>2}".format(minutes, seconds) + +def gain(volume, old_volume=None): + if old_volume is None: + return 20 * math.log10(max(volume, 0.1) / 100) + else: + return [ + 20 * math.log10(max(volume, 0.1) / max(old_volume, 0.1)), + max(volume, 0)] + +def debug_print(message, with_trace=False): + from kivy.logger import Logger + Logger.debug('MusicSampler: ' + message, exc_info=with_trace) + +def error_print(message, with_trace=False): + from kivy.logger import Logger + Logger.error('MusicSampler: ' + message, exc_info=with_trace) + +def warn_print(message, with_trace=False): + from kivy.logger import Logger + Logger.warn('MusicSampler: ' + message, exc_info=with_trace) diff --git a/music_sampler/action.py b/music_sampler/action.py new file mode 100644 index 0000000..4b5a71d --- /dev/null +++ b/music_sampler/action.py @@ -0,0 +1,113 @@ +from transitions.extensions import HierarchicalMachine as Machine +from . import debug_print, error_print +from . import actions + +class Action: + STATES = [ + 'initial', + 'loading', + 'failed', + { + 'name': 'loaded', + 'children': ['running'] + } + ] + + TRANSITIONS = [ + { + 'trigger': 'load', + 'source': 'initial', + 'dest': 'loading' + }, + { + 'trigger': 'fail', + 'source': 'loading', + 'dest': 'failed', + 'after': 'poll_loaded' + }, + { + 'trigger': 'success', + 'source': 'loading', + 'dest': 'loaded', + 'after': 'poll_loaded' + }, + { + 'trigger': 'run', + 'source': 'loaded', + 'dest': 'loaded_running', + 'after': 'finish_action', + # if a child has no transitions, then it is bubbled to the parent, + # and we don't want that. Not useful in that machine precisely. + 'conditions': ['is_loaded'] + }, + { + 'trigger': 'finish_action', + 'source': 'loaded_running', + 'dest': 'loaded' + } + ] + + def __init__(self, action, key, **kwargs): + Machine(model=self, states=self.STATES, + transitions=self.TRANSITIONS, initial='initial', + ignore_invalid_triggers=True, queued=True) + + self.action = action + self.key = key + self.mapping = key.parent + self.arguments = kwargs + self.sleep_event = None + self.waiting_music = None + + def is_loaded_or_failed(self): + return self.is_loaded(allow_substates=True) or self.is_failed() + + def callback_music_loaded(self, success): + if success: + self.success() + else: + self.fail() + + # Machine states / events + def on_enter_loading(self): + if hasattr(actions, self.action): + if 'music' in self.arguments: + self.arguments['music'].subscribe_loaded( + self.callback_music_loaded) + else: + self.success() + else: + error_print("Unknown action {}".format(self.action)) + self.fail() + + def on_enter_loaded_running(self, key_start_time): + debug_print(self.description()) + if hasattr(actions, self.action): + getattr(actions, self.action).run(self, + key_start_time=key_start_time, **self.arguments) + + def poll_loaded(self): + self.key.callback_action_ready(self, + self.is_loaded(allow_substates=True)) + + # This one cannot be in the Machine state since it would be queued to run + # *after* the wait is ended... + def interrupt(self): + if getattr(actions, self.action, None) and\ + hasattr(getattr(actions, self.action), 'interrupt'): + return getattr(getattr(actions, self.action), 'interrupt')( + self, **self.arguments) + + # Helpers + def music_list(self, music): + if music is not None: + return [music] + else: + return self.mapping.open_files.values() + + def description(self): + if hasattr(actions, self.action): + return getattr(actions, self.action)\ + .description(self, **self.arguments) + else: + return "unknown action {}".format(self.action) diff --git a/music_sampler/actions/__init__.py b/music_sampler/actions/__init__.py new file mode 100644 index 0000000..658cef0 --- /dev/null +++ b/music_sampler/actions/__init__.py @@ -0,0 +1,10 @@ +from . import interrupt_wait +from . import pause +from . import play +from . import run_command +from . import seek +from . import stop +from . import stop_all_actions +from . import unpause +from . import volume +from . import wait diff --git a/music_sampler/actions/interrupt_wait.py b/music_sampler/actions/interrupt_wait.py new file mode 100644 index 0000000..8f465f0 --- /dev/null +++ b/music_sampler/actions/interrupt_wait.py @@ -0,0 +1,5 @@ +def run(action, wait_id=None, **kwargs): + action.mapping.interrupt_wait(wait_id) + +def description(action, wait_id=None, **kwargs): + return "interrupt wait with id {}".format(wait_id) diff --git a/music_sampler/actions/pause.py b/music_sampler/actions/pause.py new file mode 100644 index 0000000..bb27734 --- /dev/null +++ b/music_sampler/actions/pause.py @@ -0,0 +1,10 @@ +def run(action, music=None, **kwargs): + for music in action.music_list(music): + if music.is_loaded_playing(): + music.pause() + +def description(action, music=None, **kwargs): + if music is not None: + return "pausing « {} »".format(music.name) + else: + return "pausing all musics" diff --git a/music_sampler/actions/play.py b/music_sampler/actions/play.py new file mode 100644 index 0000000..fdba95b --- /dev/null +++ b/music_sampler/actions/play.py @@ -0,0 +1,44 @@ +def run(action, music=None, fade_in=0, start_at=0, + restart_if_running=False, volume=100, + loop=0, **kwargs): + for music in action.music_list(music): + if restart_if_running: + if music.is_in_use(): + music.stop() + music.play( + volume=volume, + fade_in=fade_in, + start_at=start_at, + loop=loop) + elif not music.is_in_use(): + music.play( + volume=volume, + fade_in=fade_in, + start_at=start_at, + loop=loop) + +def description(action, music=None, fade_in=0, start_at=0, + restart_if_running=False, volume=100, loop=0, **kwargs): + message = "starting " + if music is not None: + message += "« {} »".format(music.name) + else: + message += "all musics" + + if start_at != 0: + message += " at {}s".format(start_at) + + if fade_in != 0: + message += " with {}s fade_in".format(fade_in) + + message += " at volume {}%".format(volume) + + if loop > 0: + message += " {} times".format(loop + 1) + elif loop < 0: + message += " in loop" + + if restart_if_running: + message += " (restarting if already running)" + + return message diff --git a/music_sampler/actions/run_command.py b/music_sampler/actions/run_command.py new file mode 100644 index 0000000..1e80c1e --- /dev/null +++ b/music_sampler/actions/run_command.py @@ -0,0 +1,13 @@ +import shlex, subprocess + +def run(action, command="", wait=False, **kwargs): + action.process = subprocess.Popen(command, shell=True) + if wait: + action.process.wait() + +def description(action, command="", wait=False, **kwargs): + message = "running command {}".format(command) + if wait: + message += " (waiting for its execution to finish)" + + return message diff --git a/music_sampler/actions/seek.py b/music_sampler/actions/seek.py new file mode 100644 index 0000000..467af7d --- /dev/null +++ b/music_sampler/actions/seek.py @@ -0,0 +1,19 @@ +def run(action, music=None, value=0, delta=False, **kwargs): + for music in action.music_list(music): + music.seek(value=value, delta=delta) + +def description(action, music=None, value=0, delta=False, **kwargs): + if delta: + if music is not None: + return "moving music « {} » by {:+d}s" \ + .format(music.name, value) + else: + return "moving all musics by {:+d}s" \ + .format(value) + else: + if music is not None: + return "moving music « {} » to position {}s" \ + .format(music.name, value) + else: + return "moving all musics to position {}s" \ + .format(value) diff --git a/music_sampler/actions/stop.py b/music_sampler/actions/stop.py new file mode 100644 index 0000000..88cc66d --- /dev/null +++ b/music_sampler/actions/stop.py @@ -0,0 +1,42 @@ +def run(action, music=None, fade_out=0, wait=False, + set_wait_id=None, **kwargs): + previous = None + for music in action.music_list(music): + if music.is_loaded_paused() or music.is_loaded_playing(): + if previous is not None: + previous.stop(fade_out=fade_out) + previous = music + else: + music.stop(fade_out=fade_out) + + if previous is not None: + action.waiting_music = previous + previous.stop( + fade_out=fade_out, + wait=wait, + set_wait_id=set_wait_id) + +def description(action, music=None, fade_out=0, wait=False, + set_wait_id=None, **kwargs): + + message = "stopping " + if music is not None: + message += "music « {} »".format(music.name) + else: + message += "all musics" + + if fade_out > 0: + message += " with {}s fadeout".format(fade_out) + if wait: + if set_wait_id is not None: + message += " (waiting the end of fadeout, with id {})"\ + .format(set_wait_id) + else: + message += " (waiting the end of fadeout)" + + return message + +def interrupt(action, music=None, fade_out=0, wait=False, + set_wait_id=None, **kwargs): + if action.waiting_music is not None: + action.waiting_music.wait_event.set() diff --git a/music_sampler/actions/stop_all_actions.py b/music_sampler/actions/stop_all_actions.py new file mode 100644 index 0000000..4ea875a --- /dev/null +++ b/music_sampler/actions/stop_all_actions.py @@ -0,0 +1,14 @@ +def run(action, key_start_time=0, other_only=False, **kwargs): + if other_only: + action.mapping.stop_all_running( + except_key=action.key, + key_start_time=key_start_time) + else: + action.mapping.stop_all_running() + +def description(action, other_only=False, **kwargs): + message = "stopping all actions" + if other_only: + message += " except this key" + + return message diff --git a/music_sampler/actions/unpause.py b/music_sampler/actions/unpause.py new file mode 100644 index 0000000..5fa88c3 --- /dev/null +++ b/music_sampler/actions/unpause.py @@ -0,0 +1,10 @@ +def run(action, music=None, **kwargs): + for music in action.music_list(music): + if music.is_loaded_paused(): + music.unpause() + +def description(action, music=None, **kwargs): + if music is not None: + return "unpausing « {} »".format(music.name) + else: + return "unpausing all musics" diff --git a/music_sampler/actions/volume.py b/music_sampler/actions/volume.py new file mode 100644 index 0000000..7dda3c1 --- /dev/null +++ b/music_sampler/actions/volume.py @@ -0,0 +1,28 @@ +def run(action, music=None, value=100, fade=0, delta=False, **kwargs): + if music is not None: + music.set_volume(value, delta=delta, fade=fade) + else: + action.mapping.set_master_volume(value, delta=delta, fade=fade) + +def description(action, music=None, + value=100, delta=False, fade=0, **kwargs): + message = "" + if delta: + if music is not None: + message += "{:+d}% to volume of « {} »" \ + .format(value, music.name) + else: + message += "{:+d}% to volume" \ + .format(value) + else: + if music is not None: + message += "setting volume of « {} » to {}%" \ + .format(music.name, value) + else: + message += "setting volume to {}%" \ + .format(value) + + if fade > 0: + message += " with {}s fade".format(fade) + + return message diff --git a/music_sampler/actions/wait.py b/music_sampler/actions/wait.py new file mode 100644 index 0000000..ea42408 --- /dev/null +++ b/music_sampler/actions/wait.py @@ -0,0 +1,40 @@ +import threading + +def run(action, duration=0, music=None, set_wait_id=None, **kwargs): + if set_wait_id is not None: + action.mapping.add_wait_id(set_wait_id, action) + + action.sleep_event = threading.Event() + action.sleep_event_timer = threading.Timer( + duration, + action.sleep_event.set) + + if music is not None: + music.wait_end() + + action.sleep_event_timer.start() + action.sleep_event.wait() + +def description(action, duration=0, music=None, set_wait_id=None, **kwargs): + message = "" + if music is None: + message += "waiting {}s" \ + .format(duration) + elif duration == 0: + message += "waiting the end of « {} »" \ + .format(music.name) + else: + message += "waiting the end of « {} » + {}s" \ + .format(music.name, duration) + + if set_wait_id is not None: + message += " (setting id = {})".format(set_wait_id) + + return message + +def interrupt(action, duration=0, music=None, **kwargs): + if action.sleep_event is not None: + action.sleep_event.set() + action.sleep_event_timer.cancel() + if music is not None: + music.wait_event.set() diff --git a/music_sampler/key.py b/music_sampler/key.py new file mode 100644 index 0000000..66e792d --- /dev/null +++ b/music_sampler/key.py @@ -0,0 +1,280 @@ +from kivy.uix.widget import Widget +from kivy.properties import AliasProperty, BooleanProperty, \ + ListProperty, StringProperty +from kivy.uix.behaviors import ButtonBehavior + +from .action import Action +from . import debug_print +import time +import threading +from transitions.extensions import HierarchicalMachine as Machine + +class Key(ButtonBehavior, Widget): + STATES = [ + 'initial', + 'configuring', + 'configured', + 'loading', + 'failed', + { + 'name': 'loaded', + 'children': [ + 'no_config', + 'no_actions', + 'running', + 'protecting_repeat' + ] + } + ] + + TRANSITIONS = [ + { + 'trigger': 'configure', + 'source': 'initial', + 'dest': 'configuring' + }, + { + 'trigger': 'fail', + 'source': 'configuring', + 'dest': 'failed', + 'after': 'key_loaded_callback' + }, + { + 'trigger': 'success', + 'source': 'configuring', + 'dest': 'configured', + 'after': 'load' + }, + { + 'trigger': 'no_config', + 'source': 'configuring', + 'dest': 'loaded_no_config', + 'after': 'key_loaded_callback' + }, + { + 'trigger': 'load', + 'source': 'configured', + 'dest': 'loading' + }, + { + 'trigger': 'fail', + 'source': 'loading', + 'dest': 'failed', + 'after': 'key_loaded_callback' + }, + { + 'trigger': 'success', + 'source': 'loading', + 'dest': 'loaded', + 'after': 'key_loaded_callback' + }, + { + 'trigger': 'no_actions', + 'source': 'loading', + 'dest': 'loaded_no_actions', + 'after': 'key_loaded_callback' + }, + { + 'trigger': 'reload', + 'source': ['loaded','failed'], + 'dest': 'configuring', + 'after': 'key_loaded_callback' + }, + { + 'trigger': 'run', + 'source': 'loaded', + 'dest': 'loaded_running', + 'after': ['run_actions', 'finish'], + # if a child, like loaded_no_actions, has no transitions, then it + # is bubbled to the parent, and we don't want that. + 'conditions': ['is_loaded'] + }, + { + 'trigger': 'finish', + 'source': 'loaded_running', + 'dest': 'loaded_protecting_repeat' + }, + { + 'trigger': 'repeat_protection_finished', + 'source': 'loaded_protecting_repeat', + 'dest': 'loaded' + }, + ] + + key_sym = StringProperty(None) + custom_color = ListProperty([0, 1, 0]) + description_title = StringProperty("") + description = ListProperty([]) + state = StringProperty("") + + def get_alias_line_cross_color(self): + if not self.is_failed() and ( + not self.is_loaded(allow_substates=True)\ + or self.is_loaded_running()\ + or self.is_loaded_protecting_repeat()): + return [120/255, 120/255, 120/255, 1] + else: + return [0, 0, 0, 0] + + def set_alias_line_cross_color(self): + pass + + line_cross_color = AliasProperty( + get_alias_line_cross_color, + set_alias_line_cross_color, + bind=['state']) + + def get_alias_line_color(self): + if self.is_loaded_running(): + return [0, 0, 0, 1] + else: + return [120/255, 120/255, 120/255, 1] + + def set_alias_line_color(self): + pass + + line_color = AliasProperty(get_alias_line_color, set_alias_line_color, + bind=['state']) + + def get_alias_color(self): + if self.is_loaded_inactive(): + return [1, 1, 1, 1] + elif self.is_loaded_protecting_repeat(): + return [*self.custom_color, 100/255] + elif self.is_loaded_running(): + return [*self.custom_color, 100/255] + elif self.is_loaded(allow_substates=True): + return [*self.custom_color, 1] + elif self.is_failed(): + return [0, 0, 0, 1] + else: + return [*self.custom_color, 100/255] + def set_alias_color(self): + pass + + color = AliasProperty(get_alias_color, set_alias_color, + bind=['state', 'custom_color']) + + def __init__(self, **kwargs): + self.actions = [] + self.current_action = None + + Machine(model=self, states=self.STATES, + transitions=self.TRANSITIONS, initial='initial', + ignore_invalid_triggers=True, queued=True) + super(Key, self).__init__(**kwargs) + + # Kivy events + def on_key_sym(self, key, key_sym): + if key_sym != "": + self.configure() + + def on_press(self): + self.list_actions() + + # Machine states / events + def is_loaded_or_failed(self): + return self.is_loaded(allow_substates=True) or self.is_failed() + + def is_loaded_inactive(self): + return self.is_loaded_no_config() or self.is_loaded_no_actions() + + def on_enter_configuring(self): + if self.key_sym in self.parent.key_config: + self.config = self.parent.key_config[self.key_sym] + + self.actions = [] + for key_action in self.config['actions']: + self.add_action(key_action[0], **key_action[1]) + + if 'description' in self.config['properties']: + self.set_description(self.config['properties']['description']) + if 'color' in self.config['properties']: + self.set_color(self.config['properties']['color']) + self.success() + else: + self.no_config() + + def on_enter_loading(self): + if len(self.actions) > 0: + for action in self.actions: + action.load() + else: + self.no_actions() + + def run_actions(self, modifiers): + self.parent.parent.ids['KeyList'].append(self.key_sym) + debug_print("running actions for {}".format(self.key_sym)) + start_time = time.time() + self.parent.start_running(self, start_time) + for self.current_action in self.actions: + if self.parent.keep_running(self, start_time): + self.list_actions() + self.current_action.run(start_time) + self.list_actions(last_action_finished=True) + + self.parent.finished_running(self, start_time) + + def on_enter_loaded_protecting_repeat(self, modifiers): + if 'repeat_delay' in self.config['properties']: + self.protecting_repeat_timer = threading.Timer( + self.config['properties']['repeat_delay'], + self.repeat_protection_finished) + self.protecting_repeat_timer.start() + else: + self.repeat_protection_finished() + + # This one cannot be in the Machine state since it would be queued to run + # *after* the loop is ended... + def interrupt(self): + self.current_action.interrupt() + + # Callbacks + def key_loaded_callback(self): + self.parent.key_loaded_callback() + + def callback_action_ready(self, action, success): + if not success: + self.fail() + elif all(action.is_loaded_or_failed() for action in self.actions): + self.success() + + # Setters + def set_description(self, description): + if description[0] is not None: + self.description_title = str(description[0]) + self.description = [] + for desc in description[1 :]: + if desc is None: + self.description.append("") + else: + self.description.append(str(desc).replace(" ", " ")) + + def set_color(self, color): + color = [x / 255 for x in color] + self.custom_color = color + + # Actions handling + def add_action(self, action_name, **arguments): + self.actions.append(Action(action_name, self, **arguments)) + + def list_actions(self, last_action_finished=False): + not_running = (not self.is_loaded_running()) + current_action_seen = False + action_descriptions = [] + for action in self.actions: + if not_running: + state = "inactive" + elif last_action_finished: + state = "done" + elif current_action_seen: + state = "pending" + elif action == self.current_action: + current_action_seen = True + state = "current" + else: + state = "done" + action_descriptions.append([action.description(), state]) + self.parent.parent.ids['ActionList'].update_list( + self, + action_descriptions) diff --git a/music_sampler/lock.py b/music_sampler/lock.py new file mode 100644 index 0000000..9beafcd --- /dev/null +++ b/music_sampler/lock.py @@ -0,0 +1,23 @@ +import threading + +from . import debug_print + +class Lock: + def __init__(self, lock_type): + self.type = lock_type + self.lock = threading.RLock() + + def __enter__(self, *args, **kwargs): + self.acquire(*args, **kwargs) + + def __exit__(self, type, value, traceback, *args, **kwargs): + self.release(*args, **kwargs) + + def acquire(self, *args, **kwargs): + #debug_print("acquiring lock for {}".format(self.type)) + self.lock.acquire(*args, **kwargs) + + def release(self, *args, **kwargs): + #debug_print("releasing lock for {}".format(self.type)) + self.lock.release(*args, **kwargs) + diff --git a/music_sampler/mapping.py b/music_sampler/mapping.py new file mode 100644 index 0000000..bb20e67 --- /dev/null +++ b/music_sampler/mapping.py @@ -0,0 +1,399 @@ +from kivy.uix.relativelayout import RelativeLayout +from kivy.properties import NumericProperty, ListProperty, StringProperty +from kivy.core.window import Window +from kivy.clock import Clock + +import threading +import yaml +import sys +from collections import defaultdict + +from transitions.extensions import HierarchicalMachine as Machine + +from .music_file import MusicFile +from .mixer import Mixer +from . import Config, gain, error_print, warn_print +from .action import Action + +class Mapping(RelativeLayout): + STATES = [ + 'initial', + 'configuring', + 'configured', + 'loading', + 'loaded', + 'failed' + ] + + TRANSITIONS = [ + { + 'trigger': 'configure', + 'source': 'initial', + 'dest': 'configuring' + }, + { + 'trigger': 'fail', + 'source': 'configuring', + 'dest': 'failed' + }, + { + 'trigger': 'success', + 'source': 'configuring', + 'dest': 'configured', + 'after': 'load' + }, + { + 'trigger': 'load', + 'source': 'configured', + 'dest': 'loading' + }, + { + 'trigger': 'fail', + 'source': 'loading', + 'dest': 'failed' + }, + { + 'trigger': 'success', + 'source': 'loading', + 'dest': 'loaded' + }, + { + 'trigger': 'reload', + 'source': 'loaded', + 'dest': 'configuring' + } + ] + + master_volume = NumericProperty(100) + ready_color = ListProperty([1, 165/255, 0, 1]) + state = StringProperty("") + + def __init__(self, **kwargs): + self.keys = [] + self.running = [] + self.wait_ids = {} + self.open_files = {} + + Machine(model=self, states=self.STATES, + transitions=self.TRANSITIONS, initial='initial', + ignore_invalid_triggers=True, queued=True) + super(Mapping, self).__init__(**kwargs) + self.keyboard = Window.request_keyboard(self.on_keyboard_closed, self) + self.keyboard.bind(on_key_down=self.on_keyboard_down) + + self.configure() + + def on_enter_configuring(self): + if Config.builtin_mixing: + self.mixer = Mixer() + else: + self.mixer = None + + try: + self.key_config, self.open_files = self.parse_config() + except Exception as e: + error_print("Error while loading configuration: {}".format(e), + with_trace=True) + sys.exit() + else: + self.success() + + def on_enter_loading(self): + for key in self.keys: + key.reload() + self.success() + + # Kivy events + def add_widget(self, widget, index=0): + if type(widget).__name__ == "Key" and widget not in self.keys: + self.keys.append(widget) + return super(Mapping, self).add_widget(widget, index) + + def remove_widget(self, widget, index=0): + if type(widget).__name__ == "Key" and widget in self.keys: + self.keys.remove(widget) + return super(Mapping, self).remove_widget(widget, index) + + def on_keyboard_closed(self): + self.keyboard.unbind(on_key_down=self.on_keyboard_down) + self.keyboard = None + + def on_keyboard_down(self, keyboard, keycode, text, modifiers): + key = self.find_by_key_code(keycode) + if self.allowed_modifiers(modifiers) and key is not None: + modifiers.sort() + threading.Thread(name="MSKeyAction", target=key.run, + args=['-'.join(modifiers)]).start() + elif 'ctrl' in modifiers and (keycode[0] == 113 or keycode[0] == '99'): + self.stop_all_running() + for thread in threading.enumerate(): + if thread.getName()[0:2] != "MS": + continue + thread.join() + + sys.exit() + elif 'ctrl' in modifiers and keycode[0] == 114: + threading.Thread(name="MSReload", target=self.reload).start() + return True + + # Helpers + def allowed_modifiers(self, modifiers): + allowed = [] + return len([a for a in modifiers if a not in allowed]) == 0 + + def find_by_key_code(self, key_code): + if "Key_" + str(key_code[0]) in self.ids: + return self.ids["Key_" + str(key_code[0])] + return None + + def all_keys_ready(self): + partial = False + for key in self.keys: + if not key.is_loaded_or_failed(): + return "not_ready" + partial = partial or key.is_failed() + + if partial: + return "partial" + else: + return "success" + + # Callbacks + def key_loaded_callback(self): + result = self.all_keys_ready() + if result == "success": + self.ready_color = [0, 1, 0, 1] + elif result == "partial": + self.ready_color = [1, 0, 0, 1] + else: + self.ready_color = [1, 165/255, 0, 1] + + ## Some global actions + def stop_all_running(self, except_key=None, key_start_time=0): + running = self.running + self.running = [r for r in running\ + if r[0] == except_key and r[1] == key_start_time] + for (key, start_time) in running: + if (key, start_time) != (except_key, key_start_time): + key.interrupt() + + # Master volume methods + @property + def master_gain(self): + return gain(self.master_volume) + + def set_master_volume(self, value, delta=False, fade=0): + [db_gain, self.master_volume] = gain( + value + int(delta) * self.master_volume, + self.master_volume) + + for music in self.open_files.values(): + music.set_gain_with_effect(db_gain, fade=fade) + + # Wait handler methods + def add_wait_id(self, wait_id, action_or_wait): + self.wait_ids[wait_id] = action_or_wait + + def interrupt_wait(self, wait_id): + if wait_id in self.wait_ids: + action_or_wait = self.wait_ids[wait_id] + del(self.wait_ids[wait_id]) + if isinstance(action_or_wait, Action): + action_or_wait.interrupt() + else: + action_or_wait.set() + + # Methods to control running keys + def start_running(self, key, start_time): + self.running.append((key, start_time)) + + def keep_running(self, key, start_time): + return (key, start_time) in self.running + + def finished_running(self, key, start_time): + if (key, start_time) in self.running: + self.running.remove((key, start_time)) + + # YML config parser + def parse_config(self): + def update_alias(prop_hash, aliases, key): + if isinstance(aliases[key], dict): + prop_hash.update(aliases[key], **prop_hash) + else: + warn_print("Alias {} is not a hash, ignored".format(key)) + + def include_aliases(prop_hash, aliases): + if 'include' not in prop_hash: + return + + included = prop_hash['include'] + del(prop_hash['include']) + if isinstance(included, str): + update_alias(prop_hash, aliases, included) + elif isinstance(included, list): + for included_ in included: + if isinstance(included_, str): + update_alias(prop_hash, aliases, included_) + else: + warn_print("Unkown alias include type, ignored: " + "{} in {}".format(included_, included)) + else: + warn_print("Unkown alias include type, ignored: {}" + .format(included)) + + def check_key_property(key_property, key): + if 'description' in key_property: + desc = key_property['description'] + if not isinstance(desc, list): + warn_print("description in key_property '{}' is not " + "a list, ignored".format(key)) + del(key_property['description']) + if 'color' in key_property: + color = key_property['color'] + if not isinstance(color, list)\ + or len(color) != 3\ + or not all(isinstance(item, int) for item in color)\ + or any(item < 0 or item > 255 for item in color): + warn_print("color in key_property '{}' is not " + "a list of 3 valid integers, ignored".format(key)) + del(key_property['color']) + + def check_key_properties(config): + if 'key_properties' in config: + if isinstance(config['key_properties'], dict): + return config['key_properties'] + else: + warn_print("key_properties config is not a hash, ignored") + return {} + else: + return {} + + def check_mapped_keys(config): + if 'keys' in config: + if isinstance(config['keys'], dict): + return config['keys'] + else: + warn_print("keys config is not a hash, ignored") + return {} + else: + return {} + + def check_mapped_key(mapped_keys, key): + if not isinstance(mapped_keys[key], list): + warn_print("key config '{}' is not an array, ignored" + .format(key)) + return [] + else: + return mapped_keys[key] + + def check_music_property(music_property, filename): + if not isinstance(music_property, dict): + warn_print("music_property config '{}' is not a hash, ignored" + .format(filename)) + return {} + if 'name' in music_property: + music_property['name'] = str(music_property['name']) + if 'gain' in music_property: + try: + music_property['gain'] = float(music_property['gain']) + except ValueError as e: + del(music_property['gain']) + warn_print("gain for music_property '{}' is not " + "a float, ignored".format(filename)) + return music_property + + stream = open(Config.yml_file, "r") + try: + config = yaml.safe_load(stream) + except Exception as e: + error_print("Error while loading config file: {}".format(e)) + sys.exit() + stream.close() + + if not isinstance(config, dict): + raise Exception("Top level config is supposed to be a hash") + + if 'aliases' in config and isinstance(config['aliases'], dict): + aliases = config['aliases'] + else: + aliases = defaultdict(dict) + if 'aliases' in config: + warn_print("aliases config is not a hash, ignored") + + music_properties = defaultdict(dict) + if 'music_properties' in config and\ + isinstance(config['music_properties'], dict): + music_properties.update(config['music_properties']) + elif 'music_properties' in config: + warn_print("music_properties config is not a hash, ignored") + + seen_files = {} + + key_properties = defaultdict(lambda: { + "actions": [], + "properties": {}, + "files": [] + }) + + for key in check_key_properties(config): + key_prop = config['key_properties'][key] + + if not isinstance(key_prop, dict): + warn_print("key_property '{}' is not a hash, ignored" + .format(key)) + continue + + include_aliases(key_prop, aliases) + check_key_property(key_prop, key) + + key_properties[key]["properties"] = key_prop + + for mapped_key in check_mapped_keys(config): + for index, action in enumerate(check_mapped_key( + config['keys'], mapped_key)): + if not isinstance(action, dict) or\ + not len(action) == 1 or\ + not isinstance(list(action.values())[0] or {}, dict): + warn_print("action number {} of key '{}' is invalid, " + "ignored".format(index + 1, mapped_key)) + continue + + action_name = list(action)[0] + action_args = {} + if action[action_name] is None: + action[action_name] = {} + + include_aliases(action[action_name], aliases) + + for argument in action[action_name]: + if argument == 'file': + filename = str(action[action_name]['file']) + if filename not in seen_files: + music_property = check_music_property( + music_properties[filename], + filename) + + if filename in self.open_files: + self.open_files[filename]\ + .reload_properties(**music_property) + + seen_files[filename] =\ + self.open_files[filename] + else: + seen_files[filename] = MusicFile( + filename, self, **music_property) + + if filename not in key_properties[mapped_key]['files']: + key_properties[mapped_key]['files'] \ + .append(seen_files[filename]) + + action_args['music'] = seen_files[filename] + else: + action_args[argument] = action[action_name][argument] + + key_properties[mapped_key]['actions'] \ + .append([action_name, action_args]) + + return (key_properties, seen_files) + + diff --git a/music_sampler/mixer.py b/music_sampler/mixer.py new file mode 100644 index 0000000..9242b61 --- /dev/null +++ b/music_sampler/mixer.py @@ -0,0 +1,63 @@ +import sounddevice as sd +import audioop +import time + +from . import Config + +sample_width = Config.sample_width + +def sample_width_to_dtype(sample_width): + if sample_width == 1 or sample_width == 2 or sample_width == 4: + return 'int' + str(8*sample_width) + else: + raise "Unknown sample width" + +def _latency(latency): + if latency == "high" or latency == "low": + return latency + else: + return float(latency) + +class Mixer: + def __init__(self): + self.stream = sd.RawOutputStream( + samplerate=Config.frame_rate, + channels=Config.channels, + dtype=sample_width_to_dtype(Config.sample_width), + latency=_latency(Config.latency), + blocksize=Config.blocksize, + callback=self.play_callback) + self.open_files = [] + + def add_file(self, music_file): + if music_file not in self.open_files: + self.open_files.append(music_file) + self.start() + + def remove_file(self, music_file): + if music_file in self.open_files: + self.open_files.remove(music_file) + if len(self.open_files) == 0: + self.stop() + + def stop(self): + self.stream.stop() + + def start(self): + self.stream.start() + + def play_callback(self, out_data, frame_count, time_info, status_flags): + out_data_length = len(out_data) + empty_data = b"\0" * out_data_length + data = b"\0" * out_data_length + + for open_file in self.open_files: + file_data = open_file.play_callback(out_data_length, frame_count) + + if data == empty_data: + data = file_data + elif file_data != empty_data: + data = audioop.add(data, file_data, sample_width) + + out_data[:] = data + diff --git a/music_sampler/music_effect.py b/music_sampler/music_effect.py new file mode 100644 index 0000000..4bdbb26 --- /dev/null +++ b/music_sampler/music_effect.py @@ -0,0 +1,62 @@ +class GainEffect: + effect_types = [ + 'fade' + ] + + def __init__(self, effect, audio_segment, initial_loop, start, end, + **kwargs): + if effect in self.effect_types: + self.effect = effect + else: + raise Exception("Unknown effect {}".format(effect)) + + self.start = start + self.end = end + self.audio_segment = audio_segment + self.initial_loop = initial_loop + getattr(self, self.effect + "_init")(**kwargs) + + def get_last_gain(self): + return getattr(self, self.effect + "_get_last_gain")() + + def get_next_gain(self, current_frame, current_loop, frame_count): + # This returns two values: + # - The first one is the gain to apply on that frame + # - The last one is True or False depending on whether it is the last + # call to the function and the last gain should be saved permanently + return getattr(self, self.effect + "_get_next_gain")( + current_frame, + current_loop, + frame_count) + + # Fading + def fade_init(self, gain=0, **kwargs): + self.audio_segment_frame_count = self.audio_segment.frame_count() + self.first_frame = int( + self.audio_segment_frame_count * self.initial_loop +\ + self.audio_segment.frame_rate * self.start) + self.last_frame = int( + self.audio_segment_frame_count * self.initial_loop +\ + self.audio_segment.frame_rate * self.end) + self.gain= gain + + def fade_get_last_gain(self): + return self.gain + + def fade_get_next_gain(self, current_frame, current_loop, frame_count): + current_frame = current_frame \ + + (current_loop - self.initial_loop) \ + * self.audio_segment_frame_count + + if current_frame >= self.last_frame: + return [self.gain, True] + elif current_frame < self.first_frame: + return [0, False] + else: + return [ + (current_frame - self.first_frame) / \ + (self.last_frame - self.first_frame) * self.gain, + False + ] + + diff --git a/music_sampler/music_file.py b/music_sampler/music_file.py new file mode 100644 index 0000000..2d3ba72 --- /dev/null +++ b/music_sampler/music_file.py @@ -0,0 +1,378 @@ +import threading +import pydub +import time +from transitions.extensions import HierarchicalMachine as Machine + +import os.path + +import audioop + +from .lock import Lock +from . import Config, gain, debug_print, error_print +from .mixer import Mixer +from .music_effect import GainEffect + +file_lock = Lock("file") + +class MusicFile: + STATES = [ + 'initial', + 'loading', + 'failed', + { + 'name': 'loaded', + 'children': [ + 'playing', + 'paused', + 'stopping' + ] + } + ] + TRANSITIONS = [ + { + 'trigger': 'load', + 'source': 'initial', + 'dest': 'loading', + 'after': 'poll_loaded' + }, + { + 'trigger': 'fail', + 'source': 'loading', + 'dest': 'failed' + }, + { + 'trigger': 'success', + 'source': 'loading', + 'dest': 'loaded' + }, + { + 'trigger': 'start_playing', + 'source': 'loaded', + 'dest': 'loaded_playing', + # if a child has no transitions, then it is bubbled to the parent, + # and we don't want that. Not useful in that machine precisely. + 'conditions': ['is_loaded'] + }, + { + 'trigger': 'pause', + 'source': 'loaded_playing', + 'dest': 'loaded_paused' + }, + { + 'trigger': 'unpause', + 'source': 'loaded_paused', + 'dest': 'loaded_playing' + }, + { + 'trigger': 'stop_playing', + 'source': ['loaded_playing','loaded_paused'], + 'dest': 'loaded_stopping' + }, + { + 'trigger': 'stopped', + 'source': '*', + 'dest': 'loaded', + 'before': 'trigger_stopped_events', + 'conditions': ['is_in_use'] + } + ] + + def __init__(self, filename, mapping, name=None, gain=1): + Machine(model=self, states=self.STATES, + transitions=self.TRANSITIONS, initial='initial', + ignore_invalid_triggers=True) + + self.loaded_callbacks = [] + self.mapping = mapping + self.filename = filename + self.name = name or filename + self.audio_segment = None + self.initial_volume_factor = gain + self.music_lock = Lock("music__" + filename) + + threading.Thread(name="MSMusicLoad", target=self.load).start() + + def reload_properties(self, name=None, gain=1): + self.name = name or self.filename + if gain != self.initial_volume_factor: + self.initial_volume_factor = gain + self.reload_music_file() + + def reload_music_file(self): + with file_lock: + try: + if self.filename.startswith("/"): + filename = self.filename + else: + filename = Config.music_path + self.filename + + debug_print("Reloading « {} »".format(self.name)) + initial_db_gain = gain(self.initial_volume_factor * 100) + self.audio_segment = pydub.AudioSegment \ + .from_file(filename) \ + .set_frame_rate(Config.frame_rate) \ + .set_channels(Config.channels) \ + .set_sample_width(Config.sample_width) \ + .apply_gain(initial_db_gain) + except Exception as e: + error_print("failed to reload « {} »: {}"\ + .format(self.name, e)) + self.loading_error = e + self.to_failed() + else: + debug_print("Reloaded « {} »".format(self.name)) + + # Machine related events + def on_enter_loading(self): + with file_lock: + try: + if self.filename.startswith("/"): + filename = self.filename + else: + filename = Config.music_path + self.filename + + debug_print("Loading « {} »".format(self.name)) + self.mixer = self.mapping.mixer or Mixer() + initial_db_gain = gain(self.initial_volume_factor * 100) + self.audio_segment = pydub.AudioSegment \ + .from_file(filename) \ + .set_frame_rate(Config.frame_rate) \ + .set_channels(Config.channels) \ + .set_sample_width(Config.sample_width) \ + .apply_gain(initial_db_gain) + self.sound_duration = self.audio_segment.duration_seconds + except Exception as e: + error_print("failed to load « {} »: {}".format(self.name, e)) + self.loading_error = e + self.fail() + else: + self.success() + debug_print("Loaded « {} »".format(self.name)) + + def on_enter_loaded(self): + self.cleanup() + + def cleanup(self): + self.gain_effects = [] + self.set_gain(0, absolute=True) + self.current_audio_segment = None + self.volume = 100 + self.wait_event = threading.Event() + self.current_loop = 0 + + def on_enter_loaded_playing(self): + self.mixer.add_file(self) + + # Machine related states + def is_in_use(self): + return self.is_loaded(allow_substates=True) and not self.is_loaded() + + def is_in_use_not_stopping(self): + return self.is_loaded_playing() or self.is_loaded_paused() + + # Machine related triggers + def trigger_stopped_events(self): + self.mixer.remove_file(self) + self.wait_event.set() + self.cleanup() + + # Actions and properties called externally + @property + def sound_position(self): + if self.is_in_use(): + return self.current_frame / self.current_audio_segment.frame_rate + else: + return 0 + + def play(self, fade_in=0, volume=100, loop=0, start_at=0): + self.set_gain(gain(volume) + self.mapping.master_gain, absolute=True) + self.volume = volume + if loop < 0: + self.last_loop = float('inf') + else: + self.last_loop = loop + + with self.music_lock: + self.current_audio_segment = self.audio_segment + self.current_frame = int(start_at * self.audio_segment.frame_rate) + + self.start_playing() + + if fade_in > 0: + db_gain = gain(self.volume, 0)[0] + self.set_gain(-db_gain) + self.add_fade_effect(db_gain, fade_in) + + def seek(self, value=0, delta=False): + if not self.is_in_use_not_stopping(): + return + + with self.music_lock: + self.abandon_all_effects() + if delta: + frame_count = int(self.audio_segment.frame_count()) + frame_diff = int(value * self.audio_segment.frame_rate) + self.current_frame += frame_diff + while self.current_frame < 0: + self.current_loop -= 1 + self.current_frame += frame_count + while self.current_frame > frame_count: + self.current_loop += 1 + self.current_frame -= frame_count + if self.current_loop < 0: + self.current_loop = 0 + self.current_frame = 0 + if self.current_loop > self.last_loop: + self.current_loop = self.last_loop + self.current_frame = frame_count + else: + self.current_frame = max( + 0, + int(value * self.audio_segment.frame_rate)) + + def stop(self, fade_out=0, wait=False, set_wait_id=None): + if self.is_loaded_playing(): + ms = int(self.sound_position * 1000) + ms_fo = max(1, int(fade_out * 1000)) + + new_audio_segment = self.current_audio_segment[: ms+ms_fo] \ + .fade_out(ms_fo) + with self.music_lock: + self.current_audio_segment = new_audio_segment + self.stop_playing() + if wait: + if set_wait_id is not None: + self.mapping.add_wait_id(set_wait_id, self.wait_event) + self.wait_end() + else: + self.stopped() + + def abandon_all_effects(self): + db_gain = 0 + for gain_effect in self.gain_effects: + db_gain += gain_effect.get_last_gain() + + self.gain_effects = [] + self.set_gain(db_gain) + + def set_volume(self, value, delta=False, fade=0): + [db_gain, self.volume] = gain( + value + int(delta) * self.volume, + self.volume) + + self.set_gain_with_effect(db_gain, fade=fade) + + def set_gain_with_effect(self, db_gain, fade=0): + if not self.is_in_use(): + return + + if fade > 0: + self.add_fade_effect(db_gain, fade) + else: + self.set_gain(db_gain) + + def wait_end(self): + self.wait_event.clear() + self.wait_event.wait() + + # Let other subscribe for an event when they are ready + def subscribe_loaded(self, callback): + # FIXME: should lock to be sure we have no race, but it makes the + # initialization screen not showing until everything is loaded + if self.is_loaded(allow_substates=True): + callback(True) + elif self.is_failed(): + callback(False) + else: + self.loaded_callbacks.append(callback) + + def poll_loaded(self): + for callback in self.loaded_callbacks: + callback(self.is_loaded()) + self.loaded_callbacks = [] + + # Callbacks + def finished_callback(self): + self.stopped() + + def play_callback(self, out_data_length, frame_count): + if self.is_loaded_paused(): + return b'\0' * out_data_length + + with self.music_lock: + [data, nb_frames] = self.get_next_sample(frame_count) + if nb_frames < frame_count: + if self.is_loaded_playing() and\ + self.current_loop < self.last_loop: + self.current_loop += 1 + self.current_frame = 0 + [new_data, new_nb_frames] = self.get_next_sample( + frame_count - nb_frames) + data += new_data + nb_frames += new_nb_frames + elif nb_frames == 0: + # FIXME: too slow when mixing multiple streams + threading.Thread( + name="MSFinishedCallback", + target=self.finished_callback).start() + + return data.ljust(out_data_length, b'\0') + + # Helpers + def set_gain(self, db_gain, absolute=False): + if absolute: + self.db_gain = db_gain + else: + self.db_gain += db_gain + + def get_next_sample(self, frame_count): + fw = self.audio_segment.frame_width + + data = b"" + nb_frames = 0 + + segment = self.current_audio_segment + max_val = int(segment.frame_count()) + + start_i = max(self.current_frame, 0) + end_i = min(self.current_frame + frame_count, max_val) + data += segment._data[start_i*fw : end_i*fw] + nb_frames += end_i - start_i + self.current_frame += end_i - start_i + + volume_factor = self.volume_factor(self.effects_next_gain(nb_frames)) + + data = audioop.mul(data, Config.sample_width, volume_factor) + + return [data, nb_frames] + + def add_fade_effect(self, db_gain, fade_duration): + if not self.is_in_use(): + return + + self.gain_effects.append(GainEffect( + "fade", + self.current_audio_segment, + self.current_loop, + self.sound_position, + self.sound_position + fade_duration, + gain=db_gain)) + + def effects_next_gain(self, frame_count): + db_gain = 0 + for gain_effect in self.gain_effects: + [new_gain, last_gain] = gain_effect.get_next_gain( + self.current_frame, + self.current_loop, + frame_count) + if last_gain: + self.set_gain(new_gain) + self.gain_effects.remove(gain_effect) + else: + db_gain += new_gain + return db_gain + + + def volume_factor(self, additional_gain=0): + return 10 ** ( (self.db_gain + additional_gain) / 20) + diff --git a/music_sampler/sysfont.py b/music_sampler/sysfont.py new file mode 100644 index 0000000..f47693e --- /dev/null +++ b/music_sampler/sysfont.py @@ -0,0 +1,224 @@ +# This file was imported from +# https://bitbucket.org/marcusva/python-utils/overview +# And slightly adapted + +"""OS-specific font detection.""" +import os +import sys +from subprocess import Popen, PIPE + +if sys.platform in ("win32", "cli"): + import winreg + +__all__ = ["STYLE_NORMAL", "STYLE_BOLD", "STYLE_ITALIC", "STYLE_LIGHT", + "init", "list_fonts", "get_fonts", "get_font" + ] + +# Font cache entries: +# { family : [..., +# (name, styles, fonttype, filename) +# ... +# ] +# } +__FONTCACHE = None + + +STYLE_NONE = 0x00 +STYLE_NORMAL = 0x01 +STYLE_BOLD = 0x02 +STYLE_ITALIC = 0x04 +STYLE_LIGHT = 0x08 +STYLE_MEDIUM = 0x10 + +def _add_font(family, name, styles, fonttype, filename): + """Adds a font to the internal font cache.""" + global __FONTCACHE + + if family not in __FONTCACHE: + __FONTCACHE[family] = [] + __FONTCACHE[family].append((name, styles, fonttype, filename)) + + +def _cache_fonts_win32(): + """Caches fonts on a Win32 platform.""" + key = "SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\Fonts" + regfonts = [] + try: + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key) as fontkey: + idx = 0 + enumval = winreg.EnumValue + rappend = regfonts.append + while True: + rappend(enumval(fontkey, idx)[:2]) + idx += 1 + except WindowsError: + pass + + # TODO: integrate alias handling for fonts within the registry. + # TODO: Scan and index fonts from %SystemRoot%\\Fonts that are not in the + # registry + + # Received all fonts from the registry. + for name, filename in regfonts: + fonttype = os.path.splitext(filename)[1][1:].lower() + if name.endswith("(TrueType)"): + name = name[:-10].strip() + if name.endswith("(All Res)"): + name = name[:-9].strip() + style = STYLE_NORMAL + if name.find(" Bold") >= 0: + style |= STYLE_BOLD + if name.find(" Italic") >= 0 or name.find(" Oblique") >= 0: + style |= STYLE_ITALIC + + family = name + for rm in ("Bold", "Italic", "Oblique"): + family = family.replace(rm, "") + family = family.lower().strip() + + fontpath = os.environ.get("SystemRoot", "C:\\Windows") + fontpath = os.path.join(fontpath, "Fonts") + if filename.find("\\") == -1: + # No path delimiter is given; we assume it to be a font in + # %SystemRoot%\Fonts + filename = os.path.join(fontpath, filename) + _add_font(family, name, style, fonttype, filename) + + +def _cache_fonts_darwin(): + """Caches fonts on Mac OS.""" + raise NotImplementedError("Mac OS X support is not given yet") + + +def _cache_fonts_fontconfig(): + """Caches font on POSIX-alike platforms.""" + try: + command = "fc-list : file family style fullname fullnamelang" + proc = Popen(command, stdout=PIPE, shell=True, stderr=PIPE) + pout = proc.communicate()[0] + output = pout.decode("utf-8") + except OSError: + return + + for entry in output.split(os.linesep): + if entry.strip() == "": + continue + values = entry.split(":") + filename = values[0] + + # get the font type + fname, fonttype = os.path.splitext(filename) + if fonttype == ".gz": + fonttype = os.path.splitext(fname)[1][1:].lower() + else: + fonttype = fonttype.lstrip(".").lower() + + # get the font name + name = None + if len(values) > 3: + fullnames, fullnamelangs = values[3:] + langs = fullnamelangs.split(",") + try: + offset = langs.index("fullnamelang=en") + except ValueError: + offset = -1 + if offset == -1: + try: + offset = langs.index("en") + except ValueError: + offset = -1 + if offset != -1: + # got an english name, use that one + name = fullnames.split(",")[offset] + if name.startswith("fullname="): + name = name[9:] + if name is None: + if fname.endswith(".pcf") or fname.endswith(".bdf"): + name = os.path.basename(fname[:-4]) + else: + name = os.path.basename(fname) + name = name.lower() + + # family and styles + family = values[1].strip().lower() + stylevals = values[2].strip() + style = STYLE_NONE + + if stylevals.find("Bold") >= 0: + style |= STYLE_BOLD + if stylevals.find("Light") >= 0: + style |= STYLE_LIGHT + if stylevals.find("Italic") >= 0 or stylevals.find("Oblique") >= 0: + style |= STYLE_ITALIC + if stylevals.find("Medium") >= 0: + style |= STYLE_MEDIUM + if style == STYLE_NONE: + style = STYLE_NORMAL + _add_font(family, name, style, fonttype, filename) + + +def init(): + """Initialises the internal font cache. + + It does not need to be called explicitly. + """ + global __FONTCACHE + if __FONTCACHE is not None: + return + __FONTCACHE = {} + if sys.platform in ("win32", "cli"): + _cache_fonts_win32() + elif sys.platform == "darwin": + _cache_fonts_darwin() + else: + _cache_fonts_fontconfig() + + +def list_fonts(): + """Returns an iterator over the cached fonts.""" + if __FONTCACHE is None: + init() + if len(__FONTCACHE) == 0: + yield None + for family, entries in __FONTCACHE.items(): + for fname, styles, fonttype, filename in entries: + yield (family, fname, styles, fonttype, filename) + + +def get_fonts(name, style=STYLE_NONE, ftype=None): + """Retrieves all fonts matching the given family or font name.""" + if __FONTCACHE is None: + init() + if len(__FONTCACHE) == 0: + return None + + results = [] + rappend = results.append + + name = name.lower() + if ftype: + ftype = ftype.lower() + + fonts = __FONTCACHE.get(name, []) + for fname, fstyles, fonttype, filename in fonts: + if ftype and fonttype != ftype: + # ignore font filetype mismatches + continue + if style == STYLE_NONE or fstyles == style: + rappend((name, fname, fstyles, fonttype, filename)) + + for family, fonts in __FONTCACHE.items(): + for fname, fstyles, fonttype, filename in fonts: + if fname.lower() == name and filename not in results: + rappend((family, fname, fstyles, fonttype, filename)) + return results + + +def get_font(name, style=STYLE_NONE, ftype=None): + """Retrieves the best matching font file for the given name and + criteria. + """ + retvals = get_fonts(name, style, ftype) + if len(retvals) > 0: + return retvals[0] + return None -- cgit v1.2.3