348 lines
9.8 KiB
Python
Executable File
348 lines
9.8 KiB
Python
Executable File
#!/usr/bin/python3
|
|
import socket
|
|
import os
|
|
import subprocess
|
|
from threading import Thread
|
|
import curses
|
|
import asyncio
|
|
import click
|
|
import pickle
|
|
import re
|
|
import webview
|
|
|
|
async def scan_for_pi() -> list[str]:
    """Probe the local /24 network for devices that answer the PING/PONG handshake.

    The local address is looked up with ``socket.gethostbyname`` and its first
    three octets are used as the network prefix. Each host address
    ``prefix.1`` .. ``prefix.254`` is probed on TCP port 5000 from a worker
    thread so the blocking socket calls overlap.

    Returns:
        The addresses of all hosts that replied ``PONG`` to ``PING``.
    """
    local_ip = socket.gethostbyname(socket.gethostname())
    # NOTE(review): if the hostname resolves to a loopback address the scan
    # covers 127.0.x.* instead of the LAN -- confirm on the target machines.
    base_ip = ".".join(local_ip.split(".")[:3])

    def scan(ip: str) -> str | None:
        """Return *ip* if it completes the handshake, otherwise None."""
        try:
            with socket.socket() as s:
                s.settimeout(0.5)
                s.connect((ip, 5000))
                s.sendall(b"PING")
                # Compare raw bytes: decoding a reply from an unrelated
                # service could raise UnicodeDecodeError and abort the scan.
                if s.recv(1024) == b"PONG":
                    print("found:", ip)
                    return ip
        except OSError:
            # Refused, unreachable and timed-out hosts are the normal case.
            pass
        return None

    # .0 is the network address and .255 the broadcast address of a /24,
    # so only .1 .. .254 are probed.
    results = await asyncio.gather(
        *(asyncio.to_thread(scan, f"{base_ip}.{host}") for host in range(1, 255))
    )
    return [ip for ip in results if ip is not None]
|
|
|
|
class PI:
    """Client-side handle for one remote device reached over TCP."""

    def __init__(self, ip: str, port: int = 5000):
        """Remember the address; no connection is opened until ``connect``."""
        self.ip = ip
        self.port = port
        # True between a successful connect() and the next disconnect().
        self.connected = False
        self.socket = socket.socket()

    def __str__(self) -> str:
        """Return the device address."""
        return self.ip

    def connect(self) -> None:
        """Open the TCP connection to ``(ip, port)``.

        Raises:
            OSError: if the connection cannot be established.
        """
        # A closed socket object cannot be reused; create a fresh one so
        # connect() also works after disconnect().
        if self.socket.fileno() == -1:
            self.socket = socket.socket()
        self.socket.connect((self.ip, self.port))
        self.connected = True

    def send(self, data: str) -> None:
        """Encode *data* and transmit all of it."""
        # sendall() keeps writing until every byte is sent; send() may stop early.
        self.socket.sendall(data.encode())

    def disconnect(self) -> None:
        """Close the socket and mark the handle as disconnected."""
        self.socket.close()
        self.connected = False
|
|
|
|
def load_config(path=os.path.expanduser("~/.config/lc/lc.conf")):
    """Load the saved device addresses and wrap each one in a ``PI``.

    Args:
        path: Location of the pickled address list.

    Returns:
        A list with one ``PI`` per stored address.

    Raises:
        SystemExit: with status 1 if *path* does not exist.
    """
    try:
        with open(path, "rb") as f:
            # SECURITY: pickle.load can execute code embedded in the file.
            # Only acceptable while the file is written solely by save_config.
            addresses = pickle.load(f)
    except FileNotFoundError:
        print("Config does not exist")
        # Non-zero status so shells and scripts can detect the failure.
        raise SystemExit(1) from None
    return [PI(ip) for ip in addresses]
