Skip to content

Commit

Permalink
[mORMot] - upgrade to 2.2.7414
Browse files Browse the repository at this point in the history
 - added /cached_query implementation for raw server
 - increased threads count for async server cpuCount * 2 -> cpuCount * 4
  • Loading branch information
pavel.mash committed Apr 28, 2024
1 parent eeadf09 commit 8c6157f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 52 deletions.
1 change: 1 addition & 0 deletions frameworks/Pascal/mormot/benchmark_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"dockerfile": "mormot.dockerfile",
"db_url": "/rawdb",
"query_url": "/rawqueries?queries=",
"cached_query_url": "/rawcached?count=",
"fortune_url": "/rawfortunes",
"update_url": "/rawupdates?queries=",
"port": 8080,
Expand Down
2 changes: 1 addition & 1 deletion frameworks/Pascal/mormot/setup_and_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ echo "Download statics from $URL ..."
wget -qO- "$URL" | tar -xz -C ./libs/mORMot/static

# uncomment for fixed commit URL
URL=https://github.com/synopse/mORMot2/tarball/7dc50900266f07454fe60b60e4a2755ce445ddeb
URL=https://github.com/synopse/mORMot2/tarball/527b3fb11cb4dad5f2c03ace293b550f85504420
#URL="https://api.github.com/repos/synopse/mORMot2/tarball/$USED_TAG"
echo "Download and unpacking mORMot sources from $URL ..."
wget -qO- "$URL" | tar -xz -C ./libs/mORMot --strip-components=1
Expand Down
129 changes: 78 additions & 51 deletions frameworks/Pascal/mormot/src/raw.pas
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
end;
TWorlds = array of TWorldRec;
TFortune = packed record
id: integer;
id: PtrUInt;
message: PUtf8Char;
end;
TFortunes = array of TFortune;
Expand Down Expand Up @@ -86,6 +86,7 @@ TRawAsyncServer = class(TSynPersistent)
fStore: TRestServerDB;
fTemplate: TSynMustache;
fCachedWorldsTable: POrmCacheTable;
fRawCache: TOrmWorlds;
fDbPool: TSqlDBPostgresConnectionProperties;
procedure OnAsyncDb(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
procedure OnAsyncFortunes(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
Expand All @@ -107,6 +108,7 @@ TRawAsyncServer = class(TSynPersistent)
function updates(ctxt: THttpServerRequest): cardinal;
function rawdb(ctxt: THttpServerRequest): cardinal;
function rawqueries(ctxt: THttpServerRequest): cardinal;
function rawcached(ctxt: THttpServerRequest): cardinal;
function rawfortunes(ctxt: THttpServerRequest): cardinal;
function rawupdates(ctxt: THttpServerRequest): cardinal;
// asynchronous PostgreSQL pipelined DB access
Expand Down Expand Up @@ -144,13 +146,13 @@ TRawAsyncServer = class(TSynPersistent)
'</html>';


function ComputeRandomWorld: integer; inline;
function ComputeRandomWorld(gen: PLecuyer): integer; inline;
begin
result := Random32(WORLD_COUNT) + 1;
result := gen^.Next(WORLD_COUNT) + 1;
end;

function GetQueriesParamValue(ctxt: THttpServerRequest;
const search: RawUtf8 = 'QUERIES='): cardinal;
const search: RawUtf8 = 'QUERIES='): cardinal; inline;
begin
if not ctxt.UrlParam(search, result) or
(result = 0) then
Expand Down Expand Up @@ -190,6 +192,7 @@ constructor TRawAsyncServer.Create(
if fStore.Server.Cache.SetCache(TOrmCachedWorld) then
fStore.Server.Cache.FillFromQuery(TOrmCachedWorld, '', []);
fCachedWorldsTable := fStore.Orm.Cache.Table(TOrmCachedWorld);
fStore.Orm.RetrieveListObjArray(fRawCache, TOrmCachedWorld, 'order by id', []);
// initialize the mustache template for /fortunes
fTemplate := TSynMustache.Parse(FORTUNES_TPL);
// setup the HTTP server
Expand Down Expand Up @@ -222,22 +225,26 @@ destructor TRawAsyncServer.Destroy;
fHttpServer.Free;
fStore.Free;
fModel.Free;
fDBPool.free;
fDBPool.Free;
ObjArrayClear(fRawCache);
inherited Destroy;
end;

// query DB world table for /rawqueries and /rawupdates endpoints

function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt;
out res: TWorlds): boolean;
var
conn: TSqlDBConnection;
stmt: ISqlDBStatement;
pConn: TSqlDBPostgresConnection absolute conn;
pStmt: TSqlDBPostgresStatement;
gen: PLecuyer;
i: PtrInt;
begin
result := false;
SetLength(res{%H-}, cnt);
gen := Lecuyer;
conn := fDbPool.ThreadSafeConnection;
// specific code to use PostgresSQL pipelining mode
// see test_nosync in
Expand All @@ -247,7 +254,7 @@ function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): bool
pStmt := TSqlDBPostgresStatement(stmt.Instance);
for i := 0 to cnt - 1 do
begin
pStmt.Bind(1, ComputeRandomWorld);
pStmt.Bind(1, ComputeRandomWorld(gen));
pStmt.SendPipelinePrepared;
pConn.PipelineSync;
end;
Expand Down Expand Up @@ -323,7 +330,7 @@ function TRawAsyncServer.db(ctxt: THttpServerRequest): cardinal;
var
w: TOrmWorld;
begin
w := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld);
w := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld(Lecuyer));
try
ctxt.SetOutJson(w);
result := HTTP_SUCCESS;
Expand All @@ -336,10 +343,12 @@ function TRawAsyncServer.queries(ctxt: THttpServerRequest): cardinal;
var
i: PtrInt;
res: TOrmWorlds;
gen: PLecuyer;
begin
SetLength(res, GetQueriesParamValue(ctxt, 'QUERIES='));
gen := Lecuyer;
for i := 0 to length(res) - 1 do
res[i] := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld);
res[i] := TOrmWorld.Create(fStore.Orm, ComputeRandomWorld(gen));
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
ObjArrayClear(res);
result := HTTP_SUCCESS;
Expand All @@ -349,10 +358,12 @@ function TRawAsyncServer.cached_queries(ctxt: THttpServerRequest): cardinal;
var
i: PtrInt;
res: TOrmWorlds;
gen: PLecuyer;
begin
SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
gen := Lecuyer;
for i := 0 to length(res) - 1 do
res[i] := fCachedWorldsTable.Get(ComputeRandomWorld);
res[i] := fCachedWorldsTable.Get(ComputeRandomWorld(gen));
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
result := HTTP_SUCCESS;
end;
Expand Down Expand Up @@ -390,19 +401,21 @@ function TRawAsyncServer.updates(ctxt: THttpServerRequest): cardinal;
res: TOrmWorlds;
w: TOrmWorld;
b: TRestBatch;
gen: PLecuyer;
begin
result := HTTP_SERVERERROR;
SetLength(res, GetQueriesParamValue(ctxt));
b := TRestBatch.Create(fStore.ORM, TOrmWorld, {transrows=}0,
[boExtendedJson, boNoModelEncoding, boPutNoCacheFlush]);
try
gen := Lecuyer;
for i := 0 to length(res) - 1 do
begin
w := TOrmWorld.Create;
res[i] := w;
if not fStore.Orm.Retrieve(ComputeRandomWorld, w) then
if not fStore.Orm.Retrieve(ComputeRandomWorld(gen), w) then
exit;
w.RandomNumber := ComputeRandomWorld;
w.RandomNumber := ComputeRandomWorld(gen);
b.Update(w);
end;
result := b.Send;
Expand All @@ -422,7 +435,7 @@ function TRawAsyncServer.rawdb(ctxt: THttpServerRequest): cardinal;
result := HTTP_SERVERERROR;
conn := fDbPool.ThreadSafeConnection;
stmt := conn.NewStatementPrepared(WORLD_READ_SQL, true, true);
stmt.Bind(1, ComputeRandomWorld);
stmt.Bind(1, ComputeRandomWorld(Lecuyer));
stmt.ExecutePrepared;
if stmt.Step then
begin
Expand All @@ -444,6 +457,20 @@ function TRawAsyncServer.rawqueries(ctxt: THttpServerRequest): cardinal;
result := HTTP_SUCCESS;
end;

