led_control/client/client.py
2021-04-26 20:13:46 +02:00

270 lines
7.0 KiB
Python
Executable File

#!/usr/bin/python3
import socket
import getopt
import sys
import subprocess
#import pyscreenshot
#from PIL import Image, ImageDraw, ImageFont
#import cv2
import numpy as np
#from skimage import io
from threading import Thread
import time
import io
import time
#from gi.repository import GLib
#from pydbus import SessionBus
import curses
s1 = socket.socket()
s2 = socket.socket()
server = ["192.168.188.64", "192.168.188.76"]
def connect(port = 5000):
s1.connect((server[0], port))
s2.connect((server[1], port))
def disconnect():
s1.close() # close the connection
s2.close()
def send(data):
s1.send(data.encode()) # send message
s2.send(data.encode()) # send message
def helpmenu():
print("light controll\n")
print("Options:")
print("-h show help")
print("-s <hex-color> set static color")
print("-v <hex-color> visualizer")
print("-i interactive interface")
print("-a ambient light")
print("-t test function (debug)")
def base_color(r,g,b):
min_value = min([r,g,b])
return r//min_value, g//min_value, b//min_value
def visualizer(color):
r,g,b = hex_to_rgb(color)
r_base,g_base,b_base = base_color(r,g,b)
cava = subprocess.Popen(["cava", "-p", "/etc/lc/cava.conf"], stdout=subprocess.PIPE)
sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE)
#subprocess.Popen([sys.argv[0]], stdin=sed.stdout, shell=False)
for line in sed.stdout:
line = int(line)
r_out = int(r*(line/1000))
g_out = int(g*(line/1000))
b_out = int(b*(line/1000))
# print(line, ',', r_out,g_out,b_out, ' | ', r,g,b)
print(r_out,g_out,b_out)
if r_out < r_base or g_out < g_base or b_out < b_base:
r_out = r_base
g_out = g_base
b_out = b_base
hex_color = rgb_to_hex(r_out,g_out,b_out)
send(hex_color)
cava.stdout.close()
sed.stdout.close()
def get_base_color(color):
return [i / min(color) for i in color]
_r,_g,_b = 0,0,0
def vibrant(r,g,b):
intensity = 1 # usabel range 1-100 max:1000
intensity = 1+intensity/1000
rgb = [r,g,b]
#min_idx = rgb.index(min(rgb))
d = (r+g+b)/3
for c in range(3):
if rgb[c] < d:
rgb[c] = int(rgb[c]*(intensity**(rgb[c]-d)))
elif rgb[c] > d:
rgb[c] = int(rgb[c]*(-intensity**(-rgb[c]+d)+2))
if rgb[c] > 255:
rgb[c] = 255
#rgb[min_idx] = int(rgb[min_idx]*(rgb[min_idx]/d)**2)
return rgb
def ambient_light_thread():
r,g,b = 0,0,0
brighness = 1
while True:
if r > _r:
r-=1
elif r < _r:
r+=1
if g > _g:
g-=1
elif g < _g:
g+=1
if b > _b:
b-=1
elif b < _b:
b+=1
print(r,g,b)
send(rgb_to_hex(int(r*brighness),int(g*brighness),int(b*brighness)))
time.sleep(0.01)
def ambient_light():
t = Thread(target=ambient_light_thread)
t.start()
global _r,_g,_b
#bus = SessionBus()
#RecorderPipeline = "vp8enc min_quantizer=10 max_quantizer=50 cq_level=13 cpu-used=1 deadline=1000000 threads=%T ! queue ! webmmux"
#GNOMEScreencast = bus.get('org.gnome.Shell.Screencast', '/org/gnome/Shell/Screencast')
#GNOMEScreencast.Screencast('/tmp/a', {})
#GNOMEScreencast.StopScreencast()
#time.sleep(1)
amp = [1,1,0.8]
counter = 0
start_time = time.time()
while True:
counter += 1
time_d = time.time()-start_time
ups = counter/time_d
#print(ups)
# screenshot
# Xorg
img = pyscreenshot.grab(backend="mss", childprocess=False, bbox=(1920,0,4480,1440))
#Wayland
#time.sleep(0.1)
#cap = cv2.VideoCapture('/tmp/a')
#count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
#cap.set(cv2.CAP_PROP_POS_FRAMES, count-1)
#ret, frame = cap.read()
#frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
#img = Image.fromarray(frame)
#cap.release()
'''
img = np.array(img)
pixels = np.float32(img.reshape(-1, 3))
img = cv2.UMat(img)
n_colors = 1
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 200, .1)
flags = cv2.KMEANS_RANDOM_CENTERS
_, labels, palette = cv2.kmeans(img, n_colors, None, criteria, 10, flags)
_, counts = np.unique(labels, return_counts=True)
dominant = palette[np.argmax(counts)]
_r,_g,_b = int(dominant[0]), int(dominant[1]), int(dominant[2])
'''
# find dominant color
img.thumbnail((1,1))
r,g,b = img.getpixel((0, 0))
_r,_g,_b = vibrant(r,g,b)
time.sleep(0.05)
def rgb_to_hex(r,g,b):
return "%02x%02x%02x" % (r,g,b)
def hex_to_rgb(hex):
r = int(hex[0:2],16)
g = int(hex[2:4],16)
b = int(hex[4:6],16)
return r,g,b
def test():
for i in range(256):
h = rgb_to_hex(0,i,0)
send(h)
print(h)
time.sleep(0.0)
def tui_main(scr, *args):
# -- Perform an action with Screen --
scr.border(0)
scr.addstr(5, 5, 'Hello from Curses!', curses.A_BOLD)
scr.addstr(6, 5, 'Press q to close this screen', curses.A_NORMAL)
scr.addstr(8, 5, '\u250C')
rgb = [0,0,0]
color_selector = 0
while True:
status = '{},{},{} {}'.format(rgb[0], rgb[1], rgb[2], color_selector)
scr.addstr(1, 1, status)
ch = scr.getch()
if ch == ord('q'):
break
elif ch == ord('j'):
if rgb[color_selector] > 0:
rgb[color_selector] -= 1
send(rgb_to_hex(rgb[0], rgb[1], rgb[2]))
elif ch == ord('k'):
if rgb[color_selector] < 255:
rgb[color_selector] += 1
send(rgb_to_hex(rgb[0], rgb[1], rgb[2]))
elif ch == ord('l'):
if color_selector < 3:
color_selector += 1
elif ch == ord('h'):
if color_selector > 0:
color_selector -= 1
def main(argv):
if not sys.stdin.isatty():
connect()
for volume in sys.stdin:
volume = int(volume)
hex_color = rgb_to_hex(volume,0,0)
send(hex_color)
sys.exit()
try:
opts, args = getopt.getopt(argv, "s:v:ahti")
except getopt.GetoptError:
print(sys.argv[0], "invalid option")
print("Try", sys.argv[0], "-h for help")
sys.exit(1)
for opt, arg in opts:
if opt == "-h":
helpmenu()
elif opt == "-s":
connect()
send(arg)
disconnect()
elif opt == "-a":
connect()
ambient_light()
disconnect()
elif opt == "-v":
connect()
visualizer(arg)
disconnect()
elif opt == "-t":
connect()
test()
disconnect()
elif opt == '-i':
connect()
curses.wrapper(tui_main)
disconnect()
sys.exit()
if __name__ == "__main__":
main(sys.argv[1:])