celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation
Date Mon, 16 Nov 2020 13:59:06 GMT

Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r522991186



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <stdio.h>
+#include <string.h>
+#include "pubsub_tcp_common.h"
+
+
+bool psa_tcp_isPassive(const char* buffer) {
+    bool isPassive = false;
+    // Parse Properties
+    if (buffer != NULL) {
+        char buf[32];
+        snprintf(buf, 32, "%s", buffer);
+        char *trimmed = utils_stringTrim(buf);
+        if (strncasecmp("true", trimmed, strlen("true")) == 0) {
+            isPassive = true;
+        } else if (strncasecmp("false", trimmed, strlen("false")) == 0) {

Review comment:
       This else if branch is not needed, since `isPassive` is false by default.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
##########
@@ -23,19 +23,20 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
+#define PSA_TCP_SEND_DELAY                      "PSA_TCP_SEND_DELAY"

Review comment:
       Please rename to `PSA_FIRST_SEND_DELAY` and move to pubsub_utils, as all pubsub admins
can use it.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -752,35 +747,116 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle,
double tim
     }
 }
 
-static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
-    int expectedReadSize = size;
-    int nbytes = size;
-    int msgSize = 0;
-    char* buffer = (char*)_buffer;
-    while (nbytes > 0 && expectedReadSize > 0) {
-        // Read the message header
-        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
-        // Update buffer administration
-        offset += nbytes;
-        expectedReadSize -= nbytes;
-        msgSize += nbytes;
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enable) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->enableReceiveEvent = enable;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
+static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry) {
+    // Note header message is already read
+    return (long int)entry->header.header.payloadPartSize + (long int)entry->header.header.metadataSize
+ (long int)entry->readFooterSize;
+}
+
+static inline 
+bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t
*entry, long int* msgSize) {
+    bool result = false;
+    size_t syncSize = 0;
+    size_t protocolHeaderBufferSize = 0;
+    // Get Sync Size
+    handle->protocol->getSyncHeaderSize(handle->protocol->handle, &syncSize);
+    // Get HeaderSize of the Protocol Header
+    handle->protocol->getHeaderSize(handle->protocol->handle, &entry->readHeaderSize);
+    // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol
header is included in the payload (needed for endpoints)
+    handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+
+    // Ensure capacity in header buffer
+    pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+
+    entry->readMsg.msg_iovlen = 0;
+    entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readHeaderBuffer;
+    entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len  = entry->readHeaderBufferSize;
+    entry->readMsg.msg_iovlen++;
+
+    // Read the message
+    long int nbytes = 0;
+    // Use peek flag to find sync word or when header is part of the payload
+    unsigned int flag = (entry->headerError || (!protocolHeaderBufferSize)) ? MSG_PEEK
: 0;
+    if (entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL
| MSG_WAITALL | flag);
+    if (nbytes >= entry->readHeaderSize) {
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           entry->readMsg.msg_iov[0].iov_base,
+                                           entry->readMsg.msg_iov[0].iov_len,
+                                           &entry->header) == CELIX_SUCCESS) {
+            // read header from queue, when recovered from headerError and when header is
not part of the payload. (Because of MSG_PEEK)
+            if (entry->headerError && protocolHeaderBufferSize && entry->readHeaderSize)
nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);

