256 lines
6.6 KiB
Python
256 lines
6.6 KiB
Python
import importlib.metadata
|
|
import os
|
|
from enum import Enum
|
|
from typing import NamedTuple
|
|
from typing_extensions import Annotated
|
|
import typer
|
|
import json
|
|
import paho.mqtt.publish as pub
|
|
import paho.mqtt.subscribe as sub
|
|
|
|
|
|
class LightState(str, Enum):
    """Power state of a light.

    Mixing in ``str`` makes every member compare equal to its plain string
    value, and the member's ``value`` is what gets serialized into the
    ``"state"`` field of the MQTT payload.
    """

    # Each member name must match its value: the names were previously
    # swapped, so ``LightState.on`` carried the value "off" and vice versa.
    on = "on"
    off = "off"
|
|
|
|
|
|
class XYColor(NamedTuple):
    """Pair of ``x`` / ``y`` color coordinates.

    Being a named tuple, an instance can be read by attribute (``.x``,
    ``.y``) or by position (``[0]``, ``[1]``).
    """

    # First coordinate of the pair.
    x: float
    # Second coordinate of the pair.
    y: float
|
|
|
|
|
|
# Maps the light name given on the command line to the identifier that is
# inserted into the ``zigbee2mqtt/<identifier>/...`` topic.
NAME_TO_ID: dict[str, str] = {
    # Hex identifiers -- presumably individual device addresses; verify
    # against the zigbee2mqtt configuration.
    "bedroom": "0x001788010d1b599a",
    "hallway": "0x001788010d253b99",
    "office": "0x001788010e371aa4",
    # Symbolic identifiers -- presumably zigbee2mqtt groups or friendly
    # names; verify against the zigbee2mqtt configuration.
    "all": "all_lights",
    "infuse": "infuse_group",
    "living_room": "living_room",
}
|
|
# Named colors accepted on the command line, each stored as an (x, y) pair.
COLOR_MAP: dict[str, tuple[float, float]] = {
    # Saturated colors.
    "red": (0.6942, 0.2963),
    "blue": (0.1355, 0.0399),
    "green": (0.1704, 0.709),
    "yellow": (0.4339, 0.5008),
    # White tones, listed from "coolest" to "warmest".
    "coolest": (0.3131, 0.3232),
    "cool": (0.3804, 0.3767),
    "neutral": (0.4599, 0.4106),
    "warm": (0.5056, 0.4152),
    "warmest": (0.5267, 0.4133),
}
|
|
# Default MQTT broker hostname, used when the HUECLI_BROKER environment
# variable is not set.
MQTT_BROKER: str = "ha1"
|
|
|
|
|
|
class LightID(str, Enum):
    """Light names that can be selected on the command line.

    Every member value is a key of ``NAME_TO_ID``; the commands use it to
    look up the identifier that goes into the MQTT topic. Mixing in ``str``
    lets a member be used directly as that dictionary key.
    """

    # Entries whose member name equals the command-line value.
    bedroom = "bedroom"
    hallway = "hallway"
    office = "office"
    all = "all"
    infuse = "infuse"

    # The member name has no underscore, but the command-line value does.
    livingroom = "living_room"
|
|
|
|
|
|
def get_mqtt_broker() -> str:
    """Return the hostname of the MQTT broker to connect to.

    The ``HUECLI_BROKER`` environment variable takes precedence; when it is
    not set, the module-level ``MQTT_BROKER`` default is returned.
    """
    try:
        return os.environ["HUECLI_BROKER"]
    except KeyError:
        # No override in the environment: fall back to the built-in default.
        return MQTT_BROKER
|
|
|
|
|
|
def complete_color(incomplete: str) -> list[str]:
    """Return the named colors that start with ``incomplete``.

    Used as the shell-completion callback for the COLOR argument.

    Args:
        incomplete: The text typed so far; an empty string matches every
            color name.

    Returns:
        Matching keys of ``COLOR_MAP``, in the map's definition order.
    """
    return [name for name in COLOR_MAP if name.startswith(incomplete)]
|
|
|
|
|
|
def complete_id(incomplete: str) -> list[str]:
    """Return the light names that start with ``incomplete``.

    Used as the shell-completion callback for the ID argument.

    Args:
        incomplete: The text typed so far; an empty string matches every
            light name.

    Returns:
        Matching keys of ``NAME_TO_ID``, in the map's definition order.
    """
    return [name for name in NAME_TO_ID if name.startswith(incomplete)]
