celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bjoern Petri <bjoern.pe...@sundevil.de>
Subject Re: svn commit: r1632567 - in /celix/trunk/remote_services: discovery/private/include/ discovery/private/src/ discovery_configured/private/include/ discovery_etcd/private/include/ discovery_etcd/private/src/
Date Fri, 17 Oct 2014 15:43:27 GMT

Hi Alexander,

Thanks for testing - I just committed the fix. Allocation on the heap is
needed because discovery_etcd needs to pick up the port for its
announcement.

Regards,
   Bjoern



On 2014-10-17 16:50, Alexander Broekhuis wrote:
> Hi Bjoern,
> 
> This change causes a crash when shutting down the application. More
> specifically, if the initial port can be used, the port is not allocated,
> but once the webserver does not start, an allocation is done. So the free
> in endpointDiscoveryServer_destroy is ok in the latter case, but not
> needed in the former case.
> 
> Is there a need to allocate something? It could just be done on the
> stack as far as I can tell.
> 
> 
> 2014-10-17 9:05 GMT-04:00 <bpetri@apache.org>:
> 
>> Author: bpetri
>> Date: Fri Oct 17 13:05:20 2014
>> New Revision: 1632567
>> 
>> URL: http://svn.apache.org/r1632567
>> Log:
>> CELIX-169: Add port-collision auto-correction to discovery
>> 
>> Modified:
>>     celix/trunk/remote_services/discovery/private/include/discovery.h
>> 
>> celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
>> 
>> celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
>> 
>> celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
>> 
>> celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
>> 
>> celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
>>     celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
>>     
>> celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
>> 
>> Modified: 
>> celix/trunk/remote_services/discovery/private/include/discovery.h
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/discovery.h?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> --- celix/trunk/remote_services/discovery/private/include/discovery.h
>> (original)
>> +++ celix/trunk/remote_services/discovery/private/include/discovery.h 
>> Fri
>> Oct 17 13:05:20 2014
>> @@ -33,6 +33,8 @@
>>  #include "endpoint_description.h"
>>  #include "endpoint_listener.h"
>> 
>> +#define DISCOVERY_SERVER_INTERFACE     
>> "DISCOVERY_CFG_SERVER_INTERFACE"
>> +#define DISCOVERY_SERVER_IP            "DISCOVERY_CFG_SERVER_IP"
>>  #define DISCOVERY_SERVER_PORT          "DISCOVERY_CFG_SERVER_PORT"
>>  #define DISCOVERY_SERVER_PATH          "DISCOVERY_CFG_SERVER_PATH"
>>  #define DISCOVERY_POLL_ENDPOINTS       "DISCOVERY_CFG_POLL_ENDPOINTS"
>> 
>> Modified:
>> celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> ---
>> celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
>> (original)
>> +++
>> celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
>> Fri Oct 17 13:05:20 2014
>> @@ -68,4 +68,14 @@ celix_status_t endpointDiscoveryServer_a
>>   */
>>  celix_status_t endpointDiscoveryServer_removeEndpoint(
>> endpoint_discovery_server_pt server, endpoint_description_pt 
>> endpoint);
>> 
>> +/**
>> + * Removes the url, which is used by the discovery server to announce 
>> the
>> endpoints
>> + *
>> + * @param server [in] the endpoint discovery server to retrieve the 
>> url
>> from
>> + * @param url [out] url which is used to announce the endpoints.
>> + * @return CELIX_SUCCESS when successful.
>> + */
>> +celix_status_t
>> endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server, 
>> char*
>> url);
>> +
>> +
>>  #endif /* ENDPOINT_DISCOVERY_SERVER_H_ */
>> 
>> Modified:
>> celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> ---
>> celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
>> (original)
>> +++
>> celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
>> Fri Oct 17 13:05:20 2014
>> @@ -25,7 +25,10 @@
>>   */
>>  #include <stdlib.h>
>>  #include <stdint.h>
>> -
>> +#include <arpa/inet.h>
>> +#include <sys/socket.h>
>> +#include <netdb.h>
>> +#include <ifaddrs.h>
>>  #include "civetweb.h"
>>  #include "celix_errno.h"
>>  #include "utils.h"
>> @@ -36,7 +39,8 @@
>>  #include "endpoint_descriptor_writer.h"
>>  #include "endpoint_discovery_server.h"
>> 
>> -
>> +// defines how often the webserver is restarted (with an increased 
>> port
>> number)
>> +#define MAX_NUMBER_OF_RESTARTS         5
>>  #define DEFAULT_SERVER_THREADS "1"
>> 
>>  #define CIVETWEB_REQUEST_NOT_HANDLED 0
>> @@ -54,16 +58,23 @@ struct endpoint_discovery_server {
>>      celix_thread_mutex_t serverLock;
>> 
>>      const char* path;
>> +    const char *port;
>> +    const char* ip;
>>      struct mg_context* ctx;
>>  };
>> 
>>  // Forward declarations...
>>  static int endpointDiscoveryServer_callback(struct mg_connection 
>> *conn);
>>  static char* format_path(char* path);
>> +static celix_status_t endpointDiscoveryServer_getIpAdress(char*
>> interface, char** ip);
>> 
>>  celix_status_t endpointDiscoveryServer_create(discovery_pt discovery,
>> bundle_context_pt context, endpoint_discovery_server_pt *server) {
>>         celix_status_t status = CELIX_SUCCESS;
>> 
>> +       char *port = 0;
>> +       char *ip = NULL;
>> +       char *path = NULL;
>> +
>>         *server = malloc(sizeof(struct endpoint_discovery_server));
>>         if (!*server) {
>>                 return CELIX_ENOMEM;
>> @@ -79,13 +90,34 @@ celix_status_t endpointDiscoveryServer_c
>>                 return CELIX_BUNDLE_EXCEPTION;
>>         }
>> 
>> -       char *port = NULL;
>> +       bundleContext_getProperty(context, DISCOVERY_SERVER_IP, &ip);
>> +       if (ip == NULL) {
>> +               char *interface = NULL;
>> +
>> +               bundleContext_getProperty(context,
>> DISCOVERY_SERVER_INTERFACE, &interface);
>> +               if ((interface != NULL) &&
>> (endpointDiscoveryServer_getIpAdress(interface, &ip) != 
>> CELIX_SUCCESS)) {
>> +                       fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, 
>> "Could
>> not retrieve IP adress for interface %s", interface);
>> +               }
>> +
>> +               if (ip == NULL) {
>> +                       endpointDiscoveryServer_getIpAdress(NULL, 
>> &ip);
>> +               }
>> +       }
>> +
>> +       if (ip != NULL) {
>> +               fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Using %s for
>> service annunciation", ip);
>> +               (*server)->ip = strdup(ip);
>> +       }
>> +       else {
>> +               fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "No IP 
>> address
>> for service annunciation set. Using %s", DEFAULT_SERVER_IP);
>> +               (*server)->ip = (char*) DEFAULT_SERVER_IP;
>> +       }
>> +
>>         bundleContext_getProperty(context, DISCOVERY_SERVER_PORT, 
>> &port);
>>         if (port == NULL) {
>>                 port = DEFAULT_SERVER_PORT;
>>         }
>> 
>> -       char *path = NULL;
>>         bundleContext_getProperty(context, DISCOVERY_SERVER_PATH, 
>> &path);
>>         if (path == NULL) {
>>                 path = DEFAULT_SERVER_PATH;
>> @@ -93,19 +125,56 @@ celix_status_t endpointDiscoveryServer_c
>> 
>>         (*server)->path = format_path(path);
>> 
>> -       const char *options[] = {
>> -               "listening_ports", port,
>> -               "num_threads", DEFAULT_SERVER_THREADS,
>> -               NULL
>> -       };
>> -
>>         const struct mg_callbacks callbacks = {
>>                 .begin_request = endpointDiscoveryServer_callback,
>>         };
>> 
>> -       (*server)->ctx = mg_start(&callbacks, (*server), options);
>> +       unsigned int port_counter = 0;
>> +
>> +       do {
>> +               const char *options[] = {
>> +                       "listening_ports", port,
>> +                       "num_threads", DEFAULT_SERVER_THREADS,
>> +                       NULL
>> +               };
>> +
>> +               (*server)->ctx = mg_start(&callbacks, (*server), 
>> options);
>> +
>> +               if ((*server)->ctx != NULL)
>> +               {
>> +                       fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, 
>> "Starting
>> discovery server on port %s...", port);
>> +                       (*server)->port = port;
>> +               }
>> +               else {
>> +                       errno = 0;
>> +                       char* newPort = calloc(10, sizeof(*newPort));
>> +               char* endptr = port;
>> +               int currentPort = strtol(port, &endptr, 10);
>> +
>> +               if (*endptr || errno != 0) {
>> +                   currentPort = strtol(DEFAULT_SERVER_PORT, NULL, 
>> 10);
>> +               }
>> +
>> +               port_counter++;
>> +                       snprintf(newPort, 6,  "%d", (currentPort+1));
>> 
>> -       fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Starting discovery 
>> server
>> on port %s...", port);
>> +                       fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, 
>> "Error
>> while starting discovery server on port %s - retrying on port %s...", 
>> port,
>> newPort);
>> +                       port = newPort;
>> +               }
>> +
>> +       } while(((*server)->ctx == NULL) && (port_counter <
>> MAX_NUMBER_OF_RESTARTS));
>> +
>> +       return status;
>> +}
>> +
>> +celix_status_t
>> endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server, 
>> char*
>> url)
>> +{
>> +       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
>> +
>> +       if (server->ip && server->port && server->path)
>> {
>> +               sprintf(url, "http://%s:%s/%s", server->ip, 
>> server->port,
>> server->path);
>> +               status = CELIX_SUCCESS;
>> +       }
>> 
>>         return status;
>>  }
>> @@ -127,6 +196,9 @@ celix_status_t endpointDiscoveryServer_d
>>         status = celixThreadMutex_destroy(&server->serverLock);
>> 
>>         free((void*) server->path);
>> +       free((void*) server->port);
>> +       free((void*) server->ip);
>> +
>>         free(server);
>> 
>>         return status;
>> @@ -308,3 +380,34 @@ static int endpointDiscoveryServer_callb
>> 
>>         return status;
>>  }
>> +
>> +static celix_status_t endpointDiscoveryServer_getIpAdress(char*
>> interface, char** ip) {
>> +       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
>> +
>> +       struct ifaddrs *ifaddr, *ifa;
>> +    char host[NI_MAXHOST];
>> +
>> +    if (getifaddrs(&ifaddr) != -1)
>> +    {
>> +               for (ifa = ifaddr; ifa != NULL && status != 
>> CELIX_SUCCESS;
>> ifa = ifa->ifa_next)
>> +               {
>> +                       if (ifa->ifa_addr == NULL)
>> +                               continue;
>> +
>> +                       if ((getnameinfo(ifa->ifa_addr,sizeof(struct
>> sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) &&
>> (ifa->ifa_addr->sa_family == AF_INET)) {
>> +                               if (interface == NULL) {
>> +                                       *ip = strdup(host);
>> +                                       status = CELIX_SUCCESS;
>> +                               }
>> +                               else if (strcmp(ifa->ifa_name, 
>> interface)
>> == 0) {
>> +                                       *ip = strdup(host);
>> +                                       status = CELIX_SUCCESS;
>> +                               }
>> +                       }
>> +               }
>> +
>> +               freeifaddrs(ifaddr);
>> +    }
>> +
>> +    return status;
>> +}
>> 
>> Modified:
>> celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> ---
>> celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
>> (original)
>> +++
>> celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
>> Fri Oct 17 13:05:20 2014
>> @@ -37,7 +37,7 @@
>>  #include "endpoint_discovery_server.h"
>> 
>> 
>> -
>> +#define DEFAULT_SERVER_IP      "127.0.0.1"
>>  #define DEFAULT_SERVER_PORT "9999"
>>  #define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.configured"
>>  #define DEFAULT_POLL_ENDPOINTS "
>> http://localhost:9999/org.apache.celix.discovery.configured"
>> 
>> Modified:
>> celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> ---
>> celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
>> (original)
>> +++
>> celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
>> Fri Oct 17 13:05:20 2014
>> @@ -38,9 +38,11 @@
>>  #include "etcd_watcher.h"
>> 
>> 
>> +#define DEFAULT_SERVER_IP      "127.0.0.1"
>>  #define DEFAULT_SERVER_PORT "9999"
>>  #define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.etcd"
>> -#define DEFAULT_POLL_ENDPOINTS "
>> http://localhost:9999/org.apache.celix.discovery.etcd"
>> +
>> +#define DEFAULT_POLL_ENDPOINTS ""
>> 
>>  #define MAX_ROOTNODE_LENGTH             64
>>  #define MAX_LOCALNODE_LENGTH   256
>> 
>> Modified:
>> celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> ---
>> celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
>> (original)
>> +++
>> celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
>> Fri Oct 17 13:05:20 2014
>> @@ -33,7 +33,7 @@
>> 
>>  typedef struct etcd_watcher *etcd_watcher_pt;
>> 
>> -celix_status_t etcdWatcher_create(endpoint_discovery_poller_pt 
>> poller,
>> bundle_context_pt context, etcd_watcher_pt *watcher);
>> +celix_status_t etcdWatcher_create(discovery_pt discovery,
>> bundle_context_pt context, etcd_watcher_pt *watcher);
>>  celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
>> 
>> 
>> 
>> Modified: 
>> celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd.c?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> --- celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
>> (original)
>> +++ celix/trunk/remote_services/discovery_etcd/private/src/etcd.c Fri 
>> Oct
>> 17 13:05:20 2014
>> @@ -103,7 +103,7 @@ bool etcd_get(char* key, char* value, ch
>>         } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
>> NULL) {
>>                 printf("error while parsing json data\n");
>>         } else if ((js_node = json_object_get(js_root, 
>> ETCD_JSON_NODE)) ==
>> NULL) {
>> -               printf("error while retrieving expected node 
>> object\n");
>> +               printf("error while retrieving expected node object 
>> %s\n",
>> json_dumps(js_root, 0));
>>         } else if (((js_value = json_object_get(js_node, 
>> ETCD_JSON_VALUE))
>> == NULL) || ((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) ==
>> NULL) || ((js_modifiedIndex = json_object_get(js_node,
>> ETCD_JSON_MODIFIEDINDEX)) == NULL)) {
>>                 printf("error while retrieving expected objects\n");
>>         }
>> @@ -208,7 +208,7 @@ bool etcd_set(char* key, char* value, in
>>         } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
>> NULL) {
>>                 printf("error while parsing json data\n");
>>         } else if ((js_node = json_object_get(js_root, 
>> ETCD_JSON_NODE)) ==
>> NULL) {
>> -               printf("error while retrieving expected node 
>> object\n");
>> +               printf("error while retrieving expected node object 
>> %s\n",
>> json_dumps(js_root, 0));
>>         } else if ((js_value = json_object_get(js_node, 
>> ETCD_JSON_VALUE))
>> == NULL) {
>>                 printf("error while retrieving expected value 
>> object\n");
>>         } else if (json_is_string(js_value)) {
>> @@ -247,7 +247,7 @@ bool etcd_del(char* key) {
>>         } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
>> NULL) {
>>                 printf("error while parsing json data\n");
>>         } else if ((js_node = json_object_get(js_root, 
>> ETCD_JSON_NODE)) ==
>> NULL) {
>> -               printf("error while retrieving expected node 
>> object\n");
>> +               printf("error while retrieving expected node object 
>> %s\n",
>> json_dumps(js_root, 0));
>>         } else {
>>                 retVal = true;
>>         }
>> @@ -277,10 +277,10 @@ bool etcd_watch(char* key, int index, ch
>>         reply.size = 0; /* no data at this point */
>> 
>>         if (index != 0)
>> -               snprintf(url, MAX_URL_LENGTH, 
>> "http://%s:%d/v2/keys/%s?wait=true&waitIndex=%d",
>> etcd_server, etcd_port, key,
>> +               snprintf(url, MAX_URL_LENGTH, 
>> "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d",
>> etcd_server, etcd_port, key,
>>                                 index);
>>         else
>> -               snprintf(url, MAX_URL_LENGTH, 
>> "http://%s:%d/v2/keys/%s?wait=true",
>> etcd_server, etcd_port, key);
>> +               snprintf(url, MAX_URL_LENGTH, 
>> "http://%s:%d/v2/keys/%s?wait=true&recursive=true",
>> etcd_server, etcd_port, key);
>> 
>>         res = performRequest(url, GET, WriteMemoryCallback, NULL, 
>> (void*)
>> &reply);
>> 
>> @@ -290,18 +290,24 @@ bool etcd_watch(char* key, int index, ch
>>                 printf("error while performing curl w/ %s\n", url);
>>         } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
>> NULL) {
>>                 printf("error while parsing json data\n");
>> -       } else if (((js_action = json_object_get(js_root,
>> ETCD_JSON_ACTION)) == NULL) ||
>> -                       ((js_node = json_object_get(js_root,
>> ETCD_JSON_NODE)) == NULL) ||
>> -                       ((js_prevNode = json_object_get(js_root,
>> ETCD_JSON_PREVNODE)) == NULL)) {
>> -               printf("error while retrieving expected node 
>> object\n");
>> -       } else if (((js_value = json_object_get(js_node, 
>> ETCD_JSON_VALUE))
>> == NULL) ||
>> -                       ((js_prevValue = json_object_get(js_prevNode,
>> ETCD_JSON_VALUE)) == NULL)) {
>> -               printf("error while retrieving expected value 
>> objects\n");
>> -       } else if (json_is_string(js_value) &&
>> json_is_string(js_prevValue) && json_is_string(js_action)) {
>> -               strncpy(value, json_string_value(js_value),
>> MAX_VALUE_LENGTH);
>> -               strncpy(prevValue, json_string_value(js_prevValue),
>> MAX_VALUE_LENGTH);
>> -               strncpy(action, json_string_value(js_action),
>> MAX_ACTION_LENGTH);
>> -               retVal = true;
>> +       } else {
>> +               js_action = json_object_get(js_root, 
>> ETCD_JSON_ACTION);
>> +               js_node = json_object_get(js_root, ETCD_JSON_NODE);
>> +               js_prevNode = json_object_get(js_root, 
>> ETCD_JSON_PREVNODE);
>> +
>> +               if (js_action == NULL || js_node == NULL) {
>> +                       printf("error while retrieving expected node
>> object %s\n", json_dumps(js_root, 0));
>> +               } else if ((js_value = json_object_get(js_node,
>> ETCD_JSON_VALUE)) == NULL) {
>> +                       printf("error while retrieving expected value
>> objects\n");
>> +               }
>> +               else if (json_is_string(js_value) &&
>> json_is_string(js_action)) {
>> +                       if ((js_prevNode != NULL) && ((js_prevValue =
>> json_object_get(js_prevNode, ETCD_JSON_VALUE)) != NULL) &&
>> (json_is_string(js_prevValue))) {
>> +                               strncpy(prevValue,
>> json_string_value(js_prevValue), MAX_VALUE_LENGTH);
>> +                       }
>> +                       strncpy(value, json_string_value(js_value),
>> MAX_VALUE_LENGTH);
>> +                       strncpy(action, json_string_value(js_action),
>> MAX_ACTION_LENGTH);
>> +                       retVal = true;
>> +               }
>>         }
>> 
>>         if (reply.memory) {
>> 
>> Modified:
>> celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
>> URL:
>> http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c?rev=1632567&r1=1632566&r2=1632567&view=diff
>> 
>> ==============================================================================
>> --- 
>> celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
>> (original)
>> +++ 
>> celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
>> Fri Oct 17 13:05:20 2014
>> @@ -145,25 +145,31 @@ static celix_status_t etcdWatcher_addAlr
>>  }
>> 
>> 
>> -static celix_status_t etcdWatcher_addOwnFramework(bundle_context_pt
>> context)
>> +static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt 
>> watcher)
>>  {
>>      celix_status_t status = CELIX_BUNDLE_EXCEPTION;
>>      char localNodePath[MAX_LOCALNODE_LENGTH];
>>      char value[MAX_VALUE_LENGTH];
>>      char action[MAX_VALUE_LENGTH];
>> +       char url[MAX_VALUE_LENGTH];
>>      int modIndex;
>>      char* endpoints = NULL;
>>      char* ttlStr = NULL;
>>      int ttl;
>> 
>> +       bundle_context_pt context = watcher->discovery->context;
>> +       endpoint_discovery_server_pt server = 
>> watcher->discovery->server;
>> +
>>      // register own framework
>>      if ((status = etcdWatcher_getLocalNodePath(context,
>> &localNodePath[0])) != CELIX_SUCCESS) {
>>          return status;
>>      }
>> 
>> -    if ((bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS,
>> &endpoints) != CELIX_SUCCESS) || !endpoints) {
>> -        endpoints = DEFAULT_POLL_ENDPOINTS;
>> -    }
>> +       if (endpointDiscoveryServer_getUrl(server, &url[0]) !=
>> CELIX_SUCCESS) {
>> +               snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s",
>> DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
>> +       }
>> +
>> +       endpoints = &url[0];
>> 
>>      if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) !=
>> CELIX_SUCCESS) || !ttlStr) {
>>          ttl = DEFAULT_ETCD_TTL;
>> @@ -212,19 +218,21 @@ static void* etcdWatcher_run(void* data)
>>                 char preValue[MAX_VALUE_LENGTH];
>>                 char action[MAX_ACTION_LENGTH];
>> 
>> -               if (etcd_watch(rootPath, highestModified + 1, 
>> &action[0],
>> &preValue[0], &value[0]) == true) {
>> +               if (etcd_watch(rootPath, 0, &action[0], &preValue[0],
>> &value[0]) == true) {
>> 
>>                         if (strcmp(action, "set") == 0) {
>> -
>>  endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, 
>> &preValue[0]);
>> 
>> endpointDiscoveryPoller_addDiscoveryEndpoint(poller, &value[0]);
>>                         } else if (strcmp(action, "delete") == 0) {
>> 
>> endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
>> +                       } else if (strcmp(action, "update") == 0) {
>> +                               // TODO
>>                         } else {
>>                                 fw_log(logger, 
>> OSGI_FRAMEWORK_LOG_INFO,
>> "Unexpected action: %s", action);
>>                         }
>>                 }
>> +
>>                 // update own framework uuid in any case;
>> -           etcdWatcher_addOwnFramework(context);
>> +           etcdWatcher_addOwnFramework(watcher);
>>         }
>> 
>>         return NULL;
>> @@ -278,7 +286,7 @@ celix_status_t etcdWatcher_create(discov
>>                 return CELIX_BUNDLE_EXCEPTION;
>>         }
>> 
>> -       etcdWatcher_addOwnFramework(context);
>> +       etcdWatcher_addOwnFramework((*watcher));
>> 
>>         if ((status = 
>> celixThreadMutex_create(&(*watcher)->watcherLock,
>> NULL)) != CELIX_SUCCESS) {
>>                 return status;
>> 
>> 
>> 

Mime
View raw message