Review comment:
       Please add curly braces here, as it is unclear which statement the `if` applies to.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -78,18 +74,27 @@ typedef struct psa_tcp_connection_entry {
     bool connected;
     bool headerError;
     pubsub_protocol_message_t header;
-    unsigned int syncSize;
-    unsigned int headerSize;
-    unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer ->
included in payload
-    void *headerBuffer;
-    unsigned int footerSize;
-    void *footerBuffer;
-    unsigned int bufferSize;
+    size_t maxMsgSize;
+    size_t readHeaderSize;
+    size_t readHeaderBufferSize; // Size of headerBuffer
+    void *readHeaderBuffer;
+    size_t writeHeaderBufferSize; // Size of headerBuffer
+    void *writeHeaderBuffer;
+    size_t readFooterSize;
+    size_t readFooterBufferSize;
+    void *readFooterBuffer;
+    size_t writeFooterBufferSize;
+    void *writeFooterBuffer;
+    size_t bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
-    unsigned int metaBufferSize;
-    void *metaBuffer;
+    size_t readMetaBufferSize;
+    void *readMetaBuffer;
+    size_t writeMetaBufferSize;
+    void *writeMetaBuffer;
     unsigned int retryCount;
+    celix_thread_mutex_t writeMutex;
+    celix_thread_mutex_t readMutex;

Review comment:
       The readMutex is only used in `pubsub_tcpHandler_read`, meaning it is never used concurrently.
Please remove it.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -1034,130 +1050,176 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
-                handle->protocol->encodeMetadata(handle->protocol->handle, message,
-                                                 &metadataData,
-                                                 &metadataSize);
-                entry->metaBufferSize = metadataSize;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
+                // When maxMsgSize is smaller then meta data is disabled
+                if (metadataSize > entry->maxMsgSize) {
+                    metadataSize = 0;
+                }
+                handle->protocol->encodeMetadata(handle->protocol->handle, message,
&metadataData, &metadataSize);
             }
+
             message->header.metadataSize = metadataSize;
+            size_t totalMsgSize = payloadSize + metadataSize;
+
+            size_t sendMsgSize = 0;
+            size_t msgPayloadOffset = 0;
+            size_t msgIovOffset     = 0;
+            bool allPayloadAdded = (payloadSize == 0);
+            long int nbytes = LONG_MAX;
+            while (sendMsgSize < totalMsgSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
+
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
+
+                size_t protocolHeaderBufferSize = 0;
+                // Get HeaderBufferSize of the Protocol Header, when headerBufferSize ==
0, the protocol header is included in the payload (needed for endpoints)
+                handle->protocol->getHeaderBufferSize(handle->protocol->handle,
&protocolHeaderBufferSize);
+                size_t footerSize = 0;
+                // Get size of the Protocol Footer
+                handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+                size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize;
+
+                // reserve space for the header if required, header is added later when size
of message is known (message can split in parts)
+                if (protocolHeaderBufferSize) {
+                    msg.msg_iovlen++;
+                }
+                // Write generic seralized payload in vector buffer
+                if (!allPayloadAdded) {
+                    if (payloadSize && payloadData && maxMsgSize) {
+                        char *buffer = payloadData;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadOffset];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadOffset),
maxMsgSize);
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msg.msg_iovlen++;
 
-            void *footerData = NULL;
-            size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
-                handle->protocol->encodeFooter(handle->protocol->handle, message,
-                                                 &footerData,
-                                                 &footerDataSize);
-                entry->footerSize = footerDataSize;
-            }
+                    } else {
+                        // copy serialized vector into vector buffer
+                        size_t i;
+                        for (i = msgIovOffset; i < MIN(msg_iov_len, msgIovOffset + max_msg_iov_len);
i++) {
+                            if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) {
+                                break;
+                            }
+                            msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                            msg.msg_iovlen++;
+                        }
+                        // if no entry could be added
+                        if (i == msgIovOffset) {
+                            // TODO element can be split in parts?
+                            L_ERROR("[TCP Socket] vector io element is larger than max msg
size");
+                            break;
+                        }
+                        msgIovOffset = i;
+                    }
+                    message->header.payloadPartSize = msgPartSize;
+                    message->header.payloadOffset   = msgPayloadOffset;
+                    msgPayloadOffset += message->header.payloadPartSize;
+                    sendMsgSize = msgPayloadOffset;
+                    allPayloadAdded= msgPayloadOffset >= payloadSize;
+                }
 
-            size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
-
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+                // Write optional metadata in vector buffer
+                if (allPayloadAdded &&
+                    (metadataSize != 0 && metadataData) &&
+                    (msgPartSize < maxMsgSize) &&
+                    (msg.msg_iovlen-1 < max_msg_iov_len)) {  // header is already included
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
                     msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    msgPartSize += metadataSize;
+                    message->header.metadataSize = metadataSize;
+                    sendMsgSize += metadataSize;
+                }
+                if (sendMsgSize >= totalMsgSize) {
+                    message->header.isLastSegment = 0x1;
                 }
