Skip to content

Commit

Permalink
- CS104 slave: remove message from queue when confirmation received (…
Browse files Browse the repository at this point in the history
…see #77)

- CS104 master: confirm all received I messages before sending STOPDT ACT or closing the connection
- CS104 master: add additional semaphore to protect write to connection socket
  • Loading branch information
mzillgith committed Feb 18, 2020
1 parent bc2f640 commit 651691a
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 100 deletions.
103 changes: 89 additions & 14 deletions lib60870-C/src/iec60870/cs104/cs104_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,19 @@ static struct sCS101_AppLayerParameters defaultAppLayerParameters = {
#define HOST_NAME_MAX 64
#endif

typedef enum {
STATE_IDLE = 0,
STATE_INACTIVE = 1,
STATE_ACTIVE = 2,
STATE_WAITING_FOR_STARTDT_CON = 3,
STATE_WAITING_FOR_STOPDT_CON = 4
} CS104_ConState;

typedef struct {
uint64_t sentTime; /* required for T1 timeout */
int seqNo;
} SentASDU;


struct sCS104_Connection {
char hostname[HOST_NAME_MAX + 1];
int tcpPort;
Expand All @@ -86,6 +93,7 @@ struct sCS104_Connection {

#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore sentASDUsLock;
Semaphore socketWriteLock;
#endif

#if (CONFIG_USE_THREADS == 1)
Expand All @@ -111,6 +119,8 @@ struct sCS104_Connection {
bool failure;
bool close;

CS104_ConState conState;

#if (CONFIG_CS104_SUPPORT_TLS == 1)
TLSConfiguration tlsConfig;
TLSSocket tlsSocket;
Expand Down Expand Up @@ -170,20 +180,32 @@ prepareSMessage(uint8_t* msg)
static void
sendSMessage(CS104_Connection self)
{
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_wait(self->socketWriteLock);
#endif
uint8_t* msg = self->sMessage;

msg [4] = (uint8_t) ((self->receiveCount % 128) * 2);
msg [5] = (uint8_t) (self->receiveCount / 128);

writeToSocket(self, msg, 6);
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_post(self->socketWriteLock);
#endif
}

static int
sendIMessage(CS104_Connection self, Frame frame)
{
T104Frame_prepareToSend((T104Frame) frame, self->sendCount, self->receiveCount);

#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_wait(self->socketWriteLock);
#endif
writeToSocket(self, T104Frame_getBuffer(frame), T104Frame_getMsgSize(frame));
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_post(self->socketWriteLock);
#endif

self->sendCount = (self->sendCount + 1) % 32768;

Expand Down Expand Up @@ -215,6 +237,7 @@ createConnection(const char* hostname, int tcpPort)

#if (CONFIG_USE_SEMAPHORES == 1)
self->sentASDUsLock = Semaphore_create(1);
self->socketWriteLock = Semaphore_create(1);
#endif

#if (CONFIG_USE_THREADS == 1)
Expand All @@ -228,6 +251,8 @@ createConnection(const char* hostname, int tcpPort)

self->sentASDUs = NULL;

self->conState = STATE_IDLE;

prepareSMessage(self->sMessage);
}

Expand Down Expand Up @@ -295,6 +320,8 @@ resetConnection(CS104_Connection self)
self->outstandingTestFCConMessages = 0;
self->uMessageTimeout = 0;

self->conState = STATE_IDLE;

resetT3Timeout(self);
}

Expand Down Expand Up @@ -426,6 +453,7 @@ CS104_Connection_destroy(CS104_Connection self)

#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_destroy(self->sentASDUsLock);
Semaphore_destroy(self->socketWriteLock);
#endif

GLOBAL_FREEMEM(self);
Expand Down Expand Up @@ -558,6 +586,15 @@ checkConfirmTimeout(CS104_Connection self, uint64_t currentTime)
return false;
}

static void
confirmOutstandingMessages(CS104_Connection self)
{
self->lastConfirmationTime = Hal_getTimeInMs();
self->unconfirmedReceivedIMessages = 0;
self->timeoutT2Trigger = false;
sendSMessage(self);
}

static bool
checkMessage(CS104_Connection self, uint8_t* buffer, int msgSize)
{
Expand Down Expand Up @@ -610,25 +647,42 @@ checkMessage(CS104_Connection self, uint8_t* buffer, int msgSize)

if (buffer[2] == 0x43) { /* Check for TESTFR_ACT message */
DEBUG_PRINT("Send TESTFR_CON\n");
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_wait(self->socketWriteLock);
#endif
writeToSocket(self, TESTFR_CON_MSG, TESTFR_CON_MSG_SIZE);
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_post(self->socketWriteLock);
#endif
}
else if (buffer[2] == 0x83) { /* TESTFR_CON */
DEBUG_PRINT("Rcvd TESTFR_CON\n");
self->outstandingTestFCConMessages = 0;
}
else if (buffer[2] == 0x07) { /* STARTDT_ACT */
DEBUG_PRINT("Send STARTDT_CON\n");
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_wait(self->socketWriteLock);
#endif
writeToSocket(self, STARTDT_CON_MSG, STARTDT_CON_MSG_SIZE);
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_post(self->socketWriteLock);
#endif
self->conState = STATE_ACTIVE;
}
else if (buffer[2] == 0x0b) { /* STARTDT_CON */
DEBUG_PRINT("Received STARTDT_CON\n");

self->conState = STATE_ACTIVE;

if (self->connectionHandler != NULL)
self->connectionHandler(self->connectionHandlerParameter, self, CS104_CONNECTION_STARTDT_CON_RECEIVED);
}
else if (buffer[2] == 0x23) { /* STOPDT_CON */
DEBUG_PRINT("Received STOPDT_CON\n");

self->conState = STATE_INACTIVE;

if (self->connectionHandler != NULL)
self->connectionHandler(self->connectionHandlerParameter, self, CS104_CONNECTION_STOPDT_CON_RECEIVED);
}
Expand All @@ -643,7 +697,6 @@ checkMessage(CS104_Connection self, uint8_t* buffer, int msgSize)
return false;
}


resetT3Timeout(self);

return true;
Expand All @@ -667,8 +720,13 @@ handleTimeouts(CS104_Connection self)
}
else {
DEBUG_PRINT("U message T3 timeout\n");

#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_wait(self->socketWriteLock);
#endif
writeToSocket(self, TESTFR_ACT_MSG, TESTFR_ACT_MSG_SIZE);
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_post(self->socketWriteLock);
#endif
self->uMessageTimeout = currentTime + (self->parameters.t1 * 1000);
self->outstandingTestFCConMessages++;

Expand All @@ -679,12 +737,7 @@ handleTimeouts(CS104_Connection self)
if (self->unconfirmedReceivedIMessages > 0) {

if (checkConfirmTimeout(self, currentTime)) {

self->lastConfirmationTime = currentTime;
self->unconfirmedReceivedIMessages = 0;
self->timeoutT2Trigger = false;

sendSMessage(self); /* send confirmation message */
confirmOutstandingMessages(self);
}
}

Expand Down Expand Up @@ -750,6 +803,8 @@ handleConnection(void* parameter)

if (self->running) {

self->conState = STATE_INACTIVE;

/* Call connection handler */
if (self->connectionHandler != NULL)
self->connectionHandler(self->connectionHandlerParameter, self, CS104_CONNECTION_OPENED);
Expand Down Expand Up @@ -783,11 +838,8 @@ handleConnection(void* parameter)
}
}

if (self->unconfirmedReceivedIMessages >= self->parameters.w) {
self->lastConfirmationTime = Hal_getTimeInMs();
self->unconfirmedReceivedIMessages = 0;
self->timeoutT2Trigger = false;
sendSMessage(self);
if ((self->unconfirmedReceivedIMessages >= self->parameters.w) || (self->conState == STATE_WAITING_FOR_STOPDT_CON)) {
confirmOutstandingMessages(self);
}
}

Expand All @@ -810,12 +862,19 @@ handleConnection(void* parameter)
self->failure = true;
}

/* Confirm all unconfirmed received I-messages before closing the connection */
if (self->unconfirmedReceivedIMessages > 0) {
confirmOutstandingMessages(self);
}

#if (CONFIG_CS104_SUPPORT_TLS == 1)
if (self->tlsSocket)
TLSSocket_close(self->tlsSocket);
#endif

Socket_destroy(self->socket);

self->conState = STATE_IDLE;
}
else {
DEBUG_PRINT("Failed to create socket\n");
Expand Down Expand Up @@ -918,13 +977,29 @@ encodeIOA(CS104_Connection self, Frame frame, int ioa)
void
CS104_Connection_sendStartDT(CS104_Connection self)
{
self->conState = STATE_WAITING_FOR_STARTDT_CON;
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_wait(self->socketWriteLock);
#endif
writeToSocket(self, STARTDT_ACT_MSG, STARTDT_ACT_MSG_SIZE);
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_post(self->socketWriteLock);
#endif
}

void
CS104_Connection_sendStopDT(CS104_Connection self)
{
confirmOutstandingMessages(self);

self->conState = STATE_WAITING_FOR_STOPDT_CON;
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_wait(self->socketWriteLock);
#endif
writeToSocket(self, STOPDT_ACT_MSG, STOPDT_ACT_MSG_SIZE);
#if (CONFIG_USE_SEMAPHORES == 1)
Semaphore_post(self->socketWriteLock);
#endif
}

static void
Expand Down
Loading

0 comments on commit 651691a

Please sign in to comment.