204 lines
5.4 KiB
Python
204 lines
5.4 KiB
Python
import os
|
|
from enum import Enum
|
|
from typing import NamedTuple
|
|
from typing_extensions import Annotated
|
|
import typer
|
|
import json
|
|
import paho.mqtt.publish as pub
|
|
|
|
|
|
class LightState(str, Enum):
    """Power state that can be sent to a light by ``set_state``.

    Each member's name matches its value, so ``LightState.on`` serialises to
    ``"on"`` and ``LightState.off`` to ``"off"``. The value is what ends up in
    the ``{"state": ...}`` payload.
    """

    on = "on"
    off = "off"
|
|
|
|
# Immutable (x, y) pair holding the two chromaticity coordinates that are
# placed under the "color" key of the published payload.
XYColor = NamedTuple("XYColor", [("x", float), ("y", float)])
|
|
|
|
|
|
# Friendly light name -> identifier interpolated into the
# "zigbee2mqtt/<identifier>/set" topic by the commands below.
# NOTE(review): the hex entries look like device addresses and the last two
# like group names -- confirm against the Zigbee2MQTT configuration.
NAME_TO_ID: dict[str, str] = {
    "bedroom": "0x001788010d1b599a",
    "hallway": "0x001788010d253b99",
    "office": "0x001788010e371aa4",
    "all": "all_lights",
    "infuse": "infuse_group",
}

# Named colors -> (x, y) pair; parse_color() turns a hit into an XYColor.
COLOR_MAP: dict[str, tuple[float, float]] = {
    "red": (0.6942, 0.2963),
    "blue": (0.1355, 0.0399),
    "green": (0.1704, 0.709),
    "yellow": (0.4339, 0.5008),
    "coolest": (0.3131, 0.3232),
    "cool": (0.3804, 0.3767),
    "neutral": (0.4599, 0.4106),
    "warm": (0.5056, 0.4152),
    "warmest": (0.5267, 0.4133),
}

# Broker hostname used when the HUECLI_BROKER environment variable is unset.
MQTT_BROKER: str = "ha1"
|
|
|
|
|
|
class LightID(str, Enum):
    """Light names accepted as the ``id`` argument of the commands.

    Every value is also a key of ``NAME_TO_ID``, which is how a command
    resolves the chosen member to the identifier used in the MQTT topic.
    """

    # Individual lights
    bedroom = "bedroom"
    hallway = "hallway"
    office = "office"

    # Entries that NAME_TO_ID maps to "all_lights" / "infuse_group"
    all = "all"
    infuse = "infuse"
|
|
|
|
def get_mqtt_broker():
    """Return the MQTT broker hostname to publish to.

    The ``HUECLI_BROKER`` environment variable wins whenever it is set (even
    to an empty string); otherwise the module default ``MQTT_BROKER`` is used.
    """
    try:
        return os.environ["HUECLI_BROKER"]
    except KeyError:
        return MQTT_BROKER
|
|
|
|
|
|
def complete_color(incomplete: str):
    """Shell-completion helper for the ``color`` argument.

    Args:
        incomplete: the text typed so far.

    Returns:
        The ``COLOR_MAP`` names starting with ``incomplete``, in the order
        they are defined.
    """
    return [name for name in COLOR_MAP if name.startswith(incomplete)]
|
|
|
|
|
|
def complete_id(incomplete: str):
    """Shell-completion helper for the ``id`` argument.

    Args:
        incomplete: the text typed so far.

    Returns:
        The ``NAME_TO_ID`` keys starting with ``incomplete``, in the order
        they are defined.
    """
    return [name for name in NAME_TO_ID if name.startswith(incomplete)]
|
|
|
|
def complete_state(incomplete: str):
    """Shell-completion helper for the ``state`` argument.

    Args:
        incomplete: the text typed so far.

    Returns:
        Whichever of ``"on"`` and ``"off"`` start with ``incomplete``.
    """
    return [state for state in ("on", "off") if state.startswith(incomplete)]
|
|
|
|
|
|
def rgb_to_xy(red, green, blue) -> XYColor:
    """Convert an RGB color to CIE 1931 xy chromaticity coordinates.

    Formulas implemented from:
    https://gist.github.com/popcorn245/30afa0f98eea1c2fd34d

    Args:
        red (float): a number between 0.0 and 1.0 representing red in the RGB space
        green (float): a number between 0.0 and 1.0 representing green in the RGB space
        blue (float): a number between 0.0 and 1.0 representing blue in the RGB space

    Returns:
        XYColor: the x and y chromaticity coordinates. Black (X + Y + Z == 0)
        has no chromaticity, so the white point that every gray maps to is
        returned for it instead of dividing by zero.
    """

    def _linearize(channel):
        # gamma correction
        if channel > 0.04045:
            return pow((channel + 0.055) / (1.0 + 0.055), 2.4)
        return channel / 12.92

    red, green, blue = _linearize(red), _linearize(green), _linearize(blue)

    # convert rgb to xyz
    big_x = red * 0.649926 + green * 0.103455 + blue * 0.197109
    big_y = red * 0.234327 + green * 0.743075 + blue * 0.022598
    big_z = green * 0.053077 + blue * 1.035763

    # convert xyz to xy
    # Both coordinates must be divided by the same X + Y + Z sum, so the sum
    # is computed once, before either tristimulus value is normalised.
    total = big_x + big_y + big_z
    if total == 0:
        # White point of the matrix above: (sum of the X coefficients) / (sum
        # of all coefficients), likewise for Y, rounded to four decimals.
        return XYColor(0.3127, 0.3290)

    # TODO check color gamut if known

    return XYColor(big_x / total, big_y / total)
