source: project/release/4/ugarit/trunk/ugarit-backend.scm

Last change on this file was 25570, checked in by Alaric Snell-Pym, 10 years ago

ugarit: Version 2 of the backend protocol, supporting better reporting back to the user, and administrative interfaces. Backends outfitted with admin interfaces, and a ugarit-archive-admin tool added to drive them.

File size: 16.8 KB
Line 
1(module ugarit-backend
2        (make-storage ; Storage records
3         storage?
4         storage-max-block-size
5         storage-writable?
6         storage-unlinkable?
7         storage-put!
8         storage-flush!
9         storage-exists?
10         storage-get
11         storage-link!
12         storage-unlink!
13         storage-set-tag!
14         storage-tag
15         storage-all-tags
16         storage-remove-tag!
17         storage-lock-tag!
18         storage-tag-locked?
19         storage-unlock-tag!
20         storage-admin!
21         storage-close!
22
23         backend-log!
24
25         export-storage! ; Export a storage via stdin/stdout
26         export-storage-error!
27         import-storage ; Create a storage from a command line
28         )
29
30(import scheme)
31(import chicken)
32
33(use ports)
34(use matchable)
35(use posix)
36(use srfi-4)
37(use data-structures)
38(use miscmacros)
39
40; Backends can call the procedure found in this paramter to log
41; things. type should be 'warning, 'error or 'info. message should
42; be any string.
43(define backend-log! (make-parameter
44                      (lambda (type message)
45                        (error "No backend log handler has been defined"))))
46
47(define-record storage
48  max-block-size  ; Integer: largest size of block we can store
49  writable? ; Boolean: Can we call put!, link!, unlink!, set-tag!, lock-tag!, unlock-tag!?
50  unlinkable? ; Boolean: Can we call unlink?
51  put! ; Procedure: (put! key data type) - stores the data (u8vector) under the key (string) with the given type tag (symbol) and a refcount of 1. Does nothing of the key is already in use.
52  flush! ; Procedure: (flush!) - all previous changes must be flushed to disk by the time the continuation is applied.
53  exists? ; Procedure: (exists? key) - returns the type of the block with the given key if it exists, or #f otherwise
54  get ; Procedure: (get key) - returns the contents (u8vector) of the block with the given key (string) if it exists, or #f otherwise
55  link! ; Procedure: (link key) - increments the refcount of the block
56  unlink! ; Procedure: (unlink key) - decrements the refcount of the block. If it's now zero, deletes it but returns its value as a u8vector. If not, returns #f.
57  set-tag! ; Procedure: (set-tag! name key) - assigns the given key (string) to the given tag (named with a string). Creates a new tag if the name has not previously been used, otherwise updates an existing tag
58  tag ; Procedure: (tag name) - returns the key assigned to the given tag, or #f if it does not exist.
59  all-tags ; Procedure: (all-tags) - returns a list of all existing tag names
60  remove-tag! ; Procedure: (remove-tag! name) - removes the named tag
61  lock-tag! ; Procedure: (lock-tag! name) - locks the named tag, returning #t if all went well, or #f if it can't be locked.
62  tag-locked? ; Procedure: (tag-locked? name) - returns #t if the tag is locked, #f otherwise
63  unlock-tag! ; Procedure: (unlock-tag! name) - unlocks the named tag
64  admin! ; Procedure: (admin! command) - returns an alist
65  close!)  ; Procedure: (close!) - closes the storage engine
66
67(define *magic-v1* 'ugarit-backend-protocol-1)
68(define *magic-v2* 'ugarit-backend-protocol-2)
69
70(define (describe-exception exn)
71  (sprintf "~a: ~s in ~a"
72           ((condition-property-accessor 'exn 'message "Unknown error") exn)
73           ((condition-property-accessor 'exn 'arguments '()) exn)
74           ((condition-property-accessor 'exn 'location (void)) exn)))
75
76; Return the result of the body, and any logs
77(define-syntax-rule (with-error-reporting-and-result body ...)
78  (handle-exceptions
79   exn (write (list "error" (describe-exception exn)))
80   (let ((log (make-queue)))
81     (parameterize ((backend-log!
82                     (lambda (type message)
83                       (queue-add! log (cons type message))
84                       (void))))
85                   (let ((result (begin body ...)))
86                     (write (list (queue->list log) result)))))))
87
88; Return the result of the body as a data block, and any logs
89(define-syntax-rule (with-error-reporting-and-block body ...)
90  (handle-exceptions
91   exn (write (list "error" (describe-exception exn)))
92   (let ((log (make-queue)))
93     (parameterize ((backend-log!
94                     (lambda (type message)
95                       (queue-add! log (cons type message))
96                       (void))))
97                   (let ((result (begin body ...)))
98                     (if result
99                         (begin
100                           (write (list (queue->list log) (u8vector-length result)))
101                           (write-u8vector result))
102                         (write (list (queue->list log) #f))))))))
103
104; Return any logs
105(define-syntax-rule (with-error-reporting body ...)
106  (handle-exceptions
107   exn (write (list "error" (describe-exception exn)))
108   (let ((log (make-queue)))
109     (parameterize ((backend-log!
110                     (lambda (type message)
111                       (queue-add! log (cons type message))
112                       (void))))
113                   (let ((result (begin body ...)))
114                     (write (list (queue->list log))))))))
115
116(define (export-storage-error! message)
117  (set-buffering-mode! (current-output-port) #:none)
118
119  ; Write the error header
120  (write *magic-v2*) (newline)
121  (write (list "error" message)))
122
123;; Given a storage object, provide the storage remote access protocol
124;; via current-input-port / current-output-port until the storage is closed
125;; via the protocol.
126(define (export-storage! storage-thunk)
127  (set-buffering-mode! (current-output-port) #:none)
128
129  ; Write the header
130  (write *magic-v2*) (newline)
131  (let ((storage #f))
132
133    (with-error-reporting-and-result ; Initialise and send the header
134     (let ((storage* (storage-thunk)))
135       (set! storage storage*)        ; This feels hacky
136       (list (storage-max-block-size storage)
137             (storage-writable? storage)
138             (storage-unlinkable? storage))))
139
140                                        ; Engage command loop
141    (if storage
142        (let loop ()
143          (newline)
144          (let ((command (read)))
145            (if (eof-object? command)
146                (begin
147                  (with-error-reporting
148                   ((storage-close! storage)))
149                  (void))
150                (match
151                 command
152
153                 (('put! key type length)
154                  (let ((data (read-u8vector length)))
155                    (with-error-reporting
156                     ((storage-put! storage) key data type)))
157                  (loop))
158
159                 (('flush!)
160                  (with-error-reporting
161                   ((storage-flush! storage)))
162                  (loop))
163
164                 (('exists? key)
165                  (with-error-reporting-and-result
166                   ((storage-exists? storage) key))
167                  (loop))
168
169                 (('get key)
170                  (with-error-reporting-and-block
171                   ((storage-get storage) key))
172                  (loop))
173
174                 (('link! key)
175                  (with-error-reporting
176                   ((storage-link! storage) key))
177                  (loop))
178
179                 (('unlink! key)
180                  (with-error-reporting-and-block
181                   ((storage-unlink! storage) key))
182                  (loop))
183
184                 (('set-tag! name key)
185                  (with-error-reporting
186                   ((storage-set-tag! storage) name key))
187                  (loop))
188
189                 (('tag name)
190                  (with-error-reporting-and-result
191                   ((storage-tag storage) name))
192                  (loop))
193
194                 (('all-tags)
195                  (with-error-reporting-and-result
196                   ((storage-all-tags storage)))
197                  (loop))
198
199                 (('remove-tag! name)
200                  (with-error-reporting
201                   ((storage-remove-tag! storage) name))
202                  (loop))
203
204                 (('lock-tag! name)
205                  (with-error-reporting-and-result
206                   ((storage-lock-tag! storage) name))
207                  (loop))
208
209                 (('tag-locked? name)
210                  (with-error-reporting-and-result
211                   ((storage-tag-locked? storage) name))
212                  (loop))
213
214                 (('unlock-tag! name)
215                  (with-error-reporting
216                   ((storage-unlock-tag! storage) name))
217                  (loop))
218
219                 (('admin! command)
220                  (with-error-reporting-and-result
221                   ((storage-admin! storage) command))
222                  (loop))
223
224                 (('close!)
225                  (with-error-reporting
226                   ((storage-close! storage)))
227                  (void))
228
229                 (else
230                  (write (list "error" (sprintf "Bad command ~s" command)))
231                  (loop)))))))))
232
233(define (read-response-v1 port)
234  (let ((response (read port)))
235   (match response
236          (("error" err) (error "Backend protocol error" err))
237          (else response))))
238
239(define (read-response-v1-body port)
240  (let ((response (read-response-v1 port)))
241    (if response
242        (read-u8vector (car response) port)
243        #f)))
244
245(define (import-storage-v1 command-line debug responses commands pid)
246  (let ((header (read responses)))
247    (if debug (print "~a: read header" command-line header))
248    (if (not (list? header))
249        (error "Invalid backend protocol header" header))
250    (if (not (= (length header) 3))
251        (error "Invalid backend protocol header" header))
252    (let ((max-block-size (car header))
253          (writable? (cadr header))
254          (unlinkable? (caddr header)))
255      (make-storage
256       max-block-size
257       writable?
258       unlinkable?
259
260       (lambda (key data type)  ; put!
261         (if debug (printf "~a: put!" command-line))
262         (write `(put! ,key ,type ,(u8vector-length data)) commands)
263         (write-u8vector data commands)
264         (read-response-v1 responses)
265         (void))
266
267       (lambda ()                  ; flush!
268         (if debug (printf "~a: flush!" command-line))
269         (write `(flush!) commands)
270         (read-response-v1 responses)
271         (void))
272
273       (lambda (key)            ; exists?
274         (if debug (printf "~a: exists?" command-line))
275         (write `(exists? ,key) commands)
276         (read-response-v1 responses))
277
278       (lambda (key)            ; get
279         (if debug (printf "~a: get" command-line))
280         (write `(get ,key) commands)
281         (read-response-v1-body responses))
282
283       (lambda (key)            ; link!
284         (if debug (printf "~a: link!" command-line))
285         (write `(link! ,key) commands)
286         (read-response-v1 responses)
287         (void))
288
289       (lambda (key)            ; unlink!
290         (if debug (printf "~a: unlink! ~s" command-line key))
291         (write `(unlink! ,key) commands)
292         (read-response-v1-body responses))
293
294       (lambda (name key)               ; set-tag!
295         (if debug (printf "~a: set-tag!" command-line))
296         (write `(set-tag! ,name ,key) commands)
297         (read-response-v1 responses)
298         (void))
299
300       (lambda (name)           ; tag
301         (if debug (printf "~a: tag" command-line))
302         (write `(tag ,name) commands)
303         (read-response-v1 responses))
304
305       (lambda ()                       ; all-tags
306         (if debug (printf "~a: all-tags" command-line))
307         (write `(all-tags) commands)
308         (read-response-v1 responses))
309
310       (lambda (name)           ; remove-tag!
311         (if debug (printf "~a: remove-tag!" command-line))
312         (write `(remove-tag! ,name) commands)
313         (read-response-v1 responses)
314         (void))
315
316       (lambda (name)           ; lock-tag!
317         (if debug (printf "~a: lock-tag!" command-line))
318         (write `(lock-tag! ,name) commands)
319         (read-response-v1 responses))
320
321       (lambda (name)           ; tag-locked?
322         (if debug (printf "~a: tag-locked?" command-line))
323         (write `(tag-locked? ,name) commands)
324         (read-response-v1 responses))
325
326       (lambda (name)           ; unlock-tag!
327         (if debug (printf "~a: unlock-tag!" command-line))
328         (write `(unlock-tag! ,name) commands)
329         (read-response-v1 responses)
330         (void))
331
332       (lambda ()                       ; close!
333         (if debug (printf "~a: close!!" command-line))
334         (write '(close!) commands)
335         (read-response-v1 responses)
336         (close-input-port responses)
337         (close-output-port commands)
338         (void))))))
339
340(define (read-response-v2 port)
341  (let ((response (read port)))
342   (match response
343          (("error" err) (error (sprintf "Error from backend: ~s" err)))
344          ((log value)
345           (for-each (lambda (logentry)
346                       ((backend-log!) (car logentry) (cdr logentry)))
347                     log)
348           value)
349          ((log)
350           (for-each (lambda (logentry)
351                       ((backend-log!) (car logentry) (cdr logentry)))
352                     log)
353           (void))
354          (else (error "Malformed response from backend" response)))))
355
356(define (read-response-v2-body port)
357  (let ((length (read-response-v2 port)))
358    (if length
359        (read-u8vector length port)
360        #f)))
361
362(define (import-storage-v2 command-line debug responses commands pid)
363  (let ((header (read-response-v2 responses)))
364    (if debug (print "~a: read header" command-line header))
365    (if (not (list? header))
366        (error "Invalid backend protocol header" header))
367    (if (not (= (length header) 3))
368        (error "Invalid backend protocol header" header))
369    (let ((max-block-size (car header))
370          (writable? (cadr header))
371          (unlinkable? (caddr header)))
372      (make-storage
373       max-block-size
374       writable?
375       unlinkable?
376
377       (lambda (key data type)  ; put!
378         (if debug (printf "~a: put!" command-line))
379         (write `(put! ,key ,type ,(u8vector-length data)) commands)
380         (write-u8vector data commands)
381         (read-response-v2 responses)
382         (void))
383
384       (lambda ()                  ; flush!
385         (if debug (printf "~a: flush!" command-line))
386         (write `(flush!) commands)
387         (read-response-v2 responses)
388         (void))
389
390       (lambda (key)            ; exists?
391         (if debug (printf "~a: exists?" command-line))
392         (write `(exists? ,key) commands)
393         (read-response-v2 responses))
394
395       (lambda (key)            ; get
396         (if debug (printf "~a: get" command-line))
397         (write `(get ,key) commands)
398         (read-response-v2-body responses))
399
400       (lambda (key)            ; link!
401         (if debug (printf "~a: link!" command-line))
402         (write `(link! ,key) commands)
403         (read-response-v2 responses)
404         (void))
405
406       (lambda (key)            ; unlink!
407         (if debug (printf "~a: unlink! ~s" command-line key))
408         (write `(unlink! ,key) commands)
409         (read-response-v2-body responses))
410
411       (lambda (name key)               ; set-tag!
412         (if debug (printf "~a: set-tag!" command-line))
413         (write `(set-tag! ,name ,key) commands)
414         (read-response-v2 responses)
415         (void))
416
417       (lambda (name)           ; tag
418         (if debug (printf "~a: tag" command-line))
419         (write `(tag ,name) commands)
420         (read-response-v2 responses))
421
422       (lambda ()                       ; all-tags
423         (if debug (printf "~a: all-tags" command-line))
424         (write `(all-tags) commands)
425         (read-response-v2 responses))
426
427       (lambda (name)           ; remove-tag!
428         (if debug (printf "~a: remove-tag!" command-line))
429         (write `(remove-tag! ,name) commands)
430         (read-response-v2 responses)
431         (void))
432
433       (lambda (name)           ; lock-tag!
434         (if debug (printf "~a: lock-tag!" command-line))
435         (write `(lock-tag! ,name) commands)
436         (read-response-v2 responses))
437
438       (lambda (name)           ; tag-locked?
439         (if debug (printf "~a: tag-locked?" command-line))
440         (write `(tag-locked? ,name) commands)
441         (read-response-v2 responses))
442
443       (lambda (name)           ; unlock-tag!
444         (if debug (printf "~a: unlock-tag!" command-line))
445         (write `(unlock-tag! ,name) commands)
446         (read-response-v2 responses)
447         (void))
448
449       (lambda (command)                ; admin!
450         (if debug (printf "~a: admin!" command-line))
451         (write `(admin! ,command) commands)
452         (read-response-v2 responses))
453
454       (lambda ()                       ; close!
455         (if debug (printf "~a: close!!" command-line))
456         (write '(close!) commands)
457         (read-response-v2 responses)
458         (close-input-port responses)
459         (close-output-port commands)
460         (void))))))
461
462;; Given the command line to a storage remote access protocol server,
463;; activate it and return a storage object providing access to the
464;; server.
465(define (import-storage command-line . args)
466  (let-optionals args ((debug #f))
467   (let-values (((responses commands pid)
468                 (process command-line)))
469
470     #;(set-buffering-mode! commands #:none)
471
472     (if debug (print "~a: process opened" command-line))
473     (let ((magic (read responses)))
474       (if debug (print "~a: read magic ~a" command-line magic))
475       (cond
476        ((equal? magic *magic-v1*)
477         (import-storage-v1 command-line debug responses commands pid))
478        ((equal? magic *magic-v2*)
479         (import-storage-v2 command-line debug responses commands pid))
480        (else (error "Unrecognised backend protocol header magic" magic)))))))
481
482
483)
Note: See TracBrowser for help on using the repository browser.