source: project/release/4/ugarit/trunk/backend-fs.scm @ 25565

Last change on this file since 25565 was 25565, checked in by Alaric Snell-Pym, 9 years ago

ugarit: tag locking, and strict enforcement of maximum file size in splitlog archives

File size: 17.1 KB
Line 
1(use ugarit-backend)
2(use sql-de-lite)
3(use srfi-69)
4(use matchable)
5(use regex)
6(use miscmacros)
7
8(define (backend-fs base)
9   (define (make-name key extension) ; Break into levels to reduce files-in-one-dir strain
10      (cond
11         ((< (string-length key) 4)
12            (string-append base "/" key extension))
13         ((< (string-length key) 7)
14            (string-append base "/" (string-take key 3) "/" (string-drop key 3) extension))
15         ((< (string-length key) 10)
16            (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3)
17               "/" (string-drop key 6) extension))
18         (else
19            (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3)
20               "/" (string-take (string-drop key 6) 3) "/"
21               (string-drop key 9) extension))))
22   (define (ensure-directory! key)
23      (let
24         ((ed (lambda (path)
25            (if (not (directory? path))
26               (create-directory path)))))
27         (if (>= (string-length key) 4)
28            (ed (string-append base "/" (string-take key 3))))
29         (if (>= (string-length key) 7)
30            (ed (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3))))
31         (if (>= (string-length key) 10)
32            (ed (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) "/" (string-take (string-drop key 6) 3))))
33         (void)))
34   (define (delete-dir-if-empty! key)
35      (let
36         ((dd (lambda (path)
37            (if (and (directory? path) (null? (directory path)))
38               (delete-directory path)))))
39
40         (if (>= (string-length key) 10)
41            (dd (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) "/" (string-take (string-drop key 6) 3))))
42         (if (>= (string-length key) 7)
43            (dd (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3))))
44         (if (>= (string-length key) 4)
45            (dd (string-append base "/" (string-take key 3))))
46         (void)))
47
48   (define (make-tag-name tag)
49      (string-append base "/" tag ".tag"))
50
51   (define (make-tag-lock-name tag)
52      (string-append base "/" tag ".tag-lock"))
53
54   (if (not (directory? base))
55      (signal (make-property-condition 'exn 'message "The archive does not exist" 'arguments base)))
56
57   (make-storage
58      (* 1024 1024) ; 1MiB blocks since local disk is fast and cheap
59      #t ; We are writable
60      #t ; We support unlink!
61      (lambda (key data type) ; put!
62         (if (file-read-access? (make-name key ".type"))
63            (signal (make-property-condition 'exn 'message "Duplicate block: put! should not be called on an existing hash" 'arguments (list key type)))
64            (begin
65               (ensure-directory! key)
66               ; Note: We save to ...~ files then mv them into place, so as to avoid ending up with a partial block
67               ; in the archive if it dies in mid-write. We move the .type file in last, since the existance of that is what
68               ; makes the block "official".
69               ; The only thing we need worry about is a race between two snapshots writing the same block at once...
70               ; However, since we can't easily provide atomicity on link!, we just say "don't do that" for now.
71               (with-output-to-file (make-name key ".data~")
72                  (lambda () (write-u8vector data)))
73               (with-output-to-file (make-name key ".type~")
74                  (lambda () (write type)))
75               (with-output-to-file (make-name key ".refcount~")
76                  (lambda () (write 1)))
77               (rename-file (make-name key ".data~") (make-name key ".data"))
78               (rename-file (make-name key ".refcount~") (make-name key ".refcount"))
79               (rename-file (make-name key ".type~") (make-name key ".type"))
80               (void))))
81      (lambda () (void)) ; flush! - a no-op for us
82      (lambda (key) ; exists?
83         (if (file-read-access? (make-name key ".data"))
84            (with-input-from-file (make-name key ".type")
85               (lambda () (read)))
86            #f))
87      (lambda (key) ; get
88         (if (file-read-access? (make-name key ".data"))
89            (with-input-from-file (make-name key ".data")
90               (lambda () (read-u8vector)))
91            #f))
92      (lambda (key) ; link!
93         (if
94            (file-read-access? (make-name key ".data"))
95            (let
96               ((current-refcount
97                  (with-input-from-file (make-name key ".refcount")
98                     (lambda () (read)))))
99               (begin
100                  (with-output-to-file (make-name key ".refcount~")
101                     (lambda () (write (+ current-refcount 1))))
102                     (rename-file (make-name key ".refcount~") (make-name key ".refcount"))))))
103      (lambda (key) ; unlink!
104         (and-let*
105            (((file-read-access? (make-name key ".data")))
106            (current-refcount
107               (with-input-from-file (make-name key ".refcount")
108                  (lambda () (read))))
109            (new-refcount (- current-refcount 1)))
110            (if (zero? new-refcount)
111               (let
112                  ((data (with-input-from-file (make-name key ".data")
113                     (lambda () (read-u8vector)))))
114                  (begin
115                     (delete-file (make-name key ".data"))
116                     (delete-file (make-name key ".type"))
117                     (delete-file (make-name key ".refcount"))
118                     (delete-dir-if-empty! key)
119                     data)) ; returned in case of deletion
120               (begin
121                  (with-output-to-file (make-name key ".refcount~")
122                     (lambda () (write new-refcount)))
123                  (rename-file (make-name key ".refcount~") (make-name key ".refcount"))
124                  #f))))
125      (lambda (tag key) ; set-tag!
126         (with-output-to-file (make-tag-name tag)
127            (lambda () (write key))))
128      (lambda (tag) ; tag
129         (if (file-read-access? (make-tag-name tag))
130            (with-input-from-file (make-tag-name tag)
131               (lambda () (let ((key (read)))
132                            (if (eof-object? key)
133                                #f ; Treat empty file as no tag
134                                key))))
135            #f))
136      (lambda () ; all-tags
137         (let
138            ((tag-path-regexp (regexp (make-tag-name "(.*)"))))
139            (map
140               (lambda (path)
141                  (cadr (string-match tag-path-regexp path)))
142               (glob (make-tag-name "*")))))
143      (lambda (tag) ; remove-tag!
144         (if (file-write-access? (make-tag-name tag))
145            (begin
146              (delete-file (make-tag-name tag))
147              (when (file-exists? (make-tag-lock-name tag))
148                    (delete-file (make-tag-lock-name tag))))
149            #f))
150      (lambda (tag) ; lock-tag!
151        ; Ensure tag file exists first, as an empty file if necessary
152        (file-close (file-open (make-tag-name tag) (+ open/wronly open/append open/creat)))
153        (condition-case
154         (begin
155           (file-link (make-tag-name tag) (make-tag-lock-name tag))
156           #t)
157         ((exn i/o file)
158          #f)))  ; If we can't create it for any reason, we haven't got the lock; it'd be nicer to check errno = EEXIST, though, and raise an exception for other errors.
159      (lambda (tag) ; tag-locked?
160        (not (not (file-exists? (make-tag-lock-name tag)))))
161      (lambda (tag) ; unlock-tag!
162        (delete-file (make-tag-lock-name tag))
163        (void))
164      (lambda () ; close!
165         (void))))
166
167(define splitlog-sql-schema
168  (list
169   "CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT);"
170   "INSERT INTO metadata VALUES ('version','1');"
171   "CREATE TABLE blocks (key TEXT PRIMARY KEY, type TEXT, fileno INTEGER, position INTEGER, length INTEGER);"
172   "CREATE TABLE tags (tag TEXT PRIMARY KEY, key TEXT, locked INTEGER DEFAULT 0);"))
173
174(define (backend-splitlog logdir metapath max-logpart-size)
175   (let*
176        ((*db*
177          (let ((db (open-database metapath)))
178            (change-file-mode metapath (bitwise-ior perm/irusr perm/iwusr)) ; Don't think we can do anything about the journal files, though.
179            (when (null? (schema db))
180                  (for-each (lambda (statement)
181                              (exec (sql db statement)))
182                            splitlog-sql-schema))
183            (exec (sql db "BEGIN;"))
184            db))
185
186         ; Prepared statements
187         (get-metadata-query (sql *db* "SELECT value FROM metadata WHERE key = ?"))
188         (set-metadata-query (sql *db* "INSERT OR REPLACE INTO metadata (key,value) VALUES (?,?)"))
189         (get-block-data-query (sql *db* "SELECT type, fileno, position, length FROM blocks WHERE key = ?"))
190         (set-block-data-query (sql *db* "INSERT INTO blocks (key,type,fileno,position,length) VALUES (?,?,?,?,?)"))
191         (get-tag-query (sql *db* "SELECT key FROM tags WHERE tag = ?"))
192         (set-tag-query (sql *db* "INSERT OR REPLACE INTO tags (tag,key) VALUES (?,?)"))
193         (remove-tag-query (sql *db* "DELETE FROM tags WHERE tag = ?"))
194         (set-tag-lock-query (sql *db* "UPDATE tags SET locked = ? WHERE tag = ?"))
195         (get-tag-lock-query (sql *db* "SELECT locked FROM tags WHERE tag = ?"))
196         (get-tags-query (sql *db* "SELECT tag FROM tags"))
197
198         ; Database access functions
199         (get-metadata (lambda (key default)
200                         (let ((result (query fetch get-metadata-query key)))
201                           (if (null? result)
202                               (begin
203                                 (exec set-metadata-query key default)
204                                 default)
205                               (car result)))))
206         (set-metadata (lambda (key value)
207                         (exec set-metadata-query key value)))
208
209         ; Log file management
210         (*logcount* (string->number (get-metadata "current-logfile" "0")))
211         (set-logcount! (lambda (newcount)
212                         (set! *logcount* newcount)))
213         (*log* (file-open (string-append logdir "/log" (number->string *logcount*))
214                  (bitwise-ior open/creat open/rdwr open/append) (bitwise-ior perm/irusr perm/iwusr)))
215         (*logfiles* (make-hash-table)) ; hash of file number to FD
216         (get-log (lambda (index)
217            (if (hash-table-exists? *logfiles* index)
218               (hash-table-ref *logfiles* index)
219               (begin
220                  (let ((fd (file-open (string-append logdir "/log" (number->string index)) open/rdonly perm/irwxu)))
221                     (set! (hash-table-ref *logfiles* index) fd)
222                     fd)))))
223
224         ; Periodic commit management
225         (commit-interval (string->number (get-metadata "commit-interval" "1000")))
226         (*updates-since-last-commit* 0)
227         (flush! (lambda ()
228                   (when (> *updates-since-last-commit* 0)
229                    (set-metadata "current-logfile" (number->string *logcount*))
230                    (exec (sql *db* "COMMIT;"))
231                    (exec (sql *db* "BEGIN;"))
232                    (set! *updates-since-last-commit* 0))))
233         (maybe-flush! (lambda ()
234                         (inc! *updates-since-last-commit*)
235                         (when (> *updates-since-last-commit* commit-interval)
236                             (flush!))))
237
238         ; Higher-level database utilities
239         (get-block-data (lambda (key) ; Returns #f for nonexistant blocks
240                           (let ((bd (query fetch get-block-data-query key)))
241                             (if (pair? bd)
242                                 (let ((type (string->symbol (first bd)))
243                                       (fileno (second bd))
244                                       (position (third bd))
245                                       (length (fourth bd)))
246                                   (list type fileno position length))
247                                 #f))))
248
249         (set-block-data! (lambda (key type fileno position length)
250                           (exec set-block-data-query key (symbol->string type) fileno position length)
251                           (maybe-flush!)))
252
253         (set-tag! (lambda (tag key)
254                    (exec set-tag-query tag key)
255                    (flush!)))
256
257         (remove-tag! (lambda (tag)
258                        (exec remove-tag-query tag)
259                        (flush!)))
260
261         (get-tag (lambda (tag)
262                         (let ((td (query fetch get-tag-query tag)))
263                           (if (pair? td)
264                               (if (null? (car td)) ; treat NULL as no tag
265                                   #f
266                                   (car td))
267                               #f))))
268
269         (set-tag-lock! (lambda (tag lock)
270                      (exec set-tag-lock-query lock tag)
271                      (flush!)))
272
273         (get-tag-lock (lambda (tag lock)
274                         (let ((td (query fetch get-tag-lock-query tag)))
275                           (if (pair? td)
276                               (car td)
277                               (begin ; Tag does not exist, create it on demand
278                                 (set-tag! tag '()) ; insert NULL tag record
279                                 0)))))
280
281         (get-tags (lambda ()
282                     (map car (query fetch-all get-tags-query)))))
283
284      (make-storage
285         (* 1024 1024) ; 1MiB blocks since local disk is fast and cheap, right?
286         #t ; We are writable
287         #f ; We DO NOT support unlink!
288
289         (lambda (key data type) ; put!
290           (when (pair? (get-block-data key))
291                 (signal (make-property-condition 'exn 'message "Duplicate block: put! should not be called on an existing hash" 'arguments (list key type))))
292
293           (set-file-position! *log* 0 seek/end)
294
295           (let ((header (sprintf "(block ~S ~S ~S)" key type (u8vector-length data)))
296                 (posn (file-position *log*)))
297             (if (and (not (zero? posn)) (> (+ (u8vector-length data) (string-length header) posn) max-logpart-size))
298                 (begin
299                   (file-close *log*)
300                   (set! posn 0)
301                   (set-logcount! (+ *logcount* 1))
302                   (set! *log* (file-open (string-append logdir "/log" (number->string *logcount*))
303                                          (bitwise-ior open/creat open/rdwr open/append) (bitwise-ior perm/irusr perm/iwusr)))))
304             (file-write *log* header)
305             (file-write *log* (u8vector->blob/shared data))
306             (set-block-data! key type *logcount* (+ (string-length header) posn) (u8vector-length data))
307             (void)))
308
309         (lambda ()                     ; flush!
310           (flush!)
311           (void))
312
313         (lambda (key) ; exists?
314           (let ((bd (get-block-data key)))
315             (if (pair? bd)
316                 (car bd)
317                 #f)))
318
319         (lambda (key) ; get
320            (let* ((entry (get-block-data key)))
321              (if (pair? entry)
322               (let* ((type (first entry))
323                      (index (second entry))
324                      (position (third entry))
325                      (length (fourth entry))
326                      (buffer (make-blob length))
327                      (logpart (get-log index)))
328                 (set-file-position! logpart position seek/set)
329                 (file-read logpart length buffer)
330                 (blob->u8vector/shared buffer))
331               #f)))
332
333         (lambda (key) ; link!
334            (void))
335
336         (lambda (key) ; unlink!
337            (signal (make-property-condition 'exn 'message "Log archives do not support deletion")))
338
339         (lambda (tag key) ; set-tag!
340            (file-write *log* (sprintf "(tag ~S ~S)" tag key))
341            (set-tag! tag key)
342            (void))
343         (lambda (tag) ; tag
344           (get-tag tag))
345         (lambda () ; all-tags
346           (get-tags))
347         (lambda (tag) ; remove-tag!
348           (remove-tag! tag)
349           (void))
350         (lambda (tag) ; lock-tag!
351           (flush!)
352           (let ((existing-lock? (not (zero? (get-tag-lock tag)))))
353             (if existing-lock?
354                 (begin
355                   #f)
356                 (begin
357                   (set-tag-lock! tag 1)
358                   (flush!)
359                   #t))))
360         (lambda (tag) ; tag-locked?
361           (if (zero? (get-tag-lock tag))
362               #f
363               #t))
364         (lambda (tag) ; unlock-tag!
365           (set-tag-lock! tag 0)
366           (flush!))
367         (lambda () ; close!
368           (flush!)
369           (exec (sql *db* "COMMIT;"))
370           (close-database *db*)
371           (file-close *log*)
372           (hash-table-for-each *logfiles*
373                                (lambda (key value)
374                                  (file-close value)))))))
375
376(define backend
377  (match (command-line-arguments)
378         (("fs" base)
379          (backend-fs base))
380
381         (("splitlog" logdir metadir max-logpart-size)
382          (backend-splitlog logdir metadir (string->number max-logpart-size)))
383
384         (else
385          (printf "USAGE:\nbackend-fs fs <basedir-path>\nbackend-fs splitlog <logdir-path> <metadata-file-path> <max-file-size>\n")
386          #f)))
387
388(if backend
389    (export-storage! backend))
Note: See TracBrowser for help on using the repository browser.