project reimplemented

This commit is contained in:
Lucca Ketterer 2023-01-14 01:01:51 +01:00
parent 9ba01e69d1
commit 9a58d5f118
21 changed files with 196 additions and 363 deletions

3
.gitignore vendored
View File

@ -1,3 +0,0 @@
*.ipynb*
*/__pycache__/

View File

@ -1,17 +0,0 @@
# led controll
## client
## server
### requirements
SPI has to be enabled:
`sudo raspi-config -> Interface -> SPI`
### usage
-d : (deamon mode) receive tcp packages with rgb color code in hex format on port 5000
-s $(hex) : (direct mode) used to set color directly
-h : help menu

View File

@ -1,3 +0,0 @@
install :
cp client.py /bin/lc; mkdir -p /etc/lc; cp cava.conf /etc/lc/cava.conf; sudo chmod 666 /etc/lc/cava.conf

View File

View File

@ -1,8 +0,0 @@
[server]
ip = "172.0.0.1"
port = "5001"
[server]
ip = "172.0.0.1"
port = "5000"

View File

@ -1 +0,0 @@
numpy

3
controller/Makefile Normal file
View File

@ -0,0 +1,3 @@
install :
cp lc /bin/lc; mkdir -p ~/.config/lc; cp cava.conf ~/.config/lc/cava.conf; sudo chmod 666 /etc/lc/cava.conf

View File

@ -21,6 +21,7 @@ sensitivity = 1000
# The number of bars (0-200). 0 sets it to auto (fill up console). # The number of bars (0-200). 0 sets it to auto (fill up console).
# Bars' width and space between bars in number of characters. # Bars' width and space between bars in number of characters.
;bars = 128
bars = 2 bars = 2
; bar_width = 2 ; bar_width = 2
; bar_spacing = 1 ; bar_spacing = 1

View File

@ -1,48 +1,119 @@
#!/usr/bin/python3 #!/usr/bin/python3
import socket import socket
import getopt import os
import sys
import subprocess import subprocess
import pyscreenshot
#from PIL import Image, ImageDraw, ImageFont
#import cv2
import numpy as np
#from skimage import io
from threading import Thread from threading import Thread
import time
import io
import time
#from gi.repository import GLib
#from pydbus import SessionBus
import curses import curses
import toml import asyncio
import click
import pickle
import re
s1 = socket.socket() async def scan_for_pi() -> dict():
s2 = socket.socket() ip = socket.gethostbyname(socket.gethostname())
server = ["192.168.188.64", "192.168.188.76"] baseIP = '.'.join(ip.split(".")[:3])
def connect(port = 5000): def scan(ip: str) -> str:
s1.connect((server[0], port)) try:
s2.connect((server[1], port)) with socket.socket() as s:
def disconnect(): s.settimeout(0.5)
s1.close() # close the connection s.connect((ip, 5000))
s2.close() s.send("PING".encode())
def send(data): data = s.recv(1024).decode()
s1.send(data.encode()) # send message if data == "PONG":
s2.send(data.encode()) # send message print("found:", ip)
return ip
except OSError:
pass
def helpmenu(): async def scan_async(ip: str) -> str:
print("light controll\n") return await asyncio.to_thread(scan, ip)
print("Options:")
print("-h show help")
print("-s <hex-color> set static color")
print("-v <hex-color> visualizer")
print("-i interactive interface")
print("-a ambient light")
print("-t test function (debug)")
def base_color(color): pi_list = await asyncio.gather(*[scan_async(baseIP + "." + str(i)) for i in range(255)])
return [i // min(color) for i in color] return [pi for pi in pi_list if pi is not None]
class PI:
def __init__(self, ip, port=5000):
self.ip = ip
self.port = port
self.connected = False
self.socket = socket.socket()
def __str__(self):
return self.ip
def connect(self):
self.socket.connect((self.ip, self.port))
def send(self, data):
self.socket.send(data.encode())
def disconnect(self):
self.socket.close()
def load_config(path=os.path.expanduser("~/.config/lc/lc.conf")):
try:
with open(path, 'rb') as f:
return [PI(ip) for ip in pickle.load(f)]
except FileNotFoundError:
print("Config does not exist")
exit()
def save_config(obj, path=os.path.expanduser("~/.config/lc/lc.conf")):
os.makedirs(path[::-1].split('/',1)[-1][::-1], exist_ok=True)
with open(path, 'wb') as f:
pickle.dump(list([o.ip for o in obj]), f)
@click.command()
@click.argument("arg", nargs=-1)
# @click.option("-s", help="Set HEX Color: -s ffffff")
# @click.option("-v", help="Set HEX Color as base visualizer color: -v ffffff")
def main(arg):
arg = ("help",) if arg == () else arg
match arg[0]:
case "help":
print("lc [help|set|search|list]")
case "set":
if len(arg) < 2 or not re.match("[0-9a-f]{6}$", arg[1]):
print("color not a valid hex code")
exit()
pi_list = load_config()
for pi in pi_list:
print(pi)
pi.connect()
pi.send(arg[1])
pi.disconnect()
case "search":
ip_list = asyncio.run(scan_for_pi())
pi_list = [PI(ip) for ip in ip_list]
save_config(pi_list)
case "list":
pi_list = load_config()
if pi_list is not None:
for pi in pi_list:
print(pi)
case "music":
pass
if __name__ == '__main__':
main()
# def helpmenu():
# print("light controll\n")
# print("Options:")
# print("-h show help")
# print("-s <hex-color> set static color")
# print("-v <hex-color> visualizer")
# print("-i interactive interface")
# print("-a ambient light")
# print("-t test function (debug)")
# def base_color(color):
# return [i // min(color) for i in color]
def visualizer(color, amp_strength=0.6): def visualizer(color, amp_strength=0.6):
r,g,b = hex_to_rgb(color) r,g,b = hex_to_rgb(color)
@ -51,8 +122,8 @@ def visualizer(color, amp_strength=0.6):
sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE) sed = subprocess.Popen(["sed", "-u", "s/;.*;$//"], stdin=cava.stdout, stdout=subprocess.PIPE)
for line in sed.stdout: for line in sed.stdout:
amp_factor = amp_strength*((int(line)/500)-1)+1 amp_factor = amp_strength * ((int(line) / 500) - 1) + 1
send(rgb_to_hex(int(r*amp_factor),int(g*amp_factor),int(b*amp_factor))) send(rgb_to_hex(int(r * amp_factor), int(g * amp_factor), int(b * amp_factor)))
cava.stdout.close() cava.stdout.close()
sed.stdout.close() sed.stdout.close()
@ -64,6 +135,7 @@ def visualizer_cava_thread():
for line in sed.stdout: for line in sed.stdout:
volume_amp = int(line) volume_amp = int(line)
print(volume_amp)
cava.stdout.close() cava.stdout.close()
sed.stdout.close() sed.stdout.close()

