huecli/huecli/__main__.py
2024-11-28 21:52:57 +01:00

256 lines
6.6 KiB
Python

import importlib.metadata
import os
from enum import Enum
from typing import NamedTuple
from typing_extensions import Annotated
import typer
import json
import paho.mqtt.publish as pub
import paho.mqtt.subscribe as sub
class LightState(str, Enum):
on = "off"
off = "on"
class XYColor(NamedTuple):
x: float
y: float
NAME_TO_ID = {
"bedroom": "0x001788010d1b599a",
"hallway": "0x001788010d253b99",
"office": "0x001788010e371aa4",
"all": "all_lights",
"infuse": "infuse_group",
"living_room": "living_room",
}
COLOR_MAP = {
"red": (0.6942, 0.2963),
"blue": (0.1355, 0.0399),
"green": (0.1704, 0.709),
"yellow": (0.4339, 0.5008),
"coolest": (0.3131, 0.3232),
"cool": (0.3804, 0.3767),
"neutral": (0.4599, 0.4106),
"warm": (0.5056, 0.4152),
"warmest": (0.5267, 0.4133),
}
MQTT_BROKER = "ha1"
class LightID(str, Enum):
bedroom = "bedroom"
hallway = "hallway"
office = "office"
all = "all"
infuse = "infuse"
livingroom = "living_room"
def get_mqtt_broker() -> str:
if "HUECLI_BROKER" in os.environ:
return os.environ["HUECLI_BROKER"]
else:
return MQTT_BROKER
def complete_color(incomplete: str) -> list[str]:
completion = []
for color in COLOR_MAP:
if color.startswith(incomplete):
completion.append(color)
return completion
def complete_id(incomplete: str) -> list[str]:
completion = []
for id in NAME_TO_ID:
if id.startswith(incomplete):
completion.append(id)
return completion
def complete_state(incomplete: str) -> list[str]:
states = ["on", "off"]
completion = []
for id in states:
if id.startswith(incomplete):
completion.append(id)
return completion
def rgb_to_xy(red: float, green: float, blue: float) -> XYColor:
"""conversion of RGB colors to CIE1931 XY colors
Formulas implemented from: https://gist.github.com/popcorn245/30afa0f98eea1c2fd34d
Args:
red (float): a number between 0.0 and 1.0 representing red in the RGB space
green (float): a number between 0.0 and 1.0 representing green in the RGB space
blue (float): a number between 0.0 and 1.0 representing blue in the RGB space
Returns:
xy (list): x and y
"""
# gamma correction
red = pow((red + 0.055) / (1.0 + 0.055), 2.4) if red > 0.04045 else (red / 12.92)
green = (
pow((green + 0.055) / (1.0 + 0.055), 2.4)
if green > 0.04045
else (green / 12.92)
)
blue = (
pow((blue + 0.055) / (1.0 + 0.055), 2.4) if blue > 0.04045 else (blue / 12.92)
)
# convert rgb to xyz
x = red * 0.649926 + green * 0.103455 + blue * 0.197109
y = red * 0.234327 + green * 0.743075 + blue * 0.022598
z = green * 0.053077 + blue * 1.035763
# convert xyz to xy
x = x / (x + y + z)
y = y / (x + y + z)
# TODO check color gamut if known
return XYColor(x, y)
app = typer.Typer(rich_markup_mode=None)
@app.command()
@app.command("c", hidden=True)
def set_color(
id: Annotated[
LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
],
color: Annotated[
str, typer.Argument(help="Color to set", autocompletion=complete_color)
],
) -> None:
"""
Set the color of ID to COLOR where COLOR is either a known color value,
or a comma separated RGB value, like "0.1,0.2,0.3".
"""
str_id = NAME_TO_ID[id]
topic = f"zigbee2mqtt/{str_id}/set"
xy = parse_color(color)
payload_raw = {
"color": {"x": xy[0], "y": xy[1]},
"color_mode": "xy",
}
payload = json.dumps(payload_raw)
pub.single(topic, payload, hostname=get_mqtt_broker())
@app.command()
@app.command("s", hidden=True)
def set_state(
id: Annotated[
LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
],
state: Annotated[
LightState,
typer.Argument(help="State of light.", autocompletion=complete_state),
],
) -> None:
"""
Set the state of ID to STATE
"""
str_id = NAME_TO_ID[id]
topic = f"zigbee2mqtt/{str_id}/set"
payload = json.dumps({"state": state.value})
pub.single(topic, payload, hostname=get_mqtt_broker())
@app.command()
@app.command("b", hidden=True)
def set_brightness(
id: Annotated[
LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
],
brightness: int,
) -> None:
"""
Set brigthness of ID to BRIGHTNESS
"""
if brightness < 0 or brightness > 255:
raise ValueError("Brightness must be between 0 and 255")
str_id = NAME_TO_ID[id]
topic = f"zigbee2mqtt/{str_id}/set"
payload = json.dumps({"brightness": brightness})
pub.single(topic, payload, hostname=get_mqtt_broker())
@app.command()
@app.command("get", hidden=True)
def get_state(
id: Annotated[
LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
],
as_json: Annotated[
bool,
typer.Option("--json", help="Print state as json."),
] = False,
) -> None:
"""
Get the state of ID
"""
str_id = NAME_TO_ID[id]
topic = f"zigbee2mqtt/{str_id}/get"
resp_topic = f"zigbee2mqtt/{str_id}"
payload = json.dumps({"state": ""})
pub.single(topic, payload, hostname=get_mqtt_broker())
resp = sub.simple(resp_topic, hostname=get_mqtt_broker()) # type: ignore[no-untyped-call]
data = json.loads(resp.payload.decode("utf-8"))
if as_json:
print(json.dumps(data))
else:
print(f"{data['state']}")
@app.command()
def version() -> None:
"""
Print huecli version
"""
version = importlib.metadata.version("huecli")
typer.echo(f"huecli {version}")
def parse_color(color: str) -> XYColor:
if color in COLOR_MAP:
return XYColor(COLOR_MAP[color][0], COLOR_MAP[color][1])
split = color.split(",")
match len(split):
case 2:
return XYColor(float(split[0]), float(split[1]))
case 3:
return _parse_rgb_color(float(split[0]), float(split[1]), float(split[2]))
case _:
raise ValueError("Invalid color format")
def _parse_rgb_color(r: float, g: float, b: float) -> XYColor:
if any(val > 1 for val in [r, g, b]):
if any(val > 255 for val in [r, g, b]):
raise ValueError("RGB values must be between 0 and 255 or 0.0 and 1.0")
vals = [val / 255 for val in [r, g, b]]
return rgb_to_xy(vals[0], vals[1], vals[2])
if all(val > 0 and val < 1 for val in [r, g, b]):
return rgb_to_xy(r, g, b)
raise ValueError("RGB values must be between 0 and 255 or 0.0 and 1.0")
def main() -> None:
app()
if __name__ == "__main__":
main()