Changeset 15577 in project


Ignore:
Timestamp:
08/26/09 16:15:38 (10 years ago)
Author:
felix winkelmann
Message:

a mess

Location:
release/4/high-load-scheduler
Files:
1 added
1 edited

Legend:

Unmodified
Added
Removed
  • release/4/high-load-scheduler/scheduler.scm

    r14592 r15577  
    2828(declare
    2929  (fixnum)
    30   (unit scheduler)
    3130  (disable-interrupts)
    3231  (usual-integrations)
    33   (disable-warning var)
    34   (hide ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#timeout-list
     32  (hide ##sys#ready-queue-head ##sys#ready-queue-tail
     33        ##sys#waiting-queue-head ##sys#waiting-queue-tail
     34        ##sys#timeout-list ##sys#timeout-list-head
    3535        ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer
    3636        ##sys#remove-from-ready-queue ##sys#unblock-threads-for-i/o ##sys#force-primordial
    3737        ##sys#fdset-input-set ##sys#fdset-output-set ##sys#fdset-clear
    3838        ##sys#fdset-select-timeout ##sys#fdset-restore
    39         ##sys#clear-i/o-state-for-thread!)
     39        ##sys#clear-i/o-state-for-thread!
     40        make-int-priority-queue-entry int-priority-queue-entry?
     41        int-priority-queue-color int-priority-queue-color-set!
     42        int-priority-queue-parent int-priority-queue-parent-set!
     43        int-priority-queue-left int-priority-queue-left-set!
     44        int-priority-queue-right int-priority-queue-right-set!
     45        int-priority-queue-index int-priority-queue-index-set!
     46        int-priority-queue-value int-priority-queue-value-set!
     47        int-priority-queue-before? int-priority-queue-match? int-priority-queue-index-before?
     48        int-priority-queue-init! int-priority-queue->rbtree
     49        int-priority-queue-lookup int-priority-queue-node-fold int-priority-queue-node-for-each
     50        int-priority-queue-node-insert! int-priority-queue-remove! int-priority-queue-reposition!
     51        int-priority-queue-empty? int-priority-queue-singleton?
     52        int-priority-queue-delete-min! int-priority-queue-delete!
     53        ##sys#fd-list-add-thread!
     54        )
    4055  (foreign-declare #<<EOF
    4156#ifdef HAVE_ERRNO_H
     
    4560# define C_signal_interrupted_p     C_SCHEME_FALSE
    4661#endif
     62# include <sys/stat.h>
    4763
    4864#ifdef _WIN32
     
    87103
    88104
     105(define-syntax dbg
     106  (syntax-rules ()
     107    ((_ . _) #f)))
     108#;(define-syntax dbg
     109(syntax-rules ()
     110  ((_ x ...) (begin (print x ...) (flush-output (current-output-port))))))
     111
    89112(cond-expand
    90  (hygienic-macros
    91   (define-syntax dbg
     113 (rbtree
     114
     115  (include "rbtree-s.scm")
     116
     117  ;; We shall replace that with a lolevel structure once.
     118
     119  (define-record-type <int-priority-queue>
     120    (make-int-priority-queue-entry color parent left right index value)
     121    int-priority-queue-entry?
     122    (color int-priority-queue-color int-priority-queue-color-set!)
     123    (parent int-priority-queue-parent int-priority-queue-parent-set!)
     124    (left int-priority-queue-left int-priority-queue-left-set!)
     125    (right int-priority-queue-right int-priority-queue-right-set!)
     126    (index int-priority-queue-index int-priority-queue-index-set!)
     127    (value int-priority-queue-value int-priority-queue-value-set!))
     128
     129  (define (int-priority-queue-before? node1 node2) ;; ordering function
     130    (fx< (int-priority-queue-index node1) (int-priority-queue-index node2)))
     131
     132  (define (int-priority-queue-match? key node)
     133    (eqv? key (int-priority-queue-index node)))
     134
     135  (define (int-priority-queue-index-before? key node)
     136    (fx< key (int-priority-queue-index node)))
     137
     138  (define-rbtree
     139    int-priority-queue-init!          ;; defined by define-rbtree
     140    int-priority-queue->rbtree        ;; defined by define-rbtree
     141    int-priority-queue-lookup         ;; defined by define-rbtree
     142    int-priority-queue-node-fold      ;; defined by define-rbtree
     143    int-priority-queue-node-for-each  ;; defined by define-rbtree
     144    int-priority-queue-node-insert!   ;; defined by define-rbtree
     145    int-priority-queue-remove!        ;; defined by define-rbtree
     146    int-priority-queue-reposition!    ;; defined by define-rbtree
     147    int-priority-queue-empty?         ;; defined by define-rbtree
     148    int-priority-queue-singleton?     ;; defined by define-rbtree
     149    int-priority-queue-match?
     150    int-priority-queue-index-before?
     151    int-priority-queue-before?
     152    int-priority-queue-color
     153    int-priority-queue-color-set!
     154    int-priority-queue-parent
     155    int-priority-queue-parent-set!
     156    int-priority-queue-left
     157    int-priority-queue-left-set!
     158    int-priority-queue-right
     159    int-priority-queue-right-set!
     160    int-priority-queue-right
     161    int-priority-queue-right-set!
     162    #f
     163    #f)
     164
     165  (define-inline (make-queue-entry k v)
     166    (make-int-priority-queue-entry #f #f #f #f k v))
     167
     168  (define-inline (make-int-priority-queue)
     169    (int-priority-queue-init! (make-int-priority-queue-entry #f #f #f #f #f #f)))
     170
     171  (define ##sys#timeout-list (make-int-priority-queue))
     172
     173  (define-inline (##sys#timeout-list-empty?) (int-priority-queue-empty? ##sys#timeout-list))
     174
     175  (define-inline (timeout-queue-next) (int-priority-queue-right ##sys#timeout-list))
     176
     177  (define-inline (timeout-queue-unqueue!)
     178    (int-priority-queue-remove! (timeout-queue-next)))
     179
     180  (define-inline (timeout-queue-insert-entry! entry)
     181    (int-priority-queue-node-insert! ##sys#timeout-list entry))
     182
     183  (define-inline (timeout-queue-remove-entry! entry)
     184    (int-priority-queue-remove! entry))
     185
     186  ) ;; rbtree
     187 (else
     188
     189  ;; Sorry for that, I don't know any better yet.
     190
     191  (define-syntax define-macro
    92192    (syntax-rules ()
    93       ((_ . _) #f))) )
     193      ((_ (name . llist) body ...)
     194       (define-syntax name
     195         (lambda (x r c)
     196           (apply (lambda llist body ...) (cdr x)))))
     197      ((_ name . body)
     198       (define-syntax name
     199         (lambda (x r c) (cdr x))))))
     200
     201  (define-macro (define-llrbtree-code
     202                  features
     203                  update
     204                  init-root-node!
     205                  t-lookup
     206                  t-min
     207                  t-fold
     208                  t-for-each
     209                  t-insert
     210                  t-delete
     211                  t-delete-min
     212                  t-empty?
     213                  t-k-eq?
     214                  t-k-<?
     215                  t-<?
     216                  left set-left!
     217                  right set-right!
     218                  color set-color!
     219                  set-leftmost!
     220                  )
     221    (make-llrbtree-code
     222     features
     223     (eval (cons 'lambda update))
     224     init-root-node!
     225     t-lookup
     226     t-min
     227     t-fold
     228     t-for-each
     229     t-insert
     230     t-delete
     231     t-delete-min
     232     t-empty?
     233     (eval (cons 'lambda t-k-eq?))
     234     (eval (cons 'lambda t-k-<?))
     235     (eval (cons 'lambda t-<?))
     236     left set-left!
     237     right set-right!
     238     color set-color!
     239     set-leftmost!
     240     ))
     241
     242  ;; We shall replace that with a lolevel structure once.
     243
     244  (define-record-type <int-priority-queue>
     245    (make-int-priority-queue-entry color left right index value)
     246    int-priority-queue-entry?
     247    (color int-priority-queue-color int-priority-queue-color-set!)
     248    (left int-priority-queue-left int-priority-queue-left-set!)
     249    (right int-priority-queue-right int-priority-queue-right-set!)
     250    (index int-priority-queue-index int-priority-queue-index-set!)
     251    (value int-priority-queue-value int-priority-queue-value-set!))
     252
     253  (define-inline (make-queue-entry k v)
     254    (make-int-priority-queue-entry #f #f #f k v))
     255
     256  (define-llrbtree-code
     257    (debug)
     258    ((node . args)
     259     `(let ((node ,node))
     260        . ,(let loop ((args args))
     261             (if (null? args)
     262                 '(node)
     263                 (cons
     264                  (case (car args)
     265                    ((color:) `(int-priority-queue-color-set! node ,(cadr args)))
     266                    ((left:) `(int-priority-queue-left-set! node ,(cadr args)))
     267                    ((right:) `(int-priority-queue-right-set! node ,(cadr args)))
     268                    (else (error  (format "unbrauchbar ~a" args))))
     269                  (loop (cddr args)))))))
     270    int-priority-queue-init!            ;; defined
     271    int-priority-queue-lookup           ;; defined
     272    #f                                  ;; no min defined
     273    int-priority-queue-node-fold        ;; defined
     274    int-priority-queue-node-for-each    ;; defined
     275    int-priority-queue-node-insert!     ;; defined
     276    int-priority-queue-node-delete!     ;; delete by node defined
     277    int-priority-queue-delete-min!      ;; defined
     278    int-priority-queue-empty?           ;; defined
     279    ((k n)
     280     `(fx= ,k (int-priority-queue-index ,n)))
     281    ((k n)
     282     `(fx<= ,k (int-priority-queue-index ,n)))
     283    ((n1 n2)
     284     `(fx<= (int-priority-queue-index ,n1) (int-priority-queue-index ,n2)))
     285    int-priority-queue-left
     286    int-priority-queue-left-set!
     287    int-priority-queue-right
     288    int-priority-queue-right-set!
     289    int-priority-queue-color
     290    int-priority-queue-color-set!
     291    #f)
     292
     293  #;(define-llrbtree-code
     294    (ordered)
     295    ((node . args)
     296     `(let ((node ,node))
     297        . ,(let loop ((args args))
     298             (if (null? args)
     299                 '(node)
     300                 (cons
     301                  (case (car args)
     302                    ((color:) `(int-priority-queue-color-set! node ,(cadr args)))
     303                    ((left:) `(int-priority-queue-parent-set! node ,(cadr args)))
     304                    ((right:) `(int-priority-queue-right-set! node ,(cadr args)))
     305                    (else (error  (format "unbrauchbar ~a" args))))
     306                  (loop (cddr args)))))))
     307    #f                                  ;; no init defined
     308    #f                                  ;; no lookup defined
     309    #f                                  ;; no min defined
     310    #f                                  ;; no fold defined
     311    #f                                  ;; no for-each defined
     312    #f                                  ;; no insert defined
     313    int-priority-queue-delete!          ;; delete by key defined
     314    #f                                  ;; no delete-min defined
     315    #f                                  ;; no empty? defined
     316    ((k n)
     317     `(fx= ,k (int-priority-queue-index ,n)))
     318    ((k n)
     319     `(fx< ,k (int-priority-queue-index ,n)))
     320    ((n1 n2)
     321     `(fx< (int-priority-queue-index ,n1) (int-priority-queue-index ,n2)))
     322    int-priority-queue-left
     323    int-priority-queue-left-set!
     324    int-priority-queue-right
     325    int-priority-queue-right-set!
     326    int-priority-queue-color
     327    int-priority-queue-color-set!
     328    #f)
     329
     330  (define ##sys#timeout-list (int-priority-queue-init! (make-queue-entry #f #f)))
     331
     332  (define ##sys#timeout-list-head #f)
     333
     334  (define-inline (##sys#timeout-list-empty?) (not ##sys#timeout-list-head))
     335
     336  (define-inline (timeout-queue-next) ##sys#timeout-list-head)
     337
     338  (define-inline (timeout-queue-unqueue!)
     339    (set! ##sys#timeout-list-head (int-priority-queue-delete-min! ##sys#timeout-list)))
     340
     341  (define-inline (timeout-queue-remove-entry! entry)
     342    (if (eq? ##sys#timeout-list-head entry)
     343        (timeout-queue-unqueue!)
     344        (int-priority-queue-node-delete! ##sys#timeout-list entry)))
     345
     346  (define-inline (timeout-queue-insert-entry! entry)
     347    (cond
     348     ((not ##sys#timeout-list-head)
     349      (set! ##sys#timeout-list-head entry))
     350     ((fx< (int-priority-queue-index entry)
     351           (int-priority-queue-index ##sys#timeout-list-head))
     352      (int-priority-queue-node-insert! ##sys#timeout-list ##sys#timeout-list-head)
     353      (set! ##sys#timeout-list-head entry))
     354     (else (int-priority-queue-node-insert! ##sys#timeout-list entry))))
     355
     356  ))
     357
     358(define (make-int-priority-queue)
     359  (int-priority-queue-init! (make-queue-entry #f #f)))
     360
     361(define ##sys#fd-list (make-int-priority-queue))
     362
     363(define-inline (##sys#fd-list-empty?) (int-priority-queue-empty? ##sys#fd-list))
     364
     365(cond-expand
     366 (rbtree
     367  (define-inline (fd-list-remove-entry! entry)
     368    (int-priority-queue-remove! entry)))
    94369 (else
    95   (define-macro (dbg . args) #f)
    96   #;(define-macro (dbg . args)
    97   `(print "DBG: " ,@args) ) ) )
    98 
     370  (define-inline (fd-list-remove-entry! entry)
     371    (int-priority-queue-node-delete! ##sys#fd-list entry))))
     372
     373(cond-expand
     374 (ready-queue-linear
     375
     376  (define ##sys#ready-queue-head '())
     377  (define ##sys#ready-queue-tail '())
     378
     379  (define (##sys#ready-queue) ##sys#ready-queue-head)
     380
     381  (define-inline (##sys#ready-queue-empty?) (eq? '() ##sys#ready-queue-head))
     382
     383  (define (##sys#add-to-ready-queue thread)
     384    (##sys#setslot thread 3 'ready)
     385    (let ((new-pair (cons thread '())))
     386      (cond ((##sys#ready-queue-empty?)
     387             (set! ##sys#ready-queue-head new-pair))
     388            (else (set-cdr! ##sys#ready-queue-tail new-pair)) )
     389      (set! ##sys#ready-queue-tail new-pair) ) )
     390
     391  (define-inline (##sys#remove-from-ready-queue)
     392    (let ((first-pair ##sys#ready-queue-head))
     393      (and (not (null? first-pair))
     394           (let ((first-cdr (cdr first-pair)))
     395             (set! ##sys#ready-queue-head first-cdr)
     396             (when (eq? '() first-cdr) (set! ##sys#ready-queue-tail '()))
     397             (car first-pair) ) ) ) )
     398
     399  (define-inline (queue-fold cns init q)
     400    (fold cns init q))
     401
     402  (define ##sys#waiting-queue-head '())
     403  (define ##sys#waiting-queue-tail '())
     404
     405  (define (##sys#waiting-queue) ##sys#waiting-queue-head)
     406
     407  (define-inline (##sys#waiting-queue-empty?) (eq? '() ##sys#waiting-queue-head))
     408
     409  (define-inline (##sys#add-to-waiting-queue thread)
     410    (##sys#setslot thread 3 'ready)
     411    (let ((new-pair (cons thread '())))
     412      (cond ((##sys#waiting-queue-empty?)
     413             (set! ##sys#waiting-queue-head new-pair))
     414            (else (set-cdr! ##sys#waiting-queue-tail new-pair)) )
     415      (set! ##sys#waiting-queue-tail new-pair) ) )
     416
     417  (define-inline (##sys#release-waiting-queue)
     418    (set! ##sys#ready-queue-head ##sys#waiting-queue-head)
     419    (set! ##sys#ready-queue-tail ##sys#waiting-queue-tail)
     420    (set! ##sys#waiting-queue-head '())
     421    (set! ##sys#waiting-queue-tail '()))
     422  )
     423 (else
     424
     425  (define-inline (make-vector-queue) '#(0 0 #(#f #f #f #f)))
     426
     427  (define-inline (vector-queue-empty? q) (eqv? (##sys#slot q 0) (##sys#slot q 1)))
     428
     429  (define-inline (vector-queue-size q)
     430    (let ((len (fx- (##sys#slot q 1) (##sys#slot q 0))))
     431      (if (fx< len 0)
     432          (fx+ len (##sys#size (##sys#slot q 2)))
     433          len)))
     434
     435  (define-inline (queue-grow! q v)
     436    (let ((v2 (##sys#make-vector (fx* (##sys#size v) 2) #f))
     437          (first (##sys#slot q 0))
     438          (last (##sys#slot q 1)) )
     439      (if (fx<= first last)
     440          (do ((i 0 (fx+ i 1))
     441               (j first (fx+ j 1)))
     442              ((fx>= j last) (##sys#setslot q 1 i))
     443            (##sys#setslot v2 i (##sys#slot v j)) )
     444          (let* ( (max (##sys#size v)) )
     445            (do ((i (do ((i 0 (fx+ i 1))
     446                         (j first (fx+ j 1)))
     447                        ((fx>= j max) i)
     448                      (##sys#setslot v2 i (##sys#slot v j)) )
     449                    (fx+ i 1))
     450                 (j 0 (fx+ j 1)))
     451                ((fx>= j last) (##sys#setslot q 1 i))
     452              (##sys#setslot v2 i (##sys#slot v j)) )))
     453      (##sys#setslot q 0 0)
     454      (##sys#setslot q 2 v2) ))
     455
     456  (define (##sys#vector-queue-add! q d)
     457    (let ((ns (add1 (vector-queue-size q)))
     458          (v (##sys#slot q 2)))
     459      (if (fx>= ns (##sys#size v))
     460          (begin
     461            (queue-grow! q v)
     462            (##sys#vector-queue-add! q d))
     463          (let* ((pos (##sys#slot q 1))
     464                 (next (add1 pos)))
     465            (if (eqv? next (##sys#size v)) (set! next 0))
     466            (##sys#setslot v pos d)
     467            (##sys#setslot q 1 next)))))
     468
     469  (define-inline (vector-queue-remove! q)
     470    (let ((pos (##sys#slot q 0)))
     471      (if (eqv? (##sys#slot q 1) pos) #f
     472          (let* ((v (##sys#slot q 2))
     473                 (d (##sys#slot v pos))
     474                 (n (add1 pos)))
     475            (##sys#setslot v pos #f)
     476            (if (eqv? n (##sys#size v)) (set! n 0))
     477            (##sys#setslot q 0 n)
     478            d))))
     479
     480  (define-inline (queue-fold cns init q)
     481    (let ((first (##sys#slot q 0))
     482          (last (##sys#slot q 1))
     483          (v (##sys#slot q 2)))
     484      (if (fx<= first last)
     485          (let loop ((i first))
     486            (if (eqv? i last) init
     487                (cns (##sys#slot v i) (loop (add1 i)))))
     488          (let loop ((i first))
     489            (if (eqv? i (##sys#size v))
     490                (let loop ((i 0))
     491                  (if (eqv? i last) init
     492                      (cns (##sys#slot v i) (loop (add1 i)))))
     493                (cns (##sys#slot v i) (loop (add1 i))))) ) ) )
     494
     495  (define ##sys#ready-queue-head (make-vector-queue))
     496
     497  (define (##sys#ready-queue) ##sys#ready-queue-head)
     498
     499  (define-inline (##sys#ready-queue-empty?) (vector-queue-empty? ##sys#ready-queue-head))
     500
     501  (define (##sys#add-to-ready-queue thread)
     502    (##sys#setslot thread 3 'ready)
     503    (##sys#vector-queue-add! ##sys#ready-queue-head thread) )
     504
     505  (define-inline (##sys#remove-from-ready-queue)
     506    (vector-queue-remove! ##sys#ready-queue-head) )
     507
     508  (define ##sys#waiting-queue-head (make-vector-queue))
     509
     510  (define (##sys#waiting-queue) ##sys#waiting-queue-head)
     511
     512  (define-inline (##sys#waiting-queue-empty?) (vector-queue-empty? ##sys#waiting-queue-head))
     513
     514  (define-inline (##sys#add-to-waiting-queue thread)
     515    (##sys#setslot thread 3 'ready)
     516    (##sys#vector-queue-add! ##sys#waiting-queue-head thread))
     517
     518  (define-inline (##sys#release-waiting-queue)
     519    (let ((rq ##sys#ready-queue-head))
     520      (set! ##sys#ready-queue-head ##sys#waiting-queue-head)
     521      (set! ##sys#waiting-queue-head rq)))
     522
     523  ))
    99524
    100525(define (##sys#schedule)
    101   (define (switch thread)
    102     (dbg "switching to " thread)
    103     (set! ##sys#current-thread thread)
    104     (##sys#setslot thread 3 'running)
    105     (##sys#restore-thread-state-buffer thread)
    106     (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot thread 9))
    107     ((##sys#slot thread 1)) )
    108526  (let* ([ct ##sys#current-thread]
    109          [eintr #f]
    110527         [cts (##sys#slot ct 3)] )
    111     (dbg "scheduling, current: " ct ", ready: " ##sys#ready-queue-head)
     528    (dbg "scheduling, current: " ct ", ready: " ##sys#ready-queue-head " waiting: " ##sys#waiting-queue-head)
    112529    (##sys#update-thread-state-buffer ct)
    113530    ;; Put current thread on ready-queue:
    114531    (when (or (eq? cts 'running) (eq? cts 'ready)) ; should ct really be 'ready? - normally not.
    115532      (##sys#setislot ct 13 #f)                    ; clear timeout-unblock flag
    116       (##sys#add-to-ready-queue ct) )
    117     (let loop1 ()
    118       ;; Unblock threads waiting for timeout:
    119       (unless (null? ##sys#timeout-list)
    120         (let ([now (##sys#fudge 16)])
    121           (dbg "timeout (" now ") list: " ##sys#timeout-list)
    122           (let loop ([lst ##sys#timeout-list])
    123             (if (null? lst)
    124                 (set! ##sys#timeout-list '())
    125                 (let* ([tmo1 (caar lst)]
    126                        [tto (cdar lst)]
    127                        [tmo2 (##sys#slot tto 4)] )
    128                   (dbg "  " tto " -> " tmo2)
    129                   (if (eq? tmo1 tmo2)
    130                       (if (>= now tmo1)
    131                           (begin
    132                             (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout
    133                             (##sys#clear-i/o-state-for-thread! tto)
    134                             ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;***
    135                             (##sys#thread-basic-unblock! tto)
    136                             (loop (cdr lst)) )
    137                           (begin
    138                             (set! ##sys#timeout-list lst)
    139                             ;; If there are no threads blocking on a select call (fd-list)
    140                             ;; but there are threads in the timeout list then sleep for
    141                             ;; the number of milliseconds of next thread to wake up.
    142                             (when (and (null? ##sys#ready-queue-head)
    143                                        (null? ##sys#fd-list)
    144                                        (pair? ##sys#timeout-list))
    145                               (let ([tmo1 (caar ##sys#timeout-list)])
    146                                 (set! eintr
    147                                   (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo1 now))))
    148                                        (foreign-value "C_signal_interrupted_p" bool) ) ) ) ) ) )
    149                       (loop (cdr lst)) ) ) ) ) ) )
    150       ;; Unblock threads blocked by I/O:
    151       (if eintr
    152           (##sys#force-primordial)
    153           (begin
    154             (unless (null? ##sys#fd-list)
    155               (##sys#unblock-threads-for-i/o) ) ) )
    156       ;; Fetch and activate next ready thread:
    157       (let loop2 ()
    158         (let ([nt (##sys#remove-from-ready-queue)])
    159           (cond [(not nt)
    160                  (if (and (null? ##sys#timeout-list) (null? ##sys#fd-list))
    161                      (##sys#signal-hook #:runtime-error "deadlock")
    162                      (loop1) ) ]
    163                 [(eq? (##sys#slot nt 3) 'ready) (switch nt)]
    164                 [else (loop2)] ) ) ) ) ) )
     533      (##sys#add-to-waiting-queue ct) )
     534    ;; Fetch and activate next ready thread:
     535    (let loop ([nt (##sys#remove-from-ready-queue)])
     536      (cond
     537       [(not nt)
     538        (##sys#release-waiting-queue)
     539        ;; Unblock threads waiting for timeout:
     540        (unless (##sys#timeout-list-empty?)
     541           (if (##sys#unblock-threads-for-timeout!)
     542               (##sys#force-primordial)))
     543        ;; Unblock threads blocked by I/O:
     544        (unless (##sys#fd-list-empty?)
     545           (##sys#unblock-threads-for-i/o) )
     546        (if (and (##sys#ready-queue-empty?)
     547                 (##sys#timeout-list-empty?)
     548                 (##sys#fd-list-empty?))
     549            (##sys#signal-hook #:runtime-error "deadlock")
     550            (loop (##sys#remove-from-ready-queue))) ]
     551       [(eq? (##sys#slot nt 3) 'ready)
     552        (dbg "switching to " nt)
     553        (set! ##sys#current-thread nt)
     554        (##sys#setslot nt 3 'running)
     555        (##sys#restore-thread-state-buffer nt)
     556        (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot nt 9))
     557        ((##sys#slot nt 1))]
     558       [else (loop (##sys#remove-from-ready-queue))] ) ) ))
    165559
    166560(define (##sys#force-primordial)
    167561  (dbg "primordial thread forced due to interrupt")
    168562  (##sys#thread-unblock! ##sys#primordial-thread) )
    169 
    170 (define ##sys#ready-queue-head '())
    171 (define ##sys#ready-queue-tail '())
    172 
    173 (define (##sys#ready-queue) ##sys#ready-queue-head)
    174 
    175 (define (##sys#add-to-ready-queue thread)
    176   (##sys#setslot thread 3 'ready)
    177   (let ((new-pair (cons thread '())))
    178     (cond ((eq? '() ##sys#ready-queue-head)
    179            (set! ##sys#ready-queue-head new-pair))
    180           (else (set-cdr! ##sys#ready-queue-tail new-pair)) )
    181     (set! ##sys#ready-queue-tail new-pair) ) )
    182 
    183 (define (##sys#remove-from-ready-queue)
    184   (let ((first-pair ##sys#ready-queue-head))
    185     (and (not (null? first-pair))
    186          (let ((first-cdr (cdr first-pair)))
    187            (set! ##sys#ready-queue-head first-cdr)
    188            (when (eq? '() first-cdr) (set! ##sys#ready-queue-tail '()))
    189            (car first-pair) ) ) ) )
    190563
    191564(define (##sys#update-thread-state-buffer thread)
     
    216589      (oldhook reason state) ) ) )
    217590
    218 (define ##sys#timeout-list '())
    219 
    220591(define (##sys#remove-from-timeout-list t)
    221   (let loop ((l ##sys#timeout-list) (prev #f))
    222     (if (null? l)
    223         l
    224         (let ((h (##sys#slot l 0))
    225               (r (##sys#slot l 1)))
    226           (if (eq? (##sys#slot h 1) t)
    227               (if prev
    228                   (set-cdr! prev r)
    229                   (set! ##sys#timeout-list r))
    230               (loop r l))))))
     592  (let ((entry (##sys#slot t 4)))
     593    (when entry
     594      (timeout-queue-remove-entry! entry)
     595      (##sys#setislot t 4 #f))))
    231596
    232597(define (##sys#thread-block-for-timeout! t tm)
    233598  (dbg t " blocks for " tm)
    234   ;; This should really use a balanced tree:
    235   (let loop ([tl ##sys#timeout-list] [prev #f])
    236     (if (or (null? tl) (< tm (caar tl)))
    237         (if prev
    238             (set-cdr! prev (cons (cons tm t) tl))
    239             (set! ##sys#timeout-list (cons (cons tm t) tl)) )
    240         (loop (cdr tl) tl) ) )
     599  ;; It wouldn't hurt if the thread structure where prepared to be
     600  ;; moved between thread queues, however that too much of a change at
     601  ;; once.
     602  (let ((ton (make-queue-entry tm t)))
     603    (##sys#setslot t 4 ton)
     604    (timeout-queue-insert-entry! ton))
    241605  (##sys#setslot t 3 'blocked)
    242   (##sys#setislot t 13 #f)
    243   (##sys#setislot t 4 tm) )
     606  (##sys#setislot t 13 #f) )
     607
     608(define (##sys#unblock-threads-for-timeout!)
     609  (let ([now (##sys#fudge 16)])
     610    (dbg "timeout (" now ") list: " (##sys#timeout-list-empty?))
     611    (let loop ()
     612      (unless (##sys#timeout-list-empty?)
     613        (let* ((entry (timeout-queue-next))
     614               (tmo (int-priority-queue-index entry)))
     615          (dbg "  " now " -> " tmo)
     616          (if (>= now tmo)
     617              (let ((tto (int-priority-queue-value entry)))
     618                (if (not (eq? (##sys#slot tto 4) entry))
     619                    (print "(not (eq? (##sys#slot " (##sys#slot tto 4) " 4) " entry ")) "))
     620                (timeout-queue-unqueue!)
     621                (##sys#setislot tto 4 #f)
     622                (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout
     623                (##sys#clear-i/o-state-for-thread! tto)
     624                ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;***
     625                (##sys#thread-basic-unblock! tto)
     626                (loop) ) ))))
     627    ;; If there are no threads blocking on a select call (fd-list) but
     628    ;; there are threads in the timeout list then sleep for the number
     629    ;; of milliseconds of next thread to wake up and return #t if
     630    ;; interupted.
     631    (and (##sys#ready-queue-empty?)
     632         (##sys#waiting-queue-empty?)
     633         (##sys#fd-list-empty?)
     634         (not (##sys#timeout-list-empty?))
     635         (let ([tmo (int-priority-queue-index (timeout-queue-next))])
     636           (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo now))))
     637                (foreign-value "C_signal_interrupted_p" bool) ) ) )))
    244638
    245639(define (##sys#thread-block-for-termination! t t2)
     
    256650  (##sys#abandon-mutexes t)
    257651  (##sys#setslot t 3 s)
    258   (##sys#setislot t 4 #f)
    259652  (##sys#setislot t 11 #f)
    260653  (##sys#setislot t 8 '())
     
    273666  (dbg "unblocking: " t)
    274667  (##sys#setislot t 11 #f)
    275   (##sys#setislot t 4 #f)
     668  (if (##sys#slot t 4)
     669      (begin
     670        (dbg "##sys#thread-basic-unblock! timout slot is still set!")
     671        (##sys#setislot t 4 #f)))
    276672  (##sys#add-to-ready-queue t) )
    277673
     
    308704;;; `select()'-based blocking:
    309705
    310 (define ##sys#fd-list '())
     706(define (##sys#empty-fd-list!) (set! ##sys#fd-list (make-int-priority-queue)))
     707
     708(define (##sys#fd-list-add-thread! fd t)
     709  (let ((entry (int-priority-queue-lookup ##sys#fd-list fd)))
     710    (if entry
     711        (if (not (memq t (int-priority-queue-value entry)))
     712            (int-priority-queue-value-set! entry (cons t (int-priority-queue-value entry))))
     713        (int-priority-queue-node-insert! ##sys#fd-list (make-queue-entry fd (list t))))))
    311714
    312715(define ##sys#fdset-select-timeout
     
    343746(define (##sys#thread-block-for-i/o! t fd i/o)
    344747  (dbg t " blocks for I/O " fd)
    345   (let loop ([lst ##sys#fd-list])
    346     (if (null? lst)
    347         (set! ##sys#fd-list (cons (list fd t) ##sys#fd-list))
    348         (let ([a (car lst)])
    349           (if (fx= fd (car a))
    350               (##sys#setslot a 1 (cons t (cdr a)))
    351               (loop (cdr lst)) ) ) ) )
     748  (##sys#fd-list-add-thread! fd t)
    352749  (case i/o
    353750    ((#t #:input) (##sys#fdset-input-set fd))
     
    360757  (##sys#setslot t 11 (cons fd i/o)) )
    361758
     759(define-foreign-variable error-bad-file int "(errno == EBADF)")
     760
    362761(define (##sys#unblock-threads-for-i/o)
    363   (dbg "fd-list: " ##sys#fd-list)
    364   (let* ([to? (pair? ##sys#timeout-list)]
    365          [rq? (pair? ##sys#ready-queue-head)]
     762  (dbg "fd-list: " (int-priority-queue-node-fold
     763                    (lambda (n i) (cons (cons (int-priority-queue-index n) (int-priority-queue-value n)) i))
     764                    '()
     765                    ##sys#fd-list))
     766  (let* ([to? (not (##sys#timeout-list-empty?))]
     767         [rq? (not (and (##sys#ready-queue-empty?) (##sys#waiting-queue-empty?)))]
     768(nix (dbg "to? " to? " rq? " rq? " re " ##sys#ready-queue-head " w " (##sys#waiting-queue)))
    366769         [n (##sys#fdset-select-timeout ; we use FD_SETSIZE, but really should use max fd
    367770             (or rq? to?)
    368771             (if (and to? (not rq?))    ; no thread was unblocked by timeout, so wait
    369                  (let* ([tmo1 (caar ##sys#timeout-list)]
     772                 (let* ([tmo (int-priority-queue-index (timeout-queue-next))]
    370773                        [now (##sys#fudge 16)])
    371                    (fxmax 0 (- tmo1 now)) )
     774                   (fxmax 0 (- tmo now)))
    372775                 0) ) ] )               ; otherwise immediate timeout.
    373776    (dbg n " fds ready")
    374     (cond [(eq? -1 n)
    375            (##sys#force-primordial)]
     777    (cond [(eq? n 0) (##sys#fdset-restore)]
     778          [(eq? -1 n)
     779           (cond
     780            (error-bad-file
     781             (let ((node (##sys#call-with-current-continuation
     782                          (lambda (exit)
     783                            (int-priority-queue-node-for-each
     784                             (lambda (node)
     785                               (define fd (int-priority-queue-index node))
     786                               (dbg "check bad " fd)
     787                               (if ((foreign-lambda*
     788                                     bool ((integer fd))
     789                                     "struct stat buf;"
     790                                     "int i = ( (fstat(fd, &buf) == -1 && errno == EBADF) ? 1 : 0);"
     791                                     "return(i);")
     792                                    fd)
     793                                   (exit node)))
     794                             ##sys#fd-list)
     795                            (exit #f)))))
     796               (if node
     797                   (let ((fd (int-priority-queue-index node))
     798                         (ts (int-priority-queue-value node)))
     799                     (dbg "bad is " fd)
     800                     (##sys#fdset-clear fd)
     801                     (fd-list-remove-entry! node)
     802                     (for-each
     803                      #;(lambda (thread)
     804                        (thread-signal!
     805                         thread
     806                         (##sys#make-structure
     807                          'condition
     808                          '(exn i/o) ;; better? '(exn i/o net)
     809                          (list '(exn . message) "bad file descriptor"
     810                                '(exn . arguments) (list fd)
     811                                '(exn . location) thread) )))
     812                      (lambda (t)
     813                        (let* ((p (##sys#slot t 11)) )
     814                          (when (and (pair? p)
     815                                     (eq? fd (car p))
     816                                     (not (##sys#slot t 13) ) ) ; not unblocked by timeout
     817                                (##sys#thread-basic-unblock! t) ) ))
     818                      ts))))
     819             (##sys#fdset-restore)
     820             (##sys#unblock-threads-for-i/o))
     821            (else (##sys#force-primordial))) ]
    376822          [(fx> n 0)
    377            (set! ##sys#fd-list
    378              (let loop ([n n] [lst ##sys#fd-list])
    379                (if (or (zero? n) (null? lst))
    380                    lst
    381                    (let* ([a (car lst)]
    382                           [fd (car a)]
    383                           [inf (##core#inline "C_fd_test_input" fd)]
    384                           [outf (##core#inline "C_fd_test_output" fd)] )
    385                      (dbg "fd " fd " ready: input=" inf ", output=" outf)
    386                      (if (or inf outf)
    387                          (let loop2 ([threads (cdr a)])
    388                            (if (null? threads)
    389                                (begin
    390                                  (##sys#fdset-clear fd)
    391                                  (loop (sub1 n) (cdr lst)) )
    392                                (let* ([t (car threads)]
    393                                       [p (##sys#slot t 11)] )
    394                                  (when (and (pair? p)
    395                                             (eq? fd (car p))
    396                                             (not (##sys#slot t 13) ) ) ; not unblocked by timeout
    397                                    (##sys#thread-basic-unblock! t) )
    398                                  (loop2 (cdr threads)) ) ) )
    399                          (cons a (loop n (cdr lst))) ) ) ) ) ) ] )
    400     (##sys#fdset-restore) ) )
    401 
     823           (for-each
     824            (lambda (e) (fd-list-remove-entry! e))
     825            (##sys#call-with-current-continuation
     826             (lambda (exit)
     827               (int-priority-queue-node-fold
     828                (lambda (node init)
     829                  (define fd (int-priority-queue-index node))
     830                  (define threads (int-priority-queue-value node))
     831                  (if (zero? n) (exit init)
     832                      (let* ([inf (##core#inline "C_fd_test_input" fd)]
     833                             [outf (##core#inline "C_fd_test_output" fd)] )
     834                        (dbg "fd " fd " ready: input=" inf ", output=" outf)
     835                        (if (or inf outf)
     836                            (begin
     837                              (for-each
     838                               (lambda (t)
     839                                 (let* ((p (##sys#slot t 11)) )
     840                                   (when (and (pair? p)
     841                                              (eq? fd (car p))
     842                                              (not (##sys#slot t 13) ) ) ; not unblocked by timeout
     843                                         (##sys#thread-basic-unblock! t) ) ))
     844                               threads)
     845                              (##sys#fdset-clear fd)
     846                              (set! n (sub1 n))
     847                              (cons node init))
     848                            init))))
     849                '()
     850                ##sys#fd-list))))
     851           (##sys#fdset-restore) ] ) ) )
    402852
    403853;;; Clear I/O state for unblocked thread
     
    405855(define (##sys#clear-i/o-state-for-thread! t)
    406856  (when (pair? (##sys#slot t 11))
    407     (let ((fd (##sys#slot (##sys#slot t 11) 0)))
    408       (set! ##sys#fd-list
    409         (let loop ([lst ##sys#fd-list])
    410           (if (null? lst)
    411               '()
    412               (let* ([a (##sys#slot lst 0)]
    413                      [fd2 (##sys#slot a 0)] )
    414                 (if (eq? fd fd2)
    415                     (let ((ts (##sys#delq t (##sys#slot a 1)))) ; remove from fd-list entry
    416                       (cond ((null? ts)
    417                              ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error)
    418                              (##sys#fdset-clear fd) ; no more threads waiting for this fd
    419                              (##sys#fdset-restore)
    420                              (##sys#slot lst 1) )
    421                             (else
    422                              (##sys#setslot a 1 ts) ; fd-list entry is list with t removed
    423                              lst) ) )
    424                     (cons a (loop (##sys#slot lst 1)))))))))))
     857    (let* ((fd (##sys#slot (##sys#slot t 11) 0))
     858           (entry (int-priority-queue-lookup ##sys#fd-list fd)))
     859      (when entry
     860        (let ((ts (##sys#delq t (int-priority-queue-value entry)))) ; remove from fd-list entry
     861          (cond ((null? ts)
     862                 ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error)
     863                 (##sys#fdset-clear fd) ; no more threads waiting for this fd
     864                 (##sys#fdset-restore)
     865                 (fd-list-remove-entry! entry))
     866                (else
     867                 (int-priority-queue-value-set! entry ts)) ) ))))) ; fd-list entry is list with t removed
    425868
    426869
     
    429872; (contributed by Joerg Wittenberger)
    430873
    431 (define (##sys#all-threads #!optional
    432                            (cns (lambda (queue arg val init)
    433                                   (cons val init)))
    434                            (init '()))
    435   (let loop ((l ##sys#ready-queue-head) (i init))
    436     (if (pair? l)
    437         (loop (cdr l) (cns 'ready #f (car l) i))
    438         (let loop ((l ##sys#fd-list) (i i))
    439           (if (pair? l)
    440               (loop (cdr l)
    441                     (let ((fd (caar l)))
    442                       (let loop ((l (cdar l)))
    443                         (if (null? l) i
    444                             (cns 'i/o fd (car l) (loop (cdr l)))))))
    445               (let loop ((l ##sys#timeout-list) (i i))
    446                 (if (pair? l)
    447                     (loop (cdr l) (cns 'timeout (caar l) (cdar l) i))
    448                     i)))))))
    449 
     874(cond-expand
     875 (rbtree
     876  (define (##sys#all-threads #!optional
     877                             (cns (lambda (queue arg val init)
     878                                    (cons val init)))
     879                             (init '()))
     880    (int-priority-queue-node-fold
     881     (lambda (n i)
     882       (fold (lambda (t i) (cns 'i/o (int-priority-queue-index n) t i))
     883             i (int-priority-queue-value n)))
     884     (int-priority-queue-node-fold
     885      (lambda (n i)
     886        (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) i))
     887      (queue-fold
     888       (lambda (t i) (cns 'ready #f t i))
     889       (queue-fold
     890        (lambda (t i) (cns 'waiting #f t i))
     891        init
     892        ##sys#waiting-queue-head)
     893       ##sys#ready-queue-head)
     894      ##sys#timeout-list)
     895     ##sys#fd-list)))
     896 (else
     897  (define (##sys#all-threads #!optional
     898                             (cns (lambda (queue arg val init)
     899                                    (cons val init)))
     900                             (init '()))
     901
     902    (int-priority-queue-node-fold
     903     (lambda (n i)
     904       (fold (lambda (t i) (cns 'i/o (int-priority-queue-index n) t i))
     905             i (int-priority-queue-value n)))
     906     (let ((r (int-priority-queue-node-fold
     907               (lambda (n i)
     908                 (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) i))
     909               (queue-fold
     910                (lambda (t i) (cns 'ready #f t i))
     911                (queue-fold
     912                 (lambda (t i) (cns 'waiting #f t i))
     913                 init
     914                 ##sys#waiting-queue-head)
     915                ##sys#ready-queue-head)
     916               ##sys#timeout-list)))
     917       (let ((n ##sys#timeout-list-head))
     918         (if n
     919             (cns
     920              'timeout (int-priority-queue-index n) (int-priority-queue-value n) r)
     921             r)))
     922     ##sys#fd-list))))
    450923
    451924;;; Remove all waiting threads from the relevant queues with the exception of the current thread:
    452 
     925#|
    453926(define (##sys#fetch-and-clear-threads)
    454927  (let ([all (vector ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#fd-list ##sys#timeout-list)])
    455928    (set! ##sys#ready-queue-head '())
    456929    (set! ##sys#ready-queue-tail '())
    457     (set! ##sys#fd-list '())
     930    (##sys#empty-fd-list!)
    458931    (set! ##sys#timeout-list '())
    459932    all) )
     
    467940  (set! ##sys#fd-list (##sys#slot vec 2))
    468941  (set! ##sys#timeout-list (##sys#slot vec 3)) )
    469 
     942|#
    470943
    471944;;; Unblock thread cleanly:
     
    474947  (when (eq? 'blocked (##sys#slot t 3))
    475948    (##sys#remove-from-timeout-list t)
    476     (set! ##sys#fd-list
    477       (let loop ([fdl ##sys#fd-list])
    478         (if (null? fdl)
    479             '()
    480             (let ([a (##sys#slot fdl 0)])
    481               (cons
    482                (cons (##sys#slot a 0)
    483                      (##sys#delq t (##sys#slot a 1)) )
    484                (loop (##sys#slot fdl 1)) ) ) ) ) )
     949    (##sys#clear-i/o-state-for-thread! t)
    485950    (##sys#setislot t 12 '())
    486951    (##sys#thread-basic-unblock! t) ) )
    487 
    488952
    489953;;; Multithreaded breakpoints
Note: See TracChangeset for help on using the changeset viewer.