6
pi/Makefile Normal file
View File

@ -0,0 +1,6 @@
install:
chmod +x ./lc.py
cp lc.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable --now lc.service
systemctl restart lc

5
pi/copy_to_pi.sh Executable file
View File

@ -0,0 +1,5 @@
scp lc.py lc.service Makefile pi@192.168.188.76:~/
ssh pi@192.168.188.76 'sudo make install'
scp lc.py lc.service Makefile pi@192.168.188.64:~/
ssh pi@192.168.188.76 'sudo make install'

52
pi/lc.py Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/python3
import socket
import re
import Adafruit_WS2801
import Adafruit_GPIO.SPI as SPI
import RPi.GPIO as GPIO
PIXEL_COUNT = 32
SPI_PORT = 0
SPI_DEVICE = 0
pixels = Adafruit_WS2801.WS2801Pixels(PIXEL_COUNT, spi=SPI.SpiDev(SPI_PORT, SPI_DEVICE), gpio=GPIO)
def hex_to_rgb(hex):
r = int(hex[0:2],16)
g = int(hex[2:4],16)
b = int(hex[4:6],16)
return r,g,b
reg = re.compile("[0-9a-f]{6}$")
host = "0.0.0.0"
port = 5000
s = socket.socket()
s.bind((host, port))
s.listen(1)
while True:
conn, address = s.accept()
print("Connection from: ", str(address))
while True:
data = conn.recv(1024).decode()
if not data:
print("Disconneted: ", str(address))
break
if reg.match(data):
print("set color", data)
for i in range(PIXEL_COUNT):
pixels.set_pixel_rgb(i, *hex_to_rgb(data))
pixels.show()
elif data == 'PING':
conn.send('PONG'.encode())
else:
print(data, "incorrect format. use hex color code.")
break
conn.close()

19
pi/lc.service Normal file
View File

