Pyre – Sending messages between applications on a local network
1 Introduction
This post records my efforts to learn ways to send messages between hosts (nodes, devices) on a local network (LAN). It tries out software that might be useful for building IoT (Internet of Things) capabilities.
pyre is a Python implementation of zyre.
A brief description of pyre:
“Pyre - an open-source framework for proximity-based peer-to-peer applications – Pyre does local area discovery and clustering. A Pyre node broadcasts UDP beacons, and connects to peers that it finds. This class wraps a Pyre node with a message-based API.”
- You can learn about pyre here: https://github.com/zeromq/pyre.
- And, information about zyre is here: https://github.com/zeromq/zyre.
- Both pyre and zyre are built on top of ZeroMQ. See: http://zeromq.org/ and https://github.com/zeromq.
1.1 Requirements and what it runs on
It requires Python and pyre. pyre requires pyzmq.
I’ve tested the attached scripts with both Python 2.7 and Python 3.x.
I’ve run the dealer and clients on a Raspberry Pi (running Raspbian GNU/Linux 8 (jessie)), a laptop (running Ubuntu 17.10 GNU/Linux), and a desktop machine (also running Ubuntu 17.10 GNU/Linux).
So, that gives us some breadth of platforms on which to run pyre. You will have to try it in order to learn whether it runs on your device.
2 Installing pyre
I use virtualenv and virtualenvwrapper. See: https://virtualenvwrapper.readthedocs.io/en/latest/
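Then install pyre and pyzmq into the active virtual environment with pip; the pyre repository (https://github.com/zeromq/pyre) has the current installation instructions.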
I normally use pyre under Python 3, but it seems to work fine under older Python 2.7, also.
3 Sample code
There is some sample code:
- The dealer/server
```python
#!/usr/bin/env python

"""
usage: pyre_dealer01.py [-h] [-c COUNT] [-m MESSAGE] [-v]

synopsis:
  a simple Pyre dealer/server

optional arguments:
  -h, --help            show this help message and exit
  -c COUNT, --count COUNT
                        number of messages to send. Default: 2
  -m MESSAGE, --message MESSAGE
                        a message body to be sent
  -v, --verbose         Print messages during actions.

examples:
  python dealer01.py
  python dealer01.py --count=4
  python dealer01.py -c 4 --message="a simple message"
"""

#
# imports
from __future__ import print_function
import sys
import argparse
import socket
import json
import pyre

if sys.version_info.major == 2:
    input = raw_input

#
# Global variables
Node_name = socket.gethostname()
Node_value = '{} server'.format(Node_name)
Group_name = 'group1'


#
# Private functions
def dbg_msg(options, msg):
    """Print a message if verbose is on."""
    if options.verbose:
        print(msg)


#
# Exported functions
def run(opts):
    node = pyre.Pyre(Node_name)
    node.set_header(Node_name, Node_value)
    node.start()
    #node.join(Group_name)
    for idx in range(opts.count):
        response = input('{}. Press Enter. "q"/"quit" to leave: '.format(idx))
        if response in ('q', 'quit'):
            break
        msg = '{}. {}'.format(idx, opts.message)
        body = {'host': Node_name, 'data': msg}
        bodys = json.dumps(body)
        print('sending: "{}"'.format(bodys))
        node.shouts(Group_name, bodys)
    node.leave(Group_name)
    node.stop()


def main():
    description = """\
synopsis:
  a simple Pyre dealer/server
"""
    epilog = """\
examples:
  python dealer01.py
  python dealer01.py --count=4
  python dealer01.py -c 4 --message="a simple message"
"""
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # parser.add_argument(
    #     "command",
    #     help="A command, one of: getlist, getone, add, update, delete"
    # )
    parser.add_argument(
        "-c", "--count",
        type=int,
        default=2,
        help="number of messages to send. Default: 2",
    )
    parser.add_argument(
        "-m", "--message",
        default='default message',
        help="a message body to be sent",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Print messages during actions.",
    )
    options = parser.parse_args()
    run(options)


if __name__ == '__main__':
    # import pdb; pdb.set_trace()
    # import ipdb; ipdb.set_trace()
    main()
```
- The client
```python
#!/usr/bin/env python

"""
usage: pyre_client01.py [-h]

synopsis:
  A simple Pyre client

optional arguments:
  -h, --help  show this help message and exit

examples:
  python client01.py
"""

#
# imports
from __future__ import print_function
import sys
import argparse
import pyre

if sys.version_info.major == 2:
    input = raw_input

#
# Global variables
Node_name = 'rook'
Node_value = 'rook client'
Group_name = 'group1'


#
# Private functions
def dbg_msg(options, msg):
    """Print a message if verbose is on."""
    if options.verbose:
        print(msg)


#
# Exported functions
def run(opts):
    node = pyre.Pyre(Node_name)
    node.set_header(Node_name, Node_value)
    node.start()
    node.join(Group_name)
    idx = 0
    while True:
        try:
            print('ready to receive')
            while True:
                data = node.recv()
                if data[0] == b'SHOUT':
                    break
            print('{}. data: {}'.format(idx, data))
            response = input('press Enter or "q"/"quit" to leave: ')
            if response in ('q', 'quit'):
                break
            idx += 1
        except KeyboardInterrupt:
            break
    print('\ncleaning up and leaving')
    node.leave(Group_name)
    node.stop()


def main():
    description = """\
synopsis:
  A simple Pyre client
"""
    epilog = """\
examples:
  python client01.py
"""
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # parser.add_argument(
    #     "command",
    #     help="A command, one of: getlist, getone, add, update, delete"
    # )
    # parser.add_argument(
    #     "-a", "--args",
    #     help="Arguments: (1) task name or (2) form parameters"
    # )
    # parser.add_argument(
    #     "-v", "--verbose",
    #     action="store_true",
    #     help="Print messages during actions.",
    # )
    options = parser.parse_args()
    run(options)


if __name__ == '__main__':
    # import pdb; pdb.set_trace()
    # import ipdb; ipdb.set_trace()
    main()
```
You should be able to start the client on several hosts in your local network, and then start the dealer/server to send messages to those clients. You will need to repeatedly press Enter on the clients and the dealer to send each individual message.
4 Notes about the sample code
4.1 The dealer
pyre_dealer01.py creates a pyre node and sets the header.
It then sends messages to members of the group. The message content and the number of messages sent depend on command line options.
The body of the message (the “payload”) is a Python dictionary that was converted to JSON.
Because that payload is a Python string, we send it with node.shouts('group1', string_payload). If the payload were a byte array (Python type bytes), we would send it using node.shout('group1', bytes_payload).
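To make the string-versus-bytes distinction concrete, here is a minimal sketch that uses only the standard library. The helper name build_payload and the host name 'myhost' are hypothetical (neither is part of pyre); the helper mirrors how pyre_dealer01.py builds its JSON body, and the comments note which of the two calls mentioned above each form would go to.

```python
import json


def build_payload(host, idx, message):
    """Return the JSON text for one message (hypothetical helper)."""
    body = {'host': host, 'data': '{}. {}'.format(idx, message)}
    return json.dumps(body)


# json.dumps returns text (str); encoding it yields bytes.
payload_str = build_payload('myhost', 0, 'a simple message')
payload_bytes = payload_str.encode('utf-8')

# A str payload would go to node.shouts('group1', payload_str);
# a bytes payload would go to node.shout('group1', payload_bytes).
print(type(payload_str).__name__, type(payload_bytes).__name__)
```

Encoding explicitly as UTF-8 is a choice made for this sketch; the receiving side would need to decode with the same encoding.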
4.2 The client
pyre_client01.py creates a pyre node, then joins the group “group1”.
The client uses node.recv() to wait for and receive messages.
Each message is a list. The first item in the list is the message type.
The client skips messages until it receives a message whose type is b'SHOUT', which is a message to the group.
It prints the received message and then waits for the user to press Enter, or to enter “q” to quit.
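The message handling described above can be tried without a network. The frame layout in this sketch is an assumption: it follows how the sample clients index the list (item 0 is the type, item 1 is the sender's UUID as raw bytes) and additionally guesses that the payload is the last item. The function summarize_message and the sample frames are hypothetical and were not captured from pyre.

```python
import json
import uuid


def summarize_message(frames):
    """Pick the type, sender UUID, and JSON body out of a received list.

    Assumes frames[0] is the message type, frames[1] is the sender's
    UUID as 16 raw bytes, and the last frame is a JSON payload.
    """
    msg_type = frames[0]
    sender = uuid.UUID(bytes=frames[1])
    body = json.loads(frames[-1].decode('utf-8'))
    return msg_type, sender, body


# Hypothetical frames shaped like a group message.
sender_id = uuid.uuid4()
frames = [
    b'SHOUT',
    sender_id.bytes,
    b'myhost',
    b'group1',
    json.dumps({'host': 'myhost', 'data': '0. hello'}).encode('utf-8'),
]

msg_type, sender, body = summarize_message(frames)
if msg_type == b'SHOUT':
    print('from {}: {}'.format(sender, body['data']))
```

Printing the raw list returned by node.recv(), as pyre_client01.py does, is a quick way to check whether these assumptions hold for the pyre version in use.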
5 Hints and suggestions
5.1 Sending messages
Summary:
- To send messages to a group, use node.shout('group1', bytes) or node.shouts('group1', string). This assumes that each peer that is to receive the message has done node.join('group1').
- To send messages to a specific node, use node.whisper(uuid, bytes) or node.whispers(uuid, string). You can use node.peers() to get a list of the UUIDs of active peers.
- To send messages that are byte arrays, use node.whisper(uuid, byte_array) or node.shout(group, byte_array).
- To send messages that are strings, use node.whispers(uuid, message_string) or node.shouts(group, message_string).
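The summary above can be folded into two small helpers that pick the right call from the payload type. This is a sketch, not part of pyre: send_to_group, send_to_peer, and RecordingNode are hypothetical names, and RecordingNode is only a stand-in that records which method was called so the example runs without a network. It assumes Python 3, where str and bytes are distinct types.

```python
def send_to_group(node, group, payload):
    """Send to a group: shout for bytes, shouts for str."""
    if isinstance(payload, bytes):
        node.shout(group, payload)
    else:
        node.shouts(group, payload)


def send_to_peer(node, peer_uuid, payload):
    """Send to one peer: whisper for bytes, whispers for str."""
    if isinstance(payload, bytes):
        node.whisper(peer_uuid, payload)
    else:
        node.whispers(peer_uuid, payload)


class RecordingNode(object):
    """Stand-in for a pyre node that only records calls (hypothetical)."""

    def __init__(self):
        self.calls = []

    def shout(self, group, payload):
        self.calls.append(('shout', group, payload))

    def shouts(self, group, payload):
        self.calls.append(('shouts', group, payload))

    def whisper(self, peer_uuid, payload):
        self.calls.append(('whisper', peer_uuid, payload))

    def whispers(self, peer_uuid, payload):
        self.calls.append(('whispers', peer_uuid, payload))


node = RecordingNode()
send_to_group(node, 'group1', 'text message')
send_to_group(node, 'group1', b'raw bytes')
send_to_peer(node, 'placeholder-uuid', 'text message')
print([call[0] for call in node.calls])
```

With a real node, the same helpers would be passed the object returned by pyre.Pyre(...) after node.start(), and a UUID taken from node.peers().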
5.2 Peers and UUIDs
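Start a peer by creating a node with pyre.Pyre(name), calling node.start(), and calling node.join('group1') if the peer should receive group messages, as the client scripts above do.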
Once peers have been started, you can call node.peers() on a started node on another host machine to get a list of the UUIDs of all the peers that have been started.
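Peer discovery takes a moment, so a freshly started node may report no peers at first. The following sketch polls node.peers() until something shows up or a timeout passes. The function wait_for_peers and the StubNode class are hypothetical; StubNode stands in for a started pyre node so that the example runs without a network.

```python
import time


def wait_for_peers(node, timeout=10.0, poll=0.5):
    """Poll node.peers() until it is non-empty or timeout seconds pass.

    Returns the list of peers found, which may be empty on timeout.
    """
    deadline = time.time() + timeout
    while True:
        peers = node.peers()
        if peers or time.time() >= deadline:
            return list(peers)
        time.sleep(poll)


class StubNode(object):
    """Stand-in for a started pyre node (hypothetical).

    Reports no peers on the first two polls, then one placeholder peer.
    """

    def __init__(self):
        self.polls = 0

    def peers(self):
        self.polls += 1
        return ['placeholder-uuid'] if self.polls >= 3 else []


stub = StubNode()
found = wait_for_peers(stub, timeout=2.0, poll=0.01)
print(found)
```

Compared with looping forever, the timeout lets a caller report that no peers were found instead of hanging.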
6 Another example: request and response
In this example, a “dealer” distributes requests across the available pyre clients.
Here is the code:
- The dealer/server
```python
#!/usr/bin/env python

"""
usage: pyre_dealer05.py [-h] [-c COUNT] [-m MESSAGE] [-v]

synopsis:
  a simple Pyre dealer/server. Receives response from clients.

optional arguments:
  -h, --help            show this help message and exit
  -c COUNT, --count COUNT
                        number of messages to send. Default: 2
  -m MESSAGE, --message MESSAGE
                        a message body to be sent
  -v, --verbose         Print messages during actions.

examples:
  python dealer05.py
  python dealer05.py --count=4
  python dealer05.py -c 4 --message="a simple message"
"""

from __future__ import print_function
# from six.moves import input
import argparse
import socket
import json
import time
import itertools
import pyre
import gevent
import gevent.monkey

gevent.monkey.patch_socket()

Node_name = socket.gethostname()
Node_value = '{} server'.format(Node_name)
Group_name = 'group1'


def dbg_print(opts, msg, **kwargs):
    """Print a message if verbose is on."""
    if opts.verbose:
        print(msg, **kwargs)


def send_request(node, peer, msg, opts):
    """Send a request."""
    body = {'host': Node_name, 'request': msg}
    bodystr = json.dumps(body)
    dbg_print(opts, 'sending: "{}"'.format(bodystr))
    node.whispers(peer, bodystr)


def receive_response(node, opts):
    """Receive a response."""
    while True:
        data = node.recv()
        if data[0] == b'WHISPER':
            break
    dbg_print(opts, 'received response: {}'.format(data))
    return data


def run(opts):
    node = pyre.Pyre(Node_name)
    node.set_header(Node_name, Node_value)
    node.start()
    while not node.peers():
        print('No peers. Waiting.')
        time.sleep(2)
    peers = node.peers()
    dbg_print(opts, 'peers: {}'.format(peers))
    dbg_print(opts, 'sending')
    peer_cycler = itertools.cycle(peers)
    send_tasks = []
    for idx in range(opts.count):
        # next() works under both Python 2 and Python 3.
        peer = next(peer_cycler)
        msg = '{}. {}'.format(idx, opts.message)
        task = gevent.spawn(send_request, node, peer, msg, opts)
        send_tasks.append(task)
    receive_tasks = []
    for idx in range(opts.count):
        task = gevent.spawn(receive_response, node, opts)
        receive_tasks.append(task)
    # gevent.joinall(list(itertools.chain(send_tasks, receive_tasks)))
    dbg_print(opts, 'before join send_tasks')
    gevent.joinall(send_tasks)
    dbg_print(opts, 'after join send_tasks')
    print('-' * 50)
    for task in gevent.iwait(receive_tasks):
        data1 = task.value
        data2 = data1[3]
        data2 = json.loads(data2)
        print('sent: "{}" received from {}: "{}"'.format(
            data2['request'], data2['sender'], data2['response']))
    print('-' * 50)
    node.stop()


def main():
    description = """\
synopsis:
  a simple Pyre dealer/server. Receives response from clients.
"""
    epilog = """\
examples:
  python dealer05.py
  python dealer05.py --count=4
  python dealer05.py -c 4 --message="a simple message"
"""
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # parser.add_argument(
    #     "command",
    #     help="A command, one of: getlist, getone, add, update, delete"
    # )
    parser.add_argument(
        "-c", "--count",
        type=int,
        default=2,
        help="number of messages to send. Default: 2",
    )
    parser.add_argument(
        "-m", "--message",
        default='default message',
        help="a message body to be sent",
    )
    # parser.add_argument(
    #     "-g", "--send-with-gevent",
    #     action="store_true",
    #     help="Send messages/requests with gevent tasks",
    # )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Print messages during actions.",
    )
    options = parser.parse_args()
    run(options)


if __name__ == '__main__':
    # import pdb; pdb.set_trace()
    # import ipdb; ipdb.set_trace()
    main()
```
- The client
```python
#!/usr/bin/env python

"""
usage: pyre_client05.py [-h] [-v] [-d DELAY]

synopsis:
  A simple Pyre client. Sends response to dealer.

optional arguments:
  -h, --help            show this help message and exit
  -v, --verbose         print messages during actions.
  -d DELAY, --delay DELAY
                        delay/sleep and pretend to work for x seconds

examples:
  python pyre_client05.py
"""

from __future__ import print_function
#from six.moves import input
#import sys
import argparse
import socket
import uuid
import json
import time
import pyre

Node_name = socket.gethostname()
Node_value = '{} server'.format(Node_name)
Group_name = 'group1'


def dbg_print(opts, msg, **kwargs):
    """Print a message if verbose is on."""
    if opts.verbose:
        print(msg, **kwargs)


def run(opts):
    node = pyre.Pyre(Node_name)
    node.set_header(Node_name, Node_value)
    node.start()
    node.join(Group_name)
    idx = 0
    while True:
        try:
            print('ready to receive')
            while True:
                data = node.recv()
                if data[0] == b'WHISPER':
                    break
            dbg_print(opts, '{}. received data: {}'.format(idx, data))
            # Send a response back to the sender.
            # Get the sender's uuid.
            time.sleep(opts.delay)
            uuid1 = data[1]
            uuid2 = uuid.UUID(bytes=uuid1)
            data1 = data[3]
            data1 = data1.decode()
            data2 = json.loads(data1)
            data3 = data2['request']
            #
            # Perform a task and create a response to the "request".
            data4 = data3.upper()
            data2['response'] = data4
            data2['sender'] = Node_name
            data5 = json.dumps(data2)
            node.whispers(uuid2, data5)
            print('{}. sending response: {}\n----------'.format(
                idx, data4))
            idx += 1
        except KeyboardInterrupt:
            break
    print('\ncleaning up and leaving')
    node.leave(Group_name)
    node.stop()


def main():
    description = """\
synopsis:
  A simple Pyre client. Sends response to dealer.
"""
    epilog = """\
examples:
  python pyre_client05.py
"""
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="print messages during actions.",
    )
    parser.add_argument(
        "-d", "--delay",
        type=float,
        default=0.0,
        help="delay/sleep and pretend to work for x seconds",
    )
    # parser.add_argument(
    #     "command",
    #     help="A command, one of: getlist, getone, add, update, delete"
    # )
    # parser.add_argument(
    #     "-a", "--args",
    #     help="Arguments: (1) task name or (2) form parameters"
    # )
    options = parser.parse_args()
    run(options)


if __name__ == '__main__':
    # import pdb; pdb.set_trace()
    # import ipdb; ipdb.set_trace()
    main()
```
Notes:
- The dealer uses a simple round-robin strategy to distribute messages across available clients. In a real-world use case, we would likely need something more complex, in particular where some clients are overloaded or some requests take longer to process.
- In order to return a response, the client needs the UUID of the sender of the message/request. That information is automatically packed into the message by pyre. The client only needs to pick that information out of the message, then convert it from Python bytes into a uuid.UUID object.
- The dealer uses the Python json module to encode the request. The client uses json to decode the request and then to encode its response. The request and response are Python dictionaries.
- The dealer uses gevent to wait on the responses. I’m not sure that this is needed, because, after all, we are distributing load across clients. In a real application, you would have to work on this part to get the behavior you want.
- The dealer decodes each response using the Python json module and prints out information from each response.
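The round-robin assignment and the client-side reply handling from these notes can be exercised on their own, without pyre or a network. In this sketch, assign_round_robin and build_response are hypothetical helper names, the host names are placeholders, and the sample frames are made up to match how pyre_client05.py indexes a received message (item 1 is the sender's UUID as raw bytes, item 3 is the JSON request).

```python
import itertools
import json
import uuid


def assign_round_robin(peers, count):
    """Pair each of count request indexes with a peer, cycling through peers."""
    if not peers:
        raise ValueError('no peers to assign requests to')
    cycler = itertools.cycle(peers)
    return [(idx, next(cycler)) for idx in range(count)]


def build_response(frames, node_name):
    """Return (sender_uuid, response_json) for one request.

    Follows pyre_client05.py: frames[1] holds the sender's UUID as raw
    bytes and frames[3] holds the JSON request.
    """
    sender = uuid.UUID(bytes=frames[1])
    request = json.loads(frames[3].decode('utf-8'))
    # The "work" is upper-casing the request text, as in the sample client.
    request['response'] = request['request'].upper()
    request['sender'] = node_name
    return sender, json.dumps(request)


# Dealer side: spread five requests over two made-up peers.
peers = [uuid.uuid4(), uuid.uuid4()]
plan = assign_round_robin(peers, 5)

# Client side: answer one made-up request.
dealer_id = uuid.uuid4()
frames = [
    b'WHISPER',
    dealer_id.bytes,
    b'dealer-host',
    json.dumps({'host': 'dealer-host', 'request': '0. hello'}).encode('utf-8'),
]
sender, reply = build_response(frames, 'client-host')
print(sender == dealer_id, reply)
```

In the real client, the returned pair would be passed to node.whispers(sender, reply), which is what pyre_client05.py does with its own variables.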