celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation
Date Mon, 14 Dec 2020 14:38:35 GMT

rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r542433864



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -1034,130 +1050,176 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
-                handle->protocol->encodeMetadata(handle->protocol->handle, message,
-                                                 &metadataData,
-                                                 &metadataSize);
-                entry->metaBufferSize = metadataSize;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
+                // When maxMsgSize is smaller then meta data is disabled
+                if (metadataSize > entry->maxMsgSize) {
+                    metadataSize = 0;
+                }
+                handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize);
             }
+
             message->header.metadataSize = metadataSize;
+            size_t totalMsgSize = payloadSize + metadataSize;
+
+            size_t sendMsgSize = 0;
+            size_t msgPayloadOffset = 0;
+            size_t msgIovOffset     = 0;
+            bool allPayloadAdded = (payloadSize == 0);
+            long int nbytes = LONG_MAX;
+            while (sendMsgSize < totalMsgSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
+
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
+
+                size_t protocolHeaderBufferSize = 0;
+                // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+                handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+                size_t footerSize = 0;
+                // Get size of the Protocol Footer
+                handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+                size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize;
+
+                // reserve space for the header if required, header is added later when size of message is known (message can split in parts)
+                if (protocolHeaderBufferSize) {
+                    msg.msg_iovlen++;
+                }
+                // Write generic seralized payload in vector buffer
+                if (!allPayloadAdded) {
+                    if (payloadSize && payloadData && maxMsgSize) {
+                        char *buffer = payloadData;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadOffset];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadOffset), maxMsgSize);
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msg.msg_iovlen++;
 
-            void *footerData = NULL;
-            size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
-                handle->protocol->encodeFooter(handle->protocol->handle, message,
-                                                 &footerData,
-                                                 &footerDataSize);
-                entry->footerSize = footerDataSize;
-            }
+                    } else {
+                        // copy serialized vector into vector buffer
+                        size_t i;
+                        for (i = msgIovOffset; i < MIN(msg_iov_len, msgIovOffset + max_msg_iov_len); i++) {
+                            if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) {
+                                break;
+                            }
+                            msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                            msg.msg_iovlen++;
+                        }
+                        // if no entry could be added
+                        if (i == msgIovOffset) {
+                            // TODO element can be split in parts?
+                            L_ERROR("[TCP Socket] vector io element is larger than max msg size");
+                            break;
+                        }
+                        msgIovOffset = i;
+                    }
+                    message->header.payloadPartSize = msgPartSize;
+                    message->header.payloadOffset   = msgPayloadOffset;
+                    msgPayloadOffset += message->header.payloadPartSize;
+                    sendMsgSize = msgPayloadOffset;
+                    allPayloadAdded= msgPayloadOffset >= payloadSize;
+                }
 
-            size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
-
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+                // Write optional metadata in vector buffer
+                if (allPayloadAdded &&
+                    (metadataSize != 0 && metadataData) &&
+                    (msgPartSize < maxMsgSize) &&
+                    (msg.msg_iovlen-1 < max_msg_iov_len)) {  // header is already included
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
                     msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    msgPartSize += metadataSize;
+                    message->header.metadataSize = metadataSize;
+                    sendMsgSize += metadataSize;
+                }
+                if (sendMsgSize >= totalMsgSize) {
+                    message->header.isLastSegment = 0x1;
                 }
-            }
-
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
 
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                void *headerData = NULL;
+                size_t headerSize = 0;
+                // Get HeaderSize of the Protocol Header
+                handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+
+                // check if header is not part of the payload (=> headerBufferSize = 0)
+                if (protocolHeaderBufferSize) {
+                    headerData = entry->writeHeaderBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, message, &headerData, &headerSize);
+                    entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData) {
+                        entry->writeHeaderBuffer = headerData;
+                    }
+                    if (headerSize && headerData) {
+                        // Write header in 1st vector buffer item
+                        msg.msg_iov[0].iov_base = headerData;
+                        msg.msg_iov[0].iov_len = headerSize;
+                        msgPartSize += msg.msg_iov[0].iov_len;
+                    } else {
+                        L_ERROR("[TCP Socket] No header buffer is generated");
+                        break;
+                    }
+                }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              headerData = entry->headerBuffer;
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-              entry->headerBufferSize = headerSize;
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
-                } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                void *footerData = NULL;
+                // Write optional footerData in vector buffer
+                if (footerSize) {
+                    footerData = entry->writeFooterBuffer;
+                    handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerSize);
+                    if (footerData && entry->writeFooterBuffer != footerData) {
+                        entry->writeFooterBuffer = footerData;
+                        entry->writeFooterBufferSize = footerSize;
+                    }
+                    if (footerData) {
+                        msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                        msg.msg_iov[msg.msg_iovlen].iov_len  = footerSize;
+                        msg.msg_iovlen++;
+                        msgPartSize += footerSize;
+                    }
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+
+                //  When a specific socket keeps reporting errors can indicate a subscriber
+                //  which is not active anymore, the connection will remain until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno));
+                    }
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
                 }
             }
-            // Release data
-            if (headerData && headerData != entry->headerBuffer) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData && metadataData != entry->metaBuffer) {
-                free(metadataData);
-            }
-            if (footerData && footerData != entry->footerBuffer) {
-                free(footerData);
-            }
+            celixThreadMutex_unlock(&entry->writeMutex);
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);

Review comment:
       Solved, together with the previous one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message