celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] glimmerveen commented on a change in pull request #203: Feature/osgi promise
Date Tue, 21 Apr 2020 06:18:24 GMT

glimmerveen commented on a change in pull request #203:
URL: https://github.com/apache/celix/pull/203#discussion_r411900232



##########
File path: misc/experimental/promise/api/celix/impl/SharedPromiseState.h
##########
@@ -0,0 +1,508 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <chrono>
+#include <mutex>
+#include <condition_variable>
+#include <vector>
+#include <thread>
+
+#include <tbb/task_arena.h>
+
+#include "celix/PromiseInvocationException.h"
+#include "celix/PromiseTimeoutException.h"
+
+namespace celix {
+    namespace impl {
+
+        template<typename T>
+        class SharedPromiseState {
+        public:
+            typedef typename std::aligned_storage<sizeof(T), std::alignment_of<T>::value>::type
DataType;
+
+            explicit SharedPromiseState(const tbb::task_arena &executor = {});
+
+            ~SharedPromiseState();
+
+            void resolve(T &&value);
+
+            void resolve(const T& value);
+
+            void fail(std::exception_ptr p);
+
+            void fail(const std::exception &e);
+
+            void tryResolve(T &&value);
+
+            void tryFail(std::exception_ptr p);
+
+            const T &getValue() const; //copy
+            T moveValue(); //move
+            std::exception_ptr getFailure() const;
+
+            void wait() const;
+
+            bool isDone() const;
+
+            bool isSuccessfullyResolved() const;
+
+            void addOnSuccessConsumeCallback(std::function<void(T)> callback);
+
+            void addOnFailureConsumeCallback(std::function<void(const std::exception &)>
callback);
+
+            void addOnResolve(std::function<void(bool succeeded, T *val, std::exception_ptr
exp)> callback);
+
+            template<typename Rep, typename Period>
+            std::shared_ptr<SharedPromiseState<T>> delay(std::chrono::duration<Rep,
Period> duration);
+
+            std::shared_ptr<SharedPromiseState<T>> recover(std::function<T()>
recover);
+
+            std::shared_ptr<SharedPromiseState<T>> filter(std::function<bool(T)>
predicate);
+
+            std::shared_ptr<SharedPromiseState<T>> fallbackTo(std::shared_ptr<SharedPromiseState<T>>
fallbackTo);
+
+            void resolveWith(std::shared_ptr<SharedPromiseState<T>> with);
+
+            template<typename R>
+            std::shared_ptr<SharedPromiseState<R>> map(std::function<R(T)>
mapper);
+
+            std::shared_ptr<SharedPromiseState<T>> thenAccept(std::function<void(T)>
consumer);
+
+            template<typename Rep, typename Period>
+            static std::shared_ptr<SharedPromiseState<T>>
+            timeout(std::shared_ptr<SharedPromiseState<T>> state, std::chrono::duration<Rep,
Period> duration);
+
+            void addChain(std::function<void()> chainFunction);
+
+            tbb::task_arena getExecutor() const;
+
+//        template<typename R>
+//        std::shared_ptr<SharedPromiseState<R>> then(std::function<void()>
success, std::function<void()> failure);
+        private:
+            /**
+             * Complete the resolving and call the registered tasks
+             * A reference to the possible locked unique_lock.
+             */
+            void complete(std::unique_lock<std::mutex> &lck);
+
+            /**
+             * Wait for data and check if it resolved as expected (expects mutex locked)
+             */
+            void waitForAndCheckData(std::unique_lock<std::mutex> &lck, bool expectValid)
const;
+
+            tbb::task_arena executor; //TODO look into different thread pool libraries
+            //TODO add ScheduledExecutorService like object
+
+            mutable std::mutex mutex{}; //protects below
+            mutable std::condition_variable cond{};
+            bool done = false;
+            bool dataMoved = false;
+            std::vector<std::function<void()>> chain{}; //chain tasks are executed
on thread pool.
+            std::exception_ptr exp{nullptr};
+            DataType data{};
+        };
+    }
+}
+
+
+/*********************************************************************************
+ Implementation
+*********************************************************************************/
+
+template<typename T>
+inline celix::impl::SharedPromiseState<T>::SharedPromiseState(const tbb::task_arena&
_executor) : executor{_executor} {}
+
+template<typename T>
+inline celix::impl::SharedPromiseState<T>::~SharedPromiseState() {
+    std::unique_lock<std::mutex> lck{mutex};
+
+    //Note for now, not waiting until promise is met.
+    //Else if a deferred goes out of scope without resolving, a wait will block
+    //cond.wait(lck, [this]{return done;});
+
+    if (done && !exp && !dataMoved) {
+        static_cast<T*>(static_cast<void*>(&data))->~T();
+    }
+}
+
+template<typename T>
+inline void celix::impl::SharedPromiseState<T>::resolve(T&& value) {
+    std::unique_lock<std::mutex> lck{mutex};
+    if (done) {
+        throw celix::PromiseInvocationException("Cannot resolve Promise. Promise is already
done");
+    }
+    new(&data) T{std::forward<T>(value)};
+    exp = nullptr;
+    complete(lck);
+}
+
+
+template<typename T>
+inline void celix::impl::SharedPromiseState<T>::resolve(const T& value) {
+    std::unique_lock<std::mutex> lck{mutex};
+    if (done) {
+        throw celix::PromiseInvocationException("Cannot resolve Promise. Promise is already
done");
+    }
+    new(&data) T{value};
+    exp = nullptr;
+    complete(lck);
+}
+
+template<typename T>
+inline void celix::impl::SharedPromiseState<T>::fail(std::exception_ptr e) {
+    std::unique_lock<std::mutex> lck{mutex};
+    if (done) {
+        throw celix::PromiseInvocationException("Cannot fail Promise. Promise is already
done");
+    }
+    exp = e;
+    complete(lck);
+}
+
+template<typename T>
+inline void celix::impl::SharedPromiseState<T>::fail(const std::exception& e) {
+    fail(std::make_exception_ptr(e));
+}
+
+template<typename T>
+inline void celix::impl::SharedPromiseState<T>::tryResolve(T&& value) {
+    std::unique_lock<std::mutex> lck{mutex};
+    if (!done) {
+        new(&data) T(std::forward<T>(value));
+        exp = nullptr;
+        complete(lck);
+    }
+}
+
+template<typename T>
+inline void celix::impl::SharedPromiseState<T>::tryFail(std::exception_ptr e) {
+    std::unique_lock<std::mutex> lck{mutex};
+    if (!done) {
+        exp = e;
+        complete(lck);
+    }
+}
+
+template<typename T>
+inline bool celix::impl::SharedPromiseState<T>::isDone() const {
+    std::lock_guard<std::mutex> lck{mutex};
+    return done;
+}
+
+template<typename T>
+inline bool celix::impl::SharedPromiseState<T>::isSuccessfullyResolved() const {
+    std::lock_guard<std::mutex> lck{mutex};
+    return done && !exp;
+}
+
+
+template<typename T>
+inline void celix::impl::SharedPromiseState<T>::waitForAndCheckData(std::unique_lock<std::mutex>&
lck, bool expectValid) const {
+    if (!lck.owns_lock()) {
+        lck.lock();
+    }
+    cond.wait(lck, [this]{return done;});
+    if (expectValid && exp) {
+        throw celix::PromiseInvocationException{"Expected a succeeded promise, but promise
failed"};
+    } else if(!expectValid && !exp && !dataMoved) {
+        throw celix::PromiseInvocationException{"Expected a failed promise, but promise succeeded"};
+    } else if (dataMoved) {
+        throw celix::PromiseInvocationException{"Invalid use of promise, data is moved and
not available anymore!"};
+    }
+}
+
+template<typename T>
+inline const T& celix::impl::SharedPromiseState<T>::getValue() const {
+    std::unique_lock<std::mutex> lck{mutex};
+    waitForAndCheckData(lck, true);
+    const T* ptr = reinterpret_cast<const T*>(&data);
+    return *ptr;
+}
+
+template<typename T>
+inline tbb::task_arena celix::impl::SharedPromiseState<T>::getExecutor() const {

Review comment:
       Conceptually the Executor belongs to the PromiseFactory, and the Promise instances
obtain the Executor from there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message