Clients¶
The following are simple examples of asynchronous KNX clients using this project code.
USB HID¶
- class knx_stack.client.usbhid.Client(ip: str, port: int, state: knx_stack.state.State, send_msgs: Iterable[NamedTuple])¶
A minimal asynchronous USB HID KNX Client.
It connects to a USB HID daemon listening on port 5555.
First run the server: a simple daemon running only on a Linux system. Execute the following command where the USB HID device is attached specifying the right hidraw device:
# knxstack-usbhid-daemon --dev-hidraw /dev/hidrawX
- Parameters
ip – str, host where knxstack-usbhid-daemon is running
port – int, port where knxstack-usbhid-daemon is binded (5555 by default)
state – knx_stack.State, setup of all needed KNX tables
send_msgs – a list of messages to be sent on KNX bus
- Example:
Turn on and off a light and read a property from its KNX device:
if __name__ == '__main__': address_table = knx_stack.layer.AddressTable(knx_stack.Address(0x100A), [], 255) association_table = knx_stack.layer.AssociationTable(address_table, {}) asap_stack_instance = knx_stack.ASAP(1, "this stack instance identifier on bus") asap_command = knx_stack.ASAP(2, "turn on/off light") association_table.associate(asap_stack_instance, [knx_stack.Address(0x1029)]) association_table.associate(asap_command, [knx_stack.GroupAddress(free_style=0x0F81)]) state = knx_stack.State(knx_stack.Medium.usb_hid, association_table, knx_stack.GroupObjectTable({asap_command: knx_stack.datapointtypes.DPT_Switch})) msgs = list() switch_on = knx_stack.datapointtypes.DPT_Switch() switch_on.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.on msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_on)) switch_off = knx_stack.datapointtypes.DPT_Switch() switch_off.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.off msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_off)) msgs.append(knx_stack.layer.application.a_property_value_read.req.Msg(asap=0, object_index=0x01, property_id=0xC9, number_of_elements=1, start_index=0x01)) client = knx_stack.client.usbhid.Client('172.31.11.251', 5555, state, msgs) client.run()
- run()¶
KNXnet IP Tunneling Client¶
- class knx_stack.client.knxnet_ip.Tunneling(local_addr: str, local_port: int, remote_addr: str, remote_port: int, state: knx_stack.state.State, msgs: Iterable[NamedTuple])¶
A minimal asynchronous KNXnet IP Tunneling Client.
- Parameters
local_addr – tunneling running host ip address
local_port – tunneling running port bind
remote_addr – knxnet ip gateway ip address
remote_port – knxnet ip gateway listening port
- Example:
Turn on and off a light:
async def start_tunneling(local_addr: str, local_port: int, remote_addr: str, remote_port: int, state: knx_stack.knxnet_ip.State, msgs: Iterable[NamedTuple]): transport, protocol = await loop.create_datagram_endpoint( lambda: Tunneling(local_addr, local_port, remote_addr, remote_port, state, msgs), local_addr=(local_addr, local_port), remote_addr=(remote_addr, remote_port)) return transport, protocol if __name__ == '__main__': import sys root = logging.getLogger() root.setLevel(logging.INFO) handler = logging.StreamHandler(sys.stdout) root.addHandler(handler) address_table = knx_stack.AddressTable(knx_stack.Address(0x1004), [], 255) association_table = knx_stack.AssociationTable(address_table, {}) asap_command = knx_stack.ASAP(1, "turn on/off floor light") association_table.associate(asap_command, [knx_stack.GroupAddress(free_style=0x0F81)]) state = knx_stack.knxnet_ip.State(knx_stack.Medium.knxnet_ip, association_table, knx_stack.GroupObjectTable({asap_command: knx_stack.datapointtypes.DPT_Switch})) msgs = list() switch_on = knx_stack.datapointtypes.DPT_Switch() switch_on.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.on msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_on)) switch_off = knx_stack.datapointtypes.DPT_Switch() switch_off.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.off msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_off)) loop = asyncio.get_event_loop() transport, protocol = loop.run_until_complete(loop.create_task(start_tunneling('172.31.10.111', 5544, '172.31.10.250', 3671, state, msgs))) loop.run_until_complete(loop.create_task(protocol.writer())) transport.close()
- connection_made(transport)¶
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
- connection_lost(exc)¶
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
- error_received(exc)¶
Called when a send or receive operation raises an OSError.
(Other than BlockingIOError or InterruptedError.)
- send(msg)¶
- datagram_received(data, addr)¶
Called when some datagram is received.
- async writer()¶
- async knx_stack.client.knxnet_ip.start_tunneling(local_addr: str, local_port: int, remote_addr: str, remote_port: int, state: knx_stack.definition.knxnet_ip.state.State, msgs: Iterable[NamedTuple])¶
KNXnet IP Discovery Request/Listen services¶
A simple discovery script, able to discover KNXnet IP gateways, is already shipped with this package.
When calling the script specify your ip address:
python3 -m knx_stack.client.knxnet_ip_discovery 172.31.10.111
- class knx_stack.client.knxnet_ip_discovery.Request(local_addr: str, local_port: int)¶
- connection_made(transport)¶
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
- connection_lost(exc)¶
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
- error_received(exc)¶
Called when a send or receive operation raises an OSError.
(Other than BlockingIOError or InterruptedError.)
- datagram_received(data, addr)¶
Called when some datagram is received.
- class knx_stack.client.knxnet_ip_discovery.Listen¶
A KNXnet IP Discovery listener service
- Parameters
local_addr – discovery listener instance host ip address
local_port – discovery listener instance binding port
Example:
async def listen_discovery_responses(local_addr: str, local_port: int): transport, protocol = await loop.create_datagram_endpoint( lambda: Listen(), local_addr=(local_addr, local_port), ) return transport, protocol if __name__ == '__main__': import sys root = logging.getLogger() root.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) root.addHandler(handler) loop = asyncio.get_event_loop() transport1, _ = loop.run_until_complete(loop.create_task(listen_discovery_responses('172.31.10.111', 5544))) transport2, _ = loop.run_until_complete(loop.create_task(send_discovery_request('172.31.10.111', 5544))) try: loop.run_forever() except KeyboardInterrupt: pass print("Closing transport...") transport1.close() transport2.close() loop.close()
- connection_made(transport)¶
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
- connection_lost(exc)¶
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
- error_received(exc)¶
Called when a send or receive operation raises an OSError.
(Other than BlockingIOError or InterruptedError.)
- datagram_received(data, addr)¶
Called when some datagram is received.
- async knx_stack.client.knxnet_ip_discovery.send_discovery_request(local_addr: str, local_port: int)¶
- async knx_stack.client.knxnet_ip_discovery.listen_discovery_responses(local_addr: str, local_port: int)¶