concurrency.lisp (5872B)
;;; Copyright (C) 2013, 2014 Tomas Hlavaty <tom@logand.com>
;;;
;;; Permission is hereby granted, free of charge, to any person
;;; obtaining a copy of this software and associated documentation
;;; files (the "Software"), to deal in the Software without
;;; restriction, including without limitation the rights to use, copy,
;;; modify, merge, publish, distribute, sublicense, and/or sell copies
;;; of the Software, and to permit persons to whom the Software is
;;; furnished to do so, subject to the following conditions:
;;;
;;; The above copyright notice and this permission notice shall be
;;; included in all copies or substantial portions of the Software.
;;;
;;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
;;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
;;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
;;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
;;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
;;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
;;; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
;;; DEALINGS IN THE SOFTWARE.

(defpackage :rw.concurrency
  (:use :cl)
  (:export :make-concurrent-queue
           :make-lock
           :make-program-server
           :make-semaphore
           :make-thread
           :signal-semaphore
           :threads-supported-p
           :using-lock
           :wait-on-semaphore))

(in-package :rw.concurrency)

;;; Thin portability layer: each function below dispatches at read
;;; time (#+/#-) to the threading primitive of the host Lisp.  On an
;;; implementation that is not listed, the function signals an error
;;; saying it has not been ported.
;;;
;;; On SBCL the primitives are referenced through their home package
;;; SB-THREAD, where they are exported, so the SB-CONCURRENCY contrib
;;; does not need to be loaded.

(defun using-lock (lock thunk)
  "Call the zero-argument function THUNK while holding LOCK and return
whatever THUNK returns.  LOCK is an object created by MAKE-LOCK."
  #-(or allegro ccl ecl mkcl cmucl sbcl)
  (error "RW.CONCURRENCY:USING-LOCK not ported")
  #+allegro (mp:with-process-lock (lock) (funcall thunk))
  #+ccl (ccl:with-lock-grabbed (lock) (funcall thunk))
  #+ecl (mp:with-lock (lock) (funcall thunk))
  #+mkcl (mt:with-lock (lock) (funcall thunk))
  #+cmucl (mp:with-lock-held (lock) (funcall thunk))
  #+sbcl (sb-thread:with-mutex (lock) (funcall thunk)))

(defun make-lock (name)
  "Create and return a new lock labelled NAME, for use with USING-LOCK.
NAME is a string designator; on SBCL it is converted with STRING."
  #-(or allegro ccl ecl mkcl cmucl sbcl)
  (error "RW.CONCURRENCY:MAKE-LOCK not ported")
  #+allegro (mp:make-process-lock :name name)
  #+ccl (ccl:make-lock name)
  #+ecl (mp:make-lock :name name)
  #+mkcl (mt:make-lock :name name)
  #+cmucl (mp:make-lock name :kind :error-check)
  #+sbcl (sb-thread:make-mutex :name (string name)))

(defun make-semaphore ()
  "Create and return a new semaphore for SIGNAL-SEMAPHORE and
WAIT-ON-SEMAPHORE.  Not ported to CMUCL."
  #-(or allegro ccl ecl mkcl sbcl)
  (error "RW.CONCURRENCY:MAKE-SEMAPHORE not ported")
  ;; NOTE(review): on Allegro this creates a gate, while
  ;; SIGNAL-SEMAPHORE calls MP:PUT-SEMAPHORE and WAIT-ON-SEMAPHORE has
  ;; no Allegro branch at all -- the Allegro port looks unfinished;
  ;; confirm against the Allegro multiprocessing documentation.
  #+allegro (mp:make-gate nil)
  #+ccl (ccl:make-semaphore)
  #+ecl (mp:make-semaphore)
  #+mkcl (mt:make-semaphore)
  #+sbcl (sb-thread:make-semaphore))

(defun signal-semaphore (x)
  "Signal (increment) the semaphore X created by MAKE-SEMAPHORE."
  #-(or allegro ccl ecl mkcl sbcl)
  (error "RW.CONCURRENCY:SIGNAL-SEMAPHORE not ported")
  #+allegro (mp:put-semaphore x)
  #+ccl (ccl:signal-semaphore x)
  #+ecl (mp:signal-semaphore x)
  #+mkcl (mt:semaphore-signal x)
  #+sbcl (sb-thread:signal-semaphore x))

(defun wait-on-semaphore (x)
  "Wait on (decrement) the semaphore X created by MAKE-SEMAPHORE.
Not ported to Allegro or CMUCL: signals an error there."
  #-(or ccl ecl mkcl sbcl)
  (error "RW.CONCURRENCY:WAIT-ON-SEMAPHORE not ported")
  #+ccl (ccl:wait-on-semaphore x)
  #+ecl (mp:wait-on-semaphore x)
  #+mkcl (mt:semaphore-wait x)
  #+sbcl (sb-thread:wait-on-semaphore x))

(defun make-concurrent-queue ()
  "Return a closure implementing a FIFO queue guarded by a lock and a
semaphore.

Called with one argument, the closure appends that value (NIL is
allowed), signals the semaphore and returns the value.  Called with no
argument, it waits on the semaphore until an element is available,
removes the oldest element and returns it."
  ;; X is a header cell: (cdr x) is the list of queued elements and
  ;; (car x) points at the last cons of that list, or at X itself when
  ;; the queue is empty.
  (let ((x (cons nil nil))
        (l (make-lock 'concurrent-queue-lock))
        (s (make-semaphore)))
    (setf (car x) x)
    (lambda (&optional (value nil valuep))
      (if valuep
          ;; Enqueue: link the new cons after the current tail, then
          ;; make it the tail.
          (let ((y (cons value nil)))
            (using-lock l
                        (lambda ()
                          (setf (cdar x) y
                                (car x) y)
                          (signal-semaphore s)))
            value)
          ;; Dequeue: DONE is tracked separately from Z because a
          ;; queued value may itself be NIL.
          (do (done z)
              (done z)
            (wait-on-semaphore s)
            (using-lock l
                        (lambda ()
                          (unless (eq x (car x))
                            (setq done t
                                  z (pop (cdr x)))
                            ;; Last element removed: point the tail
                            ;; back at the header cell.
                            (unless (cdr x)
                              (setf (car x) x))))))))))

;; (setq q (make-concurrent-queue))
;; (funcall q 1)
;; (funcall q 2)
;; (funcall q 3)
;; (funcall q)

;; Return T when the host Lisp is one of the thread-capable
;; configurations listed below, otherwise NIL.
;; Deliberately documented with a comment rather than a docstring:
;; when the #+ form is skipped, a docstring would be the only body
;; form and would be returned as a (true) value.
(defun threads-supported-p ()
  #+(or allegro ccl ecl mkcl cmucl sb-thread (and clisp mt))
  t)

(defun make-thread (name thunk)
  "Start a new thread labelled NAME that runs the zero-argument
function THUNK, and return the implementation's thread object.
NAME is a string designator; on SBCL it is converted with STRING."
  #-(or allegro ccl ecl mkcl cmucl sbcl (and clisp mt))
  (error "RW.CONCURRENCY:MAKE-THREAD not ported")
  #+allegro (mp:process-run-function name thunk)
  #+ccl (ccl:process-run-function name thunk)
  #+ecl (mp:process-run-function name thunk)
  #+mkcl (mt:thread-run-function name thunk)
  #+cmucl (mp:make-process thunk :name name)
  #+sbcl (sb-thread:make-thread thunk :name (string name))
  #+(and clisp mt) (mt:make-thread thunk :name name))

(defun make-program-server (command args writer reader)
  "Start COMMAND with ARGS through RW.OS:MAKE-PROGRAM and return a
function of one argument, QUERY, that talks to it.

A non-NIL QUERY is queued for a background writer thread, which calls
(WRITER query stream) on the program object's :INPUT-STREAM; the
calling thread then returns the result of (READER stream) on the
program object's :OUTPUT-STREAM.  A NIL QUERY shuts the server down:
the writer thread is told to close its stream, the program is waited
for, and it is asserted that it exited with code zero.  Calls made
after shutdown do nothing and return NIL.  Calls are serialized by a
lock."
  ;; NOTE(review): presumably :INPUT-STREAM is the program's stdin and
  ;; :OUTPUT-STREAM its stdout -- confirm against RW.OS:MAKE-PROGRAM.
  (let ((p (rw.os:make-program :stream :stream command args nil))
        (wq (make-concurrent-queue))
        ;; Shutdown is tracked in its own flag.  The writer thread
        ;; reads the variable WQ on every iteration, so WQ must never
        ;; be rebound: setting it to NIL on shutdown could make the
        ;; writer call (FUNCALL NIL) before it dequeued the sentinel,
        ;; leaving the stream open and the :WAIT below hanging.
        (running t))
    (make-thread 'program-server-writer
                 (let ((s (funcall p :input-stream)))
                   (lambda ()
                     ;; A NIL element is the shutdown sentinel.
                     (do (x)
                         ((not (setq x (funcall wq)))
                          (close s))
                       (funcall writer x s)))))
    (let ((l (make-lock 'program-server-lock))
          (s (funcall p :output-stream)))
      (lambda (query)
        (using-lock l
                    (lambda ()
                      (when running
                        (cond
                          (query
                           (funcall wq query)
                           (funcall reader s))
                          (t
                           (funcall wq nil)
                           (setq running nil)
                           (funcall p :wait)
                           (multiple-value-bind (status code)
                               (funcall p :status-and-code)
                             (assert (eq :exited status))
                             (assert (zerop code))))))))))))