Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for #8082 by making engine to use user buffers directly #8145

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions src/dsql/DsqlBatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,6 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat

const auto statement = req->getDsqlStatement();

if (statement->getFlags() & DsqlStatement::FLAG_ORPHAN)
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_bad_req_handle));
}

switch (statement->getType())
{
case DsqlStatement::TYPE_INSERT:
Expand All @@ -229,7 +223,7 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat
}

const dsql_msg* message = statement->getSendMsg();
if (! (inMetadata && message && req->parseMetadata(inMetadata, message->msg_parameters)))
if (! (inMetadata && message && message->msg_parameter > 0))
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_batch_param));
Expand Down Expand Up @@ -659,18 +653,23 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
// execute request
m_dsqlRequest->req_transaction = transaction;
Request* req = m_dsqlRequest->getRequest();
DsqlStatement* dStmt = m_dsqlRequest->getDsqlStatement();
fb_assert(req);

// prepare completion interface
AutoPtr<BatchCompletionState, SimpleDispose> completionState
(FB_NEW BatchCompletionState(m_flags & (1 << IBatch::TAG_RECORD_COUNTS), m_detailed));
AutoSetRestore<bool> batchFlag(&req->req_batch_mode, true);
const dsql_msg* message = m_dsqlRequest->getDsqlStatement()->getSendMsg();
const dsql_msg* sendMessage = dStmt->getSendMsg();
// map message to internal engine format
// Do it one time only to avoid parsing its metadata for every message
m_dsqlRequest->metadataToFormat(m_meta, sendMessage);
// Using of positional DML in batch is strange but not forbidden
m_dsqlRequest->mapCursorKey(tdbb);
bool startRequest = true;

bool isExecBlock = m_dsqlRequest->getDsqlStatement()->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const auto receiveMessage = isExecBlock ? m_dsqlRequest->getDsqlStatement()->getReceiveMsg() : nullptr;
auto receiveMsgBuffer = isExecBlock ? m_dsqlRequest->req_msg_buffers[receiveMessage->msg_buffer_number] : nullptr;
bool isExecBlock = dStmt->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const dsql_msg* receiveMessage = isExecBlock ? dStmt->getReceiveMsg() : nullptr;

// process messages
ULONG remains;
Expand Down Expand Up @@ -726,25 +725,18 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
*id = newId;
}

// map message to internal engine format
// pass m_meta one time only to avoid parsing its metadata for every message
m_dsqlRequest->mapInOut(tdbb, false, message, start ? m_meta : nullptr, nullptr, data);
data += m_messageSize;
remains -= m_messageSize;

UCHAR* msgBuffer = m_dsqlRequest->req_msg_buffers[message->msg_buffer_number];
try
{
// runsend data to request and collect stats
ULONG before = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
EXE_send(tdbb, req, message->msg_number, message->msg_length, msgBuffer);
EXE_send(tdbb, req, sendMessage->msg_number, m_messageSize, data);
ULONG after = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
completionState->regUpdate(after - before);

if (isExecBlock)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, receiveMsgBuffer);
if (receiveMessage)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, nullptr); // We don't care about returned record
}
catch (const Exception& ex)
{
Expand All @@ -764,6 +756,9 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)

startRequest = true;
}

data += m_messageSize;
remains -= m_messageSize;
}

UCHAR* alignedData = FB_ALIGN(data, m_alignment);
Expand Down
11 changes: 1 addition & 10 deletions src/dsql/DsqlCompilerScratch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,7 @@ dsql_var* DsqlCompilerScratch::resolveVariable(const MetaName& varName)
// Generate BLR for a return.
void DsqlCompilerScratch::genReturn(bool eosFlag)
{
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION));

if (hasEos && !eosFlag)
appendUChar(blr_begin);
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION | FLAG_EXEC_BLOCK));

appendUChar(blr_send);
appendUChar(1);
Expand Down Expand Up @@ -455,12 +452,6 @@ void DsqlCompilerScratch::genReturn(bool eosFlag)
}

appendUChar(blr_end);

if (hasEos && !eosFlag)
{
appendUChar(blr_stall);
appendUChar(blr_end);
}
}

