celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] rlenferink commented on a change in pull request #34: Feature/add tcp pubsub endpoint
Date Mon, 08 Jul 2019 06:25:53 GMT
rlenferink commented on a change in pull request #34: Feature/add tcp pubsub endpoint
URL: https://github.com/apache/celix/pull/34#discussion_r300937458
 
 

 ##########
 File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
 ##########
 @@ -0,0 +1,745 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <sys/epoll.h>
+#include <assert.h>
+#include <pubsub_endpoint.h>
+#include <arpa/inet.h>
+#include <log_helper.h>
+#include <math.h>
+#include "pubsub_tcp_handler.h"
+#include "pubsub_tcp_topic_receiver.h"
+#include "pubsub_psa_tcp_constants.h"
+#include "pubsub_tcp_common.h"
+
+#include <uuid/uuid.h>
+#include <pubsub_admin_metrics.h>
+/*
+ * File-local constants. UUID_STR_LEN is only defined here when the headers
+ * included above did not already provide it.
+ */
+#define MAX_EPOLL_EVENTS     16
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN  37
+#endif
+
+/*
+ * Logging shorthands, one per log level. Each expands to a logHelper_log()
+ * call on receiver->logHelper, so a variable named `receiver` must be in
+ * scope and must not be NULL wherever one of these macros is used.
+ */
+#define L_DEBUG(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+/**
+ * State of one TCP topic receiver: its configuration, the TCP handler it
+ * receives on, the optional receive thread, the connections it was asked to
+ * make and the subscriber services it delivers to.
+ */
+struct pubsub_tcp_topic_receiver {
+  celix_bundle_context_t *ctx; //used for property lookup and for the subscriber tracker
+  log_helper_t *logHelper; //target of the L_DEBUG/L_INFO/L_WARN/L_ERROR macros
+  long serializerSvcId;
+  pubsub_serializer_service_t *serializer;
+  char *scope; //heap copy made in create, freed in destroy
+  char *topic; //heap copy made in create, freed in destroy
+  char scopeAndTopicFilter[5]; //filled by psa_tcp_setScopeAndTopicFilter in create
+  bool metricsEnabled; //read from the PSA_TCP_METRICS_ENABLED property in create
+  pubsub_tcpHandler_pt socketHandler; //handler in use; either shared (see below) or created by this receiver
+  pubsub_tcpHandler_pt sharedSocketHandler; //non-NULL when socketHandler came from the endpoint store; destroy then leaves the handler alone
+
+  struct {
+    celix_thread_t thread;
+    celix_thread_mutex_t mutex;
+    bool running; //set to true just before the receive thread is started
+  } thread;
+
+  struct {
+    celix_thread_mutex_t mutex;
+    hash_map_t *map; //key = tcp url, value = psa_tcp_requested_connection_entry_t*
+    bool allConnected; //true if all requested connections are connected
+  } requestedConnections;
+
+  long subscriberTrackerId; //id returned by celix_bundleContext_trackServicesWithOptions
+  struct {
+    celix_thread_mutex_t mutex;
+    hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
+    bool allInitialized;
+  } subscribers;
+};
+/**
+ * One connection the receiver has been asked to make. Entries are stored in
+ * requestedConnections.map of the owning receiver, keyed by their url string.
+ */
+typedef struct psa_tcp_requested_connection_entry {
+  pubsub_tcp_topic_receiver_t *parent; //receiver that owns this entry
+  char *key; //NOTE(review): not assigned anywhere in the visible part of this file - confirm use
+  char *url; //heap copy of the url; also used as the map key and freed together with the entry
+  int  fd; //NOTE(review): not assigned anywhere in the visible part of this file - confirm use
+  bool connected;
+  bool statically; //true if the connection is statically configured through the topic properties.
+} psa_tcp_requested_connection_entry_t;
+/**
+ * Receive statistics kept for one message type id and one origin uuid.
+ * NOTE(review): the code that updates these counters is outside the visible
+ * part of this file; field meanings below follow the field names only.
+ */
+typedef struct psa_tcp_subscriber_metrics_entry_t {
+  unsigned int msgTypeId;
+  uuid_t origin;
+
+  unsigned long nrOfMessagesReceived;
+  unsigned long nrOfSerializationErrors;
+  struct timespec lastMessageReceived;
+  double averageTimeBetweenMessagesInSeconds;
+  double averageSerializationTimeInSeconds;
+  double averageDelayInSeconds;
+  double maxDelayInSeconds;
+  double minDelayInSeconds;
+  unsigned int lastSeqNr;
+  unsigned long nrOfMissingSeqNumbers;
+} psa_tcp_subscriber_metrics_entry_t;
+
+/**
+ * Bookkeeping for one tracked subscriber service. Entries are the values of
+ * subscribers.map in the receiver (keyed by bundle id).
+ */
+typedef struct psa_tcp_subscriber_entry {
+  int usageCount;
+  hash_map_t *msgTypes; //map from serializer svc
+  hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*)
+  pubsub_subscriber_t *svc;
+  bool initialized; //true if the init function is called through the receive thread
+} psa_tcp_subscriber_entry_t;
+
+/*
+ * Forward declarations of file-local functions whose definitions are further
+ * down in this file. The create function registers them as callbacks:
+ * add/removeSubscriber on the subscriber service tracker, psa_tcp_recvThread
+ * as the receive thread entry point, processMsg as the TCP handler's message
+ * handler and psa_tcp_connectHandler/psa_tcp_disConnectHandler as its
+ * connection callbacks.
+ */
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t
*props,
+                                                  const celix_bundle_t *owner);
+
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t
*props,
+                                                     const celix_bundle_t *owner);
+
+static void *psa_tcp_recvThread(void *data);
+
+static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
+
+static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
+
+static void processMsg(void* handle, const pubsub_tcp_msg_header_t *hdr, const unsigned char
*payload, size_t payloadSize, struct timespec *receiveTime);
+static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
+static void psa_tcp_disConnectHandler(void *handle, const char *url);
+
+
+
+/**
+ * Creates a topic receiver for the given scope/topic and starts tracking
+ * subscriber services for that topic.
+ *
+ * If the topic properties describe a static endpoint (server type with a bind
+ * url, or client type with connect urls), the TCP handler stored under that
+ * url in endPointStore is shared with this receiver; otherwise a new handler
+ * is created. When static connect urls are configured and no static bind url
+ * is in use, one requested connection is registered per space separated url
+ * and the receive thread is started.
+ *
+ * @param ctx             Bundle context used for configuration and service tracking.
+ * @param logHelper       Log helper used for all logging done by this receiver.
+ * @param scope           Scope of the topic; copied.
+ * @param topic           Topic name; copied.
+ * @param topicProperties Topic properties holding the optional static configuration.
+ * @param endPointStore   Store of shared TCP handlers, looked up by url.
+ * @param serializerSvcId Service id of the serializer.
+ * @param serializer      Serializer service used by this receiver.
+ * @return The new receiver, or NULL when it could not be allocated or no TCP
+ *         handler could be obtained. Release with pubsub_tcpTopicReceiver_destroy().
+ */
+pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                            log_helper_t *logHelper,
+                                                            const char *scope,
+                                                            const char *topic,
+                                                            const celix_properties_t *topicProperties,
+                                                            pubsub_tcp_endPointStore_t* endPointStore,
+                                                            long serializerSvcId,
+                                                            pubsub_serializer_service_t *serializer) {
+  pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+  if (receiver == NULL) {
+    // The L_* macros need a valid receiver, so log through the helper directly.
+    logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, "[PSA_TCP] Cannot allocate TopicReceiver for %s/%s", scope, topic);
+    return NULL;
+  }
+  receiver->ctx = ctx;
+  receiver->logHelper = logHelper;
+  receiver->serializerSvcId = serializerSvcId;
+  receiver->serializer = serializer;
+  receiver->scope = strndup(scope, 1024 * 1024);
+  receiver->topic = strndup(topic, 1024 * 1024);
+
+  // Create the mutexes first: thread.mutex is locked further down in this function.
+  celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+  celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+  celixThreadMutex_create(&receiver->thread.mutex, NULL);
+
+  long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS, PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
+  long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
+  long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+  const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
+
+  /* Check if it's a static endpoint */
+  bool isEndPointTypeClient = false;
+  bool isEndPointTypeServer = false;
+  const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
+  if (endPointType != NULL) {
+    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
+      isEndPointTypeClient = true;
+    }
+    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
+      isEndPointTypeServer = true;
+    }
+  }
+  // When endpoint is server, use the bind url as a key.
+  const char *staticBindUrl = ((topicProperties != NULL) && isEndPointTypeServer)
+                              ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL) : NULL;
+
+  /* When it's an endpoint share the socket with the receiver */
+  if (staticBindUrl != NULL || (isEndPointTypeClient && staticConnectUrls != NULL)) {
+    // NOTE(review): this lock belongs to the not-yet-published receiver, so it
+    // does not guard endPointStore->map against other threads - confirm
+    // whether the store has its own lock that should be taken here instead.
+    celixThreadMutex_lock(&receiver->thread.mutex);
+    pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, (isEndPointTypeServer) ? staticBindUrl : staticConnectUrls);
+    if (entry != NULL) {
+      receiver->socketHandler = entry;
+      receiver->sharedSocketHandler = entry;
+    } else {
+      L_ERROR("[PSA_TCP] Cannot find static Endpoint URL for %s/%s", scope, topic);
+    }
+    celixThreadMutex_unlock(&receiver->thread.mutex);
+  }
+
+  if (receiver->socketHandler == NULL) {
+    receiver->socketHandler = pubsub_tcpHandler_create(receiver->logHelper);
+  }
+
+  if (receiver->socketHandler == NULL) {
+    // Log before releasing the receiver: L_ERROR reads receiver->logHelper.
+    L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope, topic);
+    celixThreadMutex_destroy(&receiver->subscribers.mutex);
+    celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+    celixThreadMutex_destroy(&receiver->thread.mutex);
+    free(receiver->scope);
+    free(receiver->topic);
+    free(receiver);
+    return NULL;
+  }
+
+  pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions, (unsigned int) buffer_size);
+  pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
+  pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
+  pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler, psa_tcp_disConnectHandler);
+
+  psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
+  receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
+                                                                        PSA_TCP_DEFAULT_METRICS_ENABLED);
+
+  receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+  receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+  receiver->requestedConnections.allConnected = false;
+
+  if ((staticConnectUrls != NULL) && (staticBindUrl == NULL)) {
+    // strtok_r modifies its input, so tokenize a private copy of the url list.
+    char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
+    if (urlsCopy != NULL) {
+      char *save = NULL;
+      for (char *url = strtok_r(urlsCopy, " ", &save); url != NULL; url = strtok_r(NULL, " ", &save)) {
+        psa_tcp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+        char *urlCopy = strndup(url, 1024*1024);
+        if (entry == NULL || urlCopy == NULL) {
+          L_ERROR("[PSA_TCP] Out of memory while registering static url %s", url);
+          free(entry);
+          free(urlCopy);
+          continue;
+        }
+        entry->statically = true;
+        entry->connected = false;
+        entry->url = urlCopy;
+        entry->parent = receiver;
+        hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+      }
+      free(urlsCopy);
+    }
+
+    // Configure Receiver thread
+    receiver->thread.running = true;
+    celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
+    char name[64];
+    snprintf(name, 64, "TCP TR %s/%s", scope, topic);
+    celixThread_setName(&receiver->thread.thread, name);
+    psa_tcp_setupTcpContext(receiver->logHelper, &receiver->thread.thread, topicProperties);
+  }
+
+  //track subscribers
+  int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+  char buf[size+1];
+  snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+  celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+  opts.filter.ignoreServiceLanguage = true;
+  opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+  opts.filter.filter = buf;
+  opts.callbackHandle = receiver;
+  opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
+  opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
+  receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+  return receiver;
+}
+
+/**
+ * Stops and releases a topic receiver created by pubsub_tcpTopicReceiver_create().
+ *
+ * Stops the receive thread (if it was started), stops the subscriber tracker,
+ * frees all subscriber and requested-connection entries, destroys the mutexes
+ * and frees the receiver. The TCP handler is only destroyed when it is not a
+ * shared handler taken from the endpoint store.
+ *
+ * @param receiver The receiver to destroy; NULL is accepted and ignored.
+ */
+void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
+  if (receiver == NULL) {
+    return;
+  }
+
+  // Clear the running flag under the lock and always release the lock again;
+  // only join when a thread was actually started. Presumably the receive
+  // thread polls this flag - its body is outside the visible part of the file.
+  celixThreadMutex_lock(&receiver->thread.mutex);
+  bool wasRunning = receiver->thread.running;
+  receiver->thread.running = false;
+  celixThreadMutex_unlock(&receiver->thread.mutex);
+  if (wasRunning) {
+    celixThread_join(receiver->thread.thread, NULL);
+  }
+
+  celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+  celixThreadMutex_lock(&receiver->subscribers.mutex);
+  hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+    if (entry == NULL) {
+      continue;
+    }
+    receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+
+    // Release the per-origin metric maps while the entry is still alive;
+    // the entry itself is freed last.
+    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+    while (hashMapIterator_hasNext(&iter2)) {
+      hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+      hashMap_destroy(origins, true, true);
+    }
+    hashMap_destroy(entry->metrics, false, false);
+    free(entry);
+  }
+  hashMap_destroy(receiver->subscribers.map, false, false);
+  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  iter = hashMapIterator_construct(receiver->requestedConnections.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+    if (entry != NULL) {
+      // entry->url doubles as the map key, so the map is destroyed without freeing keys.
+      free(entry->url);
+      free(entry);
+    }
+  }
+  hashMap_destroy(receiver->requestedConnections.map, false, false);
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+  celixThreadMutex_destroy(&receiver->subscribers.mutex);
+  celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+  celixThreadMutex_destroy(&receiver->thread.mutex);
+
+  // A shared handler came from the endpoint store and is not destroyed here.
+  if ((receiver->socketHandler != NULL) && (receiver->sharedSocketHandler == NULL)) {
+    pubsub_tcpHandler_destroy(receiver->socketHandler);
+    receiver->socketHandler = NULL;
+  }
+
+  free(receiver->scope);
+  free(receiver->topic);
+  free(receiver);
+}
+
+/** Returns the receiver's own copy of the scope string; the receiver keeps ownership. */
+const char *pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t *receiver) {
+  const char *scope = receiver->scope;
+  return scope;
+}
+
+/** Returns the receiver's own copy of the topic string; the receiver keeps ownership. */
+const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver) {
+  const char *topic = receiver->topic;
+  return topic;
+}
+
+/** Returns the serializer service id that was passed in when the receiver was created. */
+long pubsub_tcpTopicReceiver_serializerSvcId(pubsub_tcp_topic_receiver_t *receiver) {
+  long svcId = receiver->serializerSvcId;
+  return svcId;
+}
+
+/**
+ * Appends a description of every requested connection to one of two lists.
+ *
+ * For each entry a newly allocated string "<url>" (or "<url> (static)" for
+ * statically configured connections) is added to connectedUrls when the entry
+ * is connected and to unconnectedUrls otherwise. The strings are allocated
+ * with asprintf and are not freed by this function.
+ *
+ * @param receiver        The receiver whose requested connections are listed.
+ * @param connectedUrls   List that receives the connected urls.
+ * @param unconnectedUrls List that receives the not (yet) connected urls.
+ */
+void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
+                                             celix_array_list_t *unconnectedUrls) {
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+  while (hashMapIterator_hasNext(&iter)) {
+    psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+    char *url = NULL;
+    // On failure asprintf leaves url unspecified, so never add it to a list.
+    if (asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "") < 0) {
+      L_WARN("[PSA_TCP] Cannot allocate description for connection %s", entry->url);
+      continue;
+    }
+    if (entry->connected) {
+      celix_arrayList_add(connectedUrls, url);
+    } else {
+      celix_arrayList_add(unconnectedUrls, url);
+    }
+  }
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+
+/**
+ * Requests a (dynamic) connection to the given tcp url.
+ *
+ * If the url is not yet known, a new unconnected, non-static entry is added to
+ * the requested connections and allConnected is cleared. Afterwards
+ * psa_tcp_connectToAllRequestedConnections() is called for the receiver.
+ *
+ * @param receiver The receiver that should connect.
+ * @param url      The tcp url to connect to; copied.
+ */
+void pubsub_tcpTopicReceiver_connectTo(
+        pubsub_tcp_topic_receiver_t *receiver,
+        const char *url) {
+  L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", receiver->scope, receiver->topic, url);
+
+  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+  psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
+  if (entry == NULL) {
+    entry = calloc(1, sizeof(*entry));
+    char *urlCopy = strndup(url, 1024*1024);
+    if ((entry != NULL) && (urlCopy != NULL)) {
+      entry->url = urlCopy;
+      entry->connected = false;
+      entry->statically = false;
+      entry->parent = receiver;
+      // The entry's own url copy is the map key, so key and value share a lifetime.
+      hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
+      receiver->requestedConnections.allConnected = false;
+    } else {
+      // Do not register a half-built entry; free(NULL) is a no-op.
+      L_ERROR("[PSA_TCP] Out of memory while registering connection to %s", url);
+      free(entry);
+      free(urlCopy);
+    }
+  }
+  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+  psa_tcp_connectToAllRequestedConnections(receiver);
+}
+
+void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const
char *url) {
+  L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope,
receiver->topic, url);
 
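 Review comment:
   This log message was copied from the ZMQ admin: the prefix should be [PSA TCP] (and "tcp url"), not [PSA ZMQ] / "zmq url".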

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message