1+ import zmq
2+ import numpy as np
3+
4+ from cellprofiler_core .module .image_segmentation import ImageSegmentation
5+ from cellprofiler_core .setting .do_something import DoSomething
6+ from cellprofiler_core .setting .text import Integer
7+ from cellprofiler_core .object import Objects
8+
9+ HELLO = "Hello"
10+ ACK = "Acknowledge"
11+ DENIED = "Denied"
12+
13+ __doc__ = """\
14+ RunForeign
15+ ============
16+
17+ **RunForeign** runs a foreign tool via sockets.
18+
19+
20+ Assumes there is a server up and running, handshakes, and pings for availability on every validation run of pipeline.
21+ Server must be idompotent on both handshake and validation ping.
22+ Server provides definition of what run will be.
23+
24+ |
25+
26+ ============ ============ ===============
27+ Supports 2D? Supports 3D? Respects masks?
28+ ============ ============ ===============
29+ YES NO YES
30+ ============ ============ ===============
31+
32+ """
33+
34+ class RunForeign (ImageSegmentation ):
35+ category = "Object Processing"
36+
37+ module_name = "RunForeign"
38+
39+ variable_revision_number = 1
40+
41+ def create_settings (self ):
42+ super ().create_settings ()
43+
44+ self .context = None
45+ self .server_socket = None
46+
47+ # TODO: launch server automatically, if necessary
48+ self .server_port = Integer (
49+ text = "Server port number" ,
50+ value = 7878 ,
51+ minval = 0 ,
52+ doc = """\
53+ The port number which the server is listening on. The server must be launched manually first.
54+ """ ,
55+ )
56+
57+ # TODO: perform handshake automatically, if necessary
58+ self .server_handshake = DoSomething (
59+ "" ,
60+ "Perform Server Handshake" ,
61+ self .do_server_handshake ,
62+ doc = f"""\
63+ Press this button to do an initial handshake with the server.
64+ This must be done manually, once.
65+ """ ,
66+ )
67+
68+ def settings (self ):
69+ return super ().settings () + [self .server_port , self .server_handshake ]
70+
71+ # ImageSegmentation defines this so we have to overide it
72+ def visible_settings (self ):
73+ return self .settings ()
74+
75+ # ImageSegmentation defines this so we have to overide it
76+ def volumetric (self ):
77+ return False
78+
79+ def run (self , workspace ):
80+ x_name = self .x_name .value
81+
82+ y_name = self .y_name .value
83+
84+ images = workspace .image_set
85+
86+ x = images .get_image (x_name )
87+
88+ dimensions = x .dimensions
89+
90+ x_data = x .pixel_data
91+
92+ y_data = self .do_server_execute (x_data )
93+
94+ y = Objects ()
95+
96+ y .segmented = y_data
97+
98+ y .parent_image = x .parent_image
99+
100+ objects = workspace .object_set
101+
102+ objects .add_objects (y , y_name )
103+
104+ self .add_measurements (workspace )
105+
106+ if self .show_window :
107+ workspace .display_data .x_data = x_data
108+
109+ workspace .display_data .y_data = y_data
110+
111+ workspace .display_data .dimensions = dimensions
112+
113+ def do_server_handshake (self ):
114+ port = str (self .server_port .value )
115+ domain = "localhost"
116+ socket_addr = f"tcp://{ domain } :{ port } "
117+
118+ if self .context :
119+ self .context .destroy ()
120+ self .server_socket = None
121+
122+ self .context = zmq .Context ()
123+ self .server_socket = self .context .socket (zmq .PAIR )
124+ self .server_socket .copy_threshold = 0
125+ c = self .server_socket .connect (socket_addr )
126+
127+ print ("Setup socket at" , socket_addr , "connected to" , c )
128+
129+ self .server_socket .send_string (HELLO )
130+ response = self .server_socket .recv_string ()
131+
132+ if response == ACK :
133+ print ("Received correct response" , response )
134+ else :
135+ print ("Received unexpected response" , response )
136+
137+ def do_server_execute (self , im_data ):
138+ dummy_data = lambda : np .array ([[]])
139+
140+ socket = self .server_socket
141+ header = np .lib .format .header_data_from_array_1_0 (im_data )
142+
143+ socket .send_json (header )
144+
145+ ack = socket .recv_string ()
146+ if ack == ACK :
147+ print ("header acknowledged:" , ack )
148+ else :
149+ print ("unexpected response" , ack )
150+ return dummy_data ()
151+
152+ socket .send (im_data , copy = False )
153+
154+ ack = socket .recv_string ()
155+ if ack == ACK :
156+ print ("image data acknowledged" , ack )
157+ elif ack == DENIED :
158+ print ("image data denied, aborting" , ack )
159+ return dummy_data ()
160+ else :
161+ print ("unknown response to image data" , ack )
162+ return dummy_data ()
163+
164+ return_header = socket .recv_json ()
165+ print ("received return header" , return_header )
166+
167+ print ("acknowledging header reciept" )
168+ socket .send_string (ACK )
169+
170+ print ("waiting for image data" )
171+
172+ label_data_buf = socket .recv (copy = False )
173+ labels = np .frombuffer (label_data_buf , dtype = return_header ['descr' ])
174+ labels .shape = return_header ['shape' ]
175+ print ("returning label data" , labels .shape )
176+ return labels
0 commit comments