void DsqlCompilerScratch::genParameters(Array<NestConst<ParameterClause> >& parameters,
Expand Down
9 changes: 7 additions & 2 deletions src/dsql/DsqlCompilerScratch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ typedef Firebird::Pair<
Firebird::NonPooled<NestConst<ValueListNode>, NestConst<ValueListNode>>> ReturningClause;


// DSQL Compiler scratch block - may be discarded after compilation in the future.
// DSQL Compiler scratch block.
// Contains any kind of objects used during DsqlStatement compilation
// Is deleted with its pool as soon as DsqlStatement is fully formed in prepareStatement()
// or with the statement itself (if the statement reqested it returning true from shouldPreserveScratch())
class DsqlCompilerScratch : public BlrDebugWriter
{
public:
Expand All @@ -70,6 +73,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
static const unsigned FLAG_DDL = 0x2000;
static const unsigned FLAG_FETCH = 0x4000;
static const unsigned FLAG_VIEW_WITH_CHECK = 0x8000;
static const unsigned FLAG_EXEC_BLOCK = 0x100000;

static const unsigned MAX_NESTING = 512;

Expand Down Expand Up @@ -105,7 +109,7 @@ class DsqlCompilerScratch : public BlrDebugWriter

protected:
// DsqlCompilerScratch should never be destroyed using delete.
// It dies together with it's pool in release_request().
// It dies together with it's pool.
~DsqlCompilerScratch()
{
}
Expand Down Expand Up @@ -317,6 +321,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
DsqlCompilerScratch* mainScratch = nullptr;
Firebird::NonPooledMap<USHORT, USHORT> outerMessagesMap; // <outer, inner>
Firebird::NonPooledMap<USHORT, USHORT> outerVarsMap; // <outer, inner>
dsql_msg* recordKeyMessage = nullptr; // Side message for positioned DML

private:
Firebird::HalfStaticArray<SelectExprNode*, 4> ctes; // common table expressions
Expand Down
97 changes: 78 additions & 19 deletions src/dsql/DsqlCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "../dsql/dsql_proto.h"
#include "../dsql/DsqlCursor.h"
#include "../dsql/StmtNodes.h"

using namespace Firebird;
using namespace Jrd;
Expand All @@ -36,7 +37,11 @@ static const char* const SCRATCH = "fb_cursor_";
static const ULONG PREFETCH_SIZE = 65536; // 64 KB

DsqlCursor::DsqlCursor(DsqlDmlRequest* req, ULONG flags)
: m_dsqlRequest(req), m_message(req->getDsqlStatement()->getReceiveMsg()),
: m_dsqlRequest(req),
m_keyBuffer(nullptr),
m_keyBufferLength(0),
m_message(req->getDsqlStatement()->getReceiveMsg()->msg_number),
m_messageLength(0),
m_resultSet(NULL), m_flags(flags),
m_space(req->getPool(), SCRATCH),
m_state(BOS), m_eof(false), m_position(0), m_cachedCount(0)
Expand All @@ -48,6 +53,11 @@ DsqlCursor::~DsqlCursor()
{
if (m_resultSet)
m_resultSet->resetHandle();
if (m_keyBuffer)
{
delete[] m_keyBuffer;
m_keyBuffer = nullptr;
}
}

jrd_tra* DsqlCursor::getTransaction() const
Expand All @@ -66,6 +76,23 @@ void DsqlCursor::setInterfacePtr(JResultSet* interfacePtr) noexcept
m_resultSet = interfacePtr;
}

bool DsqlCursor::getCurrentRecordKey(USHORT context, RecordKey& key) const
{
if (m_keyBuffer == nullptr || context * sizeof(RecordKey) >= m_keyBufferLength)
{
fb_assert(false);
return false;
}

if (m_state != POSITIONED)
{
return false;
}

key = m_keyBuffer[context];
return key.recordNumber.bid_relation_id != 0;
}

void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)
{
if (!cursor)
Expand All @@ -88,7 +115,7 @@ void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)

if (dsqlRequest->req_traced && TraceManager::need_dsql_free(attachment))
{
TraceSQLStatementImpl stmt(dsqlRequest, NULL);
TraceSQLStatementImpl stmt(dsqlRequest, nullptr, nullptr);
TraceManager::event_dsql_free(attachment, &stmt, DSQL_close);
}

Expand All @@ -115,6 +142,15 @@ int DsqlCursor::fetchNext(thread_db* tdbb, UCHAR* buffer)
return 1;
}

if (m_keyBufferLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
fb_assert(m_keyBufferLength > 0);
m_keyBuffer = new RecordKey[req->req_rpb.getCount()];
}

m_dsqlRequest->gatherRecordKey(m_keyBuffer);
m_state = POSITIONED;
return 0;
}
Expand Down Expand Up @@ -163,7 +199,7 @@ int DsqlCursor::fetchAbsolute(thread_db* tdbb, UCHAR* buffer, SLONG position)
{
if (!m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, buffer);
fb_assert(m_eof);
}