@ -0,0 +1,19 @@
[Unit]
Description=Start Light Controll as Daemon
After=multi-user.target
[Service]
Type=simple
User=pi
Group=pi
SyslogIdentifier=lc
Restart=on-failure
RestartSec=2
WorkingDirectory=/home/pi
ExecStart=/home/pi/lc.py
SyslogIdentifier=pyserver
StandardOutput=syslog
StandardError=syslog
[Install]
WantedBy=multi-user.target

View File

@ -1,34 +0,0 @@
import sys
import time
import ws2801
def color_cycle(speed):
if int(speed) == 0:
delay = 0
else:
delay = 1/int(speed) * 10
r = 255
g = 0
b = 0
while True:
while r > 0:
r = r-1
g = g+1
ws2801.set_color(r,g,b)
time.sleep(delay)
while g > 0:
g = g-1
b = b+1
ws2801.set_color(r,g,b)
time.sleep(delay)
while b > 0:
b = b-1
r = r+1
ws2801.set_color(r,g,b)
time.sleep(delay)
def visualizer():
for volume in sys.stdin:
volume = int(volume)
ws2801.set_color(volume,0,0)

View File

@ -1,2 +0,0 @@
scp *.py pi@192.168.188.76:~/led_controll/
scp *.py pi@192.168.188.64:~/led_controll/

View File

