Skip to content

Commit 4e97076

Browse files
committed
Add MessagePack RPC Server
1 parent 61e83c2 commit 4e97076

8 files changed

Lines changed: 193 additions & 30 deletions

File tree

msgpackrpc/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@
1010
# shortcut for most-used symbols
1111
from msgpackrpc.loop import Loop
1212
from msgpackrpc.client import Client
13+
from msgpackrpc.server import Server
1314
from msgpackrpc.address import Address

msgpackrpc/error.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ class TimeoutError(RPCError):
66

77
class TransportError(RPCError):
88
pass
9+
10+
class NoMethodError(RPCError):
11+
pass

msgpackrpc/server.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import msgpack
2+
3+
from msgpackrpc import error
4+
from msgpackrpc import loop
5+
from msgpackrpc import message
6+
from msgpackrpc import session
7+
from msgpackrpc.transport import tcp
8+
9+
class Server(session.Session):
10+
"""\
11+
Server is usaful for MessagePack RPC Server.
12+
"""
13+
14+
def __init__(self, dispatcher, loop=loop.Loop(), builder=tcp):
15+
#session.Session.__init__(self, address, timeout, loop, builder)
16+
self._loop = loop
17+
self._builder = builder
18+
self._listeners = []
19+
self._dispatcher = dispatcher
20+
21+
def listen(self, address):
22+
listener = self._builder.ServerTransport(address)
23+
listener.listen(self)
24+
self._listeners.append(listener)
25+
26+
def start(self):
27+
self._loop.start()
28+
29+
def stop(self):
30+
self._loop.stop()
31+
32+
def close(self):
33+
for listener in self._listeners:
34+
listener.close()
35+
#session.Session.close(self)
36+
37+
def on_request(self, sendable, msgid, method, param):
38+
self.dispatch(method, param, _Responder(sendable, msgid))
39+
40+
def on_notify(self, method, param):
41+
self.dispatch(method, param, _NullResponder())
42+
43+
def dispatch(self, method, param, responder):
44+
try:
45+
if not hasattr(self._dispatcher, method):
46+
raise error.NoMethodError("{0} not found".format(method))
47+
responder.set_result(getattr(self._dispatcher, method)(*param))
48+
except Exception as e:
49+
responder.set_error(str(e))
50+
51+
# TODO: Support advanced and async return
52+
53+
54+
class _Responder:
55+
def __init__(self, sendable, msgid):
56+
self._sendable = sendable
57+
self._msgid = msgid
58+
self._sent = False
59+
60+
def set_result(self, value, error=None, packer=msgpack.Packer()):
61+
if not self._sent:
62+
self._sendable.send_message([message.RESPONSE, self._msgid, error, value])
63+
self._sent = True
64+
65+
def set_error(self, error, value=None):
66+
self.set_result(value, error)
67+
68+
69+
class _NullResponder:
70+
def set_result(self, value, error=None):
71+
pass
72+
73+
def set_error(self, error, value=None):
74+
pass

msgpackrpc/transport/tcp.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,54 @@ def on_close(self, sock):
130130
else:
131131
# Tornado does not have on_connect_failed event.
132132
self.on_connect_failed(sock)
133+
134+
135+
class ServerSocket(BaseSocket):
136+
def __init__(self, stream, transport):
137+
BaseSocket.__init__(self, stream)
138+
self._transport = transport
139+
self._stream.read_until_close(self.on_read, self.on_read)
140+
#self._stream.set_close_callback(self.on_close)
141+
142+
#def connect(self):
143+
# self._stream.connect(self._transport._address.unpack(), self.on_connect)
144+
145+
#def on_connect(self):
146+
# self._stream.read_until_close(self.on_read, self.on_read)
147+
# self._transport.on_connect(self)
148+
149+
#def on_connect_failed(self):
150+
# self._transport.on_connect_failed(self)
151+
152+
def on_close(self):
153+
self._transport.on_close(self)
154+
155+
def on_request(self, msgid, method, param):
156+
self._transport._server.on_request(self, msgid, method, param)
157+
self._stream.close()
158+
159+
def on_notify(self, method, param):
160+
self._transport._server.on_request(method, param)
161+
self._stream.close()
162+
163+
164+
class MessagePackServer(netutil.TCPServer):
165+
def __init__(self, transport, io_loop=None):
166+
self._transport = transport
167+
netutil.TCPServer.__init__(self, io_loop=io_loop)
168+
169+
def handle_stream(self, stream, address):
170+
ServerSocket(stream, self._transport)
171+
172+
173+
class ServerTransport(object):
174+
def __init__(self, address):
175+
self._address = address;
176+
177+
def listen(self, server):
178+
self._server = server;
179+
self._mp_server = MessagePackServer(self, io_loop=self._server._loop._ioloop)
180+
self._mp_server.listen(self._address.port)
181+
182+
def close(self):
183+
self._mp_server.stop()

test/add_client.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

test/add_server.rb

Lines changed: 0 additions & 17 deletions
This file was deleted.

test/helper.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import os
2+
import sys
3+
4+
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
5+
6+
def unused_port():
7+
import socket
8+
9+
sock = socket.socket()
10+
sock.bind(("localhost", 0))
11+
port = sock.getsockname()[1]
12+
sock.close()
13+
return port

test/test_msgpackrpc.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import threading
2+
import unittest
3+
4+
import helper
5+
import msgpackrpc
6+
7+
8+
class TestMessagePackRPC(unittest.TestCase):
9+
class TestServer(object):
10+
def hello(self):
11+
return "world"
12+
13+
def sum(self, x, y):
14+
return x + y
15+
16+
17+
def setUp(self):
18+
self._address = msgpackrpc.Address('localhost', helper.unused_port())
19+
20+
def setup_env(self):
21+
def _start_server(server):
22+
server.start()
23+
server.close()
24+
25+
self._server = msgpackrpc.Server(TestMessagePackRPC.TestServer())
26+
self._server.listen(self._address)
27+
self._thread = threading.Thread(target=_start_server, args=(self._server,))
28+
self._thread.start()
29+
30+
import time
31+
time.sleep(1)
32+
33+
self._client = msgpackrpc.Client(self._address)
34+
return self._client;
35+
36+
def tearDown(self):
37+
self._client.close();
38+
self._server.stop();
39+
self._thread.join();
40+
41+
def test_hello(self):
42+
client = self.setup_env();
43+
self.assertEqual(client.call('hello'), "world", "sum result is incorrect")
44+
45+
def test_add(self):
46+
client = self.setup_env();
47+
self.assertEqual(client.call('sum', 1, 2), 3, "sum result is incorrect")
48+
49+
50+
if __name__ == '__main__':
51+
unittest.main()

0 commit comments

Comments
 (0)