Skip to content

Commit

Permalink
debug and fix iocp_asio
Browse files Browse the repository at this point in the history
  • Loading branch information
microcai committed Jan 5, 2025
1 parent d5fdfc8 commit b14a87a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
32 changes: 14 additions & 18 deletions example/unbufcpy/unbufcp5/unbufcp5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,45 +165,41 @@ class FiberChannel
std::deque<FiberOVERLAPPED*> m_push_awaiting;
};

struct OverlappedBuffer : public FiberOVERLAPPED
{
IOBuffer buf;
};

struct FileCopyer
{
uint64_t cur_pos = 0;

std::array<OverlappedBuffer, PENDING_IO> buffers;
FiberChannel<int> chan;

FileCopyer(HANDLE iocp)
: chan(iocp, PENDING_IO)
{
}

void buffer_carrier(int buffer_index, HANDLE srcFile, HANDLE destFile, FiberChannel<int>& chan)
void buffer_carrier(HANDLE srcFile, HANDLE destFile, FiberChannel<int>& chan)
{
for (;;)
{
buffers[buffer_index].set_offset(cur_pos);
IOBuffer buf;
FiberOVERLAPPED ov;
ov.set_offset(cur_pos);
cur_pos += BUFFER_SIZE;

DWORD readbytes = 0;
auto ret = ReadFile(srcFile, buffers[buffer_index].buf, BUFFER_SIZE, &readbytes, &buffers[buffer_index]);
buffers[buffer_index].last_error = GetLastError();
if (!(!ret && buffers[buffer_index].last_error != ERROR_IO_PENDING))
readbytes = get_overlapped_result(buffers[buffer_index]);
if (buffers[buffer_index].last_error)
auto ret = ReadFile(srcFile, buf, BUFFER_SIZE, &readbytes, &ov);
ov.last_error = GetLastError();
if (!(!ret && ov.last_error != ERROR_IO_PENDING))
readbytes = get_overlapped_result(ov);
if (ov.last_error)
{
chan.push(1);
return;
}
DWORD written = 0;
ret = WriteFile(destFile, buffers[buffer_index].buf, (readbytes + PageSize - 1) & ~(PageSize - 1), &written, &buffers[buffer_index]);
buffers[buffer_index].last_error = GetLastError();
if (!(!ret && buffers[buffer_index].last_error != ERROR_IO_PENDING))
readbytes = get_overlapped_result(buffers[buffer_index]);
ret = WriteFile(destFile, buf, (readbytes + PageSize - 1) & ~(PageSize - 1), &written, &ov);
ov.last_error = GetLastError();
if (!(!ret && ov.last_error != ERROR_IO_PENDING))
written = get_overlapped_result(ov);
}
}

Expand All @@ -212,7 +208,7 @@ struct FileCopyer

for (int i = 0; i < PENDING_IO; i++)
{
create_detached_coroutine(std::bind(&FileCopyer::buffer_carrier, this, i, srcFile, destFile, std::ref(chan)));
create_detached_coroutine(std::bind(&FileCopyer::buffer_carrier, this, srcFile, destFile, std::ref(chan)));
}

// wait until completed.
Expand Down
14 changes: 10 additions & 4 deletions iocp_asio/src/iocp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,12 @@ IOCP_DECL BOOL ReadFile(
if (lpNumberOfBytesRead)
*lpNumberOfBytesRead = readed;

if (readed == 0)
{
SetLastError(ERROR_HANDLE_EOF);
return FALSE;
}

asio_operation* op = new asio_operation;
// op->lpCompletionRoutine = lpCompletionRoutine;
op->overlapped_ptr = lpOverlapped;
Expand All @@ -901,7 +907,7 @@ IOCP_DECL BOOL ReadFile(

SetLastError(ERROR_IO_PENDING);

return readed > 0;
return FALSE;
}

IOCP_DECL BOOL WriteFile(
Expand All @@ -921,7 +927,7 @@ IOCP_DECL BOOL WriteFile(

iocp_handle_emu_class* iocp = s->_iocp;

auto write_ret = write(s->native_handle(), lpBuffer, nNumberOfBytesToWrite);
auto write_ret = pwrite(s->native_handle(), lpBuffer, nNumberOfBytesToWrite, lpOverlapped->offset_64);

if (lpNumberOfBytesWritten)
*lpNumberOfBytesWritten = write_ret;
Expand All @@ -933,15 +939,15 @@ IOCP_DECL BOOL WriteFile(
op->overlapped_ptr = lpOverlapped;
lpOverlapped->Internal = reinterpret_cast<ULONG_PTR>(op);
op->CompletionKey = s->_completion_key;
op->last_error = errno;
op->last_error = write_ret <0 ? errno : 0;
op->NumberOfBytes = write_ret;

std::scoped_lock<std::mutex> l(iocp->result_mutex);
iocp->results_.emplace_back(op);

SetLastError(ERROR_IO_PENDING);

return write_ret > 0;
return FALSE;
}

asio::io_context SOCKET_emu_class::internal_fake_io_context;

0 comments on commit b14a87a

Please sign in to comment.