|
1 | 1 | import zmq |
2 | 2 | import numpy as np |
| 3 | +import logging |
3 | 4 |
|
4 | 5 | from cellprofiler_core.module.image_segmentation import ImageSegmentation |
5 | 6 | from cellprofiler_core.setting.do_something import DoSomething |
6 | 7 | from cellprofiler_core.setting.text import Integer |
7 | 8 | from cellprofiler_core.object import Objects |
8 | 9 |
|
| 10 | +LOGGER = logging.getLogger(__name__) |
| 11 | + |
9 | 12 | HELLO = "Hello" |
10 | 13 | ACK = "Acknowledge" |
11 | 14 | DENIED = "Denied" |
@@ -114,70 +117,83 @@ def do_server_handshake(self): |
114 | 117 | socket_addr = f"tcp://{domain}:{port}" |
115 | 118 |
|
116 | 119 | if self.context: |
| 120 | + LOGGER.debug("destroying existing context") |
117 | 121 | self.context.destroy() |
118 | 122 | self.server_socket = None |
119 | 123 |
|
120 | 124 | self.context = zmq.Context() |
121 | 125 | self.server_socket = self.context.socket(zmq.PAIR) |
122 | 126 | self.server_socket.copy_threshold = 0 |
123 | 127 |
|
124 | | - print("connecting to", socket_addr) |
| 128 | + LOGGER.debug(f"connecting to {socket_addr}") |
125 | 129 |
|
126 | 130 | c = self.server_socket.connect(socket_addr) |
127 | 131 |
|
128 | | - print("setup socket at", c) |
| 132 | + LOGGER.debug(f"setup socket at {c}") |
129 | 133 |
|
130 | | - print("sending handshake, waiting for acknowledgement") |
| 134 | + LOGGER.debug("sending handshake, waiting for acknowledgement") |
131 | 135 |
|
132 | 136 | self.server_socket.send_string(HELLO) |
133 | | - response = self.server_socket.recv_string() |
134 | 137 |
|
| 138 | + poller = zmq.Poller() |
| 139 | + poller.register(self.server_socket, zmq.POLLIN) |
| 140 | + while True: |
| 141 | + socks = dict(poller.poll(5000)) |
| 142 | + if socks.get(self.server_socket) == zmq.POLLIN: |
| 143 | + break |
| 144 | + else: |
| 145 | + LOGGER.debug("handshake timeout") |
| 146 | + return |
| 147 | + |
| 148 | + response = self.server_socket.recv_string() |
| 149 | + |
135 | 150 | if response == ACK: |
136 | | - print("received correct response", response) |
| 151 | + LOGGER.debug(f"received correct response {response}") |
137 | 152 | else: |
138 | | - print("received unexpected response", response) |
| 153 | + LOGGER.debug(f"received unexpected response {response}") |
139 | 154 |
|
140 | 155 | def do_server_execute(self, im_data): |
141 | 156 | dummy_data = lambda: np.array([[]]) |
142 | 157 |
|
143 | 158 | socket = self.server_socket |
144 | 159 | header = np.lib.format.header_data_from_array_1_0(im_data) |
145 | 160 |
|
146 | | - print("sending header", header, "waiting for acknowledgement") |
| 161 | + LOGGER.debug(f"sending header {header}; waiting for acknowledgement") |
147 | 162 | socket.send_json(header) |
148 | 163 |
|
149 | 164 | ack = socket.recv_string() |
150 | 165 | if ack == ACK: |
151 | | - print("header acknowledged:", ack) |
| 166 | + LOGGER.debug(f"header acknowledged: {ack}") |
152 | 167 | else: |
153 | | - print("unexpected response", ack) |
| 168 | + LOGGER.debug(f"unexpected response {ack}") |
154 | 169 | return dummy_data() |
155 | 170 |
|
156 | | - print("sending image data", im_data.shape, "waiting for acknowledgement") |
| 171 | + LOGGER.debug(f"sending image data {im_data.shape}; waiting for acknowledgement") |
157 | 172 | socket.send(im_data, copy=False) |
158 | 173 |
|
159 | 174 | ack = socket.recv_string() |
160 | 175 | if ack == ACK: |
161 | | - print("image data acknowledged", ack) |
| 176 | + LOGGER.debug(f"image data acknowledged {ack}") |
162 | 177 | elif ack == DENIED: |
163 | | - print("image data denied, aborting", ack) |
| 178 | + LOGGER.debug(f"image data denied, aborting {ack}") |
164 | 179 | return dummy_data() |
165 | 180 | else: |
166 | | - print("unknown response to image data", ack) |
| 181 | + LOGGER.debug(f"unknown response to image data {ack}") |
167 | 182 | return dummy_data() |
168 | 183 |
|
169 | | - print("waiting for return header") |
| 184 | + LOGGER.debug("waiting for return header") |
170 | 185 | return_header = socket.recv_json() |
171 | | - print("received return header", return_header) |
| 186 | + LOGGER.debug(f"received return header {return_header}") |
172 | 187 |
|
173 | | - print("acknowledging header reciept") |
| 188 | + LOGGER.debug("acknowledging header reciept") |
174 | 189 | socket.send_string(ACK) |
175 | 190 |
|
176 | | - print("waiting for image data") |
| 191 | + LOGGER.debug("waiting for image data") |
177 | 192 | label_data_buf = socket.recv(copy=False) |
178 | | - print("image data received") |
| 193 | + LOGGER.debug("image data received") |
179 | 194 |
|
180 | 195 | labels = np.frombuffer(label_data_buf, dtype=return_header['descr']) |
181 | 196 | labels.shape = return_header['shape'] |
182 | | - print("returning label data", labels.shape) |
| 197 | + LOGGER.debug(f"returning label data {labels.shape}") |
| 198 | + |
183 | 199 | return labels |
0 commit comments