Merge pull request #74 from mutouyun/develop

Develop
This commit is contained in:
木头云 2022-01-08 23:53:33 +08:00 committed by GitHub
commit f2f1af8f8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 2188 additions and 485 deletions

View File

@ -27,8 +27,8 @@ public:
void close() noexcept; void close() noexcept;
bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept; bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept;
bool notify() noexcept; bool notify(ipc::sync::mutex &mtx) noexcept;
bool broadcast() noexcept; bool broadcast(ipc::sync::mutex &mtx) noexcept;
private: private:
class condition_; class condition_;

View File

@ -1,12 +1,8 @@
project(ipc) project(ipc)
if(UNIX)
file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_linux.cpp)
else()
file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_win.cpp)
endif()
aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc SRC_FILES) aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc SRC_FILES)
aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/sync SRC_FILES) aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/sync SRC_FILES)
aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/platform SRC_FILES)
file(GLOB HEAD_FILES file(GLOB HEAD_FILES
${LIBIPC_PROJECT_DIR}/include/libipc/*.h ${LIBIPC_PROJECT_DIR}/include/libipc/*.h
@ -33,28 +29,27 @@ set_target_properties(${PROJECT_NAME}
PROPERTIES PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib" ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib"
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib" LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib"
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin" ) RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin")
# set version # set version
set_target_properties(${PROJECT_NAME} set_target_properties(${PROJECT_NAME}
PROPERTIES PROPERTIES
VERSION 1.1.0 VERSION 1.2.0
SOVERSION 2) SOVERSION 3)
target_include_directories(${PROJECT_NAME} target_include_directories(${PROJECT_NAME}
PUBLIC ${LIBIPC_PROJECT_DIR}/include PUBLIC ${LIBIPC_PROJECT_DIR}/include
PRIVATE ${LIBIPC_PROJECT_DIR}/src PRIVATE ${LIBIPC_PROJECT_DIR}/src
) $<$<BOOL:UNIX>:${LIBIPC_PROJECT_DIR}/src/libipc/platform/linux>)
if(NOT MSVC) if(NOT MSVC)
target_link_libraries(${PROJECT_NAME} PUBLIC target_link_libraries(${PROJECT_NAME} PUBLIC
pthread $<$<NOT:$<STREQUAL:${CMAKE_SYSTEM_NAME},QNX>>:pthread>
$<$<NOT:$<STREQUAL:${CMAKE_SYSTEM_NAME},Windows>>:rt>) $<$<NOT:$<OR:$<STREQUAL:${CMAKE_SYSTEM_NAME},Windows>,$<STREQUAL:${CMAKE_SYSTEM_NAME},QNX>>>:rt>)
endif() endif()
install( install(
TARGETS ${PROJECT_NAME} TARGETS ${PROJECT_NAME}
RUNTIME DESTINATION bin RUNTIME DESTINATION bin
LIBRARY DESTINATION lib LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib ARCHIVE DESTINATION lib)
)

View File

@ -687,8 +687,8 @@ buff_t chan_impl<Flag>::try_recv(ipc::handle_t h) {
} }
template struct chan_impl<ipc::wr<relat::single, relat::single, trans::unicast >>; template struct chan_impl<ipc::wr<relat::single, relat::single, trans::unicast >>;
template struct chan_impl<ipc::wr<relat::single, relat::multi , trans::unicast >>; // template struct chan_impl<ipc::wr<relat::single, relat::multi , trans::unicast >>; // TBD
template struct chan_impl<ipc::wr<relat::multi , relat::multi , trans::unicast >>; // template struct chan_impl<ipc::wr<relat::multi , relat::multi , trans::unicast >>; // TBD
template struct chan_impl<ipc::wr<relat::single, relat::multi , trans::broadcast>>; template struct chan_impl<ipc::wr<relat::single, relat::multi , trans::broadcast>>;
template struct chan_impl<ipc::wr<relat::multi , relat::multi , trans::broadcast>>; template struct chan_impl<ipc::wr<relat::multi , relat::multi , trans::broadcast>>;

View File

@ -1,4 +1,22 @@
#pragma once #ifndef LIBIPC_SRC_PLATFORM_DETAIL_H_
#define LIBIPC_SRC_PLATFORM_DETAIL_H_
// detect platform
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE)
# define IPC_OS_WINDOWS_
#elif defined(__linux__) || defined(__linux)
# define IPC_OS_LINUX_
#elif defined(__QNX__)
# define IPC_OS_QNX_
#elif defined(__APPLE__)
#elif defined(__ANDROID__)
// TBD
#endif
#if defined(__cplusplus)
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@ -22,18 +40,6 @@
# error "IPC_CONSTEXPR_ has been defined." # error "IPC_CONSTEXPR_ has been defined."
#endif #endif
// detect platform
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE)
# define IPC_OS_WINDOWS_
#endif/*WIN*/
#if defined(__linux__) || defined(__linux)
# define IPC_OS_LINUX_
#endif/*linux*/
#if __cplusplus >= 201703L #if __cplusplus >= 201703L
#define IPC_UNUSED_ [[maybe_unused]] #define IPC_UNUSED_ [[maybe_unused]]
@ -123,17 +129,8 @@ constexpr const T& (min)(const T& a, const T& b) {
#endif/*__cplusplus < 201703L*/ #endif/*__cplusplus < 201703L*/
template <typename T, typename U>
auto horrible_cast(U rhs) noexcept
-> typename std::enable_if<std::is_trivially_copyable<T>::value
&& std::is_trivially_copyable<U>::value, T>::type {
union {
T t;
U u;
} r = {};
r.u = rhs;
return r.t;
}
} // namespace detail } // namespace detail
} // namespace ipc } // namespace ipc
#endif // defined(__cplusplus)
#endif // LIBIPC_SRC_PLATFORM_DETAIL_H_

View File

@ -0,0 +1,24 @@
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org>

View File

@ -0,0 +1,213 @@
<h1 align="center">
<br>
<img src="https://raw.githubusercontent.com/alephzero/logo/master/rendered/alephzero.svg" width="256px">
<br>
AlephZero
</h1>
<h3 align="center">Simple, Robust, Fast IPC.</h3>
<p align="center">
<a href="https://github.com/alephzero/alephzero/actions?query=workflow%3ACI"><img src="https://github.com/alephzero/alephzero/workflows/CI/badge.svg"></a>
<a href="https://codecov.io/gh/alephzero/alephzero"><img src="https://codecov.io/gh/alephzero/alephzero/branch/master/graph/badge.svg"></a>
<a href="https://alephzero.readthedocs.io/en/latest/?badge=latest"><img src="https://readthedocs.org/projects/alephzero/badge/?version=latest"></a>
<a href="http://unlicense.org"><img src="https://img.shields.io/badge/license-Unlicense-blue.svg"></a>
</p>
<p align="center">
<a href="#overview">Overview</a>
<a href="#transport">Transport</a>
<a href="#protocol">Protocol</a>
<a href="#examples">Examples</a>
<a href="#installation">Installation</a>
<a href="#across-dockers">Across Dockers</a>
</p>
# Overview
[Presentation from March 25, 2020](https://docs.google.com/presentation/d/12KE9UucjZPtpVnM1NljxOqBolBBKECWJdrCoE2yJaBw/edit#slide=id.p)
AlephZero is a library for message based communication between programs running on the same machine.
## Simple
AlephZero's main goal is to be simple to use. Nothing is higher priority.
There is no "master" process in between your nodes that is needed to do handshakes or exchanges of any kind. All you need is the topic name.
See the <a href="#examples">Examples</a>.
## Robust
This is probably the main value of AlephZero, above similar libraries.
AlephZero uses a lot of tricks to ensure the state of all channels is consistent, even when programs die. This includes double-buffering the state of the communication channel and [robustifying](https://man7.org/linux/man-pages/man3/pthread_mutexattr_setrobust.3.html) the locks and notification channels.
## Fast
AlephZero uses shared memory across multiple processes to read and write messages, minimizing the involvement of the kernel. The kernel only really gets involved in notifying a process that a new message exists, and for that we use futex (fast user-space mutex).
TODO: Benchmarks
# Transport
AlephZero, at its core, is a simple allocator on top of a contiguous region of memory. Usually, shared-memory. The allocator of choice is a circular-linked-list, which is fast, simple, and sufficient for the protocol listed below. It also plays well with the robustness requirement.
This has a number of implications. For one, this means that old messages are kept around until the space is needed. The oldest messages are always discarded before any more recent messages.
# Protocol
Rather than exposing the low-level transport directly, AlephZero provides a few higher level protocol:
* <b>PubSub</b>: Broadcast published messages. Subscribers get notified.
* <b>RPC</b>: Request-response.
* <b>PRPC (Progressive RPC)</b>: Request-streaming response.
* <b>Sessions</b>: Bi-directional channel of communication. Not yet implemented. Let me know if you want this.
# Examples
Many more example and an interactive experience can be found at: https://github.com/alephzero/playground
For the curious, here are some simple snippets to get you started:
To begin with, we need to include AlephZero:
```cc
#include <a0.h>
```
## PubSub
You can have as many publisher and subscribers on the same topic as you wish. They just need to agree on the filename.
```cc
a0::Publisher p("my_pubsub_topic");
p.pub("foo");
```
You just published `"foo"` to the `"my_pubsub_topic"`.
To read those message, you can create a subscriber on the same topic:
```cc
a0::Subscriber sub(
"my_pubsub_topic",
A0_INIT_AWAIT_NEW, // or MOST_RECENT or OLDEST
A0_ITER_NEWEST, // or NEXT
[](a0::PacketView pkt_view) {
std::cout << "Got: " << pkt_view.payload() << std::endl;
});
```
The callback will trigger whenever a message is published.
The `Subscriber` object spawns a thread that will read the topic and call the callback.
The `A0_INIT` tells the subscriber where to start reading.
* `A0_INIT_AWAIT_NEW`: Start with messages published after the creation of the subscriber.
* `A0_INIT_MOST_RECENT`: Start with the most recently published message. Useful for state and configuration. But be careful, this can be quite old!
* `A0_INIT_OLDEST`: Topics keep a history of 16MB (unless configures otherwise). Start with the oldest thing still in there.
The `A0_ITER` tells the subscriber how to continue reading messages. After each callback:
* `A0_ITER_NEXT`: grab the sequentially next message. When you don't want to miss a thing.
* `A0_ITER_NEWEST`: grab the newest available unread message. When you want to keep up with the firehose.
```cc
a0::SubscriberSync sub_sync(
"my_pubsub_topic",
A0_INIT_OLDEST, A0_ITER_NEXT);
while (sub_sync.has_next()) {
auto pkt = sub_sync.next();
std::cout << "Got: " << pkt.payload() << std::endl;
}
```
## RPC
Create an `RpcServer`:
```cc
a0::RpcServer server(
"my_rpc_topic",
/* onrequest = */ [](a0::RpcRequest req) {
std::cout << "Got: " << req.pkt().payload() << std::endl;
req.reply("echo " + std::string(req.pkt().payload()));
},
/* oncancel = */ nullptr);
```
Create an `RpcClient`:
```cc
a0::RpcClient client("my_rpc_topic");
client.send("client msg", [](a0::PacketView reply) {
std::cout << "Got: " << reply.payload() << std::endl;
});
```
# Installation
## Install From Source
### Ubuntu Dependencies
```sh
apt install g++ make
```
### Alpine Dependencies
```sh
apk add g++ linux-headers make
```
### Download And Install
```sh
git clone https://github.com/alephzero/alephzero.git
cd alephzero
make install -j
```
## Install From Package
Coming soon-ish. Let me know if you want this and I'll prioritize it. External support is much appreciated.
## Integration
### Command Line
Add the following to g++ / clang commands.
```sh
-L${libdir} -lalephzero -lpthread
```
### Package-cfg
```sh
pkg-config --cflags --libs alephzero
```
### CMake
Coming soon-ish. Let me know if you want this and I'll prioritize it. External support is much appreciated.
### Bazel
Coming soon-ish. Let me know if you want this and I'll prioritize it.
# Across Dockers
For programs running across different dockers to be able to communicate, we need to have them match up on two flags: `--ipc` and `--pid`.
* `--ipc` shares the `/dev/shm` filesystem. This is necessary to open the same file topics.
* `--pid` shares the process id namespace. This is necessary for the locking and notification systems.
In the simplest case, you can set them both to `host` and talk through the system's global `/dev/shm` and process id namespace.
```sh
docker run --ipc=host --pid=host --name=foo foo_image
docker run --ipc=host --pid=host --name=bar bar_image
```
Or, you can mark one as `shareable` and have the others connect to it:
```sh
docker run --ipc=shareable --pid=shareable --name=foo foo_image
docker run --ipc=container:foo --pid=container:foo --name=bar bar_image
```

View File

@ -0,0 +1,36 @@
#ifndef A0_SRC_ATOMIC_H
#define A0_SRC_ATOMIC_H
#include "a0/inline.h"
#ifdef __cplusplus
extern "C" {
#endif
A0_STATIC_INLINE
void a0_barrier() {
// 'atomic_thread_fence' is not supported with -fsanitize=thread
__sync_synchronize();
}
#define a0_atomic_fetch_add(P, V) __atomic_fetch_add((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_add_fetch(P, V) __atomic_add_fetch((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_fetch_and(P, V) __atomic_fetch_and((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_and_fetch(P, V) __atomic_and_fetch((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_fetch_or(P, V) __atomic_fetch_or((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_or_fetch(P, V) __atomic_or_fetch((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_load(P) __atomic_load_n((P), __ATOMIC_RELAXED)
#define a0_atomic_store(P, V) __atomic_store_n((P), (V), __ATOMIC_RELAXED)
// TODO(lshamis): Switch from __sync to __atomic.
#define a0_cas_val(P, OV, NV) __sync_val_compare_and_swap((P), (OV), (NV))
#define a0_cas(P, OV, NV) __sync_bool_compare_and_swap((P), (OV), (NV))
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_ATOMIC_H

View File

@ -0,0 +1,60 @@
#ifndef A0_SRC_CLOCK_H
#define A0_SRC_CLOCK_H
#include "a0/err.h"
#include "a0/inline.h"
#include <stdint.h>
#include <time.h>
#include "err_macro.h"
#ifdef __cplusplus
extern "C" {
#endif
static const int64_t NS_PER_SEC = 1e9;
typedef struct timespec timespec_t;
A0_STATIC_INLINE
a0_err_t a0_clock_now(clockid_t clk, timespec_t* out) {
A0_RETURN_SYSERR_ON_MINUS_ONE(clock_gettime(clk, out));
return A0_OK;
}
A0_STATIC_INLINE
a0_err_t a0_clock_add(timespec_t ts, int64_t add_nsec, timespec_t* out) {
out->tv_sec = ts.tv_sec + add_nsec / NS_PER_SEC;
out->tv_nsec = ts.tv_nsec + add_nsec % NS_PER_SEC;
if (out->tv_nsec >= NS_PER_SEC) {
out->tv_sec++;
out->tv_nsec -= NS_PER_SEC;
} else if (out->tv_nsec < 0) {
out->tv_sec--;
out->tv_nsec += NS_PER_SEC;
}
return A0_OK;
}
A0_STATIC_INLINE
a0_err_t a0_clock_convert(
clockid_t orig_clk,
timespec_t orig_ts,
clockid_t target_clk,
timespec_t* target_ts) {
timespec_t orig_now;
A0_RETURN_ERR_ON_ERR(a0_clock_now(orig_clk, &orig_now));
timespec_t target_now;
A0_RETURN_ERR_ON_ERR(a0_clock_now(target_clk, &target_now));
int64_t add_nsec = (orig_ts.tv_sec - orig_now.tv_sec) * NS_PER_SEC + (orig_ts.tv_nsec - orig_now.tv_nsec);
return a0_clock_add(target_now, add_nsec, target_ts);
}
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_CLOCK_H

View File

@ -0,0 +1,13 @@
#ifndef A0_EMPTY_H
#define A0_EMPTY_H
// Bah. Why is there no consistent way to zero initialize a struct?
#ifdef __cplusplus
#define A0_EMPTY \
{}
#else
#define A0_EMPTY \
{ 0 }
#endif
#endif // A0_EMPTY_H

View File

@ -0,0 +1,50 @@
#include "a0/err.h"
#include "a0/thread_local.h"
#include <errno.h>
#include <string.h>
A0_THREAD_LOCAL int a0_err_syscode;
A0_THREAD_LOCAL char a0_err_msg[1024];
const char* a0_strerror(a0_err_t err) {
switch (err) {
case A0_OK: {
return strerror(0);
}
case A0_ERR_SYS: {
return strerror(a0_err_syscode);
}
case A0_ERR_CUSTOM_MSG: {
return a0_err_msg;
}
case A0_ERR_INVALID_ARG: {
return strerror(EINVAL);
}
case A0_ERR_RANGE: {
return "Index out of bounds";
}
case A0_ERR_AGAIN: {
return "Not available yet";
}
case A0_ERR_FRAME_LARGE: {
return "Frame size too large";
}
case A0_ERR_ITER_DONE: {
return "Done iterating";
}
case A0_ERR_NOT_FOUND: {
return "Not found";
}
case A0_ERR_BAD_PATH: {
return "Invalid path";
}
case A0_ERR_BAD_TOPIC: {
return "Invalid topic name";
}
default: {
break;
}
}
return "";
}

View File

@ -0,0 +1,33 @@
#ifndef A0_ERR_H
#define A0_ERR_H
#include "a0/thread_local.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum a0_err_e {
A0_OK = 0,
A0_ERR_SYS = 1,
A0_ERR_CUSTOM_MSG = 2,
A0_ERR_INVALID_ARG = 3,
A0_ERR_RANGE = 4,
A0_ERR_AGAIN = 5,
A0_ERR_ITER_DONE = 6,
A0_ERR_NOT_FOUND = 7,
A0_ERR_FRAME_LARGE = 8,
A0_ERR_BAD_PATH = 9,
A0_ERR_BAD_TOPIC = 10,
} a0_err_t;
extern A0_THREAD_LOCAL int a0_err_syscode;
extern A0_THREAD_LOCAL char a0_err_msg[1024];
const char* a0_strerror(a0_err_t);
#ifdef __cplusplus
}
#endif
#endif // A0_ERR_H

View File

@ -0,0 +1,52 @@
#ifndef A0_SRC_ERR_MACRO_H
#define A0_SRC_ERR_MACRO_H
#include "a0/err.h"
#include "a0/inline.h"
#include <errno.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
A0_STATIC_INLINE
a0_err_t A0_MAKE_SYSERR(int syserr) {
a0_err_syscode = syserr;
return A0_ERR_SYS;
}
A0_STATIC_INLINE
int A0_SYSERR(a0_err_t err) {
return err == A0_ERR_SYS ? a0_err_syscode : 0;
}
A0_STATIC_INLINE_RECURSIVE
a0_err_t A0_MAKE_MSGERR(const char* fmt, ...) {
va_list args;
va_start(args, fmt);
if (fmt) {
vsnprintf(a0_err_msg, sizeof(a0_err_msg), fmt, args); // NOLINT(clang-analyzer-valist.Uninitialized): https://bugs.llvm.org/show_bug.cgi?id=41311
va_end(args);
return A0_ERR_CUSTOM_MSG;
}
va_end(args);
return A0_OK;
}
#define A0_RETURN_SYSERR_ON_MINUS_ONE(X) \
do { \
if ((X) == -1) { \
a0_err_syscode = errno; \
return A0_ERR_SYS; \
} \
} while (0)
#define A0_RETURN_ERR_ON_ERR(X) \
do { \
a0_err_t _err = (X); \
if (_err) { \
return _err; \
} \
} while (0)
#endif // A0_SRC_ERR_MACRO_H

View File

@ -0,0 +1,111 @@
#ifndef A0_SRC_FTX_H
#define A0_SRC_FTX_H
#include "a0/err.h"
#include "a0/inline.h"
#include "a0/time.h"
#include <limits.h>
#include <linux/futex.h>
#include <stdint.h>
#include <syscall.h>
#include <time.h>
#include <unistd.h>
#include "clock.h"
#include "err_macro.h"
#ifdef __cplusplus
extern "C" {
#endif
// FUTEX_WAIT and FUTEX_WAIT_REQUEUE_PI default to CLOCK_MONOTONIC,
// but FUTEX_LOCK_PI always uses CLOCK_REALTIME.
//
// Until someone tells me otherwise, I assume this is bad decision making
// and I will instead standardize all things on CLOCK_BOOTTIME.
// Futex.
// Operations rely on the address.
// It should not be copied or moved.
typedef uint32_t a0_ftx_t;
A0_STATIC_INLINE
a0_err_t a0_futex(a0_ftx_t* uaddr,
int futex_op,
int val,
uintptr_t timeout_or_val2,
a0_ftx_t* uaddr2,
int val3) {
A0_RETURN_SYSERR_ON_MINUS_ONE(syscall(SYS_futex, uaddr, futex_op, val, timeout_or_val2, uaddr2, val3));
return A0_OK;
}
A0_STATIC_INLINE
a0_err_t a0_ftx_wait(a0_ftx_t* ftx, int confirm_val, const a0_time_mono_t* time_mono) {
if (!time_mono) {
return a0_futex(ftx, FUTEX_WAIT, confirm_val, 0, NULL, 0);
}
timespec_t ts_mono;
A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_MONOTONIC, &ts_mono));
return a0_futex(ftx, FUTEX_WAIT, confirm_val, (uintptr_t)&ts_mono, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_wake(a0_ftx_t* ftx, int cnt) {
return a0_futex(ftx, FUTEX_WAKE, cnt, 0, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_signal(a0_ftx_t* ftx) {
return a0_ftx_wake(ftx, 1);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_broadcast(a0_ftx_t* ftx) {
return a0_ftx_wake(ftx, INT_MAX);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_lock_pi(a0_ftx_t* ftx, const a0_time_mono_t* time_mono) {
if (!time_mono) {
return a0_futex(ftx, FUTEX_LOCK_PI, 0, 0, NULL, 0);
}
timespec_t ts_wall;
A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_REALTIME, &ts_wall));
return a0_futex(ftx, FUTEX_LOCK_PI, 0, (uintptr_t)&ts_wall, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_trylock_pi(a0_ftx_t* ftx) {
return a0_futex(ftx, FUTEX_TRYLOCK_PI, 0, 0, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_unlock_pi(a0_ftx_t* ftx) {
return a0_futex(ftx, FUTEX_UNLOCK_PI, 0, 0, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_cmp_requeue_pi(a0_ftx_t* ftx, int confirm_val, a0_ftx_t* requeue_ftx, int max_requeue) {
return a0_futex(ftx, FUTEX_CMP_REQUEUE_PI, 1, max_requeue, requeue_ftx, confirm_val);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_wait_requeue_pi(a0_ftx_t* ftx, int confirm_val, const a0_time_mono_t* time_mono, a0_ftx_t* requeue_ftx) {
if (!time_mono) {
return a0_futex(ftx, FUTEX_WAIT_REQUEUE_PI, confirm_val, 0, requeue_ftx, 0);
}
timespec_t ts_mono;
A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_MONOTONIC, &ts_mono));
return a0_futex(ftx, FUTEX_WAIT_REQUEUE_PI, confirm_val, (uintptr_t)&ts_mono, requeue_ftx, 0);
}
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_FTX_H

View File

@ -0,0 +1,8 @@
#ifndef A0_INLINE_H
#define A0_INLINE_H
#define A0_STATIC_INLINE static inline __attribute__((always_inline))
#define A0_STATIC_INLINE_RECURSIVE static inline
#endif // A0_INLINE_H

View File

@ -0,0 +1,420 @@
#include "a0/err.h"
#include "a0/inline.h"
#include "a0/mtx.h"
#include "a0/thread_local.h"
#include "a0/tid.h"
#include "a0/time.h"
#include <errno.h>
#include <limits.h>
#include <linux/futex.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <syscall.h>
#include <time.h>
#include <unistd.h>
#include "atomic.h"
#include "clock.h"
#include "err_macro.h"
#include "ftx.h"
// TSAN is worth the pain of properly annotating our mutex.
// clang-format off
#if defined(__SANITIZE_THREAD__)
#define A0_TSAN_ENABLED
#elif defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define A0_TSAN_ENABLED
#endif
#endif
// clang-format on
const unsigned __tsan_mutex_linker_init = 1 << 0;
const unsigned __tsan_mutex_write_reentrant = 1 << 1;
const unsigned __tsan_mutex_read_reentrant = 1 << 2;
const unsigned __tsan_mutex_not_static = 1 << 8;
const unsigned __tsan_mutex_read_lock = 1 << 3;
const unsigned __tsan_mutex_try_lock = 1 << 4;
const unsigned __tsan_mutex_try_lock_failed = 1 << 5;
const unsigned __tsan_mutex_recursive_lock = 1 << 6;
const unsigned __tsan_mutex_recursive_unlock = 1 << 7;
#ifdef A0_TSAN_ENABLED
void __tsan_mutex_create(void* addr, unsigned flags);
void __tsan_mutex_destroy(void* addr, unsigned flags);
void __tsan_mutex_pre_lock(void* addr, unsigned flags);
void __tsan_mutex_post_lock(void* addr, unsigned flags, int recursion);
int __tsan_mutex_pre_unlock(void* addr, unsigned flags);
void __tsan_mutex_post_unlock(void* addr, unsigned flags);
void __tsan_mutex_pre_signal(void* addr, unsigned flags);
void __tsan_mutex_post_signal(void* addr, unsigned flags);
void __tsan_mutex_pre_divert(void* addr, unsigned flags);
void __tsan_mutex_post_divert(void* addr, unsigned flags);
#else
#define _u_ __attribute__((unused))
A0_STATIC_INLINE void _u_ __tsan_mutex_create(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_destroy(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_pre_lock(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_lock(_u_ void* addr,
_u_ unsigned flags,
_u_ int recursion) {}
A0_STATIC_INLINE int _u_ __tsan_mutex_pre_unlock(_u_ void* addr, _u_ unsigned flags) {
return 0;
}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_unlock(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_pre_signal(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_signal(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_pre_divert(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_divert(_u_ void* addr, _u_ unsigned flags) {}
#endif
A0_THREAD_LOCAL bool a0_robust_init = false;
A0_STATIC_INLINE
void a0_robust_reset() {
a0_robust_init = 0;
}
A0_STATIC_INLINE
void a0_robust_reset_atfork() {
pthread_atfork(NULL, NULL, &a0_robust_reset);
}
static pthread_once_t a0_robust_reset_atfork_once;
typedef struct robust_list robust_list_t;
typedef struct robust_list_head robust_list_head_t;
A0_THREAD_LOCAL robust_list_head_t a0_robust_head;
A0_STATIC_INLINE
void robust_init() {
a0_robust_head.list.next = &a0_robust_head.list;
a0_robust_head.futex_offset = offsetof(a0_mtx_t, ftx);
a0_robust_head.list_op_pending = NULL;
syscall(SYS_set_robust_list, &a0_robust_head.list, sizeof(a0_robust_head));
}
A0_STATIC_INLINE
void init_thread() {
if (a0_robust_init) {
return;
}
pthread_once(&a0_robust_reset_atfork_once, a0_robust_reset_atfork);
robust_init();
a0_robust_init = true;
}
A0_STATIC_INLINE
void robust_op_start(a0_mtx_t* mtx) {
init_thread();
a0_robust_head.list_op_pending = (struct robust_list*)mtx;
a0_barrier();
}
A0_STATIC_INLINE
void robust_op_end(a0_mtx_t* mtx) {
(void)mtx;
a0_barrier();
a0_robust_head.list_op_pending = NULL;
}
A0_STATIC_INLINE
bool robust_is_head(a0_mtx_t* mtx) {
return mtx == (a0_mtx_t*)&a0_robust_head;
}
A0_STATIC_INLINE
void robust_op_add(a0_mtx_t* mtx) {
a0_mtx_t* old_first = (a0_mtx_t*)a0_robust_head.list.next;
mtx->prev = (a0_mtx_t*)&a0_robust_head;
mtx->next = old_first;
a0_barrier();
a0_robust_head.list.next = (robust_list_t*)mtx;
if (!robust_is_head(old_first)) {
old_first->prev = mtx;
}
}
A0_STATIC_INLINE
void robust_op_del(a0_mtx_t* mtx) {
a0_mtx_t* prev = mtx->prev;
a0_mtx_t* next = mtx->next;
prev->next = next;
if (!robust_is_head(next)) {
next->prev = prev;
}
}
A0_STATIC_INLINE
uint32_t ftx_tid(a0_ftx_t ftx) {
return ftx & FUTEX_TID_MASK;
}
A0_STATIC_INLINE
bool ftx_owner_died(a0_ftx_t ftx) {
return ftx & FUTEX_OWNER_DIED;
}
static const uint32_t FTX_NOTRECOVERABLE = FUTEX_TID_MASK | FUTEX_OWNER_DIED;
A0_STATIC_INLINE
bool ftx_notrecoverable(a0_ftx_t ftx) {
return (ftx & FTX_NOTRECOVERABLE) == FTX_NOTRECOVERABLE;
}
A0_STATIC_INLINE
a0_err_t a0_mtx_timedlock_robust(a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
const uint32_t tid = a0_tid();
int syserr = EINTR;
while (syserr == EINTR) {
// Can't lock if borked.
if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(ENOTRECOVERABLE);
}
// Try to lock without kernel involvement.
if (a0_cas(&mtx->ftx, 0, tid)) {
return A0_OK;
}
// Ask the kernel to lock.
syserr = A0_SYSERR(a0_ftx_lock_pi(&mtx->ftx, timeout));
}
if (!syserr) {
if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(EOWNERDEAD);
}
return A0_OK;
}
return A0_MAKE_SYSERR(syserr);
}
A0_STATIC_INLINE
a0_err_t a0_mtx_timedlock_impl(a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
// Note: __tsan_mutex_pre_lock should come here, but tsan doesn't provide
// a way to "fail" a lock. Only a trylock.
robust_op_start(mtx);
const a0_err_t err = a0_mtx_timedlock_robust(mtx, timeout);
if (!err || A0_SYSERR(err) == EOWNERDEAD) {
__tsan_mutex_pre_lock(mtx, 0);
robust_op_add(mtx);
__tsan_mutex_post_lock(mtx, 0, 0);
}
robust_op_end(mtx);
return err;
}
a0_err_t a0_mtx_timedlock(a0_mtx_t* mtx, a0_time_mono_t timeout) {
return a0_mtx_timedlock_impl(mtx, &timeout);
}
a0_err_t a0_mtx_lock(a0_mtx_t* mtx) {
return a0_mtx_timedlock_impl(mtx, NULL);
}
A0_STATIC_INLINE
a0_err_t a0_mtx_trylock_impl(a0_mtx_t* mtx) {
const uint32_t tid = a0_tid();
// Try to lock without kernel involvement.
uint32_t old = a0_cas_val(&mtx->ftx, 0, tid);
// Did it work?
if (!old) {
robust_op_add(mtx);
return A0_OK;
}
// Is the lock still usable?
if (ftx_notrecoverable(old)) {
return A0_MAKE_SYSERR(ENOTRECOVERABLE);
}
// Is the owner still alive?
if (!ftx_owner_died(old)) {
return A0_MAKE_SYSERR(EBUSY);
}
// Oh, the owner died. Ask the kernel to fix the state.
a0_err_t err = a0_ftx_trylock_pi(&mtx->ftx);
if (!err) {
robust_op_add(mtx);
if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(EOWNERDEAD);
}
return A0_OK;
}
// EAGAIN means that somebody else beat us to it.
// Anything else means we're borked.
if (A0_SYSERR(err) == EAGAIN) {
return A0_MAKE_SYSERR(EBUSY);
}
return A0_MAKE_SYSERR(ENOTRECOVERABLE);
}
a0_err_t a0_mtx_trylock(a0_mtx_t* mtx) {
__tsan_mutex_pre_lock(mtx, __tsan_mutex_try_lock);
robust_op_start(mtx);
a0_err_t err = a0_mtx_trylock_impl(mtx);
robust_op_end(mtx);
if (!err || A0_SYSERR(err) == EOWNERDEAD) {
__tsan_mutex_post_lock(mtx, __tsan_mutex_try_lock, 0);
} else {
__tsan_mutex_post_lock(mtx, __tsan_mutex_try_lock | __tsan_mutex_try_lock_failed, 0);
}
return err;
}
a0_err_t a0_mtx_consistent(a0_mtx_t* mtx) {
const uint32_t val = a0_atomic_load(&mtx->ftx);
// Why fix what isn't broken?
if (!ftx_owner_died(val)) {
return A0_MAKE_SYSERR(EINVAL);
}
// Is it yours to fix?
if (ftx_tid(val) != a0_tid()) {
return A0_MAKE_SYSERR(EPERM);
}
// Fix it!
a0_atomic_and_fetch(&mtx->ftx, ~FUTEX_OWNER_DIED);
return A0_OK;
}
a0_err_t a0_mtx_unlock(a0_mtx_t* mtx) {
const uint32_t tid = a0_tid();
const uint32_t val = a0_atomic_load(&mtx->ftx);
// Only the owner can unlock.
if (ftx_tid(val) != tid) {
return A0_MAKE_SYSERR(EPERM);
}
__tsan_mutex_pre_unlock(mtx, 0);
// If the mutex was acquired with EOWNERDEAD, the caller is responsible
// for fixing the state and marking the mutex consistent. If they did
// not mark it consistent and are unlocking... then we are unrecoverably
// borked!
uint32_t new_val = 0;
if (ftx_owner_died(val)) {
new_val = FTX_NOTRECOVERABLE;
}
robust_op_start(mtx);
robust_op_del(mtx);
// If the futex is exactly equal to tid, then there are no waiters and the
// kernel doesn't need to get involved.
if (!a0_cas(&mtx->ftx, tid, new_val)) {
// Ask the kernel to wake up a waiter.
a0_ftx_unlock_pi(&mtx->ftx);
if (new_val) {
a0_atomic_or_fetch(&mtx->ftx, new_val);
}
}
robust_op_end(mtx);
__tsan_mutex_post_unlock(mtx, 0);
return A0_OK;
}
// TODO(lshamis): Handle ENOTRECOVERABLE
A0_STATIC_INLINE
a0_err_t a0_cnd_timedwait_impl(a0_cnd_t* cnd, a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
const uint32_t init_cnd = a0_atomic_load(cnd);
// Unblock other threads to do the things that will eventually signal this wait.
a0_err_t err = a0_mtx_unlock(mtx);
if (err) {
return err;
}
__tsan_mutex_pre_lock(mtx, 0);
robust_op_start(mtx);
do {
// Priority-inheritance-aware wait until awoken or timeout.
err = a0_ftx_wait_requeue_pi(cnd, init_cnd, timeout, &mtx->ftx);
} while (A0_SYSERR(err) == EINTR);
// We need to manually lock on timeout.
// Note: We keep the timeout error.
if (A0_SYSERR(err) == ETIMEDOUT) {
a0_mtx_timedlock_robust(mtx, NULL);
}
// Someone else grabbed and mutated the resource between the unlock and wait.
// No need to wait.
if (A0_SYSERR(err) == EAGAIN) {
err = a0_mtx_timedlock_robust(mtx, NULL);
}
robust_op_add(mtx);
// If no higher priority error, check the previous owner didn't die.
if (!err) {
err = ftx_owner_died(a0_atomic_load(&mtx->ftx)) ? EOWNERDEAD : A0_OK;
}
robust_op_end(mtx);
__tsan_mutex_post_lock(mtx, 0, 0);
return err;
}
a0_err_t a0_cnd_timedwait(a0_cnd_t* cnd, a0_mtx_t* mtx, a0_time_mono_t timeout) {
// Let's not unlock the mutex if we're going to get EINVAL due to a bad timeout.
if ((timeout.ts.tv_sec < 0 || timeout.ts.tv_nsec < 0 || (!timeout.ts.tv_sec && !timeout.ts.tv_nsec) || timeout.ts.tv_nsec >= NS_PER_SEC)) {
return A0_MAKE_SYSERR(EINVAL);
}
return a0_cnd_timedwait_impl(cnd, mtx, &timeout);
}
a0_err_t a0_cnd_wait(a0_cnd_t* cnd, a0_mtx_t* mtx) {
return a0_cnd_timedwait_impl(cnd, mtx, NULL);
}
A0_STATIC_INLINE
a0_err_t a0_cnd_wake(a0_cnd_t* cnd, a0_mtx_t* mtx, uint32_t cnt) {
uint32_t val = a0_atomic_add_fetch(cnd, 1);
while (true) {
a0_err_t err = a0_ftx_cmp_requeue_pi(cnd, val, &mtx->ftx, cnt);
if (A0_SYSERR(err) != EAGAIN) {
return err;
}
// Another thread is also trying to wake this condition variable.
val = a0_atomic_load(cnd);
}
}
a0_err_t a0_cnd_signal(a0_cnd_t* cnd, a0_mtx_t* mtx) {
return a0_cnd_wake(cnd, mtx, 1);
}
a0_err_t a0_cnd_broadcast(a0_cnd_t* cnd, a0_mtx_t* mtx) {
return a0_cnd_wake(cnd, mtx, INT_MAX);
}

View File

@ -0,0 +1,57 @@
#ifndef A0_MTX_H
#define A0_MTX_H
#include "a0/err.h"
#include "a0/time.h"
#include "a0/unused.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef uint32_t a0_ftx_t;
// https://stackoverflow.com/questions/61645966/is-typedef-allowed-before-definition
struct a0_mtx_s;
typedef struct a0_mtx_s a0_mtx_t;
// Mutex implementation designed for IPC.
//
// Similar to pthread_mutex_t with the following flags fixed:
// * Process shared.
// * Robust.
// * Error checking.
// * Priority inheriting.
//
// Unlike pthread_mutex_t, timespec are expected to use CLOCK_BOOTTIME.
//
// struct a0_mtx_s "Inherits" from robust_list, which requires:
// * The first field MUST be a next pointer.
// * There must be a futex, which makes the mutex immovable.
//
// Note: a mutex MUST be unlocked before being freed or unmapped.
struct a0_mtx_s {
a0_mtx_t* next;
a0_mtx_t* prev;
a0_ftx_t ftx;
};
a0_err_t a0_mtx_lock(a0_mtx_t*) A0_WARN_UNUSED_RESULT;
a0_err_t a0_mtx_timedlock(a0_mtx_t*, a0_time_mono_t) A0_WARN_UNUSED_RESULT;
a0_err_t a0_mtx_trylock(a0_mtx_t*) A0_WARN_UNUSED_RESULT;
a0_err_t a0_mtx_consistent(a0_mtx_t*);
a0_err_t a0_mtx_unlock(a0_mtx_t*);
typedef a0_ftx_t a0_cnd_t;
a0_err_t a0_cnd_wait(a0_cnd_t*, a0_mtx_t*);
a0_err_t a0_cnd_timedwait(a0_cnd_t*, a0_mtx_t*, a0_time_mono_t);
a0_err_t a0_cnd_signal(a0_cnd_t*, a0_mtx_t*);
a0_err_t a0_cnd_broadcast(a0_cnd_t*, a0_mtx_t*);
#ifdef __cplusplus
}
#endif
#endif // A0_MTX_H

View File

@ -0,0 +1,64 @@
#include "strconv.h"
#include "a0/err.h"
#include <string.h>
static const char DECIMAL_DIGITS[] =
"00010203040506070809"
"10111213141516171819"
"20212223242526272829"
"30313233343536373839"
"40414243444546474849"
"50515253545556575859"
"60616263646566676869"
"70717273747576777879"
"80818283848586878889"
"90919293949596979899";
a0_err_t a0_u32_to_str(uint32_t val, char* buf_start, char* buf_end, char** start_ptr) {
return a0_u64_to_str(val, buf_start, buf_end, start_ptr);
}
a0_err_t a0_u64_to_str(uint64_t val, char* buf_start, char* buf_end, char** start_ptr) {
uint64_t orig_val = val;
char* ptr = buf_end;
while (val >= 10) {
ptr -= 2;
memcpy(ptr, &DECIMAL_DIGITS[2 * (val % 100)], sizeof(uint16_t));
val /= 100;
}
if (val) {
*--ptr = (char)('0' + val);
}
memset(buf_start, '0', ptr - buf_start);
ptr -= (!orig_val);
if (start_ptr) {
*start_ptr = ptr;
}
return A0_OK;
}
a0_err_t a0_str_to_u32(const char* start, const char* end, uint32_t* out) {
*out = 0;
for (const char* ptr = start; ptr < end; ptr++) {
if (*ptr < '0' || *ptr > '9') {
return A0_ERR_INVALID_ARG;
}
*out *= 10;
*out += *ptr - '0';
}
return A0_OK;
}
a0_err_t a0_str_to_u64(const char* start, const char* end, uint64_t* out) {
*out = 0;
for (const char* ptr = start; ptr < end; ptr++) {
if (*ptr < '0' || *ptr > '9') {
return A0_ERR_INVALID_ARG;
}
*out *= 10;
*out += *ptr - '0';
}
return A0_OK;
}

View File

@ -0,0 +1,31 @@
#ifndef A0_SRC_STRCONV_H
#define A0_SRC_STRCONV_H
#include "a0/err.h"
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// Converts a uint32 or uint64 to string.
// The entire buffer will be populated, if not with given the value, then with '0'.
// Populates the buffer from the back.
// start_ptr, if not null, will be set to the point within the buffer where the number starts.
// Does NOT check for overflow.
a0_err_t a0_u32_to_str(uint32_t val, char* buf_start, char* buf_end, char** start_ptr);
a0_err_t a0_u64_to_str(uint64_t val, char* buf_start, char* buf_end, char** start_ptr);
// Converts a string to uint32 or uint64.
// The string may have leading '0's.
// Returns A0_ERR_INVALID_ARG if any character is not a digit.
// Does NOT check for overflow.
a0_err_t a0_str_to_u32(const char* start, const char* end, uint32_t* out);
a0_err_t a0_str_to_u64(const char* start, const char* end, uint64_t* out);
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_STRCONV_H

View File

@ -0,0 +1,10 @@
#ifndef A0_THREAD_LOCAL_H
#define A0_THREAD_LOCAL_H
#ifdef __cplusplus
#define A0_THREAD_LOCAL thread_local
#else
#define A0_THREAD_LOCAL _Thread_local
#endif // __cplusplus
#endif // A0_THREAD_LOCAL_H

View File

@ -0,0 +1,30 @@
#include "a0/inline.h"
#include "a0/thread_local.h"
#include "a0/tid.h"
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <syscall.h>
#include <unistd.h>
A0_THREAD_LOCAL uint32_t a0_tid_cache = 0;
static pthread_once_t a0_tid_reset_atfork_once;
A0_STATIC_INLINE
void a0_tid_reset() {
a0_tid_cache = 0;
}
A0_STATIC_INLINE
void a0_tid_reset_atfork() {
pthread_atfork(NULL, NULL, &a0_tid_reset);
}
uint32_t a0_tid() {
if (!a0_tid_cache) {
a0_tid_cache = syscall(SYS_gettid);
pthread_once(&a0_tid_reset_atfork_once, a0_tid_reset_atfork);
}
return a0_tid_cache;
}

View File

@ -0,0 +1,16 @@
#ifndef A0_TID_H
#define A0_TID_H
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
uint32_t a0_tid();
#ifdef __cplusplus
}
#endif
#endif // A0_TID_H

View File

@ -0,0 +1,124 @@
#include "a0/empty.h"
#include "a0/err.h"
#include "a0/time.h"
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include "clock.h"
#include "err_macro.h"
#include "strconv.h"
const char A0_TIME_MONO[] = "a0_time_mono";
a0_err_t a0_time_mono_now(a0_time_mono_t* out) {
return a0_clock_now(CLOCK_BOOTTIME, &out->ts);
}
a0_err_t a0_time_mono_str(a0_time_mono_t time_mono, char mono_str[20]) {
// Mono time as unsigned integer with up to 20 chars: "18446744072709551615"
uint64_t ns = time_mono.ts.tv_sec * NS_PER_SEC + time_mono.ts.tv_nsec;
mono_str[19] = '\0';
return a0_u64_to_str(ns, mono_str, mono_str + 19, NULL);
}
a0_err_t a0_time_mono_parse(const char mono_str[20], a0_time_mono_t* out) {
uint64_t ns;
A0_RETURN_ERR_ON_ERR(a0_str_to_u64(mono_str, mono_str + 19, &ns));
out->ts.tv_sec = ns / NS_PER_SEC;
out->ts.tv_nsec = ns % NS_PER_SEC;
return A0_OK;
}
a0_err_t a0_time_mono_add(a0_time_mono_t time_mono, int64_t add_nsec, a0_time_mono_t* out) {
return a0_clock_add(time_mono.ts, add_nsec, &out->ts);
}
const char A0_TIME_WALL[] = "a0_time_wall";
a0_err_t a0_time_wall_now(a0_time_wall_t* out) {
A0_RETURN_SYSERR_ON_MINUS_ONE(clock_gettime(CLOCK_REALTIME, &out->ts));
return A0_OK;
}
a0_err_t a0_time_wall_str(a0_time_wall_t wall_time, char wall_str[36]) {
// Wall time in RFC 3999 Nano: "2006-01-02T15:04:05.999999999-07:00"
struct tm wall_tm;
gmtime_r(&wall_time.ts.tv_sec, &wall_tm);
strftime(&wall_str[0], 20, "%Y-%m-%dT%H:%M:%S", &wall_tm);
snprintf(&wall_str[19], 17, ".%09ld-00:00", wall_time.ts.tv_nsec);
wall_str[35] = '\0';
return A0_OK;
}
a0_err_t a0_time_wall_parse(const char wall_str[36], a0_time_wall_t* out) {
// strptime requires _GNU_SOURCE, which we don't want, so we do it our selves.
// Hard code "%Y-%m-%dT%H:%M:%S" + ".%09ld-00:00" pattern.
struct tm wall_tm = A0_EMPTY;
// %Y
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 0, wall_str + 4, (uint32_t*)&wall_tm.tm_year));
wall_tm.tm_year -= 1900;
// -
if (wall_str[4] != '-') {
return A0_ERR_INVALID_ARG;
}
// %m
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 5, wall_str + 7, (uint32_t*)&wall_tm.tm_mon));
if (wall_tm.tm_mon < 1 || wall_tm.tm_mon > 12) {
return A0_ERR_INVALID_ARG;
}
wall_tm.tm_mon--;
// -
if (wall_str[7] != '-') {
return A0_ERR_INVALID_ARG;
}
// %d
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 8, wall_str + 10, (uint32_t*)&wall_tm.tm_mday));
if (wall_tm.tm_mday < 1 || wall_tm.tm_mday > 31) {
return A0_ERR_INVALID_ARG;
}
// T
if (wall_str[10] != 'T') {
return A0_ERR_INVALID_ARG;
}
// %H
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 11, wall_str + 13, (uint32_t*)&wall_tm.tm_hour));
if (wall_tm.tm_hour > 24) {
return A0_ERR_INVALID_ARG;
}
// :
if (wall_str[13] != ':') {
return A0_ERR_INVALID_ARG;
}
// %M
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 14, wall_str + 16, (uint32_t*)&wall_tm.tm_min));
if (wall_tm.tm_min > 60) {
return A0_ERR_INVALID_ARG;
}
// :
if (wall_str[16] != ':') {
return A0_ERR_INVALID_ARG;
}
// %S
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 17, wall_str + 19, (uint32_t*)&wall_tm.tm_sec));
if (wall_tm.tm_sec > 61) {
return A0_ERR_INVALID_ARG;
}
// .
if (wall_str[19] != '.') {
return A0_ERR_INVALID_ARG;
}
if (memcmp(wall_str + 29, "-00:00", 6) != 0) {
return A0_ERR_INVALID_ARG;
}
// Use timegm, cause it's a pain to compute months/years to seconds.
out->ts.tv_sec = timegm(&wall_tm);
return a0_str_to_u64(wall_str + 20, wall_str + 29, (uint64_t*)&out->ts.tv_nsec);
}

View File

@ -0,0 +1,97 @@
/**
* \file time.h
* \rst
*
* Mono Time
* ---------
*
* | Mono time is a number of nanoseconds from machine boottime.
* | This time cannot decrease and duration between ticks is constant.
* | This time is not related to wall clock time.
* | This time is most suitable for measuring durations.
* |
* | As a string, it is represented as a 20 char number:
* | **18446744072709551615**
* |
* | Note that this uses CLOCK_BOOTTIME under the hood, not CLOCK_MONOTONIC.
*
* Wall Time
* ---------
*
* | Wall time is an time object representing human-readable wall clock time.
* | This time can decrease and duration between ticks is not constant.
* | This time is most related to wall clock time.
* | This time is not suitable for measuring durations.
* |
* | As a string, it is represented as a 36 char RFC 3999 Nano / ISO 8601:
* | **2006-01-02T15:04:05.999999999-07:00**
*
* \endrst
*/
#ifndef A0_TIME_H
#define A0_TIME_H
#include "a0/err.h"
#include <stdint.h>
#include <time.h>
#ifdef __cplusplus
extern "C" {
#endif
/** \addtogroup TIME_MONO
* @{
*/
/// Header key for mono timestamps.
extern const char A0_TIME_MONO[];
/// Monotonic timestamp. Despite the name, uses CLOCK_BOOTTIME.
typedef struct a0_time_mono_s {
struct timespec ts;
} a0_time_mono_t;
/// Get the current mono timestamps.
a0_err_t a0_time_mono_now(a0_time_mono_t*);
/// Stringify a given mono timestamps.
a0_err_t a0_time_mono_str(a0_time_mono_t, char mono_str[20]);
/// Parse a stringified mono timestamps.
a0_err_t a0_time_mono_parse(const char mono_str[20], a0_time_mono_t*);
/// Add a duration in nanoseconds to a mono timestamp.
a0_err_t a0_time_mono_add(a0_time_mono_t, int64_t add_nsec, a0_time_mono_t*);
/** @}*/
/** \addtogroup TIME_WALL
* @{
*/
/// Header key for wall timestamps.
extern const char A0_TIME_WALL[];
/// Wall clock timestamp.
typedef struct a0_time_wall_s {
struct timespec ts;
} a0_time_wall_t;
/// Get the current wall timestamps.
a0_err_t a0_time_wall_now(a0_time_wall_t*);
/// Stringify a given wall timestamps.
a0_err_t a0_time_wall_str(a0_time_wall_t, char wall_str[36]);
/// Parse a stringified wall timestamps.
a0_err_t a0_time_wall_parse(const char wall_str[36], a0_time_wall_t*);
/** @}*/
#ifdef __cplusplus
}
#endif
#endif // A0_TRANSPORT_H

View File

@ -0,0 +1,7 @@
#ifndef A0_UNUSED_H
#define A0_UNUSED_H
#define A0_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#define A0_MAYBE_UNUSED(X) ((void)(X))
#endif // A0_UNUSED_H

View File

@ -0,0 +1,66 @@
#pragma once
#include "libipc/utility/log.h"
#include "libipc/mutex.h"
#include "get_wait_time.h"
#include "sync_obj_impl.h"
#include "a0/err_macro.h"
#include "a0/mtx.h"
namespace ipc {
namespace detail {
namespace sync {
class condition : public sync::obj_impl<a0_cnd_t> {
public:
condition() = default;
~condition() = default;
bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
if (!valid()) return false;
if (tm == invalid_value) {
int eno = A0_SYSERR(a0_cnd_wait(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition wait[%d]\n", eno);
return false;
}
} else {
auto ts = detail::make_timespec(tm);
int eno = A0_SYSERR(a0_cnd_timedwait(native(), static_cast<a0_mtx_t *>(mtx.native()), {ts}));
if (eno != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail condition timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
}
return false;
}
}
return true;
}
bool notify(ipc::sync::mutex &mtx) noexcept {
if (!valid()) return false;
int eno = A0_SYSERR(a0_cnd_signal(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition notify[%d]\n", eno);
return false;
}
return true;
}
bool broadcast(ipc::sync::mutex &mtx) noexcept {
if (!valid()) return false;
int eno = A0_SYSERR(a0_cnd_broadcast(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition broadcast[%d]\n", eno);
return false;
}
return true;
}
};
} // namespace sync
} // namespace detail
} // namespace ipc

View File

@ -0,0 +1,46 @@
#pragma once
#include <cstdint>
#include <cinttypes>
#include <system_error>
#include "libipc/utility/log.h"
#include "a0/time.h"
#include "a0/err_macro.h"
namespace ipc {
namespace detail {
inline bool calc_wait_time(timespec &ts, std::uint64_t tm /*ms*/) noexcept {
std::int64_t add_ns = static_cast<std::int64_t>(tm * 1000000ull);
if (add_ns < 0) {
ipc::error("invalid time = " PRIu64 "\n", tm);
return false;
}
a0_time_mono_t now;
int eno = A0_SYSERR(a0_time_mono_now(&now));
if (eno != 0) {
ipc::error("fail get time[%d]\n", eno);
return false;
}
a0_time_mono_t *target = reinterpret_cast<a0_time_mono_t *>(&ts);
if ((eno = A0_SYSERR(a0_time_mono_add(now, add_ns, target))) != 0) {
ipc::error("fail get time[%d]\n", eno);
return false;
}
return true;
}
inline timespec make_timespec(std::uint64_t tm /*ms*/) noexcept(false) {
timespec ts {};
if (!calc_wait_time(ts, tm)) {
ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
tm, ts.tv_sec, ts.tv_nsec);
throw std::system_error{static_cast<int>(errno), std::system_category()};
}
return ts;
}
} // namespace detail
} // namespace ipc

View File

@ -0,0 +1,204 @@
#pragma once
#include <cstdint>
#include <system_error>
#include <mutex>
#include <atomic>
#include "libipc/platform/detail.h"
#include "libipc/utility/log.h"
#include "libipc/memory/resource.h"
#include "libipc/shm.h"
#include "get_wait_time.h"
#include "sync_obj_impl.h"
#include "a0/err_macro.h"
#include "a0/mtx.h"
namespace ipc {
namespace detail {
namespace sync {
class robust_mutex : public sync::obj_impl<a0_mtx_t> {
public:
bool lock(std::uint64_t tm) noexcept {
if (!valid()) return false;
for (;;) {
auto ts = detail::make_timespec(tm);
int eno = A0_SYSERR(
(tm == invalid_value) ? a0_mtx_lock(native())
: a0_mtx_timedlock(native(), {ts}));
switch (eno) {
case 0:
return true;
case ETIMEDOUT:
return false;
case EOWNERDEAD: {
int eno2 = A0_SYSERR(a0_mtx_consistent(native()));
if (eno2 != 0) {
ipc::error("fail mutex lock[%d] -> consistent[%d]\n", eno, eno2);
return false;
}
int eno3 = A0_SYSERR(a0_mtx_unlock(native()));
if (eno3 != 0) {
ipc::error("fail mutex lock[%d] -> unlock[%d]\n", eno, eno3);
return false;
}
}
break; // loop again
default:
ipc::error("fail mutex lock[%d]\n", eno);
return false;
}
}
}
bool try_lock() noexcept(false) {
if (!valid()) return false;
int eno = A0_SYSERR(a0_mtx_timedlock(native(), {detail::make_timespec(0)}));
switch (eno) {
case 0:
return true;
case ETIMEDOUT:
return false;
case EOWNERDEAD: {
int eno2 = A0_SYSERR(a0_mtx_consistent(native()));
if (eno2 != 0) {
ipc::error("fail mutex try_lock[%d] -> consistent[%d]\n", eno, eno2);
break;
}
int eno3 = A0_SYSERR(a0_mtx_unlock(native()));
if (eno3 != 0) {
ipc::error("fail mutex try_lock[%d] -> unlock[%d]\n", eno, eno3);
break;
}
}
break;
default:
ipc::error("fail mutex try_lock[%d]\n", eno);
break;
}
throw std::system_error{eno, std::system_category()};
}
bool unlock() noexcept {
if (!valid()) return false;
int eno = A0_SYSERR(a0_mtx_unlock(native()));
if (eno != 0) {
ipc::error("fail mutex unlock[%d]\n", eno);
return false;
}
return true;
}
};
class mutex {
robust_mutex *mutex_ = nullptr;
std::atomic<std::int32_t> *ref_ = nullptr;
struct curr_prog {
struct shm_data {
robust_mutex mtx;
std::atomic<std::int32_t> ref;
struct init {
char const *name;
};
shm_data(init arg)
: mtx{}, ref{0} { mtx.open(arg.name); }
};
ipc::map<ipc::string, shm_data> mutex_handles;
std::mutex lock;
static curr_prog &get() {
static curr_prog info;
return info;
}
};
void acquire_mutex(char const *name) {
if (name == nullptr) {
return;
}
auto &info = curr_prog::get();
IPC_UNUSED_ std::lock_guard<std::mutex> guard {info.lock};
auto it = info.mutex_handles.find(name);
if (it == info.mutex_handles.end()) {
it = curr_prog::get().mutex_handles.emplace(name,
curr_prog::shm_data::init{name}).first;
}
mutex_ = &it->second.mtx;
ref_ = &it->second.ref;
}
template <typename F>
void release_mutex(ipc::string const &name, F &&clear) {
if (name.empty()) return;
IPC_UNUSED_ std::lock_guard<std::mutex> guard {curr_prog::get().lock};
auto it = curr_prog::get().mutex_handles.find(name);
if (it == curr_prog::get().mutex_handles.end()) {
return;
}
if (clear()) {
curr_prog::get().mutex_handles.erase(it);
}
}
public:
mutex() = default;
~mutex() = default;
a0_mtx_t const *native() const noexcept {
return valid() ? mutex_->native() : nullptr;
}
a0_mtx_t *native() noexcept {
return valid() ? mutex_->native() : nullptr;
}
bool valid() const noexcept {
return (mutex_ != nullptr) && (ref_ != nullptr) && mutex_->valid();
}
bool open(char const *name) noexcept {
close();
acquire_mutex(name);
if (!valid()) {
return false;
}
ref_->fetch_add(1, std::memory_order_relaxed);
return true;
}
void close() noexcept {
if ((mutex_ != nullptr) && (ref_ != nullptr)) {
if (mutex_->name() != nullptr) {
release_mutex(mutex_->name(), [this] {
return ref_->fetch_sub(1, std::memory_order_relaxed) <= 1;
});
} else mutex_->close();
}
mutex_ = nullptr;
ref_ = nullptr;
}
bool lock(std::uint64_t tm) noexcept {
if (!valid()) return false;
return mutex_->lock(tm);
}
bool try_lock() noexcept(false) {
if (!valid()) return false;
return mutex_->try_lock();
}
bool unlock() noexcept {
if (!valid()) return false;
return mutex_->unlock();
}
};
} // namespace sync
} // namespace detail
} // namespace ipc

View File

@ -0,0 +1,69 @@
#pragma once
#include "libipc/utility/log.h"
#include "libipc/shm.h"
#include "a0/empty.h"
namespace ipc {
namespace detail {
namespace sync {
template <typename SyncT>
class obj_impl {
public:
using sync_t = SyncT;
protected:
ipc::shm::handle shm_;
sync_t *h_ = nullptr;
sync_t *acquire_handle(char const *name) {
if (!shm_.acquire(name, sizeof(sync_t))) {
ipc::error("[acquire_handle] fail shm.acquire: %s\n", name);
return nullptr;
}
return static_cast<sync_t *>(shm_.get());
}
public:
obj_impl() = default;
~obj_impl() = default;
sync_t const *native() const noexcept {
return h_;
}
sync_t *native() noexcept {
return h_;
}
char const *name() const noexcept {
return shm_.name();
}
bool valid() const noexcept {
return h_ != nullptr;
}
bool open(char const *name) noexcept {
close();
if ((h_ = acquire_handle(name)) == nullptr) {
return false;
}
if (shm_.ref() > 1) {
return true;
}
*h_ = A0_EMPTY;
return true;
}
void close() noexcept {
shm_.release();
h_ = nullptr;
}
};
} // namespace sync
} // namespace detail
} // namespace ipc

View File

@ -0,0 +1,13 @@
#include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_)
#elif defined(IPC_OS_LINUX_)
#include "libipc/platform/linux/a0/err.c"
#include "libipc/platform/linux/a0/mtx.c"
#include "libipc/platform/linux/a0/strconv.c"
#include "libipc/platform/linux/a0/tid.c"
#include "libipc/platform/linux/a0/time.c"
#elif defined(IPC_OS_QNX_)
#else/*IPC_OS*/
# error "Unsupported platform."
#endif

View File

@ -0,0 +1,9 @@
#include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/win/shm_win.cpp"
#elif defined(IPC_OS_LINUX_) || defined(IPC_OS_QNX_)
#include "libipc/platform/posix/shm_posix.cpp"
#else/*IPC_OS*/
# error "Unsupported platform."
#endif

View File

@ -5,12 +5,13 @@
#include <pthread.h> #include <pthread.h>
#include "libipc/platform/get_wait_time.h"
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/utility/scope_guard.h" #include "libipc/utility/scope_guard.h"
#include "libipc/mutex.h" #include "libipc/mutex.h"
#include "libipc/shm.h" #include "libipc/shm.h"
#include "get_wait_time.h"
namespace ipc { namespace ipc {
namespace detail { namespace detail {
namespace sync { namespace sync {
@ -62,7 +63,7 @@ public:
ipc::error("fail pthread_condattr_init[%d]\n", eno); ipc::error("fail pthread_condattr_init[%d]\n", eno);
return false; return false;
} }
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); IPC_UNUSED_ auto guard_cond_attr = guard([&cond_attr] { ::pthread_condattr_destroy(&cond_attr); });
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); ipc::error("fail pthread_condattr_setpshared[%d]\n", eno);
return false; return false;
@ -114,7 +115,7 @@ public:
return true; return true;
} }
bool notify() noexcept { bool notify(ipc::sync::mutex &) noexcept {
if (!valid()) return false; if (!valid()) return false;
int eno; int eno;
if ((eno = ::pthread_cond_signal(cond_)) != 0) { if ((eno = ::pthread_cond_signal(cond_)) != 0) {
@ -124,7 +125,7 @@ public:
return true; return true;
} }
bool broadcast() noexcept { bool broadcast(ipc::sync::mutex &) noexcept {
if (!valid()) return false; if (!valid()) return false;
int eno; int eno;
if ((eno = ::pthread_cond_broadcast(cond_)) != 0) { if ((eno = ::pthread_cond_broadcast(cond_)) != 0) {

View File

@ -9,13 +9,14 @@
#include <pthread.h> #include <pthread.h>
#include "libipc/platform/get_wait_time.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/utility/scope_guard.h" #include "libipc/utility/scope_guard.h"
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
#include "libipc/shm.h" #include "libipc/shm.h"
#include "get_wait_time.h"
namespace ipc { namespace ipc {
namespace detail { namespace detail {
namespace sync { namespace sync {
@ -114,7 +115,7 @@ public:
ipc::error("fail pthread_mutexattr_init[%d]\n", eno); ipc::error("fail pthread_mutexattr_init[%d]\n", eno);
return false; return false;
} }
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); IPC_UNUSED_ auto guard_mutex_attr = guard([&mutex_attr] { ::pthread_mutexattr_destroy(&mutex_attr); });
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno);
return false; return false;

View File

@ -8,9 +8,10 @@
#include <errno.h> #include <errno.h>
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/platform/get_wait_time.h"
#include "libipc/shm.h" #include "libipc/shm.h"
#include "get_wait_time.h"
namespace ipc { namespace ipc {
namespace detail { namespace detail {
namespace sync { namespace sync {

View File

@ -1,198 +1,197 @@
#include <sys/shm.h> #include <sys/stat.h>
#include <sys/stat.h> #include <sys/mman.h>
#include <sys/mman.h> #include <sys/types.h>
#include <sys/types.h> #include <unistd.h>
#include <unistd.h> #include <fcntl.h>
#include <fcntl.h> #include <errno.h>
#include <errno.h>
#include <atomic>
#include <atomic> #include <string>
#include <string> #include <utility>
#include <utility> #include <cstring>
#include <cstring>
#include "libipc/shm.h"
#include "libipc/shm.h" #include "libipc/def.h"
#include "libipc/def.h" #include "libipc/pool_alloc.h"
#include "libipc/pool_alloc.h"
#include "libipc/utility/log.h"
#include "libipc/utility/log.h" #include "libipc/memory/resource.h"
#include "libipc/memory/resource.h"
namespace {
namespace {
struct info_t {
struct info_t { std::atomic<std::int32_t> acc_;
std::atomic<std::int32_t> acc_; };
};
struct id_info_t {
struct id_info_t { int fd_ = -1;
int fd_ = -1; void* mem_ = nullptr;
void* mem_ = nullptr; std::size_t size_ = 0;
std::size_t size_ = 0; ipc::string name_;
ipc::string name_; };
};
constexpr std::size_t calc_size(std::size_t size) {
constexpr std::size_t calc_size(std::size_t size) { return ((((size - 1) / alignof(info_t)) + 1) * alignof(info_t)) + sizeof(info_t);
return ((((size - 1) / alignof(info_t)) + 1) * alignof(info_t)) + sizeof(info_t); }
}
inline auto& acc_of(void* mem, std::size_t size) {
inline auto& acc_of(void* mem, std::size_t size) { return reinterpret_cast<info_t*>(static_cast<ipc::byte_t*>(mem) + size - sizeof(info_t))->acc_;
return reinterpret_cast<info_t*>(static_cast<ipc::byte_t*>(mem) + size - sizeof(info_t))->acc_; }
}
} // internal-linkage
} // internal-linkage
namespace ipc {
namespace ipc { namespace shm {
namespace shm {
id_t acquire(char const * name, std::size_t size, unsigned mode) {
id_t acquire(char const * name, std::size_t size, unsigned mode) { if (name == nullptr || name[0] == '\0') {
if (name == nullptr || name[0] == '\0') { ipc::error("fail acquire: name is empty\n");
ipc::error("fail acquire: name is empty\n"); return nullptr;
return nullptr; }
} ipc::string op_name = ipc::string{"__IPC_SHM__"} + name;
ipc::string op_name = ipc::string{"__IPC_SHM__"} + name; // Open the object for read-write access.
// Open the object for read-write access. int flag = O_RDWR;
int flag = O_RDWR; switch (mode) {
switch (mode) { case open:
case open: size = 0;
size = 0; break;
break; // The check for the existence of the object,
// The check for the existence of the object, // and its creation if it does not exist, are performed atomically.
// and its creation if it does not exist, are performed atomically. case create:
case create: flag |= O_CREAT | O_EXCL;
flag |= O_CREAT | O_EXCL; break;
break; // Create the shared memory object if it does not exist.
// Create the shared memory object if it does not exist. default:
default: flag |= O_CREAT;
flag |= O_CREAT; break;
break; }
} int fd = ::shm_open(op_name.c_str(), flag, S_IRUSR | S_IWUSR |
int fd = ::shm_open(op_name.c_str(), flag, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP |
S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
S_IROTH | S_IWOTH); if (fd == -1) {
if (fd == -1) { ipc::error("fail shm_open[%d]: %s\n", errno, name);
ipc::error("fail shm_open[%d]: %s\n", errno, name); return nullptr;
return nullptr; }
} auto ii = mem::alloc<id_info_t>();
auto ii = mem::alloc<id_info_t>(); ii->fd_ = fd;
ii->fd_ = fd; ii->size_ = size;
ii->size_ = size; ii->name_ = std::move(op_name);
ii->name_ = std::move(op_name); return ii;
return ii; }
}
std::int32_t get_ref(id_t id) {
std::int32_t get_ref(id_t id) { if (id == nullptr) {
if (id == nullptr) { return 0;
return 0; }
} auto ii = static_cast<id_info_t*>(id);
auto ii = static_cast<id_info_t*>(id); if (ii->mem_ == nullptr || ii->size_ == 0) {
if (ii->mem_ == nullptr || ii->size_ == 0) { return 0;
return 0; }
} return acc_of(ii->mem_, ii->size_).load(std::memory_order_acquire);
return acc_of(ii->mem_, ii->size_).load(std::memory_order_acquire); }
}
void sub_ref(id_t id) {
void sub_ref(id_t id) { if (id == nullptr) {
if (id == nullptr) { ipc::error("fail sub_ref: invalid id (null)\n");
ipc::error("fail sub_ref: invalid id (null)\n"); return;
return; }
} auto ii = static_cast<id_info_t*>(id);
auto ii = static_cast<id_info_t*>(id); if (ii->mem_ == nullptr || ii->size_ == 0) {
if (ii->mem_ == nullptr || ii->size_ == 0) { ipc::error("fail sub_ref: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_);
ipc::error("fail sub_ref: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); return;
return; }
} acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel);
acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel); }
}
void * get_mem(id_t id, std::size_t * size) {
void * get_mem(id_t id, std::size_t * size) { if (id == nullptr) {
if (id == nullptr) { ipc::error("fail get_mem: invalid id (null)\n");
ipc::error("fail get_mem: invalid id (null)\n"); return nullptr;
return nullptr; }
} auto ii = static_cast<id_info_t*>(id);
auto ii = static_cast<id_info_t*>(id); if (ii->mem_ != nullptr) {
if (ii->mem_ != nullptr) { if (size != nullptr) *size = ii->size_;
if (size != nullptr) *size = ii->size_; return ii->mem_;
return ii->mem_; }
} int fd = ii->fd_;
int fd = ii->fd_; if (fd == -1) {
if (fd == -1) { ipc::error("fail get_mem: invalid id (fd = -1)\n");
ipc::error("fail get_mem: invalid id (fd = -1)\n"); return nullptr;
return nullptr; }
} if (ii->size_ == 0) {
if (ii->size_ == 0) { struct stat st;
struct stat st; if (::fstat(fd, &st) != 0) {
if (::fstat(fd, &st) != 0) { ipc::error("fail fstat[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_);
ipc::error("fail fstat[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); return nullptr;
return nullptr; }
} ii->size_ = static_cast<std::size_t>(st.st_size);
ii->size_ = static_cast<std::size_t>(st.st_size); if ((ii->size_ <= sizeof(info_t)) || (ii->size_ % sizeof(info_t))) {
if ((ii->size_ <= sizeof(info_t)) || (ii->size_ % sizeof(info_t))) { ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_);
ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); return nullptr;
return nullptr; }
} }
} else {
else { ii->size_ = calc_size(ii->size_);
ii->size_ = calc_size(ii->size_); if (::ftruncate(fd, static_cast<off_t>(ii->size_)) != 0) {
if (::ftruncate(fd, static_cast<off_t>(ii->size_)) != 0) { ipc::error("fail ftruncate[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_);
ipc::error("fail ftruncate[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); return nullptr;
return nullptr; }
} }
} void* mem = ::mmap(nullptr, ii->size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
void* mem = ::mmap(nullptr, ii->size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (mem == MAP_FAILED) {
if (mem == MAP_FAILED) { ipc::error("fail mmap[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_);
ipc::error("fail mmap[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); return nullptr;
return nullptr; }
} ::close(fd);
::close(fd); ii->fd_ = -1;
ii->fd_ = -1; ii->mem_ = mem;
ii->mem_ = mem; if (size != nullptr) *size = ii->size_;
if (size != nullptr) *size = ii->size_; acc_of(mem, ii->size_).fetch_add(1, std::memory_order_release);
acc_of(mem, ii->size_).fetch_add(1, std::memory_order_release); return mem;
return mem; }
}
std::int32_t release(id_t id) {
std::int32_t release(id_t id) { if (id == nullptr) {
if (id == nullptr) { ipc::error("fail release: invalid id (null)\n");
ipc::error("fail release: invalid id (null)\n"); return -1;
return -1; }
} std::int32_t ret = -1;
std::int32_t ret = -1; auto ii = static_cast<id_info_t*>(id);
auto ii = static_cast<id_info_t*>(id); if (ii->mem_ == nullptr || ii->size_ == 0) {
if (ii->mem_ == nullptr || ii->size_ == 0) { ipc::error("fail release: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_);
ipc::error("fail release: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); }
} else if ((ret = acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel)) <= 1) {
else if ((ret = acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel)) <= 1) { ::munmap(ii->mem_, ii->size_);
::munmap(ii->mem_, ii->size_); if (!ii->name_.empty()) {
if (!ii->name_.empty()) { ::shm_unlink(ii->name_.c_str());
::shm_unlink(ii->name_.c_str()); }
} }
} else ::munmap(ii->mem_, ii->size_);
else ::munmap(ii->mem_, ii->size_); mem::free(ii);
mem::free(ii); return ret;
return ret; }
}
void remove(id_t id) {
void remove(id_t id) { if (id == nullptr) {
if (id == nullptr) { ipc::error("fail remove: invalid id (null)\n");
ipc::error("fail remove: invalid id (null)\n"); return;
return; }
} auto ii = static_cast<id_info_t*>(id);
auto ii = static_cast<id_info_t*>(id); auto name = std::move(ii->name_);
auto name = std::move(ii->name_); release(id);
release(id); if (!name.empty()) {
if (!name.empty()) { ::shm_unlink(name.c_str());
::shm_unlink(name.c_str()); }
} }
}
void remove(char const * name) {
void remove(char const * name) { if (name == nullptr || name[0] == '\0') {
if (name == nullptr || name[0] == '\0') { ipc::error("fail remove: name is empty\n");
ipc::error("fail remove: name is empty\n"); return;
return; }
} ::shm_unlink((ipc::string{"__IPC_SHM__"} + name).c_str());
::shm_unlink((ipc::string{"__IPC_SHM__"} + name).c_str()); }
}
} // namespace shm
} // namespace shm } // namespace ipc
} // namespace ipc

View File

@ -89,7 +89,7 @@ public:
return rs && rl; return rs && rl;
} }
bool notify() noexcept { bool notify(ipc::sync::mutex &) noexcept {
if (!valid()) return false; if (!valid()) return false;
auto &cnt = counter(); auto &cnt = counter();
if (!lock_.lock()) return false; if (!lock_.lock()) return false;
@ -101,7 +101,7 @@ public:
return lock_.unlock() && ret; return lock_.unlock() && ret;
} }
bool broadcast() noexcept { bool broadcast(ipc::sync::mutex &) noexcept {
if (!valid()) return false; if (!valid()) return false;
auto &cnt = counter(); auto &cnt = counter();
if (!lock_.lock()) return false; if (!lock_.lock()) return false;

View File

@ -7,8 +7,8 @@
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/platform/to_tchar.h" #include "to_tchar.h"
#include "libipc/platform/get_sa.h" #include "get_sa.h"
namespace ipc { namespace ipc {
namespace detail { namespace detail {

View File

@ -6,8 +6,8 @@
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/platform/to_tchar.h" #include "to_tchar.h"
#include "libipc/platform/get_sa.h" #include "get_sa.h"
namespace ipc { namespace ipc {
namespace detail { namespace detail {

View File

@ -9,10 +9,11 @@
#include "libipc/pool_alloc.h" #include "libipc/pool_alloc.h"
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/platform/to_tchar.h"
#include "libipc/platform/get_sa.h"
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
#include "to_tchar.h"
#include "get_sa.h"
namespace { namespace {
struct id_info_t { struct id_info_t {

View File

@ -11,8 +11,8 @@
#include <cstddef> #include <cstddef>
#include "libipc/utility/concept.h" #include "libipc/utility/concept.h"
#include "libipc/platform/detail.h"
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
#include "libipc/platform/detail.h"
namespace ipc { namespace ipc {
namespace detail { namespace detail {

View File

@ -5,10 +5,12 @@
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_) #if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/condition_win.h" #include "libipc/platform/win/condition.h"
#elif defined(IPC_OS_LINUX_) #elif defined(IPC_OS_LINUX_)
#include "libipc/platform/condition_linux.h" #include "libipc/platform/linux/condition.h"
#else/*linux*/ #elif defined(IPC_OS_QNX_)
#include "libipc/platform/posix/condition.h"
#else/*IPC_OS*/
# error "Unsupported platform." # error "Unsupported platform."
#endif #endif
@ -58,12 +60,12 @@ bool condition::wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
return impl(p_)->cond_.wait(mtx, tm); return impl(p_)->cond_.wait(mtx, tm);
} }
bool condition::notify() noexcept { bool condition::notify(ipc::sync::mutex &mtx) noexcept {
return impl(p_)->cond_.notify(); return impl(p_)->cond_.notify(mtx);
} }
bool condition::broadcast() noexcept { bool condition::broadcast(ipc::sync::mutex &mtx) noexcept {
return impl(p_)->cond_.broadcast(); return impl(p_)->cond_.broadcast(mtx);
} }
} // namespace sync } // namespace sync

View File

@ -5,10 +5,12 @@
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_) #if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/mutex_win.h" #include "libipc/platform/win/mutex.h"
#elif defined(IPC_OS_LINUX_) #elif defined(IPC_OS_LINUX_)
#include "libipc/platform/mutex_linux.h" #include "libipc/platform/linux/mutex.h"
#else/*linux*/ #elif defined(IPC_OS_QNX_)
#include "libipc/platform/posix/mutex.h"
#else/*IPC_OS*/
# error "Unsupported platform." # error "Unsupported platform."
#endif #endif

View File

@ -5,10 +5,10 @@
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_) #if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/semaphore_win.h" #include "libipc/platform/win/semaphore.h"
#elif defined(IPC_OS_LINUX_) #elif defined(IPC_OS_LINUX_) || defined(IPC_OS_QNX_)
#include "libipc/platform/semaphore_linux.h" #include "libipc/platform/posix/semaphore_impl.h"
#else/*linux*/ #else/*IPC_OS*/
# error "Unsupported platform." # error "Unsupported platform."
#endif #endif

View File

@ -1,8 +1,9 @@
#pragma once #pragma once
#include <utility> // std::forward, std::integer_sequence #include <utility> // std::forward, std::integer_sequence
#include <cstddef> // std::size_t #include <cstddef> // std::size_t
#include <new> // std::hardware_destructive_interference_size #include <new> // std::hardware_destructive_interference_size
#include <type_traits> // std::is_trivially_copyable
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
@ -44,13 +45,15 @@ enum {
}; };
template <typename T, typename U> template <typename T, typename U>
T horrible_cast(U val) { auto horrible_cast(U rhs) noexcept
-> typename std::enable_if<std::is_trivially_copyable<T>::value
&& std::is_trivially_copyable<U>::value, T>::type {
union { union {
T out; T t;
U in; U u;
} u; } r = {};
u.in = val; r.u = rhs;
return u.out; return r.t;
} }
IPC_CONSTEXPR_ std::size_t make_align(std::size_t align, std::size_t size) { IPC_CONSTEXPR_ std::size_t make_align(std::size_t align, std::size_t size) {

View File

@ -63,12 +63,12 @@ public:
bool notify() noexcept { bool notify() noexcept {
std::lock_guard<ipc::sync::mutex>{lock_}; // barrier std::lock_guard<ipc::sync::mutex>{lock_}; // barrier
return cond_.notify(); return cond_.notify(lock_);
} }
bool broadcast() noexcept { bool broadcast() noexcept {
std::lock_guard<ipc::sync::mutex>{lock_}; // barrier std::lock_guard<ipc::sync::mutex>{lock_}; // barrier
return cond_.broadcast(); return cond_.broadcast(lock_);
} }
bool quit_waiting() { bool quit_waiting() {

View File

@ -1,11 +0,0 @@
# A Quick Introduction to C++ Performance Tuning
(From: https://github.com/adah1972/cpp_summit_2020.git)
This repository contains the presentation file and example code for my
presentation at the C++ Summit 2020 held in Shenzhen, China on 45 December
2020.
The presentation content is shared under a [Creative Commons Attribution-Share
Alike 2.5 Licence](http://creativecommons.org/licenses/by-sa/2.5/). The code
is put in the public domain (i.e. do whatever you like with it), though an
acknowledgement will be appreciated (but not required).

View File

@ -1,77 +0,0 @@
#include "profiler.h"
#include <cassert>
#include <iostream>
#include <vector>
namespace {
struct profiling_data {
int number;
int call_count{};
uint64_t call_duration{};
};
class profiler {
public:
profiler();
~profiler();
void add_data(int number, uint64_t duration);
private:
std::vector<profiling_data> data_;
};
profiler::profiler()
{
size_t len = 0;
for (;;) {
if (name_map[len].name == NULL) {
break;
}
++len;
}
data_.resize(len);
int i = 0;
for (auto& item : data_) {
assert(i == name_map[i].number);
item.number = i;
++i;
}
}
profiler::~profiler()
{
#ifndef NDEBUG
for (auto& item : data_) {
if (item.call_count == 0) {
continue;
}
std::cout << item.number << " " << name_map[item.number].name
<< ":\n";
std::cout << " Call count: " << item.call_count << '\n';
std::cout << " Call duration: " << item.call_duration << '\n';
std::cout << " Average duration: "
<< item.call_duration * 1.0 /
(item.call_count != 0 ? item.call_count : 1)
<< '\n';
}
#endif
}
void profiler::add_data(int number, uint64_t duration)
{
assert(number >= 0 && number < static_cast<int>(data_.size()));
data_[number].call_count++;
data_[number].call_duration += duration;
}
profiler profiler_instance;
} // unnamed namespace
profiling_checker::~profiling_checker()
{
auto end_time = rdtsc();
profiler_instance.add_data(number_, end_time - start_time_);
}

View File

@ -1,35 +0,0 @@
#ifndef PROFILER_H
#define PROFILER_H
#include "rdtsc.h"
struct name_mapper {
int number;
const char* name;
};
extern name_mapper name_map[];
class profiling_checker {
public:
profiling_checker(int number);
~profiling_checker();
private:
int number_;
uint64_t start_time_;
};
inline profiling_checker::profiling_checker(int number)
: number_(number)
{
start_time_ = rdtsc();
}
#ifdef NDEBUG
#define PROFILE_CHECK(func_number) (void)0
#else
#define PROFILE_CHECK(func_number) profiling_checker _checker(func_number)
#endif
#endif // PROFILER_H

View File

@ -1,52 +0,0 @@
#ifndef RDTSC_H
#define RDTSC_H
#include <stdint.h> // uint64_t
#if defined(_M_X64) || defined(_M_IX86) || defined(__x86_64) || defined(__i386)
# ifdef _WIN32
# include <intrin.h> // __rdtsc
# else
# include <x86intrin.h> // __rdtsc
# endif
# define HAS_HW_RDTSC 1
#else
# include <chrono> // std::chrono::high_resolution_clock
# define HAS_HW_RDTSC 0
#endif
inline uint64_t rdtsc()
{
#if HAS_HW_RDTSC
// _mm_lfence() might be used to serialize the instruction stream,
// and it would guarantee that RDTSC will not be reordered with
// other instructions. However, measurements show that the overhead
// may be too big (easily 15 to 30 CPU cycles) for profiling
// purposes: if reordering matters, the overhead matters too!
// Forbid the compiler from reordering instructions
# ifdef _MSC_VER
_ReadWriteBarrier();
# else
__asm__ __volatile__("" : : : "memory");
# endif
uint64_t result = __rdtsc();
// Forbid the compiler from reordering instructions
# ifdef _MSC_VER
_ReadWriteBarrier();
# else
__asm__ __volatile__("" : : : "memory");
# endif
return result;
#else
auto now = std::chrono::high_resolution_clock::now();
return std::chrono::duration_cast<std::chrono::nanoseconds>(
now.time_since_epoch())
.count();
#endif
}
#endif // RDTSC_H

View File

@ -153,16 +153,16 @@ void test_sr(char const * name, int s_cnt, int r_cnt) {
TEST(IPC, basic) { TEST(IPC, basic) {
test_basic<relat::single, relat::single, trans::unicast >("ssu"); test_basic<relat::single, relat::single, trans::unicast >("ssu");
test_basic<relat::single, relat::multi , trans::unicast >("smu"); //test_basic<relat::single, relat::multi , trans::unicast >("smu");
test_basic<relat::multi , relat::multi , trans::unicast >("mmu"); //test_basic<relat::multi , relat::multi , trans::unicast >("mmu");
test_basic<relat::single, relat::multi , trans::broadcast>("smb"); test_basic<relat::single, relat::multi , trans::broadcast>("smb");
test_basic<relat::multi , relat::multi , trans::broadcast>("mmb"); test_basic<relat::multi , relat::multi , trans::broadcast>("mmb");
} }
TEST(IPC, 1v1) { TEST(IPC, 1v1) {
test_sr<relat::single, relat::single, trans::unicast >("ssu", 1, 1); test_sr<relat::single, relat::single, trans::unicast >("ssu", 1, 1);
test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, 1); //test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, 1);
test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, 1); //test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, 1);
test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, 1); test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, 1);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, 1); test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, 1);
} }

View File

@ -8,7 +8,7 @@
#include "test.h" #include "test.h"
#include "libipc/platform/to_tchar.h" #include "libipc/platform/win/to_tchar.h"
TEST(Platform, to_tchar) { TEST(Platform, to_tchar) {
char const *utf8 = "hello world, " char const *utf8 = "hello world, "

View File

@ -67,22 +67,22 @@ TEST(Sync, Mutex) {
EXPECT_THROW(lock.try_lock(), std::system_error); EXPECT_THROW(lock.try_lock(), std::system_error);
int i = 0; // int i = 0;
EXPECT_TRUE(lock.lock()); // EXPECT_TRUE(lock.lock());
i = 100; // i = 100;
auto t2 = std::thread{[&i] { // auto t2 = std::thread{[&i] {
ipc::sync::mutex lock {"test-mutex-robust"}; // ipc::sync::mutex lock {"test-mutex-robust"};
EXPECT_TRUE(lock.valid()); // EXPECT_TRUE(lock.valid());
EXPECT_FALSE(lock.try_lock()); // EXPECT_FALSE(lock.try_lock());
EXPECT_TRUE(lock.lock()); // EXPECT_TRUE(lock.lock());
i += i; // i += i;
EXPECT_TRUE(lock.unlock()); // EXPECT_TRUE(lock.unlock());
}}; // }};
std::this_thread::sleep_for(std::chrono::seconds(1)); // std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(i, 100); // EXPECT_EQ(i, 100);
EXPECT_TRUE(lock.unlock()); // EXPECT_TRUE(lock.unlock());
t2.join(); // t2.join();
EXPECT_EQ(i, 200); // EXPECT_EQ(i, 200);
} }
#include "libipc/semaphore.h" #include "libipc/semaphore.h"
@ -113,7 +113,7 @@ TEST(Sync, Condition) {
auto job = [&que](int num) { auto job = [&que](int num) {
ipc::sync::condition cond {"test-cond"}; ipc::sync::condition cond {"test-cond"};
ipc::sync::mutex lock {"test-mutex"}; ipc::sync::mutex lock {"test-mutex"};
for (;;) { for (int i = 0; i < 10; ++i) {
int val = 0; int val = 0;
{ {
std::lock_guard<ipc::sync::mutex> guard {lock}; std::lock_guard<ipc::sync::mutex> guard {lock};
@ -123,6 +123,19 @@ TEST(Sync, Condition) {
val = que.front(); val = que.front();
que.pop_front(); que.pop_front();
} }
EXPECT_NE(val, 0);
std::printf("test-cond-%d: %d\n", num, val);
}
for (;;) {
int val = 0;
{
std::lock_guard<ipc::sync::mutex> guard {lock};
while (que.empty()) {
EXPECT_TRUE(cond.wait(lock, 1000));
}
val = que.front();
que.pop_front();
}
if (val == 0) { if (val == 0) {
std::printf("test-cond-%d: exit.\n", num); std::printf("test-cond-%d: exit.\n", num);
return; return;
@ -139,16 +152,16 @@ TEST(Sync, Condition) {
{ {
std::lock_guard<ipc::sync::mutex> guard {lock}; std::lock_guard<ipc::sync::mutex> guard {lock};
que.push_back(i); que.push_back(i);
ASSERT_TRUE(cond.notify(lock));
} }
cond.notify();
std::this_thread::sleep_for(std::chrono::milliseconds(20)); std::this_thread::sleep_for(std::chrono::milliseconds(20));
} }
for (int i = 1; i < 100; ++i) { for (int i = 1; i < 100; ++i) {
{ {
std::lock_guard<ipc::sync::mutex> guard {lock}; std::lock_guard<ipc::sync::mutex> guard {lock};
que.push_back(i); que.push_back(i);
ASSERT_TRUE(cond.broadcast(lock));
} }
cond.broadcast();
std::this_thread::sleep_for(std::chrono::milliseconds(20)); std::this_thread::sleep_for(std::chrono::milliseconds(20));
} }
{ {
@ -156,8 +169,8 @@ TEST(Sync, Condition) {
for (int i = 0; i < (int)test_conds.size(); ++i) { for (int i = 0; i < (int)test_conds.size(); ++i) {
que.push_back(0); que.push_back(0);
} }
ASSERT_TRUE(cond.broadcast(lock));
} }
cond.broadcast();
for (auto &t : test_conds) t.join(); for (auto &t : test_conds) t.join();
} }
@ -171,7 +184,7 @@ TEST(Sync, ConditionRobust) {
printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 2\n"); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 2\n");
ipc::sync::mutex lock {"test-mutex"}; ipc::sync::mutex lock {"test-mutex"};
printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 3\n"); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 3\n");
lock.lock(); ASSERT_TRUE(lock.lock());
std::thread unlock {[] { std::thread unlock {[] {
printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 1\n"); printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 1\n");
ipc::sync::condition cond {"test-cond"}; ipc::sync::condition cond {"test-cond"};
@ -183,13 +196,13 @@ TEST(Sync, ConditionRobust) {
} }
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 4\n"); printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 4\n");
cond.broadcast(); ASSERT_TRUE(cond.broadcast(lock));
printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 5\n"); printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 5\n");
}}; }};
printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 4\n"); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 4\n");
cond.wait(lock); ASSERT_TRUE(cond.wait(lock));
printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 5\n"); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 5\n");
lock.unlock(); ASSERT_TRUE(lock.unlock());
printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 6\n"); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 6\n");
unlock.join(); unlock.join();
} }