celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation
Date Wed, 02 Dec 2020 19:21:38 GMT

rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534422059



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -144,119 +146,108 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR,
topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR,
topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED,
topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY,
topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL,
NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE,
NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS,
NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED,
NULL);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+    sender->isPassive = psa_tcp_isPassive(isPassive);
 
     /* When it's an endpoint share the socket with the receiver */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls :
staticServerEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (sender->socketHandler == NULL)
                 sender->socketHandler = pubsub_tcpHandler_create(sender->protocol,
sender->logHelper);
             entry = sender->socketHandler;
             sender->sharedSocketHandler = sender->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             sender->socketHandler = entry;
             sender->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
     }
 
     if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
         long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO,
-1L);
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED,
NULL);
-        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
-                                                   PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
-        double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT
:
-                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE,
PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  PSA_TCP_SEND_DELAY,
PSA_TCP_DEFAULT_SEND_DELAY);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
-        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true
: false);

Review comment:
       When the passive connection is used a full duplex connection is setup.
   This means also EPOLLIN is enabled for the publisher




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message