source: project/release/3/rpc/trunk/rpc.scm @ 9973

Last change on this file since 9973 was 9973, checked in by Kon Lovett, 12 years ago

Using canonical directory structure.

File size: 5.8 KB
Line 
1;;;; rpc.scm
2;;;; A flexible peer-to-peer RPC system
3
4(define-extension rpc
5  (export
6   rpc:publish-procedure! rpc:withdraw-procedure!
7   rpc:default-server-port rpc:connect-procedure
8   rpc:is-connected? rpc:get-connection
9   rpc:close-connection! rpc:close-all-connections!
10   rpc:current-peer
11   rpc:procedure
12   rpc:make-server))
13
14(eval-when (compile)
15  (declare
16    (usual-integrations)))
17
18(use extras match srfi-18 srfi-69 tcp tcp-server s11n)
19
20;;; Published procedures management
21
22(define name->procedure-map
23  (make-mutex 'name->procedure-map))
24(mutex-specific-set!
25 name->procedure-map
26 (make-hash-table equal?))
27
28(define procedure->name-map
29  (make-mutex 'procedure->name-map))
30(mutex-specific-set!
31 procedure->name-map
32 (make-hash-table eqv?))
33
34(eval-when (compile)
35  (define-macro (with-mutex mtx var . stuff)
36    `(let ((,var (mutex-specific ,mtx)))
37       (dynamic-wind
38           (cut mutex-lock! ,mtx)
39           (lambda ()
40             ,@stuff)
41           (cut mutex-unlock! ,mtx)))))
42
43(define (rpc:publish-procedure! name proc #!optional (callback-outgoing? #t))
44  (with-mutex name->procedure-map n->p
45    (hash-table-set! n->p name proc))
46  (if callback-outgoing?
47      (with-mutex procedure->name-map p->n
48        (hash-table-set! p->n proc name))))
49
50(define (rpc:withdraw-procedure! name-or-procedure)
51  (if (procedure? name-or-procedure)
52      (with-mutex procedure->name-map p->n
53        (let ((n (hash-table-ref p->n name-or-procedure)))
54          (hash-table-delete! p->n name-or-procedure)
55          (with-mutex name->procedure-map n->p
56            (hash-table-delete! n->p n))))
57      (with-mutex name->procedure-map n->p
58        (let ((p (hash-table-ref n->p name-or-procedure)))
59          (hash-table-delete! n->p name-or-procedure)
60          (with-mutex procedure->name-map p->n
61            (hash-table-delete! p->n p))))))
62
63;;; RPC connections management
64
65(define rpc:default-server-port
66  (make-parameter 29296))
67
68(define rpc:connect-procedure
69  (make-parameter tcp-connect))
70
71;; !!! Do make sure that this is set to a different value for every thread !!!
72(define connections
73  (make-parameter (make-hash-table equal?)))
74(define connections-owner
75  (make-parameter (current-thread)))
76
77(define (ensure-local-connections)
78  (unless (eq? (connections-owner) (current-thread))
79    (connections (make-hash-table equal?))
80    (connections-owner (current-thread))))
81
82(define (rpc:is-connected? host #!optional (port (rpc:default-server-port)))
83  (ensure-local-connections)
84  (hash-table-exists?
85   (connections) (if port (sprintf "~a:~a" host port) host)))
86
87(define (rpc:get-connection host #!optional (port (rpc:default-server-port)))
88  (ensure-local-connections)
89  (let ((key (if port (sprintf "~a:~a" host port) host)))
90    (apply
91     values
92     (hash-table-ref
93      (connections) key
94      (lambda ()
95        (let ((con (receive ((rpc:connect-procedure) host port))))
96          (hash-table-set! (connections) key con)
97          con))))))
98
99(define (rpc:close-connection! host #!optional (port (rpc:default-server-port)))
100  (ensure-local-connections)
101  (let ((key (if port (sprintf "~a:~a" host port) host)))
102    (let ((con (hash-table-ref (connections) key)))
103      (hash-table-delete! (connections) key)
104      (close-input-port (car con))
105      (close-output-port (cadr con)))))
106
107(define (rpc:close-all-connections!)
108  (ensure-local-connections)
109  (hash-table-walk
110   (connections)
111   (lambda (key con)
112      (hash-table-delete! (connections) key)
113      (close-input-port (car con))
114      (close-output-port (cadr con)))))
115
116;;; Client side functionality
117
118(define rpc:current-peer
119  (make-parameter #f))
120
121(define (make-callback name)
122  (lambda params
123    (apply (rpc:procedure name (rpc:current-peer)) params)))
124
125(define (rpc:procedure name host #!optional (port (rpc:default-server-port)))
126  (lambda params
127    (let-values (((in out) (rpc:get-connection host port)))
128      (serialize
129       (append
130        (list 'call name)
131        (map
132         (lambda (p)
133           (call-with-current-continuation
134            (lambda (q)
135              (with-mutex procedure->name-map p->n
136                (make-callback
137                 (hash-table-ref p->n p (lambda () (q p))))))))
138         params))
139       out)
140      (match (deserialize in)
141        (('results o e . r)
142         (display o (current-output-port))
143         (display e (current-error-port))
144         (apply values r))
145        (('exception . x)
146         (signal x))
147        (any
148         (signal
149          (make-composite-condition
150           (make-property-condition
151            'exn
152            'message "reply from RPC server not understood"
153            'location 'rpc:procedure
154            'arguments (list any))
155           (make-property-condition
156            'rpc))))))))
157
158;;; Server side functionality
159
160(define (rpc:make-server listener)
161  (make-tcp-server
162   listener
163   (lambda ()
164     (rpc:current-peer
165      (call-with-values
166          (cut (tcp-server-get-addresses-procedure) (current-input-port))
167        (project 1)))
168     (let loop ()
169       (let ((req (deserialize)))
170         (unless (eq? req (void))
171           (match req
172             (('call name . params)
173               (handle-exceptions ex
174                   (serialize (cons 'exception ex))
175                 (let* ((p (with-mutex name->procedure-map n->p
176                             (hash-table-ref
177                              n->p name
178                              (lambda ()
179                                (signal
180                                 (make-composite-condition
181                                  (make-property-condition
182                                   'exn
183                                   'message "no such public procedure"
184                                   'location 'rpc:server
185                                   'arguments (list name))
186                                  (make-property-condition
187                                   'rpc)))))))
188                        (o (open-output-string))
189                        (e (open-output-string))
190                        (r
191                         (parameterize ((current-input-port
192                                         (make-input-port
193                                          (constantly #!eof)
194                                          (constantly #f)
195                                          void
196                                          (constantly #!eof)))
197                                        (current-output-port o)
198                                        (current-error-port e))
199                           (receive (apply p params)))))
200                   (serialize
201                    (append
202                     (list
203                      'results
204                      (get-output-string o) (get-output-string e))
205                     r)))))
206             (any
207              (signal
208               (make-composite-condition
209                (make-property-condition
210                 'exn
211                 'message "request from RPC client not understood"
212                 'location 'rpc:server
213                 'arguments (list any))
214                (make-property-condition
215                 'rpc)))))
216           (loop)))))))
Note: See TracBrowser for help on using the repository browser.