#!/usr/bin/python3
"""Light control client.

Sends colours, encoded as six-digit hex strings ("rrggbb"), over TCP to the
two hosts listed in ``server``. Modes (see ``helpmenu``): static colour,
audio visualizer driven by ``cava``, ambient light derived from a
screenshot, an interactive curses interface and a debug sweep.
"""
import curses
import getopt
import io
import socket
import subprocess
import sys
import time
from threading import Thread

import numpy as np
import pyscreenshot
import toml

# Disabled imports, kept for the alternative capture paths sketched below.
#from PIL import Image, ImageDraw, ImageFont
#import cv2
#from skimage import io
#from gi.repository import GLib
#from pydbus import SessionBus

s1 = socket.socket()
s2 = socket.socket()
server = ["192.168.188.64", "192.168.188.76"]

# State shared between threads: ambient_light() writes the target colour,
# visualizer_cava_thread() writes the volume, ambient_light_thread() reads
# both. They are initialised here so the reader cannot raise NameError when
# it happens to run before the first write.
_r, _g, _b = 0, 0, 0
volume_amp = 500  # 500 yields an amplification factor of exactly 1


def connect(port=5000):
    """Connect both sockets to their host in ``server`` on ``port``."""
    s1.connect((server[0], port))
    s2.connect((server[1], port))


def disconnect():
    """Close both connections."""
    s1.close()
    s2.close()


def send(data):
    """Send the string ``data`` to both hosts.

    ``sendall`` is used because ``socket.send`` is allowed to transmit only
    part of the buffer.
    """
    payload = data.encode()
    s1.sendall(payload)
    s2.sendall(payload)


def helpmenu():
    """Print the command line help."""
    print("light controll\n")
    print("Options:")
    print("-h show help")
    print("-s set static color")
    print("-v visualizer")
    print("-i interactive interface")
    print("-a ambient light")
    print("-t test function (debug)")


def base_color(color):
    """Return every channel floor-divided by the smallest channel.

    Raises ZeroDivisionError when the smallest channel is 0.
    """
    return [i // min(color) for i in color]


def _cava_levels():
    """Yield the integer level parsed from each line of the cava|sed pipeline.

    Both pipes are closed when the generator finishes or is discarded, also
    when the consumer stops early because of an exception.
    """
    cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"],
                            stdout=subprocess.PIPE)
    sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"],
                           stdin=cava.stdout, stdout=subprocess.PIPE)
    try:
        for line in sed.stdout:
            yield int(line)
    finally:
        cava.stdout.close()
        sed.stdout.close()


def visualizer(color, amp_strength=0.6):
    """Send ``color`` (hex string) scaled by the current audio level.

    A level of 500 leaves the colour unchanged; ``amp_strength`` controls how
    strongly other levels brighten or darken it.
    """
    r, g, b = hex_to_rgb(color)
    for level in _cava_levels():
        amp_factor = amp_strength * ((level / 500) - 1) + 1
        send(rgb_to_hex(int(r * amp_factor),
                        int(g * amp_factor),
                        int(b * amp_factor)))


def visualizer_cava_thread():
    """Thread body: keep ``volume_amp`` updated with the latest cava level."""
    global volume_amp
    for level in _cava_levels():
        volume_amp = level


def amp_by_vol(color, amp_strength):
    """Return the channels of ``color`` scaled by the current ``volume_amp``.

    ``amp_strength`` is a fraction (e.g. 0.6); the result is a list of floats.
    """
    amp_factor = amp_strength * ((volume_amp / 500) - 1) + 1
    #print(amp_factor, color,[c*amp_factor for c in color], volume_amp)
    return [c * amp_factor for c in color]


def vibrant(r, g, b):
    """Push each channel away from the mean of the three channels.

    Channels below the mean are reduced, channels above it are increased
    (capped at 255); a channel equal to the mean is left alone. Returns a
    list ``[r, g, b]``.
    """
    intensity = 50  # usable range 1-100, max: 1000
    base = 1 + intensity / 1000
    mean = (r + g + b) / 3
    result = []
    for value in (r, g, b):
        if value < mean:
            value = int(value * base ** (value - mean))
        elif value > mean:
            value = int(value * (2 - base ** (mean - value)))
        result.append(min(value, 255))
    #rgb[min_idx] = int(rgb[min_idx]*(rgb[min_idx]/d)**2)
    return result


def ambient_light_thread():
    """Thread body: fade the sent colour towards the target ``_r, _g, _b``.

    Runs forever; every 10 ms it moves 10 % of the remaining distance towards
    the target, modulates the result with the audio volume and sends it.
    """
    r, g, b = 0, 0, 0
    brightness = 1
    while True:
        # One snapshot per step, so the comparison below sees the same target
        # the controller used even if the capture loop updates it meanwhile.
        target = (_r, _g, _b)
        # Proportional (P) controller
        r, g, b = [cur + ((goal - cur) * 0.1)
                   for goal, cur in zip(target, (r, g, b))]
        if (round(r), round(g), round(b)) == target:
            active_color = '\033[0;32;40m'
        else:
            active_color = '\033[0;31;40m'
        r_out, g_out, b_out = amp_by_vol((r, g, b), 0.6)
        #r_out,g_out,b_out = r,g,b
        print(active_color, round(r), round(g), round(b),
              round(r_out), round(g_out), round(b_out), '\033[0;37;40m')
        send(rgb_to_hex(int(r_out * brightness),
                        int(g_out * brightness),
                        int(b_out * brightness)))
        time.sleep(0.01)


ups_counter = 0
start_time = time.time()


