Skip to content

Commit aad5da6

Browse files
committed
Support notify
1 parent cf23a18 commit aad5da6

3 files changed

Lines changed: 19 additions & 12 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ MessagePack RPC implementation based on Tornado.
88

99
address = msgpackrpc.Address("localhost", 18800)
1010
client = msgpackrpc.Client(address)
11-
result = client.call('add', 1, 2) # = > 3
11+
result = client.call('sum', 1, 2) # = > 3
1212

1313
# Installation
1414

@@ -21,7 +21,7 @@ MessagePack RPC implementation based on Tornado.
2121

2222
# TODO
2323

24-
* Add Server
24+
* Add advanced and async return to Server.
2525
* UDP, UNIX Domain support
2626
* Utilities (MultiFuture, SessionPool)
2727
* Support pyev for performance if needed

msgpackrpc/session.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ def send_request(self, method, args):
5353

5454
return future
5555

56+
def notify(self, method, *args):
57+
def callback():
58+
self._loop.stop()
59+
self._transport.send_message([message.NOTIFY, method, args], callback=callback)
60+
self._loop.start()
61+
5662
def close(self):
5763
if self._transport:
5864
self._transport.close()

msgpackrpc/transport/tcp.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,18 @@ def __init__(self, stream):
2121
def close(self):
2222
self._stream.close()
2323

24-
def send_message(self, message):
25-
self._stream.write(self._packer.pack(message))
24+
def send_message(self, message, callback=None):
25+
self._stream.write(self._packer.pack(message), callback=callback)
2626

2727
def on_read(self, data):
2828
self._unpacker.feed(data)
2929
for message in self._unpacker:
3030
self.on_message(message)
3131

3232
def on_message(self, message, *args):
33-
if len(message) != 4:
34-
raise Exception("Invalid MessagePack-RPC protocol: message = {0}".format(message))
33+
msgsize = len(message)
34+
if msgsize != 4 and msgsize != 3:
35+
raise RPCError("Invalid MessagePack-RPC protocol: message = {0}".format(message))
3536

3637
msgtype = message[0]
3738
if msgtype == msgpackrpc.message.REQUEST:
@@ -86,15 +87,15 @@ def __init__(self, session, address, reconnect_limit):
8687
self._pending = []
8788
self._sockets = []
8889

89-
def send_message(self, message):
90+
def send_message(self, message, callback=None):
9091
if len(self._sockets) == 0:
9192
if self._connecting == 0:
9293
self.connect()
9394
self._connecting = 1
94-
self._pending.append(message)
95+
self._pending.append((message, callback))
9596
else:
9697
sock = self._sockets[0]
97-
sock.send_message(message)
98+
sock.send_message(message, callback)
9899

99100
def connect(self):
100101
stream = IOStream(self._address.socket(), io_loop=self._session._loop._ioloop)
@@ -111,8 +112,8 @@ def close(self):
111112

112113
def on_connect(self, sock):
113114
self._sockets.append(sock)
114-
for pending in self._pending:
115-
sock.send_message(pending)
115+
for pending, callback in self._pending:
116+
sock.send_message(pending, callback)
116117
self._pending = []
117118

118119
def on_connect_failed(self, sock):
@@ -145,7 +146,7 @@ def on_request(self, msgid, method, param):
145146
self._transport._server.on_request(self, msgid, method, param)
146147

147148
def on_notify(self, method, param):
148-
self._transport._server.on_request(method, param)
149+
self._transport._server.on_notify(method, param)
149150

150151

151152
class MessagePackServer(netutil.TCPServer):

0 commit comments

Comments
 (0)