source: project/release/5/thread-utils/trunk/thread-reaper.scm @ 38565

Last change on this file since 38565 was 38565, checked in by Kon Lovett, 6 months ago

*-test runner, add zombie-thread concept, not pure, test for shutdown after requesting shutdown

File size: 8.7 KB
Line 
1;;;; thread-reaper.scm
2;;;; Kon Lovett, Oct '09
3
4;; Issues
5;;
6;; - What's up w/ thread-yield!
7;;
8;; - Could allow the stopping of an existing reaper and the startup of another.
9;; Make '*stopping?*' thread thunk local w/ a set/get behavior.
10
11; Used by threads that are cleanly terminating and wish to 'join' the
12; primordial thread w/o any user intervention. (A thread that attempts
13; to 'join' itself will cause a deadlock.)
14;
15; The "reaped" thread's end-exception, if any, is printed as a warning.
16;
17; The reaper can be stopped at any time
18
19(module thread-reaper
20
21(;export
22  zombie-threads
23  thread-reaper-shutdown?
24  thread-reap!
25  thread-reaper-stop!
26  ;
27  thread-reaper-greedy thread-reaper-greedy-set!
28  thread-reaper-quantum thread-reaper-quantum-set!
29  thread-reaper-wait-seconds thread-reaper-wait-seconds-set!
30  thread-reaper-timeout thread-reaper-timeout-set!
31  thread-reaper-retries thread-reaper-retries-set!)
32
33(import scheme)
34(import (chicken base))
35(import (chicken type))
36(import (only (chicken condition) handle-exceptions))
37(import (only queues queue-empty? queue-remove! make-queue queue-add! queue->list))
38(import (only (srfi 18) thread-name thread-sleep! thread-join! thread-yield!
39  thread-start! make-thread thread-quantum-set! thread-quantum
40  terminated-thread-exception? uncaught-exception?))
41(import (only miscmacros until))
42(import (only synch-object make-synch-with-object))
43(import (prefix (only synch-dyn synch-with) dyn:))
44(import (only synch-open %synch-with))
45(import (only record-variants define-record-type-variant))
46(import (only thread-utils check-thread print-exception-warning))
47(import (only type-checks check-positive-number check-natural-fixnum))
48
49;;
50
;canonical boolean for any object: #f stays #f, anything else becomes #t
(define (->boolean x)
  (if x #t #f) )
52
53;;
54
;Compile-time defaults. Two modes exist: "normal" and "stopping" (the latter
;applies while the reaper is terminating).

;-- reaper thread itself

;seconds the reaper waits between passes
(define-constant REAPER-WAIT-SECONDS 1.0)

;thread quantum given to the reaper thread
(define-constant NORMAL-REAPER-QUANTUM 10)
(define-constant STOPPING-REAPER-QUANTUM 100)

;-- each reaped thread

;join timeout (#f in normal mode)
(define-constant NORMAL-REAPER-TIMEOUT #f)
(define-constant STOPPING-REAPER-TIMEOUT 1.0)

;how many more join attempts follow a timed-out join
(define-constant NORMAL-REAPER-RETRIES 1)
(define-constant STOPPING-REAPER-RETRIES 0)
67
68;;
69
;Module-private record describing one pending reap request:
;  thread  - the thread to join
;  timeout - join timeout for that thread (mutable)
;  retries - join attempts still allowed after a time-out (mutable)
(define-constant reap-item 'reap-item) ;type tag variable
(define-record-type-variant reap-item (unsafe unchecked inline)
  (make-reap-item thread timeout retries)
  reap-item?
  (thread reap-item-thread)
  (timeout reap-item-timeout set-reap-item-timeout!)
  (retries reap-item-retries set-reap-item-retries!) )
78
79;;
80
;queue of threads that could not be reaped
(define *zombie-threads* (make-queue))

;forget all recorded zombies by installing a fresh, empty queue
(define-inline (empty-zombies)
  (set! *zombie-threads* (make-queue)) )

;the currently recorded zombie threads, as a list
(define (zombie-threads)
  (queue->list *zombie-threads*) )
86
;unique object handed to `thread-join!' as its timeout value, so a timed-out
;join can be told apart (by `eq?') from any real thread result
(define REAP-TIMED-OUT '(reap-timed-out))

;Remove the first reap-item from the queue `thq' and try to join its thread.
;
; - join succeeded: the joined thread's result is returned
; - join raised: the thread is recorded in `*zombie-threads*' and the
;   exception is printed as a warning; the item is NOT re-queued (putting
;   the offender back could loop forever)
; - join timed out: the item is re-queued with one fewer retry, or, when no
;   retries remain, a "cannot reap thread" warning is issued
;
;NOTE(review): a thread whose retries are exhausted is only warned about, it
;is not added to `*zombie-threads*' - confirm that this is intended.
(define (reap-queue-thread thq)
  (let* ((item (queue-remove! thq))
         (victim (reap-item-thread item)) )
    (handle-exceptions exn
      (begin
        (queue-add! *zombie-threads* victim)
        (print-exception-warning exn) )
      (let ((result (thread-join! victim (reap-item-timeout item) REAP-TIMED-OUT)))
        (cond
          ;joined normally
          ((not (eq? REAP-TIMED-OUT result))
            result )
          ;timed out and nothing left to try
          ((zero? (reap-item-retries item))
            (warning "cannot reap thread" victim) )
          ;timed out, try again later
          (else
            (set-reap-item-retries! item (sub1 (reap-item-retries item)))
            (queue-add! thq item) ) ) ) ) ) )
110
;reset the zombie list, then reap at most one item from `thq'
(define-inline (reap-thread-queue-top thq)
  (empty-zombies)
  (when (not (queue-empty? thq))
    (reap-queue-thread thq) ) )

;reset the zombie list, then keep reaping until `thq' is empty
(define-inline (reap-thread-queue thq)
  (empty-zombies)
  (until (queue-empty? thq)
    (reap-queue-thread thq) ) )
118
119;;
120
;-- module state

;queue of threads to reap; #f until the reaper is first started
(define *threads* #f)

;the reaper runs in its own thread since reaping is asynchronous;
;#f while no reaper exists
(define *reaper-thread* #f)

;#t => the reaper empties the whole queue on each pass
(define *greedy?* #f)

;reaper sleep time, in seconds
(define *wait-seconds* REAPER-WAIT-SECONDS)

;#t => the reaper should cleanly stop
(define *stopping?* #f)

;#t => the program is terminating
(define *shutdown?* #f)

;join timeout & join attempts assigned to newly queued threads
(define *timeout* NORMAL-REAPER-TIMEOUT)
(define *retries* NORMAL-REAPER-RETRIES)
134
; Reapers

;NOTE only the `synch-dyn' flavor of `synch-with' is needed here since
;`reap-queue-thread' already runs under `handle-exceptions'

;reap every queued thread, inside `dyn:synch-with' on `*threads*'
(define-inline (reap-all)
  (dyn:synch-with *threads* threads
    (reap-thread-queue threads) ) )

;reap at most one queued thread, inside `dyn:synch-with' on `*threads*'
(define-inline (reap-top)
  (dyn:synch-with *threads* threads
    (reap-thread-queue-top threads) ) )

;one reaper pass: the whole queue when greedy, otherwise a single item
(define-inline (reap)
  (cond
    (*greedy?* (reap-all))
    (else      (reap-top)) ) )
143
; Reaper thread thunk
;
;Loops until `*stopping?*' becomes true. Each iteration performs one reaping
;pass (`reap') and then sleeps for `*wait-seconds*' seconds. Once stopping is
;requested the remaining queue is drained with `reap-all' and the thunk
;returns, which ends the reaper thread.
;
;The sleep honours `*wait-seconds*' so that `thread-reaper-wait-seconds-set!'
;actually takes effect; the default (REAPER-WAIT-SECONDS) equals the 1.0
;seconds that used to be hard-coded here.

(define (reaper)
  (let loop ()
    (if *stopping?* (reap-all)
      (begin
        (reap)
        ;FIXME a bare `(thread-yield!)' here causes a busy loop, so sleep
        (thread-sleep! *wait-seconds*)
        (loop) ) ) ) )
155
;Switch every reap-item still queued over to the "stopping" settings
;(STOPPING-REAPER-RETRIES & STOPPING-REAPER-TIMEOUT). Runs inside
;`%synch-with' on `*threads*'.
(define (adjust-reap-items-for-stopping)
  (%synch-with *threads* threads
    (let loop ((items (queue->list threads)))
      (unless (null? items)
        (let ((item (car items)))
          (set-reap-item-retries! item STOPPING-REAPER-RETRIES)
          (set-reap-item-timeout! item STOPPING-REAPER-TIMEOUT) )
        (loop (cdr items)) ) ) ) )
163
164;;
165
;exit hook: flag the program as terminating, then stop the reaper
(define (thread-reaper-shutdown!)
  (set! *shutdown?* #t)
  (thread-reaper-stop!) )

;Bring the reaper into a running state.
;
;The first call creates the synchronized reap queue and registers
;`thread-reaper-shutdown!' with `on-exit'. Whenever no reaper thread exists
;one is created with the normal quantum and started.
(define (thread-reaper-start!)
  ;one-time setup
  (when (not *threads*)
    (set! *threads* (make-synch-with-object (make-queue) '(queue/synch-)))
    ;clean shutdown
    (on-exit thread-reaper-shutdown!) )
  ;need a reaper thread?
  (when (not *reaper-thread*)
    (set! *stopping?* #f)
    (let ((th (make-thread reaper 'thread-reaper)))
      (set! *reaper-thread* th)
      (thread-quantum-set! th NORMAL-REAPER-QUANTUM)
      (thread-start! th) ) ) )

;start the reaper unless a reaper thread already exists
(define-inline (ensure-reaper)
  (when (not *reaper-thread*)
    (thread-reaper-start!) ) )
185
186;;; Public
187
;true when the reaper is stopping or the program is terminating
(define (thread-reaper-shutdown?)
  (cond
    (*stopping?*)
    (else *shutdown?*) ) )
189
;Queue the thread `th' for reaping, starting the reaper when none exists.
;
;`th' is validated with `check-thread'. While the reaper is winding up the
;request cannot be fulfilled, so it is dropped with a warning. Otherwise a
;reap-item carrying the current `*timeout*' and `*retries*' is appended to
;the queue inside `%synch-with' on `*threads*'.
(define (thread-reap! th)
  (check-thread 'thread-reap! th)
  (cond
    ;ignore request when cannot fulfill
    ((thread-reaper-shutdown?)
      (warning "attempt to reap a thread as reaper winding up" th) )
    (else
      (ensure-reaper)
      (%synch-with *threads* threads
        (queue-add! threads (make-reap-item th *timeout* *retries*)) ) ) ) )
199
200
;Stop the reaper thread and wait for it to finish.
;
;Does nothing when no reaper thread exists or a stop is already under way.
;Otherwise, in this order:
;  1. raise the reaper's quantum to at least STOPPING-REAPER-QUANTUM
;  2. switch the queued items to the "stopping" timeout & retries
;  3. set `*stopping?*'
;  4. join the reaper thread
;  5. clear `*reaper-thread*'
(define (thread-reaper-stop!)
  (let ((th *reaper-thread*))
    (when (and th (not *stopping?*))
      ;a bigger time-slice so the queue clears faster
      (thread-quantum-set! th (max (thread-quantum th) STOPPING-REAPER-QUANTUM))
      ;no long waits or retries
      (adjust-reap-items-for-stopping)
      ;tell the reaper to quit
      (set! *stopping?* #t)
      ;wait for the reaper to finish
      ;FIXME Timeout? Assuming each item joins/timed-out then not needed.
      (thread-join! th)
      ;no more reaping with this thread
      (set! *reaper-thread* #f) ) ) )
216
217;;
218
219;"location" style calling
220
;set the greedy flag; any true value is stored as #t
(define (thread-reaper-greedy-set! flag)
  (set! *greedy?* (->boolean flag)) )

;Greedy flag accessor. With no argument returns the flag. With one argument
;(deprecated, a warning is issued) sets it. `thread-reaper-greedy-set!' is
;attached as the setter.
(define thread-reaper-greedy
  (getter-with-setter
    (lambda maybe-flag
      (cond
        ((null? maybe-flag)
          *greedy?* )
        (else
          (warning 'thread-reaper-greedy "mutation deprecated")
          (thread-reaper-greedy-set! (car maybe-flag)) ) ) )
    thread-reaper-greedy-set!))
231
;Set the reaper thread's quantum to `qt'. Signals an error when no reaper
;thread exists; does nothing once shutdown has been requested.
(define (thread-reaper-quantum-set! qt)
  (cond
    ((not *reaper-thread*)
      (error 'thread-reaper-quantum-set! "reaper is not running") )
    ((not (thread-reaper-shutdown?))
      (thread-quantum-set! *reaper-thread* qt) ) ) )

;Reaper quantum accessor. Signals an error when no reaper thread exists.
;With no argument returns the reaper thread's quantum. With one argument
;(deprecated, a warning is issued) sets it. `thread-reaper-quantum-set!' is
;attached as the setter.
(define thread-reaper-quantum
  (getter-with-setter
    (lambda maybe-quantum
      (unless *reaper-thread* (error 'thread-reaper-quantum "reaper is not running"))
      (cond
        ((null? maybe-quantum)
          (thread-quantum *reaper-thread*) )
        (else
          (warning 'thread-reaper-quantum "mutation deprecated")
          (thread-reaper-quantum-set! (car maybe-quantum)) ) ) )
    thread-reaper-quantum-set!))
245
;set the reaper sleep time; `to' is validated by `check-positive-number'
(define (thread-reaper-wait-seconds-set! to)
  (let ((seconds (check-positive-number 'thread-reaper-wait-seconds to)))
    (set! *wait-seconds* seconds) ) )

;Reaper sleep time accessor. With no argument returns the current value.
;With one argument (deprecated, a warning is issued) sets it.
;`thread-reaper-wait-seconds-set!' is attached as the setter.
(define thread-reaper-wait-seconds
  (getter-with-setter
    (lambda maybe-seconds
      (cond
        ((null? maybe-seconds)
          *wait-seconds* )
        (else
          (warning 'thread-reaper-wait-seconds "mutation deprecated")
          (thread-reaper-wait-seconds-set! (car maybe-seconds)) ) ) )
    thread-reaper-wait-seconds-set!))
257
;Set the join timeout given to newly queued threads. `to' is either #f or a
;value accepted by `check-positive-number'.
(define (thread-reaper-timeout-set! to)
  (set! *timeout* (and to (check-positive-number 'thread-reaper-timeout to))) )

;Join timeout accessor. With no argument returns the current timeout. With
;one argument (deprecated, a warning is issued) sets it.
;`thread-reaper-timeout-set!' is attached as the setter.
(define thread-reaper-timeout
  (getter-with-setter
    (lambda args
      (if (null? args) *timeout*
        (begin
          ;tag the warning with this procedure's own name, as the sibling
          ;accessors do (it used to read 'thread-timeout)
          (warning 'thread-reaper-timeout "mutation deprecated")
          (thread-reaper-timeout-set! (car args)) ) ) )
    thread-reaper-timeout-set!))
269
;set the retry count given to newly queued threads; `rt' is validated by
;`check-natural-fixnum'
(define (thread-reaper-retries-set! rt)
  (let ((count (check-natural-fixnum 'thread-reaper-retries rt)))
    (set! *retries* count) ) )

;Retry count accessor. With no argument returns the current count. With one
;argument (deprecated, a warning is issued) sets it.
;`thread-reaper-retries-set!' is attached as the setter.
(define thread-reaper-retries
  (getter-with-setter
    (lambda maybe-count
      (cond
        ((null? maybe-count)
          *retries* )
        (else
          (warning 'thread-reaper-retries "mutation deprecated")
          (thread-reaper-retries-set! (car maybe-count)) ) ) )
    thread-reaper-retries-set!))
281
282) ;module thread-reaper
Note: See TracBrowser for help on using the repository browser.