|
|
|
|
def save_config(obj, path=os.path.expanduser("~/.config/lc/lc.conf")):
    """Pickle the ``ip`` attribute of every item in *obj* to *path*.

    Args:
        obj: Iterable of objects exposing an ``ip`` attribute.
        path: Destination file; missing parent directories are created.
    """
    directory = os.path.dirname(path)
    # A bare filename has no directory part; makedirs("") would raise.
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump([o.ip for o in obj], f)
|
|
|
|
def set_pixels(color):
    """Send *color* to every device listed in the saved configuration.

    Args:
        color: Six lowercase hexadecimal digits, e.g. ``"ff8800"``.

    Prints a message and returns without contacting any device when *color*
    is not in that format.
    """
    # fullmatch: with match() and "$", a trailing newline would slip through.
    if not re.fullmatch(r"[0-9a-f]{6}", color):
        print(f"{color} not a valid hex color code")
        return
    for pi in load_config():
        try:
            pi.connect()
            pi.send(color)
        finally:
            # Always release the socket, even if connect/send raised.
            pi.disconnect()
|
|
|
|
@click.command()
|
|
@click.argument("arg", nargs=-1)
|
|
# @click.option("-s", help="Set HEX Color: -s ffffff")
|
|
# @click.option("-v", help="Set HEX Color as base visualizer color: -v ffffff")
|
|
def main(arg):
|
|
if arg == ():
|
|
path = os.path.expanduser("~/.config/lc/gui.py")
|
|
proc = subprocess.Popen(["python", "-m", "streamlit", "run", path, "--server.headless", "True"], stdout=subprocess.PIPE)
|
|
for line in proc.stdout:
|
|
if line == b' You can now view your Streamlit app in your browser.\n':
|
|
break
|
|
webview.create_window('LED Control', 'http://localhost:8501')
|
|
webview.start()
|
|
exit()
|
|
|
|
match arg[0]:
|
|
case "help":
|
|
print("lc [help|set|search|list]")
|
|
|
|
case "set":
|
|
if len(arg) < 2:
|
|
print("color argument missing")
|
|
set_pixels(arg[1])
|
|
|
|
case "search":
|
|
ip_list = asyncio.run(scan_for_pi())
|
|
pi_list = [PI(ip) for ip in ip_list]
|
|
save_config(pi_list)
|
|
|
|
case "list":
|
|
pi_list = load_config()
|
|
if pi_list is not None:
|
|
for pi in pi_list:
|
|
print(pi)
|
|
case "music":
|
|
pass
|
|
|
|
# Script entry point: hand control to the click command when run directly.
if __name__ == "__main__":
    main()
