diff --git a/controller/Makefile b/controller/Makefile index 2547d27..8eb5e7c 100644 --- a/controller/Makefile +++ b/controller/Makefile @@ -1,3 +1,3 @@ install : - cp lc /bin/lc; mkdir -p ~/.config/lc; cp cava.conf ~/.config/lc/cava.conf; sudo chmod 666 /etc/lc/cava.conf - + cp controller.py /bin/lc; mkdir -p ~/.config/lc; cp cava.conf ~/.config/lc/cava.conf; sudo chmod 666 ~/.config/lc/cava.conf + diff --git a/controller/lc b/controller/lc deleted file mode 100755 index b258111..0000000 --- a/controller/lc +++ /dev/null @@ -1,334 +0,0 @@ -#!/usr/bin/python3 -import socket -import os -import subprocess -from threading import Thread -import curses -import asyncio -import click -import pickle -import re - -async def scan_for_pi() -> dict(): - ip = socket.gethostbyname(socket.gethostname()) - baseIP = '.'.join(ip.split(".")[:3]) - - def scan(ip: str) -> str: - try: - with socket.socket() as s: - s.settimeout(0.5) - s.connect((ip, 5000)) - s.send("PING".encode()) - data = s.recv(1024).decode() - if data == "PONG": - print("found:", ip) - return ip - except OSError: - pass - - async def scan_async(ip: str) -> str: - return await asyncio.to_thread(scan, ip) - - pi_list = await asyncio.gather(*[scan_async(baseIP + "." + str(i)) for i in range(255)]) - return [pi for pi in pi_list if pi is not None] - -class PI: - def __init__(self, ip, port=5000): - self.ip = ip - self.port = port - self.connected = False - self.socket = socket.socket() - - def __str__(self): - return self.ip - - def connect(self): - self.socket.connect((self.ip, self.port)) - - def send(self, data): - self.socket.send(data.encode()) - - def disconnect(self): - self.socket.close() - -def load_config(path=os.path.expanduser("~/.config/lc/lc.conf")): - try: - with open(path, 'rb') as f: - return [PI(ip) for ip in pickle.load(f)] - except FileNotFoundError: - print("Config does not exist") - exit() - - -def save_config(obj, path=os.path.expanduser("~/.config/lc/lc.conf")): - os.makedirs(path[::-1].split('/',1)[-1][::-1], exist_ok=True) - with open(path, 'wb') as f: - pickle.dump(list([o.ip for o in obj]), f) - -@click.command() -@click.argument("arg", nargs=-1) -# @click.option("-s", help="Set HEX Color: -s ffffff") -# @click.option("-v", help="Set HEX Color as base visualizer color: -v ffffff") -def main(arg): - arg = ("help",) if arg == () else arg - match arg[0]: - case "help": - print("lc [help|set|search|list]") - - case "set": - if len(arg) < 2 or not re.match("[0-9a-f]{6}$", arg[1]): - print("color not a valid hex code") - exit() - pi_list = load_config() - for pi in pi_list: - print(pi) - pi.connect() - pi.send(arg[1]) - pi.disconnect() - - case "search": - ip_list = asyncio.run(scan_for_pi()) - pi_list = [PI(ip) for ip in ip_list] - save_config(pi_list) - - case "list": - pi_list = load_config() - if pi_list is not None: - for pi in pi_list: - print(pi) - case "music": - pass - - -if __name__ == '__main__': - main() -# def helpmenu(): - # print("light controll\n") - # print("Options:") - # print("-h show help") - # print("-s set static color") - # print("-v visualizer") - # print("-i interactive interface") - # print("-a ambient light") - # print("-t test function (debug)") - -# def base_color(color): - # return [i // min(color) for i in color] - -def visualizer(color, amp_strength=0.6): - r,g,b = hex_to_rgb(color) - - cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"], stdout=subprocess.PIPE) - sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE) - - for line in sed.stdout: - amp_factor = amp_strength * ((int(line) / 500) - 1) + 1 - send(rgb_to_hex(int(r * amp_factor), int(g * amp_factor), int(b * amp_factor))) - cava.stdout.close() - sed.stdout.close() - -def visualizer_cava_thread(): - global volume_amp - - cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"], stdout=subprocess.PIPE) - sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE) - - for line in sed.stdout: - volume_amp = int(line) - print(volume_amp) - cava.stdout.close() - sed.stdout.close() - -def amp_by_vol(color, amp_strength): - global volume_amp - # amp_strength in percentage - amp_factor = amp_strength*((volume_amp/500)-1)+1 - #print(amp_factor, color,[c*amp_factor for c in color], volume_amp) - return [c*amp_factor for c in color] - -def vibrant(r,g,b): - intensity = 50 # usabel range 1-100 max:1000 - - intensity = 1+intensity/1000 - rgb = [r,g,b] - #min_idx = rgb.index(min(rgb)) - d = (r+g+b)/3 - for c in range(3): - if rgb[c] < d: - rgb[c] = int(rgb[c]*(intensity**(rgb[c]-d))) - elif rgb[c] > d: - rgb[c] = int(rgb[c]*(-intensity**(-rgb[c]+d)+2)) - if rgb[c] > 255: - rgb[c] = 255 - #rgb[min_idx] = int(rgb[min_idx]*(rgb[min_idx]/d)**2) - return rgb - -def ambient_light_thread(): - r,g,b = 0,0,0 - brighness = 1 - active_color = '' - - while True: - # P-Regler - r,g,b = [w+((y-w)*0.1) for y,w in zip((_r,_g,_b),(r,g,b))] - if ((round(r),round(g),round(b)) == (_r,_g,_b)): - active_color = '\033[0;32;40m' - else: - active_color = '\033[0;31;40m' - r_out,g_out,b_out = amp_by_vol((r,g,b), 0.6) - #r_out,g_out,b_out = r,g,b - print(active_color, round(r),round(g),round(b), round(r_out),round(g_out),round(b_out), '\033[0;37;40m') - send(rgb_to_hex(int(r_out*brighness),int(g_out*brighness),int(b_out*brighness))) - time.sleep(0.01) - -ups_counter = 0 -start_time = time.time() - -def ups(): - global ups_counter - global start_time - - ups_counter += 1 - time_d = time.time()-start_time - ups = ups_counter/time_d - print(ups) - -def color_correction(r,g,b): - amp = [1,1,0.8] - threshold = 10 - if r < threshold and g < threshold and b < threshold: - return 0,0,0 - return int(r*amp[0]), int(g*amp[1]), int(b*amp[2]) - -def ambient_light(): - - t1 = Thread(target=ambient_light_thread) - t1.start() - - t2 = Thread(target=visualizer_cava_thread) - t2.start() - - global _r,_g,_b - - counter = 0 - start_time = time.time() - while True: - # screenshot - - # Xorg - img = pyscreenshot.grab(backend="mss", childprocess=False, bbox=(1920,0,4480,1440)) - - #Wayland - #time.sleep(0.1) - #cap = cv2.VideoCapture('/tmp/a') - #count = cap.get(cv2.CAP_PROP_FRAME_COUNT) - #cap.set(cv2.CAP_PROP_POS_FRAMES, count-1) - - #ret, frame = cap.read() - - #frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB) - #img = Image.fromarray(frame) - #cap.release() - - # find dominant color - img.thumbnail((2,2)) - r,g,b = img.getpixel((0, 0)) - r,g,b = vibrant(r,g,b) - _r,_g,_b = color_correction(r,g,b) - - time.sleep(0.05) - - -def rgb_to_hex(r,g,b): - return "%02x%02x%02x" % (r,g,b) - -def hex_to_rgb(hex): - r = int(hex[0:2],16) - g = int(hex[2:4],16) - b = int(hex[4:6],16) - return r,g,b - -def test(): - for i in range(256): - h = rgb_to_hex(0,i,0) - send(h) - print(h) - time.sleep(0.0) - -def tui_main(scr, *args): - # -- Perform an action with Screen -- - scr.border(0) - scr.addstr(5, 5, 'Hello from Curses!', curses.A_BOLD) - scr.addstr(6, 5, 'Press q to close this screen', curses.A_NORMAL) - scr.addstr(8, 5, '\u250C') - - rgb = [0,0,0] - color_selector = 0 - - while True: - status = '{},{},{} {}'.format(rgb[0], rgb[1], rgb[2], color_selector) - scr.addstr(1, 1, status) - - ch = scr.getch() - if ch == ord('q'): - break - elif ch == ord('j'): - if rgb[color_selector] > 0: - rgb[color_selector] -= 1 - send(rgb_to_hex(rgb[0], rgb[1], rgb[2])) - elif ch == ord('k'): - if rgb[color_selector] < 255: - rgb[color_selector] += 1 - send(rgb_to_hex(rgb[0], rgb[1], rgb[2])) - elif ch == ord('l'): - if color_selector < 3: - color_selector += 1 - elif ch == ord('h'): - if color_selector > 0: - color_selector -= 1 - -def main(argv): - if not sys.stdin.isatty(): - connect() - for volume in sys.stdin: - volume = int(volume) - hex_color = rgb_to_hex(volume,0,0) - send(hex_color) - sys.exit() - - try: - opts, args = getopt.getopt(argv, "s:v:ahti") - except getopt.GetoptError: - print(sys.argv[0], "invalid option") - print("Try", sys.argv[0], "-h for help") - sys.exit(1) - - for opt, arg in opts: - if opt == "-h": - helpmenu() - elif opt == "-s": - connect() - send(arg) - disconnect() - elif opt == "-a": - connect() - ambient_light() - disconnect() - elif opt == "-v": - connect() - visualizer(arg) - disconnect() - elif opt == "-t": - connect() - test() - disconnect() - elif opt == '-i': - connect() - curses.wrapper(tui_main) - disconnect() - - sys.exit() - -if __name__ == "__main__": - - main(sys.argv[1:])