|
|
|
|
|
|
# Typer application object: the ``@app.command()`` functions below register
# themselves on it, and ``main()`` invokes it.
app = typer.Typer()
|
|
|
|
|
|
@app.command()
def set_color(
    id: Annotated[
        LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
    ],
    color: Annotated[
        str, typer.Argument(help="Color to set", autocompletion=complete_color)
    ],
):
    """
    Set the color of ID to COLOR. COLOR is either a known color name or a
    comma separated RGB value, like "0.1,0.2,0.3".
    """
    # Resolve the topic first so an unknown id fails before the color is parsed.
    topic = f"zigbee2mqtt/{NAME_TO_ID[id]}/set"
    chroma = parse_color(color)
    message = json.dumps(
        {
            "color": {"x": chroma.x, "y": chroma.y},
            "color_mode": "xy",
        }
    )
    pub.single(topic, message, hostname=get_mqtt_broker())
|
|
|
|
|
|
@app.command()
def set_state(
    id: Annotated[
        LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
    ],
    state: Annotated[
        LightState,
        typer.Argument(help="State of light.", autocompletion=complete_state),
    ],
):
    """
    Set the power state of ID to STATE
    """
    topic = f"zigbee2mqtt/{NAME_TO_ID[id]}/set"
    # The enum's value (not its member name) is what gets published.
    message = json.dumps({"state": state.value})
    pub.single(topic, message, hostname=get_mqtt_broker())
|
|
|
|
|
|
@app.command()
def set_brightness(
    id: Annotated[
        LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
    ],
    brightness: Annotated[
        int, typer.Argument(help="Brightness level (0-255).", min=0, max=255)
    ],
):
    """
    Set brightness of ID to BRIGHTNESS
    """
    # ``min``/``max`` above make the CLI reject out-of-range input with a
    # usage error; this guard keeps the same protection (and the same
    # exception) for code that calls the function directly.
    if not 0 <= brightness <= 255:
        raise ValueError("Brightness must be between 0 and 255")
    str_id = NAME_TO_ID[id]
    topic = f"zigbee2mqtt/{str_id}/set"
    payload = json.dumps({"brightness": brightness})
    pub.single(topic, payload, hostname=get_mqtt_broker())
|
|
|
|
def parse_color(color: str) -> XYColor:
    """Turn a user-supplied color string into an ``XYColor``.

    Accepted forms, tried in this order:

    * a name present in ``COLOR_MAP``;
    * ``"x,y"`` -- two comma separated floats used directly as xy;
    * ``"r,g,b"`` -- three comma separated floats handed to
      ``_parse_rgb_color``.

    Raises:
        ValueError: if the string has a different number of comma separated
            parts, or a part cannot be converted with ``float``.
    """
    if color in COLOR_MAP:
        known_x, known_y = COLOR_MAP[color]
        return XYColor(known_x, known_y)

    parts = color.split(",")
    # The part count is checked before any float() conversion so that a
    # malformed string is reported as a format error first.
    if len(parts) == 2:
        x, y = (float(part) for part in parts)
        return XYColor(x, y)
    if len(parts) == 3:
        r, g, b = (float(part) for part in parts)
        return _parse_rgb_color(r, g, b)
    raise ValueError("Invalid color format")
|
|
|
|
def _parse_rgb_color(r: float, g: float, b: float) -> XYColor:
    """Convert an RGB triple on the 0.0-1.0 or the 0-255 scale to xy.

    If any channel is greater than 1 the triple is treated as 0-255 and
    scaled down by 255; otherwise it is taken as 0.0-1.0. Both ranges are
    inclusive, so pure colors such as ``(1, 0, 0)`` are accepted.

    Args:
        r: red channel.
        g: green channel.
        b: blue channel.

    Returns:
        The result of ``rgb_to_xy`` for the (scaled) channels.

    Raises:
        ValueError: if a channel is negative, greater than 255 or NaN, or if
            all channels are zero (black has no chromaticity to convert).
    """
    channels = (r, g, b)

    # Written as a positive range test so that negative values and NaN are
    # rejected together with values above 255.
    if not all(0 <= val <= 255 for val in channels):
        raise ValueError("RGB values must be between 0 and 255 or 0.0 and 1.0")

    # X + Y + Z is zero for black, so the xy conversion would divide by zero.
    if not any(channels):
        raise ValueError("RGB black (0,0,0) cannot be converted to an xy color")

    if any(val > 1 for val in channels):
        r, g, b = (val / 255 for val in channels)
    return rgb_to_xy(r, g, b)
|
|
|
|
def main():
    """Entry point: hand control to the Typer application."""
    app()
|
|
|
|
|
|
# Start the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|