def ups():
    """Count one update and print the average updates per second so far."""
    global ups_counter
    ups_counter += 1
    elapsed = time.time() - start_time
    print(ups_counter / elapsed)


def color_correction(r, g, b):
    """Return the colour with blue scaled to 80 %.

    When all three channels are below the threshold of 10 the result is
    ``(0, 0, 0)``.
    """
    amp = [1, 1, 0.8]
    threshold = 10
    if r < threshold and g < threshold and b < threshold:
        return 0, 0, 0
    return int(r * amp[0]), int(g * amp[1]), int(b * amp[2])


def ambient_light():
    """Derive the target colour from the screen content, forever.

    Starts the fade thread and the cava volume thread, then repeatedly grabs
    a screenshot, shrinks it and publishes the colour of its top-left pixel
    (after ``vibrant`` and ``color_correction``) through ``_r, _g, _b``.
    """
    global _r, _g, _b
    # Daemon threads: both workers loop without an exit condition, so
    # non-daemon threads would keep the process alive after this loop ends
    # with an exception (e.g. Ctrl-C).
    Thread(target=ambient_light_thread, daemon=True).start()
    Thread(target=visualizer_cava_thread, daemon=True).start()
    while True:
        # screenshot
        # Xorg
        # NOTE(review): bbox presumably selects one monitor of a multi-head
        # setup - confirm against the actual screen layout.
        img = pyscreenshot.grab(backend="mss", childprocess=False,
                                bbox=(1920, 0, 4480, 1440))
        #Wayland
        #time.sleep(0.1)
        #cap = cv2.VideoCapture('/tmp/a')
        #count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        #cap.set(cv2.CAP_PROP_POS_FRAMES, count-1)
        #ret, frame = cap.read()
        #frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
        #img = Image.fromarray(frame)
        #cap.release()

        # find dominant color
        img.thumbnail((2, 2))
        r, g, b = img.getpixel((0, 0))
        r, g, b = vibrant(r, g, b)
        _r, _g, _b = color_correction(r, g, b)
        time.sleep(0.05)


def rgb_to_hex(r, g, b):
    """Return the colour as a six-digit lowercase hex string.

    Channels are clamped to 0..255 first: amplified values above 255 (or
    negative ones) would otherwise yield a string that is not six hex digits.
    """
    return "%02x%02x%02x" % tuple(max(0, min(255, int(c))) for c in (r, g, b))


def hex_to_rgb(hex):
    """Parse the first six hex digits of ``hex`` into an ``(r, g, b)`` tuple."""
    return tuple(int(hex[pos:pos + 2], 16) for pos in (0, 2, 4))


def test():
    """Debug helper: sweep green from 0 to 255, sending and printing each step."""
    for i in range(256):
        h = rgb_to_hex(0, i, 0)
        send(h)
        print(h)
        time.sleep(0.0)


def tui_main(scr, *args):
    """Interactive colour picker, meant to be run through ``curses.wrapper``.

    Keys: ``h``/``l`` select the channel, ``j``/``k`` decrease/increase it
    (the new colour is sent immediately), ``q`` quits.
    """
    # -- Perform an action with Screen --
    scr.border(0)
    scr.addstr(5, 5, 'Hello from Curses!', curses.A_BOLD)
    scr.addstr(6, 5, 'Press q to close this screen', curses.A_NORMAL)
    scr.addstr(8, 5, '\u250C')

    rgb = [0, 0, 0]
    color_selector = 0
    while True:
        status = '{},{},{} {}'.format(rgb[0], rgb[1], rgb[2], color_selector)
        # Pad to the longest possible status ("255,255,255 2") so digits of a
        # previously longer value do not stay on screen.
        scr.addstr(1, 1, status.ljust(13))

        ch = scr.getch()
        if ch == ord('q'):
            break
        elif ch == ord('j'):
            if rgb[color_selector] > 0:
                rgb[color_selector] -= 1
                send(rgb_to_hex(rgb[0], rgb[1], rgb[2]))
        elif ch == ord('k'):
            if rgb[color_selector] < 255:
                rgb[color_selector] += 1
                send(rgb_to_hex(rgb[0], rgb[1], rgb[2]))
        elif ch == ord('l'):
            # Highest valid index is len(rgb) - 1; allowing 3 here made the
            # next j/k press fail with IndexError.
            if color_selector < len(rgb) - 1:
                color_selector += 1
        elif ch == ord('h'):
            if color_selector > 0:
                color_selector -= 1


def main(argv):
    """Entry point.

    When stdin is not a terminal, every input line is read as an integer and
    sent as the red channel. Otherwise ``argv`` is parsed with getopt and the
    selected modes are run in order.
    """
    if not sys.stdin.isatty():
        connect()
        for volume in sys.stdin:
            send(rgb_to_hex(int(volume), 0, 0))
        disconnect()
        sys.exit()

    try:
        opts, args = getopt.getopt(argv, "s:v:ahti")
    except getopt.GetoptError:
        print(sys.argv[0], "invalid option")
        print("Try", sys.argv[0], "-h for help")
        sys.exit(1)

    for opt, arg in opts:
        if opt == "-h":
            helpmenu()
        elif opt == "-s":
            connect()
            send(arg)
            disconnect()
        elif opt == "-a":
            connect()
            ambient_light()
            disconnect()
        elif opt == "-v":
            connect()
            visualizer(arg)
            disconnect()
        elif opt == "-t":
            connect()
            test()
            disconnect()
        elif opt == '-i':
            connect()
            curses.wrapper(tui_main)
            disconnect()
    sys.exit()


if __name__ == "__main__":
    main(sys.argv[1:])