Ticket #1732: zmq.scm

File zmq.scm, 17.5 KB (added by Ariela Wenner, 9 months ago)
Line 
1(module zmq
2
3        (zmq-default-context zmq-io-threads zmq-version
4         make-context terminate-context context?
5         make-socket socket? close-socket bind-socket connect-socket
6         socket-option-set! socket-option socket-fd socket-pointer
7         send-message receive-message receive-message*
8         make-poll-item poll poll-item-socket
9         poll-item-fd poll-item-in? poll-item-out? poll-item-error?
10         curve-keypair)
11
12        (import scheme (chicken base) (chicken foreign)
13                (chicken bitwise) (chicken memory) (chicken blob)
14                (chicken memory representation) (chicken gc)
15                (chicken format)
16                srfi-1 srfi-4 srfi-18 srfi-13 foreigners)
17
18(import-for-syntax srfi-1)
19
20(foreign-declare "#include <zmq.h>")
21(foreign-declare "#include <errno.h>")
22
23(define-record context pointer sockets)
24(define-foreign-type context c-pointer)
25
26(define-foreign-type message (c-pointer "zmq_msg_t"))
27
28(define-record socket pointer mutex message)
29(define-foreign-type socket c-pointer)
30
31(define-foreign-enum-type (socket-type int)
32  (socket-type->int int->socket-type)
33  ((pair) ZMQ_PAIR)
34  ((pub) ZMQ_PUB)
35  ((sub) ZMQ_SUB)
36  ((req) ZMQ_REQ)
37  ((rep) ZMQ_REP)
38  ((xreq) ZMQ_XREQ)
39  ((xrep) ZMQ_XREP)
40  ((pull) ZMQ_PULL)
41  ((push) ZMQ_PUSH))
42
43(define-foreign-enum-type (socket-option int)
44  (socket-option->int int->socket-option)
45  ((affinity) ZMQ_AFFINITY)
46  ((sndhwm) ZMQ_SNDHWM)
47  ((rcvhwm) ZMQ_RCVHWM)
48  ((identity) ZMQ_IDENTITY)
49  ((subscribe) ZMQ_SUBSCRIBE)
50  ((unsubscribe) ZMQ_UNSUBSCRIBE)
51  ((rate) ZMQ_RATE)
52  ((recovery-ivl) ZMQ_RECOVERY_IVL)
53  ((sndbuf) ZMQ_SNDBUF)
54  ((rcvbuf) ZMQ_RCVBUF)
55  ((rcvmore) ZMQ_RCVMORE)
56  ((fd) ZMQ_FD)
57  ((curve/pubkey) ZMQ_CURVE_PUBLICKEY)
58  ((curve/pubkey-blob) ZMQ_CURVE_PUBLICKEY)
59  ((curve/secretkey) ZMQ_CURVE_SECRETKEY)
60  ((curve/secretkey-blob) ZMQ_CURVE_SECRETKEY)
61  ((curve/server) ZMQ_CURVE_SERVER)
62  ((curve/serverkey) ZMQ_CURVE_SERVERKEY)
63  ((curve/serverkey-blob) ZMQ_CURVE_SERVERKEY))
64
65
66(define socket-options
67  '((integer sndhwm rcvhwm affinity rate recovery-ivl sndbuf rcvbuf)
68    (boolean rcvmore)
69    (string subscribe unsubscribe identity)))
70
71(define-foreign-enum-type (socket-flag int)
72  (socket-flag->int int->socket-flag)
73  ((noblock zmq/noblock) ZMQ_NOBLOCK)
74  ((sndmore zmq/sndmore) ZMQ_SNDMORE))
75
76(define-foreign-enum-type (poll-flag short)
77  (poll-flat->int short->poll-int)
78  ((in zmq/pollin) ZMQ_POLLIN)
79  ((out zmq/pollout) ZMQ_POLLOUT)
80  ((err zmq/pollerr) ZMQ_POLLERR))
81
82(define-record poll-item pointer socket in out)
83(define-foreign-record-type (poll-item zmq_pollitem_t)
84  (constructor: make-foreign-poll-item)
85  (destructor: free-foreign-poll-item)
86  (socket socket %poll-item-socket %poll-item-socket-set!)
87  (int fd %poll-item-fd %poll-item-fd-set!)
88  (short events %poll-item-events %poll-item-events-set!)
89  (short revents %poll-item-revents %poll-item-revents-set!))
90
91(define-foreign-enum-type (errno int)
92  (errno->int int->errno)
93  ((again) EAGAIN)
94  ((term) ETERM))
95
96;; helpers
97
98(define (zmq-error location)
99  (let ((errno (foreign-value errno int)))
100    (error location
101           ((foreign-lambda c-string zmq_strerror int) errno)
102           errno)))
103
104(define (errno)
105  (foreign-value errno errno))
106
107(define (type-error value expected-type)
108  (error (format "invalid value: ~S (expected ~A)" value expected-type)))
109
110(define (zmq-version)
111  (let-location ((major int) (minor int) (patch int))
112    ((foreign-lambda void zmq_version (c-pointer int) (c-pointer int) (c-pointer int))
113     (location major) (location minor) (location patch))
114    (list major minor patch)))
115
116;; contexts
117
118(define zmq-io-threads (make-parameter 1))
119
120(define zmq-default-context (make-parameter #f))
121
122(define (zmq-default-context/initialize)
123  (or (zmq-default-context)
124      (begin (zmq-default-context (make-context (zmq-io-threads)))
125             (zmq-default-context))))
126
127(define %make-context make-context)
128
129(define (make-context io-threads)
130  (let ((c (%make-context ((foreign-lambda context zmq_init int) io-threads)
131                          (make-mutex))))
132    (if (not (context-pointer c))
133        (zmq-error 'make-context)
134        (begin
135          (mutex-specific-set! (context-sockets c) '())
136          (set-finalizer! c (lambda (c)
137                              (for-each close-socket (mutex-specific (context-sockets c)))
138                              (terminate-context c)))))))
139
140(define (terminate-context ctx)
141  (or (zero? ((foreign-lambda int zmq_term context)
142              (context-pointer ctx)))
143      (zmq-error 'terminate-context)))
144
145;; messages
146
147(define (initialize-message message #!optional data)
148  (if (zero? (if data
149                 (begin
150                  (unless (or (string? data) (blob? data))
151                    (type-error data "string or blob"))
152                  (let* ((len (number-of-bytes data))
153                         (cdata (allocate len)))
154                    ((foreign-lambda void "C_memcpy" c-pointer scheme-pointer int)
155                     cdata data len)
156                    ((foreign-lambda int
157                                     zmq_msg_init_data
158                                     message
159                                     c-pointer
160                                     unsigned-int
161                                     c-pointer
162                                     c-pointer)
163                     message
164                     cdata
165                     len
166                     (foreign-value "C_free" c-pointer)
167                     #f)))
168                 ((foreign-lambda int zmq_msg_init message) message)))
169      message
170      (zmq-error 'initialize-message)))
171
172(define (close-message message)
173  (or (zero? ((foreign-lambda int zmq_msg_close message) message))
174      (zmq-error 'close-message)))
175
176(define (message-size message)
177  ((foreign-lambda unsigned-integer zmq_msg_size message) message))
178
179(define (message-data message type)
180  (let* ((size (message-size message))
181         (ptr ((foreign-lambda c-pointer zmq_msg_data message) message)))
182
183    (cond ((symbol? type)
184           (case type
185             ((string)
186              (let ((str (make-string size)))
187                (move-memory! ptr str size)
188                str))
189             ((blob)
190              (let ((blob (make-blob size)))
191                (move-memory! ptr blob size)
192                blob))
193             (else (error 'message-data "invalid message data type" type))))
194          ((procedure? type)
195           (type ptr size))
196          (else (error 'message-data "invalid message data type" type)))))
197
198;; sockets
199
200(define %make-socket make-socket)
201
202(define (make-socket type #!optional (context (zmq-default-context/initialize)))
203  (let ((sp ((foreign-lambda socket zmq_socket context socket-type)
204             (context-pointer context) type)))
205    (if (not sp)
206        (zmq-error 'make-socket)
207        (let ((m (context-sockets context))
208              (s (%make-socket sp
209                               (make-mutex)
210                               (allocate (foreign-value "sizeof(zmq_msg_t)" int)))))
211
212          (mutex-lock! m)
213          (mutex-specific-set! m (cons sp (mutex-specific m)))
214          (mutex-unlock! m)
215          (set-finalizer! s (lambda (s)
216                              (free (socket-message s))
217                              (close-socket s)))))))
218
219(define (close-socket socket)
220  (let ((sp (cond ((socket? socket) (socket-pointer socket))
221                  ((pointer? socket) socket)
222                  (else (type-error socket 'socket)))))
223
224    (when sp
225      (if (zero? ((foreign-lambda int zmq_close socket) sp))
226          (when (socket? socket) (socket-pointer-set! socket #f))
227          (zmq-error 'close-socket)))))
228
229(define (bind-socket socket endpoint)
230  (or (zero? ((foreign-lambda int zmq_bind socket c-string)
231              (socket-pointer socket)
232              endpoint))
233      (zmq-error 'bind-socket)))
234
235(define (connect-socket socket endpoint)
236  (or (zero? ((foreign-lambda int zmq_connect socket c-string)
237              (socket-pointer socket)
238              endpoint))
239      (zmq-error 'connect-socket)))
240
241;; integer64 is used instead of unsigned-integer64 for uint64_t
242;; options since the latter has only been added to the experimental
243;; branch recently. Also, we must use foreign-lambda* to be able to
244;; pass in integer64 values because let-location doesn't accept
245;; integer64 (also fixed in experimental)
246
247(define (socket-option-set! socket option value)
248  (or (zero? (case option
249               ((rcvhwm sndhwm affinity sndbuf rcvbuf rate recovery-ivl curve/server )
250                (if (integer? value)
251                    ((foreign-safe-lambda* int
252                                           ((scheme-object error)
253                                            (scheme-object error_location)
254                                            (socket socket)
255                                            (socket-option option)
256                                            (int value))
257                                           "size_t size = sizeof(value);
258                                            int status = zmq_setsockopt(socket, option, &value, size);
259                                           if (status == 0) {
260                                             C_return(0);
261                                           } else {
262                                             C_save(error_location);
263                                             C_callback(error, 1);
264                                           }")
265                     zmq-error 'socket-option-set! (socket-pointer socket) option value)
266                    (type-error value 'integer)))
267
268               ((identity subscribe unsubscribe curve/pubkey curve/secretkey curve/serverkey)
269                (if (string? value)
270                    (let ((status ((foreign-lambda int zmq_setsockopt socket socket-option c-string unsigned-int)
271                                   (socket-pointer socket) option value (number-of-bytes value))))
272                      (if (not (zero? status)) (zmq-error 'socket-option-set!) status))
273                    (type-error value 'string)))
274
275               ((curve/pubkey-blob curve/secretkey-blob curve/serverkey-blob)
276                (if (blob? value)
277                    (let ((status ((foreign-lambda int zmq_setsockopt socket socket-option blob unsigned-int)
278                                   (socket-pointer socket) option value (blob-size value))))
279                      (if (not (zero? status)) (zmq-error 'socket-option-set!) status))
280                    (type-error value 'blob)))
281
282               (else (error (format "unknown socket option: ~A" option)))))
283      (zmq-error 'socket-option-set!)))
284
285(define-syntax %socket-option
286  (er-macro-transformer
287  (lambda (e r c)
288    (let ((location (second e))
289          (f-type (third e))
290          (c-type (fourth e))
291          (socket (fifth e))
292          (option (sixth e)))
293      `((,(r 'foreign-safe-lambda*) ,f-type ((scheme-object error)
294                                             (scheme-object error_location)
295                                             (socket socket)
296                                             (socket-option option))
297         ,(string-append c-type " value;
298                                  size_t size = sizeof(value);
299                                  int status = zmq_getsockopt(socket, option, &value, &size);
300                                  if (status == 0) {
301                                    C_return(value);
302                                  } else {
303                                    C_save(error_location);
304                                    C_callback(error, 1);
305                                  }"))
306        ,(r 'zmq-error) ,location (,(r 'socket-pointer) ,socket) ,option)))))
307
308(define (socket-fd socket)
309  (%socket-option 'socket-fd int "int" socket 'fd))
310
311(define socket-option
312  (let ((routing-id (make-string 255)))
313    (lambda (socket option)
314      (case option
315        ((identity)
316         (let-location
317          ((size unsigned-integer64 255))
318          (if (zero? ((foreign-lambda int zmq_getsockopt socket socket-option scheme-pointer
319                                      (c-pointer unsigned-integer64))
320                      (socket-pointer socket) option routing-id (location size)))
321              (substring routing-id 0 size)
322              (zmq-error 'socket-option))))
323        (else
324         (cond
325
326          ((memq option (alist-ref 'integer socket-options))
327           (%socket-option 'socket-option int "int" socket option))
328
329          ((memq option (alist-ref 'boolean socket-options))
330           (%socket-option 'socket-option bool "int" socket option))
331
332          (else
333           (error (format "socket option ~A is not retrievable" option)))))
334        ))
335    ))
336
337
338;; communication
339
340(define (send-message socket data #!key non-blocking send-more)
341  (mutex-lock! (socket-mutex socket))
342  (let* ((message (initialize-message (socket-message socket) data))
343         (result ((foreign-lambda int zmq_msg_send message socket int)
344                  message
345                  (socket-pointer socket)
346                  (bitwise-ior (if non-blocking zmq/noblock  0)
347                               (if send-more zmq/sndmore 0)))))
348
349    (close-message message)
350    (mutex-unlock! (socket-mutex socket))
351    (if (< result 0) (zmq-error 'send-message))))
352
353
354(define (receive-message socket #!key non-blocking (as 'string))
355  (mutex-lock! (socket-mutex socket))
356  (let* ((message (initialize-message (socket-message socket)))
357         (result ((foreign-lambda int zmq_msg_recv message socket int)
358                  message
359                  (socket-pointer socket)
360                  (if non-blocking zmq/noblock 0))))
361
362    (if (>= result 0)
363        (let ((data (message-data message as)))
364          (mutex-unlock! (socket-mutex socket))
365          (close-message message)
366          data)
367        (begin
368          (mutex-unlock! (socket-mutex socket))
369          (close-message message)
370          (if (memq (errno) '(again term))
371              #f
372              (zmq-error 'receive-message))))))
373
374(define (receive-message* socket #!key (as 'string))
375  (or (receive-message socket non-blocking: #t as: as)
376      (begin
377        (thread-wait-for-i/o! (socket-fd socket) #:input)
378        (receive-message* socket as: as))))
379
380;; polling
381
382(define %make-poll-item make-poll-item)
383
384(define (make-poll-item socket/fd #!key in out)
385  (let ((item (%make-poll-item (make-foreign-poll-item)
386                               (and (socket? socket/fd) socket/fd)
387                               in out)))
388    (if (socket? socket/fd)
389        (%poll-item-socket-set! (poll-item-pointer item) (socket-pointer socket/fd))
390        (%poll-item-fd-set! (poll-item-pointer item) socket/fd))
391
392    (%poll-item-events-set! (poll-item-pointer item)
393                            (bitwise-ior (if in zmq/pollin 0)
394                                         (if out zmq/pollout 0)))
395
396    (%poll-item-revents-set! (poll-item-pointer item) 0)
397
398    (set-finalizer! item (lambda (i)
399                           (free-foreign-poll-item (poll-item-pointer i))))))
400
401(define (poll-item-fd item)
402  (%poll-item-fd (poll-item-pointer item)))
403
404(define (poll-item-revents item)
405  (%poll-item-revents (poll-item-pointer item)))
406
407(define (poll-item-in? item)
408  (not (zero? (bitwise-and zmq/pollin (poll-item-revents item)))))
409
410(define (poll-item-out? item)
411  (not (zero? (bitwise-and zmq/pollout (poll-item-revents item)))))
412
413(define (poll-item-error? item)
414  (not (zero? (bitwise-and zmq/pollerr (poll-item-revents item)))))
415
416(define %poll-sockets
417  (foreign-safe-lambda* int
418                        ((scheme-object poll_item_ref)
419                         (unsigned-int length)
420                         (long timeout))
421                        "zmq_pollitem_t items[length];
422                         zmq_pollitem_t *item_ptrs[length];
423                         int i;
424
425                         for (i = 0; i < length; i++) {
426                           C_save(C_fix(i));
427                           item_ptrs[i] = (zmq_pollitem_t *)C_pointer_address(C_callback(poll_item_ref, 1));
428                         }
429
430                         for (i = 0; i < length; i++) {
431                           items[i] = *item_ptrs[i];
432                         }
433
434                         int rc = zmq_poll(items, length, timeout);
435
436                         if (rc != -1) {
437                           for (i = 0; i < length; i++) {
438                             (*item_ptrs[i]).revents = items[i].revents;
439                           }
440                         }
441
442                         C_return(rc);"))
443
444(define (poll poll-items timeout/block)
445  (if (null? poll-items)
446      (error 'poll "null list passed for poll-items")
447      (let ((result (%poll-sockets (lambda (i)
448                                     (poll-item-pointer (list-ref poll-items i)))
449                                   (length poll-items)
450                                   (case timeout/block
451                                     ((#f) 0)
452                                     ((#t) -1)
453                                     (else timeout/block)))))
454        (if (= result -1)
455            (zmq-error 'poll)
456            result))))
457
458(define (curve-keypair)
459  (let-values (((pk sk)
460                ((foreign-primitive ()
461                     "char public_key [41];
462                      char secret_key [41];
463                      int rc = zmq_curve_keypair (public_key, secret_key);
464
465                      C_word* pkbuf = C_alloc(41);
466                      C_word* skbuf = C_alloc(41);
467                      C_word pkstr;
468                      C_word skstr;
469
470                      pkstr = C_string2(&pkbuf, public_key);
471                      skstr = C_string2(&skbuf, secret_key);
472
473                      C_word vals[4] = { C_SCHEME_UNDEFINED, C_k, pkstr, skstr };
474                      C_values(4, vals);\n"))))
475    (values pk sk)))
476)