|
|
|
|
# def helpmenu():
|
|
# print("light controll\n")
|
|
# print("Options:")
|
|
# print("-h show help")
|
|
# print("-s <hex-color> set static color")
|
|
# print("-v <hex-color> visualizer")
|
|
# print("-i interactive interface")
|
|
# print("-a ambient light")
|
|
# print("-t test function (debug)")
|
|
|
|
# def base_color(color):
|
|
# return [i // min(color) for i in color]
|
|
|
|
# def visualizer(color, amp_strength=0.6):
|
|
# r,g,b = hex_to_rgb(color)
|
|
|
|
# cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"], stdout=subprocess.PIPE)
|
|
# sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE)
|
|
|
|
# for line in sed.stdout:
|
|
# amp_factor = amp_strength * ((int(line) / 500) - 1) + 1
|
|
# send(rgb_to_hex(int(r * amp_factor), int(g * amp_factor), int(b * amp_factor)))
|
|
# cava.stdout.close()
|
|
# sed.stdout.close()
|
|
|
|
# def visualizer_cava_thread():
|
|
# global volume_amp
|
|
|
|
# cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"], stdout=subprocess.PIPE)
|
|
# sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE)
|
|
|
|
# for line in sed.stdout:
|
|
# volume_amp = int(line)
|
|
# print(volume_amp)
|
|
# cava.stdout.close()
|
|
# sed.stdout.close()
|
|
|
|
# def amp_by_vol(color, amp_strength):
|
|
# global volume_amp
|
|
# # amp_strength in percentage
|
|
# amp_factor = amp_strength*((volume_amp/500)-1)+1
|
|
# #print(amp_factor, color,[c*amp_factor for c in color], volume_amp)
|
|
# return [c*amp_factor for c in color]
|
|
|
|
# def vibrant(r,g,b):
|
|
# intensity = 50 # usable range 1-100 max:1000
|
|
|
|
# intensity = 1+intensity/1000
|
|
# rgb = [r,g,b]
|
|
# #min_idx = rgb.index(min(rgb))
|
|
# d = (r+g+b)/3
|
|
# for c in range(3):
|
|
# if rgb[c] < d:
|
|
# rgb[c] = int(rgb[c]*(intensity**(rgb[c]-d)))
|
|
# elif rgb[c] > d:
|
|
# rgb[c] = int(rgb[c]*(-intensity**(-rgb[c]+d)+2))
|
|
# if rgb[c] > 255:
|
|
# rgb[c] = 255
|
|
# #rgb[min_idx] = int(rgb[min_idx]*(rgb[min_idx]/d)**2)
|
|
# return rgb
|
|
|
|
# def ambient_light_thread():
|
|
# r,g,b = 0,0,0
|
|
# brighness = 1
|
|
# active_color = ''
|
|
|
|
# while True:
|
|
# # P controller (proportional control)
|
|
# r,g,b = [w+((y-w)*0.1) for y,w in zip((_r,_g,_b),(r,g,b))]
|
|
# if ((round(r),round(g),round(b)) == (_r,_g,_b)):
|
|
# active_color = '\033[0;32;40m'
|
|
# else:
|
|
# active_color = '\033[0;31;40m'
|
|
# r_out,g_out,b_out = amp_by_vol((r,g,b), 0.6)
|
|
# #r_out,g_out,b_out = r,g,b
|
|
# print(active_color, round(r),round(g),round(b), round(r_out),round(g_out),round(b_out), '\033[0;37;40m')
|
|
# send(rgb_to_hex(int(r_out*brighness),int(g_out*brighness),int(b_out*brighness)))
|
|
# time.sleep(0.01)
|
|
|
|
# ups_counter = 0
|
|
# start_time = time.time()
|
|
|
|
# def ups():
|
|
# global ups_counter
|
|
# global start_time
|
|
|
|
# ups_counter += 1
|
|
# time_d = time.time()-start_time
|
|
# ups = ups_counter/time_d
|
|
# print(ups)
|
|
|
|
# def color_correction(r,g,b):
|
|
# amp = [1,1,0.8]
|
|
# threshold = 10
|
|
# if r < threshold and g < threshold and b < threshold:
|
|
# return 0,0,0
|
|
# return int(r*amp[0]), int(g*amp[1]), int(b*amp[2])
|
|
|
|
# def ambient_light():
|
|
|
|
# t1 = Thread(target=ambient_light_thread)
|
|
# t1.start()
|
|
|
|
# t2 = Thread(target=visualizer_cava_thread)
|
|
# t2.start()
|
|
|
|
# global _r,_g,_b
|
|
|
|
# counter = 0
|
|
# start_time = time.time()
|
|
# while True:
|
|
# # screenshot
|
|
|
|
# # Xorg
|
|
# img = pyscreenshot.grab(backend="mss", childprocess=False, bbox=(1920,0,4480,1440))
|
|
|
|
# #Wayland
|
|
# #time.sleep(0.1)
|
|
# #cap = cv2.VideoCapture('/tmp/a')
|
|
# #count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
|
|
# #cap.set(cv2.CAP_PROP_POS_FRAMES, count-1)
|
|
|
|
# #ret, frame = cap.read()
|
|
|
|
# #frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
|
|
# #img = Image.fromarray(frame)
|
|
# #cap.release()
|
|
|
|
# # find dominant color
|
|
# img.thumbnail((2,2))
|
|
# r,g,b = img.getpixel((0, 0))
|
|
# r,g,b = vibrant(r,g,b)
|
|
# _r,_g,_b = color_correction(r,g,b)
|
|
|
|
# time.sleep(0.05)
|
|
|
|
|
|
# def rgb_to_hex(r,g,b):
|
|
# return "%02x%02x%02x" % (r,g,b)
|
|
|
|
# def hex_to_rgb(hex):
|
|
# r = int(hex[0:2],16)
|
|
# g = int(hex[2:4],16)
|
|
# b = int(hex[4:6],16)
|
|
# return r,g,b
|
|
|
|
# def test():
|
|
# for i in range(256):
|
|
# h = rgb_to_hex(0,i,0)
|
|
# send(h)
|
|
# print(h)
|
|
# time.sleep(0.0)
|
|
|
|
# def tui_main(scr, *args):
|
|
# # -- Perform an action with Screen --
|
|
# scr.border(0)
|
|
# scr.addstr(5, 5, 'Hello from Curses!', curses.A_BOLD)
|
|
# scr.addstr(6, 5, 'Press q to close this screen', curses.A_NORMAL)
|
|
# scr.addstr(8, 5, '\u250C')
|
|
|
|
# rgb = [0,0,0]
|
|
# color_selector = 0
|
|
|
|
# while True:
|
|
# status = '{},{},{} {}'.format(rgb[0], rgb[1], rgb[2], color_selector)
|
|
# scr.addstr(1, 1, status)
|
|
|
|
# ch = scr.getch()
|
|
# if ch == ord('q'):
|
|
# break
|
|
# elif ch == ord('j'):
|
|
# if rgb[color_selector] > 0:
|
|
# rgb[color_selector] -= 1
|
|
# send(rgb_to_hex(rgb[0], rgb[1], rgb[2]))
|
|
# elif ch == ord('k'):
|
|
# if rgb[color_selector] < 255:
|
|
# rgb[color_selector] += 1
|
|
# send(rgb_to_hex(rgb[0], rgb[1], rgb[2]))
|
|
# elif ch == ord('l'):
|
|
# if color_selector < 3:
|
|
# color_selector += 1
|
|
# elif ch == ord('h'):
|
|
# if color_selector > 0:
|
|
# color_selector -= 1
|
|
|
|
# def main(argv):
|
|
# if not sys.stdin.isatty():
|
|
# connect()
|
|
# for volume in sys.stdin:
|
|
# volume = int(volume)
|
|
# hex_color = rgb_to_hex(volume,0,0)
|
|
# send(hex_color)
|
|
# sys.exit()
|
|
|
|
# try:
|
|
# opts, args = getopt.getopt(argv, "s:v:ahti")
|
|
# except getopt.GetoptError:
|
|
# print(sys.argv[0], "invalid option")
|
|
# print("Try", sys.argv[0], "-h for help")
|
|
# sys.exit(1)
|
|
|
|
# for opt, arg in opts:
|
|
# if opt == "-h":
|
|
# helpmenu()
|
|
# elif opt == "-s":
|
|
# connect()
|
|
# send(arg)
|
|
# disconnect()
|
|
# elif opt == "-a":
|
|
# connect()
|
|
# ambient_light()
|
|
# disconnect()
|
|
# elif opt == "-v":
|
|
# connect()
|
|
# visualizer(arg)
|
|
# disconnect()
|
|
# elif opt == "-t":
|
|
# connect()
|
|
# test()
|
|
# disconnect()
|
|
# elif opt == '-i':
|
|
# connect()
|
|
# curses.wrapper(tui_main)
|
|
# disconnect()
|
|
|
|
# sys.exit()
|
|
|
|
# if __name__ == "__main__":
|
|
|
|
# main(sys.argv[1:])
|