| @@ -1,11 +1,34 @@ | |||
| from pythonosc import udp_client | |||
| import curses | |||
| import random | |||
| import sys | |||
| import re | |||
| import time | |||
| ESCAPE = 27 | |||
| LETTERS = range(97, 122) | |||
| NUMBERS = range(48, 57) | |||
| LETTERS = range(97, 123) | |||
| NUMBERS = range(48, 58) | |||
| ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" | |||
| SCALE = (60 + x for x in range(len(ALPHABET))) | |||
| FLOWER = """ | |||
| .-~~-.--. WELCOME TO | |||
| : ) CHEEKYFSM | |||
| .~ ~ -.\ /.- ~~ . | |||
| > `. .' < | |||
| ( .- -. ) | |||
| `- -.-~ `- -' ~-.- -' | |||
| ( : ) _ _ .-: | |||
| ~--. : .--~ .-~ .-~ } | |||
| ~-.-^-.-~ \_ .~ .-~ .~ | |||
| \ \' \ '_ _ -~ | |||
| `.`. // | |||
| . - ~ ~-.__`.`-.// | |||
| .-~ . - ~ }~ ~ ~-.~-. | |||
| .' .-~ .-~ :/~-.~-./: | |||
| /_~_ _ . - ~ ~-.~-._ | |||
| ~-.< | |||
| """ | |||
| class CheekyFSM(object): | |||
| @@ -14,37 +37,82 @@ class CheekyFSM(object): | |||
| self.buffer = "" | |||
| self.message = "" | |||
| self.edges = {} | |||
| self.durations = {} | |||
| self.state = "A" | |||
| self.history = " "*30 | |||
| self.history = " " * 30 | |||
| self.repeat = 0 | |||
| self.osc = udp_client.SimpleUDPClient("0.0.0.0", 5005) | |||
| self.bpm = 120 | |||
| self.last_update = time.time() | |||
| self.scale = dict(zip(ALPHABET, SCALE)) | |||
| def show_edges(self): | |||
| """ Print the help menu """ | |||
| y, x = self.w.getmaxyx() | |||
| self.w.addstr(10, 2, f"Transitions:", curses.A_DIM) | |||
| for index, ((start, end), prob) in enumerate(self.edges.items()): | |||
| self.w.addstr(10 + index, 2, f"{start} -> {end} : {prob}%") | |||
| self.w.addstr(11 + index, 2, f"{start} -> {end} : {prob}%") | |||
| def show_flower(self): | |||
| y, x = self.w.getmaxyx() | |||
| y -= 20 | |||
| x -= 45 | |||
| for index, line in enumerate(FLOWER.split("\n")): | |||
| self.w.addstr(y + index, x, line, curses.A_DIM) | |||
| def show_durations(self): | |||
| """ Print the help menu """ | |||
| y, x = self.w.getmaxyx() | |||
| self.w.addstr(10, 30, f"Durations:", curses.A_DIM) | |||
| for index, (key, dur) in enumerate(self.durations.items()): | |||
| self.w.addstr(11 + index, 30, f"{key} : {dur}♪") | |||
| def show_scale(self): | |||
| """ Print the scale """ | |||
| y, x = self.w.getmaxyx() | |||
| self.w.addstr(10, 30, f"Scale:", curses.A_DIM) | |||
| for index, (key, dur) in enumerate(self.scale.items()): | |||
| self.w.addstr(11 + index, 30, f"{key} : {dur}") | |||
| def update(self): | |||
| """ Update the FSM """ | |||
| self.repeat += 1 | |||
| if self.repeat < self.durations.get(self.state, 1): | |||
| self.osc.send_message(self.get_note(), 0) | |||
| self.history = self.history[1:] + "." | |||
| return | |||
| self.repeat = 0 | |||
| changed = False | |||
| edges = list(self.edges.items()) | |||
| random.shuffle(edges) | |||
| for ((start, end), prob) in edges: | |||
| if start == self.state: | |||
| if random.randint(0, 100) < prob: | |||
| val = random.randint(0, 99) | |||
| self.message = f"{start}, {end}, {val}, {prob}" | |||
| if val < prob: | |||
| self.state = end | |||
| changed = True | |||
| break | |||
| self.history = self.history[1:] + self.state | |||
| if changed is False: | |||
| self.history = self.history[1:] + self.state.lower() | |||
| else: | |||
| self.osc.send_message(self.get_note(), 80) | |||
| self.history = self.history[1:] + self.state | |||
| self.last_update = time.time() | |||
| def show_state(self): | |||
| """ Show the state of the machine """ | |||
| self.w.addstr(2, 2, f"Tempo: 120bpm") | |||
| self.w.addstr(2, 2, f"Tempo: {self.bpm} bpm", curses.A_DIM) | |||
| self.w.addstr(4, 2, f"State: {self.state}") | |||
| self.w.addstr(5, 2, f"History: {self.history}") | |||
| self.w.addstr(2, 30, f"{self.history}") | |||
| offset = 6 | |||
| offset = 5 | |||
| for ((start, end), prob) in self.edges.items(): | |||
| if start == self.state: | |||
| self.w.addstr(offset, 2, | |||
| f"Going to {end} with probability {prob}%") | |||
| self.w.addstr(offset, 2, f"-> {end} with {prob}%", | |||
| curses.A_DIM) | |||
| offset += 1 | |||
| def show_status(self): | |||
| @@ -52,8 +120,14 @@ class CheekyFSM(object): | |||
| y, x = self.w.getmaxyx() | |||
| self.w.addstr(y - 2, 2, f"> {self.buffer}", curses.A_BOLD) | |||
| def show_debug(self): | |||
| """ Print the status bar """ | |||
| y, x = self.w.getmaxyx() | |||
| self.w.addstr(y - 4, 2, f"{self.message}", curses.A_DIM) | |||
| def quit(self): | |||
| """ Quit the program """ | |||
| self.osc.send_message(self.get_note(), 0) | |||
| curses.nocbreak() | |||
| self.w.keypad(False) | |||
| curses.echo() | |||
| @@ -63,6 +137,7 @@ class CheekyFSM(object): | |||
| def process_key(self): | |||
| """ Process a key """ | |||
| key = self.w.getch() | |||
| self.lastkey = key | |||
| if key == ESCAPE: | |||
| quit(self.w) | |||
| @@ -74,42 +149,85 @@ class CheekyFSM(object): | |||
| self.accept() | |||
| elif key == 263 and len(self.buffer) > 0: | |||
| self.buffer = self.buffer[:-1] | |||
| elif key == curses.KEY_UP: | |||
| self.set_bpm(self.bpm + 10) | |||
| elif key == curses.KEY_DOWN: | |||
| self.set_bpm(self.bpm - 10) | |||
| def set_bpm(self, bpm): | |||
| """ Set the BPM """ | |||
| self.bpm = bpm | |||
| self.w.timeout(int(1000 * 60 / self.bpm)) | |||
| def accept(self): | |||
| """ Accept the string """ | |||
| # Match on edge | |||
| match = re.match(r"([A-Z])([A-Z])(\d+)", self.buffer) | |||
| if not match: | |||
| self.message = " Invalid command :(" | |||
| else: | |||
| if match: | |||
| groups = match.groups() | |||
| start = groups[0] | |||
| end = groups[1] | |||
| prob = int(groups[2]) | |||
| prob = max(0, min(100, prob)) | |||
| self.modify(start, end, prob) | |||
| self.buffer = "" | |||
| return | |||
| # Match on duration / scale | |||
| match = re.match(r"([A-Z])(\d+)", self.buffer) | |||
| if match: | |||
| groups = match.groups() | |||
| key = groups[0] | |||
| value = int(groups[1]) | |||
| if value > 0 and value < 1000: | |||
| self.scale[key] = value | |||
| self.buffer = "" | |||
| return | |||
| # Match on switch | |||
| match = re.match(r"([A-Z])", self.buffer) | |||
| if match: | |||
| groups = match.groups() | |||
| self.state = groups[0] | |||
| self.history = self.history[1:] + self.state | |||
| self.osc.send_message(self.get_note(), 80) | |||
| self.buffer = "" | |||
| return | |||
| self.buffer = "" | |||
| def get_note(self): | |||
| """ Get the note """ | |||
| note = self.scale.get(self.state, 60) | |||
| return f"i/vkb_midi/1/note/{note}" | |||
| def modify(self, start, end, prob): | |||
| """ Make a modification """ | |||
| key = (start, end) | |||
| self.edges[key] = prob | |||
| self.message = f" {start} -> {end} : {prob}%" | |||
| if prob == 0 and key in self.edges: | |||
| self.message = f" {start} -> {end} : deleted" | |||
| del self.edges[key] | |||
| def loop(self, w): | |||
| """ Run the loop """ | |||
| self.w = w | |||
| curses.curs_set(0) # invisible | |||
| w.timeout(100) | |||
| w.timeout(int(1000 * 60 / self.bpm)) | |||
| while True: | |||
| self.process_key() | |||
| w.clear() | |||
| self.show_edges() | |||
| self.show_status() | |||
| self.show_debug() | |||
| self.show_state() | |||
| # self.show_durations() | |||
| self.show_scale() | |||
| self.show_flower() | |||
| time.sleep(.01) | |||
| self.osc.send_message(self.get_note(), 0) | |||
| self.update() | |||
| w.refresh() | |||
| self.process_key() | |||
| # time.sleep(max(0, expect - (time.time() - t))) | |||
| if __name__ == "__main__": | |||