Pyre

Pyre – Sending messages between applications on a local network

1 Introduction


This post records my efforts to learn how to send messages between hosts (nodes, devices) on a local network (LAN). It explores software that might be useful for building IoT (Internet of Things) capabilities.

pyre is a Python implementation of zyre.

A brief description of pyre:

“Pyre - an open-source framework for proximity-based peer-to-peer applications – Pyre does local area discovery and clustering. A Pyre node broadcasts UDP beacons, and connects to peers that it finds. This class wraps a Pyre node with a message-based API.”

1.1 Requirements and what it runs on

The sample scripts require Python and pyre; pyre in turn requires pyzmq.

I’ve tested the attached scripts with both Python 2.7 and Python 3.x.

I’ve run the dealer and clients on a Raspberry Pi (running Raspbian GNU/Linux 8 (jessie)), a laptop (running Ubuntu 17.10 GNU/Linux), and a desktop machine (also running Ubuntu 17.10 GNU/Linux).

So, that gives us some breadth of platforms on which to run pyre. You will have to try it in order to learn whether it runs on your device.

2 Installing pyre


I use virtualenv and virtualenvwrapper. See: https://virtualenvwrapper.readthedocs.io/en/latest/

Then, install pyre with this:

pip install pyre-zeromq
# OR
pip install https://github.com/zeromq/pyre/archive/master.zip

I normally use pyre under Python 3, but it seems to work fine under older Python 2.7, also.
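A quick way to confirm that the install worked is to check whether the modules can be found. This is a minimal sketch that assumes Python 3 (importlib.util.find_spec is not available under Python 2.7); the helper name is_installed is my own, not part of pyre.

```python
import importlib.util


def is_installed(module_name):
    """Return True if `module_name` can be imported in this environment."""
    return importlib.util.find_spec(module_name) is not None


# "pyre" and "zmq" are the import names used by pyre and pyzmq.
for name in ("pyre", "zmq"):
    status = "found" if is_installed(name) else "MISSING"
    print("{}: {}".format(name, status))
```

Run it inside the same virtualenv in which you ran pip.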

3 Sample code


Here is some sample code:

  • The dealer/server
    #!/usr/bin/env python
    
    """
    usage: pyre_dealer01.py [-h] [-c COUNT] [-m MESSAGE] [-v]
    
    synopsis:
      a simple Pyre dealer/server
    
    optional arguments:
      -h, --help            show this help message and exit
      -c COUNT, --count COUNT
                            number of messages to send. Default: 2
      -m MESSAGE, --message MESSAGE
                            a message body to be sent
      -v, --verbose         Print messages during actions.
    
    examples:
      python pyre_dealer01.py
      python pyre_dealer01.py --count=4
      python pyre_dealer01.py -c 4 --message="a simple message"
    """
    
    
    #
    # imports
    from __future__ import print_function
    import sys
    import argparse
    import socket
    import json
    import pyre
    
    if sys.version_info.major == 2:
        input = raw_input
    
    
    #
    # Global variables
    
    Node_name = socket.gethostname()
    Node_value = '{} server'.format(Node_name)
    Group_name = 'group1'
    
    
    #
    # Private functions
    
    def dbg_msg(options, msg):
        """Print a message if verbose is on."""
        if options.verbose:
            print(msg)
    
    
    #
    # Exported functions
    
    def run(opts):
        node = pyre.Pyre(Node_name)
        node.set_header(Node_name, Node_value)
        node.start()
        #node.join(Group_name)
        for idx in range(opts.count):
            response = input('{}. Press Enter.  "q"/"quit" to leave: '.format(idx))
            if response in ('q', 'quit'):
                break
            msg = '{}. {}'.format(idx, opts.message)
            body = {'host': Node_name, 'data': msg}
            bodys = json.dumps(body)
            print('sending: "{}"'.format(bodys))
            node.shouts(Group_name, bodys)
        node.leave(Group_name)
        node.stop()
    
    
    def main():
        description = """\
    synopsis:
      a simple Pyre dealer/server
    """
        epilog = """\
    examples:
      python pyre_dealer01.py
      python pyre_dealer01.py --count=4
      python pyre_dealer01.py -c 4 --message="a simple message"
    """
        parser = argparse.ArgumentParser(
            description=description,
            epilog=epilog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
        )
    #     parser.add_argument(
    #         "command",
    #         help="A command, one of: getlist, getone, add, update, delete"
    #     )
        parser.add_argument(
            "-c", "--count",
            type=int, default=2,
            help="number of messages to send.  Default: 2",
        )
        parser.add_argument(
            "-m", "--message",
            default='default message',
            help="a message body to be sent",
        )
        parser.add_argument(
            "-v", "--verbose",
            action="store_true",
            help="Print messages during actions.",
        )
        options = parser.parse_args()
        run(options)
    
    
    if __name__ == '__main__':
        # import pdb; pdb.set_trace()
        # import ipdb; ipdb.set_trace()
        main()
    
  • The client
    #!/usr/bin/env python
    
    """
    usage: pyre_client01.py [-h]
    
    synopsis:
      A simple Pyre client
    
    optional arguments:
      -h, --help  show this help message and exit
    
    examples:
      python pyre_client01.py
    """
    
    #
    # imports
    from __future__ import print_function
    import sys
    import argparse
    import pyre
    
    if sys.version_info.major == 2:
        input = raw_input
    
    
    #
    # Global variables
    
    Node_name = 'rook'
    Node_value = 'rook client'
    Group_name = 'group1'
    
    
    #
    # Private functions
    
    def dbg_msg(options, msg):
        """Print a message if verbose is on."""
        if options.verbose:
            print(msg)
    
    #
    # Exported functions
    
    def run(opts):
        node = pyre.Pyre(Node_name)
        node.set_header(Node_name, Node_value)
        node.start()
        node.join(Group_name)
        idx = 0
        while True:
            try:
                print('ready to receive')
                while True:
                    data = node.recv()
                    if data[0] == b'SHOUT':
                        break
                print('{}. data: {}'.format(idx, data))
                response = input('press Enter or "q"/"quit" to leave: ')
                if response in ('q', 'quit'):
                    break
                idx += 1
            except KeyboardInterrupt:
                break
        print('\ncleaning up and leaving')
        node.leave(Group_name)
        node.stop()
    
    
    def main():
        description = """\
    synopsis:
      A simple Pyre client
    """
        epilog = """\
    examples:
      python pyre_client01.py
    """
        parser = argparse.ArgumentParser(
            description=description,
            epilog=epilog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
        )
    #    parser.add_argument(
    #        "command",
    #        help="A command, one of: getlist, getone, add, update, delete"
    #    )
    #    parser.add_argument(
    #        "-a", "--args",
    #        help="Arguments: (1) task name or (2) form parameters"
    #    )
    #    parser.add_argument(
    #        "-v", "--verbose",
    #        action="store_true",
    #        help="Print messages during actions.",
    #    )
        options = parser.parse_args()
        run(options)
    
    
    if __name__ == '__main__':
        # import pdb; pdb.set_trace()
        # import ipdb; ipdb.set_trace()
        main()
    

You should be able to start the client on several hosts in your local network, and then start the dealer/server to send messages to those clients. You will need to repeatedly press Enter on the clients and the dealer to send each individual message.

4 Notes about the sample code


4.1 The dealer

pyre_dealer01.py creates a pyre node and sets the header.

It then sends messages to members of the group. The message content and the number of messages sent depend on command line options.

The body of the message (the “payload”) is a Python dictionary that was converted to JSON.

Because that payload is a Python string, we send it with node.shouts('group1', string_payload). If the payload were a byte string (Python type bytes), we would send it using node.shout('group1', bytes_payload).
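The difference between the two payload forms can be seen without a network. The following is a minimal sketch, assuming Python 3 (where str and bytes are distinct types); make_payload is a hypothetical helper that mirrors what the dealer does inline.

```python
import json


def make_payload(host, data):
    """Encode a message body as a JSON string (the form node.shouts takes)."""
    return json.dumps({'host': host, 'data': data})


text_payload = make_payload('apple', '0. default message')
# Encoding the string gives the bytes form that node.shout takes.
bytes_payload = text_payload.encode('utf-8')

# Either form decodes back to the same dictionary on the receiving side.
assert json.loads(text_payload) == json.loads(bytes_payload.decode('utf-8'))
print(type(text_payload).__name__, type(bytes_payload).__name__)
```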

4.2 The client

pyre_client01.py creates a pyre node, then joins the group “group1”.

The client uses node.recv() to wait for and receive messages.

Each message is a list. The first item in the list is the message type.

The client skips messages until it receives a message whose type is b'SHOUT', which is a message to the group.

It prints a message and then waits for the user to press Enter or enter a “q” to quit.
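To make the list structure easier to work with, the frames can be unpacked into named fields. This is a sketch under stated assumptions: the WHISPER layout (UUID bytes at index 1, payload at index 3) matches what the request/response client later in this post relies on, but the SHOUT layout with a group name at index 3 and the payload at index 4 is my assumption and should be checked against what node.recv() actually returns. Message and parse_message are hypothetical names.

```python
import uuid
from collections import namedtuple

Message = namedtuple('Message', 'kind sender_uuid sender_name group payload')


def parse_message(frames):
    """Turn the list returned by node.recv() into a Message.

    Assumed layouts (the SHOUT layout is an assumption):
      SHOUT:   [type, uuid bytes, name, group, payload]
      WHISPER: [type, uuid bytes, name, payload]
    Other message types are returned with group=None and payload=None.
    """
    kind = frames[0].decode('utf-8')
    sender = uuid.UUID(bytes=frames[1])
    name = frames[2].decode('utf-8')
    if kind == 'SHOUT':
        return Message(kind, sender, name, frames[3].decode('utf-8'), frames[4])
    if kind == 'WHISPER':
        return Message(kind, sender, name, None, frames[3])
    return Message(kind, sender, name, None, None)


# A hand-built frame list stands in for a real node.recv() result.
fake = [b'SHOUT', uuid.uuid4().bytes, b'apple', b'group1', b'{"data": "hi"}']
msg = parse_message(fake)
print(msg.kind, msg.group, msg.payload)
```

With this in place, the client's inner loop could test msg.kind == 'SHOUT' instead of indexing into the raw list.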

5 Hints and suggestions


5.1 Sending messages

Summary:

  • To send messages to a group, use node.shout('group1', bytes) or node.shouts('group1', string). This assumes that each peer that is to receive the message has called node.join('group1').
  • To send messages to a specific node, use node.whisper(uuid, bytes) or node.whispers(uuid, string). You can use node.peers() to get a list of the UUIDs of active peers.
  • To send messages that are bytes, use node.whisper(uuid, byte_array) or node.shout(group, byte_array).
  • To send messages that are strings, use node.whispers(uuid, message_string) or node.shouts(group, message_string).
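The four cases in the summary can be folded into one dispatch function. This is a minimal sketch, assuming Python 3; send is a hypothetical helper, and RecordingNode is a stand-in that only records calls so the example runs without pyre or a network. With a real node you would pass a pyre.Pyre instance instead.

```python
class RecordingNode(object):
    """Hypothetical stand-in for pyre.Pyre: records calls instead of sending."""

    def __init__(self):
        self.calls = []

    def shout(self, group, data):
        self.calls.append(('shout', group, data))

    def shouts(self, group, text):
        self.calls.append(('shouts', group, text))

    def whisper(self, peer, data):
        self.calls.append(('whisper', peer, data))

    def whispers(self, peer, text):
        self.calls.append(('whispers', peer, text))


def send(node, payload, group=None, peer=None):
    """Send `payload` to a group or to one peer, choosing the method by type."""
    if (group is None) == (peer is None):
        raise ValueError('give exactly one of group or peer')
    is_bytes = isinstance(payload, bytes)
    if group is not None:
        (node.shout if is_bytes else node.shouts)(group, payload)
    else:
        (node.whisper if is_bytes else node.whispers)(peer, payload)


node = RecordingNode()
send(node, 'hello', group='group1')       # string to a group
send(node, b'\x00\x01', group='group1')   # bytes to a group
send(node, 'hello', peer='some-uuid')     # string to one peer
print([call[0] for call in node.calls])
```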

5.2 Peers and UUIDs

Start a peer with the following:

node = pyre.Pyre()
node.set_header('description', 'peer on host apple')
node.set_header('hostname', 'apple')
node.start()

Once peers have been started, a node on another host can get a list of the UUIDs of the peers it has discovered with the following:

peers = node.peers()
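Discovery takes a moment, so node.peers() may be empty right after node.start(). One way to handle that is to poll with a timeout. This is a sketch, not part of pyre: wait_for_peers is a hypothetical helper, and FakeNode is a stand-in that lets the example run without a network.

```python
import time


def wait_for_peers(node, timeout=10.0, interval=0.5):
    """Poll node.peers() until it is non-empty or `timeout` seconds pass.

    Returns the list of peer UUIDs, or an empty list on timeout.
    """
    deadline = time.time() + timeout
    while True:
        peers = node.peers()
        if peers or time.time() >= deadline:
            return list(peers)
        time.sleep(interval)


class FakeNode(object):
    """Hypothetical stand-in for pyre.Pyre: reports a peer on the third poll."""

    def __init__(self):
        self.polls = 0

    def peers(self):
        self.polls += 1
        return ['peer-uuid-1'] if self.polls >= 3 else []


found = wait_for_peers(FakeNode(), timeout=2.0, interval=0.01)
print(found)
```

The request/response dealer in the next section does the same thing with an open-ended loop; the timeout here simply keeps a script from waiting forever when no clients are running.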

6 Another example: request and response


In this example, a “dealer” distributes requests across the available pyre clients.

Here is the code:

  • The dealer/server

    #!/usr/bin/env python
    
    """
    usage: pyre_dealer05.py [-h] [-c COUNT] [-m MESSAGE] [-v]
    
    synopsis:
      a simple Pyre dealer/server.  Receives response from clients.
    
    optional arguments:
      -h, --help            show this help message and exit
      -c COUNT, --count COUNT
                            number of messages to send. Default: 2
      -m MESSAGE, --message MESSAGE
                            a message body to be sent
      -v, --verbose         Print messages during actions.
    
    examples:
      python pyre_dealer05.py
      python pyre_dealer05.py --count=4
      python pyre_dealer05.py -c 4 --message="a simple message"
    """
    
    from __future__ import print_function
    # from six.moves import input
    import argparse
    import socket
    import json
    import time
    import itertools
    import pyre
    import gevent
    import gevent.monkey
    gevent.monkey.patch_socket()
    
    Node_name = socket.gethostname()
    Node_value = '{} server'.format(Node_name)
    Group_name = 'group1'
    
    
    def dbg_print(opts, msg, **kwargs):
        """Print a message if verbose is on."""
        if opts.verbose:
            print(msg, **kwargs)
    
    
    def send_request(node, peer, msg, opts):
        """Send a request."""
        body = {'host': Node_name, 'request': msg}
        bodystr = json.dumps(body)
        dbg_print(opts, 'sending: "{}"'.format(bodystr))
        node.whispers(peer, bodystr)
    
    
    def receive_response(node, opts):
        """Receive a response."""
        while True:
            data = node.recv()
            if data[0] == b'WHISPER':
                break
        dbg_print(opts, 'received response: {}'.format(data))
        return data
    
    
    def run(opts):
        node = pyre.Pyre(Node_name)
        node.set_header(Node_name, Node_value)
        node.start()
        while not node.peers():
            print('No peers.  Waiting.')
            time.sleep(2)
        peers = node.peers()
        dbg_print(opts, 'peers: {}'.format(peers))
        dbg_print(opts, 'sending')
        peer_cycler = itertools.cycle(peers)
        send_tasks = []
        for idx in range(opts.count):
            peer = next(peer_cycler)  # built-in next() works on Python 2 and 3
            msg = '{}. {}'.format(idx, opts.message)
            task = gevent.spawn(send_request, node, peer, msg, opts)
            send_tasks.append(task)
        receive_tasks = []
        for idx in range(opts.count):
            task = gevent.spawn(receive_response, node, opts)
            receive_tasks.append(task)
        # gevent.joinall(list(itertools.chain(send_tasks, receive_tasks)))
        dbg_print(opts, 'before join send_tasks')
        gevent.joinall(send_tasks)
        dbg_print(opts, 'after join send_tasks')
        print('-' * 50)
        for task in gevent.iwait(receive_tasks):
            data1 = task.value
            data2 = data1[3]
            data2 = json.loads(data2)
            print('sent: "{}"  received from {}: "{}"'.format(
                data2['request'], data2['sender'], data2['response']))
        print('-' * 50)
        node.stop()
    
    
    def main():
        description = """\
    synopsis:
      a simple Pyre dealer/server.  Receives response from clients.
    """
        epilog = """\
    examples:
      python pyre_dealer05.py
      python pyre_dealer05.py --count=4
      python pyre_dealer05.py -c 4 --message="a simple message"
    """
        parser = argparse.ArgumentParser(
            description=description,
            epilog=epilog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
        )
    #     parser.add_argument(
    #         "command",
    #         help="A command, one of: getlist, getone, add, update, delete"
    #     )
        parser.add_argument(
            "-c", "--count",
            type=int, default=2,
            help="number of messages to send.  Default: 2",
        )
        parser.add_argument(
            "-m", "--message",
            default='default message',
            help="a message body to be sent",
        )
    #     parser.add_argument(
    #         "-g", "--send-with-gevent",
    #         action="store_true",
    #         help="Send messages/requests with gevent tasks",
    #     )
        parser.add_argument(
            "-v", "--verbose",
            action="store_true",
            help="Print messages during actions.",
        )
        options = parser.parse_args()
        run(options)
    
    
    if __name__ == '__main__':
        # import pdb; pdb.set_trace()
        # import ipdb; ipdb.set_trace()
        main()
    

  • The client

    #!/usr/bin/env python
    
    """
    usage: pyre_client05.py [-h] [-v] [-d DELAY]
    
    synopsis:
      A simple Pyre client.  Sends response to dealer.
    
    optional arguments:
      -h, --help            show this help message and exit
      -v, --verbose         print messages during actions.
      -d DELAY, --delay DELAY
                            delay/sleep and pretend to work for x seconds
    
    examples:
      python pyre_client05.py
    """
    
    from __future__ import print_function
    #from six.moves import input
    #import sys
    import argparse
    import socket
    import uuid
    import json
    import time
    import pyre
    
    
    Node_name = socket.gethostname()
    Node_value = '{} client'.format(Node_name)
    Group_name = 'group1'
    
    
    def dbg_print(opts, msg, **kwargs):
        """Print a message if verbose is on."""
        if opts.verbose:
            print(msg, **kwargs)
    
    
    def run(opts):
        node = pyre.Pyre(Node_name)
        node.set_header(Node_name, Node_value)
        node.start()
        node.join(Group_name)
        idx = 0
        while True:
            try:
                print('ready to receive')
                while True:
                    data = node.recv()
                    if data[0] == b'WHISPER':
                        break
                dbg_print(opts, '{}. received data: {}'.format(idx, data))
                # Send a response back to the sender.
                # Get the sender's uuid.
                time.sleep(opts.delay)
                uuid1 = data[1]
                uuid2 = uuid.UUID(bytes=uuid1)
                data1 = data[3]
                data1 = data1.decode()
                data2 = json.loads(data1)
                data3 = data2['request']
                #
                # Perform a task and create a response to the "request".
                data4 = data3.upper()
                data2['response'] = data4
                data2['sender'] = Node_name
                data5 = json.dumps(data2)
                node.whispers(uuid2, data5)
                print('{}. sending response: {}\n----------'.format(
                    idx, data4))
                idx += 1
            except KeyboardInterrupt:
                break
        print('\ncleaning up and leaving')
        node.leave(Group_name)
        node.stop()
    
    
    def main():
        description = """\
    synopsis:
      A simple Pyre client.  Sends response to dealer.
    """
        epilog = """\
    examples:
      python pyre_client05.py
    """
        parser = argparse.ArgumentParser(
            description=description,
            epilog=epilog,
            formatter_class=argparse.RawDescriptionHelpFormatter,
        )
        parser.add_argument(
            "-v", "--verbose",
            action="store_true",
            help="print messages during actions.",
        )
        parser.add_argument(
            "-d", "--delay",
            type=float, default=0.0,
            help="delay/sleep and pretend to work for x seconds",
        )
    #    parser.add_argument(
    #        "command",
    #        help="A command, one of: getlist, getone, add, update, delete"
    #    )
    #    parser.add_argument(
    #        "-a", "--args",
    #        help="Arguments: (1) task name or (2) form parameters"
    #    )
        options = parser.parse_args()
        run(options)
    
    
    if __name__ == '__main__':
        # import pdb; pdb.set_trace()
        # import ipdb; ipdb.set_trace()
        main()
    

Notes:

  • The dealer uses a simple round-robin strategy to distribute messages across available clients. In a real-world use case, it is likely that we’d need something more complex, in particular where some clients are overloaded or some requests take longer to process.
  • In order to return a response, the client needs the UUID of the sender of the message/request. That information is automatically packed into the message by pyre. The client only needs to pick that information out of the message, then convert it from raw Python bytes into a uuid.UUID object.
  • The dealer uses the Python json module to encode the request. The client uses json to decode the request and then to encode its response. The request and response are Python dictionaries.
  • The dealer uses gevent to wait on the responses. I’m not sure that this is needed, because, after all, we are distributing load across clients. In a real application, you would have to work on this part to get the behavior you want.
  • The dealer decodes each response using the Python json module and prints out information from each response.
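Two of the notes above, the round-robin distribution and the UUID/JSON handling on the client, can be tried in isolation. The following is a minimal sketch assuming Python 3; assign_round_robin and build_response are hypothetical helpers that restate the logic of the two scripts as pure functions, and the frame list is built by hand rather than received from a real node.

```python
import itertools
import json
import uuid


def assign_round_robin(peers, count):
    """Pair request indexes 0..count-1 with peers, cycling through the peers."""
    cycler = itertools.cycle(peers)
    return [(idx, next(cycler)) for idx in range(count)]


def build_response(frames, node_name):
    """Build the client's reply to a WHISPER request.

    `frames` is the list from node.recv(): frames[1] holds the sender's
    UUID as 16 raw bytes and frames[3] holds the JSON request.
    Returns (sender_uuid, response_json).
    """
    sender = uuid.UUID(bytes=frames[1])
    body = json.loads(frames[3].decode('utf-8'))
    body['response'] = body['request'].upper()   # the pretend "work"
    body['sender'] = node_name
    return sender, json.dumps(body)


peers = [uuid.uuid4(), uuid.uuid4()]
print(assign_round_robin(peers, 5))

request = json.dumps({'host': 'apple', 'request': '0. hello'}).encode('utf-8')
frames = [b'WHISPER', peers[0].bytes, b'apple', request]
sender, reply = build_response(frames, 'rook')
print(sender == peers[0], reply)
```

In the client, the returned pair would be passed straight to node.whispers(sender, reply).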