celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers
Date Mon, 18 May 2020 07:18:16 GMT

pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426414311



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
##########
@@ -0,0 +1,668 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "celix_constants.h"
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+#define MAX_PATH_LEN    1024
+
+typedef enum
+{
+    FIT_INVALID = 0,
+    FIT_DESCRIPTOR = 1,
+    FIT_AVPR = 2
+} descriptor_type_e;
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(provider->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(provider->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(provider->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(provider->logHelper, __VA_ARGS__)
+
+
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    char* serializationType;
+
+    //serialization callbacks
+    celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct
iovec** output, size_t* outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t
inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec*
input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg);
+
+    //updated serialization services
+    long bundleTrackerId;
+
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
+};
+
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg,
...) {
+    (void)level;
+    va_list ap;
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_start(ap, msg);
+    vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i,
MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+static descriptor_type_e getDescriptorType(const char* filename) {
+    if (strstr(filename, ".descriptor")) {
+        return FIT_DESCRIPTOR;
+    }
+    else if (strstr(filename, ".properties")) {
+        return FIT_AVPR;
+    }
+    else {
+        return FIT_INVALID;
+    }
+}
+
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name,
const char* root, char* avpr_fqn, char* path) {
+    snprintf(path, MAX_PATH_LEN, "%s/%s", root, properties_file_name); // use path to create
path to properties file
+    FILE *properties = fopen(path, "r");
+    if (!properties) {
+        L_WARN("Could not find or open %s as a properties file in %s\n", properties_file_name,
root);
+        return false;
+    }
+
+    *avpr_fqn = '\0';
+    *path = '\0'; //re-use path to create path to avpr file
+    char *p_line = malloc(MAX_PATH_LEN);
+    size_t line_len = MAX_PATH_LEN;
+    while (getline(&p_line, &line_len, properties) >= 0) {
+        if (strncmp(p_line, "fqn=", strlen("fqn=")) == 0) {
+            snprintf(avpr_fqn, MAX_PATH_LEN, "%s", (p_line + strlen("fqn=")));
+            avpr_fqn[strcspn(avpr_fqn, "\n")] = 0;
+        }
+        else if (strncmp(p_line, "avpr=", strlen("avpr=")) == 0) {
+            snprintf(path, MAX_PATH_LEN, "%s/%s", root, (p_line + strlen("avpr=")));
+            path[strcspn(path, "\n")] = 0;
+        }
+    }
+    free(p_line);
+    fclose(properties);
+
+    if (*avpr_fqn == '\0') {
+        L_WARN("File %s does not contain a fully qualified name for the parser\n", properties_file_name);
+        return false;
+    }
+
+    if (*path == '\0') {
+        L_WARN("File %s does not contain a location for the avpr file\n", properties_file_name);
+        return false;
+    }
+
+    return true;
+}
+
+static FILE* openFileStream(pubsub_serialization_provider_t* provider, descriptor_type_e
descriptorType, const char* filename, const char* root, char* avpr_fqn, char* pathOrError)
{
+    FILE* result = NULL;
+    memset(pathOrError, 0, MAX_PATH_LEN);
+    switch (descriptorType) {
+        case FIT_INVALID:
+            snprintf(pathOrError, MAX_PATH_LEN, "Because %s is not a valid file", filename);
+            break;
+        case FIT_DESCRIPTOR:
+            snprintf(pathOrError, MAX_PATH_LEN, "%s/%s", root, filename);
+            result = fopen(pathOrError, "r");
+            break;
+        case FIT_AVPR:
+            if (readPropertiesFile(provider, filename, root, avpr_fqn, pathOrError)) {
+                result = fopen(pathOrError, "r");
+            }
+            break;
+        default:
+            L_WARN("Unknown file input type, returning NULL!\n");
+            break;
+    }
+
+    return result;
+}
+
+static unsigned int pubsub_serializationProvider_getMsgId(pubsub_serialization_provider_t*
provider __attribute__((unused)), dyn_message_type *msg) {
+    unsigned int msgId = 0;
+
+    char *msgName = NULL;
+    dynMessage_getName(msg, &msgName);
+
+    char *msgIdStr = NULL;
+    int rv = dynMessage_getAnnotationEntry(msg, "msgId", &msgIdStr);
+    if (rv == CELIX_SUCCESS && msgIdStr != NULL) {
+        // custom msg id passed, use it
+        long customMsgId = strtol(msgIdStr, NULL, 10);
+        if (customMsgId > 0) {
+            msgId = (unsigned int) customMsgId;
+        }
+    }
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgName);
+    }
+
+    return msgId;
+}
+
+static dyn_message_type* pubsub_serializationProvider_parseDfiDescriptor(pubsub_serialization_provider_t*
provider, FILE* stream, const char* entryPath) {
+    dyn_message_type *msg = NULL;
+    int rc = dynMessage_parse(stream, &msg);
+    if (rc != 0 || msg == NULL) {
+        L_WARN("Cannot parse message from descriptor from entry %s.\n", entryPath);
+        return NULL;
+    }
+
+    char *msgName = NULL;
+    rc += dynMessage_getName(msg, &msgName);
+
+    version_pt msgVersion = NULL;
+    rc += dynMessage_getVersion(msg, &msgVersion);
+
+    if (rc != 0 || msgName == NULL || msgVersion == NULL) {
+        L_WARN("Cannot retrieve name and/or version from msg, using entry %s.\n", entryPath);
+        dynMessage_destroy(msg);
+        return NULL;
+    }
+
+    return msg;
+}
+
+//TODO FIXME, see #158
+//
+//    static dyn_message_type* pubsub_serializationProvider_parseAvprDescriptor(pubsub_serialization_provider_t*
provider, FILE* stream, const char *entryName, const char* fqn) {
+//
+//    //dyn_message_type* msgType = dynMessage_parseAvpr(file_ptr, fqn);
+//    dyn_message_type* msgType = NULL;
+//
+//    if (!msgType) {
+//        L_WARN("[json serializer] Cannot parse avpr file '%s'\n", fqn);
+//        return -1;
+//    }
+//
+//    dyn_type* type;
+//    dynMessage_getMessageType(msgType, &type);
+//
+//    const char *msgName = dynType_getName(type);
+//
+//    version_pt msgVersion = NULL;
+//    celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"),
&msgVersion);
+//
+//    if (s != CELIX_SUCCESS || !msgName) {
+//        L_WARN("[json serializer] Cannot retrieve name and/or version from msg\n");
+//        if (s == CELIX_SUCCESS) {
+//            version_destroy(msgVersion);
+//        }
+//        return -1;
+//    }
+//
+//    unsigned int msgId = 0;
+//    const char *msgIdStr = dynType_getMetaInfo(type, "msgId");
+//    if (msgIdStr != NULL) {
+//        // custom msg id passed, use it
+//        long customMsgId = strtol(msgIdStr, NULL, 10);
+//        if (customMsgId > 0)
+//            msgId = (unsigned int) customMsgId;
+//    }
+//
+//    if (msgId == 0) {
+//        msgId = utils_stringHash(msgName);
+//    }
+//
+//
+//
+//    return 0;
+//}
+//}
+
+/**
+ * Returns true if the msgType is valid and unique (new msg fqn & msg id).
+ * Logs error if msg id clashes or versions are different.
+ * Logs warning if descriptors are different.
+ */
+static bool pubsub_serializationProvider_isUniqueAndCheckValid(pubsub_serialization_provider_t*
provider, pubsub_serialization_entry_t* entry) {

Review comment:
       renamed and added doc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message