-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathQuery.hs
1938 lines (1729 loc) · 86 KB
/
Query.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
-- | Query and update documents
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables, BangPatterns #-}
module Database.MongoDB.Query (
-- * Monad
Action, access, Failure(..), ErrorCode,
AccessMode(..), GetLastError, master, slaveOk, accessMode,
liftDB,
MongoContext(..), HasMongoContext(..),
-- * Database
Database, allDatabases, useDb, thisDatabase,
-- ** Authentication
Username, Password, auth, authMongoCR, authSCRAMSHA1, authSCRAMSHA256,
-- * Collection
Collection, allCollections,
-- ** Selection
Selection(..), Selector, whereJS,
Select(select),
-- * Write
-- ** Insert
insert, insert_, insertMany, insertMany_, insertAll, insertAll_,
-- ** Update
save, replace, repsert, upsert, Modifier, modify, updateMany, updateAll,
WriteResult(..), UpdateOption(..), Upserted(..),
-- ** Delete
delete, deleteOne, deleteMany, deleteAll, DeleteOption(..),
-- * Read
-- ** Query
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
Projector, Limit, Order, BatchSize,
explain, find, findCommand, findOne, fetch,
findAndModify, findAndModifyOpts, FindAndModifyOpts(..), defFamUpdateOpts,
count, distinct,
-- *** Cursor
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,
-- ** Aggregate
Pipeline, AggregateConfig(..), aggregate, aggregateCursor,
-- ** Group
Group(..), GroupKey(..), group,
-- ** MapReduce
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..),
MRResult, mapReduce, runMR, runMR',
-- * Command
Command, runCommand, runCommand1,
eval, retrieveServerData, ServerData(..)
) where
import qualified Control.Concurrent.MVar as MV
import Control.Concurrent.MVar.Lifted
( MVar,
readMVar,
)
import Control.Exception (Exception, catch, throwIO)
import Control.Monad
( liftM2,
replicateM,
unless,
void,
when,
)
import Control.Monad.Reader (MonadReader, ReaderT, ask, asks, local, runReaderT)
import Control.Monad.Trans (MonadIO, liftIO, lift)
import Control.Monad.Trans.Except
import qualified Crypto.Hash.MD5 as MD5
import qualified Crypto.Hash.SHA1 as SHA1
import qualified Crypto.Hash.SHA256 as SHA256
import qualified Crypto.MAC.HMAC as HMAC
import qualified Crypto.Nonce as Nonce
import Data.Binary.Put (runPut)
import Data.Bits (xor)
import Data.Bson
( Document,
Field (..),
Javascript,
Label,
ObjectId,
Val (..),
Value (..),
at,
genObjectId,
look,
lookup,
valueAt,
(!?),
(=:),
(=?),
merge,
cast
)
import Data.Bson.Binary (putDocument)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Base16 as B16
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as LBS
import Data.Default.Class (Default (..))
import Data.Either (lefts, rights)
import qualified Data.Either as E
import Data.Functor ((<&>))
import Data.Int (Int32, Int64)
import Data.List (foldl1')
import qualified Data.Map as Map
import Data.Maybe (catMaybes, fromMaybe, isNothing, listToMaybe, mapMaybe, maybeToList, fromJust)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Typeable (Typeable)
import Data.Word (Word32)
import Database.MongoDB.Internal.Protocol
( CursorId,
DeleteOption (..),
FullCollection,
InsertOption (..),
Notice (..),
Password,
Pipe,
QueryOption (..),
Reply (..),
Request
( GetMore,
qBatchSize,
qFullCollection,
qOptions,
qProjector,
qSelector,
qSkip
),
ResponseFlag (..),
ServerData (..),
UpdateOption (..),
Username,
Cmd (..),
pwKey,
FlagBit (..)
)
import Control.Monad.Trans.Except
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>), splitDot)
import System.Mem.Weak (Weak)
import Text.Read (readMaybe)
import Prelude hiding (lookup)
-- * Monad
type Action = ReaderT MongoContext
-- ^ A monad on top of @m@ (which must be a 'MonadIO') that carries the 'MongoContext' needed to talk to the database.
-- Actions may fail by throwing a DB 'Failure'.
access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m a
-- ^ Run an action against the given database on the server at the other end
-- of the pipe, using the given access mode for every read and write.
-- Errors surface as thrown 'Failure's.
access pipe mode db action =
  runReaderT
    action
    MongoContext
      { mongoPipe = pipe
      , mongoAccessMode = mode
      , mongoDatabase = db
      }