-            }
-
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
 
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                void *headerData = NULL;
+                size_t headerSize = 0;
+                // Get HeaderSize of the Protocol Header
+                handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+
+                // check if header is not part of the payload (=> headerBufferSize = 0)
+                if (protocolHeaderBufferSize) {
+                    headerData = entry->writeHeaderBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle,
message, &headerData, &headerSize);
+                    entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData)
{
+                        entry->writeHeaderBuffer = headerData;
+                    }
+                    if (headerSize && headerData) {
+                        // Write header in 1st vector buffer item
+                        msg.msg_iov[0].iov_base = headerData;
+                        msg.msg_iov[0].iov_len = headerSize;
+                        msgPartSize += msg.msg_iov[0].iov_len;
+                    } else {
+                        L_ERROR("[TCP Socket] No header buffer is generated");
+                        break;
+                    }
+                }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              headerData = entry->headerBuffer;
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-              entry->headerBufferSize = headerSize;
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize,
flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again.
Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
-                } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing
connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                void *footerData = NULL;
+                // Write optional footerData in vector buffer
+                if (footerSize) {
+                    footerData = entry->writeFooterBuffer;
+                    handle->protocol->encodeFooter(handle->protocol->handle,
message, &footerData, &footerSize);
+                    if (footerData && entry->writeFooterBuffer != footerData)
{
+                        entry->writeFooterBuffer = footerData;
+                        entry->writeFooterBufferSize = footerSize;
+                    }
+                    if (footerData) {
+                        msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                        msg.msg_iov[msg.msg_iovlen].iov_len  = footerSize;
+                        msg.msg_iovlen++;
+                        msgPartSize += footerSize;
+                    }
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n",
message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+
+                //  When a specific socket keeps reporting errors can indicate a subscriber
+                //  which is not active anymore, the connection will remain until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), try again. Retry
count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, handle->maxSendRetryCount,
errno, strerror(errno));
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries!
Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n",
message->header.seqNr, msgPartSize, nbytes, strerror(errno));
+                    }
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload))
{
+                    free(payloadData);
                 }
             }
-            // Release data
-            if (headerData && headerData != entry->headerBuffer) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData && metadataData != entry->metaBuffer) {
-                free(metadataData);
-            }
-            if (footerData && footerData != entry->footerBuffer) {
-                free(footerData);
-            }
+            celixThreadMutex_unlock(&entry->writeMutex);
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of locking handle->dbLock
to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);

Review comment:
       Potential NULL dereference on `handle`

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
##########
@@ -23,19 +23,20 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
+#define PSA_TCP_SEND_DELAY                      "PSA_TCP_SEND_DELAY"
 
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        UINT32_MAX
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
+#define PSA_TCP_DEFAULT_SEND_DELAY              250 //  250 ms

Review comment:
       Please rename to `PSA_DEFAULT_FIRST_SEND_DELAY` and move to pubsub_utils, as all pubsub
admins can use it.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -144,119 +146,108 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR,
topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR,
topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED,
topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY,
topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL,
NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE,
NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS,
NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED,
NULL);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+    sender->isPassive = psa_tcp_isPassive(isPassive);
 
     /* When it's an endpoint share the socket with the receiver */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls :
staticServerEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (sender->socketHandler == NULL)
                 sender->socketHandler = pubsub_tcpHandler_create(sender->protocol,
sender->logHelper);
             entry = sender->socketHandler;
             sender->sharedSocketHandler = sender->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             sender->socketHandler = entry;
             sender->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
     }
 
     if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
         long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO,
-1L);
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED,
NULL);
-        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
-                                                   PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
-        double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT
:
-                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE,
PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  PSA_TCP_SEND_DELAY,
PSA_TCP_DEFAULT_SEND_DELAY);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
-        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true
: false);

Review comment:
       The receive event is used to enable `EPOLLIN`, but for a sender I would expect it to
enable `EPOLLOUT`. What's the purpose of enabling it here?

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -628,16 +615,24 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url)
{
 }
 
 //
