celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] abroekhuis commented on a change in pull request #223: Feature/pubsub custom serializers
Date Sun, 17 May 2020 20:12:04 GMT

abroekhuis commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426298603



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry {
+    long svcId;
+    const celix_properties_t *properties;
+    uint32_t msgId;
+    celix_version_t* msgVersion;
+    char* msgFqn;
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    bool backwardCompatible;
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_addSerializationService(handler, serSvc, props);
+}
+
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_removeSerializationService(handler, serSvc, props);
+}
+
+int compareEntries(const void *a, const void *b) {
+    const pubsub_serialization_service_entry_t* aEntry = a;
+    const pubsub_serialization_service_entry_t* bEntry = b;
+
+    long servIdA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long servIdB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+
+    long servRankingA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+    long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB);
+}
+
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    //NOTE assumes mutex is locked
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        return celix_arrayList_get(entries, 0); //NOTE if entries not null, always at least 1 entry
+    }
+    return NULL;
+}
+
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    bool compatible = false;
+    if (handler->backwardCompatible) {
+        compatible = celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    } else {
+        int major = celix_version_getMajor(entry->msgVersion);
+        int minor = celix_version_getMinor(entry->msgVersion);
+        compatible = major == serializedMajorVersion && minor == serializedMinorVersion;
+    }
+    return compatible;
+}
+
+static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    //NOTE assumes mutex is locked
+    const char *result = NULL;
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+        result = entry->msgFqn;
+    }
+    return result;
+}
+
+pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
+    pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
+    handler->ctx = ctx;
+    handler->backwardCompatible = backwardCompatible;
+
+    handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
+
+    celixThreadRwlock_create(&handler->lock, NULL);
+    handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+
+    char *filter = NULL;
+    asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
+    opts.filter.filter = filter;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = addSvc;
+    opts.removeWithProperties = remSvc;
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    free(filter);
+
+    return handler;
+}
+
+
+void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
+    if (handler != NULL) {
+        celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
+        celixThreadRwlock_destroy(&handler->lock);
+        hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
+            for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+                pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
+                free(entry->msgFqn);
+                celix_version_destroy(entry->msgVersion);
+                free(entry);
+            }
+            celix_arrayList_destroy(entries);
+        }
+        hashMap_destroy(handler->serializationServices, false, false);
+        celix_logHelper_destroy(handler->logHelper);
+        free(handler);
+    }
+}
+
+void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    const char *version = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celix_version_t* msgVersion = celix_version_createVersionFromString(version);
+    if (msgVersion == NULL) {
+        L_ERROR("%s service has an invalid %s property. value is '%s'", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, msgVersion);
+        return;
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+
+    pubsub_serialization_service_entry_t* existingEntry = findEntry(handler, msgId);
+
+    bool valid = true;
+    if (existingEntry != NULL && strncmp(existingEntry->msgFqn, msgFqn, 1024*1024) != 0) {
+        L_ERROR("Msg id clash. Registered serialization service with msg id %d and msg fqn '%s' clashes with an existing serialization service using the same msg id and msg fqn '%s'. Ignoring serialization service.", msgId, msgFqn, existingEntry->msgFqn);
+        valid = false;
+    }
+
+    if (existingEntry != NULL && celix_version_compareTo(existingEntry->msgVersion, msgVersion) != 0) {
+        char* existingVersion = celix_version_toString(existingEntry->msgVersion);
+        L_ERROR("Mismatched message versions. Registered serialization service with msg '%s' with version %s, has a different version than an existing serialization service with version '%s'. Ignoring serialization service.", msgFqn, version, existingVersion);
+        free(existingVersion);
+        valid = false;
+    }
+
+    if (valid) {
+        celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
+        if (entries == NULL) {
+            entries = celix_arrayList_create();
+        }
+        pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
+        entry->svcId = svcId;
+        entry->properties = svcProperties;
+        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgId = msgId;
+        entry->msgVersion = msgVersion;
+        entry->svc = svc;
+        celix_arrayList_add(entries, entry);
+        celix_arrayList_sort(entries, compareEntries);
+
+        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+    } else {
+        celix_version_destroy(msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *found = NULL;
+        for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+            pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, i);
+            if (entry->svcId == svcId) {
+                found = entry;
+                celix_arrayList_removeAt(entries, i);
+                celix_arrayList_sort(entries, compareEntries);
+                break;
+            }
+        }
+        if (found != NULL) {
+            free(found->msgFqn);
+            celix_version_destroy(found->msgVersion);
+            free(found);
+        }
+        if (celix_arrayList_size(entries) == 0) {
+            hashMap_remove(handler->serializationServices, (void*)(uintptr_t)msgId);
+            celix_arrayList_destroy(entries);
+        }
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen) {

Review comment:
       iovec already has a length. Why outputIovLen?

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry {
+    long svcId;
+    const celix_properties_t *properties;
+    uint32_t msgId;
+    celix_version_t* msgVersion;
+    char* msgFqn;
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    bool backwardCompatible;
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_addSerializationService(handler, serSvc, props);
+}
+
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_removeSerializationService(handler, serSvc, props);
+}
+
+int compareEntries(const void *a, const void *b) {
+    const pubsub_serialization_service_entry_t* aEntry = a;
+    const pubsub_serialization_service_entry_t* bEntry = b;
+
+    long servIdA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long servIdB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+
+    long servRankingA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+    long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB);
+}
+
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    //NOTE assumes mutex is locked
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        return celix_arrayList_get(entries, 0); //NOTE if entries not null, always at least 1 entry
+    }
+    return NULL;
+}
+
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    bool compatible = false;
+    if (handler->backwardCompatible) {
+        compatible = celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    } else {
+        int major = celix_version_getMajor(entry->msgVersion);
+        int minor = celix_version_getMinor(entry->msgVersion);
+        compatible = major == serializedMajorVersion && minor == serializedMinorVersion;

Review comment:
       Is there no function in version for this?

##########
File path: bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_celix_bundle(pubsub_avrobin_serialization_descriptor NO_ACTIVATOR VERSION 1.0.0)
+celix_bundle_files(pubsub_avrobin_serialization_descriptor
+		${CMAKE_CURRENT_SOURCE_DIR}/msg_descriptors/msg_poi1.descriptor
+		DESTINATION "META-INF/descriptors"
+)
+
+add_executable(test_pubsub_serializer_avrobin
+        src/PubSubAvrobinSerializationProviderTestSuite.cc
+)
+target_link_libraries(test_pubsub_serializer_avrobin PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_compile_options(test_pubsub_serializer_avrobin PRIVATE -std=c++14) #Note test code is allowed to be C++14
+
+add_dependencies(test_pubsub_serializer_avrobin celix_pubsub_serializer_avrobin_bundle pubsub_avrobin_serialization_descriptor_bundle)
+target_compile_definitions(test_pubsub_serializer_avrobin PRIVATE -DSER_BUNDLE=\"$<TARGET_PROPERTY:celix_pubsub_serializer_avrobin,BUNDLE_FILE>\")

Review comment:
       SER_BUNDLE -> SERIALIZATION_BUNDLE?

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry {
+    long svcId;
+    const celix_properties_t *properties;
+    uint32_t msgId;
+    celix_version_t* msgVersion;
+    char* msgFqn;
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    bool backwardCompatible;
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {

Review comment:
       Why not addService? That reads much more clearly.
   
   The same applies to several other functions: most names are fully descriptive, but a few use shorthand.

##########
File path: bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
##########
@@ -0,0 +1,120 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_avrobin_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <assert.h>
+
+#include "avrobin_serializer.h"
+#include "dyn_message.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    va_list ap;
+    celix_log_helper_t *log = handle;
+    char *logStr = NULL;
+    va_start(ap, msg);
+    vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+
+static celix_status_t pubsub_avrobinSerializationProvider_serialize(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    if (output != NULL) {
+        *output = calloc(1, sizeof(struct iovec));
+        *outputIovLen = 1;
+    } else {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    uint8_t *serializedOutput = NULL;
+    size_t serializedOutputLen;
+    dyn_type* dynType;
+    dynMessage_getMessageType(entry->msgType, &dynType);
+
+    if (avrobinSerializer_serialize(dynType, msg, &serializedOutput, &serializedOutputLen) != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        (**output).iov_base = (void*)serializedOutput;
+        (**output).iov_len  = serializedOutputLen;
+    }
+
+    return status;
+}
+
+void pubsub_avrobinSerializationProvider_freeSerializeMsg(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen) {
+    if (input != NULL) {
+        if (entry->msgType != NULL) {
+            for (int i = 0; i < inputIovLen; i++) {
+                if (input[i].iov_base) {
+                    free(input[i].iov_base);
+                }
+                input[i].iov_base = NULL;
+                input[i].iov_len = 0;
+            }
+        }
+        free(input);
+    }
+}
+
+celix_status_t pubsub_avrobinSerializationProvider_deserialize(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen, void **out) {
+    celix_status_t status = CELIX_SUCCESS;
+    if (input == NULL) return CELIX_BUNDLE_EXCEPTION;
+    void *msg = NULL;
+    dyn_type* dynType;
+    dynMessage_getMessageType(entry->msgType, &dynType);
+
+    assert(inputIovLen == 1);
+
+    if (avrobinSerializer_deserialize(dynType, (uint8_t *)input->iov_base, input->iov_len, &msg) != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else{
+        *out = msg;
+    }
+
+    return status;
+}
+
+void pubsub_avrobinSerializationProvider_freeDeserializeMsg(pubsub_serialization_entry_t* entry, void *msg) {
+    if (entry->msgType != NULL) {
+        dyn_type* dynType;
+        dynMessage_getMessageType(entry->msgType, &dynType);
+        dynType_free(dynType, msg);
+    }
+}
+
+pubsub_serialization_provider_t* pubsub_avrobinSerializationProvider_create(celix_bundle_context_t* ctx)  {
+    pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "avrobin", 0, pubsub_avrobinSerializationProvider_serialize, pubsub_avrobinSerializationProvider_freeSerializeMsg, pubsub_avrobinSerializationProvider_deserialize, pubsub_avrobinSerializationProvider_freeDeserializeMsg);
+    avrobinSerializer_logSetup(dfi_log, pubsub_serializationProvider_getLogHelper(provider), 1);;

Review comment:
       double ; at end of line

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
##########
@@ -0,0 +1,668 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "celix_constants.h"
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+#define MAX_PATH_LEN    1024
+
+typedef enum
+{
+    FIT_INVALID = 0,
+    FIT_DESCRIPTOR = 1,
+    FIT_AVPR = 2
+} descriptor_type_e;
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(provider->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(provider->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(provider->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(provider->logHelper, __VA_ARGS__)
+
+
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    char* serializationType;
+
+    //serialization callbacks
+    celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg);
+
+    //updated serialization services
+    long bundleTrackerId;
+
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
+};
+
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    (void)level;
+    va_list ap;
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_start(ap, msg);
+    vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+static descriptor_type_e getDescriptorType(const char* filename) {
+    if (strstr(filename, ".descriptor")) {
+        return FIT_DESCRIPTOR;
+    }
+    else if (strstr(filename, ".properties")) {
+        return FIT_AVPR;
+    }
+    else {
+        return FIT_INVALID;
+    }
+}
+
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {
+    snprintf(path, MAX_PATH_LEN, "%s/%s", root, properties_file_name); // use path to create path to properties file
+    FILE *properties = fopen(path, "r");
+    if (!properties) {
+        L_WARN("Could not find or open %s as a properties file in %s\n", properties_file_name, root);
+        return false;
+    }
+
+    *avpr_fqn = '\0';
+    *path = '\0'; //re-use path to create path to avpr file
+    char *p_line = malloc(MAX_PATH_LEN);
+    size_t line_len = MAX_PATH_LEN;
+    while (getline(&p_line, &line_len, properties) >= 0) {
+        if (strncmp(p_line, "fqn=", strlen("fqn=")) == 0) {
+            snprintf(avpr_fqn, MAX_PATH_LEN, "%s", (p_line + strlen("fqn=")));
+            avpr_fqn[strcspn(avpr_fqn, "\n")] = 0;
+        }
+        else if (strncmp(p_line, "avpr=", strlen("avpr=")) == 0) {
+            snprintf(path, MAX_PATH_LEN, "%s/%s", root, (p_line + strlen("avpr=")));
+            path[strcspn(path, "\n")] = 0;
+        }
+    }
+    free(p_line);
+    fclose(properties);
+
+    if (*avpr_fqn == '\0') {
+        L_WARN("File %s does not contain a fully qualified name for the parser\n", properties_file_name);
+        return false;
+    }
+
+    if (*path == '\0') {
+        L_WARN("File %s does not contain a location for the avpr file\n", properties_file_name);
+        return false;
+    }
+
+    return true;
+}
+
+static FILE* openFileStream(pubsub_serialization_provider_t* provider, descriptor_type_e descriptorType, const char* filename, const char* root, char* avpr_fqn, char* pathOrError) {
+    FILE* result = NULL;
+    memset(pathOrError, 0, MAX_PATH_LEN);
+    switch (descriptorType) {
+        case FIT_INVALID:
+            snprintf(pathOrError, MAX_PATH_LEN, "Because %s is not a valid file", filename);
+            break;
+        case FIT_DESCRIPTOR:
+            snprintf(pathOrError, MAX_PATH_LEN, "%s/%s", root, filename);
+            result = fopen(pathOrError, "r");
+            break;
+        case FIT_AVPR:
+            if (readPropertiesFile(provider, filename, root, avpr_fqn, pathOrError)) {
+                result = fopen(pathOrError, "r");
+            }
+            break;
+        default:
+            L_WARN("Unknown file input type, returning NULL!\n");
+            break;
+    }
+
+    return result;
+}
+
+static unsigned int pubsub_serializationProvider_getMsgId(pubsub_serialization_provider_t* provider __attribute__((unused)), dyn_message_type *msg) {
+    unsigned int msgId = 0;
+
+    char *msgName = NULL;
+    dynMessage_getName(msg, &msgName);
+
+    char *msgIdStr = NULL;
+    int rv = dynMessage_getAnnotationEntry(msg, "msgId", &msgIdStr);
+    if (rv == CELIX_SUCCESS && msgIdStr != NULL) {
+        // custom msg id passed, use it
+        long customMsgId = strtol(msgIdStr, NULL, 10);
+        if (customMsgId > 0) {
+            msgId = (unsigned int) customMsgId;
+        }
+    }
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgName);
+    }
+
+    return msgId;
+}
+
+static dyn_message_type* pubsub_serializationProvider_parseDfiDescriptor(pubsub_serialization_provider_t* provider, FILE* stream, const char* entryPath) {
+    dyn_message_type *msg = NULL;
+    int rc = dynMessage_parse(stream, &msg);
+    if (rc != 0 || msg == NULL) {
+        L_WARN("Cannot parse message from descriptor from entry %s.\n", entryPath);
+        return NULL;
+    }
+
+    char *msgName = NULL;
+    rc += dynMessage_getName(msg, &msgName);
+
+    version_pt msgVersion = NULL;
+    rc += dynMessage_getVersion(msg, &msgVersion);
+
+    if (rc != 0 || msgName == NULL || msgVersion == NULL) {
+        L_WARN("Cannot retrieve name and/or version from msg, using entry %s.\n", entryPath);
+        dynMessage_destroy(msg);
+        return NULL;
+    }
+
+    return msg;
+}
+
+//TODO FIXME, see #158
+//
+//    static dyn_message_type* pubsub_serializationProvider_parseAvprDescriptor(pubsub_serialization_provider_t* provider, FILE* stream, const char *entryName, const char* fqn) {
+//
+//    //dyn_message_type* msgType = dynMessage_parseAvpr(file_ptr, fqn);
+//    dyn_message_type* msgType = NULL;
+//
+//    if (!msgType) {
+//        L_WARN("[json serializer] Cannot parse avpr file '%s'\n", fqn);
+//        return -1;
+//    }
+//
+//    dyn_type* type;
+//    dynMessage_getMessageType(msgType, &type);
+//
+//    const char *msgName = dynType_getName(type);
+//
+//    version_pt msgVersion = NULL;
+//    celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"), &msgVersion);
+//
+//    if (s != CELIX_SUCCESS || !msgName) {
+//        L_WARN("[json serializer] Cannot retrieve name and/or version from msg\n");
+//        if (s == CELIX_SUCCESS) {
+//            version_destroy(msgVersion);
+//        }
+//        return -1;
+//    }
+//
+//    unsigned int msgId = 0;
+//    const char *msgIdStr = dynType_getMetaInfo(type, "msgId");
+//    if (msgIdStr != NULL) {
+//        // custom msg id passed, use it
+//        long customMsgId = strtol(msgIdStr, NULL, 10);
+//        if (customMsgId > 0)
+//            msgId = (unsigned int) customMsgId;
+//    }
+//
+//    if (msgId == 0) {
+//        msgId = utils_stringHash(msgName);
+//    }
+//
+//
+//
+//    return 0;
+//}
+//}
+
+/**
+ * Returns true if the msgType is valid and unique (new msg fqn & msg id).
+ * Logs error if msg id clashes or versions are different.
+ * Logs warning if descriptors are different.
+ */
+static bool pubsub_serializationProvider_isUniqueAndCheckValid(pubsub_serialization_provider_t* provider, pubsub_serialization_entry_t* entry) {

Review comment:
       isUniqueAndValid?

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
##########
@@ -0,0 +1,668 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "celix_constants.h"
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+#define MAX_PATH_LEN    1024
+
+typedef enum
+{
+    FIT_INVALID = 0,    // file name contains neither ".descriptor" nor ".properties"
+    FIT_DESCRIPTOR = 1, // file name contains ".descriptor"
+    FIT_AVPR = 2        // file name contains ".properties"
+} descriptor_type_e;
+
+//Logging shorthands. Each one expands against a variable named `provider`, which must be in scope at the call site.
+#define L_DEBUG(...) celix_logHelper_debug(provider->logHelper, __VA_ARGS__)
+#define L_INFO(...) celix_logHelper_info(provider->logHelper, __VA_ARGS__)
+#define L_WARN(...) celix_logHelper_warning(provider->logHelper, __VA_ARGS__)
+#define L_ERROR(...) celix_logHelper_error(provider->logHelper, __VA_ARGS__)
+
+
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper; //target of the L_* logging macros and of dfi_log
+    char* serializationType;
+
+    //serialization callbacks; each receives the serialization entry it operates on
+    celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg);
+
+    //updated serialization services
+    long bundleTrackerId;
+
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    celix_array_list_t *serializationSvcEntries; //NOTE(review): presumably holds pubsub_serialization_entry_t* elements (an array list has no key) - confirm
+};
+
+/**
+ * printf-style log callback (presumably registered with the dfi library - confirm at the call site).
+ * Formats the message and forwards it, prefixed with the originating file and line,
+ * to the provider's log helper. The level argument is ignored; everything is logged
+ * at warning level.
+ *
+ * @param handle The pubsub_serialization_provider_t whose log helper is used.
+ * @param level  Ignored.
+ * @param file   Source file reported in the log line.
+ * @param line   Source line reported in the log line.
+ * @param msg    printf-style format string, followed by its arguments.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    (void)level;
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //On failure the content of logStr is undefined, so it must not be read or freed.
+        celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, "<cannot format log message>");
+        return;
+    }
+    celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+/**
+ * Classifies a file name by substring: ".descriptor" yields FIT_DESCRIPTOR,
+ * otherwise ".properties" yields FIT_AVPR, otherwise FIT_INVALID.
+ * The substring may occur anywhere in the name, not only at the end.
+ */
+static descriptor_type_e getDescriptorType(const char* filename) {
+    descriptor_type_e type = FIT_INVALID;
+    if (strstr(filename, ".descriptor") != NULL) {
+        type = FIT_DESCRIPTOR;
+    } else if (strstr(filename, ".properties") != NULL) {
+        type = FIT_AVPR;
+    }
+    return type;
+}
+
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {

Review comment:
       This reads the dfi files? Name is confusing/conflicting with the regular properties read/write.

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+//Logging shorthands. Each one expands against a variable named `handler`, which must be in scope at the call site.
+#define L_DEBUG(...) celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry {
+    long svcId; //service id read from the service properties; used to find the entry on removal
+    const celix_properties_t *properties; //service properties as passed in on add; never freed by the handler
+    uint32_t msgId; //msg id property, or a hash of the msg fqn when that property is 0/absent
+    celix_version_t* msgVersion; //freed with celix_version_destroy together with the entry
+    char* msgFqn; //copy of the msg fqn property; freed together with the entry
+    pubsub_message_serialization_service_t* svc; //the serialization service the calls are delegated to
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    bool backwardCompatible; //true: use celix_version_isUserCompatible; false: require equal major and minor
+    long serializationSvcTrackerId; //tracker started in create, stopped in destroy
+    celix_log_helper_t *logHelper; //target of the L_* logging macros
+
+    celix_thread_rwlock_t lock; //guards serializationServices
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+//Tracker "add" callback: hands the service and its properties to the handler passed as callback handle.
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_addSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+//Tracker "remove" callback: asks the handler passed as callback handle to drop the given service.
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_removeSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Sort comparator for serialization service entries.
+ * Reads service id and service ranking (both defaulting to 0) from each entry's
+ * properties and delegates the ordering to utils_compareServiceIdsAndRanking.
+ */
+int compareEntries(const void *a, const void *b) {
+    const pubsub_serialization_service_entry_t* lhs = a;
+    const pubsub_serialization_service_entry_t* rhs = b;
+
+    long lhsId = celix_properties_getAsLong(lhs->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long lhsRanking = celix_properties_getAsLong(lhs->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    long rhsId = celix_properties_getAsLong(rhs->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long rhsRanking = celix_properties_getAsLong(rhs->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(lhsId, lhsRanking, rhsId, rhsRanking);
+}
+
+/**
+ * Returns the first entry of the list registered for msgId, or NULL when no list is registered.
+ * The caller must hold the handler lock.
+ */
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list == NULL) {
+        return NULL;
+    }
+    //a registered list always holds at least one entry
+    return celix_arrayList_get(list, 0);
+}
+
+/**
+ * Decides whether serialized data with the given major/minor version can be handled by entry.
+ * A backward compatible handler delegates to celix_version_isUserCompatible;
+ * otherwise major and minor must both be equal to the entry's version.
+ */
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    if (handler->backwardCompatible) {
+        return celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    }
+
+    const int entryMajor = celix_version_getMajor(entry->msgVersion);
+    const int entryMinor = celix_version_getMinor(entry->msgVersion);
+    return entryMajor == serializedMajorVersion && entryMinor == serializedMinorVersion;
+}
+
+/**
+ * Returns the msg fqn of the first entry registered for msgId, or NULL when none is registered.
+ * The returned string is owned by the entry. The caller must hold the handler lock.
+ */
+static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list == NULL) {
+        return NULL;
+    }
+    //a registered list always holds at least one entry
+    pubsub_serialization_service_entry_t *first = celix_arrayList_get(list, 0);
+    return first->msgFqn;
+}
+
+/**
+ * Creates a serializer handler and starts tracking message serialization services
+ * whose serialization type property equals serializerType.
+ *
+ * @param ctx                The bundle context used for logging and service tracking.
+ * @param serializerType     Value matched against the serialization type service property.
+ * @param backwardCompatible Stored on the handler; selects the version check used by isCompatible.
+ * @return The new handler, or NULL if memory for the handler or the tracker filter cannot be allocated.
+ *         Release it with pubsub_serializerHandler_destroy.
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
+    pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
+    if (handler == NULL) {
+        return NULL;
+    }
+
+    //Build the filter before acquiring any other resource, so a failure only has to free the handler.
+    char *filter = NULL;
+    if (asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType) < 0) {
+        //On failure the content of filter is undefined, so it must not be used or freed.
+        free(handler);
+        return NULL;
+    }
+
+    handler->ctx = ctx;
+    handler->backwardCompatible = backwardCompatible;
+
+    handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
+
+    celixThreadRwlock_create(&handler->lock, NULL);
+    handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
+    opts.filter.filter = filter;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = addSvc;
+    opts.removeWithProperties = remSvc;
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    free(filter);
+
+    return handler;
+}
+
+
+/**
+ * Stops the service tracker and releases the handler together with every entry it still holds.
+ * A NULL handler is ignored.
+ */
+void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
+    if (handler == NULL) {
+        return;
+    }
+
+    celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
+    celixThreadRwlock_destroy(&handler->lock);
+
+    //free the entries of every per-msg-id list, then the list itself
+    hash_map_iterator_t it = hashMapIterator_construct(handler->serializationServices);
+    while (hashMapIterator_hasNext(&it)) {
+        celix_array_list_t *list = hashMapIterator_nextValue(&it);
+        int count = celix_arrayList_size(list);
+        for (int idx = 0; idx < count; ++idx) {
+            pubsub_serialization_service_entry_t* stale = celix_arrayList_get(list, idx);
+            free(stale->msgFqn);
+            celix_version_destroy(stale->msgVersion);
+            free(stale);
+        }
+        celix_arrayList_destroy(list);
+    }
+
+    hashMap_destroy(handler->serializationServices, false, false);
+    celix_logHelper_destroy(handler->logHelper);
+    free(handler);
+}
+
+/**
+ * Registers a message serialization service with the handler.
+ *
+ * The msg id is taken from the service properties; when it is 0 or absent, a hash of the msg fqn is used.
+ * The service is ignored (and an error is logged) when:
+ *  - the msg fqn property is missing,
+ *  - the msg version property cannot be parsed,
+ *  - an entry with the same msg id but a different msg fqn already exists, or
+ *  - an entry with the same msg id but a different msg version already exists.
+ * Accepted services are added to the list for their msg id, which is then re-sorted with compareEntries.
+ *
+ * @param handler       The serializer handler.
+ * @param svc           The serialization service to register.
+ * @param svcProperties The service properties; the pointer is stored in the entry and not copied.
+ */
+void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    const char *version = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+
+    if (msgFqn == NULL) {
+        //Without a msg fqn the entry cannot be hashed, compared or copied below.
+        L_ERROR("%s service has no %s property. Ignoring serialization service.", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
+        return;
+    }
+
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celix_version_t* msgVersion = celix_version_createVersionFromString(version);
+    if (msgVersion == NULL) {
+        //Log the property string; msgVersion is NULL here and is not a string.
+        L_ERROR("%s service has an invalid %s property. value is '%s'", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, version);
+        return;
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+
+    pubsub_serialization_service_entry_t* existingEntry = findEntry(handler, msgId);
+
+    bool valid = true;
+    if (existingEntry != NULL && strncmp(existingEntry->msgFqn, msgFqn, 1024*1024) != 0) {
+        L_ERROR("Msg id clash. Registered serialization service with msg id %u and msg fqn '%s' clashes with an existing serialization service using the same msg id and msg fqn '%s'. Ignoring serialization service.", msgId, msgFqn, existingEntry->msgFqn);
+        valid = false;
+    }
+
+    if (existingEntry != NULL && celix_version_compareTo(existingEntry->msgVersion, msgVersion) != 0) {
+        char* existingVersion = celix_version_toString(existingEntry->msgVersion);
+        L_ERROR("Mismatched message versions. Registered serialization service with msg '%s' with version %s, has a different version than an existing serialization service with version '%s'. Ignoring serialization service.", msgFqn, version, existingVersion);
+        free(existingVersion);
+        valid = false;
+    }
+
+    if (valid) {
+        celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
+        if (entries == NULL) {
+            entries = celix_arrayList_create();
+        }
+        pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
+        entry->svcId = svcId;
+        entry->properties = svcProperties;
+        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgId = msgId;
+        entry->msgVersion = msgVersion; //ownership moves to the entry
+        entry->svc = svc;
+        celix_arrayList_add(entries, entry);
+        celix_arrayList_sort(entries, compareEntries);
+
+        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+    } else {
+        celix_version_destroy(msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+/**
+ * Removes the entry whose service id matches the one in svcProperties from the list
+ * registered for the service's msg id, and frees that entry. When the list becomes
+ * empty it is removed from the handler and destroyed as well.
+ * The msg id is resolved as on add: the msg id property, or a hash of the msg fqn when that is 0/absent.
+ */
+void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list != NULL) {
+        //locate the entry registered for this service id
+        int idx = 0;
+        pubsub_serialization_service_entry_t *match = NULL;
+        while (match == NULL && idx < celix_arrayList_size(list)) {
+            pubsub_serialization_service_entry_t *candidate = celix_arrayList_get(list, idx);
+            if (candidate->svcId == svcId) {
+                match = candidate;
+            } else {
+                ++idx;
+            }
+        }
+
+        if (match != NULL) {
+            celix_arrayList_removeAt(list, idx);
+            celix_arrayList_sort(list, compareEntries);
+            free(match->msgFqn);
+            celix_version_destroy(match->msgVersion);
+            free(match);
+        }
+
+        //an empty list must not stay registered: lookups assume at least one entry
+        if (celix_arrayList_size(list) == 0) {
+            hashMap_remove(handler->serializationServices, (void*)(uintptr_t)msgId);
+            celix_arrayList_destroy(list);
+        }
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+/**
+ * Serializes input using the first serialization service registered for msgId.
+ * Runs under the handler's read lock.
+ *
+ * @return The status of the service's serialize call, or CELIX_ILLEGAL_ARGUMENT
+ *         (with an error logged) when no service is registered for msgId.
+ */
+celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else {
+        status = found->svc->serialize(found->svc->handle, input, output, outputIovLen);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+/**
+ * Releases a serialized message through the first serialization service registered for msgId.
+ * Runs under the handler's read lock.
+ *
+ * @return CELIX_SUCCESS, or CELIX_ILLEGAL_ARGUMENT (with an error logged) when no
+ *         service is registered for msgId.
+ */
+celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, struct iovec* input, size_t inputIovLen) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else {
+        found->svc->freeSerializedMsg(found->svc->handle, input, inputIovLen);
+        status = CELIX_SUCCESS;
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+/**
+ * Deserializes input using the first serialization service registered for msgId,
+ * provided the serialized major/minor version passes isCompatible.
+ * Runs under the handler's read lock.
+ *
+ * @return The status of the service's deserialize call, or CELIX_ILLEGAL_ARGUMENT
+ *         (with an error logged) when no service is registered for msgId or the
+ *         serialized version is incompatible.
+ */
+celix_status_t pubsub_serializerHandler_deserialize(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion, const struct iovec* input, size_t inputIovLen, void** out) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else if (isCompatible(handler, found, serializedMajorVersion, serializedMinorVersion)) {
+        status = found->svc->deserialize(found->svc->handle, input, inputIovLen, out);
+    } else {
+        char *entryVersion = celix_version_toString(found->msgVersion);
+        L_ERROR("Cannot deserialize for message %s version %s. The serialized input has a version of %d.%d.x and this is incompatible.", found->msgFqn, entryVersion, serializedMajorVersion, serializedMinorVersion);
+        free(entryVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+/**
+ * Releases a deserialized message through the first serialization service registered for msgId.
+ * Runs under the handler's read lock.
+ *
+ * @return CELIX_SUCCESS, or CELIX_ILLEGAL_ARGUMENT (with an error logged) when no
+ *         service is registered for msgId.
+ */
+celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, void* msg) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else {
+        found->svc->freeDeserializedMsg(found->svc->handle, msg);
+        status = CELIX_SUCCESS;
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+bool pubsub_serializerHandler_supportMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion) {

Review comment:
       Does this check if a message can be (de)serialized? If so -> isMessageSupported. If not, what does it do?

##########
File path: bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
##########
@@ -0,0 +1,120 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_avrobin_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <assert.h>
+
+#include "avrobin_serializer.h"
+#include "dyn_message.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+
+/**
+ * printf-style log callback (presumably registered with the dfi library - confirm at the call site).
+ * Formats the message and forwards it, prefixed with the originating file and line,
+ * to the log helper passed as handle, using the given level.
+ *
+ * @param handle The celix_log_helper_t to log to.
+ * @param level  Log level passed through to celix_logHelper_log.
+ * @param file   Source file reported in the log line.
+ * @param line   Source line reported in the log line.
+ * @param msg    printf-style format string, followed by its arguments.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    celix_log_helper_t *log = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //On failure the content of logStr is undefined, so it must not be read or freed.
+        celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, "<cannot format log message>");
+        return;
+    }
+    celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+
+static celix_status_t pubsub_avrobinSerializationProvider_serialize(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen) {

Review comment:
       Why outputIovLen? Not even used?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message