-- | A connection failure, or a read or write exception such as an expired cursor or the insertion of a duplicate key.
-- Note that unexpected data from the server is not a 'Failure'; it is a programming error (you should call 'error' in this case) because it means the client and server are incompatible and a code change is required.
data Failure =
ConnectionFailure IOError -- ^ TCP connection ('Pipeline') failed. May work if you try again on the same Mongo 'Connection', which will create a new Pipe.
| CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server than the one you are currently connected to (perhaps a failover happened between servers in a replica set)
| QueryFailure ErrorCode String -- ^ Query failed for the reason described in the string
| WriteFailure Int ErrorCode String -- ^ Error observed by getLastError after a write. The first argument is the index of the failed document; the error description is in the string
| WriteConcernFailure Int String -- ^ Write concern error. It is reported only by the insert, update and delete commands, not by the wire protocol.
| DocNotFound Selection -- ^ 'fetch' found no document matching selection
| AggregateFailure String -- ^ 'aggregate' returned an error
| CompoundFailure [Failure] -- ^ Several failures collected and reported together.
| ProtocolFailure Int String -- ^ The structure of the returned documents doesn't match what we expected
deriving (Show, Eq, Typeable)
-- 'Failure' values are thrown and caught as ordinary Haskell exceptions.
instance Exception Failure
type ErrorCode = Int
-- ^ Numeric error code, as reported by @getLastError@ or by a failed query.
-- | Kind of reads and writes an action performs.
-- 'readMode' and 'writeMode' translate each constructor into the read and write behaviour actually used.
data AccessMode =
ReadStaleOk -- ^ Read-only action; reading stale data from a slave is OK.
| UnconfirmedWrites -- ^ Read-write action, slave not OK; every write is fire & forget.
| ConfirmWrites GetLastError -- ^ Read-write action, slave not OK; every write is confirmed with @getLastError@.
deriving Show
type GetLastError = Document
-- ^ Parameters for the @getLastError@ command. For example, @[\"w\" =: 2]@ tells the server to wait until the write has reached at least two servers in the replica set before acknowledging it. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options.
-- | Results that can be asked whether they represent a failure.
class Result a where
-- | 'True' when the result denotes a failed operation.
isFailed :: a -> Bool
-- | Outcome of a bulk write operation.
data WriteResult = WriteResult
{ failed :: Bool
, nMatched :: Int
, nModified :: Maybe Int
, nRemoved :: Int
-- ^ MongoDB servers before 2.6 do not allow this value to be calculated.
-- This field is meaningless if we can't calculate the number of modified documents.
-- NOTE(review): this remark is attached to 'nRemoved' but reads as if it
-- describes 'nModified' (the only 'Maybe' field) — confirm and move if so.
, upserted :: [Upserted]
, writeErrors :: [Failure]
, writeConcernErrors :: [Failure]
} deriving Show
-- | A 'WriteResult' is failed exactly when its 'failed' flag is set.
instance Result WriteResult where
isFailed = failed
-- | An 'Either' is failed exactly when it is a 'Left'.
instance Result (Either a b) where
isFailed (Left _) = True
isFailed _ = False
-- | An entry of the 'upserted' list of a 'WriteResult': an index paired with an 'ObjectId'.
-- NOTE(review): presumably the index is the position of the update in the batch and the id is that of the inserted document — confirm against the code that builds it.
data Upserted = Upserted
{ upsertedIndex :: Int
, upsertedId :: ObjectId
} deriving Show
master :: AccessMode
-- ^ Same as @'ConfirmWrites' []@: confirmed writes with no extra @getLastError@ parameters.
master = ConfirmWrites []
slaveOk :: AccessMode
-- ^ Same as 'ReadStaleOk': a read-only mode in which stale reads are acceptable.
slaveOk = ReadStaleOk
accessMode :: (Monad m) => AccessMode -> Action m a -> Action m a
-- ^ Run the action with the context's access mode replaced by the given one.
accessMode mode action = local withMode action
  where
    withMode ctx = ctx {mongoAccessMode = mode}
readMode :: AccessMode -> ReadMode
-- ^ Read behaviour implied by an access mode: only 'ReadStaleOk' permits
-- stale reads, every other mode requires fresh data.
readMode mode = case mode of
  ReadStaleOk -> StaleOk
  UnconfirmedWrites -> Fresh
  ConfirmWrites _ -> Fresh
writeMode :: AccessMode -> WriteMode
-- ^ Write behaviour implied by an access mode. 'ReadStaleOk' maps to
-- confirmed writes with no extra @getLastError@ parameters.
writeMode mode = case mode of
  ReadStaleOk -> Confirm []
  UnconfirmedWrites -> NoConfirm
  ConfirmWrites params -> Confirm params
-- | Values needed when executing a database operation. This is the
-- environment carried by the 'Action' reader monad.
data MongoContext = MongoContext {
mongoPipe :: Pipe, -- ^ operations read/write to this pipelined TCP connection to a MongoDB server
mongoAccessMode :: AccessMode, -- ^ read/write operations will use this access mode
mongoDatabase :: Database -- ^ operations query/update this database
}
mongoReadMode :: MongoContext -> ReadMode
-- ^ Read mode derived from the access mode stored in the context.
mongoReadMode ctx = readMode (mongoAccessMode ctx)
mongoWriteMode :: MongoContext -> WriteMode
-- ^ Write mode derived from the access mode stored in the context.
mongoWriteMode ctx = writeMode (mongoAccessMode ctx)
-- | Environments from which a 'MongoContext' can be extracted. Used by
-- 'liftDB' to run database actions inside a larger reader environment.
class HasMongoContext env where
-- | Project the 'MongoContext' out of the environment.
mongoContext :: env -> MongoContext
-- | A 'MongoContext' trivially contains itself.
instance HasMongoContext MongoContext where
mongoContext = id
liftDB :: (MonadReader env m, HasMongoContext env, MonadIO m)
       => Action IO a
       -> m a
-- ^ Run an @'Action' IO@ inside any reader monad whose environment contains
-- a 'MongoContext', using that context.
liftDB action = asks mongoContext >>= liftIO . runReaderT action
-- * Database
-- | Name of a database.
type Database = Text
allDatabases :: (MonadIO m) => Action m [Database]
-- ^ Names of all databases residing on the server, obtained by running
-- @listDatabases@ against the @admin@ database.
allDatabases = do
  reply <- useDb "admin" (runCommand1 "listDatabases")
  return [at "name" entry | entry <- at "databases" reply]
thisDatabase :: (Monad m) => Action m Database
-- ^ Name of the database the action is currently running against.
thisDatabase = mongoDatabase <$> ask
useDb :: (Monad m) => Database -> Action m a -> Action m a
-- ^ Run the action with the given database as the current one.
useDb db action = local switchDb action
  where
    switchDb ctx = ctx {mongoDatabase = db}
-- * Authentication
auth :: MonadIO m => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database (if the server is running in
-- secure mode) and report whether authentication succeeded.
-- Reauthentication is required for every new pipe.
--
-- The mechanism is picked from the major server version reported by
-- @buildinfo@: SCRAM-SHA-256 for 4 and above, SCRAM-SHA-1 for 3, MONGODB-CR
-- for anything lower. If the version cannot be parsed the result is 'False'.
auth un pw = do
  buildInfo <- useDb "admin" $ runCommand ["buildinfo" =: (1 :: Int)]
  let versionText = at "version" buildInfo :: Text
      -- everything before the first dot, e.g. "4" for "4.4.1"
      majorText = T.takeWhile (/= '.') versionText
  case readMaybe (T.unpack majorText) :: Maybe Int of
    Nothing -> return False
    Just major
      | major >= 4 -> authSCRAMSHA256 un pw
      | major >= 3 -> authSCRAMSHA1 un pw
      | otherwise -> authMongoCR un pw
authMongoCR :: (MonadIO m) => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database using the MongoDB-CR mechanism
-- (the default for MongoDB servers older than 3.0): fetch a nonce with
-- @getnonce@, then send @authenticate@ with the key derived from it.
authMongoCR usr pss = do
  nonceReply <- runCommand ["getnonce" =: (1 :: Int)]
  let nonce = at "nonce" nonceReply
  authReply <-
    runCommand
      [ "authenticate" =: (1 :: Int)
      , "user" =: usr
      , "nonce" =: nonce
      , "key" =: pwKey nonce usr pss
      ]
  return (true1 "ok" authReply)
-- | Hash function used by the SCRAM authentication exchange.
data HashAlgorithm = SHA1 | SHA256 deriving Show
hash :: HashAlgorithm -> B.ByteString -> B.ByteString
-- ^ Digest function corresponding to the chosen algorithm.
hash algo = case algo of
  SHA1 -> SHA1.hash
  SHA256 -> SHA256.hash
authSCRAMSHA1 :: MonadIO m => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database using SCRAM with SHA-1.
authSCRAMSHA1 un pw = authSCRAMWith SHA1 un pw
authSCRAMSHA256 :: MonadIO m => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database using SCRAM with SHA-256.
authSCRAMSHA256 un pw = authSCRAMWith SHA256 un pw
toAuthResult :: Functor m => ExceptT String (Action m) () -> Action m Bool
-- ^ Collapse an authentication exchange into a success flag: 'True' when it
-- ran to completion, 'False' when it was aborted. The abort reason is dropped.
toAuthResult exchange = E.isRight <$> runExceptT exchange
-- | Placeholder for SASLprep string normalisation. Proper SASLprep is not
-- implemented yet, so the text is returned unchanged.
saslprep :: Text -> Text
saslprep txt = txt
authSCRAMWith :: MonadIO m => HashAlgorithm -> Username -> Password -> Action m Bool
-- ^ Authenticate with the current database using the SCRAM mechanism that
-- matches the given hash algorithm (@SCRAM-SHA-1@ or @SCRAM-SHA-256@).
--
-- The exchange is @saslStart@, then @saslContinue@ with the client proof,
-- then (if the server has not reported @done@) one more empty
-- @saslContinue@. The result is 'False' as soon as any step fails: a
-- non-ok reply, an unparsable iteration count, a server nonce that does not
-- extend the client nonce, or a server signature mismatch.
authSCRAMWith algo un pw = toAuthResult $ do
  let hmac = HMAC.hmac (hash algo) 64
  nonce <- liftIO (Nonce.withGenerator Nonce.nonce128 <&> B64.encode)
  let firstBare = B.concat [B.pack $ "n=" ++ T.unpack un ++ ",r=", nonce]
  let mechanism = case algo of
        SHA1 -> "SCRAM-SHA-1" :: String
        SHA256 -> "SCRAM-SHA-256"
  let client1 =
        [ "saslStart" =: (1 :: Int)
        , "mechanism" =: mechanism
        , "payload" =: (B.unpack . B64.encode $ B.concat [B.pack "n,,", firstBare])
        , "autoAuthorize" =: (1 :: Int)
        ]
  server1 <- lift $ runCommand client1

  shortcircuit (true1 "ok" server1) (show server1)
  let serverPayload1 = B64.decodeLenient . B.pack . at "payload" $ server1
  let serverData1 = parseSCRAM serverPayload1
  -- The iteration count comes from the server; a malformed value must fail
  -- the authentication instead of crashing with a 'read' parse error.
  iterations <-
    maybe (throwE "invalid iteration count") return $
      readMaybe . B.unpack $ Map.findWithDefault "1" "i" serverData1
  let salt = B64.decodeLenient $ Map.findWithDefault "" "s" serverData1
  let snonce = Map.findWithDefault "" "r" serverData1

  -- RFC 5802: the combined nonce must start with the nonce we sent.
  shortcircuit (B.isPrefixOf nonce snonce) "nonce"
  let withoutProof = B.concat [B.pack "c=biws,r=", snonce]
  let digest = case algo of
        SHA1 -> B16.encode $ MD5.hash $ B.pack $ T.unpack un ++ ":mongo:" ++ T.unpack pw
        SHA256 -> B.pack $ T.unpack $ saslprep pw
  let saltedPass = scramHI algo digest salt iterations
  let clientKey = hmac saltedPass (B.pack "Client Key")
  let storedKey = hash algo clientKey
  let authMsg = B.concat [firstBare, B.pack ",", serverPayload1, B.pack ",", withoutProof]
  let clientSig = hmac storedKey authMsg
  let pval = B64.encode . BS.pack $ BS.zipWith xor clientKey clientSig
  let clientFinal = B.concat [withoutProof, B.pack ",p=", pval]

  let client2 =
        [ "saslContinue" =: (1 :: Int)
        , "conversationId" =: (at "conversationId" server1 :: Int)
        , "payload" =: B.unpack (B64.encode clientFinal)
        ]
  server2 <- lift $ runCommand client2
  shortcircuit (true1 "ok" server2) (show server2)

  -- Verify that the server also knows the salted password.
  let serverKey = hmac saltedPass (B.pack "Server Key")
  let serverSig = B64.encode $ hmac serverKey authMsg
  let serverPayload2 = B64.decodeLenient . B.pack $ at "payload" server2
  let serverData2 = parseSCRAM serverPayload2
  let serverSigComp = Map.findWithDefault "" "v" serverData2

  shortcircuit (serverSig == serverSigComp) "server signature does not match"
  -- Some servers need one more empty round trip before reporting "done".
  unless (true1 "done" server2) $ do
    let client2Step2 =
          [ "saslContinue" =: (1 :: Int)
          , "conversationId" =: (at "conversationId" server1 :: Int)
          , "payload" =: String ""
          ]
    server3 <- lift $ runCommand client2Step2
    shortcircuit (true1 "ok" server3) "server3"
shortcircuit :: Monad m => Bool -> String -> ExceptT String m ()
-- ^ Continue when the condition holds; otherwise abort the surrounding
-- 'ExceptT' computation with the given reason. The reason is passed through
-- as is: it is already a 'String', so it must not be 'show'n a second time.
shortcircuit True _ = pure ()
shortcircuit False reason = throwE reason
scramHI :: HashAlgorithm -> B.ByteString -> B.ByteString -> Int -> B.ByteString
-- ^ The SCRAM @Hi@ function: iterated HMAC keyed with the digest.
-- @U1 = HMAC(digest, salt <> 0x00000001)@, @Un = HMAC(digest, U(n-1))@, and
-- the result is the XOR of @U1 .. U(iters)@. An iteration count of 1 or
-- less yields @U1@.
--
-- The loop is strict in both accumulators so that a large, server-chosen
-- iteration count does not build a long chain of thunks.
scramHI algo digest salt iters = go (iters - 1) u1 u1
  where
    hmacd = HMAC.hmac (hash algo) 64 digest
    u1 = hmacd (B.concat [salt, BS.pack [0, 0, 0, 1]])
    go :: Int -> B.ByteString -> B.ByteString -> B.ByteString
    go !remaining !u !acc
      | remaining <= 0 = acc
      | otherwise =
          let u' = hmacd u
           in go (remaining - 1) u' (BS.pack $ BS.zipWith xor acc u')