Expand Down Expand Up @@ -248,7 +284,7 @@ void DsqlCursor::getInfo(thread_db* tdbb,
case IResultSet::INF_RECORD_COUNT:
if (isScrollable && !m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, nullptr);
fb_assert(m_eof);
}
response.insertInt(tag, isScrollable ? m_cachedCount : -1);
Expand Down Expand Up @@ -291,48 +327,71 @@ int DsqlCursor::fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 positio
{
if (position >= m_cachedCount)
{
if (m_eof || !cacheInput(tdbb, position))
if (m_eof || !cacheInput(tdbb, buffer, position))
{
m_state = EOS;
return 1;
}
}

fb_assert(position < m_cachedCount);
fb_assert(m_messageLength > 0); // At this point m_messageLength must be set by cacheInput

UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];

const FB_UINT64 offset = position * m_message->msg_length;
const FB_UINT64 readBytes = m_space.read(offset, msgBuffer, m_message->msg_length);
fb_assert(readBytes == m_message->msg_length);

m_dsqlRequest->mapInOut(tdbb, true, m_message, NULL, buffer);
FB_UINT64 offset = position * (m_messageLength + m_keyBufferLength);
FB_UINT64 readBytes = m_space.read(offset, buffer, m_messageLength);
offset += m_messageLength;
readBytes += m_space.read(offset, m_keyBuffer, m_keyBufferLength);
fb_assert(readBytes == m_messageLength + m_keyBufferLength);

m_position = position;
m_state = POSITIONED;
return 0;
}

bool DsqlCursor::cacheInput(thread_db* tdbb, FB_UINT64 position)
bool DsqlCursor::cacheInput(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position)
{
fb_assert(!m_eof);

const ULONG prefetchCount = MAX(PREFETCH_SIZE / m_message->msg_length, 1);
const UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];
// It could not be done before: user buffer length may be unknown until call setDelayedOutputMetadata()
if (m_messageLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
const MessageNode* msg = req->getStatement()->getMessage(m_message);
m_messageLength = msg->getFormat(req)->fmt_length;
// Save record key unconditionally because setCursorName() can be called after openCursor()
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
m_keyBuffer = new RecordKey[req->req_rpb.getCount()];
}

std::unique_ptr<UCHAR[]> ownBuffer;
if (buffer == nullptr)
{
// We are called from getInfo() and there is no user-provided buffer for data.
// Create a temporary one.
// This code cannot be moved into getInfo() itself because it is most likely called before fetch()
// so m_messageLength is still unknown there.
ownBuffer.reset(buffer = new UCHAR[m_messageLength]);
}

const ULONG prefetchCount = MAX(PREFETCH_SIZE / (m_messageLength + m_keyBufferLength), 1);

while (position >= m_cachedCount)
{
for (ULONG count = 0; count < prefetchCount; count++)
{
if (!m_dsqlRequest->fetch(tdbb, NULL))
if (!m_dsqlRequest->fetch(tdbb, buffer))
{
m_eof = true;
break;
}

const FB_UINT64 offset = m_cachedCount * m_message->msg_length;
const FB_UINT64 writtenBytes = m_space.write(offset, msgBuffer, m_message->msg_length);
fb_assert(writtenBytes == m_message->msg_length);
m_dsqlRequest->gatherRecordKey(m_keyBuffer);

FB_UINT64 offset = m_cachedCount * (m_messageLength + m_keyBufferLength);
FB_UINT64 writtenBytes = m_space.write(offset, buffer, m_messageLength);
offset += m_messageLength;
writtenBytes += m_space.write(offset, m_keyBuffer, m_keyBufferLength);
fb_assert(writtenBytes == m_messageLength + m_keyBufferLength);
m_cachedCount++;
}

Expand Down
9 changes: 7 additions & 2 deletions src/dsql/DsqlCursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace Jrd {

class DsqlDmlRequest;
class JResultSet;
struct RecordKey;

class DsqlCursor
{
Expand All @@ -41,6 +42,7 @@ class DsqlCursor
jrd_tra* getTransaction() const;
Attachment* getAttachment() const;
void setInterfacePtr(JResultSet* interfacePtr) noexcept;
bool getCurrentRecordKey(USHORT context, RecordKey& key) const;

static void close(thread_db* tdbb, DsqlCursor* cursor);

Expand All @@ -67,10 +69,13 @@ class DsqlCursor

private:
int fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position);
bool cacheInput(thread_db* tdbb, FB_UINT64 position = MAX_UINT64);
bool cacheInput(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position = MAX_UINT64);

DsqlDmlRequest* const m_dsqlRequest;
const dsql_msg* const m_message;
RecordKey* m_keyBuffer;
ULONG m_keyBufferLength;
const USHORT m_message;
ULONG m_messageLength;
JResultSet* m_resultSet;
const ULONG m_flags;
TempSpace m_space;
Expand Down
Loading
Loading