celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation
Date Wed, 16 Dec 2020 12:19:32 GMT

Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544253608



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -559,31 +549,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver,
psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler,
msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function
to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer,
1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic
%s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
msgType, msgId, deSerializedMsg, metadata);

Review comment:
       @pnoltes the zmq admin also calls the post receive in this while loop. But the svc
that has received the message is not passed to the interceptor, so what is the use?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message