source: project/release/4/ugarit/trunk/backend-fs.scm @ 25570

Last change on this file since 25570 was 25570, checked in by Alaric Snell-Pym, 9 years ago

ugarit: Version 2 of the backend protocol, supporting better reporting back to the user, and administrative interfaces. Backends outfitted with admin interfaces, and a ugarit-archive-admin tool added to drive them.

File size: 22.9 KB
Line 
1(use ugarit-backend)
2(use sql-de-lite)
3(use srfi-69)
4(use matchable)
5(use regex)
6(use miscmacros)
7
8(define (backend-fs base)
9   (define (make-name key extension) ; Break into levels to reduce files-in-one-dir strain
10      (cond
11         ((< (string-length key) 4)
12            (string-append base "/" key extension))
13         ((< (string-length key) 7)
14            (string-append base "/" (string-take key 3) "/" (string-drop key 3) extension))
15         ((< (string-length key) 10)
16            (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3)
17               "/" (string-drop key 6) extension))
18         (else
19            (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3)
20               "/" (string-take (string-drop key 6) 3) "/"
21               (string-drop key 9) extension))))
22   (define (ensure-directory! key)
23      (let
24         ((ed (lambda (path)
25            (if (not (directory? path))
26               (create-directory path)))))
27         (if (>= (string-length key) 4)
28            (ed (string-append base "/" (string-take key 3))))
29         (if (>= (string-length key) 7)
30            (ed (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3))))
31         (if (>= (string-length key) 10)
32            (ed (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) "/" (string-take (string-drop key 6) 3))))
33         (void)))
34   (define (delete-dir-if-empty! key)
35      (let
36         ((dd (lambda (path)
37            (if (and (directory? path) (null? (directory path)))
38               (delete-directory path)))))
39
40         (if (>= (string-length key) 10)
41            (dd (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) "/" (string-take (string-drop key 6) 3))))
42         (if (>= (string-length key) 7)
43            (dd (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3))))
44         (if (>= (string-length key) 4)
45            (dd (string-append base "/" (string-take key 3))))
46         (void)))
47
48   (define (make-tag-name tag)
49      (string-append base "/" tag ".tag"))
50
51   (define (make-tag-lock-name tag)
52      (string-append base "/" tag ".tag-lock"))
53
54   (if (not (directory? base))
55       (error "The archive directory does not exist" base))
56
57   (define block-size (* 1024 1024))
58
59   (make-storage
60    block-size
61      #t ; We are writable
62      #t ; We support unlink!
63      (lambda (key data type) ; put!
64         (if (file-read-access? (make-name key ".type"))
65            (signal (make-property-condition 'exn 'message "Duplicate block: put! should not be called on an existing hash" 'arguments (list key type)))
66            (begin
67               (ensure-directory! key)
68               ; Note: We save to ...~ files then mv them into place, so as to avoid ending up with a partial block
69               ; in the archive if it dies in mid-write. We move the .type file in last, since the existance of that is what
70               ; makes the block "official".
71               ; The only thing we need worry about is a race between two snapshots writing the same block at once...
72               ; However, since we can't easily provide atomicity on link!, we just say "don't do that" for now.
73               (with-output-to-file (make-name key ".data~")
74                  (lambda () (write-u8vector data)))
75               (with-output-to-file (make-name key ".type~")
76                  (lambda () (write type)))
77               (with-output-to-file (make-name key ".refcount~")
78                  (lambda () (write 1)))
79               (rename-file (make-name key ".data~") (make-name key ".data"))
80               (rename-file (make-name key ".refcount~") (make-name key ".refcount"))
81               (rename-file (make-name key ".type~") (make-name key ".type"))
82               (void))))
83      (lambda () (void)) ; flush! - a no-op for us
84      (lambda (key) ; exists?
85         (if (file-read-access? (make-name key ".data"))
86            (with-input-from-file (make-name key ".type")
87               (lambda () (read)))
88            #f))
89      (lambda (key) ; get
90         (if (file-read-access? (make-name key ".data"))
91            (with-input-from-file (make-name key ".data")
92               (lambda () (read-u8vector)))
93            #f))
94      (lambda (key) ; link!
95         (if
96            (file-read-access? (make-name key ".data"))
97            (let
98               ((current-refcount
99                  (with-input-from-file (make-name key ".refcount")
100                     (lambda () (read)))))
101               (begin
102                  (with-output-to-file (make-name key ".refcount~")
103                     (lambda () (write (+ current-refcount 1))))
104                     (rename-file (make-name key ".refcount~") (make-name key ".refcount"))))))
105      (lambda (key) ; unlink!
106         (and-let*
107            (((file-read-access? (make-name key ".data")))
108            (current-refcount
109               (with-input-from-file (make-name key ".refcount")
110                  (lambda () (read))))
111            (new-refcount (- current-refcount 1)))
112            (if (zero? new-refcount)
113               (let
114                  ((data (with-input-from-file (make-name key ".data")
115                     (lambda () (read-u8vector)))))
116                  (begin
117                     (delete-file (make-name key ".data"))
118                     (delete-file (make-name key ".type"))
119                     (delete-file (make-name key ".refcount"))
120                     (delete-dir-if-empty! key)
121                     data)) ; returned in case of deletion
122               (begin
123                  (with-output-to-file (make-name key ".refcount~")
124                     (lambda () (write new-refcount)))
125                  (rename-file (make-name key ".refcount~") (make-name key ".refcount"))
126                  #f))))
127      (lambda (tag key) ; set-tag!
128         (with-output-to-file (make-tag-name tag)
129            (lambda () (write key))))
130      (lambda (tag) ; tag
131         (if (file-read-access? (make-tag-name tag))
132            (with-input-from-file (make-tag-name tag)
133               (lambda () (let ((key (read)))
134                            (if (eof-object? key)
135                                #f ; Treat empty file as no tag
136                                key))))
137            #f))
138      (lambda () ; all-tags
139         (let
140            ((tag-path-regexp (regexp (make-tag-name "(.*)"))))
141            (map
142               (lambda (path)
143                  (cadr (string-match tag-path-regexp path)))
144               (glob (make-tag-name "*")))))
145      (lambda (tag) ; remove-tag!
146         (if (file-write-access? (make-tag-name tag))
147            (begin
148              (delete-file (make-tag-name tag))
149              (when (file-exists? (make-tag-lock-name tag))
150                    (delete-file (make-tag-lock-name tag))))
151            #f))
152      (lambda (tag) ; lock-tag!
153        ; Ensure tag file exists first, as an empty file if necessary
154        (file-close (file-open (make-tag-name tag) (+ open/wronly open/append open/creat)))
155        (condition-case
156         (begin
157           (file-link (make-tag-name tag) (make-tag-lock-name tag))
158           #t)
159         ((exn i/o file)
160          #f)))  ; If we can't create it for any reason, we haven't got the lock; it'd be nicer to check errno = EEXIST, though, and raise an exception for other errors.
161      (lambda (tag) ; tag-locked?
162        (not (not (file-exists? (make-tag-lock-name tag)))))
163      (lambda (tag) ; unlock-tag!
164        (delete-file (make-tag-lock-name tag))
165        (void))
166      (lambda (command) ; admin!
167        (match command
168               (('info)
169                (list (cons 'backend "fs")
170                      (cons 'path base)
171                      (cons 'block-size block-size)
172                      (cons 'writable? #t)
173                      (cons 'unlinkable? #t)))
174               (('help)
175                (list (cons 'info "Return information about the archive")
176                      (cons 'help "List available admin commands")))
177               (else (error "Unknown admin command"))))
178      (lambda () ; close!
179         (void))))
180
181(define splitlog-sql-schema
182  (list
183   "CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT);"
184   "INSERT INTO metadata VALUES ('version','1');"
185   "CREATE TABLE blocks (key TEXT PRIMARY KEY, type TEXT, fileno INTEGER, position INTEGER, length INTEGER);"
186   "CREATE TABLE tags (tag TEXT PRIMARY KEY, key TEXT, locked INTEGER DEFAULT 0);"))
187
188(define file-sync (foreign-lambda int "fsync" int))
189
190(define (backend-splitlog logdir metapath)
191   (let*
192        ((*db*
193          (let ((db (open-database metapath)))
194            (change-file-mode metapath (bitwise-ior perm/irusr perm/iwusr)) ; Don't think we can do anything about the journal files, though.
195            (when (null? (schema db))
196                  (for-each (lambda (statement)
197                              (exec (sql db statement)))
198                            splitlog-sql-schema))
199            (exec (sql db "BEGIN;"))
200            db))
201
202         ; Prepared statements
203         (get-metadata-query (sql *db* "SELECT value FROM metadata WHERE key = ?"))
204         (set-metadata-query (sql *db* "INSERT OR REPLACE INTO metadata (key,value) VALUES (?,?)"))
205         (get-block-data-query (sql *db* "SELECT type, fileno, position, length FROM blocks WHERE key = ?"))
206         (set-block-data-query (sql *db* "INSERT INTO blocks (key,type,fileno,position,length) VALUES (?,?,?,?,?)"))
207         (get-tag-query (sql *db* "SELECT key FROM tags WHERE tag = ?"))
208         (set-tag-query (sql *db* "INSERT OR REPLACE INTO tags (tag,key) VALUES (?,?)"))
209         (remove-tag-query (sql *db* "DELETE FROM tags WHERE tag = ?"))
210         (set-tag-lock-query (sql *db* "UPDATE tags SET locked = ? WHERE tag = ?"))
211         (get-tag-lock-query (sql *db* "SELECT locked FROM tags WHERE tag = ?"))
212         (get-tags-query (sql *db* "SELECT tag FROM tags"))
213
214         ; Database access functions
215         (get-metadata (lambda (key default)
216                         (let ((result (query fetch get-metadata-query key)))
217                           (if (null? result)
218                               (begin
219                                 (exec set-metadata-query key default)
220                                 default)
221                               (car result)))))
222         (set-metadata! (lambda (key value)
223                         (exec set-metadata-query key value)))
224
225         (max-logpart-size (string->number (get-metadata "max-logpart-size" "600000000")))
226
227         ; Log file management
228         (*logcount* (string->number (get-metadata "current-logfile" "0")))
229         (set-logcount! (lambda (newcount)
230                         (set! *logcount* newcount)))
231         (*log* (file-open (string-append logdir "/log" (number->string *logcount*))
232                  (bitwise-ior open/creat open/rdwr open/append) (bitwise-ior perm/irusr perm/iwusr)))
233         (*logfiles* (make-hash-table)) ; hash of file number to FD
234         (get-log (lambda (index)
235            (if (hash-table-exists? *logfiles* index)
236               (hash-table-ref *logfiles* index)
237               (begin
238                  (let ((fd (file-open (string-append logdir "/log" (number->string index)) open/rdonly perm/irwxu)))
239                     (set! (hash-table-ref *logfiles* index) fd)
240                     fd)))))
241
242         ; Basic configurables
243         (block-size (string->number (get-metadata "block-size" "1048576")))
244         (writable? (not (string=? "0" (get-metadata "writable" "1"))))
245         (check-writable (lambda ()
246                           (unless writable?
247                                   (error "This archive is write protected"))))
248
249         ; Periodic commit management
250         (commit-interval (string->number (get-metadata "commit-interval" "1000")))
251         (*updates-since-last-commit* 0)
252         (flush! (lambda ()
253                   (when (> *updates-since-last-commit* 0)
254                    (file-sync *log*)
255                    (set-metadata! "current-logfile" (number->string *logcount*))
256                    (exec (sql *db* "COMMIT;"))
257                    (exec (sql *db* "BEGIN;"))
258                    (set! *updates-since-last-commit* 0))))
259         (maybe-flush! (lambda ()
260                         (inc! *updates-since-last-commit*)
261                         (when (> *updates-since-last-commit* commit-interval)
262                             (flush!))))
263
264         ; Higher-level database utilities
265         (get-block-data (lambda (key) ; Returns #f for nonexistant blocks
266                           (let ((bd (query fetch get-block-data-query key)))
267                             (if (pair? bd)
268                                 (let ((type (string->symbol (first bd)))
269                                       (fileno (second bd))
270                                       (position (third bd))
271                                       (length (fourth bd)))
272                                   (list type fileno position length))
273                                 #f))))
274
275         (set-block-data! (lambda (key type fileno position length)
276                           (exec set-block-data-query key (symbol->string type) fileno position length)
277                           (maybe-flush!)))
278
279         (set-tag! (lambda (tag key)
280                    (exec set-tag-query tag key)
281                    (flush!)))
282
283         (remove-tag! (lambda (tag)
284                        (exec remove-tag-query tag)
285                        (flush!)))
286
287         (get-tag (lambda (tag)
288                         (let ((td (query fetch get-tag-query tag)))
289                           (if (pair? td)
290                               (if (null? (car td)) ; treat NULL as no tag
291                                   #f
292                                   (car td))
293                               #f))))
294
295         (set-tag-lock! (lambda (tag lock)
296                      (exec set-tag-lock-query lock tag)
297                      (flush!)))
298
299         (get-tag-lock (lambda (tag lock)
300                         (let ((td (query fetch get-tag-lock-query tag)))
301                           (if (pair? td)
302                               (car td)
303                               (begin ; Tag does not exist, create it on demand
304                                 (set-tag! tag '()) ; insert NULL tag record
305                                 0)))))
306
307         (get-tags (lambda ()
308                     (map car (query fetch-all get-tags-query))))
309
310         (reindex! (lambda ()
311                     (flush!)
312                     (exec (sql *db* "DELETE FROM tags;"))
313                     (exec (sql *db* "DELETE FROM blocks;"))
314
315                     (let loop-over-logs ((log-number 0))
316                       (let* ((log-file-name (string-append logdir "/log" (number->string log-number))))
317                         (if (file-exists? log-file-name)
318                          (begin
319                            ((backend-log!) 'info (sprintf "Reading ~a" log-file-name))
320                            (with-input-from-file log-file-name
321                              (lambda ()
322                                (let loop-over-entries ()
323                                  (let* ((entry (read))
324                                         (posn (file-position (current-input-port))))
325                                    (if (eof-object? entry)
326                                        (loop-over-logs (+ log-number 1))
327                                        (begin
328                                          (match entry
329                                                 (('block key type length)
330                                                  (set-block-data! key type log-number posn length)
331                                                  (set-file-position! (current-input-port) length seek/cur))
332                                                 (('tag tag key)
333                                                  (set-tag! tag key))
334                                                 (else
335                                                  ((backend-log!) 'error "Unknown log entry ~S" entry)))
336                                          (loop-over-entries)))))))))
337                         (void)))
338                     (flush!)
339                     (void))))
340
341      (make-storage
342         block-size
343         writable?
344         #f ; We DO NOT support unlink!
345
346         (lambda (key data type) ; put!
347           (check-writable)
348           (when (pair? (get-block-data key))
349                 (error "Duplicate block" key type))
350
351           (set-file-position! *log* 0 seek/end)
352
353           (let ((header (sprintf "(block ~S ~S ~S)" key type (u8vector-length data)))
354                 (posn (file-position *log*)))
355             (if (and (not (zero? posn)) (> (+ (u8vector-length data) (string-length header) posn) max-logpart-size))
356                 (begin
357                   (file-close *log*)
358                   (set! posn 0)
359                   (set-logcount! (+ *logcount* 1))
360                   (set! *log* (file-open (string-append logdir "/log" (number->string *logcount*))
361                                          (bitwise-ior open/creat open/rdwr open/append) (bitwise-ior perm/irusr perm/iwusr)))))
362             (file-write *log* header)
363             (file-write *log* (u8vector->blob/shared data))
364             (set-block-data! key type *logcount* (+ (string-length header) posn) (u8vector-length data))
365             (void)))
366
367         (lambda ()                     ; flush!
368           (flush!)
369           (void))
370
371         (lambda (key) ; exists?
372           (let ((bd (get-block-data key)))
373             (if (pair? bd)
374                 (car bd)
375                 #f)))
376
377         (lambda (key) ; get
378            (let* ((entry (get-block-data key)))
379              (if (pair? entry)
380               (let* ((type (first entry))
381                      (index (second entry))
382                      (position (third entry))
383                      (length (fourth entry))
384                      (buffer (make-blob length))
385                      (logpart (get-log index)))
386                 (set-file-position! logpart position seek/set)
387                 (file-read logpart length buffer)
388                 (blob->u8vector/shared buffer))
389               #f)))
390
391         (lambda (key) ; link!
392           (check-writable)
393           (void))
394
395         (lambda (key) ; unlink!
396           (check-writable)
397           (error "splitlog archives do not support unlinkined"))
398
399         (lambda (tag key) ; set-tag!
400           (check-writable)
401           (file-write *log* (sprintf "(tag ~S ~S)" tag key))
402           (set-tag! tag key)
403           (void))
404         (lambda (tag) ; tag
405           (get-tag tag))
406         (lambda () ; all-tags
407           (get-tags))
408         (lambda (tag) ; remove-tag!
409           (check-writable)
410           (remove-tag! tag)
411           (void))
412         (lambda (tag) ; lock-tag!
413           (check-writable)
414           (flush!)
415           (let ((existing-lock? (not (zero? (get-tag-lock tag)))))
416             (if existing-lock?
417                 (begin
418                   #f)
419                 (begin
420                   (set-tag-lock! tag 1)
421                   (flush!)
422                   #t))))
423         (lambda (tag) ; tag-locked?
424           (if (zero? (get-tag-lock tag))
425               #f
426               #t))
427         (lambda (tag) ; unlock-tag!
428           (check-writable)
429           (set-tag-lock! tag 0)
430           (flush!))
431         (lambda (command) ; admin!
432           (match command
433                  (('info)
434                   (list (cons 'backend "splitlog")
435                         (cons 'block-size block-size)
436                         (cons 'writable? writable?)
437                         (cons 'unlinkable? #f)
438                         (cons 'path logdir)
439                         (cons 'metadata-file metapath)
440                         (cons 'max-logfile-size max-logpart-size)
441                         (cons 'currently-writing-to *logcount*)
442                         (cons 'commit-interval commit-interval)))
443                  (('help)
444                   (list (cons 'info "Return information about the archive")
445                         (cons 'help "List available admin commands")
446                         (cons 'stats "Examine the metadata and report back statistics")
447                         (cons 'set-block-size! (sprintf "<size in bytes> Sets a new maximum block size (current: ~a)" block-size))
448                         (cons 'set-max-logfile-size! (sprintf "<size in bytes> Sets a new maximum logfile size (current: ~a)" max-logpart-size))
449                         (cons 'set-commit-interval! (sprintf "<updates> Sets a new commit interval (current: ~a)" commit-interval))
450                         (cons 'write-protect! (sprintf "Disable writing to the archive (currently ~a)" (if writable? "enabled" "disabled")))
451                         (cons 'write-unprotect! (sprintf "Enable writing to the archive (currently ~a)" (if writable? "enabled" "disabled")))
452                         (cons 'reindex! "Rebuild the index in the metadata from scratch by scanning the log (takes a while)")))
453                  (('stats)
454                   (let* ((stats (query fetch (sql *db* "SELECT COUNT(*), SUM(length) FROM blocks"))))
455                    (list (cons 'blocks (first stats))
456                          (cons 'bytes (second stats)))))
457                  (('set-block-size! size)
458                   (assert (integer? size))
459                   (set! block-size size)
460                   (set-metadata! "block-size" (number->string size))
461                   (list (cons 'result "Done")))
462                  (('set-max-logfile-size! size)
463                   (assert (integer? size))
464                   (set! max-logpart-size size)
465                   (set-metadata! "max-logpart-size" (number->string size))
466                   (list (cons 'result "Done")))
467                  (('set-commit-interval! cis)
468                   (assert (integer? cis))
469                   (set! commit-interval cis)
470                   (set-metadata! "commit-interval" (number->string  cis))
471                   (list (cons 'result "Done")))
472                  (('write-protect!)
473                   (set! writable? #f)
474                   (set-metadata! "writable" "0")
475                   (list (cons 'result "Done")))
476                  (('write-unprotect!)
477                   (set! writable? #f)
478                   (set-metadata! "writable" "1")
479                   (list (cons 'result "Done")))
480                  (('reindex!)
481                   (reindex!)
482                   (list (cons 'result "Done")))
483                  (else (error "Unknown admin command"))))
484         (lambda () ; close!
485           (flush!)
486           (exec (sql *db* "COMMIT;"))
487           (close-database *db*)
488           (file-close *log*)
489           (hash-table-for-each *logfiles*
490                                (lambda (key value)
491                                  (file-close value)))))))
492
493(define backend
494  (match (command-line-arguments)
495         (("fs" base)
496          (lambda ()  (backend-fs base)))
497
498         (("splitlog" logdir metadir)
499          (lambda ()  (backend-splitlog logdir metadir)))
500
501         (else
502          (export-storage-error! "Invalid arguments to backend-fs")
503          (printf "USAGE:\nbackend-fs fs <basedir-path>\nbackend-fs splitlog <logdir-path> <metadata-file-path>\n")
504          #f)))
505
506(if backend
507    (export-storage! backend))
Note: See TracBrowser for help on using the repository browser.