|
|
|
|
|
|
def complete_state(incomplete: str) -> list[str]:
    """Return the light states that start with ``incomplete``.

    Used as the shell-completion callback for the STATE argument.

    Args:
        incomplete: The text typed so far; an empty string matches both
            states.

    Returns:
        The matching entries of ``["on", "off"]``, in that order.
    """
    candidates = ("on", "off")
    return [state for state in candidates if state.startswith(incomplete)]
|
|
|
|
|
|
def _gamma_correct(channel: float) -> float:
    """Apply the gamma-correction step of the RGB to XY conversion to one channel."""
    if channel > 0.04045:
        return pow((channel + 0.055) / (1.0 + 0.055), 2.4)
    return channel / 12.92


def rgb_to_xy(red: float, green: float, blue: float) -> XYColor:
    """Convert an RGB color to CIE1931 xy coordinates.

    Formulas implemented from: https://gist.github.com/popcorn245/30afa0f98eea1c2fd34d

    Args:
        red: a number between 0.0 and 1.0 representing red in the RGB space
        green: a number between 0.0 and 1.0 representing green in the RGB space
        blue: a number between 0.0 and 1.0 representing blue in the RGB space

    Returns:
        The ``XYColor`` holding the x and y coordinates. Black (all channels
        zero) has no defined chromaticity and is returned as ``(0.0, 0.0)``.
    """
    # gamma correction
    red = _gamma_correct(red)
    green = _gamma_correct(green)
    blue = _gamma_correct(blue)

    # convert rgb to xyz
    big_x = red * 0.649926 + green * 0.103455 + blue * 0.197109
    big_y = red * 0.234327 + green * 0.743075 + blue * 0.022598
    big_z = green * 0.053077 + blue * 1.035763

    # convert xyz to xy
    # Both coordinates are divided by the same X + Y + Z sum. The sum is
    # computed once, before either coordinate is normalized, so that y is not
    # accidentally divided by a denominator containing the normalized x.
    total = big_x + big_y + big_z
    if total == 0:
        # Avoid dividing by zero for black input.
        return XYColor(0.0, 0.0)

    # TODO check color gamut if known

    return XYColor(big_x / total, big_y / total)
|
|
|
|
|
|
# Typer application that the command functions in this module register with.
# rich_markup_mode=None -- presumably keeps help text as plain text rather
# than Rich markup; confirm against the Typer documentation.
app = typer.Typer(rich_markup_mode=None)
|
|
|
|
|
|
# Registered twice: once under the name Typer derives from the function name
# and once under the hidden short alias "c".
@app.command()
@app.command("c", hidden=True)
def set_color(
    id: Annotated[
        LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
    ],
    color: Annotated[
        str, typer.Argument(help="Color to set", autocompletion=complete_color)
    ],
) -> None:
    """
    Set the color of ID to COLOR where COLOR is either a known color value,
    or a comma separated RGB value, like "0.1,0.2,0.3".
    """
    # The docstring above doubles as the command's help text, so developer
    # notes live in comments instead.
    target = NAME_TO_ID[id]

    # parse_color raises ValueError for input it cannot interpret; nothing
    # is published in that case.
    chromaticity = parse_color(color)

    message = json.dumps(
        {
            "color": {"x": chromaticity.x, "y": chromaticity.y},
            "color_mode": "xy",
        }
    )
    pub.single(f"zigbee2mqtt/{target}/set", message, hostname=get_mqtt_broker())
|
|
|
|
|
|
# Registered twice: once under the name Typer derives from the function name
# and once under the hidden short alias "s".
@app.command()
@app.command("s", hidden=True)
def set_state(
    id: Annotated[
        LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
    ],
    state: Annotated[
        LightState,
        typer.Argument(help="State of light.", autocompletion=complete_state),
    ],
) -> None:
    """
    Set the state of ID to STATE
    """
    # The docstring above doubles as the command's help text, so developer
    # notes live in comments instead.
    target = NAME_TO_ID[id]

    # The enum member's string value is what goes on the wire.
    message = json.dumps({"state": state.value})
    pub.single(f"zigbee2mqtt/{target}/set", message, hostname=get_mqtt_broker())
|
|
|
|
|
|
# Registered twice: once under the name Typer derives from the function name
# and once under the hidden short alias "b".
@app.command()
@app.command("b", hidden=True)
def set_brightness(
    id: Annotated[
        LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
    ],
    brightness: int,
) -> None:
    """
    Set brightness of ID to BRIGHTNESS
    """
    # The docstring above doubles as the command's help text (its
    # "brigthness" typo was user-visible), so developer notes live in
    # comments instead.

    # Reject out-of-range values before anything is published.
    # NOTE(review): zigbee2mqtt appears to document 254 as the maximum
    # brightness -- confirm whether 255 should be accepted here.
    if brightness < 0 or brightness > 255:
        raise ValueError("Brightness must be between 0 and 255")

    str_id = NAME_TO_ID[id]
    topic = f"zigbee2mqtt/{str_id}/set"
    payload = json.dumps({"brightness": brightness})
    pub.single(topic, payload, hostname=get_mqtt_broker())
