From dev-return-1766-apmail-celix-dev-archive=celix.apache.org@celix.apache.org Fri Mar 13 18:44:21 2020 Return-Path: X-Original-To: apmail-celix-dev-archive@www.apache.org Delivered-To: apmail-celix-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id D531C1906E for ; Fri, 13 Mar 2020 18:44:19 +0000 (UTC) Received: (qmail 29762 invoked by uid 500); 13 Mar 2020 18:44:19 -0000 Delivered-To: apmail-celix-dev-archive@celix.apache.org Received: (qmail 29745 invoked by uid 500); 13 Mar 2020 18:44:19 -0000 Mailing-List: contact dev-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list dev@celix.apache.org Received: (qmail 29721 invoked by uid 99); 13 Mar 2020 18:44:19 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2020 18:44:19 +0000 From: GitBox To: dev@celix.apache.org Subject: [GitHub] [celix] abroekhuis commented on a change in pull request #165: Feature/framework racecondition Message-ID: <158412505910.5261.16628404064546357317.gitbox@gitbox.apache.org> References: In-Reply-To: Date: Fri, 13 Mar 2020 18:44:19 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit abroekhuis commented on a change in pull request #165: Feature/framework racecondition URL: https://github.com/apache/celix/pull/165#discussion_r390855760 ########## File path: libs/framework/src/service_registry.c ########## @@ -972,4 +1032,249 @@ bool celix_serviceRegistry_getServiceInfo( celixThreadRwlock_unlock(®istry->lock); return found; -} \ No newline at end of file +} + +celix_status_t celix_serviceRegistry_addServiceListener(celix_service_registry_t *registry, celix_bundle_t *bundle, const char *stringFilter, celix_service_listener_t *listener) { + + celix_filter_t *filter = NULL; + if (stringFilter != NULL) { + filter = celix_filter_create(stringFilter); + if (filter == NULL) { + fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Cannot add service listener filter '%s' is invalid", stringFilter); + return CELIX_ILLEGAL_ARGUMENT; + } + } + + celix_service_registry_service_listener_entry_t *entry = calloc(1, sizeof(*entry)); + entry->bundle = bundle; + entry->filter = filter; + entry->listener = listener; + entry->useCount = 1; //new entry -> count on 1 + celixThreadMutex_create(&entry->mutex, NULL); + celixThreadCondition_init(&entry->cond, NULL); + + celix_array_list_t *registrations = celix_arrayList_create(); + + celixThreadRwlock_writeLock(®istry->lock); + celix_arrayList_add(registry->serviceListeners, entry); //use count 1 + + //find already registered services + hash_map_iterator_t iter = hashMapIterator_construct(registry->serviceRegistrations); + while (hashMapIterator_hasNext(&iter)) { + celix_array_list_t *regs = (array_list_pt) hashMapIterator_nextValue(&iter); + for (int regIdx = 0; (regs != NULL) && regIdx < celix_arrayList_size(regs); ++regIdx) { + service_registration_pt registration = celix_arrayList_get(regs, regIdx); + properties_pt props = NULL; + serviceRegistration_getProperties(registration, &props); + if (celix_filter_match(filter, props)) { + serviceRegistration_retain(registration); + long svcId = serviceRegistration_getServiceId(registration); + celix_arrayList_add(registrations, registration); + //update pending register event count + celix_increasePendingRegisteredEvent(registry, svcId); + } + } + } + celixThreadRwlock_unlock(®istry->lock); + + //NOTE there is a race condition with serviceRegistry_registerServiceInternal, as result + //a REGISTERED event can be triggered twice instead of once. The service tracker can deal with this. + //The handling of pending registered events is to ensure that the UNREGISTERING event is always + //after the 1 or 2 REGISTERED events. + + for (int i = 0; i < celix_arrayList_size(registrations); ++i) { + service_registration_pt reg = celix_arrayList_get(registrations, i); + long svcId = serviceRegistration_getServiceId(reg); + service_reference_pt ref = NULL; + serviceRegistry_getServiceReference_internal(registry, bundle, reg, &ref); + celix_service_event_t event; + event.reference = ref; + event.type = OSGI_FRAMEWORK_SERVICE_EVENT_REGISTERED; + listener->serviceChanged(listener->handle, &event); + serviceReference_release(ref, NULL); + serviceRegistration_release(reg); + + //update pending register event count + celix_decreasePendingRegisteredEvent(registry, svcId); + } + celix_arrayList_destroy(registrations); + + serviceRegistry_callHooksForListenerFilter(registry, bundle, entry->filter, false); + + celix_decreaseCountServiceListener(entry); //use count decreased, can be 0 + return CELIX_SUCCESS; +} + +celix_status_t celix_serviceRegistry_removeServiceListener(celix_service_registry_t *registry, celix_service_listener_t *listener) { + celix_service_registry_service_listener_entry_t *entry = NULL; + + celixThreadRwlock_writeLock(®istry->lock); + for (int i = 0; i < celix_arrayList_size(registry->serviceListeners); ++i) { + celix_service_registry_service_listener_entry_t *visit = celix_arrayList_get(registry->serviceListeners, i); + if (visit->listener == listener) { + entry = visit; + celix_arrayList_removeAt(registry->serviceListeners, i); + break; + } + } + celixThreadRwlock_unlock(®istry->lock); + + if (entry != NULL) { + serviceRegistry_callHooksForListenerFilter(registry, entry->bundle, entry->filter, true); + celix_waitAndDestroyServiceListener(entry); + } else { + fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Cannot remove service listener, listener not found"); + return CELIX_ILLEGAL_ARGUMENT; + } + return CELIX_SUCCESS; +} + +static void celix_serviceRegistry_serviceChanged(celix_service_registry_t *registry, celix_service_event_type_t eventType, service_registration_pt registration) { + celix_service_registry_service_listener_entry_t *entry; + + celix_array_list_t* retainedEntries = celix_arrayList_create(); + celix_array_list_t* matchedEntries = celix_arrayList_create(); + + celixThreadRwlock_readLock(®istry->lock); + for (int i = 0; i < celix_arrayList_size(registry->serviceListeners); ++i) { + entry = celix_arrayList_get(registry->serviceListeners, i); + celix_arrayList_add(retainedEntries, entry); + celix_increaseCountServiceListener(entry); //ensure that use count > 0, so that the listener cannot be destroyed until all pending event are handled. + } + celixThreadRwlock_unlock(®istry->lock); + + for (int i = 0; i < celix_arrayList_size(retainedEntries); ++i) { + entry = celix_arrayList_get(retainedEntries, i); + int matched = 0; + celix_properties_t *props = NULL; + bool matchResult = false; + serviceRegistration_getProperties(registration, &props); + if (entry->filter != NULL) { + filter_match(entry->filter, props, &matchResult); + } + matched = (entry->filter == NULL) || matchResult; + if (matched) { + celix_arrayList_add(matchedEntries, entry); + } else { + celix_decreaseCountServiceListener(entry); //Not a match -> release entry + } + } + celix_arrayList_destroy(retainedEntries); + + /* + * TODO FIXME, A deadlock can happen when (e.g.) a service is deregistered, triggering this fw_serviceChanged and + * one of the matching service listener callbacks tries to remove an other matched service listener. + * The remove service listener will call the listener_waitForDestroy and the fw_serviceChanged part keeps the + * usageCount on > 0. + * + * Not sure how to prevent/handle this. + */ + + for (int i = 0; i < celix_arrayList_size(matchedEntries); ++i) { + entry = celix_arrayList_get(matchedEntries, i); + service_reference_pt reference = NULL; + celix_service_event_t event; + serviceRegistry_getServiceReference(registry, entry->bundle, registration, &reference); + event.type = eventType; + event.reference = reference; + entry->listener->serviceChanged(entry->listener->handle, &event); + serviceRegistry_ungetServiceReference(registry, entry->bundle, reference); + celix_decreaseCountServiceListener(entry); //decrease usage, so that the listener can be destroyed (if use count is now 0) + } + celix_arrayList_destroy(matchedEntries); +} + + +static void celix_increasePendingRegisteredEvent(celix_service_registry_t *registry, long svcId) { + celixThreadMutex_lock(®istry->pendingRegisterEvents.mutex); + long count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId); + count += 1; + hashMap_put(registry->pendingRegisterEvents.map, (void*)svcId, (void*)count); + celixThreadMutex_unlock(®istry->pendingRegisterEvents.mutex); +} + +static void celix_decreasePendingRegisteredEvent(celix_service_registry_t *registry, long svcId) { + celixThreadMutex_lock(®istry->pendingRegisterEvents.mutex); + long count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId); + assert(count >= 1); + count -= 1; + if (count > 0) { + hashMap_put(registry->pendingRegisterEvents.map, (void *)svcId, (void *)count); + } else { + hashMap_remove(registry->pendingRegisterEvents.map, (void*)svcId); + } + celixThreadCondition_signal(®istry->pendingRegisterEvents.cond); + celixThreadMutex_unlock(®istry->pendingRegisterEvents.mutex); +} + +static void celix_waitForPendingRegisteredEvents(celix_service_registry_t *registry, long svcId) { + celixThreadMutex_lock(®istry->pendingRegisterEvents.mutex); + long count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId); + while (count > 0) { + celixThreadCondition_wait(®istry->pendingRegisterEvents.cond, ®istry->pendingRegisterEvents.mutex); + count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId); + } + celixThreadMutex_unlock(®istry->pendingRegisterEvents.mutex); +} + +//static void celix_serviceRegistry_triggerListenerHooks(celix_service_registry_t *registry, const char* serviceName, const celix_properties_t *properties) { Review comment: Commented out code? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services