#!/usr/bin/python3 import socket import os import subprocess from threading import Thread import curses import asyncio import click import pickle import re import webview # function generate fibonacci sequence def fibonacci(n): if n == 0: return 0 elif n == 1: return 1 else: return fibonacci(n-1) + fibonacci(n-2) async def scan_for_pi() -> dict(): ip = socket.gethostbyname(socket.gethostname()) baseIP = '.'.join(ip.split(".")[:3]) def scan(ip: str) -> str: try: with socket.socket() as s: s.settimeout(0.5) s.connect((ip, 5000)) s.send("PING".encode()) data = s.recv(1024).decode() if data == "PONG": print("found:", ip) return ip except OSError: pass async def scan_async(ip: str) -> str: return await asyncio.to_thread(scan, ip) pi_list = await asyncio.gather(*[scan_async(baseIP + "." + str(i)) for i in range(255)]) return [pi for pi in pi_list if pi is not None] class PI: def __init__(self, ip, port=5000): self.ip = ip self.port = port self.connected = False self.socket = socket.socket() def __str__(self): return self.ip def connect(self): self.socket.connect((self.ip, self.port)) def send(self, data): self.socket.send(data.encode()) def disconnect(self): self.socket.close() def load_config(path=os.path.expanduser("~/.config/lc/lc.conf")): try: with open(path, 'rb') as f: return [PI(ip) for ip in pickle.load(f)] except FileNotFoundError: print("Config does not exist") exit() def save_config(obj, path=os.path.expanduser("~/.config/lc/lc.conf")): os.makedirs(path[::-1].split('/',1)[-1][::-1], exist_ok=True) with open(path, 'wb') as f: pickle.dump(list([o.ip for o in obj]), f) def set_pixels(color): if not re.match("[0-9a-f]{6}$", color): print(f"{color} not a valid hex color code") return pi_list = load_config() for pi in pi_list: pi.connect() pi.send(color) pi.disconnect() @click.command() @click.argument("arg", nargs=-1) # @click.option("-s", help="Set HEX Color: -s ffffff") # @click.option("-v", help="Set HEX Color as base visualizer color: -v ffffff") def main(arg): if arg == (): pass match arg[0]: case "help": print("lc [help|set|search|list]") case "set": if len(arg) < 2: print("color argument missing") set_pixels(arg[1]) case "search": ip_list = asyncio.run(scan_for_pi()) pi_list = [PI(ip) for ip in ip_list] save_config(pi_list) case "list": pi_list = load_config() if pi_list is not None: for pi in pi_list: print(pi) case "music": pass if __name__ == '__main__': main() # def helpmenu(): # print("light controll\n") # print("Options:") # print("-h show help") # print("-s set static color") # print("-v visualizer") # print("-i interactive interface") # print("-a ambient light") # print("-t test function (debug)") # def base_color(color): # return [i // min(color) for i in color] # def visualizer(color, amp_strength=0.6): # r,g,b = hex_to_rgb(color) # cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"], stdout=subprocess.PIPE) # sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE) # for line in sed.stdout: # amp_factor = amp_strength * ((int(line) / 500) - 1) + 1 # send(rgb_to_hex(int(r * amp_factor), int(g * amp_factor), int(b * amp_factor))) # cava.stdout.close() # sed.stdout.close() # def visualizer_cava_thread(): # global volume_amp # cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"], stdout=subprocess.PIPE) # sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE) # for line in sed.stdout: # volume_amp = int(line) # print(volume_amp) # cava.stdout.close() # sed.stdout.close() # def amp_by_vol(color, amp_strength): # global volume_amp # # amp_strength in percentage # amp_factor = amp_strength*((volume_amp/500)-1)+1 # #print(amp_factor, color,[c*amp_factor for c in color], volume_amp) # return [c*amp_factor for c in color] # def vibrant(r,g,b): # intensity = 50 # usabel range 1-100 max:1000 # intensity = 1+intensity/1000 # rgb = [r,g,b] # #min_idx = rgb.index(min(rgb)) # d = (r+g+b)/3 # for c in range(3): # if rgb[c] < d: # rgb[c] = int(rgb[c]*(intensity**(rgb[c]-d))) # elif rgb[c] > d: # rgb[c] = int(rgb[c]*(-intensity**(-rgb[c]+d)+2)) # if rgb[c] > 255: # rgb[c] = 255 # #rgb[min_idx] = int(rgb[min_idx]*(rgb[min_idx]/d)**2) # return rgb # def ambient_light_thread(): # r,g,b = 0,0,0 # brighness = 1 # active_color = '' # while True: # # P-Regler # r,g,b = [w+((y-w)*0.1) for y,w in zip((_r,_g,_b),(r,g,b))] # if ((round(r),round(g),round(b)) == (_r,_g,_b)): # active_color = '\033[0;32;40m' # else: # active_color = '\033[0;31;40m' # r_out,g_out,b_out = amp_by_vol((r,g,b), 0.6) # #r_out,g_out,b_out = r,g,b # print(active_color, round(r),round(g),round(b), round(r_out),round(g_out),round(b_out), '\033[0;37;40m') # send(rgb_to_hex(int(r_out*brighness),int(g_out*brighness),int(b_out*brighness))) # time.sleep(0.01) # ups_counter = 0 # start_time = time.time() # def ups(): # global ups_counter # global start_time # ups_counter += 1 # time_d = time.time()-start_time # ups = ups_counter/time_d # print(ups) # def color_correction(r,g,b): # amp = [1,1,0.8] # threshold = 10 # if r < threshold and g < threshold and b < threshold: # return 0,0,0 # return int(r*amp[0]), int(g*amp[1]), int(b*amp[2]) # def ambient_light(): # t1 = Thread(target=ambient_light_thread) # t1.start() # t2 = Thread(target=visualizer_cava_thread) # t2.start() # global _r,_g,_b # counter = 0 # start_time = time.time() # while True: # # screenshot # # Xorg # img = pyscreenshot.grab(backend="mss", childprocess=False, bbox=(1920,0,4480,1440)) # #Wayland # #time.sleep(0.1) # #cap = cv2.VideoCapture('/tmp/a') # #count = cap.get(cv2.CAP_PROP_FRAME_COUNT) # #cap.set(cv2.CAP_PROP_POS_FRAMES, count-1) # #ret, frame = cap.read() # #frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB) # #img = Image.fromarray(frame) # #cap.release() # # find dominant color # img.thumbnail((2,2)) # r,g,b = img.getpixel((0, 0)) # r,g,b = vibrant(r,g,b) # _r,_g,_b = color_correction(r,g,b) # time.sleep(0.05) # def rgb_to_hex(r,g,b): # return "%02x%02x%02x" % (r,g,b) # def hex_to_rgb(hex): # r = int(hex[0:2],16) # g = int(hex[2:4],16) # b = int(hex[4:6],16) # return r,g,b # def test(): # for i in range(256): # h = rgb_to_hex(0,i,0) # send(h) # print(h) # time.sleep(0.0) # def tui_main(scr, *args): # # -- Perform an action with Screen -- # scr.border(0) # scr.addstr(5, 5, 'Hello from Curses!', curses.A_BOLD) # scr.addstr(6, 5, 'Press q to close this screen', curses.A_NORMAL) # scr.addstr(8, 5, '\u250C') # rgb = [0,0,0] # color_selector = 0 # while True: # status = '{},{},{} {}'.format(rgb[0], rgb[1], rgb[2], color_selector) # scr.addstr(1, 1, status) # ch = scr.getch() # if ch == ord('q'): # break # elif ch == ord('j'): # if rgb[color_selector] > 0: # rgb[color_selector] -= 1 # send(rgb_to_hex(rgb[0], rgb[1], rgb[2])) # elif ch == ord('k'): # if rgb[color_selector] < 255: # rgb[color_selector] += 1 # send(rgb_to_hex(rgb[0], rgb[1], rgb[2])) # elif ch == ord('l'): # if color_selector < 3: # color_selector += 1 # elif ch == ord('h'): # if color_selector > 0: # color_selector -= 1 # def main(argv): # if not sys.stdin.isatty(): # connect() # for volume in sys.stdin: # volume = int(volume) # hex_color = rgb_to_hex(volume,0,0) # send(hex_color) # sys.exit() # try: # opts, args = getopt.getopt(argv, "s:v:ahti") # except getopt.GetoptError: # print(sys.argv[0], "invalid option") # print("Try", sys.argv[0], "-h for help") # sys.exit(1) # for opt, arg in opts: # if opt == "-h": # helpmenu() # elif opt == "-s": # connect() # send(arg) # disconnect() # elif opt == "-a": # connect() # ambient_light() # disconnect() # elif opt == "-v": # connect() # visualizer(arg) # disconnect() # elif opt == "-t": # connect() # test() # disconnect() # elif opt == '-i': # connect() # curses.wrapper(tui_main) # disconnect() # sys.exit() # if __name__ == "__main__": # main(sys.argv[1:])