parseSCRAM :: B.ByteString -> Map.Map B.ByteString B.ByteString
-- ^ Parse a SCRAM message of comma-separated @key=value@ attributes into a
-- map. Each attribute is split at its first @=@; an attribute without @=@
-- maps to the empty string.
parseSCRAM payload = Map.fromList (map toPair attributes)
  where
    attributes = T.splitOn "," (T.pack (B.unpack payload))
    toPair attribute =
      let (key, eqAndValue) = T.breakOn "=" attribute
       in (toBytes key, toBytes (T.drop 1 eqAndValue))
    toBytes = B.pack . T.unpack
-- As long as server api is not requested OP_Query has to be used. See:
-- https://github.com/mongodb/specifications/blob/6dc6f80026f0f8d99a8c81f996389534b14f6602/source/mongodb-handshake/handshake.rst#specification
retrieveServerData :: (MonadIO m) => Action m ServerData
-- ^ Run the @isMaster@ command and collect the server's capabilities.
-- Fields missing from the reply fall back to the defaults written below.
--
-- The server reports the writable flag under the lower-case key
-- @ismaster@. The previously used spelling @isMaster@ is still consulted as
-- a fallback so that any reply carrying it keeps working.
retrieveServerData = do
  d <- runCommand1 "isMaster"
  return ServerData
    { isMaster = fromMaybe False $ firstPresent ["ismaster", "isMaster"] d
    , minWireVersion = fromMaybe 0 $ lookup "minWireVersion" d
    , maxWireVersion = fromMaybe 0 $ lookup "maxWireVersion" d
    , maxMessageSizeBytes = fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d
    , maxBsonObjectSize = fromMaybe (16 * 1024 * 1024) $ lookup "maxBsonObjectSize" d
    , maxWriteBatchSize = fromMaybe 1000 $ lookup "maxWriteBatchSize" d
    }
  where
    -- Value of the first listed key that is present with a Bool value.
    firstPresent :: [Label] -> Document -> Maybe Bool
    firstPresent keys doc = listToMaybe $ mapMaybe (`lookup` doc) keys
