Clients

The following are simple examples of asynchronous KNX clients built with this project's code.

USB HID

class knx_stack.client.usbhid.Client(ip: str, port: int, state: knx_stack.state.State, send_msgs: Iterable[NamedTuple])

A minimal asynchronous USB HID KNX Client.

It connects to a USB HID daemon, which listens on port 5555 by default.

Start the daemon first. It is a simple server that runs only on Linux. On the host where the USB HID device is attached, run the following command, specifying the correct hidraw device:

# knxstack-usbhid-daemon --dev-hidraw /dev/hidrawX

Parameters
  • ip – str, host where knxstack-usbhid-daemon is running

  • port – int, port that knxstack-usbhid-daemon is bound to (5555 by default)

  • state – knx_stack.State, setup of all needed KNX tables

  • send_msgs – a list of messages to be sent on the KNX bus

Example:

Turn on and off a light and read a property from its KNX device:

import knx_stack
import knx_stack.client.usbhid  # makes knx_stack.client.usbhid.Client available

if __name__ == '__main__':

    address_table = knx_stack.layer.AddressTable(knx_stack.Address(0x100A), [], 255)
    association_table = knx_stack.layer.AssociationTable(address_table, {})
    asap_stack_instance = knx_stack.ASAP(1, "this stack instance identifier on bus")
    asap_command = knx_stack.ASAP(2, "turn on/off light")
    association_table.associate(asap_stack_instance, [knx_stack.Address(0x1029)])
    association_table.associate(asap_command, [knx_stack.GroupAddress(free_style=0x0F81)])
    state = knx_stack.State(knx_stack.Medium.usb_hid,
                            association_table,
                            knx_stack.GroupObjectTable({asap_command: knx_stack.datapointtypes.DPT_Switch}))

    msgs = list()

    switch_on = knx_stack.datapointtypes.DPT_Switch()
    switch_on.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.on
    msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_on))

    switch_off = knx_stack.datapointtypes.DPT_Switch()
    switch_off.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.off
    msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_off))

    msgs.append(knx_stack.layer.application.a_property_value_read.req.Msg(asap=0,
                                                                          object_index=0x01,
                                                                          property_id=0xC9,
                                                                          number_of_elements=1,
                                                                          start_index=0x01))

    client = knx_stack.client.usbhid.Client('172.31.11.251', 5555, state, msgs)
    client.run()

run()
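The example above writes KNX addresses as raw 16-bit integers (0x100A, 0x1029, 0x0F81). As a reading aid, the following minimal sketch converts such values to the dotted and slashed notations commonly shown by KNX tools. It assumes that free_style carries the plain 16-bit group address and that the 3-level main/middle/sub layout (5/3/8 bits) is wanted. The helper names are hypothetical and are not part of knx_stack.

```python
def format_individual_address(raw: int) -> str:
    """Render a 16-bit individual address as area.line.device (4/4/8 bits)."""
    if not 0 <= raw <= 0xFFFF:
        raise ValueError("KNX addresses are 16 bits wide")
    return f"{raw >> 12}.{(raw >> 8) & 0x0F}.{raw & 0xFF}"


def format_group_address(raw: int) -> str:
    """Render a 16-bit group address as main/middle/sub (5/3/8 bits)."""
    if not 0 <= raw <= 0xFFFF:
        raise ValueError("KNX addresses are 16 bits wide")
    return f"{raw >> 11}/{(raw >> 8) & 0x07}/{raw & 0xFF}"


def parse_group_address(text: str) -> int:
    """Convert a 3-level 'main/middle/sub' string back to its 16-bit value."""
    main, middle, sub = (int(part) for part in text.split("/"))
    if not (0 <= main <= 31 and 0 <= middle <= 7 and 0 <= sub <= 255):
        raise ValueError(f"group address out of range: {text}")
    return (main << 11) | (middle << 8) | sub


if __name__ == "__main__":
    print(format_individual_address(0x100A))  # address of this stack instance
    print(format_group_address(0x0F81))       # group address of the light
```

With these helpers, the value passed as free_style=0x0F81 reads as group address 1/7/129, and 0x100A reads as individual address 1.0.10.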

KNXnet IP Tunneling Client

class knx_stack.client.knxnet_ip.Tunneling(local_addr: str, local_port: int, remote_addr: str, remote_port: int, state: knx_stack.state.State, msgs: Iterable[NamedTuple])

A minimal asynchronous KNXnet IP Tunneling Client.

Parameters
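  • local_addr – IP address of the host running the tunneling client

  • local_port – local port the tunneling client binds to

  • remote_addr – IP address of the KNXnet IP gateway

  • remote_port – port the KNXnet IP gateway listens on

  • state – setup of all needed KNX tables

  • msgs – messages to be sent on the KNX bus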

Example:

Turn on and off a light:

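import asyncio
import logging
from typing import Iterable, NamedTuple

import knx_stack
from knx_stack.client.knxnet_ip import Tunneling


# `loop` is the module-level event loop created in the main block below.
async def start_tunneling(local_addr: str, local_port: int, remote_addr: str, remote_port: int,
                          state: knx_stack.knxnet_ip.State, msgs: Iterable[NamedTuple]):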
    transport, protocol = await loop.create_datagram_endpoint(
            lambda: Tunneling(local_addr, local_port, remote_addr, remote_port, state, msgs),
            local_addr=(local_addr, local_port),
            remote_addr=(remote_addr, remote_port))
    return transport, protocol


if __name__ == '__main__':
    import sys

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    root.addHandler(handler)

    address_table = knx_stack.AddressTable(knx_stack.Address(0x1004), [], 255)
    association_table = knx_stack.AssociationTable(address_table, {})
    asap_command = knx_stack.ASAP(1, "turn on/off floor light")
    association_table.associate(asap_command, [knx_stack.GroupAddress(free_style=0x0F81)])
    state = knx_stack.knxnet_ip.State(knx_stack.Medium.knxnet_ip, association_table,
                                      knx_stack.GroupObjectTable({asap_command: knx_stack.datapointtypes.DPT_Switch}))

    msgs = list()

    switch_on = knx_stack.datapointtypes.DPT_Switch()
    switch_on.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.on
    msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_on))

    switch_off = knx_stack.datapointtypes.DPT_Switch()
    switch_off.bits.action = knx_stack.datapointtypes.DPT_Switch.Action.off
    msgs.append(knx_stack.layer.application.a_group_value_write.req.Msg(asap=asap_command, dpt=switch_off))

    loop = asyncio.get_event_loop()
    transport, protocol = loop.run_until_complete(loop.create_task(start_tunneling('172.31.10.111', 5544, '172.31.10.250', 3671,
                                                                                   state, msgs)))
    loop.run_until_complete(loop.create_task(protocol.writer()))

    transport.close()

connection_made(transport)

Called when a connection is made.

The argument is the transport representing the connection. To receive data, wait for datagram_received() calls. When the connection is closed, connection_lost() is called.

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

error_received(exc)

Called when a send or receive operation raises an OSError.

(Other than BlockingIOError or InterruptedError.)

send(msg)

datagram_received(data, addr)

Called when some datagram is received.
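The callbacks documented above follow the standard asyncio datagram protocol lifecycle. The sketch below uses only the standard library to show the order in which the event loop invokes them on a loopback UDP endpoint. RecordingProtocol and demo are hypothetical names for illustration and do not reproduce how Tunneling is implemented.

```python
import asyncio


class RecordingProtocol(asyncio.DatagramProtocol):
    """Hypothetical protocol that records each callback asyncio invokes."""

    def __init__(self):
        loop = asyncio.get_running_loop()
        self.events = []
        self.received = loop.create_future()
        self.closed = loop.create_future()

    def connection_made(self, transport):
        self.events.append("connection_made")

    def datagram_received(self, data, addr):
        self.events.append(("datagram_received", data))
        if not self.received.done():
            self.received.set_result(None)

    def error_received(self, exc):
        self.events.append(("error_received", type(exc).__name__))

    def connection_lost(self, exc):
        self.events.append(("connection_lost", exc))
        if not self.closed.done():
            self.closed.set_result(None)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    # Bind to an ephemeral port on the loopback interface.
    transport, protocol = await loop.create_datagram_endpoint(
        RecordingProtocol, local_addr=("127.0.0.1", 0))
    port = transport.get_extra_info("sockname")[1]
    # Send one datagram to our own socket so datagram_received fires.
    transport.sendto(b"ping", ("127.0.0.1", port))
    await asyncio.wait_for(protocol.received, timeout=2.0)
    # Closing the transport schedules connection_lost(None).
    transport.close()
    await asyncio.wait_for(protocol.closed, timeout=2.0)
    return protocol.events


if __name__ == "__main__":
    for event in asyncio.run(demo()):
        print(event)
```

In this run error_received is never called, because nothing fails on the loopback socket. connection_lost receives None because the transport was closed deliberately.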

async writer()

async knx_stack.client.knxnet_ip.start_tunneling(local_addr: str, local_port: int, remote_addr: str, remote_port: int, state: knx_stack.definition.knxnet_ip.state.State, msgs: Iterable[NamedTuple])
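Both messages in the tunneling example are group value writes carrying a DPT_Switch, which is a 1-bit value. In the KNX application layer, values of up to 6 bits are packed into the low bits of the two TPCI/APCI octets. The sketch below shows that standard encoding for A_GroupValue_Write, which can help when reading bus traces. The helper names are hypothetical, the code does not call knx_stack, and the bytes this library emits have not been compared against it.

```python
A_GROUP_VALUE_WRITE = 0x0080  # APCI code for A_GroupValue_Write, TPCI bits zero
APCI_SERVICE_MASK = 0x03C0    # bits that select the short-data group service
SMALL_DATA_MASK = 0x003F      # up to 6 bits of data share the second octet


def encode_switch_write(on: bool) -> bytes:
    """Build the two TPCI/APCI octets for a 1-bit group value write."""
    return (A_GROUP_VALUE_WRITE | (1 if on else 0)).to_bytes(2, "big")


def decode_switch_write(apdu: bytes) -> bool:
    """Extract the switch state from a two-octet group value write."""
    if len(apdu) != 2:
        raise ValueError("a 1-bit group value write is exactly two octets")
    value = int.from_bytes(apdu, "big")
    if value & APCI_SERVICE_MASK != A_GROUP_VALUE_WRITE:
        raise ValueError("not an A_GroupValue_Write APDU")
    return bool(value & SMALL_DATA_MASK & 0x01)


if __name__ == "__main__":
    print(encode_switch_write(True).hex())   # switch on
    print(encode_switch_write(False).hex())  # switch off
```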

KNXnet IP Discovery Request/Listen services

This package ships with a simple discovery script that finds KNXnet IP gateways.

When calling the script, specify your local IP address:

python3 -m knx_stack.client.knxnet_ip_discovery 172.31.10.111

class knx_stack.client.knxnet_ip_discovery.Request(local_addr: str, local_port: int)
connection_made(transport)

Called when a connection is made.

The argument is the transport representing the connection. To receive data, wait for datagram_received() calls. When the connection is closed, connection_lost() is called.

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

error_received(exc)

Called when a send or receive operation raises an OSError.

(Other than BlockingIOError or InterruptedError.)

datagram_received(data, addr)

Called when some datagram is received.
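For orientation, a KNXnet/IP discovery request is a SEARCH_REQUEST frame sent to the system multicast address 224.0.23.12 on port 3671. It consists of a 6-byte header followed by an 8-byte HPAI that tells gateways where to send their responses. The sketch below builds such a frame with the standard library only. build_search_request is a hypothetical helper, and whether Request assembles its frame in exactly this way is an assumption not checked against the source.

```python
import socket
import struct

KNXNET_IP_MULTICAST = ("224.0.23.12", 3671)  # KNXnet/IP discovery endpoint
SEARCH_REQUEST = 0x0201                      # service type identifier
HEADER_LENGTH = 0x06
PROTOCOL_VERSION = 0x10                      # KNXnet/IP version 1.0
IPV4_UDP = 0x01                              # host protocol code in the HPAI


def build_search_request(local_addr: str, local_port: int) -> bytes:
    """Return a SEARCH_REQUEST whose HPAI points at local_addr:local_port."""
    # HPAI: structure length, host protocol, IPv4 address, UDP port.
    hpai = struct.pack("!BB4sH", 8, IPV4_UDP,
                       socket.inet_aton(local_addr), local_port)
    # Header: header length, version, service type, total frame length.
    header = struct.pack("!BBHH", HEADER_LENGTH, PROTOCOL_VERSION,
                         SEARCH_REQUEST, HEADER_LENGTH + len(hpai))
    return header + hpai


if __name__ == "__main__":
    frame = build_search_request("172.31.10.111", 5544)
    print(frame.hex(" "))
```

Gateways answer with a unicast SEARCH_RESPONSE to the address and port given in the HPAI, which is why a listener has to be bound there before the request goes out.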

class knx_stack.client.knxnet_ip_discovery.Listen

A KNXnet IP Discovery listener service.

Parameters
  • local_addr – IP address of the host running the discovery listener

  • local_port – port the discovery listener binds to

Example:

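import asyncio
import logging

# Listen and send_discovery_request are documented in this section.
from knx_stack.client.knxnet_ip_discovery import Listen, send_discovery_request


# `loop` is the module-level event loop created in the main block below.
async def listen_discovery_responses(local_addr: str, local_port: int):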
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: Listen(), local_addr=(local_addr, local_port),
    )
    return transport, protocol

if __name__ == '__main__':
    import sys

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(sys.stdout)
    root.addHandler(handler)

    loop = asyncio.get_event_loop()
    transport1, _ = loop.run_until_complete(loop.create_task(listen_discovery_responses('172.31.10.111', 5544)))
    transport2, _ = loop.run_until_complete(loop.create_task(send_discovery_request('172.31.10.111', 5544)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    print("Closing transport...")
    transport1.close()
    transport2.close()
    loop.close()

connection_made(transport)

Called when a connection is made.

The argument is the transport representing the connection. To receive data, wait for datagram_received() calls. When the connection is closed, connection_lost() is called.

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

error_received(exc)

Called when a send or receive operation raises an OSError.

(Other than BlockingIOError or InterruptedError.)

datagram_received(data, addr)

Called when some datagram is received.

async knx_stack.client.knxnet_ip_discovery.send_discovery_request(local_addr: str, local_port: int)

async knx_stack.client.knxnet_ip_discovery.listen_discovery_responses(local_addr: str, local_port: int)
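The discovery example listens until it is interrupted from the keyboard. When a script should stop by itself, one option is to collect responses for a fixed time window and then close the transport. The sketch below shows that pattern with the standard library only. Collector, collect_responses and the on_ready hook are hypothetical names, and the sketch is a possible variation rather than how this package behaves.

```python
import asyncio


class Collector(asyncio.DatagramProtocol):
    """Hypothetical listener that stores every datagram it receives."""

    def __init__(self):
        self.responses = []

    def datagram_received(self, data, addr):
        self.responses.append((addr, data))


async def collect_responses(local_addr: str, local_port: int,
                            window: float, on_ready=None) -> list:
    """Listen on local_addr:local_port for `window` seconds, then stop."""
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        Collector, local_addr=(local_addr, local_port))
    try:
        # Optional hook, e.g. to send the discovery request once bound.
        if on_ready is not None:
            on_ready(transport)
        await asyncio.sleep(window)
    finally:
        # Always release the socket, even if the wait is cancelled.
        transport.close()
    return protocol.responses


if __name__ == "__main__":
    def send_fake_response(transport):
        # Stand-in for a gateway: send a datagram to our own socket.
        port = transport.get_extra_info("sockname")[1]
        transport.sendto(b"fake-response", ("127.0.0.1", port))

    found = asyncio.run(
        collect_responses("127.0.0.1", 0, 0.2, send_fake_response))
    print(found)
```

Using asyncio.run with a bounded window keeps the event loop setup and teardown in one place, at the cost of missing gateways that answer after the window has closed.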