@ -1,101 +0,0 @@
# Simple demo of of the WS2801/SPI-like addressable RGB LED lights.
import time
import RPi.GPIO as GPIO
# Import the WS2801 module.
import Adafruit_WS2801
import Adafruit_GPIO.SPI as SPI
# Configure the count of pixels:
PIXEL_COUNT = 32
# Alternatively specify a hardware SPI connection on /dev/spidev0.0:
SPI_PORT = 0
SPI_DEVICE = 0
pixels = Adafruit_WS2801.WS2801Pixels(PIXEL_COUNT, spi=SPI.SpiDev(SPI_PORT, SPI_DEVICE), gpio=GPIO)
# Define the wheel function to interpolate between different hues.
def wheel(pos):
if pos < 85:
return Adafruit_WS2801.RGB_to_color(pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return Adafruit_WS2801.RGB_to_color(255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return Adafruit_WS2801.RGB_to_color(0, pos * 3, 255 - pos * 3)
# Define rainbow cycle function to do a cycle of all hues.
def rainbow_cycle_successive(pixels, wait=0.1):
for i in range(pixels.count()):
# tricky math! we use each pixel as a fraction of the full 96-color wheel
# (thats the i / strip.numPixels() part)
# Then add in j which makes the colors go around per pixel
# the % 96 is to make the wheel cycle around
pixels.set_pixel(i, wheel(((i * 256 // pixels.count())) % 256) )
pixels.show()
if wait > 0:
time.sleep(wait)
def rainbow_cycle(pixels, wait=0.005):
for j in range(256): # one cycle of all 256 colors in the wheel
for i in range(pixels.count()):
pixels.set_pixel(i, wheel(((i * 256 // pixels.count()) + j) % 256) )
pixels.show()
if wait > 0:
time.sleep(wait)
def rainbow_colors(pixels, wait=0.05):
for j in range(256): # one cycle of all 256 colors in the wheel
for i in range(pixels.count()):
pixels.set_pixel(i, wheel(((256 // pixels.count() + j)) % 256) )
pixels.show()
if wait > 0:
time.sleep(wait)
def brightness_decrease(pixels, wait=0.01, step=1):
for j in range(int(256 // step)):
for i in range(pixels.count()):
r, g, b = pixels.get_pixel_rgb(i)
r = int(max(0, r - step))
g = int(max(0, g - step))
b = int(max(0, b - step))
pixels.set_pixel(i, Adafruit_WS2801.RGB_to_color( r, g, b ))
pixels.show()
if wait > 0:
time.sleep(wait)
def blink_color(pixels, blink_times=5, wait=0.5, color=(255,0,0)):
for i in range(blink_times):
# blink two times, then wait
pixels.clear()
for j in range(2):
for k in range(pixels.count()):
pixels.set_pixel(k, Adafruit_WS2801.RGB_to_color( color[0], color[1], color[2] ))
pixels.show()
time.sleep(0.08)
pixels.clear()
pixels.show()
time.sleep(0.08)
time.sleep(wait)
def appear_from_back(pixels, color=(255, 0, 0)):
pos = 0
for i in range(pixels.count()):
for j in reversed(range(i, pixels.count())):
pixels.clear()
# first set all pixels at the begin
for k in range(i):
pixels.set_pixel(k, Adafruit_WS2801.RGB_to_color( color[0], color[1], color[2] ))
# set then the pixel at position j
pixels.set_pixel(j, Adafruit_WS2801.RGB_to_color( color[0], color[1], color[2] ))
pixels.show()
time.sleep(0.02)
if __name__ == "__main__":
while True:
rainbow_colors(pixels, wait=0.3)

View File

@ -1,46 +0,0 @@
import time
import RPi.GPIO as GPIO
import Adafruit_WS2801
import Adafruit_GPIO.SPI as SPI
PIXEL_COUNT = 32
SPI_PORT = 0
SPI_DEVICE = 0
pixels = Adafruit_WS2801.WS2801Pixels(PIXEL_COUNT, spi=SPI.SpiDev(SPI_PORT, SPI_DEVICE), gpio=GPIO)
def set_color(r, g, b):
for i in range(pixels.count()):
pixels.set_pixel(i, Adafruit_WS2801.RGB_to_color(r,g,b))
pixels.show()
def clear_pixels():
pixels.clear()
pixels.show()
def main():
DELAY = 0.1
r = 255
g = 0
b = 0
while True:
while r > 0:
r = r-1
g = g+1
set_color(r,g,b)
time.sleep(DELAY)
while g > 0:
g = g-1
b = b+1
set_color(r,g,b)
time.sleep(DELAY)
while b > 0:
b = b-1
r = r+1
set_color(r,g,b)
time.sleep(DELAY)
if __name__ == "__main__":
main()

View File

@ -1,47 +0,0 @@
#!/usr/bin/env python3
import sys
import getopt
import ws2801
import server
import color_mode
def helpmenu():
print("Options:")
print("-h show help")
print("-s <color> color in hex")
print("-c <speed> colorcycle")
def main(argv):
if not sys.stdin.isatty():
color_mode.visualizer()
sys.exit()
try:
opts, args = getopt.getopt(argv,"hds:c:r")
except getopt.GetoptError:
print("ws2801.py: invalid option")
print("Try 'ws2801.py -h' for help")
sys.exit(2)
for opt, arg in opts:
if opt == "-h":
helpmenu()
sys.exit()
elif opt == "-s":
r,g,b = ws2801.hex_to_rgb(arg)
ws2801.set_color(r,g,b)
sys.exit()
elif opt == "-c":
color_mode.color_cycle(arg)
sys.exit()
elif opt == "-d":
server.start()
sys.exit()
helpmenu()
sys.exit()
if __name__ == "__main__":
main(sys.argv[1:])

View File

@ -1,5 +0,0 @@
RPi.GPIO
Adafruit-WS2801
Adafruit-GPIO

View File

@ -1,33 +0,0 @@
import socket
import ws2801
import re
def validate_data(data):
reg = re.compile("[0-9a-f]{6}$")
return reg.match(data)
def start():
host = "0.0.0.0"
port = 5000
server_socket = socket.socket()
server_socket.bind((host, port))
# configure how many client the server can listen simultaneously
server_socket.listen(1)
while True:
conn, address = server_socket.accept() # accept new connection
print("Connection from: " + str(address))
# receive data stream. it won't accept data packet greater than 1024 bytes
while True:
data = conn.recv(1024).decode()
if not data:
print("Disconnected: " + str(address))
break
if validate_data(data):
r,g,b = ws2801.hex_to_rgb(data)
ws2801.set_color(r,g,b)
else:
print("incorrect format. use hex color code.")
conn.close() # close the connection

View File

@ -1,25 +0,0 @@
import RPi.GPIO as GPIO
import Adafruit_WS2801
import Adafruit_GPIO.SPI as SPI
PIXEL_COUNT = 32
SPI_PORT = 0
SPI_DEVICE = 0
pixels = Adafruit_WS2801.WS2801Pixels(PIXEL_COUNT, spi=SPI.SpiDev(SPI_PORT, SPI_DEVICE), gpio=GPIO)
def hex_to_rgb(hex):
r = int(hex[0:2],16)
g = int(hex[2:4],16)
b = int(hex[4:6],16)
print(r,g,b)
return r,g,b
def set_color(r,g,b):
for i in range(pixels.count()):
pixels.set_pixel(i, Adafruit_WS2801.RGB_to_color(r,g,b))
pixels.show()
def clear_pixels():
pixels.clear()
pixels.show()