-- * Collection
type Collection = Text
-- ^ Collection name, without the database prefix.
allCollections :: MonadIO m => Action m [Collection]
-- ^ List all collections in the current database. The strategy depends on
-- the server's maximum wire version:
--
-- * up to 2: query @system.namespaces@, strip the database prefix and drop
--   special (@$@-containing) names other than @local.oplog.$main@;
-- * below 17: run the @listCollections@ command and drain its cursor;
-- * otherwise: send @listCollections@ as an OP_MSG and drain its cursor.
allCollections = do
  p <- asks mongoPipe
  let wireVersion = maxWireVersion (P.serverData p)
      namesOf = mapMaybe (!? "name")
  case () of
    _
      | wireVersion <= 2 -> do
          db <- thisDatabase
          docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
          let names = map (dropDbPrefix . at "name") docs
          return [name | name <- names, not (isSpecial db name)]
      | wireVersion < 17 -> do
          r <- runCommand1 "listCollections"
          -- Pull the cursor description out of the reply, if it has one.
          let curData = do
                (Doc curDoc) <- r !? "cursor"
                (curId :: Int64) <- curDoc !? "id"
                (curNs :: Text) <- curDoc !? "ns"
                (firstBatch :: [Value]) <- curDoc !? "firstBatch"
                return (curId, curNs, mapMaybe cast' firstBatch :: [Document])
          case curData of
            Nothing -> return []
            Just (curId, curNs, firstBatch) -> do
              db <- thisDatabase
              nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch
              namesOf <$> rest nc
      | otherwise -> do
          let q = Query [] (Select ["listCollections" =: (1 :: Int)] "$cmd") [] 0 0 [] False 0 []
          qr <- queryRequestOpMsg False q
          dBatch <- liftIO $ requestOpMsg p qr []
          db <- thisDatabase
          nc <- newCursor db "$cmd" 0 dBatch
          namesOf <$> rest nc
  where
    dropDbPrefix = T.tail . T.dropWhile (/= '.')
    isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
-- * Selection
data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq)
-- ^ Selects the documents in collection 'coll' that match 'selector'.
type Selector = Document
-- ^ Filter for a query, analogous to the @where@ clause in SQL. @[]@ matches all documents in the collection. @[\"x\" =: a, \"y\" =: b]@ is analogous to @where x = a and y = b@ in SQL. See <http://www.mongodb.org/display/DOCS/Querying> for the full selector syntax.
whereJS :: Selector -> Javascript -> Selector
-- ^ Prepend a @$where@ Javascript predicate to the selector, so a document
-- must match both the original selector and the predicate.
whereJS sel js = predicate : sel
  where
    predicate = "$where" =: js
-- | Things that can be built from a selector and a collection name.
class Select aQueryOrSelection where
select :: Selector -> Collection -> aQueryOrSelection
-- ^ 'Query' or 'Selection' that selects the documents in the collection that match the selector. The choice of type depends on use: for example, in @'find' (select sel col)@ it is a 'Query', and in @'delete' (select sel col)@ it is a 'Selection'.
-- | A 'Selection' is built directly with the 'Select' constructor.
instance Select Selection where
select = Select
-- | A 'Query' is built with 'query'.
instance Select Query where
select = query
-- * Write
-- | How writes are acknowledged.
data WriteMode =
NoConfirm -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not.
| Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise an exception if one says the write failed. This is accomplished by sending the getLastError command, with the given 'GetLastError' parameters, after every write.
deriving (Show, Eq)
write :: Notice -> Action IO (Maybe Document)
-- ^ Send a write notice to the server. In 'NoConfirm' mode the notice is
-- sent on its own and 'Nothing' is returned. In 'Confirm' mode a
-- @getlasterror@ query (with the mode's parameters) is sent along with the
-- notice and its single reply document is returned.
write notice = do
  mode <- asks mongoWriteMode
  pipe <- asks mongoPipe
  case mode of
    NoConfirm -> do
      liftIOE ConnectionFailure $ P.send pipe [notice]
      return Nothing
    Confirm params -> do
      let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
      r <- queryRequest False q {limit = 1}
      rr <- liftIO $ request pipe [notice] r
      -- exactly one reply document is expected
      Batch _ _ [doc] <- fulfill rr
      return $ Just doc
-- ** Insert
insert :: (MonadIO m) => Collection -> Document -> Action m Value
-- ^ Insert a document into the collection and return its \"_id\" value,
-- which is generated automatically if the document has none. A failed
-- insert is thrown as a 'Failure'.
insert col doc = do
  withId <- liftIO $ assignId doc
  outcome <- insertBlock [] col (0, [withId])
  either (liftIO . throwIO) (return . head) outcome
insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
-- ^ Same as 'insert', but discards the \"_id\" value.
insert_ col doc = void (insert col doc)
insertMany :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into the collection and return their \"_id\" values,
-- which are generated automatically where missing.
-- If a document fails to be inserted (e.g. due to a duplicate key),
-- the remaining documents are aborted and @LastError@ is set.
-- An exception is thrown if any error occurs.
insertMany col docs = insert' [] col docs
insertMany_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
-- ^ Same as 'insertMany', but discards the \"_id\" values.
insertMany_ col docs = void (insertMany col docs)
insertAll :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into the collection and return their \"_id\" values,
-- which are generated automatically where missing. If a document fails to
-- be inserted (e.g. due to a duplicate key), the remaining documents are
-- still inserted.
insertAll col docs = insert' [KeepGoing] col docs
insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
-- ^ Same as 'insertAll', but discards the \"_id\" values.
insertAll_ col docs = void (insertAll col docs)
insertCommandDocument :: [InsertOption] -> Collection -> [Document] -> Document -> Document
-- ^ Build the document of the @insert@ command. The insert is ordered
-- unless the 'KeepGoing' option is present.
insertCommandDocument opts col docs writeConcern =
  [ "insert" =: col
  , "ordered" =: ordered
  , "documents" =: docs
  , "writeConcern" =: writeConcern
  ]
  where
    ordered = KeepGoing `notElem` opts
takeRightsUpToLeft :: [Either a b] -> [b]
-- ^ Payloads of the leading run of 'Right's: everything from the first
-- 'Left' onwards is dropped.
takeRightsUpToLeft (Right b : more) = b : takeRightsUpToLeft more
takeRightsUpToLeft _ = []
insert' :: (MonadIO m)
        => [InsertOption] -> Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into the collection and return their \"_id\" values,
-- which are generated automatically where missing.
--
-- The documents are split into chunks that respect the server's size and
-- count limits and each chunk is sent with 'insertBlock'. For ordered
-- inserts (no 'KeepGoing') only the chunks before the first oversized
-- document are sent. After sending, the first splitting failure (if any) is
-- thrown, otherwise the first chunk failure (if any).
insert' opts col docs = do
  p <- asks mongoPipe
  let sd = P.serverData p
  docsWithIds <- liftIO $ mapM assignId docs
  mode <- asks mongoWriteMode
  let writeConcern = case mode of
        NoConfirm -> ["w" =: (0 :: Int32)]
        Confirm params -> params
      -- size of the command document itself, without any payload documents;
      -- it has to be subtracted from the overall size limit
      overhead = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
      ordered = KeepGoing `notElem` opts
      preChunks =
        splitAtLimit
          (maxBsonObjectSize sd - overhead)
          (maxWriteBatchSize sd)
          docsWithIds
      chunks
        | ordered = takeRightsUpToLeft preChunks
        | otherwise = rights preChunks
      -- number of documents preceding each chunk
      offsets = scanl (+) 0 (map length chunks)

  chunkResults <- interruptibleFor ordered (zip offsets chunks) $ insertBlock opts col

  case lefts preChunks of
    splitFailure : _ -> liftIO $ throwIO splitFailure
    [] -> return ()

  case lefts chunkResults of
    chunkFailure : _ -> liftIO $ throwIO chunkFailure
    [] -> return ()

  return $ concat $ rights chunkResults
insertBlock :: (MonadIO m)
            => [InsertOption] -> Collection -> (Int, [Document]) -> Action m (Either Failure [Value])
-- ^ Insert one block of documents and return either their \"_id\" values or
-- the 'Failure' describing what went wrong. The 'Int' is the number of
-- documents in the previous blocks and is used to index reported failures.
-- An empty block succeeds without contacting the server.
--
-- This will fail if the list of documents is bigger than the server's
-- restrictions.
--
-- Servers with a maximum wire version below 2 get a legacy insert notice.
-- All others get the @insert@ command, whose reply is analysed in one place.
insertBlock _ _ (_, []) = return $ Right []
insertBlock opts col (prevCount, docs) = do
  db <- thisDatabase
  p <- asks mongoPipe
  let wireVersion = maxWireVersion (P.serverData p)
  if wireVersion < 2
    then do
      res <- liftDB $ write (Insert (db <.> col) opts docs)
      let errorMessage = do
            jRes <- res
            em <- lookup "err" jRes
            return $ WriteFailure prevCount (fromMaybe 0 $ lookup "code" jRes) em
      -- In older versions of the protocol we can't really say which
      -- document failed, so we just report the accumulated number of
      -- documents in the previous blocks.
      return $ maybe (Right insertedIds) Left errorMessage
    else do
      mode <- asks mongoWriteMode
      let writeConcern = case mode of
            NoConfirm -> ["w" =: (0 :: Int32)]
            Confirm params
              -- NOTE(review): the original condition was
              -- @== 2 && < 17@, which is just @== 2@. It was possibly
              -- meant to be @>= 2 && < 17@; behaviour is kept as it was.
              | wireVersion == 2 -> params
              -- default to w=1 unless the caller already chose a value
              | otherwise -> merge params ["w" =: (1 :: Int32)]
      interpretReply <$> runCommand (insertCommandDocument opts col docs writeConcern)
  where
    insertedIds = map (valueAt "_id") docs

    -- Turn the reply of the insert command into ids or a failure.
    interpretReply doc =
      case (look "writeErrors" doc, look "writeConcernError" doc) of
        (Nothing, Nothing) -> Right insertedIds
        (Just (Array errs), Nothing) ->
          Left $ CompoundFailure (indexedWriteErrors errs)
        (Nothing, Just err) ->
          Left $ concernFailure doc err
        (Just (Array errs), Just writeConcernErr) ->
          Left $ CompoundFailure $ concernFailure doc writeConcernErr : indexedWriteErrors errs
        (Just unknownValue, Nothing) ->
          Left $ unexpectedErrors unknownValue
        (Just unknownValue, Just writeConcernErr) ->
          Left $ CompoundFailure [unexpectedErrors unknownValue, concernFailure doc writeConcernErr]

    -- Write errors converted to failures, indexed relative to earlier blocks.
    indexedWriteErrors = map (addFailureIndex prevCount . anyToWriteError prevCount)

    -- Failure for a write concern error reported in the reply.
    concernFailure doc err =
      WriteFailure prevCount (fromMaybe 0 $ lookup "ok" doc) (show err)

    -- Failure for a "writeErrors" field that is not an array.
    unexpectedErrors unknownValue =
      ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
splitAtLimit :: Int -> Int -> [Document] -> [Either Failure [Document]]
-- ^ Split documents into consecutive chunks such that each chunk's
-- accumulated size stays within the first limit and its length within the
-- second. A document that does not fit even into an empty chunk is dropped
-- and reported as a 'Left' 'WriteFailure' in its place.
splitAtLimit maxSize maxCount list = chunked list
  where
    -- Cut off chunks until nothing is left.
    chunked :: [Document] -> [Either Failure [Document]]
    chunked [] = []
    chunked pending =
      let (chunk, remaining) = takeChunk 0 0 [] pending
       in chunk : chunked remaining

    -- Greedily fill one chunk; returns it together with the leftover input.
    takeChunk :: Int -> Int -> [Document] -> [Document] -> (Either Failure [Document], [Document])
    takeChunk _ _ acc [] = (Right (reverse acc), [])
    takeChunk usedSize usedCount acc pending@(d : ds)
      | fits = takeChunk newSize newCount (d : acc) ds
      | usedCount == 0 = (Left $ WriteFailure 0 0 "One document is too big for the message", ds)
      | otherwise = (Right (reverse acc), pending)
      where
        {- 8 extra bytes per document =
             1 byte: element type.
             6 bytes: key name. |key| <= log (maxWriteBatchSize = 100000)
             1 byte: \x00.
           See https://bsonspec.org/spec.html
        -}
        newSize = usedSize + sizeOfDocument d + 8
        newCount = usedCount + 1
        fits = newSize <= maxSize && newCount <= maxCount
sizeOfDocument :: Document -> Int
-- ^ Number of bytes the document occupies when serialised with 'putDocument'.
sizeOfDocument = fromIntegral . LBS.length . runPut . putDocument
assignId :: Document -> IO Document
-- ^ Return the document unchanged if it already has an \"_id\" field;
-- otherwise prepend a freshly generated 'ObjectId' as \"_id\".
assignId doc
  | hasId = return doc
  | otherwise = do
      oid <- genObjectId
      return (("_id" =: oid) : doc)
  where
    hasId = "_id" `elem` map label doc
-- ** Update
save :: (MonadIO m)
     => Collection -> Document -> Action m ()
-- ^ Save the document to the collection: insert it if it is new (has no
-- \"_id\" field), or upsert it by its \"_id\" if it is not.
save col doc = maybe (insert_ col doc) upsertById (look "_id" doc)
  where
    upsertById i = upsert (Select ["_id" := i] col) doc
replace :: (MonadIO m)
        => Selection -> Document -> Action m ()
-- ^ Replace the first document in the selection with the given document.
replace selection doc = update [] selection doc
repsert :: (MonadIO m)
        => Selection -> Document -> Action m ()
-- ^ Replace the first document in the selection with the given document, or
-- insert the document if the selection is empty.
repsert selection doc = update [Upsert] selection doc
{-# DEPRECATED repsert "use upsert instead" #-}
upsert :: (MonadIO m)
       => Selection -> Document -> Action m ()
-- ^ Update the first document in the selection with the given document, or
-- insert the document if the selection is empty.
upsert selection doc = update [Upsert] selection doc
type Modifier = Document
-- ^ Update operations on the fields of a document. See <https://docs.mongodb.com/manual/reference/operator/update/>
modify :: (MonadIO m)
       => Selection -> Modifier -> Action m ()
-- ^ Apply the modifier to all documents in the selection.
modify selection modifier = update [MultiUpdate] selection modifier
update :: (MonadIO m)
       => [UpdateOption] -> Selection -> Document -> Action m ()
-- ^ Update the first document in the selection using the updater document,
-- or all documents in the selection if 'MultiUpdate' is supplied. With
-- 'Upsert', the updater is treated as a document and inserted if the
-- selection is empty.
--
-- Servers with a maximum wire version below 17 receive the update through
-- 'write'. Newer servers receive it as an OP_MSG flagged 'P.MoreToCome'
-- with write concern @w = 0@.
update opts (Select sel col) up = do
  pipe <- asks mongoPipe
  db <- thisDatabase
  let notice = Update (db <.> col) opts sel up
      wireVersion = maxWireVersion (P.serverData pipe)
  if wireVersion >= 17
    then
      liftIOE ConnectionFailure $
        P.sendOpMsg
          pipe
          [Nc notice]
          (Just P.MoreToCome)
          ["writeConcern" =: ["w" =: (0 :: Int32)]]
    else liftDB (void (write notice))
updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
-- ^ Build the document for an @update@ command: the target collection, the
-- @ordered@ flag, the list of individual update statements and the write
-- concern, in that field order.
updateCommandDocument col ordered updateSpecs concern =
  [ "update" =: col
  , "ordered" =: ordered
  , "updates" =: updateSpecs
  , "writeConcern" =: concern
  ]
{-| Ordered bulk update. Each triple holds a selector, an update document and
  the options for that single update. The updates are sent as ordered batches:
  once a batch reports a failure, the remaining batches are not sent. The
  returned 'WriteResult' aggregates the results of the batches that were sent.

  Servers older than MongoDB 2.6 are rejected with an error by 'updateBlock'.
  To receive error details you need to use confirmed writes.
  NOTE(review): with unconfirmed writes the error lists are presumably empty
  even when an update failed; this is carried over from the earlier
  documentation and should be confirmed.
-}
updateMany :: (MonadIO m)
           => Collection
           -> [(Selector, Document, [UpdateOption])]
           -> Action m WriteResult
updateMany col updates = update' True col updates
{-| Unordered bulk update. Each triple holds a selector, an update document
  and the options for that single update. A failure does not stop processing:
  the remaining batches are still sent. The returned 'WriteResult' aggregates
  the results of all batches.

  Servers older than MongoDB 2.6 are rejected with an error by 'updateBlock'.
  To receive error details you need to use confirmed writes.
  NOTE(review): with unconfirmed writes the error lists are presumably empty
  even when an update failed; this is carried over from the earlier
  documentation and should be confirmed.
-}
updateAll :: (MonadIO m)
          => Collection
          -> [(Selector, Document, [UpdateOption])]
          -> Action m WriteResult
updateAll col updates = update' False col updates
update' :: (MonadIO m)
        => Bool
        -> Collection
        -> [(Selector, Document, [UpdateOption])]
        -> Action m WriteResult
-- ^ Shared implementation of 'updateMany' (ordered) and 'updateAll'
-- (unordered). The updates are turned into update statements, split into
-- batches that respect the server's size and count limits, sent one batch at
-- a time with 'updateBlock', and the per-batch results are combined into a
-- single 'WriteResult'. A 'Failure' exception raised while sending a batch, or
-- anywhere else in the IO part, is reported as a failed 'WriteResult' carrying
-- that failure.
update' ordered col updateDocs = do
  pipe <- asks mongoPipe
  mode <- asks mongoWriteMode
  ctx <- ask
  liftIO $ runBatches (P.serverData pipe) mode ctx `catch` (return . failureResult)
  where
    -- Result reported when a 'Failure' exception interrupts the work.
    failureResult :: Failure -> WriteResult
    failureResult e = WriteResult True 0 Nothing 0 [] [e] []

    -- One update statement of the @update@ command per input triple.
    toUpdateSpec (s, d, os) =
      [ "q" =: s
      , "u" =: d
      , "upsert" =: (Upsert `elem` os)
      , "multi" =: (MultiUpdate `elem` os)
      ]
    updates = map toUpdateSpec updateDocs

    runBatches sd mode ctx = do
      let writeConcern = case mode of
            NoConfirm -> ["w" =: (0 :: Int32)]
            Confirm params -> params
          -- Size of the command document without any update statements; it
          -- is subtracted from the overall size limit available to a batch.
          envelopeSize =
            sizeOfDocument (updateCommandDocument col ordered [] writeConcern)
          preChunks =
            splitAtLimit
              (maxBsonObjectSize sd - envelopeSize)
              (maxWriteBatchSize sd)
              updates
          chunks
            | ordered = takeRightsUpToLeft preChunks
            | otherwise = rights preChunks
          -- Number of update statements preceding each chunk.
          offsets = scanl (+) 0 (map length chunks)
          sendBlock block =
            runReaderT (updateBlock ordered col block) ctx
              `catch` (return . failureResult)
      blocks <- interruptibleFor ordered (zip offsets chunks) sendBlock
      let modifiedTotal = case mapMaybe nModified blocks of
            [] -> Nothing
            counts -> Just (sum counts)
      return $
        WriteResult
          (or (map failed blocks))
          (sum (map nMatched blocks))
          modifiedTotal
          0 -- nRemoved
          (concatMap upserted blocks)
          (concatMap writeErrors blocks)
          (concatMap writeConcernErrors blocks)
updateBlock :: (MonadIO m)
            => Bool -> Collection -> (Int, [Document]) -> Action m WriteResult
-- ^ Send one batch of update statements as a single @update@ command and
-- decode the reply into a 'WriteResult'.
--
-- The 'Int' paired with the batch is passed to 'anyToWriteError' and stored in
-- any 'ProtocolFailure' that is produced (in 'update'' it is the number of
-- statements in earlier batches).
--
-- Servers whose maximum wire version is below 2 are rejected with an
-- 'IOError'. For wire versions 2 to 16 the configured write concern is sent as
-- is; from wire version 17 on it is merged with @w: 1@.
updateBlock ordered col (prevCount, docs) = do
  p <- asks mongoPipe
  let wireVersion = maxWireVersion (P.serverData p)
  if wireVersion < 2
    then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6"
    else do
      mode <- asks mongoWriteMode
      let writeConcern = case mode of
            NoConfirm -> ["w" =: (0 :: Int32)]
            Confirm params
              -- The whole range 2..16 uses the parameters unchanged. The
              -- previous test was @== 2@, which sent versions 3..16 down the
              -- branch intended for wire version 17 and later.
              | wireVersion < 17 -> params
              | otherwise -> merge params ["w" =: (1 :: Int32)]
      doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
      return $ decodeReply doc
  where
    -- Combine the error information and the counters of a command reply.
    decodeReply doc =
      foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults]
      where
        n = fromMaybe 0 $ doc !? "n"
        noErrors = WriteResult False 0 (Just 0) 0 [] [] []
        failureWith errs concernErrs = WriteResult True 0 (Just 0) 0 [] errs concernErrs

        -- "writeErrors" must be an array; anything else is a protocol failure.
        writeErrorsResults =
          case look "writeErrors" doc of
            Nothing -> noErrors
            Just (Array err) -> failureWith (map (anyToWriteError prevCount) err) []
            Just unknownErr ->
              failureWith
                [ ProtocolFailure
                    prevCount
                    $ "Expected array of error docs, but received: "
                      ++ show unknownErr
                ]
                []

        -- "writeConcernError" must be a document; anything else is a
        -- protocol failure.
        writeConcernResults =
          case look "writeConcernError" doc of
            Nothing -> noErrors
            Just (Doc err) ->
              failureWith
                []
                [ WriteConcernFailure
                    (fromMaybe (-1) $ err !? "code")
                    (fromMaybe "" $ err !? "errmsg")
                ]
            Just unknownErr ->
              failureWith
                []
                [ ProtocolFailure
                    prevCount
                    $ "Expected doc in writeConcernError, but received: "
                      ++ show unknownErr
                ]

        upsertedList = maybe [] (map docToUpserted) (doc !? "upserted")
        successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] []
interruptibleFor :: (Monad m, Result b) => Bool -> [a] -> (a -> m b) -> m [b]
-- ^ Run the action on each element in order and collect the results. When the
-- flag is 'True', stop right after the first result for which 'isFailed'
-- holds; that failed result is still included as the last element.
interruptibleFor ordered items action = loop [] items
  where
    -- The accumulator holds the results so far, newest first.
    loop !acc [] = return $ reverse acc
    loop !acc (item : rest) = do
      outcome <- action item
      let acc' = outcome : acc
      if isFailed outcome && ordered
        then return $ reverse acc'
        else loop acc' rest
mergeWriteResults :: WriteResult -> WriteResult -> WriteResult
-- ^ Combine two write results: the failure flags are or-ed, the counters are
-- added (the modified count is 'Nothing' as soon as either side is 'Nothing'),
-- and the lists of the second result are placed in front of the lists of the
-- first one.
mergeWriteResults
  (WriteResult accFailed accMatched accModified accDeleted accUpserted accErrors accConcernErrors)
  (WriteResult newFailed newMatched newModified newDeleted newUpserted newErrors newConcernErrors) =
    WriteResult
      (accFailed || newFailed)
      (accMatched + newMatched)
      modifiedSum
      (accDeleted + newDeleted)
      -- This function is used with foldl1', so the first argument is the
      -- accumulator and its lists are usually the longer ones. Prepending the
      -- (shorter) lists of the second argument keeps the whole accumulation
      -- linear.
      (newUpserted ++ accUpserted)
      (newErrors ++ accErrors)
      (newConcernErrors ++ accConcernErrors)
  where
    modifiedSum = do
      a <- accModified
      b <- newModified
      return (a + b)
docToUpserted :: Document -> Upserted
-- ^ Build an 'Upserted' from the \"index\" and \"_id\" fields of a document.
-- Both fields are read with 'at' (presumably failing when a field is missing
-- or has the wrong type; confirm against 'at').
docToUpserted doc = Upserted (at "index" doc) (at "_id" doc)
docToWriteError :: Document -> Failure
-- ^ Build a 'WriteFailure' from the \"index\", \"code\" and \"errmsg\" fields
-- of a document. All three fields are read with 'at' (presumably failing when
-- a field is missing or has the wrong type; confirm against 'at').
docToWriteError doc =
  WriteFailure (at "index" doc) (at "code" doc) (at "errmsg" doc)
-- ** Delete
delete :: (MonadIO m)
=> Selection -> Action m ()
-- ^ Delete all documents in selection
delete s = do
pipe <- asks mongoPipe