2 Processing Workflow

GV Top Coder edited this page Nov 18, 2021 · 1 revision

1 Processing worker app workflow

  1. Application starts a local gRPC processing service so that other processing peers can communicate with it.
processingServiceServer = new ProcessingServiceServer("localHost:port");
processingServiceServer->Run();
  1. Application connects to the local gRPC processing service.
processingServiceClient = new ProcessingServiceClient("localHost:port");
  1. Application requests a list of available processing room names (IDs) from a database.
roomList = processingServiceClient->GetRoomList();
  1. Application sequentially tries to join a room from the room list.
foreach( room in roomList)
    if (processingServiceClient->RoomJoin(room) == OK)
         break;
  1. If a room is joined, the application receives data to process and starts processing it locally.

Once a result is calculated the worker publishes it to the room.

if (processingServiceClient->RequestProcessingData(roomName, out data) == OK)
{
    // Process the received sub-block locally, then publish the result to the room
    result = Calculation(data);
    processingServiceClient->PublishProcessingResult(roomName, result);
}

NOTE: Since data processing is implemented on the application side, it is easier for the application to request a new processing sub-block than for the room host to initiate the processing.

  1. If the processing data request fails, the worker takes over the room creator's job. See Section 2 for details.
if (processingServiceClient->RequestProcessingData(roomName, out data) != OK)
{
    // Becomes a room manager
}
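
The room selection in the steps above can be sketched in C++17 as follows. This is only an illustration: `Status`, `JoinFirstAvailableRoom`, and the `tryJoin` callback are hypothetical names that stand in for `processingServiceClient->RoomJoin(room)`, whose real signature is not given on this page.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical status codes; the page only mentions OK.
enum class Status { OK, Failed };

// Tries the rooms in order and returns the first one that accepts the worker.
// `tryJoin` is an assumed stand-in for processingServiceClient->RoomJoin(room).
// An empty result means that no room could be joined, so the caller is
// expected to create its own room and act as the host (Section 2).
std::optional<std::string> JoinFirstAvailableRoom(
    const std::vector<std::string>& roomList,
    const std::function<Status(const std::string&)>& tryJoin)
{
    assert(tryJoin && "tryJoin must be callable");
    for (const auto& room : roomList) {
        if (tryJoin(room) == Status::OK) {
            return room;  // stop at the first room that accepts us
        }
    }
    return std::nullopt;
}
```

Returning an empty `std::optional` instead of an error code keeps the "no room joined" case explicit, which is the case where the worker becomes a room manager.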

2 Processing room workflow

Assumption: a processing node participates in only one processing room at a time.

  1. If joining a room fails, the application should create a new processing room and manage the processing.
data = RetrieveDataToProcess();
dataSubBlocks = SplitDataToChunks(data);
processingServiceClient->PublishDataToProcess(roomName, dataSubBlocks);
  1. Once the PublishDataToProcess() method is called, the local gRPC service starts managing the room. First it notifies the other peers that the room host has changed:
PublishDataToProcess(roomName, dataSubBlocks)
{
    SaveDataSubBlocks(dataSubBlocks);
    // Tell every peer in the room who the new host is
    foreach(peer in peerList)
       peer.processingService->OnHostChanged(currentPeerInfo);
}
  1. When a peer wants to join the room, it sends a join request to the room Pub/Sub channel. The room host checks whether the room peer limit has been reached and either accepts or rejects the request. See Section 4 for details.
   RoomJoin(roomName, remotePeer)
   {
       channel = SubscribeToPubSubChannel(roomName);
       peerList = channel.GetPeerList();
       // The list includes the caller, so a single entry means nobody else is in the room
       if (peerList.size() <= 1)
          return ROOM_DOES_NOT_EXIST;
       else
       {
           channel.Publish("Room join request");
           { CHECK_ASYNC_RESPONSE }
       }
   }
  
   // On room host side
   OnJoinRequestMessageReceived(message, remotePeer)
   {
       // Accept the peer only while the room is below its size limit
       if (peerList.size() < roomSizeLimit)
       {
           peerList.push_back(remotePeer);
           createDirectConnection(remotePeer);
           foreach(peer in peerList)
               peer.processingService->OnPeerListChanged(peerList);
       }
   }
  1. The room host keeps connections to remote peers. If a connection error occurs, the host removes the failed peer from the room peer list and notifies the other peers about the change.
OnConnectionError(failedPeer)
{
    peerList.remove(failedPeer);
    // Notify the peers that remain in the room, not the failed one
    foreach(peer in peerList)
        peer.processingService->OnPeerListChanged(peerList);
}
  1. When a data sub-block is requested, the host records the request time and the requesting peer. If the processing time limit is reached and the corresponding result has not been received, the host excludes the peer from the processing room and notifies the other peers about the peer list change.

  2. If the room host leaves the room (due to a connection error, for instance), any other peer in the room should be able to take over its responsibilities. Thus, when a worker requests data for processing, the other peers in the room should be notified about it.

RequestProcessingData(roomName, dataSubBlock)
{
    dataSubBlock = FindDataSubBlock(out subBlockID);
    // Let every peer know which sub-block is in progress and since when
    foreach(peer in peerList)
       peer.processingService->OnSubBlockProcessingStarted(subBlockID, currentTime);
}
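
The time-limit tracking described in the last two steps has no pseudocode above, so here is a minimal C++ sketch of one way it could work. `SubBlockTracker`, `PendingRequest`, and `CollectTimedOutPeers` are hypothetical names, and the sketch assumes that sub-blocks are identified by an integer ID and peers by a string ID. Since every peer receives `OnSubBlockProcessingStarted`, any peer could maintain the same table and continue from it if the host disappears.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical record of one outstanding sub-block request.
struct PendingRequest {
    std::string peerId;
    Clock::time_point startedAt;
};

// Hypothetical tracker of sub-blocks that were handed out but not yet processed.
class SubBlockTracker {
public:
    explicit SubBlockTracker(Clock::duration timeLimit) : timeLimit_(timeLimit) {
        assert(timeLimit > Clock::duration::zero());
    }

    // Called when a peer requests a sub-block.
    void OnSubBlockProcessingStarted(int subBlockId, const std::string& peerId,
                                     Clock::time_point now) {
        pending_[subBlockId] = PendingRequest{peerId, now};
    }

    // Called when the result for a sub-block arrives.
    void OnProcessed(int subBlockId) { pending_.erase(subBlockId); }

    // Returns the peers whose requests exceeded the time limit and forgets
    // those requests so that the sub-blocks can be handed out again.
    std::vector<std::string> CollectTimedOutPeers(Clock::time_point now) {
        std::vector<std::string> timedOut;
        for (auto it = pending_.begin(); it != pending_.end();) {
            if (now - it->second.startedAt > timeLimit_) {
                timedOut.push_back(it->second.peerId);
                it = pending_.erase(it);
            } else {
                ++it;
            }
        }
        return timedOut;
    }

private:
    Clock::duration timeLimit_;
    std::map<int, PendingRequest> pending_;
};
```

The host would call `CollectTimedOutPeers` periodically, remove the returned peers from its peer list, and then send `OnPeerListChanged` to the remaining peers. Passing the current time in as a parameter keeps the logic testable without waiting for real timeouts.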

3 Results accumulation and verification

  1. On the gRPC service side, once PublishProcessingResult(roomName, result) is called, the service gets the list of peers connected to the room and sends the result to all of them over direct gRPC connections.
   foreach(peer in peerList)
       peer.processingService->OnProcessed(result);
  1. TODO: if results should be accumulated on the DApp side it is necessary to decide how to pass them to DApp from the gRPC service. I haven’t found a way to declare streams in YAML while Proto supports them. A stream could be passed as an input parameter to a service call and results could be obtained from the stream on the DApp side.
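
Because each peer receives every result through `OnProcessed`, one possible way to accumulate them on the receiving side is sketched below. This is an assumption rather than the design chosen here: `ResultAccumulator` is a hypothetical class, results are modelled as strings, and each result is assumed to carry the ID of its sub-block, which the page does not state. The sketch does not address how results reach the DApp or how they are verified.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical per-peer store that is filled from OnProcessed callbacks.
class ResultAccumulator {
public:
    explicit ResultAccumulator(std::size_t expectedSubBlocks)
        : expected_(expectedSubBlocks) {
        assert(expectedSubBlocks > 0);
    }

    // Stores a result. Returns false if this sub-block already has one;
    // in that case the duplicate is ignored and the first result is kept.
    bool OnProcessed(int subBlockId, const std::string& result) {
        return results_.emplace(subBlockId, result).second;
    }

    // True once a result has been stored for every expected sub-block.
    bool IsComplete() const { return results_.size() == expected_; }

private:
    std::size_t expected_;
    std::map<int, std::string> results_;
};
```

Keeping results keyed by sub-block ID also means that a peer taking over as host already knows which sub-blocks are finished.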

4 Room join process

When a peer tries to join a room with a specified name (ID), it subscribes to the corresponding Pub/Sub channel whose topic equals the ID. Once subscribed, it can get the list of peers subscribed to the same topic. If the list contains more than one peer, the room exists. In general, not all peers subscribed to a channel can participate in processing, because there is a limit on the number of workers in a processing room. To join the room, the peer must publish a message to the channel:

"Room join request"

Once the room host receives the message, it decides whether the peer can join. If the answer is "yes", the host connects directly to the peer and sends it the list of room peers.
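
The decision logic of the join process can be modelled in a single process as in the C++ sketch below. `JoinResult`, `Room`, and `HandleJoinRequest` are hypothetical names. The sketch assumes that the subscriber count includes the joining peer and that the host counts toward the room size limit; the page does not specify either.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical outcomes of a join attempt.
enum class JoinResult { Accepted, RoomFull, RoomDoesNotExist };

// Hypothetical in-memory model of a room as seen by its host.
struct Room {
    std::size_t sizeLimit;             // maximum number of peers in the room
    std::vector<std::string> members;  // peers already accepted (host included)
};

// `subscriberCount` is the number of peers subscribed to the room topic,
// including the joining peer. One subscriber means nobody else is there.
JoinResult HandleJoinRequest(Room& room, std::size_t subscriberCount,
                             const std::string& peerId)
{
    assert(room.sizeLimit > 0);
    if (subscriberCount <= 1) {
        return JoinResult::RoomDoesNotExist;
    }
    if (room.members.size() >= room.sizeLimit) {
        return JoinResult::RoomFull;  // subscribed, but cannot take part
    }
    room.members.push_back(peerId);   // host would now connect directly
    return JoinResult::Accepted;
}
```

A peer that gets `RoomFull` stays subscribed to the channel without participating, which matches the remark that not every subscriber is a room member.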