-// Setup buffer sizes
+// Setup receive buffer size
+//
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size)
{
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->bufferSize = size;

Review comment:
       I would expect `handle->buffer` to be resized accordingly here as well. However,
I see that `pubsub_tcpHandler_ensureReadBufferCapacity` gets called. Please add a comment
explaining why `handle->buffer` is left alone in this function.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -374,6 +351,17 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t
*receiver
 void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t
*connectedUrls,
                                              celix_array_list_t *unconnectedUrls) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (receiver->isPassive) {
+        char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
+        char *url = NULL;
+        asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
+        if (interface_url) {
+            celix_arrayList_add(connectedUrls, url);
+        } else {
+            celix_arrayList_add(unconnectedUrls, url);
+        }
+        free(interface_url);
+    } else {
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);

Review comment:
       Please indent this else branch

##########
File path: bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
##########
@@ -39,7 +39,7 @@
 #include "pubsub_admin.h"
 #include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
 
-#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS       30L
+#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME       250 // 250 msecond

Review comment:
       Renaming an existing environment variable is a breaking change. Please keep the existing
variable and have the new one overrule the older one only if defined in the properties.

##########
File path: bundles/pubsub/test/CMakeLists.txt
##########
@@ -47,7 +47,7 @@ celix_bundle_files(pubsub_endpoint_tst
         DESTINATION "META-INF/descriptors"
         )
 celix_bundle_files(pubsub_endpoint_tst
-        meta_data/ping2.properties
+        meta_data/ping3.properties

Review comment:
       It's probably better to create two endpoint tests: one which uses ping2/pong2 and one
which uses ping3/pong3.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -617,8 +614,8 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t
*sender)
     static bool firstSend = true;
 
     if (firstSend) {
-        L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
-        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        if (sender->send_delay ) L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");

Review comment:
       The if statement should span more than one line; please add braces.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -533,7 +529,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const
void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        if (status == CELIX_SUCCESS /*ser ok*/) {
+        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler,
entry->msgSer->msgName, msgTypeId, inMsg, &metadata);

Review comment:
       I'm not quite sure if we want to intercept messages if we can't serialize them. @pnoltes
WDYT?
   
   Maybe we should do short-circuiting here
   ```
   bool cont = status == CELIX_SUCCESS && pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler,
entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
   ```

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -707,7 +700,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
                            metrics->msgTypeId);
                 }
             }
-            i += 1;
+            i +=1 ;

Review comment:
       nitpick: space before 1 was fine

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -964,58 +998,30 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t
*handle, v
     return result;
 }
 
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, struct msghdr* msg, unsigned int size, int flag ) {
-  int nbytes = 0;
-  int msgSize = 0;
-  if (entry->fd >= 0 && size && msg->msg_iovlen) {
-    int expectedReadSize = size;
-    unsigned int offset = 0;
-    nbytes = size;
-    while (nbytes > 0 && expectedReadSize > 0) {
-      // Read the message header
-      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
-      // Update admin
-      expectedReadSize -= nbytes;
-      msgSize += nbytes;
-      // Not all written
-      if (expectedReadSize && nbytes > 0) {
-        unsigned int readSize = 0;
-        unsigned int readIndex = 0;
-        unsigned int i = 0;
-        for (i = 0; i < msg->msg_iovlen; i++) {
-          if (nbytes < msg->msg_iov[i].iov_len) {
-            readIndex = i;
-            break;
-          }
-          readSize+= msg->msg_iov[i].iov_len;
-        }
-        msg->msg_iov = &msg->msg_iov[readIndex];
-        msg->msg_iovlen -= readIndex;
-        char* buffer = (char*)msg->msg_iov->iov_base;
-        offset = nbytes - readSize;
-        msg->msg_iov->iov_base = &buffer[offset];
-        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
-      }
-    }
-  }
-  if (nbytes <=0)  msgSize = nbytes;
-  return msgSize;
-}
+
 //
 // Write large data to TCP. .
 //
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message,
struct iovec *msgIoVec,
                             size_t msg_iov_len, int flags) {
-    celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];

Review comment:
       segfault if handle is NULL




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message