1 | (module zmq |
---|
2 | |
---|
3 | (zmq-default-context zmq-io-threads zmq-version |
---|
4 | make-context terminate-context context? |
---|
5 | make-socket socket? close-socket bind-socket connect-socket |
---|
6 | socket-option-set! socket-option socket-fd socket-pointer |
---|
7 | send-message receive-message receive-message* |
---|
8 | make-poll-item poll poll-item-socket |
---|
9 | poll-item-fd poll-item-in? poll-item-out? poll-item-error? |
---|
10 | curve-keypair) |
---|
11 | |
---|
12 | (import scheme (chicken base) (chicken foreign) |
---|
13 | (chicken bitwise) (chicken memory) (chicken blob) |
---|
14 | (chicken memory representation) (chicken gc) |
---|
15 | (chicken format) |
---|
16 | srfi-1 srfi-4 srfi-18 srfi-13 foreigners) |
---|
17 | |
---|
18 | (import-for-syntax srfi-1) |
---|
19 | |
---|
20 | (foreign-declare "#include <zmq.h>") |
---|
21 | (foreign-declare "#include <errno.h>") |
---|
22 | |
---|
;; Scheme-side wrapper around a 0MQ context.  POINTER is the raw context
;; pointer; SOCKETS is a mutex whose "specific" slot carries the list of
;; raw socket pointers created from this context.
(define-record context pointer sockets)
;; Foreign type used when a raw context pointer crosses the FFI boundary.
(define-foreign-type context c-pointer)

;; Pointer to a zmq_msg_t structure.
(define-foreign-type message (c-pointer "zmq_msg_t"))

;; Scheme-side wrapper around a 0MQ socket.  POINTER is the raw socket
;; pointer (#f once closed), MUTEX serialises send/receive, MESSAGE is the
;; per-socket zmq_msg_t buffer.
(define-record socket pointer mutex message)
;; Foreign type used when a raw socket pointer crosses the FFI boundary.
(define-foreign-type socket c-pointer)
30 | |
---|
;; Socket kinds accepted by make-socket, mapped to the ZMQ_* constants.
(define-foreign-enum-type (socket-type int)
  (socket-type->int int->socket-type)
  ((pair) ZMQ_PAIR)
  ((pub)  ZMQ_PUB)
  ((sub)  ZMQ_SUB)
  ((req)  ZMQ_REQ)
  ((rep)  ZMQ_REP)
  ((xreq) ZMQ_XREQ)
  ((xrep) ZMQ_XREP)
  ((pull) ZMQ_PULL)
  ((push) ZMQ_PUSH))
42 | |
---|
;; Symbolic socket option names mapped to the ZMQ_* option constants.
;; The curve/* options come in two spellings that map to the same constant:
;; the plain name and a "-blob" variant (socket-option-set! takes a string
;; for the former and a blob for the latter).
(define-foreign-enum-type (socket-option int)
  (socket-option->int int->socket-option)
  ((affinity)             ZMQ_AFFINITY)
  ((sndhwm)               ZMQ_SNDHWM)
  ((rcvhwm)               ZMQ_RCVHWM)
  ((identity)             ZMQ_IDENTITY)
  ((subscribe)            ZMQ_SUBSCRIBE)
  ((unsubscribe)          ZMQ_UNSUBSCRIBE)
  ((rate)                 ZMQ_RATE)
  ((recovery-ivl)         ZMQ_RECOVERY_IVL)
  ((sndbuf)               ZMQ_SNDBUF)
  ((rcvbuf)               ZMQ_RCVBUF)
  ((rcvmore)              ZMQ_RCVMORE)
  ((fd)                   ZMQ_FD)
  ((curve/pubkey)         ZMQ_CURVE_PUBLICKEY)
  ((curve/pubkey-blob)    ZMQ_CURVE_PUBLICKEY)
  ((curve/secretkey)      ZMQ_CURVE_SECRETKEY)
  ((curve/secretkey-blob) ZMQ_CURVE_SECRETKEY)
  ((curve/server)         ZMQ_CURVE_SERVER)
  ((curve/serverkey)      ZMQ_CURVE_SERVERKEY)
  ((curve/serverkey-blob) ZMQ_CURVE_SERVERKEY))
64 | |
---|
65 | |
---|
;; Association list grouping option names by the kind of value they carry.
;; The getter (socket-option) consults the `integer' and `boolean' entries
;; to pick the matching C return type.
(define socket-options
  (list
   (cons 'integer '(sndhwm rcvhwm affinity rate recovery-ivl sndbuf rcvbuf))
   (cons 'boolean '(rcvmore))
   (cons 'string  '(subscribe unsubscribe identity))))
70 | |
---|
;; Flags for the send/receive calls.  Besides the symbols `noblock' and
;; `sndmore', this also introduces the names zmq/noblock and zmq/sndmore,
;; which send-message and receive-message combine with bitwise-ior.
(define-foreign-enum-type (socket-flag int)
  (socket-flag->int int->socket-flag)
  ((noblock zmq/noblock) ZMQ_NOBLOCK)
  ((sndmore zmq/sndmore) ZMQ_SNDMORE))
75 | |
---|
;; Event bits for zmq_pollitem_t.  Besides the symbols, this introduces
;; zmq/pollin, zmq/pollout and zmq/pollerr, which the poll-item code
;; combines and tests with bitwise operations.
;;
;; The converter names were previously misspelled (`poll-flat->int' and
;; `short->poll-int'); they are module-private and not referenced anywhere
;; in this module, so they are renamed to match the enum's name and its
;; C type.
(define-foreign-enum-type (poll-flag short)
  (poll-flag->short short->poll-flag)
  ((in  zmq/pollin)  ZMQ_POLLIN)
  ((out zmq/pollout) ZMQ_POLLOUT)
  ((err zmq/pollerr) ZMQ_POLLERR))
81 | |
---|
;; Scheme-side poll item: POINTER is the foreign zmq_pollitem_t, SOCKET the
;; socket record (or #f for a plain fd), IN/OUT the requested directions.
(define-record poll-item pointer socket in out)

;; Raw accessors for the foreign zmq_pollitem_t structure.  They carry a
;; `%' prefix, which keeps them distinct from the accessors generated by
;; the define-record form of the same name.
(define-foreign-record-type (poll-item zmq_pollitem_t)
  (constructor: make-foreign-poll-item)
  (destructor:  free-foreign-poll-item)
  (socket socket  %poll-item-socket  %poll-item-socket-set!)
  (int    fd      %poll-item-fd      %poll-item-fd-set!)
  (short  events  %poll-item-events  %poll-item-events-set!)
  (short  revents %poll-item-revents %poll-item-revents-set!))
90 | |
---|
;; The two errno values that receive-message treats as "no message"
;; rather than as an error.
(define-foreign-enum-type (errno int)
  (errno->int int->errno)
  ((again) EAGAIN)
  ((term)  ETERM))
95 | |
---|
96 | ;; helpers |
---|
97 | |
---|
;; Signal a Scheme error for the most recent failing 0MQ call.
;; LOCATION names the reporting procedure; the error message is the
;; zmq_strerror text for the current C `errno', and the numeric errno is
;; attached as the error argument.
(define (zmq-error location)
  (let* ((code    (foreign-value errno int))
         (message ((foreign-lambda c-string zmq_strerror int) code)))
    (error location message code)))
103 | |
---|
;; Read the C `errno' and convert it through the `errno' foreign enum
;; type, which knows the symbols `again' and `term'.
(define errno
  (lambda ()
    (foreign-value errno errno)))
106 | |
---|
;; Signal an error saying that VALUE is not of EXPECTED-TYPE.
;; VALUE is rendered with ~S (written form), EXPECTED-TYPE with ~A.
(define (type-error value expected-type)
  (let ((message (format "invalid value: ~S (expected ~A)" value expected-type)))
    (error message)))
109 | |
---|
;; Return the version of the linked 0MQ library as a three-element list
;; (major minor patch), obtained from zmq_version.
(define (zmq-version)
  (let-location ((major int) (minor int) (patch int))
    (let ((query-version
           (foreign-lambda void zmq_version
                           (c-pointer int) (c-pointer int) (c-pointer int))))
      (query-version (location major) (location minor) (location patch))
      (list major minor patch))))
115 | |
---|
116 | ;; contexts |
---|
117 | |
---|
;; Number of I/O threads handed to make-context when the default context
;; is created on demand.
(define zmq-io-threads
  (make-parameter 1))

;; The context used by make-socket when none is given; #f until the first
;; socket is created or until set explicitly.
(define zmq-default-context
  (make-parameter #f))
121 | |
---|
;; Return the default context, creating it with (zmq-io-threads) I/O
;; threads and storing it in the zmq-default-context parameter when it
;; has not been set yet.
(define (zmq-default-context/initialize)
  (let ((current (zmq-default-context)))
    (cond (current current)
          (else
           (zmq-default-context (make-context (zmq-io-threads)))
           (zmq-default-context)))))
126 | |
---|
;; Keep a handle on the record constructor before the public name is
;; redefined below.
(define %make-context make-context)

;; Create a 0MQ context with IO-THREADS I/O threads (zmq_init).
;; Signals a zmq-error when the library returns a null pointer.  The
;; returned record carries a mutex whose specific slot starts out as the
;; empty list of sockets, and has a finalizer attached that closes every
;; recorded socket pointer and then terminates the context.
(define (make-context io-threads)
  (let* ((raw (( foreign-lambda context zmq_init int) io-threads))
         (ctx (%make-context raw (make-mutex))))
    (cond ((not (context-pointer ctx))
           (zmq-error 'make-context))
          (else
           (mutex-specific-set! (context-sockets ctx) '())
           (set-finalizer!
            ctx
            (lambda (dying)
              (for-each close-socket
                        (mutex-specific (context-sockets dying)))
              (terminate-context dying)))))))
139 | |
---|
;; Terminate context CTX via zmq_term.  Returns #t on success and signals
;; a zmq-error otherwise.
(define (terminate-context ctx)
  (let ((status ((foreign-lambda int zmq_term context)
                 (context-pointer ctx))))
    (if (zero? status)
        #t
        (zmq-error 'terminate-context))))
144 | |
---|
145 | ;; messages |
---|
146 | |
---|
;; Initialise the zmq_msg_t behind MESSAGE and return MESSAGE.
;;
;; Without DATA the message is initialised empty (zmq_msg_init).  With
;; DATA, which must be a string or a blob, the bytes are copied into a
;; freshly malloc'ed buffer that is handed to zmq_msg_init_data together
;; with C_free as the deallocation function, so 0MQ releases the buffer
;; once it is done with the message.
;;
;; Signals a type error for any other kind of DATA and a zmq-error when
;; the library call fails.  If zmq_msg_init_data fails, 0MQ never took
;; ownership of the copy, so it is released here (previously it leaked).
(define (initialize-message message #!optional data)
  (define (fail)
    (zmq-error 'initialize-message))
  (cond ((not data)
         (if (zero? ((foreign-lambda int zmq_msg_init message) message))
             message
             (fail)))
        ((not (or (string? data) (blob? data)))
         (type-error data "string or blob"))
        (else
         (let* ((len   (number-of-bytes data))
                (cdata (allocate len)))
           ((foreign-lambda void "C_memcpy" c-pointer scheme-pointer int)
            cdata data len)
           (if (zero? ((foreign-lambda int
                                       zmq_msg_init_data
                                       message
                                       c-pointer
                                       unsigned-int
                                       c-pointer
                                       c-pointer)
                       message
                       cdata
                       len
                       (foreign-value "C_free" c-pointer)
                       #f))
               message
               ;; Raise first so that errno is read before anything else
               ;; runs; the buffer is freed while the error unwinds.
               (dynamic-wind
                 void
                 fail
                 (lambda () (free cdata))))))))
171 | |
---|
;; Release MESSAGE with zmq_msg_close.  Returns #t on success and signals
;; a zmq-error otherwise.
(define (close-message message)
  (let ((status ((foreign-lambda int zmq_msg_close message) message)))
    (if (zero? status)
        #t
        (zmq-error 'close-message))))
175 | |
---|
;; Size of MESSAGE's payload in bytes, as reported by zmq_msg_size.
(define message-size
  (let ((msg-size (foreign-lambda unsigned-integer zmq_msg_size message)))
    (lambda (message)
      (msg-size message))))
178 | |
---|
;; Extract the payload of MESSAGE.
;;
;; TYPE selects the result representation:
;;   'string    -- a fresh string holding a copy of the payload
;;   'blob      -- a fresh blob holding a copy of the payload
;;   procedure  -- called as (TYPE pointer size); its result is returned
;; Anything else signals an error.
(define (message-data message type)
  (define (bad-type)
    (error 'message-data "invalid message data type" type))
  ;; Copy SIZE bytes from PTR into BUFFER and return BUFFER.
  (define (copy-into! buffer ptr size)
    (move-memory! ptr buffer size)
    buffer)
  (let* ((size (message-size message))
         (ptr  ((foreign-lambda c-pointer zmq_msg_data message) message)))
    (cond ((eq? type 'string) (copy-into! (make-string size) ptr size))
          ((eq? type 'blob)   (copy-into! (make-blob size) ptr size))
          ((symbol? type)     (bad-type))
          ((procedure? type)  (type ptr size))
          (else               (bad-type)))))
197 | |
---|
198 | ;; sockets |
---|
199 | |
---|
;; Keep a handle on the record constructor before the public name is
;; redefined below.
(define %make-socket make-socket)

;; Create a socket of kind TYPE (a socket-type symbol) in CONTEXT, which
;; defaults to the lazily created default context.
;;
;; Signals a zmq-error when zmq_socket returns a null pointer.  Otherwise
;; the raw socket pointer is pushed onto the context's socket list (under
;; the context's mutex) and a socket record is returned carrying its own
;; mutex and a malloc'ed zmq_msg_t buffer.  The record's finalizer frees
;; that buffer and closes the socket.
(define (make-socket type #!optional (context (zmq-default-context/initialize)))
  (let ((sp ((foreign-lambda socket zmq_socket context socket-type)
             (context-pointer context) type)))
    (cond
     ((not sp)
      (zmq-error 'make-socket))
     (else
      (let* ((registry (context-sockets context))
             (buffer   (allocate (foreign-value "sizeof(zmq_msg_t)" int)))
             (sock     (%make-socket sp (make-mutex) buffer)))
        (mutex-lock! registry)
        (mutex-specific-set! registry (cons sp (mutex-specific registry)))
        (mutex-unlock! registry)
        (set-finalizer! sock
                        (lambda (dying)
                          (free (socket-message dying))
                          (close-socket dying))))))))
218 | |
---|
;; Close SOCKET, which may be a socket record or a raw socket pointer.
;;
;; A record whose pointer is already #f is left alone.  After a successful
;; zmq_close the record's pointer is reset to #f; a failing zmq_close
;; signals a zmq-error.  Any other kind of argument signals a type error.
(define (close-socket socket)
  (let ((sp (cond ((socket? socket)  (socket-pointer socket))
                  ((pointer? socket) socket)
                  (else              (type-error socket 'socket)))))
    (when sp
      (let ((status ((foreign-lambda int zmq_close socket) sp)))
        (cond ((not (zero? status))
               (zmq-error 'close-socket))
              ((socket? socket)
               (socket-pointer-set! socket #f)))))))
228 | |
---|
;; Bind SOCKET to ENDPOINT (a string) via zmq_bind.  Returns #t on success
;; and signals a zmq-error otherwise.
(define (bind-socket socket endpoint)
  (let ((status ((foreign-lambda int zmq_bind socket c-string)
                 (socket-pointer socket)
                 endpoint)))
    (if (zero? status)
        #t
        (zmq-error 'bind-socket))))
234 | |
---|
;; Connect SOCKET to ENDPOINT (a string) via zmq_connect.  Returns #t on
;; success and signals a zmq-error otherwise.
(define (connect-socket socket endpoint)
  (let ((status ((foreign-lambda int zmq_connect socket c-string)
                 (socket-pointer socket)
                 endpoint)))
    (if (zero? status)
        #t
        (zmq-error 'connect-socket))))
240 | |
---|
;; Set OPTION (a socket-option symbol) on SOCKET to VALUE.
;;
;; The C type handed to zmq_setsockopt depends on the option:
;;   affinity                        -- uint64_t (passed as unsigned-integer64)
;;   rcvhwm sndhwm sndbuf rcvbuf
;;   rate recovery-ivl curve/server  -- int
;;   identity subscribe unsubscribe
;;   curve/pubkey curve/secretkey
;;   curve/serverkey                 -- string, length = number of bytes
;;   curve/*-blob                    -- blob, length = blob size
;;
;; `affinity' used to share the plain `int' path; 0MQ documents
;; ZMQ_AFFINITY as uint64_t, so the option length did not match and the
;; call was rejected.  It now has its own branch.
;;
;; Returns #t on success.  Signals a type error when VALUE has the wrong
;; type, an error for an unknown option, and a zmq-error when the library
;; call fails.
(define (socket-option-set! socket option value)
  (or (zero?
       (case option
         ((affinity)
          (if (integer? value)
              ((foreign-lambda* int
                                ((socket socket)
                                 (socket-option option)
                                 (unsigned-integer64 value))
                 "uint64_t mask = value;
                  C_return(zmq_setsockopt(socket, option, &mask, sizeof(mask)));")
               (socket-pointer socket) option value)
              (type-error value 'integer)))

         ((rcvhwm sndhwm sndbuf rcvbuf rate recovery-ivl curve/server)
          (if (integer? value)
              ;; On failure the C code calls back into zmq-error, which
              ;; raises and therefore never returns to C.
              ((foreign-safe-lambda* int
                                     ((scheme-object error)
                                      (scheme-object error_location)
                                      (socket socket)
                                      (socket-option option)
                                      (int value))
                 "size_t size = sizeof(value);
                  int status = zmq_setsockopt(socket, option, &value, size);
                  if (status == 0) {
                    C_return(0);
                  } else {
                    C_save(error_location);
                    C_callback(error, 1);
                  }")
               zmq-error 'socket-option-set! (socket-pointer socket) option value)
              (type-error value 'integer)))

         ((identity subscribe unsubscribe curve/pubkey curve/secretkey curve/serverkey)
          (if (string? value)
              (let ((status ((foreign-lambda int zmq_setsockopt
                                             socket socket-option c-string unsigned-int)
                             (socket-pointer socket) option value
                             (number-of-bytes value))))
                (if (not (zero? status))
                    (zmq-error 'socket-option-set!)
                    status))
              (type-error value 'string)))

         ((curve/pubkey-blob curve/secretkey-blob curve/serverkey-blob)
          (if (blob? value)
              (let ((status ((foreign-lambda int zmq_setsockopt
                                             socket socket-option blob unsigned-int)
                             (socket-pointer socket) option value
                             (blob-size value))))
                (if (not (zero? status))
                    (zmq-error 'socket-option-set!)
                    status))
              (type-error value 'blob)))

         (else (error (format "unknown socket option: ~A" option)))))
      (zmq-error 'socket-option-set!)))
284 | |
---|
;; (%socket-option LOCATION F-TYPE C-TYPE SOCKET OPTION)
;;
;; Expands into a call that reads OPTION from SOCKET with zmq_getsockopt.
;; C-TYPE (a string) is the C type of the temporary that receives the
;; value, F-TYPE the foreign return type used to convert it to Scheme.
;; On failure the generated C code calls back into zmq-error with
;; LOCATION.
(define-syntax %socket-option
  (er-macro-transformer
   (lambda (form rename compare)
     (apply
      (lambda (_ location f-type c-type socket option)
        (let ((c-body
               (string-append c-type " value;
                   size_t size = sizeof(value);
                   int status = zmq_getsockopt(socket, option, &value, &size);
                   if (status == 0) {
                     C_return(value);
                   } else {
                     C_save(error_location);
                     C_callback(error, 1);
                   }")))
          `((,(rename 'foreign-safe-lambda*) ,f-type
             ((scheme-object error)
              (scheme-object error_location)
              (socket socket)
              (socket-option option))
             ,c-body)
            ,(rename 'zmq-error)
            ,location
            (,(rename 'socket-pointer) ,socket)
            ,option)))
      form))))
307 | |
---|
;; Return the value of SOCKET's `fd' option (ZMQ_FD), read as a C int.
;; receive-message* hands this descriptor to thread-wait-for-i/o!.
(define socket-fd
  (lambda (socket)
    (%socket-option 'socket-fd int "int" socket 'fd)))
310 | |
---|
;; Read OPTION (a socket-option symbol) from SOCKET.
;;
;;   identity  -- returned as a string of at most 255 bytes
;;   affinity  -- read as uint64_t
;;   options listed under `integer' in socket-options -- read as int
;;   options listed under `boolean' in socket-options -- read as int,
;;                                                        returned as a boolean
;; Any other option signals an error.  A failing zmq_getsockopt signals a
;; zmq-error.
;;
;; `affinity' used to go through the generic `int' path; 0MQ documents
;; ZMQ_AFFINITY as uint64_t, so the supplied buffer was too small and the
;; call was rejected.  It is now handled separately.
(define socket-option
  ;; Scratch buffer for the identity option; the result is copied out with
  ;; substring, so sharing it between calls does not leak into results.
  (let ((routing-id (make-string 255)))
    (lambda (socket option)
      (case option
        ((identity)
         (let-location
          ((size unsigned-integer64 255))
          (if (zero? ((foreign-lambda int zmq_getsockopt socket socket-option scheme-pointer
                                      (c-pointer unsigned-integer64))
                      (socket-pointer socket) option routing-id (location size)))
              (substring routing-id 0 size)
              (zmq-error 'socket-option))))

        ((affinity)
         (%socket-option 'socket-option unsigned-integer64 "uint64_t" socket option))

        (else
         (cond
          ((memq option (alist-ref 'integer socket-options))
           (%socket-option 'socket-option int "int" socket option))

          ((memq option (alist-ref 'boolean socket-options))
           (%socket-option 'socket-option bool "int" socket option))

          (else
           (error (format "socket option ~A is not retrievable" option)))))))))
336 | |
---|
337 | |
---|
338 | ;; communication |
---|
339 | |
---|
;; Send DATA (a string or blob) on SOCKET.
;;
;; Keyword arguments:
;;   non-blocking: -- pass ZMQ_NOBLOCK to zmq_msg_send
;;   send-more:    -- pass ZMQ_SNDMORE to zmq_msg_send
;;
;; The socket's mutex is held for the duration of the call because the
;; zmq_msg_t buffer is shared per socket.  Signals a zmq-error when
;; sending fails (and whatever initialize-message signals for bad DATA).
;;
;; Both the mutex and the message are released via dynamic-wind, so an
;; error in initialize-message, zmq_msg_send or close-message no longer
;; leaves the socket locked.  The send error is raised before any cleanup
;; call runs, so errno still belongs to zmq_msg_send when it is read.
(define (send-message socket data #!key non-blocking send-more)
  (let ((lock (socket-mutex socket)))
    (dynamic-wind
      (lambda () (mutex-lock! lock))
      (lambda ()
        (let ((message (initialize-message (socket-message socket) data)))
          (dynamic-wind
            void
            (lambda ()
              (let ((result ((foreign-lambda int zmq_msg_send message socket int)
                             message
                             (socket-pointer socket)
                             (bitwise-ior (if non-blocking zmq/noblock 0)
                                          (if send-more zmq/sndmore 0)))))
                (if (< result 0) (zmq-error 'send-message))))
            (lambda () (close-message message)))))
      (lambda () (mutex-unlock! lock)))))
352 | |
---|
353 | |
---|
;; Receive one message from SOCKET.
;;
;; Keyword arguments:
;;   non-blocking: -- pass ZMQ_NOBLOCK to zmq_msg_recv
;;   as:           -- result representation, see message-data
;;                    (default 'string)
;;
;; Returns the payload, or #f when the receive failed with errno EAGAIN
;; or ETERM.  Any other failure signals a zmq-error.
;;
;; Changes from the previous version:
;;  * the message is closed *before* the socket mutex is released; the
;;    zmq_msg_t buffer is shared per socket, so closing it after the
;;    unlock could race with another thread re-initialising it;
;;  * errno is inspected directly after zmq_msg_recv instead of after the
;;    unlock/close calls;
;;  * cleanup runs through dynamic-wind, so an error raised by
;;    message-data (e.g. an invalid `as:' value) no longer leaves the
;;    socket locked and the message open.
(define (receive-message socket #!key non-blocking (as 'string))
  (let ((lock (socket-mutex socket)))
    (dynamic-wind
      (lambda () (mutex-lock! lock))
      (lambda ()
        (let ((message (initialize-message (socket-message socket))))
          (dynamic-wind
            void
            (lambda ()
              (let ((result ((foreign-lambda int zmq_msg_recv message socket int)
                             message
                             (socket-pointer socket)
                             (if non-blocking zmq/noblock 0))))
                (cond ((>= result 0)
                       (message-data message as))
                      ((memq (errno) '(again term))
                       #f)
                      (else
                       (zmq-error 'receive-message)))))
            (lambda () (close-message message)))))
      (lambda () (mutex-unlock! lock)))))
373 | |
---|
;; Receive a message from SOCKET without blocking the whole process:
;; try a non-blocking receive-message and, whenever it yields #f, suspend
;; the current thread until the socket's file descriptor is readable,
;; then try again.  AS is passed through to receive-message.
(define (receive-message* socket #!key (as 'string))
  (let retry ()
    (or (receive-message socket non-blocking: #t as: as)
        (begin
          (thread-wait-for-i/o! (socket-fd socket) #:input)
          (retry)))))
379 | |
---|
380 | ;; polling |
---|
381 | |
---|
;; Keep a handle on the record constructor before the public name is
;; redefined below.
(define %make-poll-item make-poll-item)

;; Create a poll item for SOCKET/FD, which is either a socket record or a
;; file descriptor.
;;
;; Keyword arguments IN and OUT select the events to wait for (ZMQ_POLLIN
;; and ZMQ_POLLOUT).  The returned record owns a foreign zmq_pollitem_t
;; that is released by a finalizer.
;;
;; Both the `socket' and the `fd' field of the foreign structure are now
;; always written.  Previously only one of them was set, leaving the other
;; with whatever the freshly allocated structure contained; zmq_poll
;; ignores `fd' whenever `socket' is non-NULL, so a plain-fd item could be
;; polled as a bogus socket.
(define (make-poll-item socket/fd #!key in out)
  (let* ((ptr  (make-foreign-poll-item))
         (item (%make-poll-item ptr
                                (and (socket? socket/fd) socket/fd)
                                in out)))
    (cond ((socket? socket/fd)
           (%poll-item-socket-set! ptr (socket-pointer socket/fd))
           (%poll-item-fd-set! ptr 0))
          (else
           ;; #f is passed to C as a NULL pointer: "no socket, use fd".
           (%poll-item-socket-set! ptr #f)
           (%poll-item-fd-set! ptr socket/fd)))

    (%poll-item-events-set! ptr
                            (bitwise-ior (if in zmq/pollin 0)
                                         (if out zmq/pollout 0)))

    (%poll-item-revents-set! ptr 0)

    (set-finalizer! item
                    (lambda (i)
                      (free-foreign-poll-item (poll-item-pointer i))))))
400 | |
---|
;; The `fd' field of ITEM's foreign zmq_pollitem_t.
(define (poll-item-fd item)
  (%poll-item-fd (poll-item-pointer item)))

;; The `revents' bit mask of ITEM's foreign zmq_pollitem_t.
(define (poll-item-revents item)
  (%poll-item-revents (poll-item-pointer item)))

;; #t when any bit of FLAG is set in ITEM's revents mask.
(define (%poll-item-flag-set? item flag)
  (not (zero? (bitwise-and flag (poll-item-revents item)))))

;; #t when ZMQ_POLLIN is set in ITEM's revents mask.
(define (poll-item-in? item)
  (%poll-item-flag-set? item zmq/pollin))

;; #t when ZMQ_POLLOUT is set in ITEM's revents mask.
(define (poll-item-out? item)
  (%poll-item-flag-set? item zmq/pollout))

;; #t when ZMQ_POLLERR is set in ITEM's revents mask.
(define (poll-item-error? item)
  (%poll-item-flag-set? item zmq/pollerr))
415 | |
---|
;; Low-level driver for zmq_poll.
;;
;; POLL_ITEM_REF is a Scheme procedure that maps an index to the pointer
;; of the corresponding foreign zmq_pollitem_t; LENGTH is the number of
;; items and TIMEOUT is passed to zmq_poll unchanged.
;;
;; The items are copied into a contiguous stack array for zmq_poll; unless
;; zmq_poll returns -1, the resulting `revents' fields are copied back
;; into the original structures.  The return value is zmq_poll's result.
(define %poll-sockets
  (foreign-safe-lambda* int
                        ((scheme-object poll_item_ref)
                         (unsigned-int length)
                         (long timeout))
    "zmq_pollitem_t batch[length];
     zmq_pollitem_t *sources[length];
     int idx;

     /* Ask Scheme for the address of every poll item. */
     for (idx = 0; idx < length; idx++) {
       C_save(C_fix(idx));
       sources[idx] = (zmq_pollitem_t *)C_pointer_address(C_callback(poll_item_ref, 1));
     }

     /* zmq_poll wants one contiguous array. */
     for (idx = 0; idx < length; idx++) {
       batch[idx] = *sources[idx];
     }

     int rc = zmq_poll(batch, length, timeout);

     /* Publish the results back to the Scheme-owned structures. */
     if (rc != -1) {
       for (idx = 0; idx < length; idx++) {
         sources[idx]->revents = batch[idx].revents;
       }
     }

     C_return(rc);"))
443 | |
---|
;; Poll the list POLL-ITEMS with zmq_poll.
;;
;; TIMEOUT/BLOCK is #f for an immediate return (timeout 0), #t to block
;; indefinitely (timeout -1), or a number that is passed through as the
;; timeout.  Returns zmq_poll's result; signals an error for an empty
;; list and a zmq-error when zmq_poll returns -1.
;;
;; The items are copied into a vector once, so the per-index lookups made
;; by %poll-sockets are constant-time instead of walking the list again
;; for every item.
(define (poll poll-items timeout/block)
  (if (null? poll-items)
      (error 'poll "null list passed for poll-items")
      (let* ((items  (list->vector poll-items))
             (result (%poll-sockets (lambda (i)
                                      (poll-item-pointer (vector-ref items i)))
                                    (vector-length items)
                                    (case timeout/block
                                      ((#f) 0)
                                      ((#t) -1)
                                      (else timeout/block)))))
        (if (= result -1)
            (zmq-error 'poll)
            result))))
457 | |
---|
;; Generate a new CURVE key pair with zmq_curve_keypair.
;;
;; Returns two values, the public key and the secret key, as strings built
;; from the NUL-terminated buffers filled in by the library.
;;
;; The return code of zmq_curve_keypair is now checked.  Previously it was
;; ignored, so on failure the uninitialised stack buffers were converted
;; to strings.  A failure now signals a zmq-error.
(define (curve-keypair)
  (let-values (((pk sk)
                ((foreign-primitive ()
                   "char public_key [41];
                    char secret_key [41];

                    C_word* pkbuf = C_alloc(41);
                    C_word* skbuf = C_alloc(41);
                    C_word pkstr = C_SCHEME_FALSE;
                    C_word skstr = C_SCHEME_FALSE;

                    /* Only touch the buffers when the library filled them. */
                    if (zmq_curve_keypair (public_key, secret_key) == 0) {
                      pkstr = C_string2(&pkbuf, public_key);
                      skstr = C_string2(&skbuf, secret_key);
                    }

                    C_word vals[4] = { C_SCHEME_UNDEFINED, C_k, pkstr, skstr };
                    C_values(4, vals);\n"))))
    (if (and pk sk)
        (values pk sk)
        (zmq-error 'curve-keypair))))
476 | ) |
---|