|
|
|
|
|
|
# Registered twice: once under the name Typer derives from the function name
# and once under the hidden alias "get".
@app.command()
@app.command("get", hidden=True)
def get_state(
    id: Annotated[
        LightID, typer.Argument(help="ID of light.", autocompletion=complete_id)
    ],
    as_json: Annotated[
        bool,
        typer.Option("--json", help="Print state as json."),
    ] = False,
) -> None:
    """
    Get the state of ID
    """
    # The docstring above doubles as the command's help text, so developer
    # notes live in comments instead.
    target = NAME_TO_ID[id]
    request_topic = f"zigbee2mqtt/{target}/get"
    response_topic = f"zigbee2mqtt/{target}"

    # Ask for the current state, then wait for one message on the light's
    # base topic.
    # NOTE(review): the subscription is only set up after the request has
    # been published, so a fast reply could be missed (or an older retained
    # message returned instead) -- confirm against paho's subscribe.simple
    # behavior.
    request = json.dumps({"state": ""})
    pub.single(request_topic, request, hostname=get_mqtt_broker())
    reply = sub.simple(response_topic, hostname=get_mqtt_broker())  # type: ignore[no-untyped-call]

    decoded = json.loads(reply.payload.decode("utf-8"))
    if not as_json:
        # Default output: just the "state" field of the reply.
        print(f"{decoded['state']}")
        return
    print(json.dumps(decoded))
|
|
|
|
|
|
@app.command()
def version() -> None:
    """
    Print huecli version
    """
    # The docstring above doubles as the command's help text.
    # The version is read from the installed "huecli" distribution metadata.
    installed = importlib.metadata.version("huecli")
    typer.echo(f"huecli {installed}")
|
|
|
|
|
|
def parse_color(color: str) -> XYColor:
    """Turn a color given on the command line into an ``XYColor``.

    Three forms are accepted:

    * a key of ``COLOR_MAP`` (for example ``"red"``),
    * two comma-separated numbers, taken as x and y directly,
    * three comma-separated numbers, taken as red, green and blue.

    Args:
        color: The raw color argument.

    Returns:
        The corresponding ``XYColor``.

    Raises:
        ValueError: If the text has the wrong number of comma-separated
            parts, a part is not a number, or the RGB values are rejected
            by ``_parse_rgb_color``.
    """
    known = COLOR_MAP.get(color)
    if known is not None:
        return XYColor(known[0], known[1])

    parts = color.split(",")
    if len(parts) == 2:
        return XYColor(float(parts[0]), float(parts[1]))
    if len(parts) == 3:
        red, green, blue = (float(part) for part in parts)
        return _parse_rgb_color(red, green, blue)
    raise ValueError("Invalid color format")
|
|
|
|
|
|
def _parse_rgb_color(r: float, g: float, b: float) -> XYColor:
    """Validate an RGB triple and convert it to an ``XYColor``.

    The triple may be expressed on either of two scales:

    * If any channel is greater than 1, all three are treated as 0-255
      values and divided by 255 before conversion.
    * Otherwise all three are treated as 0.0-1.0 values and converted as-is.

    The bounds are inclusive, so pure colors such as ``1, 0, 0`` or
    ``255, 0, 0`` are accepted.

    Args:
        r: Red channel.
        g: Green channel.
        b: Blue channel.

    Returns:
        The result of ``rgb_to_xy`` for the (possibly rescaled) channels.

    Raises:
        ValueError: If any channel is negative, greater than 255, or NaN.
    """
    channels = (r, g, b)

    # Written as "not all(in range)" rather than "any(out of range)" so that
    # NaN, which fails every comparison, is rejected as well.
    if not all(0 <= val <= 255 for val in channels):
        raise ValueError("RGB values must be between 0 and 255 or 0.0 and 1.0")

    if any(val > 1 for val in channels):
        # At least one channel is above 1: interpret the triple as 0-255.
        return rgb_to_xy(r / 255, g / 255, b / 255)

    return rgb_to_xy(r, g, b)
|
|
|
|
|
|
def main() -> None:
    """Entry point: run the Typer application."""
    app()
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|