function TRawAsyncServer.rawcached(ctxt: THttpServerRequest): cardinal;
var
i: PtrInt;
res: TOrmWorlds;
gen: PLecuyer;
begin
SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
gen := Lecuyer;
for i := 0 to length(res) - 1 do
res[i] := fRawCache[ComputeRandomWorld(gen) - 1];
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
result := HTTP_SUCCESS;
end;

function TRawAsyncServer.rawfortunes(ctxt: THttpServerRequest): cardinal;
var
conn: TSqlDBConnection;
Expand Down Expand Up @@ -476,10 +503,7 @@ function ComputeUpdateSql(cnt: integer): RawUtf8;
try
W.AddShort('UPDATE world SET randomNumber = v.randomNumber FROM (VALUES');
for i := 1 to cnt do
begin
W.AddShort('(?::integer, ?::integer)');
W.Add(',');
end;
W.AddShort('(?::integer, ?::integer),');
W.CancelLastComma;
W.AddShort(' order by 1) AS v (id, randomNumber) WHERE world.id = v.id');
W.SetText(LastComputeUpdateSql);
Expand All @@ -496,6 +520,7 @@ function TRawAsyncServer.rawupdates(ctxt: THttpServerRequest): cardinal;
cnt, i: PtrInt;
res: TWorlds;
ids, nums: TInt64DynArray;
gen: PLecuyer;
conn: TSqlDBConnection;
stmt: ISqlDBStatement;
begin
Expand All @@ -505,8 +530,9 @@ function TRawAsyncServer.rawupdates(ctxt: THttpServerRequest): cardinal;
if not getRawRandomWorlds(cnt, res) then
exit;
// generate new randoms
gen := Lecuyer;
for i := 0 to cnt - 1 do
res[i].randomNumber := ComputeRandomWorld;
res[i].randomNumber := ComputeRandomWorld(gen);
if cnt > 20 then
begin
// fill parameters arrays for update with nested select (PostgreSQL only)
Expand Down Expand Up @@ -546,7 +572,7 @@ function TRawAsyncServer.asyncdb(ctxt: THttpServerRequest): cardinal;
begin
with fDbPool.Async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
try
Bind(1, ComputeRandomWorld);
Bind(1, ComputeRandomWorld(Lecuyer));
ExecuteAsync(ctxt, OnAsyncDb);
finally
UnLock;
Expand Down Expand Up @@ -591,57 +617,61 @@ TAsyncWorld = class
res: TWorlds;
count, current: integer;
update: TSqlDBPostgresAsyncStatement; // prepared before any callback
function Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
function Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
async: TSqlDBPostgresAsync;
function Queries(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
function Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
procedure DoUpdates;
procedure OnQueries(Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
procedure OnRes({%H-}Statement: TSqlDBPostgresAsyncStatement; Context: TObject);
end;

function TRawAsyncServer.asyncqueries(ctxt: THttpServerRequest): cardinal;
begin
result := TAsyncWorld.Create.Queries(fDBPool.Async, ctxt);
result := TAsyncWorld.Create.Queries(self, ctxt);
end;

function TRawAsyncServer.asyncupdates(ctxt: THttpServerRequest): cardinal;
begin
result := TAsyncWorld.Create.Updates(fDBPool.Async, ctxt);
result := TAsyncWorld.Create.Updates(self, ctxt);
end;


{ TAsyncWorld }

function TAsyncWorld.Queries(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
function TAsyncWorld.Queries(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
var
n: integer;
opt: TSqlDBPostgresAsyncStatementOptions; // for modified libpq
opt: TSqlDBPostgresAsyncStatementOptions; // forced options for modified libpq
gen: PLecuyer;
select: TSqlDBPostgresAsyncStatement;
begin
request := ctxt;
if async = nil then
async := server.fDbPool.Async;
if count = 0 then
count := getQueriesParamValue(ctxt);
SetLength(res, count); // count is > 0
with async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT) do
try
opt := AsyncOptions - [asoForceConnectionFlush];
n := count;
repeat
dec(n);
Bind(1, ComputeRandomWorld);
if n = 0 then // last item should include asoForceConnectionFlush (if set)
opt := AsyncOptions;
ExecuteAsync(ctxt, OnQueries, @opt);
until n = 0;
finally
UnLock;
end;
select := async.PrepareLocked(WORLD_READ_SQL, {res=}true, ASYNC_OPT);
opt := ASYNC_OPT - [asoForceConnectionFlush];
n := count;
gen := Lecuyer;
repeat
dec(n);
select.Bind(1, ComputeRandomWorld(gen));
if n = 0 then // last item should include asoForceConnectionFlush (if set)
opt := ASYNC_OPT;
select.ExecuteAsync(ctxt, OnQueries, @opt);
until n = 0;
select.UnLock;
result := ctxt.SetAsyncResponse;
end;

function TAsyncWorld.Updates(async: TSqlDBPostgresAsync; ctxt: THttpServerRequest): cardinal;
function TAsyncWorld.Updates(server: TRawAsyncServer; ctxt: THttpServerRequest): cardinal;
begin
async := server.fDbPool.Async;
count := getQueriesParamValue(ctxt);
update := async.Prepare(WORLD_UPDATE_SQLN, false, ASYNC_OPT);
result := Queries(async, ctxt);
result := Queries(server, ctxt);
end;

procedure TAsyncWorld.OnQueries(Statement: TSqlDBPostgresAsyncStatement;
Expand All @@ -666,9 +696,11 @@ procedure TAsyncWorld.DoUpdates;
var
i: PtrInt;
params: TIntegerDynArray;
gen: PLecuyer;
begin
gen := Lecuyer;
for i := 0 to count - 1 do
res[i].randomNumber := ComputeRandomWorld;
res[i].randomNumber := ComputeRandomWorld(gen);
SetLength(params, count);
for i := 0 to count - 1 do
params[i] := res[i].id;
Expand Down Expand Up @@ -708,8 +740,8 @@ procedure TAsyncWorld.OnRes(Statement: TSqlDBPostgresAsyncStatement;
// register some RTTI for records JSON serialization
Rtti.RegisterFromText([
TypeInfo(TMessageRec), 'message:PUtf8Char',
TypeInfo(TWorldRec), 'id,randomNumber:integer',
TypeInfo(TFortune), 'id:integer message:PUtf8Char']);
TypeInfo(TWorldRec), 'id,randomNumber:cardinal',
TypeInfo(TFortune), 'id:PtrUInt message:PUtf8Char']);

// compute default execution context from HW information
cpuCount := CurrentCpuSet(cpuMask); // may run from a "taskset" command
Expand All @@ -731,7 +763,7 @@ procedure TAsyncWorld.OnRes(Statement: TSqlDBPostgresAsyncStatement;
begin
// asynchronus test with single listener socket and no CPU pinning
servers := 1;
threads := cpuCount * 2;
threads := cpuCount * 4;
pinServers2Cores := false;
end;
end
Expand All @@ -753,12 +785,7 @@ procedure TAsyncWorld.OnRes(Statement: TSqlDBPostgresAsyncStatement;
pinServers2Cores := false; // no option would keep the default boolean
Get(['s', 'servers'], servers, '#count of servers (listener sockets)', servers);
Get(['t', 'threads'], threads, 'per-server thread pool #size', threads);
if Option(['?', 'help'], 'display this message') then
begin
ConsoleWrite(FullDescription);
exit;
end;
if ConsoleWriteUnknown then
if ConsoleHelpFailed('TFB Server using mORMot 2') then
exit;
end;

Expand Down

0 comments on commit 8c6157f

Please sign in to comment.