source: project/chicken/trunk/scheduler.scm @ 11918

Last change on this file since 11918 was 11918, checked in by felix winkelmann, 11 years ago

deleting thread from timeout-list in ##sys#thread-unblock\! was incorrect [thanks to Joerg Wittenberger]

File size: 17.3 KB
Line 
1; scheduler.scm - Basic scheduler for multithreading
2;
3; Copyright (c) 2000-2007, Felix L. Winkelmann
4; Copyright (c) 2008, The Chicken Team
5; All rights reserved.
6;
7; Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
8; conditions are met:
9;
10;   Redistributions of source code must retain the above copyright notice, this list of conditions and the following
11;     disclaimer.
12;   Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
13;     disclaimer in the documentation and/or other materials provided with the distribution.
14;   Neither the name of the author nor the names of its contributors may be used to endorse or promote
15;     products derived from this software without specific prior written permission.
16;
17; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
18; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
19; AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
20; CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23; THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24; OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25; POSSIBILITY OF SUCH DAMAGE.
26
27
28(declare
29  (fixnum)
30  (unit scheduler)
31  (disable-interrupts)
32  (usual-integrations)
33  (emit-exports "scheduler.exports")
34  (disable-warning var)
35  (hide ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#timeout-list
36        ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer
37        ##sys#remove-from-ready-queue ##sys#unblock-threads-for-i/o ##sys#force-primordial
38        ##sys#fdset-input-set ##sys#fdset-output-set ##sys#fdset-clear
39        ##sys#fdset-select-timeout ##sys#fdset-restore
40        ##sys#clear-i/o-state-for-thread!) 
41  (foreign-declare #<<EOF
42#ifdef HAVE_ERRNO_H
43# include <errno.h>
44# define C_signal_interrupted_p     C_mk_bool(errno == EINTR)
45#else
46# define C_signal_interrupted_p     C_SCHEME_FALSE
47#endif
48
49#ifdef _WIN32
50# if _MSC_VER > 1300
51# include <winsock2.h>
52# include <ws2tcpip.h>
53# else
54# include <winsock.h>
55# endif
56/* Beware: winsock2.h must come BEFORE windows.h */
57# define C_msleep(n)     (Sleep(C_unfix(n)), C_SCHEME_TRUE)
58#else
59# include <unistd.h>
60# include <sys/types.h>
61# include <sys/time.h>
62# include <time.h>
63static C_word C_msleep(C_word ms);
64C_word C_msleep(C_word ms) {
65#ifdef __CYGWIN__
66  if(usleep(C_unfix(ms) * 1000) == -1) return C_SCHEME_FALSE;
67#else
68  struct timespec ts;
69  unsigned long mss = C_unfix(ms);
70  ts.tv_sec = mss / 1000;
71  ts.tv_nsec = (mss % 1000) * 1000000;
72 
73  if(nanosleep(&ts, NULL) == -1) return C_SCHEME_FALSE;
74#endif
75  return C_SCHEME_TRUE;
76}
77#endif
78static fd_set C_fdset_input, C_fdset_output, C_fdset_input_2, C_fdset_output_2;
79#define C_fd_test_input(fd)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_input))
80#define C_fd_test_output(fd)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_output))
81EOF
82) )
83
84(cond-expand
85 [paranoia]
86 [else
87  (declare (unsafe)) ] )
88
89
90(define-macro (dbg . args) #f)
91#;(define-macro (dbg . args)
92  `(print "DBG: " ,@args) )
93
94
95(define (##sys#schedule)
96  (define (switch thread)
97    (dbg "switching to " thread)
98    (set! ##sys#current-thread thread)
99    (##sys#setslot thread 3 'running)
100    (##sys#restore-thread-state-buffer thread)
101    (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot thread 9))
102    ((##sys#slot thread 1)) )
103  (let* ([ct ##sys#current-thread]
104         [eintr #f]
105         [cts (##sys#slot ct 3)] )
106    (dbg "scheduling, current: " ct ", ready: " ##sys#ready-queue-head)
107    (##sys#update-thread-state-buffer ct)
108    ;; Put current thread on ready-queue:
109    (when (or (eq? cts 'running) (eq? cts 'ready)) ; should ct really be 'ready? - normally not.
110      (##sys#setislot ct 13 #f)                    ; clear timeout-unblock flag
111      (##sys#add-to-ready-queue ct) )
112    (let loop1 ()
113      ;; Unblock threads waiting for timeout:
114      (unless (null? ##sys#timeout-list)
115        (let ([now (##sys#fudge 16)])
116          (dbg "timeout (" now ") list: " ##sys#timeout-list)
117          (let loop ([lst ##sys#timeout-list])
118            (if (null? lst)
119                (set! ##sys#timeout-list '())
120                (let* ([tmo1 (caar lst)]
121                       [tto (cdar lst)]
122                       [tmo2 (##sys#slot tto 4)] )
123                  (dbg "  " tto " -> " tmo2)
124                  (if (eq? tmo1 tmo2)
125                      (if (>= now tmo1)
126                          (begin
127                            (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout
128                            (##sys#clear-i/o-state-for-thread! tto)
129                            ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;***
130                            (##sys#thread-basic-unblock! tto)
131                            (loop (cdr lst)) )
132                          (begin
133                            (set! ##sys#timeout-list lst) 
134                            ;; If there are no threads blocking on a select call (fd-list)
135                            ;; but there are threads in the timeout list then sleep for
136                            ;; the number of milliseconds of next thread to wake up.
137                            (when (and (null? ##sys#ready-queue-head)
138                                       (null? ##sys#fd-list) 
139                                       (pair? ##sys#timeout-list))
140                              (let ([tmo1 (caar ##sys#timeout-list)])
141                                (set! eintr
142                                  (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo1 now))))
143                                       (foreign-value "C_signal_interrupted_p" bool) ) ) ) ) ) )
144                      (loop (cdr lst)) ) ) ) ) ) )
145      ;; Unblock threads blocked by I/O:
146      (if eintr
147          (##sys#force-primordial)
148          (begin
149            (unless (null? ##sys#fd-list)
150              (##sys#unblock-threads-for-i/o) ) ) )
151      ;; Fetch and activate next ready thread:
152      (let loop2 ()
153        (let ([nt (##sys#remove-from-ready-queue)])
154          (cond [(not nt) 
155                 (if (and (null? ##sys#timeout-list) (null? ##sys#fd-list))
156                     (##sys#signal-hook #:runtime-error "deadlock")
157                     (loop1) ) ]
158                [(eq? (##sys#slot nt 3) 'ready) (switch nt)]
159                [else (loop2)] ) ) ) ) ) )
160
161(define (##sys#force-primordial)
162  (dbg "primordial thread forced due to interrupt")
163  (##sys#thread-unblock! ##sys#primordial-thread) )
164
165(define ##sys#ready-queue-head '())
166(define ##sys#ready-queue-tail '())
167
168(define (##sys#ready-queue) ##sys#ready-queue-head)
169
170(define (##sys#add-to-ready-queue thread)
171  (##sys#setslot thread 3 'ready)
172  (let ((new-pair (cons thread '())))
173    (cond ((eq? '() ##sys#ready-queue-head) 
174           (set! ##sys#ready-queue-head new-pair))
175          (else (set-cdr! ##sys#ready-queue-tail new-pair)) )
176    (set! ##sys#ready-queue-tail new-pair) ) )
177
178(define (##sys#remove-from-ready-queue)
179  (let ((first-pair ##sys#ready-queue-head))
180    (and (not (null? first-pair))
181         (let ((first-cdr (cdr first-pair)))
182           (set! ##sys#ready-queue-head first-cdr)
183           (when (eq? '() first-cdr) (set! ##sys#ready-queue-tail '()))
184           (car first-pair) ) ) ) )
185
186(define (##sys#update-thread-state-buffer thread)
187  (let ([buf (##sys#slot thread 5)])
188    (##sys#setslot buf 0 ##sys#dynamic-winds)
189    (##sys#setslot buf 1 ##sys#standard-input)
190    (##sys#setslot buf 2 ##sys#standard-output)
191    (##sys#setslot buf 3 ##sys#standard-error)
192    (##sys#setslot buf 4 ##sys#current-exception-handler)
193    (##sys#setslot buf 5 ##sys#current-parameter-vector) ) )
194
195(define (##sys#restore-thread-state-buffer thread)
196  (let ([buf (##sys#slot thread 5)])
197    (set! ##sys#dynamic-winds (##sys#slot buf 0))
198    (set! ##sys#standard-input (##sys#slot buf 1))
199    (set! ##sys#standard-output (##sys#slot buf 2))
200    (set! ##sys#standard-error (##sys#slot buf 3)) 
201    (set! ##sys#current-exception-handler (##sys#slot buf 4))
202    (set! ##sys#current-parameter-vector (##sys#slot buf 5)) ) )
203
204(set! ##sys#interrupt-hook
205  (let ([oldhook ##sys#interrupt-hook])
206    (lambda (reason state)
207      (when (fx= reason 255)            ; C_TIMER_INTERRUPT_NUMBER
208        (let ([ct ##sys#current-thread])
209          (##sys#setslot ct 1 (lambda () (oldhook reason state))) 
210          (##sys#schedule) ) )          ; expected not to return!
211      (oldhook reason state) ) ) )
212
213(define ##sys#timeout-list '())
214
215(define (##sys#thread-block-for-timeout! t tm)
216  (dbg t " blocks for " tm)
217  ;; This should really use a balanced tree:
218  (let loop ([tl ##sys#timeout-list] [prev #f])
219    (if (or (null? tl) (< tm (caar tl)))
220        (if prev
221            (set-cdr! prev (cons (cons tm t) tl))
222            (set! ##sys#timeout-list (cons (cons tm t) tl)) )
223        (loop (cdr tl) tl) ) ) 
224  (##sys#setslot t 3 'blocked)
225  (##sys#setislot t 13 #f)
226  (##sys#setislot t 4 tm) )
227
228(define (##sys#thread-block-for-termination! t t2)
229  (dbg t " blocks for " t2)
230  (let ([state (##sys#slot t2 3)])
231    (unless (or (eq? state 'dead) (eq? state 'terminated))
232      (##sys#setslot t2 12 (cons t (##sys#slot t2 12)))
233      (##sys#setslot t 3 'blocked) 
234      (##sys#setislot t 13 #f)
235      (##sys#setslot t 11 t2) ) ) )
236
237(define (##sys#thread-kill! t s)
238  (dbg "killing: " t " -> " s ", recipients: " (##sys#slot t 12))
239  (##sys#abandon-mutexes t)
240  (##sys#setslot t 3 s)
241  (##sys#setislot t 4 #f)
242  (##sys#setislot t 11 #f)
243  (##sys#setislot t 8 '())
244  (let ([rs (##sys#slot t 12)])
245    (unless (null? rs)
246      (for-each
247       (lambda (t2)
248         (dbg "  checking: " t2 " (" (##sys#slot t2 3) ") -> " (##sys#slot t2 11))
249         (when (eq? (##sys#slot t2 11) t)
250           (##sys#thread-basic-unblock! t2) ) )
251       rs) ) )
252  (##sys#setislot t 12 '()) )
253
254(define (##sys#thread-basic-unblock! t)
255  (dbg "unblocking: " t)
256  (##sys#setislot t 11 #f)
257  (##sys#setislot t 4 #f)
258  (##sys#add-to-ready-queue t) )
259
260(define ##sys#default-exception-handler
261  (let ([print-error-message print-error-message]
262        [display display]
263        [print-call-chain print-call-chain]
264        [open-output-string open-output-string]
265        [get-output-string get-output-string] )
266    (lambda (arg)
267      (let ([ct ##sys#current-thread])
268        (dbg "exception: " ct " -> " (if (##sys#structure? arg 'condition) (##sys#slot arg 2) arg))
269        (cond [(foreign-value "C_abort_on_thread_exceptions" bool)
270               (let* ([pt ##sys#primordial-thread]
271                      [ptx (##sys#slot pt 1)] )
272                 (##sys#setslot 
273                  pt 1 
274                  (lambda ()
275                    (##sys#signal arg)
276                    (ptx) ) )
277                 (##sys#thread-unblock! pt) ) ]
278              [##sys#warnings-enabled
279               (let ([o (open-output-string)])
280                 (display "Warning (" o)
281                 (display ct o)
282                 (display "): " o)
283                 (print-error-message arg ##sys#standard-error (get-output-string o))
284                 (print-call-chain ##sys#standard-error 0 ct) ) ] )
285        (##sys#setslot ct 7 arg)
286        (##sys#thread-kill! ct 'terminated)
287        (##sys#schedule) ) ) ) )
288
289
290;;; `select()'-based blocking:
291
292(define ##sys#fd-list '())
293
294(define ##sys#fdset-select-timeout
295  (foreign-lambda* int ([bool to] [unsigned-long tm])
296    "struct timeval timeout;"
297    "timeout.tv_sec = tm / 1000;"
298    "timeout.tv_usec = (tm % 1000) * 1000;"
299    "C_fdset_input_2 = C_fdset_input;"
300    "C_fdset_output_2 = C_fdset_output;"
301    "return(select(FD_SETSIZE, &C_fdset_input, &C_fdset_output, NULL, to ? &timeout : NULL));") )
302
303(define ##sys#fdset-restore
304  (foreign-lambda* void ()
305    "C_fdset_input = C_fdset_input_2;"
306    "C_fdset_output = C_fdset_output_2;") )
307
308((foreign-lambda* void ()
309   "FD_ZERO(&C_fdset_input);"
310   "FD_ZERO(&C_fdset_output);") )
311
312(define ##sys#fdset-input-set
313  (foreign-lambda* void ([int fd])
314    "FD_SET(fd, &C_fdset_input);" ) )
315
316(define ##sys#fdset-output-set
317  (foreign-lambda* void ([int fd])
318    "FD_SET(fd, &C_fdset_output);" ) )
319
320(define ##sys#fdset-clear
321  (foreign-lambda* void ([int fd])
322    "FD_CLR(fd, &C_fdset_input_2);"
323    "FD_CLR(fd, &C_fdset_output_2);") )
324
325(define (##sys#thread-block-for-i/o! t fd i/o)
326  (dbg t " blocks for I/O " fd)
327  (let loop ([lst ##sys#fd-list])
328    (if (null? lst) 
329        (set! ##sys#fd-list (cons (list fd t) ##sys#fd-list)) 
330        (let ([a (car lst)])
331          (if (fx= fd (car a)) 
332              (##sys#setslot a 1 (cons t (cdr a)))
333              (loop (cdr lst)) ) ) ) )
334  (case i/o
335    ((#t #:input) (##sys#fdset-input-set fd))
336    ((#f #:output) (##sys#fdset-output-set fd))
337    ((#:all)
338     (##sys#fdset-input-set fd)
339     (##sys#fdset-output-set fd) ) )
340  (##sys#setslot t 3 'blocked)
341  (##sys#setislot t 13 #f)
342  (##sys#setslot t 11 (cons fd i/o)) )
343
344(define (##sys#unblock-threads-for-i/o)
345  (dbg "fd-list: " ##sys#fd-list)
346  (let* ([to? (pair? ##sys#timeout-list)]
347         [rq? (pair? ##sys#ready-queue-head)]
348         [n (##sys#fdset-select-timeout ; we use FD_SETSIZE, but really should use max fd
349             (or rq? to?)
350             (if (and to? (not rq?))    ; no thread was unblocked by timeout, so wait
351                 (let* ([tmo1 (caar ##sys#timeout-list)]
352                        [now (##sys#fudge 16)])
353                   (fxmax 0 (- tmo1 now)) )
354                 0) ) ] )               ; otherwise immediate timeout.
355    (dbg n " fds ready")
356    (cond [(eq? -1 n) 
357           (##sys#force-primordial)]
358          [(fx> n 0)
359           (set! ##sys#fd-list
360             (let loop ([n n] [lst ##sys#fd-list])
361               (if (or (zero? n) (null? lst))
362                   lst
363                   (let* ([a (car lst)]
364                          [fd (car a)]
365                          [inf (##core#inline "C_fd_test_input" fd)]
366                          [outf (##core#inline "C_fd_test_output" fd)] )
367                     (dbg "fd " fd " ready: input=" inf ", output=" outf)
368                     (if (or inf outf)
369                         (let loop2 ([threads (cdr a)])
370                           (if (null? threads) 
371                               (begin
372                                 (##sys#fdset-clear fd)
373                                 (loop (sub1 n) (cdr lst)) )
374                               (let* ([t (car threads)]
375                                      [p (##sys#slot t 11)] )
376                                 (when (and (pair? p)
377                                            (eq? fd (car p))
378                                            (not (##sys#slot t 13) ) ) ; not unblocked by timeout
379                                   (##sys#thread-basic-unblock! t) )
380                                 (loop2 (cdr threads)) ) ) )
381                         (cons a (loop n (cdr lst))) ) ) ) ) ) ] )
382    (##sys#fdset-restore) ) )
383
384
385;;; Clear I/O state for unblocked thread
386
387(define (##sys#clear-i/o-state-for-thread! t)
388  (when (pair? (##sys#slot t 11))
389    (let ((fd (##sys#slot (##sys#slot t 11) 0)))
390      (set! ##sys#fd-list
391        (let loop ([lst ##sys#fd-list])
392          (if (null? lst)
393              '()
394              (let* ([a (##sys#slot lst 0)]
395                     [fd2 (##sys#slot a 0)] )
396                (if (eq? fd fd2)
397                    (let ((ts (##sys#delq t (##sys#slot a 1)))) ; remove from fd-list entry
398                      (cond ((null? ts)
399                             ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error)
400                             (##sys#fdset-clear fd) ; no more threads waiting for this fd
401                             (##sys#fdset-restore)
402                             (##sys#slot lst 1) )
403                            (else
404                             (##sys#setslot a 1 ts) ; fd-list entry is list with t removed
405                             lst) ) )
406                    (cons a (loop (##sys#slot lst 1)))))))))))
407
408
409;;; Get list of all threads that are ready or waiting for timeout or waiting for I/O:
410
411(define (##sys#all-threads)
412  (append ##sys#ready-queue-head
413          (apply append (map cdr ##sys#fd-list))
414          (map cdr ##sys#timeout-list)))
415
416
417;;; Remove all waiting threads from the relevant queues with the exception of the current thread:
418
419(define (##sys#fetch-and-clear-threads)
420  (let ([all (vector ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#fd-list ##sys#timeout-list)])
421    (set! ##sys#ready-queue-head '())
422    (set! ##sys#ready-queue-tail '())
423    (set! ##sys#fd-list '())
424    (set! ##sys#timeout-list '()) 
425    all) )
426
427
428;;; Restore list of waiting threads:
429
430(define (##sys#restore-threads vec)
431  (set! ##sys#ready-queue-head (##sys#slot vec 0))
432  (set! ##sys#ready-queue-tail (##sys#slot vec 1))
433  (set! ##sys#fd-list (##sys#slot vec 2))
434  (set! ##sys#timeout-list (##sys#slot vec 3)) )
435
436
437;;; Unblock thread cleanly:
438
439(define (##sys#thread-unblock! t)
440  (when (eq? 'blocked (##sys#slot t 3))
441    (set! ##sys#timeout-list
442      (let loop ((l ##sys#timeout-list))
443        (if (null? l) 
444            l
445            (let ((h (##sys#slot l 0)))
446              (if (eq? (##sys#slot h 1) t)
447                  (##sys#slot l 1)
448                  (cons h (loop (##sys#slot l 1))))))))
449    (set! ##sys#fd-list 
450      (let loop ([fdl ##sys#fd-list])
451        (if (null? fdl)
452            '()
453            (let ([a (##sys#slot fdl 0)])
454              (cons
455               (cons (##sys#slot a 0)
456                     (##sys#delq t (##sys#slot a 1)) )
457               (loop (##sys#slot fdl 1)) ) ) ) ) )
458    (##sys#setislot t 12 '())
459    (##sys#thread-basic-unblock! t) ) )
460
461
462;;; Multithreaded breakpoints
463
464(define (##sys#break-entry name args)
465  (when (or (not ##sys#break-in-thread) (eq? ##sys#break-in-thread ##sys#current-thread))
466    (##sys#call-with-current-continuation
467     (lambda (k)
468       (let* ((pk (if (eq? ##sys#current-thread ##sys#primordial-thread)
469                      '()
470                      (list '(exn . thread) ##sys#current-thread
471                            '(exn . primordial-continuation) 
472                            (lambda _ ((##sys#slot ##sys#primordial-thread 1))))))
473              (exn (##sys#make-structure
474                    'condition
475                    '(exn breakpoint)
476                    (append
477                     (list '(exn . message) "*** breakpoint ***"
478                           '(exn . arguments) (cons name args)
479                           '(exn . location) name
480                           '(exn . continuation) k)
481                     pk) ) ) )
482         (set! ##sys#last-breakpoint exn)
483         (cond ((eq? ##sys#current-thread ##sys#primordial-thread)
484                (##sys#signal exn) )
485               (else
486                (##sys#setslot ##sys#current-thread 3 'suspended)
487                (##sys#setslot ##sys#current-thread 1 (lambda () (k (##core#undefined))))
488                (let ([old (##sys#slot ##sys#primordial-thread 1)])
489                  (##sys#setslot
490                   ##sys#primordial-thread 1
491                   (lambda ()
492                     (##sys#signal exn)
493                     (old) ) )
494                  (##sys#thread-unblock! ##sys#primordial-thread)
495                  (##sys#schedule) ) ) ) ) ) ) ) )
496                 
497(define (##sys#break-resume exn)
498  ;; assumes current-thread is primordial
499  (let* ((props (##sys#slot exn 2))
500         (a (member '(exn . continuation) props))
501         (t (member '(exn . thread) props))
502         (pk (or (member '(exn . primordial-continuation) props) a)))
503    (when t
504      (let ((t (cadr t)))
505        (if a
506            (##sys#setslot t 1 (lambda () ((cadr a) (##core#undefined))))
507            (##sys#signal-hook #:type-error "condition has no continuation" exn) )
508        (##sys#add-to-ready-queue t) ) )
509    (if pk
510        ((cadr pk) (##core#undefined))
511        (##sys#signal-hook #:type-error "condition has no continuation" exn) ) ) )
Note: See TracBrowser for help on using the repository browser.