1 | (use ugarit-backend) |
---|
2 | (use sql-de-lite) |
---|
3 | (use srfi-69) |
---|
4 | (use matchable) |
---|
5 | (use regex) |
---|
6 | (use miscmacros) |
---|
7 | |
---|
8 | (define (backend-fs base) |
---|
9 | (define (make-name key extension) ; Break into levels to reduce files-in-one-dir strain |
---|
10 | (cond |
---|
11 | ((< (string-length key) 4) |
---|
12 | (string-append base "/" key extension)) |
---|
13 | ((< (string-length key) 7) |
---|
14 | (string-append base "/" (string-take key 3) "/" (string-drop key 3) extension)) |
---|
15 | ((< (string-length key) 10) |
---|
16 | (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) |
---|
17 | "/" (string-drop key 6) extension)) |
---|
18 | (else |
---|
19 | (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) |
---|
20 | "/" (string-take (string-drop key 6) 3) "/" |
---|
21 | (string-drop key 9) extension)))) |
---|
22 | (define (ensure-directory! key) |
---|
23 | (let |
---|
24 | ((ed (lambda (path) |
---|
25 | (if (not (directory? path)) |
---|
26 | (create-directory path))))) |
---|
27 | (if (>= (string-length key) 4) |
---|
28 | (ed (string-append base "/" (string-take key 3)))) |
---|
29 | (if (>= (string-length key) 7) |
---|
30 | (ed (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3)))) |
---|
31 | (if (>= (string-length key) 10) |
---|
32 | (ed (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) "/" (string-take (string-drop key 6) 3)))) |
---|
33 | (void))) |
---|
34 | (define (delete-dir-if-empty! key) |
---|
35 | (let |
---|
36 | ((dd (lambda (path) |
---|
37 | (if (and (directory? path) (null? (directory path))) |
---|
38 | (delete-directory path))))) |
---|
39 | |
---|
40 | (if (>= (string-length key) 10) |
---|
41 | (dd (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3) "/" (string-take (string-drop key 6) 3)))) |
---|
42 | (if (>= (string-length key) 7) |
---|
43 | (dd (string-append base "/" (string-take key 3) "/" (string-take (string-drop key 3) 3)))) |
---|
44 | (if (>= (string-length key) 4) |
---|
45 | (dd (string-append base "/" (string-take key 3)))) |
---|
46 | (void))) |
---|
47 | |
---|
48 | (define (make-tag-name tag) |
---|
49 | (string-append base "/" tag ".tag")) |
---|
50 | |
---|
51 | (define (make-tag-lock-name tag) |
---|
52 | (string-append base "/" tag ".tag-lock")) |
---|
53 | |
---|
54 | (if (not (directory? base)) |
---|
55 | (signal (make-property-condition 'exn 'message "The archive does not exist" 'arguments base))) |
---|
56 | |
---|
57 | (make-storage |
---|
58 | (* 1024 1024) ; 1MiB blocks since local disk is fast and cheap |
---|
59 | #t ; We are writable |
---|
60 | #t ; We support unlink! |
---|
61 | (lambda (key data type) ; put! |
---|
62 | (if (file-read-access? (make-name key ".type")) |
---|
63 | (signal (make-property-condition 'exn 'message "Duplicate block: put! should not be called on an existing hash" 'arguments (list key type))) |
---|
64 | (begin |
---|
65 | (ensure-directory! key) |
---|
66 | ; Note: We save to ...~ files then mv them into place, so as to avoid ending up with a partial block |
---|
67 | ; in the archive if it dies in mid-write. We move the .type file in last, since the existance of that is what |
---|
68 | ; makes the block "official". |
---|
69 | ; The only thing we need worry about is a race between two snapshots writing the same block at once... |
---|
70 | ; However, since we can't easily provide atomicity on link!, we just say "don't do that" for now. |
---|
71 | (with-output-to-file (make-name key ".data~") |
---|
72 | (lambda () (write-u8vector data))) |
---|
73 | (with-output-to-file (make-name key ".type~") |
---|
74 | (lambda () (write type))) |
---|
75 | (with-output-to-file (make-name key ".refcount~") |
---|
76 | (lambda () (write 1))) |
---|
77 | (rename-file (make-name key ".data~") (make-name key ".data")) |
---|
78 | (rename-file (make-name key ".refcount~") (make-name key ".refcount")) |
---|
79 | (rename-file (make-name key ".type~") (make-name key ".type")) |
---|
80 | (void)))) |
---|
81 | (lambda () (void)) ; flush! - a no-op for us |
---|
82 | (lambda (key) ; exists? |
---|
83 | (if (file-read-access? (make-name key ".data")) |
---|
84 | (with-input-from-file (make-name key ".type") |
---|
85 | (lambda () (read))) |
---|
86 | #f)) |
---|
87 | (lambda (key) ; get |
---|
88 | (if (file-read-access? (make-name key ".data")) |
---|
89 | (with-input-from-file (make-name key ".data") |
---|
90 | (lambda () (read-u8vector))) |
---|
91 | #f)) |
---|
92 | (lambda (key) ; link! |
---|
93 | (if |
---|
94 | (file-read-access? (make-name key ".data")) |
---|
95 | (let |
---|
96 | ((current-refcount |
---|
97 | (with-input-from-file (make-name key ".refcount") |
---|
98 | (lambda () (read))))) |
---|
99 | (begin |
---|
100 | (with-output-to-file (make-name key ".refcount~") |
---|
101 | (lambda () (write (+ current-refcount 1)))) |
---|
102 | (rename-file (make-name key ".refcount~") (make-name key ".refcount")))))) |
---|
103 | (lambda (key) ; unlink! |
---|
104 | (and-let* |
---|
105 | (((file-read-access? (make-name key ".data"))) |
---|
106 | (current-refcount |
---|
107 | (with-input-from-file (make-name key ".refcount") |
---|
108 | (lambda () (read)))) |
---|
109 | (new-refcount (- current-refcount 1))) |
---|
110 | (if (zero? new-refcount) |
---|
111 | (let |
---|
112 | ((data (with-input-from-file (make-name key ".data") |
---|
113 | (lambda () (read-u8vector))))) |
---|
114 | (begin |
---|
115 | (delete-file (make-name key ".data")) |
---|
116 | (delete-file (make-name key ".type")) |
---|
117 | (delete-file (make-name key ".refcount")) |
---|
118 | (delete-dir-if-empty! key) |
---|
119 | data)) ; returned in case of deletion |
---|
120 | (begin |
---|
121 | (with-output-to-file (make-name key ".refcount~") |
---|
122 | (lambda () (write new-refcount))) |
---|
123 | (rename-file (make-name key ".refcount~") (make-name key ".refcount")) |
---|
124 | #f)))) |
---|
125 | (lambda (tag key) ; set-tag! |
---|
126 | (with-output-to-file (make-tag-name tag) |
---|
127 | (lambda () (write key)))) |
---|
128 | (lambda (tag) ; tag |
---|
129 | (if (file-read-access? (make-tag-name tag)) |
---|
130 | (with-input-from-file (make-tag-name tag) |
---|
131 | (lambda () (let ((key (read))) |
---|
132 | (if (eof-object? key) |
---|
133 | #f ; Treat empty file as no tag |
---|
134 | key)))) |
---|
135 | #f)) |
---|
136 | (lambda () ; all-tags |
---|
137 | (let |
---|
138 | ((tag-path-regexp (regexp (make-tag-name "(.*)")))) |
---|
139 | (map |
---|
140 | (lambda (path) |
---|
141 | (cadr (string-match tag-path-regexp path))) |
---|
142 | (glob (make-tag-name "*"))))) |
---|
143 | (lambda (tag) ; remove-tag! |
---|
144 | (if (file-write-access? (make-tag-name tag)) |
---|
145 | (begin |
---|
146 | (delete-file (make-tag-name tag)) |
---|
147 | (when (file-exists? (make-tag-lock-name tag)) |
---|
148 | (delete-file (make-tag-lock-name tag)))) |
---|
149 | #f)) |
---|
150 | (lambda (tag) ; lock-tag! |
---|
151 | ; Ensure tag file exists first, as an empty file if necessary |
---|
152 | (file-close (file-open (make-tag-name tag) (+ open/wronly open/append open/creat))) |
---|
153 | (condition-case |
---|
154 | (begin |
---|
155 | (file-link (make-tag-name tag) (make-tag-lock-name tag)) |
---|
156 | #t) |
---|
157 | ((exn i/o file) |
---|
158 | #f))) ; If we can't create it for any reason, we haven't got the lock; it'd be nicer to check errno = EEXIST, though, and raise an exception for other errors. |
---|
159 | (lambda (tag) ; tag-locked? |
---|
160 | (not (not (file-exists? (make-tag-lock-name tag))))) |
---|
161 | (lambda (tag) ; unlock-tag! |
---|
162 | (delete-file (make-tag-lock-name tag)) |
---|
163 | (void)) |
---|
164 | (lambda () ; close! |
---|
165 | (void)))) |
---|
166 | |
---|
167 | (define splitlog-sql-schema |
---|
168 | (list |
---|
169 | "CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT);" |
---|
170 | "INSERT INTO metadata VALUES ('version','1');" |
---|
171 | "CREATE TABLE blocks (key TEXT PRIMARY KEY, type TEXT, fileno INTEGER, position INTEGER, length INTEGER);" |
---|
172 | "CREATE TABLE tags (tag TEXT PRIMARY KEY, key TEXT, locked INTEGER DEFAULT 0);")) |
---|
173 | |
---|
174 | (define (backend-splitlog logdir metapath max-logpart-size) |
---|
175 | (let* |
---|
176 | ((*db* |
---|
177 | (let ((db (open-database metapath))) |
---|
178 | (change-file-mode metapath (bitwise-ior perm/irusr perm/iwusr)) ; Don't think we can do anything about the journal files, though. |
---|
179 | (when (null? (schema db)) |
---|
180 | (for-each (lambda (statement) |
---|
181 | (exec (sql db statement))) |
---|
182 | splitlog-sql-schema)) |
---|
183 | (exec (sql db "BEGIN;")) |
---|
184 | db)) |
---|
185 | |
---|
186 | ; Prepared statements |
---|
187 | (get-metadata-query (sql *db* "SELECT value FROM metadata WHERE key = ?")) |
---|
188 | (set-metadata-query (sql *db* "INSERT OR REPLACE INTO metadata (key,value) VALUES (?,?)")) |
---|
189 | (get-block-data-query (sql *db* "SELECT type, fileno, position, length FROM blocks WHERE key = ?")) |
---|
190 | (set-block-data-query (sql *db* "INSERT INTO blocks (key,type,fileno,position,length) VALUES (?,?,?,?,?)")) |
---|
191 | (get-tag-query (sql *db* "SELECT key FROM tags WHERE tag = ?")) |
---|
192 | (set-tag-query (sql *db* "INSERT OR REPLACE INTO tags (tag,key) VALUES (?,?)")) |
---|
193 | (remove-tag-query (sql *db* "DELETE FROM tags WHERE tag = ?")) |
---|
194 | (set-tag-lock-query (sql *db* "UPDATE tags SET locked = ? WHERE tag = ?")) |
---|
195 | (get-tag-lock-query (sql *db* "SELECT locked FROM tags WHERE tag = ?")) |
---|
196 | (get-tags-query (sql *db* "SELECT tag FROM tags")) |
---|
197 | |
---|
198 | ; Database access functions |
---|
199 | (get-metadata (lambda (key default) |
---|
200 | (let ((result (query fetch get-metadata-query key))) |
---|
201 | (if (null? result) |
---|
202 | (begin |
---|
203 | (exec set-metadata-query key default) |
---|
204 | default) |
---|
205 | (car result))))) |
---|
206 | (set-metadata (lambda (key value) |
---|
207 | (exec set-metadata-query key value))) |
---|
208 | |
---|
209 | ; Log file management |
---|
210 | (*logcount* (string->number (get-metadata "current-logfile" "0"))) |
---|
211 | (set-logcount! (lambda (newcount) |
---|
212 | (set! *logcount* newcount))) |
---|
213 | (*log* (file-open (string-append logdir "/log" (number->string *logcount*)) |
---|
214 | (bitwise-ior open/creat open/rdwr open/append) (bitwise-ior perm/irusr perm/iwusr))) |
---|
215 | (*logfiles* (make-hash-table)) ; hash of file number to FD |
---|
216 | (get-log (lambda (index) |
---|
217 | (if (hash-table-exists? *logfiles* index) |
---|
218 | (hash-table-ref *logfiles* index) |
---|
219 | (begin |
---|
220 | (let ((fd (file-open (string-append logdir "/log" (number->string index)) open/rdonly perm/irwxu))) |
---|
221 | (set! (hash-table-ref *logfiles* index) fd) |
---|
222 | fd))))) |
---|
223 | |
---|
224 | ; Periodic commit management |
---|
225 | (commit-interval (string->number (get-metadata "commit-interval" "1000"))) |
---|
226 | (*updates-since-last-commit* 0) |
---|
227 | (flush! (lambda () |
---|
228 | (when (> *updates-since-last-commit* 0) |
---|
229 | (set-metadata "current-logfile" (number->string *logcount*)) |
---|
230 | (exec (sql *db* "COMMIT;")) |
---|
231 | (exec (sql *db* "BEGIN;")) |
---|
232 | (set! *updates-since-last-commit* 0)))) |
---|
233 | (maybe-flush! (lambda () |
---|
234 | (inc! *updates-since-last-commit*) |
---|
235 | (when (> *updates-since-last-commit* commit-interval) |
---|
236 | (flush!)))) |
---|
237 | |
---|
238 | ; Higher-level database utilities |
---|
239 | (get-block-data (lambda (key) ; Returns #f for nonexistant blocks |
---|
240 | (let ((bd (query fetch get-block-data-query key))) |
---|
241 | (if (pair? bd) |
---|
242 | (let ((type (string->symbol (first bd))) |
---|
243 | (fileno (second bd)) |
---|
244 | (position (third bd)) |
---|
245 | (length (fourth bd))) |
---|
246 | (list type fileno position length)) |
---|
247 | #f)))) |
---|
248 | |
---|
249 | (set-block-data! (lambda (key type fileno position length) |
---|
250 | (exec set-block-data-query key (symbol->string type) fileno position length) |
---|
251 | (maybe-flush!))) |
---|
252 | |
---|
253 | (set-tag! (lambda (tag key) |
---|
254 | (exec set-tag-query tag key) |
---|
255 | (flush!))) |
---|
256 | |
---|
257 | (remove-tag! (lambda (tag) |
---|
258 | (exec remove-tag-query tag) |
---|
259 | (flush!))) |
---|
260 | |
---|
261 | (get-tag (lambda (tag) |
---|
262 | (let ((td (query fetch get-tag-query tag))) |
---|
263 | (if (pair? td) |
---|
264 | (if (null? (car td)) ; treat NULL as no tag |
---|
265 | #f |
---|
266 | (car td)) |
---|
267 | #f)))) |
---|
268 | |
---|
269 | (set-tag-lock! (lambda (tag lock) |
---|
270 | (exec set-tag-lock-query lock tag) |
---|
271 | (flush!))) |
---|
272 | |
---|
273 | (get-tag-lock (lambda (tag lock) |
---|
274 | (let ((td (query fetch get-tag-lock-query tag))) |
---|
275 | (if (pair? td) |
---|
276 | (car td) |
---|
277 | (begin ; Tag does not exist, create it on demand |
---|
278 | (set-tag! tag '()) ; insert NULL tag record |
---|
279 | 0))))) |
---|
280 | |
---|
281 | (get-tags (lambda () |
---|
282 | (map car (query fetch-all get-tags-query))))) |
---|
283 | |
---|
284 | (make-storage |
---|
285 | (* 1024 1024) ; 1MiB blocks since local disk is fast and cheap, right? |
---|
286 | #t ; We are writable |
---|
287 | #f ; We DO NOT support unlink! |
---|
288 | |
---|
289 | (lambda (key data type) ; put! |
---|
290 | (when (pair? (get-block-data key)) |
---|
291 | (signal (make-property-condition 'exn 'message "Duplicate block: put! should not be called on an existing hash" 'arguments (list key type)))) |
---|
292 | |
---|
293 | (set-file-position! *log* 0 seek/end) |
---|
294 | |
---|
295 | (let ((header (sprintf "(block ~S ~S ~S)" key type (u8vector-length data))) |
---|
296 | (posn (file-position *log*))) |
---|
297 | (if (and (not (zero? posn)) (> (+ (u8vector-length data) (string-length header) posn) max-logpart-size)) |
---|
298 | (begin |
---|
299 | (file-close *log*) |
---|
300 | (set! posn 0) |
---|
301 | (set-logcount! (+ *logcount* 1)) |
---|
302 | (set! *log* (file-open (string-append logdir "/log" (number->string *logcount*)) |
---|
303 | (bitwise-ior open/creat open/rdwr open/append) (bitwise-ior perm/irusr perm/iwusr))))) |
---|
304 | (file-write *log* header) |
---|
305 | (file-write *log* (u8vector->blob/shared data)) |
---|
306 | (set-block-data! key type *logcount* (+ (string-length header) posn) (u8vector-length data)) |
---|
307 | (void))) |
---|
308 | |
---|
309 | (lambda () ; flush! |
---|
310 | (flush!) |
---|
311 | (void)) |
---|
312 | |
---|
313 | (lambda (key) ; exists? |
---|
314 | (let ((bd (get-block-data key))) |
---|
315 | (if (pair? bd) |
---|
316 | (car bd) |
---|
317 | #f))) |
---|
318 | |
---|
319 | (lambda (key) ; get |
---|
320 | (let* ((entry (get-block-data key))) |
---|
321 | (if (pair? entry) |
---|
322 | (let* ((type (first entry)) |
---|
323 | (index (second entry)) |
---|
324 | (position (third entry)) |
---|
325 | (length (fourth entry)) |
---|
326 | (buffer (make-blob length)) |
---|
327 | (logpart (get-log index))) |
---|
328 | (set-file-position! logpart position seek/set) |
---|
329 | (file-read logpart length buffer) |
---|
330 | (blob->u8vector/shared buffer)) |
---|
331 | #f))) |
---|
332 | |
---|
333 | (lambda (key) ; link! |
---|
334 | (void)) |
---|
335 | |
---|
336 | (lambda (key) ; unlink! |
---|
337 | (signal (make-property-condition 'exn 'message "Log archives do not support deletion"))) |
---|
338 | |
---|
339 | (lambda (tag key) ; set-tag! |
---|
340 | (file-write *log* (sprintf "(tag ~S ~S)" tag key)) |
---|
341 | (set-tag! tag key) |
---|
342 | (void)) |
---|
343 | (lambda (tag) ; tag |
---|
344 | (get-tag tag)) |
---|
345 | (lambda () ; all-tags |
---|
346 | (get-tags)) |
---|
347 | (lambda (tag) ; remove-tag! |
---|
348 | (remove-tag! tag) |
---|
349 | (void)) |
---|
350 | (lambda (tag) ; lock-tag! |
---|
351 | (flush!) |
---|
352 | (let ((existing-lock? (not (zero? (get-tag-lock tag))))) |
---|
353 | (if existing-lock? |
---|
354 | (begin |
---|
355 | #f) |
---|
356 | (begin |
---|
357 | (set-tag-lock! tag 1) |
---|
358 | (flush!) |
---|
359 | #t)))) |
---|
360 | (lambda (tag) ; tag-locked? |
---|
361 | (if (zero? (get-tag-lock tag)) |
---|
362 | #f |
---|
363 | #t)) |
---|
364 | (lambda (tag) ; unlock-tag! |
---|
365 | (set-tag-lock! tag 0) |
---|
366 | (flush!)) |
---|
367 | (lambda () ; close! |
---|
368 | (flush!) |
---|
369 | (exec (sql *db* "COMMIT;")) |
---|
370 | (close-database *db*) |
---|
371 | (file-close *log*) |
---|
372 | (hash-table-for-each *logfiles* |
---|
373 | (lambda (key value) |
---|
374 | (file-close value))))))) |
---|
375 | |
---|
376 | (define backend |
---|
377 | (match (command-line-arguments) |
---|
378 | (("fs" base) |
---|
379 | (backend-fs base)) |
---|
380 | |
---|
381 | (("splitlog" logdir metadir max-logpart-size) |
---|
382 | (backend-splitlog logdir metadir (string->number max-logpart-size))) |
---|
383 | |
---|
384 | (else |
---|
385 | (printf "USAGE:\nbackend-fs fs <basedir-path>\nbackend-fs splitlog <logdir-path> <metadata-file-path> <max-file-size>\n") |
---|
386 | #f))) |
---|
387 | |
---|
388 | (if backend |
---|
389 | (export-storage! backend)) |
---|