Ticket #1732: zmq.scm

File zmq.scm, 17.5 KB (added by Ariela Wenner, 5 years ago)
Line 
1(module zmq
2
3 (zmq-default-context zmq-io-threads zmq-version
4 make-context terminate-context context?
5 make-socket socket? close-socket bind-socket connect-socket
6 socket-option-set! socket-option socket-fd socket-pointer
7 send-message receive-message receive-message*
8 make-poll-item poll poll-item-socket
9 poll-item-fd poll-item-in? poll-item-out? poll-item-error?
10 curve-keypair)
11
12 (import scheme (chicken base) (chicken foreign)
13 (chicken bitwise) (chicken memory) (chicken blob)
14 (chicken memory representation) (chicken gc)
15 (chicken format)
16 srfi-1 srfi-4 srfi-18 srfi-13 foreigners)
17
18(import-for-syntax srfi-1)
19
20(foreign-declare "#include <zmq.h>")
21(foreign-declare "#include <errno.h>")
22
23(define-record context pointer sockets)
24(define-foreign-type context c-pointer)
25
26(define-foreign-type message (c-pointer "zmq_msg_t"))
27
28(define-record socket pointer mutex message)
29(define-foreign-type socket c-pointer)
30
31(define-foreign-enum-type (socket-type int)
32 (socket-type->int int->socket-type)
33 ((pair) ZMQ_PAIR)
34 ((pub) ZMQ_PUB)
35 ((sub) ZMQ_SUB)
36 ((req) ZMQ_REQ)
37 ((rep) ZMQ_REP)
38 ((xreq) ZMQ_XREQ)
39 ((xrep) ZMQ_XREP)
40 ((pull) ZMQ_PULL)
41 ((push) ZMQ_PUSH))
42
43(define-foreign-enum-type (socket-option int)
44 (socket-option->int int->socket-option)
45 ((affinity) ZMQ_AFFINITY)
46 ((sndhwm) ZMQ_SNDHWM)
47 ((rcvhwm) ZMQ_RCVHWM)
48 ((identity) ZMQ_IDENTITY)
49 ((subscribe) ZMQ_SUBSCRIBE)
50 ((unsubscribe) ZMQ_UNSUBSCRIBE)
51 ((rate) ZMQ_RATE)
52 ((recovery-ivl) ZMQ_RECOVERY_IVL)
53 ((sndbuf) ZMQ_SNDBUF)
54 ((rcvbuf) ZMQ_RCVBUF)
55 ((rcvmore) ZMQ_RCVMORE)
56 ((fd) ZMQ_FD)
57 ((curve/pubkey) ZMQ_CURVE_PUBLICKEY)
58 ((curve/pubkey-blob) ZMQ_CURVE_PUBLICKEY)
59 ((curve/secretkey) ZMQ_CURVE_SECRETKEY)
60 ((curve/secretkey-blob) ZMQ_CURVE_SECRETKEY)
61 ((curve/server) ZMQ_CURVE_SERVER)
62 ((curve/serverkey) ZMQ_CURVE_SERVERKEY)
63 ((curve/serverkey-blob) ZMQ_CURVE_SERVERKEY))
64
65
66(define socket-options
67 '((integer sndhwm rcvhwm affinity rate recovery-ivl sndbuf rcvbuf)
68 (boolean rcvmore)
69 (string subscribe unsubscribe identity)))
70
71(define-foreign-enum-type (socket-flag int)
72 (socket-flag->int int->socket-flag)
73 ((noblock zmq/noblock) ZMQ_NOBLOCK)
74 ((sndmore zmq/sndmore) ZMQ_SNDMORE))
75
76(define-foreign-enum-type (poll-flag short)
77 (poll-flat->int short->poll-int)
78 ((in zmq/pollin) ZMQ_POLLIN)
79 ((out zmq/pollout) ZMQ_POLLOUT)
80 ((err zmq/pollerr) ZMQ_POLLERR))
81
82(define-record poll-item pointer socket in out)
83(define-foreign-record-type (poll-item zmq_pollitem_t)
84 (constructor: make-foreign-poll-item)
85 (destructor: free-foreign-poll-item)
86 (socket socket %poll-item-socket %poll-item-socket-set!)
87 (int fd %poll-item-fd %poll-item-fd-set!)
88 (short events %poll-item-events %poll-item-events-set!)
89 (short revents %poll-item-revents %poll-item-revents-set!))
90
91(define-foreign-enum-type (errno int)
92 (errno->int int->errno)
93 ((again) EAGAIN)
94 ((term) ETERM))
95
96;; helpers
97
98(define (zmq-error location)
99 (let ((errno (foreign-value errno int)))
100 (error location
101 ((foreign-lambda c-string zmq_strerror int) errno)
102 errno)))
103
104(define (errno)
105 (foreign-value errno errno))
106
107(define (type-error value expected-type)
108 (error (format "invalid value: ~S (expected ~A)" value expected-type)))
109
110(define (zmq-version)
111 (let-location ((major int) (minor int) (patch int))
112 ((foreign-lambda void zmq_version (c-pointer int) (c-pointer int) (c-pointer int))
113 (location major) (location minor) (location patch))
114 (list major minor patch)))
115
116;; contexts
117
118(define zmq-io-threads (make-parameter 1))
119
120(define zmq-default-context (make-parameter #f))
121
122(define (zmq-default-context/initialize)
123 (or (zmq-default-context)
124 (begin (zmq-default-context (make-context (zmq-io-threads)))
125 (zmq-default-context))))
126
127(define %make-context make-context)
128
129(define (make-context io-threads)
130 (let ((c (%make-context ((foreign-lambda context zmq_init int) io-threads)
131 (make-mutex))))
132 (if (not (context-pointer c))
133 (zmq-error 'make-context)
134 (begin
135 (mutex-specific-set! (context-sockets c) '())
136 (set-finalizer! c (lambda (c)
137 (for-each close-socket (mutex-specific (context-sockets c)))
138 (terminate-context c)))))))
139
140(define (terminate-context ctx)
141 (or (zero? ((foreign-lambda int zmq_term context)
142 (context-pointer ctx)))
143 (zmq-error 'terminate-context)))
144
145;; messages
146
147(define (initialize-message message #!optional data)
148 (if (zero? (if data
149 (begin
150 (unless (or (string? data) (blob? data))
151 (type-error data "string or blob"))
152 (let* ((len (number-of-bytes data))
153 (cdata (allocate len)))
154 ((foreign-lambda void "C_memcpy" c-pointer scheme-pointer int)
155 cdata data len)
156 ((foreign-lambda int
157 zmq_msg_init_data
158 message
159 c-pointer
160 unsigned-int
161 c-pointer
162 c-pointer)
163 message
164 cdata
165 len
166 (foreign-value "C_free" c-pointer)
167 #f)))
168 ((foreign-lambda int zmq_msg_init message) message)))
169 message
170 (zmq-error 'initialize-message)))
171
172(define (close-message message)
173 (or (zero? ((foreign-lambda int zmq_msg_close message) message))
174 (zmq-error 'close-message)))
175
176(define (message-size message)
177 ((foreign-lambda unsigned-integer zmq_msg_size message) message))
178
179(define (message-data message type)
180 (let* ((size (message-size message))
181 (ptr ((foreign-lambda c-pointer zmq_msg_data message) message)))
182
183 (cond ((symbol? type)
184 (case type
185 ((string)
186 (let ((str (make-string size)))
187 (move-memory! ptr str size)
188 str))
189 ((blob)
190 (let ((blob (make-blob size)))
191 (move-memory! ptr blob size)
192 blob))
193 (else (error 'message-data "invalid message data type" type))))
194 ((procedure? type)
195 (type ptr size))
196 (else (error 'message-data "invalid message data type" type)))))
197
198;; sockets
199
200(define %make-socket make-socket)
201
202(define (make-socket type #!optional (context (zmq-default-context/initialize)))
203 (let ((sp ((foreign-lambda socket zmq_socket context socket-type)
204 (context-pointer context) type)))
205 (if (not sp)
206 (zmq-error 'make-socket)
207 (let ((m (context-sockets context))
208 (s (%make-socket sp
209 (make-mutex)
210 (allocate (foreign-value "sizeof(zmq_msg_t)" int)))))
211
212 (mutex-lock! m)
213 (mutex-specific-set! m (cons sp (mutex-specific m)))
214 (mutex-unlock! m)
215 (set-finalizer! s (lambda (s)
216 (free (socket-message s))
217 (close-socket s)))))))
218
219(define (close-socket socket)
220 (let ((sp (cond ((socket? socket) (socket-pointer socket))
221 ((pointer? socket) socket)
222 (else (type-error socket 'socket)))))
223
224 (when sp
225 (if (zero? ((foreign-lambda int zmq_close socket) sp))
226 (when (socket? socket) (socket-pointer-set! socket #f))
227 (zmq-error 'close-socket)))))
228
229(define (bind-socket socket endpoint)
230 (or (zero? ((foreign-lambda int zmq_bind socket c-string)
231 (socket-pointer socket)
232 endpoint))
233 (zmq-error 'bind-socket)))
234
235(define (connect-socket socket endpoint)
236 (or (zero? ((foreign-lambda int zmq_connect socket c-string)
237 (socket-pointer socket)
238 endpoint))
239 (zmq-error 'connect-socket)))
240
241;; integer64 is used instead of unsigned-integer64 for uint64_t
242;; options since the latter has only been added to the experimental
243;; branch recently. Also, we must use foreign-lambda* to be able to
244;; pass in integer64 values because let-location doesn't accept
245;; integer64 (also fixed in experimental)
246
247(define (socket-option-set! socket option value)
248 (or (zero? (case option
249 ((rcvhwm sndhwm affinity sndbuf rcvbuf rate recovery-ivl curve/server )
250 (if (integer? value)
251 ((foreign-safe-lambda* int
252 ((scheme-object error)
253 (scheme-object error_location)
254 (socket socket)
255 (socket-option option)
256 (int value))
257 "size_t size = sizeof(value);
258 int status = zmq_setsockopt(socket, option, &value, size);
259 if (status == 0) {
260 C_return(0);
261 } else {
262 C_save(error_location);
263 C_callback(error, 1);
264 }")
265 zmq-error 'socket-option-set! (socket-pointer socket) option value)
266 (type-error value 'integer)))
267
268 ((identity subscribe unsubscribe curve/pubkey curve/secretkey curve/serverkey)
269 (if (string? value)
270 (let ((status ((foreign-lambda int zmq_setsockopt socket socket-option c-string unsigned-int)
271 (socket-pointer socket) option value (number-of-bytes value))))
272 (if (not (zero? status)) (zmq-error 'socket-option-set!) status))
273 (type-error value 'string)))
274
275 ((curve/pubkey-blob curve/secretkey-blob curve/serverkey-blob)
276 (if (blob? value)
277 (let ((status ((foreign-lambda int zmq_setsockopt socket socket-option blob unsigned-int)
278 (socket-pointer socket) option value (blob-size value))))
279 (if (not (zero? status)) (zmq-error 'socket-option-set!) status))
280 (type-error value 'blob)))
281
282 (else (error (format "unknown socket option: ~A" option)))))
283 (zmq-error 'socket-option-set!)))
284
285(define-syntax %socket-option
286 (er-macro-transformer
287 (lambda (e r c)
288 (let ((location (second e))
289 (f-type (third e))
290 (c-type (fourth e))
291 (socket (fifth e))
292 (option (sixth e)))
293 `((,(r 'foreign-safe-lambda*) ,f-type ((scheme-object error)
294 (scheme-object error_location)
295 (socket socket)
296 (socket-option option))
297 ,(string-append c-type " value;
298 size_t size = sizeof(value);
299 int status = zmq_getsockopt(socket, option, &value, &size);
300 if (status == 0) {
301 C_return(value);
302 } else {
303 C_save(error_location);
304 C_callback(error, 1);
305 }"))
306 ,(r 'zmq-error) ,location (,(r 'socket-pointer) ,socket) ,option)))))
307
308(define (socket-fd socket)
309 (%socket-option 'socket-fd int "int" socket 'fd))
310
311(define socket-option
312 (let ((routing-id (make-string 255)))
313 (lambda (socket option)
314 (case option
315 ((identity)
316 (let-location
317 ((size unsigned-integer64 255))
318 (if (zero? ((foreign-lambda int zmq_getsockopt socket socket-option scheme-pointer
319 (c-pointer unsigned-integer64))
320 (socket-pointer socket) option routing-id (location size)))
321 (substring routing-id 0 size)
322 (zmq-error 'socket-option))))
323 (else
324 (cond
325
326 ((memq option (alist-ref 'integer socket-options))
327 (%socket-option 'socket-option int "int" socket option))
328
329 ((memq option (alist-ref 'boolean socket-options))
330 (%socket-option 'socket-option bool "int" socket option))
331
332 (else
333 (error (format "socket option ~A is not retrievable" option)))))
334 ))
335 ))
336
337
338;; communication
339
340(define (send-message socket data #!key non-blocking send-more)
341 (mutex-lock! (socket-mutex socket))
342 (let* ((message (initialize-message (socket-message socket) data))
343 (result ((foreign-lambda int zmq_msg_send message socket int)
344 message
345 (socket-pointer socket)
346 (bitwise-ior (if non-blocking zmq/noblock 0)
347 (if send-more zmq/sndmore 0)))))
348
349 (close-message message)
350 (mutex-unlock! (socket-mutex socket))
351 (if (< result 0) (zmq-error 'send-message))))
352
353
354(define (receive-message socket #!key non-blocking (as 'string))
355 (mutex-lock! (socket-mutex socket))
356 (let* ((message (initialize-message (socket-message socket)))
357 (result ((foreign-lambda int zmq_msg_recv message socket int)
358 message
359 (socket-pointer socket)
360 (if non-blocking zmq/noblock 0))))
361
362 (if (>= result 0)
363 (let ((data (message-data message as)))
364 (mutex-unlock! (socket-mutex socket))
365 (close-message message)
366 data)
367 (begin
368 (mutex-unlock! (socket-mutex socket))
369 (close-message message)
370 (if (memq (errno) '(again term))
371 #f
372 (zmq-error 'receive-message))))))
373
374(define (receive-message* socket #!key (as 'string))
375 (or (receive-message socket non-blocking: #t as: as)
376 (begin
377 (thread-wait-for-i/o! (socket-fd socket) #:input)
378 (receive-message* socket as: as))))
379
380;; polling
381
382(define %make-poll-item make-poll-item)
383
384(define (make-poll-item socket/fd #!key in out)
385 (let ((item (%make-poll-item (make-foreign-poll-item)
386 (and (socket? socket/fd) socket/fd)
387 in out)))
388 (if (socket? socket/fd)
389 (%poll-item-socket-set! (poll-item-pointer item) (socket-pointer socket/fd))
390 (%poll-item-fd-set! (poll-item-pointer item) socket/fd))
391
392 (%poll-item-events-set! (poll-item-pointer item)
393 (bitwise-ior (if in zmq/pollin 0)
394 (if out zmq/pollout 0)))
395
396 (%poll-item-revents-set! (poll-item-pointer item) 0)
397
398 (set-finalizer! item (lambda (i)
399 (free-foreign-poll-item (poll-item-pointer i))))))
400
401(define (poll-item-fd item)
402 (%poll-item-fd (poll-item-pointer item)))
403
404(define (poll-item-revents item)
405 (%poll-item-revents (poll-item-pointer item)))
406
407(define (poll-item-in? item)
408 (not (zero? (bitwise-and zmq/pollin (poll-item-revents item)))))
409
410(define (poll-item-out? item)
411 (not (zero? (bitwise-and zmq/pollout (poll-item-revents item)))))
412
413(define (poll-item-error? item)
414 (not (zero? (bitwise-and zmq/pollerr (poll-item-revents item)))))
415
416(define %poll-sockets
417 (foreign-safe-lambda* int
418 ((scheme-object poll_item_ref)
419 (unsigned-int length)
420 (long timeout))
421 "zmq_pollitem_t items[length];
422 zmq_pollitem_t *item_ptrs[length];
423 int i;
424
425 for (i = 0; i < length; i++) {
426 C_save(C_fix(i));
427 item_ptrs[i] = (zmq_pollitem_t *)C_pointer_address(C_callback(poll_item_ref, 1));
428 }
429
430 for (i = 0; i < length; i++) {
431 items[i] = *item_ptrs[i];
432 }
433
434 int rc = zmq_poll(items, length, timeout);
435
436 if (rc != -1) {
437 for (i = 0; i < length; i++) {
438 (*item_ptrs[i]).revents = items[i].revents;
439 }
440 }
441
442 C_return(rc);"))
443
444(define (poll poll-items timeout/block)
445 (if (null? poll-items)
446 (error 'poll "null list passed for poll-items")
447 (let ((result (%poll-sockets (lambda (i)
448 (poll-item-pointer (list-ref poll-items i)))
449 (length poll-items)
450 (case timeout/block
451 ((#f) 0)
452 ((#t) -1)
453 (else timeout/block)))))
454 (if (= result -1)
455 (zmq-error 'poll)
456 result))))
457
458(define (curve-keypair)
459 (let-values (((pk sk)
460 ((foreign-primitive ()
461 "char public_key [41];
462 char secret_key [41];
463 int rc = zmq_curve_keypair (public_key, secret_key);
464
465 C_word* pkbuf = C_alloc(41);
466 C_word* skbuf = C_alloc(41);
467 C_word pkstr;
468 C_word skstr;
469
470 pkstr = C_string2(&pkbuf, public_key);
471 skstr = C_string2(&skbuf, secret_key);
472
473 C_word vals[4] = { C_SCHEME_UNDEFINED, C_k, pkstr, skstr };
474 C_values(4, vals);\n"))))
475 (values pk sk)))
476)