source: project/chicken/trunk/scheduler.scm @ 11979

Last change on this file since 11979 was 11979, checked in by felix winkelmann, 11 years ago

refactoring of timeout-list removal, also used in ##sys#thread-kill\! (thanks to Joerg Wittenberger)

File size: 17.4 KB
Line 
1; scheduler.scm - Basic scheduler for multithreading
2;
3; Copyright (c) 2000-2007, Felix L. Winkelmann
4; Copyright (c) 2008, The Chicken Team
5; All rights reserved.
6;
7; Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
8; conditions are met:
9;
10;   Redistributions of source code must retain the above copyright notice, this list of conditions and the following
11;     disclaimer.
12;   Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
13;     disclaimer in the documentation and/or other materials provided with the distribution.
14;   Neither the name of the author nor the names of its contributors may be used to endorse or promote
15;     products derived from this software without specific prior written permission.
16;
17; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
18; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
19; AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
20; CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23; THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24; OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25; POSSIBILITY OF SUCH DAMAGE.
26
27
28(declare
29  (fixnum)
30  (unit scheduler)
31  (disable-interrupts)
32  (usual-integrations)
33  (emit-exports "scheduler.exports")
34  (disable-warning var)
35  (hide ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#timeout-list
36        ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer
37        ##sys#remove-from-ready-queue ##sys#unblock-threads-for-i/o ##sys#force-primordial
38        ##sys#fdset-input-set ##sys#fdset-output-set ##sys#fdset-clear
39        ##sys#fdset-select-timeout ##sys#fdset-restore ##sys#remove-from-timeout-list
40        ##sys#clear-i/o-state-for-thread!) 
41  (foreign-declare #<<EOF
42#ifdef HAVE_ERRNO_H
43# include <errno.h>
44# define C_signal_interrupted_p     C_mk_bool(errno == EINTR)
45#else
46# define C_signal_interrupted_p     C_SCHEME_FALSE
47#endif
48
49#ifdef _WIN32
50# if _MSC_VER > 1300
51# include <winsock2.h>
52# include <ws2tcpip.h>
53# else
54# include <winsock.h>
55# endif
56/* Beware: winsock2.h must come BEFORE windows.h */
57# define C_msleep(n)     (Sleep(C_unfix(n)), C_SCHEME_TRUE)
58#else
59# include <unistd.h>
60# include <sys/types.h>
61# include <sys/time.h>
62# include <time.h>
63static C_word C_msleep(C_word ms);
64C_word C_msleep(C_word ms) {
65#ifdef __CYGWIN__
66  if(usleep(C_unfix(ms) * 1000) == -1) return C_SCHEME_FALSE;
67#else
68  struct timespec ts;
69  unsigned long mss = C_unfix(ms);
70  ts.tv_sec = mss / 1000;
71  ts.tv_nsec = (mss % 1000) * 1000000;
72 
73  if(nanosleep(&ts, NULL) == -1) return C_SCHEME_FALSE;
74#endif
75  return C_SCHEME_TRUE;
76}
77#endif
78static fd_set C_fdset_input, C_fdset_output, C_fdset_input_2, C_fdset_output_2;
79#define C_fd_test_input(fd)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_input))
80#define C_fd_test_output(fd)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_output))
81EOF
82) )
83
84(cond-expand
85 [paranoia]
86 [else
87  (declare (unsafe)) ] )
88
89
90(define-macro (dbg . args) #f)
91#;(define-macro (dbg . args)
92  `(print "DBG: " ,@args) )
93
94
95(define (##sys#schedule)
96  (define (switch thread)
97    (dbg "switching to " thread)
98    (set! ##sys#current-thread thread)
99    (##sys#setslot thread 3 'running)
100    (##sys#restore-thread-state-buffer thread)
101    (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot thread 9))
102    ((##sys#slot thread 1)) )
103  (let* ([ct ##sys#current-thread]
104         [eintr #f]
105         [cts (##sys#slot ct 3)] )
106    (dbg "scheduling, current: " ct ", ready: " ##sys#ready-queue-head)
107    (##sys#update-thread-state-buffer ct)
108    ;; Put current thread on ready-queue:
109    (when (or (eq? cts 'running) (eq? cts 'ready)) ; should ct really be 'ready? - normally not.
110      (##sys#setislot ct 13 #f)                    ; clear timeout-unblock flag
111      (##sys#add-to-ready-queue ct) )
112    (let loop1 ()
113      ;; Unblock threads waiting for timeout:
114      (unless (null? ##sys#timeout-list)
115        (let ([now (##sys#fudge 16)])
116          (dbg "timeout (" now ") list: " ##sys#timeout-list)
117          (let loop ([lst ##sys#timeout-list])
118            (if (null? lst)
119                (set! ##sys#timeout-list '())
120                (let* ([tmo1 (caar lst)]
121                       [tto (cdar lst)]
122                       [tmo2 (##sys#slot tto 4)] )
123                  (dbg "  " tto " -> " tmo2)
124                  (if (eq? tmo1 tmo2)
125                      (if (>= now tmo1)
126                          (begin
127                            (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout
128                            (##sys#clear-i/o-state-for-thread! tto)
129                            ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;***
130                            (##sys#thread-basic-unblock! tto)
131                            (loop (cdr lst)) )
132                          (begin
133                            (set! ##sys#timeout-list lst) 
134                            ;; If there are no threads blocking on a select call (fd-list)
135                            ;; but there are threads in the timeout list then sleep for
136                            ;; the number of milliseconds of next thread to wake up.
137                            (when (and (null? ##sys#ready-queue-head)
138                                       (null? ##sys#fd-list) 
139                                       (pair? ##sys#timeout-list))
140                              (let ([tmo1 (caar ##sys#timeout-list)])
141                                (set! eintr
142                                  (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo1 now))))
143                                       (foreign-value "C_signal_interrupted_p" bool) ) ) ) ) ) )
144                      (loop (cdr lst)) ) ) ) ) ) )
145      ;; Unblock threads blocked by I/O:
146      (if eintr
147          (##sys#force-primordial)
148          (begin
149            (unless (null? ##sys#fd-list)
150              (##sys#unblock-threads-for-i/o) ) ) )
151      ;; Fetch and activate next ready thread:
152      (let loop2 ()
153        (let ([nt (##sys#remove-from-ready-queue)])
154          (cond [(not nt) 
155                 (if (and (null? ##sys#timeout-list) (null? ##sys#fd-list))
156                     (##sys#signal-hook #:runtime-error "deadlock")
157                     (loop1) ) ]
158                [(eq? (##sys#slot nt 3) 'ready) (switch nt)]
159                [else (loop2)] ) ) ) ) ) )
160
161(define (##sys#force-primordial)
162  (dbg "primordial thread forced due to interrupt")
163  (##sys#thread-unblock! ##sys#primordial-thread) )
164
165(define ##sys#ready-queue-head '())
166(define ##sys#ready-queue-tail '())
167
168(define (##sys#ready-queue) ##sys#ready-queue-head)
169
170(define (##sys#add-to-ready-queue thread)
171  (##sys#setslot thread 3 'ready)
172  (let ((new-pair (cons thread '())))
173    (cond ((eq? '() ##sys#ready-queue-head) 
174           (set! ##sys#ready-queue-head new-pair))
175          (else (set-cdr! ##sys#ready-queue-tail new-pair)) )
176    (set! ##sys#ready-queue-tail new-pair) ) )
177
178(define (##sys#remove-from-ready-queue)
179  (let ((first-pair ##sys#ready-queue-head))
180    (and (not (null? first-pair))
181         (let ((first-cdr (cdr first-pair)))
182           (set! ##sys#ready-queue-head first-cdr)
183           (when (eq? '() first-cdr) (set! ##sys#ready-queue-tail '()))
184           (car first-pair) ) ) ) )
185
186(define (##sys#update-thread-state-buffer thread)
187  (let ([buf (##sys#slot thread 5)])
188    (##sys#setslot buf 0 ##sys#dynamic-winds)
189    (##sys#setslot buf 1 ##sys#standard-input)
190    (##sys#setslot buf 2 ##sys#standard-output)
191    (##sys#setslot buf 3 ##sys#standard-error)
192    (##sys#setslot buf 4 ##sys#current-exception-handler)
193    (##sys#setslot buf 5 ##sys#current-parameter-vector) ) )
194
195(define (##sys#restore-thread-state-buffer thread)
196  (let ([buf (##sys#slot thread 5)])
197    (set! ##sys#dynamic-winds (##sys#slot buf 0))
198    (set! ##sys#standard-input (##sys#slot buf 1))
199    (set! ##sys#standard-output (##sys#slot buf 2))
200    (set! ##sys#standard-error (##sys#slot buf 3)) 
201    (set! ##sys#current-exception-handler (##sys#slot buf 4))
202    (set! ##sys#current-parameter-vector (##sys#slot buf 5)) ) )
203
204(set! ##sys#interrupt-hook
205  (let ([oldhook ##sys#interrupt-hook])
206    (lambda (reason state)
207      (when (fx= reason 255)            ; C_TIMER_INTERRUPT_NUMBER
208        (let ([ct ##sys#current-thread])
209          (##sys#setslot ct 1 (lambda () (oldhook reason state))) 
210          (##sys#schedule) ) )          ; expected not to return!
211      (oldhook reason state) ) ) )
212
213(define ##sys#timeout-list '())
214
215(define (##sys#remove-from-timeout-list t)
216  (let loop ((l ##sys#timeout-list) (prev #f))
217    (if (null? l)
218        l
219        (let ((h (##sys#slot l 0))
220              (r (##sys#slot l 1)))
221          (if (eq? (##sys#slot h 1) t)
222              (if prev
223                  (set-cdr! prev r)
224                  (set! ##sys#timeout-list r))
225              (loop r l))))))
226
227(define (##sys#thread-block-for-timeout! t tm)
228  (dbg t " blocks for " tm)
229  ;; This should really use a balanced tree:
230  (let loop ([tl ##sys#timeout-list] [prev #f])
231    (if (or (null? tl) (< tm (caar tl)))
232        (if prev
233            (set-cdr! prev (cons (cons tm t) tl))
234            (set! ##sys#timeout-list (cons (cons tm t) tl)) )
235        (loop (cdr tl) tl) ) ) 
236  (##sys#setslot t 3 'blocked)
237  (##sys#setislot t 13 #f)
238  (##sys#setislot t 4 tm) )
239
240(define (##sys#thread-block-for-termination! t t2)
241  (dbg t " blocks for " t2)
242  (let ([state (##sys#slot t2 3)])
243    (unless (or (eq? state 'dead) (eq? state 'terminated))
244      (##sys#setslot t2 12 (cons t (##sys#slot t2 12)))
245      (##sys#setslot t 3 'blocked) 
246      (##sys#setislot t 13 #f)
247      (##sys#setslot t 11 t2) ) ) )
248
249(define (##sys#thread-kill! t s)
250  (dbg "killing: " t " -> " s ", recipients: " (##sys#slot t 12))
251  (##sys#abandon-mutexes t)
252  (##sys#setslot t 3 s)
253  (##sys#setislot t 4 #f)
254  (##sys#setislot t 11 #f)
255  (##sys#setislot t 8 '())
256  (##sys#remove-from-timeout-list t)
257  (let ([rs (##sys#slot t 12)])
258    (unless (null? rs)
259      (for-each
260       (lambda (t2)
261         (dbg "  checking: " t2 " (" (##sys#slot t2 3) ") -> " (##sys#slot t2 11))
262         (when (eq? (##sys#slot t2 11) t)
263           (##sys#thread-basic-unblock! t2) ) )
264       rs) ) )
265  (##sys#setislot t 12 '()) )
266
267(define (##sys#thread-basic-unblock! t)
268  (dbg "unblocking: " t)
269  (##sys#setislot t 11 #f)
270  (##sys#setislot t 4 #f)
271  (##sys#add-to-ready-queue t) )
272
273(define ##sys#default-exception-handler
274  (let ([print-error-message print-error-message]
275        [display display]
276        [print-call-chain print-call-chain]
277        [open-output-string open-output-string]
278        [get-output-string get-output-string] )
279    (lambda (arg)
280      (let ([ct ##sys#current-thread])
281        (dbg "exception: " ct " -> " (if (##sys#structure? arg 'condition) (##sys#slot arg 2) arg))
282        (cond [(foreign-value "C_abort_on_thread_exceptions" bool)
283               (let* ([pt ##sys#primordial-thread]
284                      [ptx (##sys#slot pt 1)] )
285                 (##sys#setslot 
286                  pt 1 
287                  (lambda ()
288                    (##sys#signal arg)
289                    (ptx) ) )
290                 (##sys#thread-unblock! pt) ) ]
291              [##sys#warnings-enabled
292               (let ([o (open-output-string)])
293                 (display "Warning (" o)
294                 (display ct o)
295                 (display "): " o)
296                 (print-error-message arg ##sys#standard-error (get-output-string o))
297                 (print-call-chain ##sys#standard-error 0 ct) ) ] )
298        (##sys#setslot ct 7 arg)
299        (##sys#thread-kill! ct 'terminated)
300        (##sys#schedule) ) ) ) )
301
302
303;;; `select()'-based blocking:
304
305(define ##sys#fd-list '())
306
307(define ##sys#fdset-select-timeout
308  (foreign-lambda* int ([bool to] [unsigned-long tm])
309    "struct timeval timeout;"
310    "timeout.tv_sec = tm / 1000;"
311    "timeout.tv_usec = (tm % 1000) * 1000;"
312    "C_fdset_input_2 = C_fdset_input;"
313    "C_fdset_output_2 = C_fdset_output;"
314    "return(select(FD_SETSIZE, &C_fdset_input, &C_fdset_output, NULL, to ? &timeout : NULL));") )
315
316(define ##sys#fdset-restore
317  (foreign-lambda* void ()
318    "C_fdset_input = C_fdset_input_2;"
319    "C_fdset_output = C_fdset_output_2;") )
320
321((foreign-lambda* void ()
322   "FD_ZERO(&C_fdset_input);"
323   "FD_ZERO(&C_fdset_output);") )
324
325(define ##sys#fdset-input-set
326  (foreign-lambda* void ([int fd])
327    "FD_SET(fd, &C_fdset_input);" ) )
328
329(define ##sys#fdset-output-set
330  (foreign-lambda* void ([int fd])
331    "FD_SET(fd, &C_fdset_output);" ) )
332
333(define ##sys#fdset-clear
334  (foreign-lambda* void ([int fd])
335    "FD_CLR(fd, &C_fdset_input_2);"
336    "FD_CLR(fd, &C_fdset_output_2);") )
337
338(define (##sys#thread-block-for-i/o! t fd i/o)
339  (dbg t " blocks for I/O " fd)
340  (let loop ([lst ##sys#fd-list])
341    (if (null? lst) 
342        (set! ##sys#fd-list (cons (list fd t) ##sys#fd-list)) 
343        (let ([a (car lst)])
344          (if (fx= fd (car a)) 
345              (##sys#setslot a 1 (cons t (cdr a)))
346              (loop (cdr lst)) ) ) ) )
347  (case i/o
348    ((#t #:input) (##sys#fdset-input-set fd))
349    ((#f #:output) (##sys#fdset-output-set fd))
350    ((#:all)
351     (##sys#fdset-input-set fd)
352     (##sys#fdset-output-set fd) ) )
353  (##sys#setslot t 3 'blocked)
354  (##sys#setislot t 13 #f)
355  (##sys#setslot t 11 (cons fd i/o)) )
356
357(define (##sys#unblock-threads-for-i/o)
358  (dbg "fd-list: " ##sys#fd-list)
359  (let* ([to? (pair? ##sys#timeout-list)]
360         [rq? (pair? ##sys#ready-queue-head)]
361         [n (##sys#fdset-select-timeout ; we use FD_SETSIZE, but really should use max fd
362             (or rq? to?)
363             (if (and to? (not rq?))    ; no thread was unblocked by timeout, so wait
364                 (let* ([tmo1 (caar ##sys#timeout-list)]
365                        [now (##sys#fudge 16)])
366                   (fxmax 0 (- tmo1 now)) )
367                 0) ) ] )               ; otherwise immediate timeout.
368    (dbg n " fds ready")
369    (cond [(eq? -1 n) 
370           (##sys#force-primordial)]
371          [(fx> n 0)
372           (set! ##sys#fd-list
373             (let loop ([n n] [lst ##sys#fd-list])
374               (if (or (zero? n) (null? lst))
375                   lst
376                   (let* ([a (car lst)]
377                          [fd (car a)]
378                          [inf (##core#inline "C_fd_test_input" fd)]
379                          [outf (##core#inline "C_fd_test_output" fd)] )
380                     (dbg "fd " fd " ready: input=" inf ", output=" outf)
381                     (if (or inf outf)
382                         (let loop2 ([threads (cdr a)])
383                           (if (null? threads) 
384                               (begin
385                                 (##sys#fdset-clear fd)
386                                 (loop (sub1 n) (cdr lst)) )
387                               (let* ([t (car threads)]
388                                      [p (##sys#slot t 11)] )
389                                 (when (and (pair? p)
390                                            (eq? fd (car p))
391                                            (not (##sys#slot t 13) ) ) ; not unblocked by timeout
392                                   (##sys#thread-basic-unblock! t) )
393                                 (loop2 (cdr threads)) ) ) )
394                         (cons a (loop n (cdr lst))) ) ) ) ) ) ] )
395    (##sys#fdset-restore) ) )
396
397
398;;; Clear I/O state for unblocked thread
399
400(define (##sys#clear-i/o-state-for-thread! t)
401  (when (pair? (##sys#slot t 11))
402    (let ((fd (##sys#slot (##sys#slot t 11) 0)))
403      (set! ##sys#fd-list
404        (let loop ([lst ##sys#fd-list])
405          (if (null? lst)
406              '()
407              (let* ([a (##sys#slot lst 0)]
408                     [fd2 (##sys#slot a 0)] )
409                (if (eq? fd fd2)
410                    (let ((ts (##sys#delq t (##sys#slot a 1)))) ; remove from fd-list entry
411                      (cond ((null? ts)
412                             ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error)
413                             (##sys#fdset-clear fd) ; no more threads waiting for this fd
414                             (##sys#fdset-restore)
415                             (##sys#slot lst 1) )
416                            (else
417                             (##sys#setslot a 1 ts) ; fd-list entry is list with t removed
418                             lst) ) )
419                    (cons a (loop (##sys#slot lst 1)))))))))))
420
421
422;;; Get list of all threads that are ready or waiting for timeout or waiting for I/O:
423
424(define (##sys#all-threads)
425  (append ##sys#ready-queue-head
426          (apply append (map cdr ##sys#fd-list))
427          (map cdr ##sys#timeout-list)))
428
429
430;;; Remove all waiting threads from the relevant queues with the exception of the current thread:
431
432(define (##sys#fetch-and-clear-threads)
433  (let ([all (vector ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#fd-list ##sys#timeout-list)])
434    (set! ##sys#ready-queue-head '())
435    (set! ##sys#ready-queue-tail '())
436    (set! ##sys#fd-list '())
437    (set! ##sys#timeout-list '()) 
438    all) )
439
440
441;;; Restore list of waiting threads:
442
443(define (##sys#restore-threads vec)
444  (set! ##sys#ready-queue-head (##sys#slot vec 0))
445  (set! ##sys#ready-queue-tail (##sys#slot vec 1))
446  (set! ##sys#fd-list (##sys#slot vec 2))
447  (set! ##sys#timeout-list (##sys#slot vec 3)) )
448
449
450;;; Unblock thread cleanly:
451
452(define (##sys#thread-unblock! t)
453  (when (eq? 'blocked (##sys#slot t 3))
454    (##sys#remove-from-timeout-list t)
455    (set! ##sys#fd-list 
456      (let loop ([fdl ##sys#fd-list])
457        (if (null? fdl)
458            '()
459            (let ([a (##sys#slot fdl 0)])
460              (cons
461               (cons (##sys#slot a 0)
462                     (##sys#delq t (##sys#slot a 1)) )
463               (loop (##sys#slot fdl 1)) ) ) ) ) )
464    (##sys#setislot t 12 '())
465    (##sys#thread-basic-unblock! t) ) )
466
467
468;;; Multithreaded breakpoints
469
470(define (##sys#break-entry name args)
471  (when (or (not ##sys#break-in-thread) (eq? ##sys#break-in-thread ##sys#current-thread))
472    (##sys#call-with-current-continuation
473     (lambda (k)
474       (let* ((pk (if (eq? ##sys#current-thread ##sys#primordial-thread)
475                      '()
476                      (list '(exn . thread) ##sys#current-thread
477                            '(exn . primordial-continuation) 
478                            (lambda _ ((##sys#slot ##sys#primordial-thread 1))))))
479              (exn (##sys#make-structure
480                    'condition
481                    '(exn breakpoint)
482                    (append
483                     (list '(exn . message) "*** breakpoint ***"
484                           '(exn . arguments) (cons name args)
485                           '(exn . location) name
486                           '(exn . continuation) k)
487                     pk) ) ) )
488         (set! ##sys#last-breakpoint exn)
489         (cond ((eq? ##sys#current-thread ##sys#primordial-thread)
490                (##sys#signal exn) )
491               (else
492                (##sys#setslot ##sys#current-thread 3 'suspended)
493                (##sys#setslot ##sys#current-thread 1 (lambda () (k (##core#undefined))))
494                (let ([old (##sys#slot ##sys#primordial-thread 1)])
495                  (##sys#setslot
496                   ##sys#primordial-thread 1
497                   (lambda ()
498                     (##sys#signal exn)
499                     (old) ) )
500                  (##sys#thread-unblock! ##sys#primordial-thread)
501                  (##sys#schedule) ) ) ) ) ) ) ) )
502                 
503(define (##sys#break-resume exn)
504  ;; assumes current-thread is primordial
505  (let* ((props (##sys#slot exn 2))
506         (a (member '(exn . continuation) props))
507         (t (member '(exn . thread) props))
508         (pk (or (member '(exn . primordial-continuation) props) a)))
509    (when t
510      (let ((t (cadr t)))
511        (if a
512            (##sys#setslot t 1 (lambda () ((cadr a) (##core#undefined))))
513            (##sys#signal-hook #:type-error "condition has no continuation" exn) )
514        (##sys#add-to-ready-queue t) ) )
515    (if pk
516        ((cadr pk) (##core#undefined))
517        (##sys#signal-hook #:type-error "condition has no continuation" exn) ) ) )
Note: See TracBrowser for help on using the repository browser.