diff --git a/include/libipc/condition.h b/include/libipc/condition.h index d3b3a59..a4e2ac3 100644 --- a/include/libipc/condition.h +++ b/include/libipc/condition.h @@ -27,8 +27,8 @@ public: void close() noexcept; bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept; - bool notify() noexcept; - bool broadcast() noexcept; + bool notify(ipc::sync::mutex &mtx) noexcept; + bool broadcast(ipc::sync::mutex &mtx) noexcept; private: class condition_; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 00dd78e..e1f439f 100755 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,12 +1,8 @@ project(ipc) -if(UNIX) - file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_linux.cpp) -else() - file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_win.cpp) -endif() aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc SRC_FILES) aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/sync SRC_FILES) +aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/platform SRC_FILES) file(GLOB HEAD_FILES ${LIBIPC_PROJECT_DIR}/include/libipc/*.h @@ -33,28 +29,27 @@ set_target_properties(${PROJECT_NAME} PROPERTIES ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib" LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib" - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin" ) + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin") # set version set_target_properties(${PROJECT_NAME} PROPERTIES - VERSION 1.1.0 - SOVERSION 2) + VERSION 1.2.0 + SOVERSION 3) target_include_directories(${PROJECT_NAME} PUBLIC ${LIBIPC_PROJECT_DIR}/include PRIVATE ${LIBIPC_PROJECT_DIR}/src -) + $<$:${LIBIPC_PROJECT_DIR}/src/libipc/platform/linux>) if(NOT MSVC) target_link_libraries(${PROJECT_NAME} PUBLIC - pthread - $<$>:rt>) + $<$>:pthread> + $<$,$>>:rt>) endif() install( TARGETS ${PROJECT_NAME} RUNTIME DESTINATION bin LIBRARY DESTINATION lib - ARCHIVE DESTINATION lib -) + ARCHIVE DESTINATION lib) diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index 2713de3..c864a1b 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -687,8 +687,8 @@ buff_t chan_impl::try_recv(ipc::handle_t h) { } template struct chan_impl>; -template struct chan_impl>; -template struct chan_impl>; +// template struct chan_impl>; // TBD +// template struct chan_impl>; // TBD template struct chan_impl>; template struct chan_impl>; diff --git a/src/libipc/platform/detail.h b/src/libipc/platform/detail.h index 8f4c4f5..e93c1d2 100755 --- a/src/libipc/platform/detail.h +++ b/src/libipc/platform/detail.h @@ -1,4 +1,22 @@ -#pragma once +#ifndef LIBIPC_SRC_PLATFORM_DETAIL_H_ +#define LIBIPC_SRC_PLATFORM_DETAIL_H_ + +// detect platform + +#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ + defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ + defined(WINCE) || defined(_WIN32_WCE) +# define IPC_OS_WINDOWS_ +#elif defined(__linux__) || defined(__linux) +# define IPC_OS_LINUX_ +#elif defined(__QNX__) +# define IPC_OS_QNX_ +#elif defined(__APPLE__) +#elif defined(__ANDROID__) +// TBD +#endif + +#if defined(__cplusplus) #include #include @@ -22,18 +40,6 @@ # error "IPC_CONSTEXPR_ has been defined." #endif -// detect platform - -#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ - defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ - defined(WINCE) || defined(_WIN32_WCE) -# define IPC_OS_WINDOWS_ -#endif/*WIN*/ - -#if defined(__linux__) || defined(__linux) -# define IPC_OS_LINUX_ -#endif/*linux*/ - #if __cplusplus >= 201703L #define IPC_UNUSED_ [[maybe_unused]] @@ -123,17 +129,8 @@ constexpr const T& (min)(const T& a, const T& b) { #endif/*__cplusplus < 201703L*/ -template -auto horrible_cast(U rhs) noexcept - -> typename std::enable_if::value - && std::is_trivially_copyable::value, T>::type { - union { - T t; - U u; - } r = {}; - r.u = rhs; - return r.t; -} - } // namespace detail } // namespace ipc + +#endif // defined(__cplusplus) +#endif // LIBIPC_SRC_PLATFORM_DETAIL_H_ \ No newline at end of file diff --git a/src/libipc/platform/linux/a0/LICENSE b/src/libipc/platform/linux/a0/LICENSE new file mode 100644 index 0000000..cf1ab25 --- /dev/null +++ b/src/libipc/platform/linux/a0/LICENSE @@ -0,0 +1,24 @@ +This is free and unencumbered software released into the public domain. + +Anyone is free to copy, modify, publish, use, compile, sell, or +distribute this software, either in source code form or as a compiled +binary, for any purpose, commercial or non-commercial, and by any +means. + +In jurisdictions that recognize copyright laws, the author or authors +of this software dedicate any and all copyright interest in the +software to the public domain. We make this dedication for the benefit +of the public at large and to the detriment of our heirs and +successors. We intend this dedication to be an overt act of +relinquishment in perpetuity of all present and future rights to this +software under copyright law. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. + +For more information, please refer to diff --git a/src/libipc/platform/linux/a0/README.md b/src/libipc/platform/linux/a0/README.md new file mode 100644 index 0000000..8aed9d7 --- /dev/null +++ b/src/libipc/platform/linux/a0/README.md @@ -0,0 +1,213 @@ +

+
+ +
+ AlephZero +

+ +

Simple, Robust, Fast IPC.

+ +

+ + + + +

+ +

+ Overview • + Transport • + Protocol • + Examples • + Installation • + Across Dockers +

+ +# Overview + +[Presentation from March 25, 2020](https://docs.google.com/presentation/d/12KE9UucjZPtpVnM1NljxOqBolBBKECWJdrCoE2yJaBw/edit#slide=id.p) + +AlephZero is a library for message based communication between programs running on the same machine. + +## Simple + +AlephZero's main goal is to be simple to use. Nothing is higher priority. + +There is no "master" process in between your nodes that is needed to do handshakes or exchanges of any kind. All you need is the topic name. + +See the Examples. + +## Robust + +This is probably the main value of AlephZero, above similar libraries. + +AlephZero uses a lot of tricks to ensure the state of all channels is consistent, even when programs die. This includes double-buffering the state of the communication channel and [robustifying](https://man7.org/linux/man-pages/man3/pthread_mutexattr_setrobust.3.html) the locks and notification channels. + +## Fast + +AlephZero uses shared memory across multiple processes to read and write messages, minimizing the involvement of the kernel. The kernel only really gets involved in notifying a process that a new message exists, and for that we use futex (fast user-space mutex). + +TODO: Benchmarks + +# Transport + +AlephZero, at its core, is a simple allocator on top of a contiguous region of memory. Usually, shared-memory. The allocator of choice is a circular-linked-list, which is fast, simple, and sufficient for the protocol listed below. It also plays well with the robustness requirement. + +This has a number of implications. For one, this means that old messages are kept around until the space is needed. The oldest messages are always discarded before any more recent messages. + +# Protocol + +Rather than exposing the low-level transport directly, AlephZero provides a few higher level protocol: + +* PubSub: Broadcast published messages. Subscribers get notified. +* RPC: Request-response. +* PRPC (Progressive RPC): Request-streaming response. +* Sessions: Bi-directional channel of communication. Not yet implemented. Let me know if you want this. + +# Examples + +Many more example and an interactive experience can be found at: https://github.com/alephzero/playground + +For the curious, here are some simple snippets to get you started: + +To begin with, we need to include AlephZero: +```cc +#include +``` + +## PubSub + +You can have as many publisher and subscribers on the same topic as you wish. They just need to agree on the filename. + +```cc +a0::Publisher p("my_pubsub_topic"); +p.pub("foo"); +``` + +You just published `"foo"` to the `"my_pubsub_topic"`. + +To read those message, you can create a subscriber on the same topic: +```cc +a0::Subscriber sub( + "my_pubsub_topic", + A0_INIT_AWAIT_NEW, // or MOST_RECENT or OLDEST + A0_ITER_NEWEST, // or NEXT + [](a0::PacketView pkt_view) { + std::cout << "Got: " << pkt_view.payload() << std::endl; + }); +``` +The callback will trigger whenever a message is published. + +The `Subscriber` object spawns a thread that will read the topic and call the callback. + +The `A0_INIT` tells the subscriber where to start reading. +* `A0_INIT_AWAIT_NEW`: Start with messages published after the creation of the subscriber. +* `A0_INIT_MOST_RECENT`: Start with the most recently published message. Useful for state and configuration. But be careful, this can be quite old! +* `A0_INIT_OLDEST`: Topics keep a history of 16MB (unless configures otherwise). Start with the oldest thing still in there. + +The `A0_ITER` tells the subscriber how to continue reading messages. After each callback: +* `A0_ITER_NEXT`: grab the sequentially next message. When you don't want to miss a thing. +* `A0_ITER_NEWEST`: grab the newest available unread message. When you want to keep up with the firehose. + +```cc +a0::SubscriberSync sub_sync( + "my_pubsub_topic", + A0_INIT_OLDEST, A0_ITER_NEXT); +while (sub_sync.has_next()) { + auto pkt = sub_sync.next(); + std::cout << "Got: " << pkt.payload() << std::endl; +} +``` + +## RPC + +Create an `RpcServer`: + +```cc +a0::RpcServer server( + "my_rpc_topic", + /* onrequest = */ [](a0::RpcRequest req) { + std::cout << "Got: " << req.pkt().payload() << std::endl; + req.reply("echo " + std::string(req.pkt().payload())); + }, + /* oncancel = */ nullptr); +``` + +Create an `RpcClient`: + +```cc +a0::RpcClient client("my_rpc_topic"); +client.send("client msg", [](a0::PacketView reply) { + std::cout << "Got: " << reply.payload() << std::endl; +}); +``` + +# Installation + +## Install From Source + +### Ubuntu Dependencies + +```sh +apt install g++ make +``` + +### Alpine Dependencies + +```sh +apk add g++ linux-headers make +``` + +### Download And Install + +```sh +git clone https://github.com/alephzero/alephzero.git +cd alephzero +make install -j +``` + +## Install From Package + +Coming soon-ish. Let me know if you want this and I'll prioritize it. External support is much appreciated. + +## Integration + +### Command Line + +Add the following to g++ / clang commands. +```sh +-L${libdir} -lalephzero -lpthread +``` + +### Package-cfg + +```sh +pkg-config --cflags --libs alephzero +``` + +### CMake + +Coming soon-ish. Let me know if you want this and I'll prioritize it. External support is much appreciated. + +### Bazel + +Coming soon-ish. Let me know if you want this and I'll prioritize it. + +# Across Dockers + +For programs running across different dockers to be able to communicate, we need to have them match up on two flags: `--ipc` and `--pid`. + +* `--ipc` shares the `/dev/shm` filesystem. This is necessary to open the same file topics. +* `--pid` shares the process id namespace. This is necessary for the locking and notification systems. + +In the simplest case, you can set them both to `host` and talk through the system's global `/dev/shm` and process id namespace. +```sh +docker run --ipc=host --pid=host --name=foo foo_image +docker run --ipc=host --pid=host --name=bar bar_image +``` + +Or, you can mark one as `shareable` and have the others connect to it: +```sh +docker run --ipc=shareable --pid=shareable --name=foo foo_image +docker run --ipc=container:foo --pid=container:foo --name=bar bar_image +``` diff --git a/src/libipc/platform/linux/a0/atomic.h b/src/libipc/platform/linux/a0/atomic.h new file mode 100644 index 0000000..09a26ff --- /dev/null +++ b/src/libipc/platform/linux/a0/atomic.h @@ -0,0 +1,36 @@ +#ifndef A0_SRC_ATOMIC_H +#define A0_SRC_ATOMIC_H + +#include "a0/inline.h" + +#ifdef __cplusplus +extern "C" { +#endif + +A0_STATIC_INLINE +void a0_barrier() { + // 'atomic_thread_fence' is not supported with ‘-fsanitize=thread’ + __sync_synchronize(); +} + +#define a0_atomic_fetch_add(P, V) __atomic_fetch_add((P), (V), __ATOMIC_RELAXED) +#define a0_atomic_add_fetch(P, V) __atomic_add_fetch((P), (V), __ATOMIC_RELAXED) + +#define a0_atomic_fetch_and(P, V) __atomic_fetch_and((P), (V), __ATOMIC_RELAXED) +#define a0_atomic_and_fetch(P, V) __atomic_and_fetch((P), (V), __ATOMIC_RELAXED) + +#define a0_atomic_fetch_or(P, V) __atomic_fetch_or((P), (V), __ATOMIC_RELAXED) +#define a0_atomic_or_fetch(P, V) __atomic_or_fetch((P), (V), __ATOMIC_RELAXED) + +#define a0_atomic_load(P) __atomic_load_n((P), __ATOMIC_RELAXED) +#define a0_atomic_store(P, V) __atomic_store_n((P), (V), __ATOMIC_RELAXED) + +// TODO(lshamis): Switch from __sync to __atomic. +#define a0_cas_val(P, OV, NV) __sync_val_compare_and_swap((P), (OV), (NV)) +#define a0_cas(P, OV, NV) __sync_bool_compare_and_swap((P), (OV), (NV)) + +#ifdef __cplusplus +} +#endif + +#endif // A0_SRC_ATOMIC_H diff --git a/src/libipc/platform/linux/a0/clock.h b/src/libipc/platform/linux/a0/clock.h new file mode 100644 index 0000000..7b2a88b --- /dev/null +++ b/src/libipc/platform/linux/a0/clock.h @@ -0,0 +1,60 @@ +#ifndef A0_SRC_CLOCK_H +#define A0_SRC_CLOCK_H + +#include "a0/err.h" +#include "a0/inline.h" + +#include +#include + +#include "err_macro.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static const int64_t NS_PER_SEC = 1e9; + +typedef struct timespec timespec_t; + +A0_STATIC_INLINE +a0_err_t a0_clock_now(clockid_t clk, timespec_t* out) { + A0_RETURN_SYSERR_ON_MINUS_ONE(clock_gettime(clk, out)); + return A0_OK; +} + +A0_STATIC_INLINE +a0_err_t a0_clock_add(timespec_t ts, int64_t add_nsec, timespec_t* out) { + out->tv_sec = ts.tv_sec + add_nsec / NS_PER_SEC; + out->tv_nsec = ts.tv_nsec + add_nsec % NS_PER_SEC; + if (out->tv_nsec >= NS_PER_SEC) { + out->tv_sec++; + out->tv_nsec -= NS_PER_SEC; + } else if (out->tv_nsec < 0) { + out->tv_sec--; + out->tv_nsec += NS_PER_SEC; + } + + return A0_OK; +} + +A0_STATIC_INLINE +a0_err_t a0_clock_convert( + clockid_t orig_clk, + timespec_t orig_ts, + clockid_t target_clk, + timespec_t* target_ts) { + timespec_t orig_now; + A0_RETURN_ERR_ON_ERR(a0_clock_now(orig_clk, &orig_now)); + timespec_t target_now; + A0_RETURN_ERR_ON_ERR(a0_clock_now(target_clk, &target_now)); + + int64_t add_nsec = (orig_ts.tv_sec - orig_now.tv_sec) * NS_PER_SEC + (orig_ts.tv_nsec - orig_now.tv_nsec); + return a0_clock_add(target_now, add_nsec, target_ts); +} + +#ifdef __cplusplus +} +#endif + +#endif // A0_SRC_CLOCK_H diff --git a/src/libipc/platform/linux/a0/empty.h b/src/libipc/platform/linux/a0/empty.h new file mode 100644 index 0000000..69fef8a --- /dev/null +++ b/src/libipc/platform/linux/a0/empty.h @@ -0,0 +1,13 @@ +#ifndef A0_EMPTY_H +#define A0_EMPTY_H + +// Bah. Why is there no consistent way to zero initialize a struct? +#ifdef __cplusplus +#define A0_EMPTY \ + {} +#else +#define A0_EMPTY \ + { 0 } +#endif + +#endif // A0_EMPTY_H diff --git a/src/libipc/platform/linux/a0/err.c b/src/libipc/platform/linux/a0/err.c new file mode 100644 index 0000000..a48b858 --- /dev/null +++ b/src/libipc/platform/linux/a0/err.c @@ -0,0 +1,50 @@ +#include "a0/err.h" +#include "a0/thread_local.h" + +#include +#include + +A0_THREAD_LOCAL int a0_err_syscode; +A0_THREAD_LOCAL char a0_err_msg[1024]; + +const char* a0_strerror(a0_err_t err) { + switch (err) { + case A0_OK: { + return strerror(0); + } + case A0_ERR_SYS: { + return strerror(a0_err_syscode); + } + case A0_ERR_CUSTOM_MSG: { + return a0_err_msg; + } + case A0_ERR_INVALID_ARG: { + return strerror(EINVAL); + } + case A0_ERR_RANGE: { + return "Index out of bounds"; + } + case A0_ERR_AGAIN: { + return "Not available yet"; + } + case A0_ERR_FRAME_LARGE: { + return "Frame size too large"; + } + case A0_ERR_ITER_DONE: { + return "Done iterating"; + } + case A0_ERR_NOT_FOUND: { + return "Not found"; + } + case A0_ERR_BAD_PATH: { + return "Invalid path"; + } + case A0_ERR_BAD_TOPIC: { + return "Invalid topic name"; + } + default: { + break; + } + } + return ""; +} diff --git a/src/libipc/platform/linux/a0/err.h b/src/libipc/platform/linux/a0/err.h new file mode 100644 index 0000000..05b7007 --- /dev/null +++ b/src/libipc/platform/linux/a0/err.h @@ -0,0 +1,33 @@ +#ifndef A0_ERR_H +#define A0_ERR_H + +#include "a0/thread_local.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum a0_err_e { + A0_OK = 0, + A0_ERR_SYS = 1, + A0_ERR_CUSTOM_MSG = 2, + A0_ERR_INVALID_ARG = 3, + A0_ERR_RANGE = 4, + A0_ERR_AGAIN = 5, + A0_ERR_ITER_DONE = 6, + A0_ERR_NOT_FOUND = 7, + A0_ERR_FRAME_LARGE = 8, + A0_ERR_BAD_PATH = 9, + A0_ERR_BAD_TOPIC = 10, +} a0_err_t; + +extern A0_THREAD_LOCAL int a0_err_syscode; +extern A0_THREAD_LOCAL char a0_err_msg[1024]; + +const char* a0_strerror(a0_err_t); + +#ifdef __cplusplus +} +#endif + +#endif // A0_ERR_H diff --git a/src/libipc/platform/linux/a0/err_macro.h b/src/libipc/platform/linux/a0/err_macro.h new file mode 100644 index 0000000..6683efc --- /dev/null +++ b/src/libipc/platform/linux/a0/err_macro.h @@ -0,0 +1,52 @@ +#ifndef A0_SRC_ERR_MACRO_H +#define A0_SRC_ERR_MACRO_H + +#include "a0/err.h" +#include "a0/inline.h" + +#include +#include +#include +#include + +A0_STATIC_INLINE +a0_err_t A0_MAKE_SYSERR(int syserr) { + a0_err_syscode = syserr; + return A0_ERR_SYS; +} + +A0_STATIC_INLINE +int A0_SYSERR(a0_err_t err) { + return err == A0_ERR_SYS ? a0_err_syscode : 0; +} + +A0_STATIC_INLINE_RECURSIVE +a0_err_t A0_MAKE_MSGERR(const char* fmt, ...) { + va_list args; + va_start(args, fmt); + if (fmt) { + vsnprintf(a0_err_msg, sizeof(a0_err_msg), fmt, args); // NOLINT(clang-analyzer-valist.Uninitialized): https://bugs.llvm.org/show_bug.cgi?id=41311 + va_end(args); + return A0_ERR_CUSTOM_MSG; + } + va_end(args); + return A0_OK; +} + +#define A0_RETURN_SYSERR_ON_MINUS_ONE(X) \ + do { \ + if ((X) == -1) { \ + a0_err_syscode = errno; \ + return A0_ERR_SYS; \ + } \ + } while (0) + +#define A0_RETURN_ERR_ON_ERR(X) \ + do { \ + a0_err_t _err = (X); \ + if (_err) { \ + return _err; \ + } \ + } while (0) + +#endif // A0_SRC_ERR_MACRO_H diff --git a/src/libipc/platform/linux/a0/ftx.h b/src/libipc/platform/linux/a0/ftx.h new file mode 100644 index 0000000..d0d4c3d --- /dev/null +++ b/src/libipc/platform/linux/a0/ftx.h @@ -0,0 +1,111 @@ +#ifndef A0_SRC_FTX_H +#define A0_SRC_FTX_H + +#include "a0/err.h" +#include "a0/inline.h" +#include "a0/time.h" + +#include +#include +#include +#include +#include +#include + +#include "clock.h" +#include "err_macro.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// FUTEX_WAIT and FUTEX_WAIT_REQUEUE_PI default to CLOCK_MONOTONIC, +// but FUTEX_LOCK_PI always uses CLOCK_REALTIME. +// +// Until someone tells me otherwise, I assume this is bad decision making +// and I will instead standardize all things on CLOCK_BOOTTIME. + +// Futex. +// Operations rely on the address. +// It should not be copied or moved. +typedef uint32_t a0_ftx_t; + +A0_STATIC_INLINE +a0_err_t a0_futex(a0_ftx_t* uaddr, + int futex_op, + int val, + uintptr_t timeout_or_val2, + a0_ftx_t* uaddr2, + int val3) { + A0_RETURN_SYSERR_ON_MINUS_ONE(syscall(SYS_futex, uaddr, futex_op, val, timeout_or_val2, uaddr2, val3)); + return A0_OK; +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_wait(a0_ftx_t* ftx, int confirm_val, const a0_time_mono_t* time_mono) { + if (!time_mono) { + return a0_futex(ftx, FUTEX_WAIT, confirm_val, 0, NULL, 0); + } + + timespec_t ts_mono; + A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_MONOTONIC, &ts_mono)); + return a0_futex(ftx, FUTEX_WAIT, confirm_val, (uintptr_t)&ts_mono, NULL, 0); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_wake(a0_ftx_t* ftx, int cnt) { + return a0_futex(ftx, FUTEX_WAKE, cnt, 0, NULL, 0); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_signal(a0_ftx_t* ftx) { + return a0_ftx_wake(ftx, 1); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_broadcast(a0_ftx_t* ftx) { + return a0_ftx_wake(ftx, INT_MAX); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_lock_pi(a0_ftx_t* ftx, const a0_time_mono_t* time_mono) { + if (!time_mono) { + return a0_futex(ftx, FUTEX_LOCK_PI, 0, 0, NULL, 0); + } + + timespec_t ts_wall; + A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_REALTIME, &ts_wall)); + return a0_futex(ftx, FUTEX_LOCK_PI, 0, (uintptr_t)&ts_wall, NULL, 0); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_trylock_pi(a0_ftx_t* ftx) { + return a0_futex(ftx, FUTEX_TRYLOCK_PI, 0, 0, NULL, 0); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_unlock_pi(a0_ftx_t* ftx) { + return a0_futex(ftx, FUTEX_UNLOCK_PI, 0, 0, NULL, 0); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_cmp_requeue_pi(a0_ftx_t* ftx, int confirm_val, a0_ftx_t* requeue_ftx, int max_requeue) { + return a0_futex(ftx, FUTEX_CMP_REQUEUE_PI, 1, max_requeue, requeue_ftx, confirm_val); +} + +A0_STATIC_INLINE +a0_err_t a0_ftx_wait_requeue_pi(a0_ftx_t* ftx, int confirm_val, const a0_time_mono_t* time_mono, a0_ftx_t* requeue_ftx) { + if (!time_mono) { + return a0_futex(ftx, FUTEX_WAIT_REQUEUE_PI, confirm_val, 0, requeue_ftx, 0); + } + + timespec_t ts_mono; + A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_MONOTONIC, &ts_mono)); + return a0_futex(ftx, FUTEX_WAIT_REQUEUE_PI, confirm_val, (uintptr_t)&ts_mono, requeue_ftx, 0); +} + +#ifdef __cplusplus +} +#endif + +#endif // A0_SRC_FTX_H diff --git a/src/libipc/platform/linux/a0/inline.h b/src/libipc/platform/linux/a0/inline.h new file mode 100644 index 0000000..a605540 --- /dev/null +++ b/src/libipc/platform/linux/a0/inline.h @@ -0,0 +1,8 @@ +#ifndef A0_INLINE_H +#define A0_INLINE_H + +#define A0_STATIC_INLINE static inline __attribute__((always_inline)) + +#define A0_STATIC_INLINE_RECURSIVE static inline + +#endif // A0_INLINE_H diff --git a/src/libipc/platform/linux/a0/mtx.c b/src/libipc/platform/linux/a0/mtx.c new file mode 100644 index 0000000..00022b6 --- /dev/null +++ b/src/libipc/platform/linux/a0/mtx.c @@ -0,0 +1,420 @@ +#include "a0/err.h" +#include "a0/inline.h" +#include "a0/mtx.h" +#include "a0/thread_local.h" +#include "a0/tid.h" +#include "a0/time.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "atomic.h" +#include "clock.h" +#include "err_macro.h" +#include "ftx.h" + +// TSAN is worth the pain of properly annotating our mutex. + +// clang-format off +#if defined(__SANITIZE_THREAD__) + #define A0_TSAN_ENABLED +#elif defined(__has_feature) + #if __has_feature(thread_sanitizer) + #define A0_TSAN_ENABLED + #endif +#endif +// clang-format on + +const unsigned __tsan_mutex_linker_init = 1 << 0; +const unsigned __tsan_mutex_write_reentrant = 1 << 1; +const unsigned __tsan_mutex_read_reentrant = 1 << 2; +const unsigned __tsan_mutex_not_static = 1 << 8; +const unsigned __tsan_mutex_read_lock = 1 << 3; +const unsigned __tsan_mutex_try_lock = 1 << 4; +const unsigned __tsan_mutex_try_lock_failed = 1 << 5; +const unsigned __tsan_mutex_recursive_lock = 1 << 6; +const unsigned __tsan_mutex_recursive_unlock = 1 << 7; + +#ifdef A0_TSAN_ENABLED + +void __tsan_mutex_create(void* addr, unsigned flags); +void __tsan_mutex_destroy(void* addr, unsigned flags); +void __tsan_mutex_pre_lock(void* addr, unsigned flags); +void __tsan_mutex_post_lock(void* addr, unsigned flags, int recursion); +int __tsan_mutex_pre_unlock(void* addr, unsigned flags); +void __tsan_mutex_post_unlock(void* addr, unsigned flags); +void __tsan_mutex_pre_signal(void* addr, unsigned flags); +void __tsan_mutex_post_signal(void* addr, unsigned flags); +void __tsan_mutex_pre_divert(void* addr, unsigned flags); +void __tsan_mutex_post_divert(void* addr, unsigned flags); + +#else + +#define _u_ __attribute__((unused)) + +A0_STATIC_INLINE void _u_ __tsan_mutex_create(_u_ void* addr, _u_ unsigned flags) {} +A0_STATIC_INLINE void _u_ __tsan_mutex_destroy(_u_ void* addr, _u_ unsigned flags) {} +A0_STATIC_INLINE void _u_ __tsan_mutex_pre_lock(_u_ void* addr, _u_ unsigned flags) {} +A0_STATIC_INLINE void _u_ __tsan_mutex_post_lock(_u_ void* addr, + _u_ unsigned flags, + _u_ int recursion) {} +A0_STATIC_INLINE int _u_ __tsan_mutex_pre_unlock(_u_ void* addr, _u_ unsigned flags) { + return 0; +} +A0_STATIC_INLINE void _u_ __tsan_mutex_post_unlock(_u_ void* addr, _u_ unsigned flags) {} +A0_STATIC_INLINE void _u_ __tsan_mutex_pre_signal(_u_ void* addr, _u_ unsigned flags) {} +A0_STATIC_INLINE void _u_ __tsan_mutex_post_signal(_u_ void* addr, _u_ unsigned flags) {} +A0_STATIC_INLINE void _u_ __tsan_mutex_pre_divert(_u_ void* addr, _u_ unsigned flags) {} +A0_STATIC_INLINE void _u_ __tsan_mutex_post_divert(_u_ void* addr, _u_ unsigned flags) {} + +#endif + +A0_THREAD_LOCAL bool a0_robust_init = false; + +A0_STATIC_INLINE +void a0_robust_reset() { + a0_robust_init = 0; +} + +A0_STATIC_INLINE +void a0_robust_reset_atfork() { + pthread_atfork(NULL, NULL, &a0_robust_reset); +} + +static pthread_once_t a0_robust_reset_atfork_once; + +typedef struct robust_list robust_list_t; +typedef struct robust_list_head robust_list_head_t; + +A0_THREAD_LOCAL robust_list_head_t a0_robust_head; + +A0_STATIC_INLINE +void robust_init() { + a0_robust_head.list.next = &a0_robust_head.list; + a0_robust_head.futex_offset = offsetof(a0_mtx_t, ftx); + a0_robust_head.list_op_pending = NULL; + syscall(SYS_set_robust_list, &a0_robust_head.list, sizeof(a0_robust_head)); +} + +A0_STATIC_INLINE +void init_thread() { + if (a0_robust_init) { + return; + } + + pthread_once(&a0_robust_reset_atfork_once, a0_robust_reset_atfork); + robust_init(); + a0_robust_init = true; +} + +A0_STATIC_INLINE +void robust_op_start(a0_mtx_t* mtx) { + init_thread(); + a0_robust_head.list_op_pending = (struct robust_list*)mtx; + a0_barrier(); +} + +A0_STATIC_INLINE +void robust_op_end(a0_mtx_t* mtx) { + (void)mtx; + a0_barrier(); + a0_robust_head.list_op_pending = NULL; +} + +A0_STATIC_INLINE +bool robust_is_head(a0_mtx_t* mtx) { + return mtx == (a0_mtx_t*)&a0_robust_head; +} + +A0_STATIC_INLINE +void robust_op_add(a0_mtx_t* mtx) { + a0_mtx_t* old_first = (a0_mtx_t*)a0_robust_head.list.next; + + mtx->prev = (a0_mtx_t*)&a0_robust_head; + mtx->next = old_first; + + a0_barrier(); + + a0_robust_head.list.next = (robust_list_t*)mtx; + if (!robust_is_head(old_first)) { + old_first->prev = mtx; + } +} + +A0_STATIC_INLINE +void robust_op_del(a0_mtx_t* mtx) { + a0_mtx_t* prev = mtx->prev; + a0_mtx_t* next = mtx->next; + prev->next = next; + if (!robust_is_head(next)) { + next->prev = prev; + } +} + +A0_STATIC_INLINE +uint32_t ftx_tid(a0_ftx_t ftx) { + return ftx & FUTEX_TID_MASK; +} + +A0_STATIC_INLINE +bool ftx_owner_died(a0_ftx_t ftx) { + return ftx & FUTEX_OWNER_DIED; +} + +static const uint32_t FTX_NOTRECOVERABLE = FUTEX_TID_MASK | FUTEX_OWNER_DIED; + +A0_STATIC_INLINE +bool ftx_notrecoverable(a0_ftx_t ftx) { + return (ftx & FTX_NOTRECOVERABLE) == FTX_NOTRECOVERABLE; +} + +A0_STATIC_INLINE +a0_err_t a0_mtx_timedlock_robust(a0_mtx_t* mtx, const a0_time_mono_t* timeout) { + const uint32_t tid = a0_tid(); + + int syserr = EINTR; + while (syserr == EINTR) { + // Can't lock if borked. + if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) { + return A0_MAKE_SYSERR(ENOTRECOVERABLE); + } + + // Try to lock without kernel involvement. + if (a0_cas(&mtx->ftx, 0, tid)) { + return A0_OK; + } + + // Ask the kernel to lock. + syserr = A0_SYSERR(a0_ftx_lock_pi(&mtx->ftx, timeout)); + } + + if (!syserr) { + if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) { + return A0_MAKE_SYSERR(EOWNERDEAD); + } + return A0_OK; + } + + return A0_MAKE_SYSERR(syserr); +} + +A0_STATIC_INLINE +a0_err_t a0_mtx_timedlock_impl(a0_mtx_t* mtx, const a0_time_mono_t* timeout) { + // Note: __tsan_mutex_pre_lock should come here, but tsan doesn't provide + // a way to "fail" a lock. Only a trylock. + robust_op_start(mtx); + const a0_err_t err = a0_mtx_timedlock_robust(mtx, timeout); + if (!err || A0_SYSERR(err) == EOWNERDEAD) { + __tsan_mutex_pre_lock(mtx, 0); + robust_op_add(mtx); + __tsan_mutex_post_lock(mtx, 0, 0); + } + robust_op_end(mtx); + return err; +} + +a0_err_t a0_mtx_timedlock(a0_mtx_t* mtx, a0_time_mono_t timeout) { + return a0_mtx_timedlock_impl(mtx, &timeout); +} + +a0_err_t a0_mtx_lock(a0_mtx_t* mtx) { + return a0_mtx_timedlock_impl(mtx, NULL); +} + +A0_STATIC_INLINE +a0_err_t a0_mtx_trylock_impl(a0_mtx_t* mtx) { + const uint32_t tid = a0_tid(); + + // Try to lock without kernel involvement. + uint32_t old = a0_cas_val(&mtx->ftx, 0, tid); + + // Did it work? + if (!old) { + robust_op_add(mtx); + return A0_OK; + } + + // Is the lock still usable? + if (ftx_notrecoverable(old)) { + return A0_MAKE_SYSERR(ENOTRECOVERABLE); + } + + // Is the owner still alive? + if (!ftx_owner_died(old)) { + return A0_MAKE_SYSERR(EBUSY); + } + + // Oh, the owner died. Ask the kernel to fix the state. + a0_err_t err = a0_ftx_trylock_pi(&mtx->ftx); + if (!err) { + robust_op_add(mtx); + if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) { + return A0_MAKE_SYSERR(EOWNERDEAD); + } + return A0_OK; + } + + // EAGAIN means that somebody else beat us to it. + // Anything else means we're borked. + if (A0_SYSERR(err) == EAGAIN) { + return A0_MAKE_SYSERR(EBUSY); + } + return A0_MAKE_SYSERR(ENOTRECOVERABLE); +} + +a0_err_t a0_mtx_trylock(a0_mtx_t* mtx) { + __tsan_mutex_pre_lock(mtx, __tsan_mutex_try_lock); + robust_op_start(mtx); + a0_err_t err = a0_mtx_trylock_impl(mtx); + robust_op_end(mtx); + if (!err || A0_SYSERR(err) == EOWNERDEAD) { + __tsan_mutex_post_lock(mtx, __tsan_mutex_try_lock, 0); + } else { + __tsan_mutex_post_lock(mtx, __tsan_mutex_try_lock | __tsan_mutex_try_lock_failed, 0); + } + return err; +} + +a0_err_t a0_mtx_consistent(a0_mtx_t* mtx) { + const uint32_t val = a0_atomic_load(&mtx->ftx); + + // Why fix what isn't broken? + if (!ftx_owner_died(val)) { + return A0_MAKE_SYSERR(EINVAL); + } + + // Is it yours to fix? + if (ftx_tid(val) != a0_tid()) { + return A0_MAKE_SYSERR(EPERM); + } + + // Fix it! + a0_atomic_and_fetch(&mtx->ftx, ~FUTEX_OWNER_DIED); + + return A0_OK; +} + +a0_err_t a0_mtx_unlock(a0_mtx_t* mtx) { + const uint32_t tid = a0_tid(); + + const uint32_t val = a0_atomic_load(&mtx->ftx); + + // Only the owner can unlock. + if (ftx_tid(val) != tid) { + return A0_MAKE_SYSERR(EPERM); + } + + __tsan_mutex_pre_unlock(mtx, 0); + + // If the mutex was acquired with EOWNERDEAD, the caller is responsible + // for fixing the state and marking the mutex consistent. If they did + // not mark it consistent and are unlocking... then we are unrecoverably + // borked! + uint32_t new_val = 0; + if (ftx_owner_died(val)) { + new_val = FTX_NOTRECOVERABLE; + } + + robust_op_start(mtx); + robust_op_del(mtx); + + // If the futex is exactly equal to tid, then there are no waiters and the + // kernel doesn't need to get involved. + if (!a0_cas(&mtx->ftx, tid, new_val)) { + // Ask the kernel to wake up a waiter. + a0_ftx_unlock_pi(&mtx->ftx); + if (new_val) { + a0_atomic_or_fetch(&mtx->ftx, new_val); + } + } + + robust_op_end(mtx); + __tsan_mutex_post_unlock(mtx, 0); + + return A0_OK; +} + +// TODO(lshamis): Handle ENOTRECOVERABLE +A0_STATIC_INLINE +a0_err_t a0_cnd_timedwait_impl(a0_cnd_t* cnd, a0_mtx_t* mtx, const a0_time_mono_t* timeout) { + const uint32_t init_cnd = a0_atomic_load(cnd); + + // Unblock other threads to do the things that will eventually signal this wait. + a0_err_t err = a0_mtx_unlock(mtx); + if (err) { + return err; + } + + __tsan_mutex_pre_lock(mtx, 0); + robust_op_start(mtx); + + do { + // Priority-inheritance-aware wait until awoken or timeout. + err = a0_ftx_wait_requeue_pi(cnd, init_cnd, timeout, &mtx->ftx); + } while (A0_SYSERR(err) == EINTR); + + // We need to manually lock on timeout. + // Note: We keep the timeout error. + if (A0_SYSERR(err) == ETIMEDOUT) { + a0_mtx_timedlock_robust(mtx, NULL); + } + // Someone else grabbed and mutated the resource between the unlock and wait. + // No need to wait. + if (A0_SYSERR(err) == EAGAIN) { + err = a0_mtx_timedlock_robust(mtx, NULL); + } + + robust_op_add(mtx); + + // If no higher priority error, check the previous owner didn't die. + if (!err) { + err = ftx_owner_died(a0_atomic_load(&mtx->ftx)) ? EOWNERDEAD : A0_OK; + } + + robust_op_end(mtx); + __tsan_mutex_post_lock(mtx, 0, 0); + return err; +} + +a0_err_t a0_cnd_timedwait(a0_cnd_t* cnd, a0_mtx_t* mtx, a0_time_mono_t timeout) { + // Let's not unlock the mutex if we're going to get EINVAL due to a bad timeout. + if ((timeout.ts.tv_sec < 0 || timeout.ts.tv_nsec < 0 || (!timeout.ts.tv_sec && !timeout.ts.tv_nsec) || timeout.ts.tv_nsec >= NS_PER_SEC)) { + return A0_MAKE_SYSERR(EINVAL); + } + return a0_cnd_timedwait_impl(cnd, mtx, &timeout); +} + +a0_err_t a0_cnd_wait(a0_cnd_t* cnd, a0_mtx_t* mtx) { + return a0_cnd_timedwait_impl(cnd, mtx, NULL); +} + +A0_STATIC_INLINE +a0_err_t a0_cnd_wake(a0_cnd_t* cnd, a0_mtx_t* mtx, uint32_t cnt) { + uint32_t val = a0_atomic_add_fetch(cnd, 1); + + while (true) { + a0_err_t err = a0_ftx_cmp_requeue_pi(cnd, val, &mtx->ftx, cnt); + if (A0_SYSERR(err) != EAGAIN) { + return err; + } + + // Another thread is also trying to wake this condition variable. + val = a0_atomic_load(cnd); + } +} + +a0_err_t a0_cnd_signal(a0_cnd_t* cnd, a0_mtx_t* mtx) { + return a0_cnd_wake(cnd, mtx, 1); +} + +a0_err_t a0_cnd_broadcast(a0_cnd_t* cnd, a0_mtx_t* mtx) { + return a0_cnd_wake(cnd, mtx, INT_MAX); +} diff --git a/src/libipc/platform/linux/a0/mtx.h b/src/libipc/platform/linux/a0/mtx.h new file mode 100644 index 0000000..d27ce17 --- /dev/null +++ b/src/libipc/platform/linux/a0/mtx.h @@ -0,0 +1,57 @@ +#ifndef A0_MTX_H +#define A0_MTX_H + +#include "a0/err.h" +#include "a0/time.h" +#include "a0/unused.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef uint32_t a0_ftx_t; + +// https://stackoverflow.com/questions/61645966/is-typedef-allowed-before-definition +struct a0_mtx_s; + +typedef struct a0_mtx_s a0_mtx_t; + +// Mutex implementation designed for IPC. +// +// Similar to pthread_mutex_t with the following flags fixed: +// * Process shared. +// * Robust. +// * Error checking. +// * Priority inheriting. +// +// Unlike pthread_mutex_t, timespec are expected to use CLOCK_BOOTTIME. +// +// struct a0_mtx_s "Inherits" from robust_list, which requires: +// * The first field MUST be a next pointer. +// * There must be a futex, which makes the mutex immovable. +// +// Note: a mutex MUST be unlocked before being freed or unmapped. +struct a0_mtx_s { + a0_mtx_t* next; + a0_mtx_t* prev; + a0_ftx_t ftx; +}; + +a0_err_t a0_mtx_lock(a0_mtx_t*) A0_WARN_UNUSED_RESULT; +a0_err_t a0_mtx_timedlock(a0_mtx_t*, a0_time_mono_t) A0_WARN_UNUSED_RESULT; +a0_err_t a0_mtx_trylock(a0_mtx_t*) A0_WARN_UNUSED_RESULT; +a0_err_t a0_mtx_consistent(a0_mtx_t*); +a0_err_t a0_mtx_unlock(a0_mtx_t*); + +typedef a0_ftx_t a0_cnd_t; + +a0_err_t a0_cnd_wait(a0_cnd_t*, a0_mtx_t*); +a0_err_t a0_cnd_timedwait(a0_cnd_t*, a0_mtx_t*, a0_time_mono_t); +a0_err_t a0_cnd_signal(a0_cnd_t*, a0_mtx_t*); +a0_err_t a0_cnd_broadcast(a0_cnd_t*, a0_mtx_t*); + +#ifdef __cplusplus +} +#endif + +#endif // A0_MTX_H diff --git a/src/libipc/platform/linux/a0/strconv.c b/src/libipc/platform/linux/a0/strconv.c new file mode 100644 index 0000000..70eedbf --- /dev/null +++ b/src/libipc/platform/linux/a0/strconv.c @@ -0,0 +1,64 @@ +#include "strconv.h" + +#include "a0/err.h" + +#include + +static const char DECIMAL_DIGITS[] = + "00010203040506070809" + "10111213141516171819" + "20212223242526272829" + "30313233343536373839" + "40414243444546474849" + "50515253545556575859" + "60616263646566676869" + "70717273747576777879" + "80818283848586878889" + "90919293949596979899"; + +a0_err_t a0_u32_to_str(uint32_t val, char* buf_start, char* buf_end, char** start_ptr) { + return a0_u64_to_str(val, buf_start, buf_end, start_ptr); +} + +a0_err_t a0_u64_to_str(uint64_t val, char* buf_start, char* buf_end, char** start_ptr) { + uint64_t orig_val = val; + char* ptr = buf_end; + while (val >= 10) { + ptr -= 2; + memcpy(ptr, &DECIMAL_DIGITS[2 * (val % 100)], sizeof(uint16_t)); + val /= 100; + } + if (val) { + *--ptr = (char)('0' + val); + } + memset(buf_start, '0', ptr - buf_start); + ptr -= (!orig_val); + if (start_ptr) { + *start_ptr = ptr; + } + return A0_OK; +} + +a0_err_t a0_str_to_u32(const char* start, const char* end, uint32_t* out) { + *out = 0; + for (const char* ptr = start; ptr < end; ptr++) { + if (*ptr < '0' || *ptr > '9') { + return A0_ERR_INVALID_ARG; + } + *out *= 10; + *out += *ptr - '0'; + } + return A0_OK; +} + +a0_err_t a0_str_to_u64(const char* start, const char* end, uint64_t* out) { + *out = 0; + for (const char* ptr = start; ptr < end; ptr++) { + if (*ptr < '0' || *ptr > '9') { + return A0_ERR_INVALID_ARG; + } + *out *= 10; + *out += *ptr - '0'; + } + return A0_OK; +} diff --git a/src/libipc/platform/linux/a0/strconv.h b/src/libipc/platform/linux/a0/strconv.h new file mode 100644 index 0000000..2f5b743 --- /dev/null +++ b/src/libipc/platform/linux/a0/strconv.h @@ -0,0 +1,31 @@ +#ifndef A0_SRC_STRCONV_H +#define A0_SRC_STRCONV_H + +#include "a0/err.h" + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// Converts a uint32 or uint64 to string. +// The entire buffer will be populated, if not with given the value, then with '0'. +// Populates the buffer from the back. +// start_ptr, if not null, will be set to the point within the buffer where the number starts. +// Does NOT check for overflow. +a0_err_t a0_u32_to_str(uint32_t val, char* buf_start, char* buf_end, char** start_ptr); +a0_err_t a0_u64_to_str(uint64_t val, char* buf_start, char* buf_end, char** start_ptr); + +// Converts a string to uint32 or uint64. +// The string may have leading '0's. +// Returns A0_ERR_INVALID_ARG if any character is not a digit. +// Does NOT check for overflow. +a0_err_t a0_str_to_u32(const char* start, const char* end, uint32_t* out); +a0_err_t a0_str_to_u64(const char* start, const char* end, uint64_t* out); + +#ifdef __cplusplus +} +#endif + +#endif // A0_SRC_STRCONV_H diff --git a/src/libipc/platform/linux/a0/thread_local.h b/src/libipc/platform/linux/a0/thread_local.h new file mode 100644 index 0000000..8e49813 --- /dev/null +++ b/src/libipc/platform/linux/a0/thread_local.h @@ -0,0 +1,10 @@ +#ifndef A0_THREAD_LOCAL_H +#define A0_THREAD_LOCAL_H + +#ifdef __cplusplus +#define A0_THREAD_LOCAL thread_local +#else +#define A0_THREAD_LOCAL _Thread_local +#endif // __cplusplus + +#endif // A0_THREAD_LOCAL_H diff --git a/src/libipc/platform/linux/a0/tid.c b/src/libipc/platform/linux/a0/tid.c new file mode 100644 index 0000000..9ad016a --- /dev/null +++ b/src/libipc/platform/linux/a0/tid.c @@ -0,0 +1,30 @@ +#include "a0/inline.h" +#include "a0/thread_local.h" +#include "a0/tid.h" + +#include +#include +#include +#include +#include + +A0_THREAD_LOCAL uint32_t a0_tid_cache = 0; +static pthread_once_t a0_tid_reset_atfork_once; + +A0_STATIC_INLINE +void a0_tid_reset() { + a0_tid_cache = 0; +} + +A0_STATIC_INLINE +void a0_tid_reset_atfork() { + pthread_atfork(NULL, NULL, &a0_tid_reset); +} + +uint32_t a0_tid() { + if (!a0_tid_cache) { + a0_tid_cache = syscall(SYS_gettid); + pthread_once(&a0_tid_reset_atfork_once, a0_tid_reset_atfork); + } + return a0_tid_cache; +} diff --git a/src/libipc/platform/linux/a0/tid.h b/src/libipc/platform/linux/a0/tid.h new file mode 100644 index 0000000..11cb71c --- /dev/null +++ b/src/libipc/platform/linux/a0/tid.h @@ -0,0 +1,16 @@ +#ifndef A0_TID_H +#define A0_TID_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +uint32_t a0_tid(); + +#ifdef __cplusplus +} +#endif + +#endif // A0_TID_H diff --git a/src/libipc/platform/linux/a0/time.c b/src/libipc/platform/linux/a0/time.c new file mode 100644 index 0000000..09a79b4 --- /dev/null +++ b/src/libipc/platform/linux/a0/time.c @@ -0,0 +1,124 @@ +#include "a0/empty.h" +#include "a0/err.h" +#include "a0/time.h" + +#include +#include +#include +#include + +#include "clock.h" +#include "err_macro.h" +#include "strconv.h" + +const char A0_TIME_MONO[] = "a0_time_mono"; + +a0_err_t a0_time_mono_now(a0_time_mono_t* out) { + return a0_clock_now(CLOCK_BOOTTIME, &out->ts); +} + +a0_err_t a0_time_mono_str(a0_time_mono_t time_mono, char mono_str[20]) { + // Mono time as unsigned integer with up to 20 chars: "18446744072709551615" + uint64_t ns = time_mono.ts.tv_sec * NS_PER_SEC + time_mono.ts.tv_nsec; + mono_str[19] = '\0'; + return a0_u64_to_str(ns, mono_str, mono_str + 19, NULL); +} + +a0_err_t a0_time_mono_parse(const char mono_str[20], a0_time_mono_t* out) { + uint64_t ns; + A0_RETURN_ERR_ON_ERR(a0_str_to_u64(mono_str, mono_str + 19, &ns)); + out->ts.tv_sec = ns / NS_PER_SEC; + out->ts.tv_nsec = ns % NS_PER_SEC; + return A0_OK; +} + +a0_err_t a0_time_mono_add(a0_time_mono_t time_mono, int64_t add_nsec, a0_time_mono_t* out) { + return a0_clock_add(time_mono.ts, add_nsec, &out->ts); +} + +const char A0_TIME_WALL[] = "a0_time_wall"; + +a0_err_t a0_time_wall_now(a0_time_wall_t* out) { + A0_RETURN_SYSERR_ON_MINUS_ONE(clock_gettime(CLOCK_REALTIME, &out->ts)); + return A0_OK; +} + +a0_err_t a0_time_wall_str(a0_time_wall_t wall_time, char wall_str[36]) { + // Wall time in RFC 3999 Nano: "2006-01-02T15:04:05.999999999-07:00" + struct tm wall_tm; + gmtime_r(&wall_time.ts.tv_sec, &wall_tm); + + strftime(&wall_str[0], 20, "%Y-%m-%dT%H:%M:%S", &wall_tm); + snprintf(&wall_str[19], 17, ".%09ld-00:00", wall_time.ts.tv_nsec); + wall_str[35] = '\0'; + + return A0_OK; +} + +a0_err_t a0_time_wall_parse(const char wall_str[36], a0_time_wall_t* out) { + // strptime requires _GNU_SOURCE, which we don't want, so we do it our selves. + // Hard code "%Y-%m-%dT%H:%M:%S" + ".%09ld-00:00" pattern. + + struct tm wall_tm = A0_EMPTY; + // %Y + A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 0, wall_str + 4, (uint32_t*)&wall_tm.tm_year)); + wall_tm.tm_year -= 1900; + // - + if (wall_str[4] != '-') { + return A0_ERR_INVALID_ARG; + } + // %m + A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 5, wall_str + 7, (uint32_t*)&wall_tm.tm_mon)); + if (wall_tm.tm_mon < 1 || wall_tm.tm_mon > 12) { + return A0_ERR_INVALID_ARG; + } + wall_tm.tm_mon--; + // - + if (wall_str[7] != '-') { + return A0_ERR_INVALID_ARG; + } + // %d + A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 8, wall_str + 10, (uint32_t*)&wall_tm.tm_mday)); + if (wall_tm.tm_mday < 1 || wall_tm.tm_mday > 31) { + return A0_ERR_INVALID_ARG; + } + // T + if (wall_str[10] != 'T') { + return A0_ERR_INVALID_ARG; + } + // %H + A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 11, wall_str + 13, (uint32_t*)&wall_tm.tm_hour)); + if (wall_tm.tm_hour > 24) { + return A0_ERR_INVALID_ARG; + } + // : + if (wall_str[13] != ':') { + return A0_ERR_INVALID_ARG; + } + // %M + A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 14, wall_str + 16, (uint32_t*)&wall_tm.tm_min)); + if (wall_tm.tm_min > 60) { + return A0_ERR_INVALID_ARG; + } + // : + if (wall_str[16] != ':') { + return A0_ERR_INVALID_ARG; + } + // %S + A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 17, wall_str + 19, (uint32_t*)&wall_tm.tm_sec)); + if (wall_tm.tm_sec > 61) { + return A0_ERR_INVALID_ARG; + } + // . + if (wall_str[19] != '.') { + return A0_ERR_INVALID_ARG; + } + + if (memcmp(wall_str + 29, "-00:00", 6) != 0) { + return A0_ERR_INVALID_ARG; + } + + // Use timegm, cause it's a pain to compute months/years to seconds. + out->ts.tv_sec = timegm(&wall_tm); + return a0_str_to_u64(wall_str + 20, wall_str + 29, (uint64_t*)&out->ts.tv_nsec); +} diff --git a/src/libipc/platform/linux/a0/time.h b/src/libipc/platform/linux/a0/time.h new file mode 100644 index 0000000..77e49c4 --- /dev/null +++ b/src/libipc/platform/linux/a0/time.h @@ -0,0 +1,97 @@ +/** + * \file time.h + * \rst + * + * Mono Time + * --------- + * + * | Mono time is a number of nanoseconds from machine boottime. + * | This time cannot decrease and duration between ticks is constant. + * | This time is not related to wall clock time. + * | This time is most suitable for measuring durations. + * | + * | As a string, it is represented as a 20 char number: + * | **18446744072709551615** + * | + * | Note that this uses CLOCK_BOOTTIME under the hood, not CLOCK_MONOTONIC. + * + * Wall Time + * --------- + * + * | Wall time is an time object representing human-readable wall clock time. + * | This time can decrease and duration between ticks is not constant. + * | This time is most related to wall clock time. + * | This time is not suitable for measuring durations. + * | + * | As a string, it is represented as a 36 char RFC 3999 Nano / ISO 8601: + * | **2006-01-02T15:04:05.999999999-07:00** + * + * \endrst + */ + +#ifndef A0_TIME_H +#define A0_TIME_H + +#include "a0/err.h" + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** \addtogroup TIME_MONO + * @{ + */ + +/// Header key for mono timestamps. +extern const char A0_TIME_MONO[]; + +/// Monotonic timestamp. Despite the name, uses CLOCK_BOOTTIME. +typedef struct a0_time_mono_s { + struct timespec ts; +} a0_time_mono_t; + +/// Get the current mono timestamps. +a0_err_t a0_time_mono_now(a0_time_mono_t*); + +/// Stringify a given mono timestamps. +a0_err_t a0_time_mono_str(a0_time_mono_t, char mono_str[20]); + +/// Parse a stringified mono timestamps. +a0_err_t a0_time_mono_parse(const char mono_str[20], a0_time_mono_t*); + +/// Add a duration in nanoseconds to a mono timestamp. +a0_err_t a0_time_mono_add(a0_time_mono_t, int64_t add_nsec, a0_time_mono_t*); + +/** @}*/ + +/** \addtogroup TIME_WALL + * @{ + */ + +/// Header key for wall timestamps. +extern const char A0_TIME_WALL[]; + +/// Wall clock timestamp. +typedef struct a0_time_wall_s { + struct timespec ts; +} a0_time_wall_t; + +/// Get the current wall timestamps. +a0_err_t a0_time_wall_now(a0_time_wall_t*); + +/// Stringify a given wall timestamps. +a0_err_t a0_time_wall_str(a0_time_wall_t, char wall_str[36]); + +/// Parse a stringified wall timestamps. +a0_err_t a0_time_wall_parse(const char wall_str[36], a0_time_wall_t*); + +/** @}*/ + +#ifdef __cplusplus +} +#endif + +#endif // A0_TRANSPORT_H diff --git a/src/libipc/platform/linux/a0/unused.h b/src/libipc/platform/linux/a0/unused.h new file mode 100644 index 0000000..8b1025a --- /dev/null +++ b/src/libipc/platform/linux/a0/unused.h @@ -0,0 +1,7 @@ +#ifndef A0_UNUSED_H +#define A0_UNUSED_H + +#define A0_WARN_UNUSED_RESULT __attribute__((warn_unused_result)) +#define A0_MAYBE_UNUSED(X) ((void)(X)) + +#endif // A0_UNUSED_H diff --git a/src/libipc/platform/linux/condition.h b/src/libipc/platform/linux/condition.h new file mode 100644 index 0000000..c4f00ca --- /dev/null +++ b/src/libipc/platform/linux/condition.h @@ -0,0 +1,66 @@ +#pragma once + +#include "libipc/utility/log.h" +#include "libipc/mutex.h" + +#include "get_wait_time.h" +#include "sync_obj_impl.h" + +#include "a0/err_macro.h" +#include "a0/mtx.h" + +namespace ipc { +namespace detail { +namespace sync { + +class condition : public sync::obj_impl { +public: + condition() = default; + ~condition() = default; + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + if (!valid()) return false; + if (tm == invalid_value) { + int eno = A0_SYSERR(a0_cnd_wait(native(), static_cast(mtx.native()))); + if (eno != 0) { + ipc::error("fail condition wait[%d]\n", eno); + return false; + } + } else { + auto ts = detail::make_timespec(tm); + int eno = A0_SYSERR(a0_cnd_timedwait(native(), static_cast(mtx.native()), {ts})); + if (eno != 0) { + if (eno != ETIMEDOUT) { + ipc::error("fail condition timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + eno, tm, ts.tv_sec, ts.tv_nsec); + } + return false; + } + } + return true; + } + + bool notify(ipc::sync::mutex &mtx) noexcept { + if (!valid()) return false; + int eno = A0_SYSERR(a0_cnd_signal(native(), static_cast(mtx.native()))); + if (eno != 0) { + ipc::error("fail condition notify[%d]\n", eno); + return false; + } + return true; + } + + bool broadcast(ipc::sync::mutex &mtx) noexcept { + if (!valid()) return false; + int eno = A0_SYSERR(a0_cnd_broadcast(native(), static_cast(mtx.native()))); + if (eno != 0) { + ipc::error("fail condition broadcast[%d]\n", eno); + return false; + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/linux/get_wait_time.h b/src/libipc/platform/linux/get_wait_time.h new file mode 100644 index 0000000..ffaf2d2 --- /dev/null +++ b/src/libipc/platform/linux/get_wait_time.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + +#include "libipc/utility/log.h" + +#include "a0/time.h" +#include "a0/err_macro.h" + +namespace ipc { +namespace detail { + +inline bool calc_wait_time(timespec &ts, std::uint64_t tm /*ms*/) noexcept { + std::int64_t add_ns = static_cast(tm * 1000000ull); + if (add_ns < 0) { + ipc::error("invalid time = " PRIu64 "\n", tm); + return false; + } + a0_time_mono_t now; + int eno = A0_SYSERR(a0_time_mono_now(&now)); + if (eno != 0) { + ipc::error("fail get time[%d]\n", eno); + return false; + } + a0_time_mono_t *target = reinterpret_cast(&ts); + if ((eno = A0_SYSERR(a0_time_mono_add(now, add_ns, target))) != 0) { + ipc::error("fail get time[%d]\n", eno); + return false; + } + return true; +} + +inline timespec make_timespec(std::uint64_t tm /*ms*/) noexcept(false) { + timespec ts {}; + if (!calc_wait_time(ts, tm)) { + ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + tm, ts.tv_sec, ts.tv_nsec); + throw std::system_error{static_cast(errno), std::system_category()}; + } + return ts; +} + +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/linux/mutex.h b/src/libipc/platform/linux/mutex.h new file mode 100644 index 0000000..91f156b --- /dev/null +++ b/src/libipc/platform/linux/mutex.h @@ -0,0 +1,204 @@ +#pragma once + +#include +#include +#include +#include + +#include "libipc/platform/detail.h" +#include "libipc/utility/log.h" +#include "libipc/memory/resource.h" +#include "libipc/shm.h" + +#include "get_wait_time.h" +#include "sync_obj_impl.h" + +#include "a0/err_macro.h" +#include "a0/mtx.h" + +namespace ipc { +namespace detail { +namespace sync { + +class robust_mutex : public sync::obj_impl { +public: + bool lock(std::uint64_t tm) noexcept { + if (!valid()) return false; + for (;;) { + auto ts = detail::make_timespec(tm); + int eno = A0_SYSERR( + (tm == invalid_value) ? a0_mtx_lock(native()) + : a0_mtx_timedlock(native(), {ts})); + switch (eno) { + case 0: + return true; + case ETIMEDOUT: + return false; + case EOWNERDEAD: { + int eno2 = A0_SYSERR(a0_mtx_consistent(native())); + if (eno2 != 0) { + ipc::error("fail mutex lock[%d] -> consistent[%d]\n", eno, eno2); + return false; + } + int eno3 = A0_SYSERR(a0_mtx_unlock(native())); + if (eno3 != 0) { + ipc::error("fail mutex lock[%d] -> unlock[%d]\n", eno, eno3); + return false; + } + } + break; // loop again + default: + ipc::error("fail mutex lock[%d]\n", eno); + return false; + } + } + } + + bool try_lock() noexcept(false) { + if (!valid()) return false; + int eno = A0_SYSERR(a0_mtx_timedlock(native(), {detail::make_timespec(0)})); + switch (eno) { + case 0: + return true; + case ETIMEDOUT: + return false; + case EOWNERDEAD: { + int eno2 = A0_SYSERR(a0_mtx_consistent(native())); + if (eno2 != 0) { + ipc::error("fail mutex try_lock[%d] -> consistent[%d]\n", eno, eno2); + break; + } + int eno3 = A0_SYSERR(a0_mtx_unlock(native())); + if (eno3 != 0) { + ipc::error("fail mutex try_lock[%d] -> unlock[%d]\n", eno, eno3); + break; + } + } + break; + default: + ipc::error("fail mutex try_lock[%d]\n", eno); + break; + } + throw std::system_error{eno, std::system_category()}; + } + + bool unlock() noexcept { + if (!valid()) return false; + int eno = A0_SYSERR(a0_mtx_unlock(native())); + if (eno != 0) { + ipc::error("fail mutex unlock[%d]\n", eno); + return false; + } + return true; + } +}; + +class mutex { + robust_mutex *mutex_ = nullptr; + std::atomic *ref_ = nullptr; + + struct curr_prog { + struct shm_data { + robust_mutex mtx; + std::atomic ref; + + struct init { + char const *name; + }; + shm_data(init arg) + : mtx{}, ref{0} { mtx.open(arg.name); } + }; + ipc::map mutex_handles; + std::mutex lock; + + static curr_prog &get() { + static curr_prog info; + return info; + } + }; + + void acquire_mutex(char const *name) { + if (name == nullptr) { + return; + } + auto &info = curr_prog::get(); + IPC_UNUSED_ std::lock_guard guard {info.lock}; + auto it = info.mutex_handles.find(name); + if (it == info.mutex_handles.end()) { + it = curr_prog::get().mutex_handles.emplace(name, + curr_prog::shm_data::init{name}).first; + } + mutex_ = &it->second.mtx; + ref_ = &it->second.ref; + } + + template + void release_mutex(ipc::string const &name, F &&clear) { + if (name.empty()) return; + IPC_UNUSED_ std::lock_guard guard {curr_prog::get().lock}; + auto it = curr_prog::get().mutex_handles.find(name); + if (it == curr_prog::get().mutex_handles.end()) { + return; + } + if (clear()) { + curr_prog::get().mutex_handles.erase(it); + } + } + +public: + mutex() = default; + ~mutex() = default; + + a0_mtx_t const *native() const noexcept { + return valid() ? mutex_->native() : nullptr; + } + + a0_mtx_t *native() noexcept { + return valid() ? mutex_->native() : nullptr; + } + + bool valid() const noexcept { + return (mutex_ != nullptr) && (ref_ != nullptr) && mutex_->valid(); + } + + bool open(char const *name) noexcept { + close(); + acquire_mutex(name); + if (!valid()) { + return false; + } + ref_->fetch_add(1, std::memory_order_relaxed); + return true; + } + + void close() noexcept { + if ((mutex_ != nullptr) && (ref_ != nullptr)) { + if (mutex_->name() != nullptr) { + release_mutex(mutex_->name(), [this] { + return ref_->fetch_sub(1, std::memory_order_relaxed) <= 1; + }); + } else mutex_->close(); + } + mutex_ = nullptr; + ref_ = nullptr; + } + + bool lock(std::uint64_t tm) noexcept { + if (!valid()) return false; + return mutex_->lock(tm); + } + + bool try_lock() noexcept(false) { + if (!valid()) return false; + return mutex_->try_lock(); + } + + bool unlock() noexcept { + if (!valid()) return false; + return mutex_->unlock(); + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/linux/sync_obj_impl.h b/src/libipc/platform/linux/sync_obj_impl.h new file mode 100644 index 0000000..e28f95d --- /dev/null +++ b/src/libipc/platform/linux/sync_obj_impl.h @@ -0,0 +1,69 @@ +#pragma once + +#include "libipc/utility/log.h" +#include "libipc/shm.h" + +#include "a0/empty.h" + +namespace ipc { +namespace detail { +namespace sync { + +template +class obj_impl { +public: + using sync_t = SyncT; + +protected: + ipc::shm::handle shm_; + sync_t *h_ = nullptr; + + sync_t *acquire_handle(char const *name) { + if (!shm_.acquire(name, sizeof(sync_t))) { + ipc::error("[acquire_handle] fail shm.acquire: %s\n", name); + return nullptr; + } + return static_cast(shm_.get()); + } + +public: + obj_impl() = default; + ~obj_impl() = default; + + sync_t const *native() const noexcept { + return h_; + } + + sync_t *native() noexcept { + return h_; + } + + char const *name() const noexcept { + return shm_.name(); + } + + bool valid() const noexcept { + return h_ != nullptr; + } + + bool open(char const *name) noexcept { + close(); + if ((h_ = acquire_handle(name)) == nullptr) { + return false; + } + if (shm_.ref() > 1) { + return true; + } + *h_ = A0_EMPTY; + return true; + } + + void close() noexcept { + shm_.release(); + h_ = nullptr; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/platform.c b/src/libipc/platform/platform.c new file mode 100644 index 0000000..49cc977 --- /dev/null +++ b/src/libipc/platform/platform.c @@ -0,0 +1,13 @@ + +#include "libipc/platform/detail.h" +#if defined(IPC_OS_WINDOWS_) +#elif defined(IPC_OS_LINUX_) +#include "libipc/platform/linux/a0/err.c" +#include "libipc/platform/linux/a0/mtx.c" +#include "libipc/platform/linux/a0/strconv.c" +#include "libipc/platform/linux/a0/tid.c" +#include "libipc/platform/linux/a0/time.c" +#elif defined(IPC_OS_QNX_) +#else/*IPC_OS*/ +# error "Unsupported platform." +#endif diff --git a/src/libipc/platform/platform.cpp b/src/libipc/platform/platform.cpp new file mode 100644 index 0000000..b77cbc9 --- /dev/null +++ b/src/libipc/platform/platform.cpp @@ -0,0 +1,9 @@ + +#include "libipc/platform/detail.h" +#if defined(IPC_OS_WINDOWS_) +#include "libipc/platform/win/shm_win.cpp" +#elif defined(IPC_OS_LINUX_) || defined(IPC_OS_QNX_) +#include "libipc/platform/posix/shm_posix.cpp" +#else/*IPC_OS*/ +# error "Unsupported platform." +#endif diff --git a/src/libipc/platform/condition_linux.h b/src/libipc/platform/posix/condition.h similarity index 94% rename from src/libipc/platform/condition_linux.h rename to src/libipc/platform/posix/condition.h index d9d4280..d283d68 100644 --- a/src/libipc/platform/condition_linux.h +++ b/src/libipc/platform/posix/condition.h @@ -5,12 +5,13 @@ #include -#include "libipc/platform/get_wait_time.h" #include "libipc/utility/log.h" #include "libipc/utility/scope_guard.h" #include "libipc/mutex.h" #include "libipc/shm.h" +#include "get_wait_time.h" + namespace ipc { namespace detail { namespace sync { @@ -62,7 +63,7 @@ public: ipc::error("fail pthread_condattr_init[%d]\n", eno); return false; } - IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); + IPC_UNUSED_ auto guard_cond_attr = guard([&cond_attr] { ::pthread_condattr_destroy(&cond_attr); }); if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); return false; @@ -114,7 +115,7 @@ public: return true; } - bool notify() noexcept { + bool notify(ipc::sync::mutex &) noexcept { if (!valid()) return false; int eno; if ((eno = ::pthread_cond_signal(cond_)) != 0) { @@ -124,7 +125,7 @@ public: return true; } - bool broadcast() noexcept { + bool broadcast(ipc::sync::mutex &) noexcept { if (!valid()) return false; int eno; if ((eno = ::pthread_cond_broadcast(cond_)) != 0) { diff --git a/src/libipc/platform/get_wait_time.h b/src/libipc/platform/posix/get_wait_time.h similarity index 100% rename from src/libipc/platform/get_wait_time.h rename to src/libipc/platform/posix/get_wait_time.h diff --git a/src/libipc/platform/mutex_linux.h b/src/libipc/platform/posix/mutex.h similarity index 98% rename from src/libipc/platform/mutex_linux.h rename to src/libipc/platform/posix/mutex.h index 9080f58..8f7525f 100644 --- a/src/libipc/platform/mutex_linux.h +++ b/src/libipc/platform/posix/mutex.h @@ -9,13 +9,14 @@ #include -#include "libipc/platform/get_wait_time.h" #include "libipc/platform/detail.h" #include "libipc/utility/log.h" #include "libipc/utility/scope_guard.h" #include "libipc/memory/resource.h" #include "libipc/shm.h" +#include "get_wait_time.h" + namespace ipc { namespace detail { namespace sync { @@ -114,7 +115,7 @@ public: ipc::error("fail pthread_mutexattr_init[%d]\n", eno); return false; } - IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); + IPC_UNUSED_ auto guard_mutex_attr = guard([&mutex_attr] { ::pthread_mutexattr_destroy(&mutex_attr); }); if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); return false; diff --git a/src/libipc/platform/semaphore_linux.h b/src/libipc/platform/posix/semaphore_impl.h similarity index 98% rename from src/libipc/platform/semaphore_linux.h rename to src/libipc/platform/posix/semaphore_impl.h index cbc4973..d48bcd4 100644 --- a/src/libipc/platform/semaphore_linux.h +++ b/src/libipc/platform/posix/semaphore_impl.h @@ -8,9 +8,10 @@ #include #include "libipc/utility/log.h" -#include "libipc/platform/get_wait_time.h" #include "libipc/shm.h" +#include "get_wait_time.h" + namespace ipc { namespace detail { namespace sync { diff --git a/src/libipc/platform/shm_linux.cpp b/src/libipc/platform/posix/shm_posix.cpp old mode 100755 new mode 100644 similarity index 96% rename from src/libipc/platform/shm_linux.cpp rename to src/libipc/platform/posix/shm_posix.cpp index 4baf8b5..7f70b07 --- a/src/libipc/platform/shm_linux.cpp +++ b/src/libipc/platform/posix/shm_posix.cpp @@ -1,198 +1,197 @@ - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "libipc/shm.h" -#include "libipc/def.h" -#include "libipc/pool_alloc.h" - -#include "libipc/utility/log.h" -#include "libipc/memory/resource.h" - -namespace { - -struct info_t { - std::atomic acc_; -}; - -struct id_info_t { - int fd_ = -1; - void* mem_ = nullptr; - std::size_t size_ = 0; - ipc::string name_; -}; - -constexpr std::size_t calc_size(std::size_t size) { - return ((((size - 1) / alignof(info_t)) + 1) * alignof(info_t)) + sizeof(info_t); -} - -inline auto& acc_of(void* mem, std::size_t size) { - return reinterpret_cast(static_cast(mem) + size - sizeof(info_t))->acc_; -} - -} // internal-linkage - -namespace ipc { -namespace shm { - -id_t acquire(char const * name, std::size_t size, unsigned mode) { - if (name == nullptr || name[0] == '\0') { - ipc::error("fail acquire: name is empty\n"); - return nullptr; - } - ipc::string op_name = ipc::string{"__IPC_SHM__"} + name; - // Open the object for read-write access. - int flag = O_RDWR; - switch (mode) { - case open: - size = 0; - break; - // The check for the existence of the object, - // and its creation if it does not exist, are performed atomically. - case create: - flag |= O_CREAT | O_EXCL; - break; - // Create the shared memory object if it does not exist. - default: - flag |= O_CREAT; - break; - } - int fd = ::shm_open(op_name.c_str(), flag, S_IRUSR | S_IWUSR | - S_IRGRP | S_IWGRP | - S_IROTH | S_IWOTH); - if (fd == -1) { - ipc::error("fail shm_open[%d]: %s\n", errno, name); - return nullptr; - } - auto ii = mem::alloc(); - ii->fd_ = fd; - ii->size_ = size; - ii->name_ = std::move(op_name); - return ii; -} - -std::int32_t get_ref(id_t id) { - if (id == nullptr) { - return 0; - } - auto ii = static_cast(id); - if (ii->mem_ == nullptr || ii->size_ == 0) { - return 0; - } - return acc_of(ii->mem_, ii->size_).load(std::memory_order_acquire); -} - -void sub_ref(id_t id) { - if (id == nullptr) { - ipc::error("fail sub_ref: invalid id (null)\n"); - return; - } - auto ii = static_cast(id); - if (ii->mem_ == nullptr || ii->size_ == 0) { - ipc::error("fail sub_ref: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); - return; - } - acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel); -} - -void * get_mem(id_t id, std::size_t * size) { - if (id == nullptr) { - ipc::error("fail get_mem: invalid id (null)\n"); - return nullptr; - } - auto ii = static_cast(id); - if (ii->mem_ != nullptr) { - if (size != nullptr) *size = ii->size_; - return ii->mem_; - } - int fd = ii->fd_; - if (fd == -1) { - ipc::error("fail get_mem: invalid id (fd = -1)\n"); - return nullptr; - } - if (ii->size_ == 0) { - struct stat st; - if (::fstat(fd, &st) != 0) { - ipc::error("fail fstat[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); - return nullptr; - } - ii->size_ = static_cast(st.st_size); - if ((ii->size_ <= sizeof(info_t)) || (ii->size_ % sizeof(info_t))) { - ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); - return nullptr; - } - } - else { - ii->size_ = calc_size(ii->size_); - if (::ftruncate(fd, static_cast(ii->size_)) != 0) { - ipc::error("fail ftruncate[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); - return nullptr; - } - } - void* mem = ::mmap(nullptr, ii->size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (mem == MAP_FAILED) { - ipc::error("fail mmap[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); - return nullptr; - } - ::close(fd); - ii->fd_ = -1; - ii->mem_ = mem; - if (size != nullptr) *size = ii->size_; - acc_of(mem, ii->size_).fetch_add(1, std::memory_order_release); - return mem; -} - -std::int32_t release(id_t id) { - if (id == nullptr) { - ipc::error("fail release: invalid id (null)\n"); - return -1; - } - std::int32_t ret = -1; - auto ii = static_cast(id); - if (ii->mem_ == nullptr || ii->size_ == 0) { - ipc::error("fail release: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); - } - else if ((ret = acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel)) <= 1) { - ::munmap(ii->mem_, ii->size_); - if (!ii->name_.empty()) { - ::shm_unlink(ii->name_.c_str()); - } - } - else ::munmap(ii->mem_, ii->size_); - mem::free(ii); - return ret; -} - -void remove(id_t id) { - if (id == nullptr) { - ipc::error("fail remove: invalid id (null)\n"); - return; - } - auto ii = static_cast(id); - auto name = std::move(ii->name_); - release(id); - if (!name.empty()) { - ::shm_unlink(name.c_str()); - } -} - -void remove(char const * name) { - if (name == nullptr || name[0] == '\0') { - ipc::error("fail remove: name is empty\n"); - return; - } - ::shm_unlink((ipc::string{"__IPC_SHM__"} + name).c_str()); -} - -} // namespace shm -} // namespace ipc + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "libipc/shm.h" +#include "libipc/def.h" +#include "libipc/pool_alloc.h" + +#include "libipc/utility/log.h" +#include "libipc/memory/resource.h" + +namespace { + +struct info_t { + std::atomic acc_; +}; + +struct id_info_t { + int fd_ = -1; + void* mem_ = nullptr; + std::size_t size_ = 0; + ipc::string name_; +}; + +constexpr std::size_t calc_size(std::size_t size) { + return ((((size - 1) / alignof(info_t)) + 1) * alignof(info_t)) + sizeof(info_t); +} + +inline auto& acc_of(void* mem, std::size_t size) { + return reinterpret_cast(static_cast(mem) + size - sizeof(info_t))->acc_; +} + +} // internal-linkage + +namespace ipc { +namespace shm { + +id_t acquire(char const * name, std::size_t size, unsigned mode) { + if (name == nullptr || name[0] == '\0') { + ipc::error("fail acquire: name is empty\n"); + return nullptr; + } + ipc::string op_name = ipc::string{"__IPC_SHM__"} + name; + // Open the object for read-write access. + int flag = O_RDWR; + switch (mode) { + case open: + size = 0; + break; + // The check for the existence of the object, + // and its creation if it does not exist, are performed atomically. + case create: + flag |= O_CREAT | O_EXCL; + break; + // Create the shared memory object if it does not exist. + default: + flag |= O_CREAT; + break; + } + int fd = ::shm_open(op_name.c_str(), flag, S_IRUSR | S_IWUSR | + S_IRGRP | S_IWGRP | + S_IROTH | S_IWOTH); + if (fd == -1) { + ipc::error("fail shm_open[%d]: %s\n", errno, name); + return nullptr; + } + auto ii = mem::alloc(); + ii->fd_ = fd; + ii->size_ = size; + ii->name_ = std::move(op_name); + return ii; +} + +std::int32_t get_ref(id_t id) { + if (id == nullptr) { + return 0; + } + auto ii = static_cast(id); + if (ii->mem_ == nullptr || ii->size_ == 0) { + return 0; + } + return acc_of(ii->mem_, ii->size_).load(std::memory_order_acquire); +} + +void sub_ref(id_t id) { + if (id == nullptr) { + ipc::error("fail sub_ref: invalid id (null)\n"); + return; + } + auto ii = static_cast(id); + if (ii->mem_ == nullptr || ii->size_ == 0) { + ipc::error("fail sub_ref: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); + return; + } + acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel); +} + +void * get_mem(id_t id, std::size_t * size) { + if (id == nullptr) { + ipc::error("fail get_mem: invalid id (null)\n"); + return nullptr; + } + auto ii = static_cast(id); + if (ii->mem_ != nullptr) { + if (size != nullptr) *size = ii->size_; + return ii->mem_; + } + int fd = ii->fd_; + if (fd == -1) { + ipc::error("fail get_mem: invalid id (fd = -1)\n"); + return nullptr; + } + if (ii->size_ == 0) { + struct stat st; + if (::fstat(fd, &st) != 0) { + ipc::error("fail fstat[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + return nullptr; + } + ii->size_ = static_cast(st.st_size); + if ((ii->size_ <= sizeof(info_t)) || (ii->size_ % sizeof(info_t))) { + ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); + return nullptr; + } + } + else { + ii->size_ = calc_size(ii->size_); + if (::ftruncate(fd, static_cast(ii->size_)) != 0) { + ipc::error("fail ftruncate[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + return nullptr; + } + } + void* mem = ::mmap(nullptr, ii->size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (mem == MAP_FAILED) { + ipc::error("fail mmap[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + return nullptr; + } + ::close(fd); + ii->fd_ = -1; + ii->mem_ = mem; + if (size != nullptr) *size = ii->size_; + acc_of(mem, ii->size_).fetch_add(1, std::memory_order_release); + return mem; +} + +std::int32_t release(id_t id) { + if (id == nullptr) { + ipc::error("fail release: invalid id (null)\n"); + return -1; + } + std::int32_t ret = -1; + auto ii = static_cast(id); + if (ii->mem_ == nullptr || ii->size_ == 0) { + ipc::error("fail release: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); + } + else if ((ret = acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel)) <= 1) { + ::munmap(ii->mem_, ii->size_); + if (!ii->name_.empty()) { + ::shm_unlink(ii->name_.c_str()); + } + } + else ::munmap(ii->mem_, ii->size_); + mem::free(ii); + return ret; +} + +void remove(id_t id) { + if (id == nullptr) { + ipc::error("fail remove: invalid id (null)\n"); + return; + } + auto ii = static_cast(id); + auto name = std::move(ii->name_); + release(id); + if (!name.empty()) { + ::shm_unlink(name.c_str()); + } +} + +void remove(char const * name) { + if (name == nullptr || name[0] == '\0') { + ipc::error("fail remove: name is empty\n"); + return; + } + ::shm_unlink((ipc::string{"__IPC_SHM__"} + name).c_str()); +} + +} // namespace shm +} // namespace ipc diff --git a/src/libipc/platform/condition_win.h b/src/libipc/platform/win/condition.h similarity index 97% rename from src/libipc/platform/condition_win.h rename to src/libipc/platform/win/condition.h index 5d82d47..e477bff 100644 --- a/src/libipc/platform/condition_win.h +++ b/src/libipc/platform/win/condition.h @@ -89,7 +89,7 @@ public: return rs && rl; } - bool notify() noexcept { + bool notify(ipc::sync::mutex &) noexcept { if (!valid()) return false; auto &cnt = counter(); if (!lock_.lock()) return false; @@ -101,7 +101,7 @@ public: return lock_.unlock() && ret; } - bool broadcast() noexcept { + bool broadcast(ipc::sync::mutex &) noexcept { if (!valid()) return false; auto &cnt = counter(); if (!lock_.lock()) return false; diff --git a/src/libipc/platform/get_sa.h b/src/libipc/platform/win/get_sa.h similarity index 100% rename from src/libipc/platform/get_sa.h rename to src/libipc/platform/win/get_sa.h diff --git a/src/libipc/platform/mutex_win.h b/src/libipc/platform/win/mutex.h similarity index 96% rename from src/libipc/platform/mutex_win.h rename to src/libipc/platform/win/mutex.h index b648c3b..68968c4 100644 --- a/src/libipc/platform/mutex_win.h +++ b/src/libipc/platform/win/mutex.h @@ -7,8 +7,8 @@ #include "libipc/utility/log.h" -#include "libipc/platform/to_tchar.h" -#include "libipc/platform/get_sa.h" +#include "to_tchar.h" +#include "get_sa.h" namespace ipc { namespace detail { diff --git a/src/libipc/platform/semaphore_win.h b/src/libipc/platform/win/semaphore.h similarity index 95% rename from src/libipc/platform/semaphore_win.h rename to src/libipc/platform/win/semaphore.h index 9a91eba..47fd45d 100644 --- a/src/libipc/platform/semaphore_win.h +++ b/src/libipc/platform/win/semaphore.h @@ -6,8 +6,8 @@ #include "libipc/utility/log.h" -#include "libipc/platform/to_tchar.h" -#include "libipc/platform/get_sa.h" +#include "to_tchar.h" +#include "get_sa.h" namespace ipc { namespace detail { diff --git a/src/libipc/platform/shm_win.cpp b/src/libipc/platform/win/shm_win.cpp similarity index 94% rename from src/libipc/platform/shm_win.cpp rename to src/libipc/platform/win/shm_win.cpp index ae49268..366e8bd 100755 --- a/src/libipc/platform/shm_win.cpp +++ b/src/libipc/platform/win/shm_win.cpp @@ -9,10 +9,11 @@ #include "libipc/pool_alloc.h" #include "libipc/utility/log.h" -#include "libipc/platform/to_tchar.h" -#include "libipc/platform/get_sa.h" #include "libipc/memory/resource.h" +#include "to_tchar.h" +#include "get_sa.h" + namespace { struct id_info_t { diff --git a/src/libipc/platform/to_tchar.h b/src/libipc/platform/win/to_tchar.h similarity index 97% rename from src/libipc/platform/to_tchar.h rename to src/libipc/platform/win/to_tchar.h index 61def06..892177e 100755 --- a/src/libipc/platform/to_tchar.h +++ b/src/libipc/platform/win/to_tchar.h @@ -11,8 +11,8 @@ #include #include "libipc/utility/concept.h" -#include "libipc/platform/detail.h" #include "libipc/memory/resource.h" +#include "libipc/platform/detail.h" namespace ipc { namespace detail { diff --git a/src/libipc/sync/condition.cpp b/src/libipc/sync/condition.cpp index 2859d21..06d90e1 100644 --- a/src/libipc/sync/condition.cpp +++ b/src/libipc/sync/condition.cpp @@ -5,10 +5,12 @@ #include "libipc/memory/resource.h" #include "libipc/platform/detail.h" #if defined(IPC_OS_WINDOWS_) -#include "libipc/platform/condition_win.h" +#include "libipc/platform/win/condition.h" #elif defined(IPC_OS_LINUX_) -#include "libipc/platform/condition_linux.h" -#else/*linux*/ +#include "libipc/platform/linux/condition.h" +#elif defined(IPC_OS_QNX_) +#include "libipc/platform/posix/condition.h" +#else/*IPC_OS*/ # error "Unsupported platform." #endif @@ -58,12 +60,12 @@ bool condition::wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { return impl(p_)->cond_.wait(mtx, tm); } -bool condition::notify() noexcept { - return impl(p_)->cond_.notify(); +bool condition::notify(ipc::sync::mutex &mtx) noexcept { + return impl(p_)->cond_.notify(mtx); } -bool condition::broadcast() noexcept { - return impl(p_)->cond_.broadcast(); +bool condition::broadcast(ipc::sync::mutex &mtx) noexcept { + return impl(p_)->cond_.broadcast(mtx); } } // namespace sync diff --git a/src/libipc/sync/mutex.cpp b/src/libipc/sync/mutex.cpp index 813e334..79f1a91 100644 --- a/src/libipc/sync/mutex.cpp +++ b/src/libipc/sync/mutex.cpp @@ -5,10 +5,12 @@ #include "libipc/memory/resource.h" #include "libipc/platform/detail.h" #if defined(IPC_OS_WINDOWS_) -#include "libipc/platform/mutex_win.h" +#include "libipc/platform/win/mutex.h" #elif defined(IPC_OS_LINUX_) -#include "libipc/platform/mutex_linux.h" -#else/*linux*/ +#include "libipc/platform/linux/mutex.h" +#elif defined(IPC_OS_QNX_) +#include "libipc/platform/posix/mutex.h" +#else/*IPC_OS*/ # error "Unsupported platform." #endif diff --git a/src/libipc/sync/semaphore.cpp b/src/libipc/sync/semaphore.cpp index 6e86823..23f6bdc 100644 --- a/src/libipc/sync/semaphore.cpp +++ b/src/libipc/sync/semaphore.cpp @@ -5,10 +5,10 @@ #include "libipc/memory/resource.h" #include "libipc/platform/detail.h" #if defined(IPC_OS_WINDOWS_) -#include "libipc/platform/semaphore_win.h" -#elif defined(IPC_OS_LINUX_) -#include "libipc/platform/semaphore_linux.h" -#else/*linux*/ +#include "libipc/platform/win/semaphore.h" +#elif defined(IPC_OS_LINUX_) || defined(IPC_OS_QNX_) +#include "libipc/platform/posix/semaphore_impl.h" +#else/*IPC_OS*/ # error "Unsupported platform." #endif diff --git a/src/libipc/utility/utility.h b/src/libipc/utility/utility.h index 59d4ad1..79424d9 100755 --- a/src/libipc/utility/utility.h +++ b/src/libipc/utility/utility.h @@ -1,8 +1,9 @@ #pragma once -#include // std::forward, std::integer_sequence -#include // std::size_t -#include // std::hardware_destructive_interference_size +#include // std::forward, std::integer_sequence +#include // std::size_t +#include // std::hardware_destructive_interference_size +#include // std::is_trivially_copyable #include "libipc/platform/detail.h" @@ -44,13 +45,15 @@ enum { }; template -T horrible_cast(U val) { +auto horrible_cast(U rhs) noexcept + -> typename std::enable_if::value + && std::is_trivially_copyable::value, T>::type { union { - T out; - U in; - } u; - u.in = val; - return u.out; + T t; + U u; + } r = {}; + r.u = rhs; + return r.t; } IPC_CONSTEXPR_ std::size_t make_align(std::size_t align, std::size_t size) { diff --git a/src/libipc/waiter.h b/src/libipc/waiter.h index 7983702..2e13dc3 100644 --- a/src/libipc/waiter.h +++ b/src/libipc/waiter.h @@ -63,12 +63,12 @@ public: bool notify() noexcept { std::lock_guard{lock_}; // barrier - return cond_.notify(); + return cond_.notify(lock_); } bool broadcast() noexcept { std::lock_guard{lock_}; // barrier - return cond_.broadcast(); + return cond_.broadcast(lock_); } bool quit_waiting() { diff --git a/test/profiler/README.md b/test/profiler/README.md deleted file mode 100644 index d4a6bea..0000000 --- a/test/profiler/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# A Quick Introduction to C++ Performance Tuning -(From: https://github.com/adah1972/cpp_summit_2020.git) - -This repository contains the presentation file and example code for my -presentation at the C++ Summit 2020 held in Shenzhen, China on 4–5 December -2020. - -The presentation content is shared under a [Creative Commons Attribution-Share -Alike 2.5 Licence](http://creativecommons.org/licenses/by-sa/2.5/). The code -is put in the public domain (i.e. do whatever you like with it), though an -acknowledgement will be appreciated (but not required). diff --git a/test/profiler/profiler.cpp b/test/profiler/profiler.cpp deleted file mode 100644 index d8fd7bc..0000000 --- a/test/profiler/profiler.cpp +++ /dev/null @@ -1,77 +0,0 @@ -#include "profiler.h" -#include -#include -#include - -namespace { - -struct profiling_data { - int number; - int call_count{}; - uint64_t call_duration{}; -}; - -class profiler { -public: - profiler(); - ~profiler(); - - void add_data(int number, uint64_t duration); - -private: - std::vector data_; -}; - -profiler::profiler() -{ - size_t len = 0; - for (;;) { - if (name_map[len].name == NULL) { - break; - } - ++len; - } - data_.resize(len); - int i = 0; - for (auto& item : data_) { - assert(i == name_map[i].number); - item.number = i; - ++i; - } -} - -profiler::~profiler() -{ -#ifndef NDEBUG - for (auto& item : data_) { - if (item.call_count == 0) { - continue; - } - std::cout << item.number << " " << name_map[item.number].name - << ":\n"; - std::cout << " Call count: " << item.call_count << '\n'; - std::cout << " Call duration: " << item.call_duration << '\n'; - std::cout << " Average duration: " - << item.call_duration * 1.0 / - (item.call_count != 0 ? item.call_count : 1) - << '\n'; - } -#endif -} - -void profiler::add_data(int number, uint64_t duration) -{ - assert(number >= 0 && number < static_cast(data_.size())); - data_[number].call_count++; - data_[number].call_duration += duration; -} - -profiler profiler_instance; - -} // unnamed namespace - -profiling_checker::~profiling_checker() -{ - auto end_time = rdtsc(); - profiler_instance.add_data(number_, end_time - start_time_); -} diff --git a/test/profiler/profiler.h b/test/profiler/profiler.h deleted file mode 100644 index d04264a..0000000 --- a/test/profiler/profiler.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef PROFILER_H -#define PROFILER_H - -#include "rdtsc.h" - -struct name_mapper { - int number; - const char* name; -}; - -extern name_mapper name_map[]; - -class profiling_checker { -public: - profiling_checker(int number); - ~profiling_checker(); - -private: - int number_; - uint64_t start_time_; -}; - -inline profiling_checker::profiling_checker(int number) - : number_(number) -{ - start_time_ = rdtsc(); -} - -#ifdef NDEBUG -#define PROFILE_CHECK(func_number) (void)0 -#else -#define PROFILE_CHECK(func_number) profiling_checker _checker(func_number) -#endif - -#endif // PROFILER_H diff --git a/test/profiler/rdtsc.h b/test/profiler/rdtsc.h deleted file mode 100644 index 80e35c7..0000000 --- a/test/profiler/rdtsc.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef RDTSC_H -#define RDTSC_H - -#include // uint64_t - -#if defined(_M_X64) || defined(_M_IX86) || defined(__x86_64) || defined(__i386) -# ifdef _WIN32 -# include // __rdtsc -# else -# include // __rdtsc -# endif -# define HAS_HW_RDTSC 1 -#else -# include // std::chrono::high_resolution_clock -# define HAS_HW_RDTSC 0 -#endif - -inline uint64_t rdtsc() -{ -#if HAS_HW_RDTSC - // _mm_lfence() might be used to serialize the instruction stream, - // and it would guarantee that RDTSC will not be reordered with - // other instructions. However, measurements show that the overhead - // may be too big (easily 15 to 30 CPU cycles) for profiling - // purposes: if reordering matters, the overhead matters too! - - // Forbid the compiler from reordering instructions -# ifdef _MSC_VER - _ReadWriteBarrier(); -# else - __asm__ __volatile__("" : : : "memory"); -# endif - - uint64_t result = __rdtsc(); - - // Forbid the compiler from reordering instructions -# ifdef _MSC_VER - _ReadWriteBarrier(); -# else - __asm__ __volatile__("" : : : "memory"); -# endif - - return result; -#else - auto now = std::chrono::high_resolution_clock::now(); - return std::chrono::duration_cast( - now.time_since_epoch()) - .count(); -#endif -} - -#endif // RDTSC_H diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index c731198..a1f0108 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -153,16 +153,16 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { TEST(IPC, basic) { test_basic("ssu"); - test_basic("smu"); - test_basic("mmu"); + //test_basic("smu"); + //test_basic("mmu"); test_basic("smb"); test_basic("mmb"); } TEST(IPC, 1v1) { test_sr("ssu", 1, 1); - test_sr("smu", 1, 1); - test_sr("mmu", 1, 1); + //test_sr("smu", 1, 1); + //test_sr("mmu", 1, 1); test_sr("smb", 1, 1); test_sr("mmb", 1, 1); } diff --git a/test/test_platform.cpp b/test/test_platform.cpp index e7a6059..65fa513 100644 --- a/test/test_platform.cpp +++ b/test/test_platform.cpp @@ -8,7 +8,7 @@ #include "test.h" -#include "libipc/platform/to_tchar.h" +#include "libipc/platform/win/to_tchar.h" TEST(Platform, to_tchar) { char const *utf8 = "hello world, " diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 84ace7b..4cb82e6 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -67,22 +67,22 @@ TEST(Sync, Mutex) { EXPECT_THROW(lock.try_lock(), std::system_error); - int i = 0; - EXPECT_TRUE(lock.lock()); - i = 100; - auto t2 = std::thread{[&i] { - ipc::sync::mutex lock {"test-mutex-robust"}; - EXPECT_TRUE(lock.valid()); - EXPECT_FALSE(lock.try_lock()); - EXPECT_TRUE(lock.lock()); - i += i; - EXPECT_TRUE(lock.unlock()); - }}; - std::this_thread::sleep_for(std::chrono::seconds(1)); - EXPECT_EQ(i, 100); - EXPECT_TRUE(lock.unlock()); - t2.join(); - EXPECT_EQ(i, 200); + // int i = 0; + // EXPECT_TRUE(lock.lock()); + // i = 100; + // auto t2 = std::thread{[&i] { + // ipc::sync::mutex lock {"test-mutex-robust"}; + // EXPECT_TRUE(lock.valid()); + // EXPECT_FALSE(lock.try_lock()); + // EXPECT_TRUE(lock.lock()); + // i += i; + // EXPECT_TRUE(lock.unlock()); + // }}; + // std::this_thread::sleep_for(std::chrono::seconds(1)); + // EXPECT_EQ(i, 100); + // EXPECT_TRUE(lock.unlock()); + // t2.join(); + // EXPECT_EQ(i, 200); } #include "libipc/semaphore.h" @@ -113,7 +113,7 @@ TEST(Sync, Condition) { auto job = [&que](int num) { ipc::sync::condition cond {"test-cond"}; ipc::sync::mutex lock {"test-mutex"}; - for (;;) { + for (int i = 0; i < 10; ++i) { int val = 0; { std::lock_guard guard {lock}; @@ -123,6 +123,19 @@ TEST(Sync, Condition) { val = que.front(); que.pop_front(); } + EXPECT_NE(val, 0); + std::printf("test-cond-%d: %d\n", num, val); + } + for (;;) { + int val = 0; + { + std::lock_guard guard {lock}; + while (que.empty()) { + EXPECT_TRUE(cond.wait(lock, 1000)); + } + val = que.front(); + que.pop_front(); + } if (val == 0) { std::printf("test-cond-%d: exit.\n", num); return; @@ -139,16 +152,16 @@ TEST(Sync, Condition) { { std::lock_guard guard {lock}; que.push_back(i); + ASSERT_TRUE(cond.notify(lock)); } - cond.notify(); std::this_thread::sleep_for(std::chrono::milliseconds(20)); } for (int i = 1; i < 100; ++i) { { std::lock_guard guard {lock}; que.push_back(i); + ASSERT_TRUE(cond.broadcast(lock)); } - cond.broadcast(); std::this_thread::sleep_for(std::chrono::milliseconds(20)); } { @@ -156,8 +169,8 @@ TEST(Sync, Condition) { for (int i = 0; i < (int)test_conds.size(); ++i) { que.push_back(0); } + ASSERT_TRUE(cond.broadcast(lock)); } - cond.broadcast(); for (auto &t : test_conds) t.join(); } @@ -171,7 +184,7 @@ TEST(Sync, ConditionRobust) { printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 2\n"); ipc::sync::mutex lock {"test-mutex"}; printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 3\n"); - lock.lock(); + ASSERT_TRUE(lock.lock()); std::thread unlock {[] { printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 1\n"); ipc::sync::condition cond {"test-cond"}; @@ -183,13 +196,13 @@ TEST(Sync, ConditionRobust) { } std::this_thread::sleep_for(std::chrono::seconds(1)); printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 4\n"); - cond.broadcast(); + ASSERT_TRUE(cond.broadcast(lock)); printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 5\n"); }}; printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 4\n"); - cond.wait(lock); + ASSERT_TRUE(cond.wait(lock)); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 5\n"); - lock.unlock(); + ASSERT_TRUE(lock.unlock()); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 6\n"); unlock.join(); } \ No newline at end of file