celix-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [celix] pnoltes commented on a change in pull request #279: Feature/tcp admin msg segmentation
Date Thu, 17 Dec 2020 14:29:37 GMT

pnoltes commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r545132329



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -559,31 +549,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);

Review comment:
   The interceptors can be used to monitor pubsub (pre/post).
   For example, the receive processing time can be observed by recording the time difference
between a preReceive and a postReceive interceptor call.

   The interceptors do not need the actual subscriber service.
   Although the wish to intercept the usage of a pubsub receive was also discussed, that is
more adapter than interceptor functionality.
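
   As a rough illustration of the timing use case, here is a minimal, standalone sketch.
It does not use the Celix interceptor API: the `receive_timer_t` struct and the
`receiveTimer_preReceive` / `receiveTimer_postReceive` callbacks are hypothetical names, and
their signatures are simplified assumptions loosely based on the arguments visible in the diff
(message type, message id, message). It assumes a POSIX system that provides `clock_gettime`.

```c
#define _POSIX_C_SOURCE 200809L /* for clock_gettime and CLOCK_MONOTONIC */

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Hypothetical interceptor state; this struct is not part of Celix. */
typedef struct receive_timer {
    struct timespec start; /* taken in the preReceive callback */
    double lastSeconds;    /* duration of the most recent measurement */
    double totalSeconds;   /* sum of all measured durations */
    uint64_t count;        /* number of measurements */
} receive_timer_t;

/* Hypothetical preReceive callback: store the start time.
 * Returning true means "continue with the subscriber receive calls". */
static bool receiveTimer_preReceive(void *handle, const char *msgType,
                                    uint32_t msgId, const void *msg) {
    receive_timer_t *timer = handle;
    assert(timer != NULL);
    (void)msgType; (void)msgId; (void)msg; /* not needed for timing */
    clock_gettime(CLOCK_MONOTONIC, &timer->start);
    return true;
}

/* Hypothetical postReceive callback: compute and record the elapsed time
 * since the last preReceive call. */
static void receiveTimer_postReceive(void *handle, const char *msgType,
                                     uint32_t msgId, const void *msg) {
    receive_timer_t *timer = handle;
    struct timespec end;
    assert(timer != NULL);
    (void)msg; /* the message content is not inspected */
    clock_gettime(CLOCK_MONOTONIC, &end);
    timer->lastSeconds = (double)(end.tv_sec - timer->start.tv_sec)
                       + (double)(end.tv_nsec - timer->start.tv_nsec) / 1e9;
    timer->totalSeconds += timer->lastSeconds;
    timer->count += 1;
    printf("receive of %s (id %u) took %.9f s\n",
           msgType, (unsigned)msgId, timer->lastSeconds);
}
```

   Neither callback touches a subscriber service; they only need a handle to their own state.
In the diff above, the postReceive call sits inside the subscriber loop while the preReceive
call happens once, so with this sketch each measurement would cover the time elapsed since that
single preReceive call.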




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


