From 28477ca43d9e3d852a7749729df7dad2ffee1976 Mon Sep 17 00:00:00 2001 From: "Scott L. Burson" Date: Sat, 8 Jun 2024 12:23:56 -0700 Subject: [PATCH] Performance improvements for tuples. --- Code/testing.lisp | 18 ++--- Code/tuples.lisp | 191 ++++++++++++++++++++++++++++----------------- Code/wb-trees.lisp | 46 +++++------ 3 files changed, 154 insertions(+), 101 deletions(-) diff --git a/Code/testing.lisp b/Code/testing.lisp index 6137f51..214efcc 100644 --- a/Code/testing.lisp +++ b/Code/testing.lisp @@ -20,14 +20,8 @@ (def-tuple-key +K2+) (def-tuple-key +K3+) (def-tuple-key +K4+) -(def-tuple-key +K5+) -(def-tuple-key +K6+) -(def-tuple-key +K7+) -(def-tuple-key +K8+) -(def-tuple-key +K9+) - (defclass My-Unhandled-Obj () ((value :initarg :value :initform nil :accessor My-Unhandled-Obj-Value)) @@ -2822,7 +2816,9 @@ (error "Find failed on iteration ~D" i))))) -(deflex Tuple-Keys (vector +K0+ +K1+ +K2+ +K3+ +K4+ +K5+ +K6+ +K7+ +K8+ +K9+)) +(deflex Tuple-Keys (gmap (:result vector :length 40) + #'get-tuple-key + (:arg index 0 40))) (defun Test-Tuple-Operations (i) (let ((tup (tuple)) @@ -2830,14 +2826,18 @@ (nkeys (length Tuple-Keys))) (dotimes (j 100) (let ((key (svref Tuple-Keys (random nkeys))) - (val (Make-My-Integer (random 8)))) + (val (Make-My-Integer (random 8))) + (prev-m m) + (prev-tup tup)) (setq tup (with tup key val)) (setq m (with m key val)) (unless (equal? m (convert 'map tup)) (error "Tuple `with' failed on iteration ~D" i)) (do-map (k v m) (unless (equal? v (lookup tup k)) - (error "Tuple `lookup' failed on iteration ~D" i))))))) + (error "Tuple `lookup' failed on iteration ~D" i))) + (unless (eq (compare prev-tup tup) (compare prev-m m)) + (error "Tuple `compare' failed on iteration ~D" i)))))) ;;; ================================================================================ diff --git a/Code/tuples.lisp b/Code/tuples.lisp index 6e42c54..30cc7ae 100644 --- a/Code/tuples.lisp +++ b/Code/tuples.lisp @@ -173,7 +173,7 @@ the tuple." (t ':equal)))) -(defconstant Tuple-Value-Chunk-Bits 3) +(defconstant Tuple-Value-Chunk-Bits 4) ;;; Each value chunk (vector) contains no more than this many elements. (defconstant Tuple-Value-Chunk-Size (ash 1 Tuple-Value-Chunk-Bits)) @@ -212,11 +212,12 @@ the tuple." (deflex +Tuple-Descriptor-Map+ (empty-map)) (defmethod compare ((x Tuple-Desc) (y Tuple-Desc)) - (let ((xser (Tuple-Desc-Serial-Number x)) - (yser (Tuple-Desc-Serial-Number y))) - (cond ((< xser yser) ':less) - ((> xser yser) ':greater) - (t ':equal)))) + (if (eq x y) ':equal + (let ((xser (Tuple-Desc-Serial-Number x)) + (yser (Tuple-Desc-Serial-Number y))) + (cond ((< xser yser) ':less) + ((> xser yser) ':greater) + (t ':equal))))) ;;; Urgh, Allegro 6 doesn't obey inline declarations, so we use a macro for this. ;;; This takes its argument doubled for the convenience of the common case (lookup). @@ -264,14 +265,26 @@ don't worry about locking it, either.") (defconstant Tuple-Reorder-Frequency 31 "Controls how often we do tuple reordering. Must be 2^n - 1 for some n.") +(defmacro Tuple-Contents-Ref (size contents idx) + (let ((contents-var (gensym "CONTENTS-")) + (idx-var (gensym "IDX-"))) + `(let ((,contents-var ,contents) + (,idx-var ,idx)) + (declare (fixnum ,idx-var)) + (if (<= ,size Tuple-Value-Chunk-Size) + (svref ,contents-var ,idx-var) + (svref (svref ,contents-var (ash ,idx-var (- Tuple-Value-Chunk-Bits))) + (logand ,idx-var (1- Tuple-Value-Chunk-Size))))))) + (defun Tuple-Lookup (tuple key &optional no-reorder?) - ;(declare (optimize (speed 3) (safety 0))) + (declare (optimize (speed 3) (safety 0))) (let ((desc (dyn-tuple-descriptor tuple)) ((pairs (Tuple-Desc-Pairs desc)) ((nkeys*2 (length pairs)))) (key-num (if (typep key 'fixnum) key ; for internal use only (tuple-key-number key)))) - (declare (fixnum nkeys*2 key-num)) + (declare (fixnum nkeys*2 key-num) + (simple-vector pairs)) ;; As explained above, we don't have to lock here because there are two copies ;; of the pair sequence, only the first of which gets reordered. So if we ;; happen to miss the pair we're looking for in the first copy, because it is @@ -283,23 +296,22 @@ don't worry about locking it, either.") (when (= (logand pr Tuple-Key-Number-Mask) key-num) (let ((chunks (dyn-tuple-contents tuple)) - (val-idx (the fixnum (ash pr (- Tuple-Key-Number-Size))))) - (let ((val (svref (svref chunks (ash val-idx (- Tuple-Value-Chunk-Bits))) - (logand val-idx (1- Tuple-Value-Chunk-Size))))) + (val-idx (ash pr (- Tuple-Key-Number-Size)))) + (let ((val (Tuple-Contents-Ref (ash nkeys*2 -1) chunks val-idx))) (unless (or no-reorder? (< i (Tuple-Window-Size nkeys*2)) ;;; Reorder only on some fraction of the occasions ;;; we come through here. - (not (= 0 (logand (Tuple-Random-Value) - Tuple-Reorder-Frequency)))) + (not (= 0 (logand (Tuple-Random-Value) Tuple-Reorder-Frequency)))) (Tuple-Reorder-Keys tuple i)) (return-from Tuple-Lookup (values t val))))))) (values nil nil))) (defun Tuple-Reorder-Keys (tuple idx) - ;(declare (optimize (speed 3) (safety 0))) + (declare (optimize (speed 3) (safety 0))) (declare (fixnum idx)) (let ((desc (dyn-tuple-descriptor tuple)) ((pairs (Tuple-Desc-Pairs desc)))) + (declare (simple-vector pairs)) ;; Some implementations can't do `:wait? nil', but that's okay -- we'll just ;; do a little redundant work. (with-lock ((Tuple-Desc-Lock desc) :wait? nil) @@ -318,7 +330,7 @@ don't worry about locking it, either.") ;;; Someday: multiple key/value pair update. (defun Tuple-With (tuple key val) - ;(declare (optimize (speed 3) (safety 0))) + (declare (optimize (speed 3) (safety 0))) (let ((old-val? old-val (Tuple-Lookup tuple key))) (if old-val? (if (equal? val old-val) @@ -330,25 +342,32 @@ don't worry about locking it, either.") ((desc (dyn-tuple-descriptor tuple)) ((pairs (Tuple-Desc-Pairs desc)) ((nkeys*2 (length pairs)) - ((pr (dotimes (i nkeys*2 (assert nil)) + ((single-chunk? (<= (ash nkeys*2 -1) Tuple-Value-Chunk-Size)) + (pr (dotimes (i nkeys*2 (assert nil)) (let ((pr (svref pairs i))) + (declare (fixnum pr)) (when (= key-num (logand pr Tuple-Key-Number-Mask)) (return pr))))) ((val-idx (ash pr (- Tuple-Key-Number-Size))) ((ichunk (ash val-idx (- Tuple-Value-Chunk-Bits))) (val-idx (logand val-idx (1- Tuple-Value-Chunk-Size))) - ((chunk (svref contents ichunk)) + ((chunk (if single-chunk? contents + (svref contents ichunk))) ((new-chunk (make-array (length chunk)))))))))) (new-contents (make-array (length contents))))) + (declare (fixnum key-num) + (simple-vector pairs contents chunk)) (dotimes (i (length chunk)) (setf (svref new-chunk i) (svref chunk i))) (setf (svref new-chunk val-idx) val) - (dotimes (i (length contents)) - (setf (svref new-contents i) (svref contents i))) - (setf (svref new-contents ichunk) new-chunk) - (make-dyn-tuple desc new-contents))) + (if single-chunk? (make-dyn-tuple desc new-chunk) + (progn + (dotimes (i (length contents)) + (setf (svref new-contents i) (svref contents i))) + (setf (svref new-contents ichunk) new-chunk) + (make-dyn-tuple desc new-contents))))) (let ((old-desc (dyn-tuple-descriptor tuple))) - (unless (< (size (Tuple-Desc-Key-Set old-desc)) + (unless (< (the fixnum (size (Tuple-Desc-Key-Set old-desc))) (1- (ash 1 Tuple-Value-Index-Size))) (error "Tuple too long (limit ~D pairs in this implementation)." (ash 1 Tuple-Value-Index-Size))) @@ -365,6 +384,7 @@ don't worry about locking it, either.") ((nkeys (size new-key-set)) ((window-size (Tuple-Window-Size (* nkeys 2))))) (old-pairs (Tuple-Desc-Pairs old-desc))) + (declare (type (unsigned-byte #.Tuple-Value-Index-Size) nkeys)) (unless new-desc ;; Lock out reorderings while we do this. One might think we also need a ;; lock to protect `+Tuple-Descriptor-Map+', but actually it doesn't hurt @@ -375,21 +395,34 @@ don't worry about locking it, either.") (key-num (tuple-key-number key)) ;; The new value goes at the end. ((new-pr (logior key-num (ash (1- nkeys) Tuple-Key-Number-Size))))) - (flet ((add-pair (i pr) - (setf (svref new-pairs i) pr) - (setf (svref new-pairs (+ i nkeys)) pr))) - (setq new-desc (Make-Tuple-Desc new-key-set new-pairs)) - ;; The new pair goes just outside the window, if the window is full. - (dotimes (i (min (1- nkeys) window-size)) - (add-pair i (svref old-pairs i))) - (if (<= nkeys window-size) - (add-pair (1- nkeys) new-pr) - (progn - (add-pair window-size new-pr) - (dotimes (i (- nkeys window-size 1)) - (add-pair (+ i window-size 1) - (svref old-pairs (+ i window-size))))))))) - ;(setf (lookup +Tuple-Descriptor-Map+ new-key-set) new-desc) + (declare (fixnum key-num)) + (setq new-desc (Make-Tuple-Desc new-key-set new-pairs)) + ;; The new pair goes just outside the window, if the window is full. + (dotimes (i (min (1- nkeys) window-size)) + (setf (svref new-pairs i) (svref old-pairs i))) + (if (<= nkeys window-size) + (setf (svref new-pairs (1- nkeys)) new-pr) + (progn + (setf (svref new-pairs window-size) new-pr) + (dotimes (i (- nkeys window-size 1)) + (setf (svref new-pairs (+ i window-size 1)) + (svref old-pairs (+ i window-size)))))) + ;; We go to some trouble here to keep the second copy sorted in key order. + ;; This makes tuple comparison much faster. + (do* ((i (1- nkeys) (1+ i)) ; index into `old-pairs' + (pr 0) + (lim (* 2 (1- nkeys)))) + ((or (= i lim) + (progn + (setq pr (svref old-pairs i)) + (> (logand pr Tuple-Key-Number-Mask) key-num))) + (setf (svref new-pairs (1+ i)) new-pr) + (do ((j i (1+ j))) + ((= j lim)) + (declare (fixnum j)) + (setf (svref new-pairs (+ j 2)) (svref old-pairs j)))) + (declare (fixnum i pr lim)) + (setf (svref new-pairs (1+ i)) pr)))) ;; Technically, we need a memory barrier to make sure the new map value ;; is fully constructed before being made available to other threads. (setq +Tuple-Descriptor-Map+ @@ -398,26 +431,28 @@ don't worry about locking it, either.") (write-memory-barrier))) (setf (lookup (Tuple-Desc-Next-Desc-Map old-desc) key) new-desc)) (let ((reorder-map (Tuple-Get-Reorder-Map old-desc new-desc)) - (old-chunks (dyn-tuple-contents tuple)) - (new-chunks (make-array (ceiling nkeys Tuple-Value-Chunk-Size)))) + (old-chunks (dyn-tuple-contents tuple))) (do ((i 0 (1+ i)) (n nkeys (- n Tuple-Value-Chunk-Size)) - (reorder-map reorder-map (cdr reorder-map))) - ((= i (length new-chunks)) - (make-dyn-tuple new-desc new-chunks)) + (reorder-map reorder-map (cdr reorder-map)) + (new-chunks nil)) + ((<= n 0) + (if (cdr new-chunks) + (make-dyn-tuple new-desc (coerce (nreverse new-chunks) 'vector)) + (make-dyn-tuple new-desc (car new-chunks)))) + (declare (fixnum i n)) (if (null (car reorder-map)) - (setf (svref new-chunks i) (svref old-chunks i)) + (push (if (<= (1- nkeys) Tuple-Value-Chunk-Size) old-chunks + (svref old-chunks i)) + new-chunks) (let ((chunk-len (min n Tuple-Value-Chunk-Size)) ((new-chunk (make-array chunk-len)))) (dotimes (i chunk-len) (let ((old-idx (svref (car reorder-map) i))) (setf (svref new-chunk i) - (if old-idx - (svref (svref old-chunks (ash old-idx - (- Tuple-Value-Chunk-Bits))) - (logand old-idx (1- Tuple-Value-Chunk-Size))) + (if old-idx (Tuple-Contents-Ref (1- nkeys) old-chunks old-idx) val)))) - (setf (svref new-chunks i) new-chunk)))))))))) + (push new-chunk new-chunks)))))))))) (defun Tuple-Get-Reorder-Map (old-desc new-desc) ;; Again, we don't bother with locking -- it doesn't hurt if we occasionally @@ -475,6 +510,7 @@ When done, returns `value'." (let ((tuple-var (gensym "TUPLE-")) (desc-var (gensym "DESC-")) (contents-var (gensym "CONTENTS-")) + (size-var (gensym "SIZE-")) (pairs-var (gensym "PAIRS-")) (idx-var (gensym "IDX-")) (pr-var (gensym "PR-")) @@ -482,18 +518,17 @@ When done, returns `value'." `(let ((,tuple-var ,tuple-form)) (let ((,contents-var (dyn-tuple-contents ,tuple-var)) (,desc-var (dyn-tuple-descriptor ,tuple-var)) - ((,pairs-var (Tuple-Desc-Pairs ,desc-var)))) - (dotimes (,idx-var (the fixnum (size (Tuple-Desc-Key-Set ,desc-var)))) + ((,size-var (size (Tuple-Desc-Key-Set ,desc-var))) + (,pairs-var (Tuple-Desc-Pairs ,desc-var)))) + (declare (fixnum ,size-var)) + (dotimes (,idx-var ,size-var) (declare (fixnum ,idx-var)) - (let ((,pr-var (the fixnum (svref ,pairs-var ,idx-var))) + ;; We use the second copy of the pairs, which won't change underneath us. + (let ((,pr-var (the fixnum (svref ,pairs-var (+ ,idx-var ,size-var)))) ((,val-idx-var (ash ,pr-var (- Tuple-Key-Number-Size))) ((,key-var (lookup +Tuple-Key-Seq+ (logand ,pr-var Tuple-Key-Number-Mask))) - (,value-var (svref (svref ,contents-var - (ash ,val-idx-var - (- Tuple-Value-Chunk-Bits))) - (logand ,val-idx-var - (1- Tuple-Value-Chunk-Size))))))) + (,value-var (Tuple-Contents-Ref ,size-var ,contents-var ,val-idx-var))))) . ,body)) ,value-form)))) @@ -514,21 +549,34 @@ When done, returns `value'." (format stream " >"))) (defmethod compare ((tup1 tuple) (tup2 tuple)) - (let ((key-set-1 (Tuple-Desc-Key-Set (dyn-tuple-descriptor tup1))) - (key-set-2 (Tuple-Desc-Key-Set (dyn-tuple-descriptor tup2))) - ((res (compare key-set-1 key-set-2))) - (default ':equal)) - (if (not (eq res ':equal)) + ;; The ordering this imposes is not easily described, but is stable within a session. + ;; If the key sets are equal, the values are compared in key order, which is the order + ;; in which the keys were created in the session. If the key sets aren't equal, we just + ;; use the result of comparing them. + (let ((desc1 (dyn-tuple-descriptor tup1)) + (desc2 (dyn-tuple-descriptor tup2)) + ((key-set-1 (Tuple-Desc-Key-Set desc1)) + (key-set-2 (Tuple-Desc-Key-Set desc2)) + ((res (if (eq key-set-1 key-set-2) ':equal + (compare key-set-1 key-set-2)))))) + (if (or (eq res ':less) (eq res ':greater)) res - (do-set (key key-set-1 default) - (let ((val1? val1 (Tuple-Lookup tup1 key t)) - (val2? val2 (Tuple-Lookup tup2 key t)) - ((res (compare val1 val2)))) - (declare (ignore val1? val2?)) - (when (or (eq res ':less) (eq res ':greater)) - (return res)) - (when (eq res ':unequal) - (setq default ':unequal))))))) + (let ((pairs (Tuple-Desc-Pairs desc1)) + (contents1 (dyn-tuple-contents tup1)) + (contents2 (dyn-tuple-contents tup2)) + (nkeys (size key-set-1)) + (default res)) ; note case where res is ':unequal + (declare (fixnum nkeys)) + (dotimes (i nkeys default) + ;; We use the second, sorted copy of the key-index pairs. + (let ((idx (ash (svref pairs (+ i nkeys)) (- Tuple-Key-Number-Size))) + ((val1 (Tuple-Contents-Ref nkeys contents1 idx)) + (val2 (Tuple-Contents-Ref nkeys contents2 idx)) + ((res (compare val1 val2))))) + (when (or (eq res ':less) (eq res ':greater)) + (return res)) + (when (eq res ':unequal) + (setq default ':unequal)))))))) (defmethod with ((tuple tuple) (key tuple-key) &optional (value nil value?)) @@ -541,6 +589,9 @@ When done, returns `value'." (let ((default-fn (tuple-key-default-fn key))) (values (and default-fn (funcall default-fn tuple)) nil))))) +(defmethod size ((tuple tuple)) + (size (Tuple-Desc-Key-Set (dyn-tuple-descriptor tuple)))) + (defgeneric tuple-merge (tuple1 tuple2 &optional val-fn) (:documentation "Returns a new tuple containing all the keys of `tuple1' and `tuple2', where the value for each key contained in only one tuple is the value from diff --git a/Code/wb-trees.lisp b/Code/wb-trees.lisp index faece26..3866892 100644 --- a/Code/wb-trees.lisp +++ b/Code/wb-trees.lisp @@ -707,12 +707,12 @@ this range." ;;; Comparison (defun WB-Set-Tree-Compare (tree1 tree2) - (let ((size1 (WB-Set-Tree-Size tree1)) - (size2 (WB-Set-Tree-Size tree2))) - (cond ((eq tree1 tree2) ':equal) - ((< size1 size2) ':less) - ((> size1 size2) ':greater) - (t (WB-Set-Tree-Compare-Rng tree1 0 tree2 0 0 size1))))) + (if (eq tree1 tree2) ':equal + (let ((size1 (WB-Set-Tree-Size tree1)) + (size2 (WB-Set-Tree-Size tree2))) + (cond ((< size1 size2) ':less) + ((> size1 size2) ':greater) + (t (WB-Set-Tree-Compare-Rng tree1 0 tree2 0 0 size1)))))) (defun WB-Set-Tree-Compare-Rng (tree1 base1 tree2 base2 lo hi) ;; This is similar to the other hedge algorithms, but there is a key difference: @@ -2401,12 +2401,12 @@ of `tree1' and `tree2' are in this range." ;;; Comparison (defun WB-Bag-Tree-Compare (tree1 tree2) - (let ((size1 (WB-Bag-Tree-Size tree1)) - (size2 (WB-Bag-Tree-Size tree2))) - (cond ((eq tree1 tree2) ':equal) - ((< size1 size2) ':less) - ((> size1 size2) ':greater) - (t (WB-Bag-Tree-Compare-Rng tree1 0 tree2 0 0 size1))))) + (if (eq tree1 tree2) ':equal + (let ((size1 (WB-Bag-Tree-Size tree1)) + (size2 (WB-Bag-Tree-Size tree2))) + (cond ((< size1 size2) ':less) + ((> size1 size2) ':greater) + (t (WB-Bag-Tree-Compare-Rng tree1 0 tree2 0 0 size1)))))) (defun WB-Bag-Tree-Compare-Rng (tree1 base1 tree2 base2 lo hi) ;; See notes at `WB-Set-Tree-Compare-Rng'. @@ -4308,11 +4308,12 @@ pair removed." ;;; Compare (defun WB-Map-Tree-Compare (tree1 tree2 &optional (val-fn #'compare)) - (let ((size1 (WB-Map-Tree-Size tree1)) - (size2 (WB-Map-Tree-Size tree2))) - (cond ((< size1 size2) ':less) - ((> size1 size2) ':greater) - (t (WB-Map-Tree-Compare-Rng tree1 0 tree2 0 0 size1 val-fn))))) + (if (eq tree1 tree2) ':equal + (let ((size1 (WB-Map-Tree-Size tree1)) + (size2 (WB-Map-Tree-Size tree2))) + (cond ((< size1 size2) ':less) + ((> size1 size2) ':greater) + (t (WB-Map-Tree-Compare-Rng tree1 0 tree2 0 0 size1 val-fn)))))) (defun WB-Map-Tree-Compare-Rng (tree1 base1 tree2 base2 lo hi val-fn) ;; See notes at `WB-Set-Tree-Compare-Rng'. @@ -5872,11 +5873,12 @@ the result, inserts `val', returning the new vector." ;;; Compare (defun WB-Seq-Tree-Compare (tree1 tree2) - (let ((size1 (WB-Seq-Tree-Size tree1)) - (size2 (WB-Seq-Tree-Size tree2))) - (cond ((< size1 size2) ':less) - ((> size1 size2) ':greater) - (t (WB-Seq-Tree-Compare-Rng tree1 0 tree2 0 0 size1))))) + (if (eq tree1 tree2) ':equal + (let ((size1 (WB-Seq-Tree-Size tree1)) + (size2 (WB-Seq-Tree-Size tree2))) + (cond ((< size1 size2) ':less) + ((> size1 size2) ':greater) + (t (WB-Seq-Tree-Compare-Rng tree1 0 tree2 0 0 size1)))))) (defun WB-Seq-Tree-Compare-Lexicographically (tree1 tree2) (let ((size1 (WB-Seq-Tree-Size tree1))