impl robust mutex & condition (using alephzero's mtx implementation)

This commit is contained in:
mutouyun 2021-10-23 19:06:33 +08:00
parent 96551d5fcb
commit 3344bbf799
28 changed files with 1696 additions and 272 deletions

View File

@ -1,7 +1,8 @@
project(ipc)
if(UNIX)
file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/linux/*.cpp)
file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/linux/*.cpp
${LIBIPC_PROJECT_DIR}/src/libipc/platform/linux/a0/*.c)
else()
file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/win/*.cpp)
endif()
@ -33,18 +34,18 @@ set_target_properties(${PROJECT_NAME}
PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib"
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib"
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin" )
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin")
# set version
set_target_properties(${PROJECT_NAME}
PROPERTIES
VERSION 1.1.0
SOVERSION 2)
VERSION 1.2.0
SOVERSION 3)
target_include_directories(${PROJECT_NAME}
PUBLIC ${LIBIPC_PROJECT_DIR}/include
PRIVATE ${LIBIPC_PROJECT_DIR}/src
)
$<$<BOOL:UNIX>:${LIBIPC_PROJECT_DIR}/src/libipc/platform/linux>)
if(NOT MSVC)
target_link_libraries(${PROJECT_NAME} PUBLIC
@ -56,5 +57,4 @@ install(
TARGETS ${PROJECT_NAME}
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
)
ARCHIVE DESTINATION lib)

View File

@ -0,0 +1,24 @@
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org>

View File

@ -0,0 +1,213 @@
<h1 align="center">
<br>
<img src="https://raw.githubusercontent.com/alephzero/logo/master/rendered/alephzero.svg" width="256px">
<br>
AlephZero
</h1>
<h3 align="center">Simple, Robust, Fast IPC.</h3>
<p align="center">
<a href="https://github.com/alephzero/alephzero/actions?query=workflow%3ACI"><img src="https://github.com/alephzero/alephzero/workflows/CI/badge.svg"></a>
<a href="https://codecov.io/gh/alephzero/alephzero"><img src="https://codecov.io/gh/alephzero/alephzero/branch/master/graph/badge.svg"></a>
<a href="https://alephzero.readthedocs.io/en/latest/?badge=latest"><img src="https://readthedocs.org/projects/alephzero/badge/?version=latest"></a>
<a href="http://unlicense.org"><img src="https://img.shields.io/badge/license-Unlicense-blue.svg"></a>
</p>
<p align="center">
<a href="#overview">Overview</a>
<a href="#transport">Transport</a>
<a href="#protocol">Protocol</a>
<a href="#examples">Examples</a>
<a href="#installation">Installation</a>
<a href="#across-dockers">Across Dockers</a>
</p>
# Overview
[Presentation from March 25, 2020](https://docs.google.com/presentation/d/12KE9UucjZPtpVnM1NljxOqBolBBKECWJdrCoE2yJaBw/edit#slide=id.p)
AlephZero is a library for message based communication between programs running on the same machine.
## Simple
AlephZero's main goal is to be simple to use. Nothing is higher priority.
There is no "master" process in between your nodes that is needed to do handshakes or exchanges of any kind. All you need is the topic name.
See the <a href="#examples">Examples</a>.
## Robust
This is probably the main value of AlephZero, above similar libraries.
AlephZero uses a lot of tricks to ensure the state of all channels is consistent, even when programs die. This includes double-buffering the state of the communication channel and [robustifying](https://man7.org/linux/man-pages/man3/pthread_mutexattr_setrobust.3.html) the locks and notification channels.
## Fast
AlephZero uses shared memory across multiple processes to read and write messages, minimizing the involvement of the kernel. The kernel only really gets involved in notifying a process that a new message exists, and for that we use futex (fast user-space mutex).
TODO: Benchmarks
# Transport
AlephZero, at its core, is a simple allocator on top of a contiguous region of memory. Usually, shared-memory. The allocator of choice is a circular-linked-list, which is fast, simple, and sufficient for the protocol listed below. It also plays well with the robustness requirement.
This has a number of implications. For one, this means that old messages are kept around until the space is needed. The oldest messages are always discarded before any more recent messages.
# Protocol
Rather than exposing the low-level transport directly, AlephZero provides a few higher level protocol:
* <b>PubSub</b>: Broadcast published messages. Subscribers get notified.
* <b>RPC</b>: Request-response.
* <b>PRPC (Progressive RPC)</b>: Request-streaming response.
* <b>Sessions</b>: Bi-directional channel of communication. Not yet implemented. Let me know if you want this.
# Examples
Many more example and an interactive experience can be found at: https://github.com/alephzero/playground
For the curious, here are some simple snippets to get you started:
To begin with, we need to include AlephZero:
```cc
#include <a0.h>
```
## PubSub
You can have as many publisher and subscribers on the same topic as you wish. They just need to agree on the filename.
```cc
a0::Publisher p("my_pubsub_topic");
p.pub("foo");
```
You just published `"foo"` to the `"my_pubsub_topic"`.
To read those message, you can create a subscriber on the same topic:
```cc
a0::Subscriber sub(
"my_pubsub_topic",
A0_INIT_AWAIT_NEW, // or MOST_RECENT or OLDEST
A0_ITER_NEWEST, // or NEXT
[](a0::PacketView pkt_view) {
std::cout << "Got: " << pkt_view.payload() << std::endl;
});
```
The callback will trigger whenever a message is published.
The `Subscriber` object spawns a thread that will read the topic and call the callback.
The `A0_INIT` tells the subscriber where to start reading.
* `A0_INIT_AWAIT_NEW`: Start with messages published after the creation of the subscriber.
* `A0_INIT_MOST_RECENT`: Start with the most recently published message. Useful for state and configuration. But be careful, this can be quite old!
* `A0_INIT_OLDEST`: Topics keep a history of 16MB (unless configures otherwise). Start with the oldest thing still in there.
The `A0_ITER` tells the subscriber how to continue reading messages. After each callback:
* `A0_ITER_NEXT`: grab the sequentially next message. When you don't want to miss a thing.
* `A0_ITER_NEWEST`: grab the newest available unread message. When you want to keep up with the firehose.
```cc
a0::SubscriberSync sub_sync(
"my_pubsub_topic",
A0_INIT_OLDEST, A0_ITER_NEXT);
while (sub_sync.has_next()) {
auto pkt = sub_sync.next();
std::cout << "Got: " << pkt.payload() << std::endl;
}
```
## RPC
Create an `RpcServer`:
```cc
a0::RpcServer server(
"my_rpc_topic",
/* onrequest = */ [](a0::RpcRequest req) {
std::cout << "Got: " << req.pkt().payload() << std::endl;
req.reply("echo " + std::string(req.pkt().payload()));
},
/* oncancel = */ nullptr);
```
Create an `RpcClient`:
```cc
a0::RpcClient client("my_rpc_topic");
client.send("client msg", [](a0::PacketView reply) {
std::cout << "Got: " << reply.payload() << std::endl;
});
```
# Installation
## Install From Source
### Ubuntu Dependencies
```sh
apt install g++ make
```
### Alpine Dependencies
```sh
apk add g++ linux-headers make
```
### Download And Install
```sh
git clone https://github.com/alephzero/alephzero.git
cd alephzero
make install -j
```
## Install From Package
Coming soon-ish. Let me know if you want this and I'll prioritize it. External support is much appreciated.
## Integration
### Command Line
Add the following to g++ / clang commands.
```sh
-L${libdir} -lalephzero -lpthread
```
### Package-cfg
```sh
pkg-config --cflags --libs alephzero
```
### CMake
Coming soon-ish. Let me know if you want this and I'll prioritize it. External support is much appreciated.
### Bazel
Coming soon-ish. Let me know if you want this and I'll prioritize it.
# Across Dockers
For programs running across different dockers to be able to communicate, we need to have them match up on two flags: `--ipc` and `--pid`.
* `--ipc` shares the `/dev/shm` filesystem. This is necessary to open the same file topics.
* `--pid` shares the process id namespace. This is necessary for the locking and notification systems.
In the simplest case, you can set them both to `host` and talk through the system's global `/dev/shm` and process id namespace.
```sh
docker run --ipc=host --pid=host --name=foo foo_image
docker run --ipc=host --pid=host --name=bar bar_image
```
Or, you can mark one as `shareable` and have the others connect to it:
```sh
docker run --ipc=shareable --pid=shareable --name=foo foo_image
docker run --ipc=container:foo --pid=container:foo --name=bar bar_image
```

View File

@ -0,0 +1,36 @@
#ifndef A0_SRC_ATOMIC_H
#define A0_SRC_ATOMIC_H
#include "a0/inline.h"
#ifdef __cplusplus
extern "C" {
#endif
A0_STATIC_INLINE
void a0_barrier() {
// 'atomic_thread_fence' is not supported with -fsanitize=thread
__sync_synchronize();
}
#define a0_atomic_fetch_add(P, V) __atomic_fetch_add((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_add_fetch(P, V) __atomic_add_fetch((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_fetch_and(P, V) __atomic_fetch_and((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_and_fetch(P, V) __atomic_and_fetch((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_fetch_or(P, V) __atomic_fetch_or((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_or_fetch(P, V) __atomic_or_fetch((P), (V), __ATOMIC_RELAXED)
#define a0_atomic_load(P) __atomic_load_n((P), __ATOMIC_RELAXED)
#define a0_atomic_store(P, V) __atomic_store_n((P), (V), __ATOMIC_RELAXED)
// TODO(lshamis): Switch from __sync to __atomic.
#define a0_cas_val(P, OV, NV) __sync_val_compare_and_swap((P), (OV), (NV))
#define a0_cas(P, OV, NV) __sync_bool_compare_and_swap((P), (OV), (NV))
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_ATOMIC_H

View File

@ -0,0 +1,60 @@
#ifndef A0_SRC_CLOCK_H
#define A0_SRC_CLOCK_H
#include "a0/err.h"
#include "a0/inline.h"
#include <stdint.h>
#include <time.h>
#include "err_macro.h"
#ifdef __cplusplus
extern "C" {
#endif
static const int64_t NS_PER_SEC = 1e9;
typedef struct timespec timespec_t;
A0_STATIC_INLINE
a0_err_t a0_clock_now(clockid_t clk, timespec_t* out) {
A0_RETURN_SYSERR_ON_MINUS_ONE(clock_gettime(clk, out));
return A0_OK;
}
A0_STATIC_INLINE
a0_err_t a0_clock_add(timespec_t ts, int64_t add_nsec, timespec_t* out) {
out->tv_sec = ts.tv_sec + add_nsec / NS_PER_SEC;
out->tv_nsec = ts.tv_nsec + add_nsec % NS_PER_SEC;
if (out->tv_nsec >= NS_PER_SEC) {
out->tv_sec++;
out->tv_nsec -= NS_PER_SEC;
} else if (out->tv_nsec < 0) {
out->tv_sec--;
out->tv_nsec += NS_PER_SEC;
}
return A0_OK;
}
A0_STATIC_INLINE
a0_err_t a0_clock_convert(
clockid_t orig_clk,
timespec_t orig_ts,
clockid_t target_clk,
timespec_t* target_ts) {
timespec_t orig_now;
A0_RETURN_ERR_ON_ERR(a0_clock_now(orig_clk, &orig_now));
timespec_t target_now;
A0_RETURN_ERR_ON_ERR(a0_clock_now(target_clk, &target_now));
int64_t add_nsec = (orig_ts.tv_sec - orig_now.tv_sec) * NS_PER_SEC + (orig_ts.tv_nsec - orig_now.tv_nsec);
return a0_clock_add(target_now, add_nsec, target_ts);
}
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_CLOCK_H

View File

@ -0,0 +1,13 @@
#ifndef A0_EMPTY_H
#define A0_EMPTY_H
// Bah. Why is there no consistent way to zero initialize a struct?
#ifdef __cplusplus
#define A0_EMPTY \
{}
#else
#define A0_EMPTY \
{ 0 }
#endif
#endif // A0_EMPTY_H

View File

@ -0,0 +1,50 @@
#include "a0/err.h"
#include "a0/thread_local.h"
#include <errno.h>
#include <string.h>
A0_THREAD_LOCAL int a0_err_syscode;
A0_THREAD_LOCAL char a0_err_msg[1024];
const char* a0_strerror(a0_err_t err) {
switch (err) {
case A0_OK: {
return strerror(0);
}
case A0_ERR_SYS: {
return strerror(a0_err_syscode);
}
case A0_ERR_CUSTOM_MSG: {
return a0_err_msg;
}
case A0_ERR_INVALID_ARG: {
return strerror(EINVAL);
}
case A0_ERR_RANGE: {
return "Index out of bounds";
}
case A0_ERR_AGAIN: {
return "Not available yet";
}
case A0_ERR_FRAME_LARGE: {
return "Frame size too large";
}
case A0_ERR_ITER_DONE: {
return "Done iterating";
}
case A0_ERR_NOT_FOUND: {
return "Not found";
}
case A0_ERR_BAD_PATH: {
return "Invalid path";
}
case A0_ERR_BAD_TOPIC: {
return "Invalid topic name";
}
default: {
break;
}
}
return "";
}

View File

@ -0,0 +1,33 @@
#ifndef A0_ERR_H
#define A0_ERR_H
#include "a0/thread_local.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum a0_err_e {
A0_OK = 0,
A0_ERR_SYS = 1,
A0_ERR_CUSTOM_MSG = 2,
A0_ERR_INVALID_ARG = 3,
A0_ERR_RANGE = 4,
A0_ERR_AGAIN = 5,
A0_ERR_ITER_DONE = 6,
A0_ERR_NOT_FOUND = 7,
A0_ERR_FRAME_LARGE = 8,
A0_ERR_BAD_PATH = 9,
A0_ERR_BAD_TOPIC = 10,
} a0_err_t;
extern A0_THREAD_LOCAL int a0_err_syscode;
extern A0_THREAD_LOCAL char a0_err_msg[1024];
const char* a0_strerror(a0_err_t);
#ifdef __cplusplus
}
#endif
#endif // A0_ERR_H

View File

@ -0,0 +1,52 @@
#ifndef A0_SRC_ERR_MACRO_H
#define A0_SRC_ERR_MACRO_H
#include "a0/err.h"
#include "a0/inline.h"
#include <errno.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
A0_STATIC_INLINE
a0_err_t A0_MAKE_SYSERR(int syserr) {
a0_err_syscode = syserr;
return A0_ERR_SYS;
}
A0_STATIC_INLINE
int A0_SYSERR(a0_err_t err) {
return err == A0_ERR_SYS ? a0_err_syscode : 0;
}
A0_STATIC_INLINE_RECURSIVE
a0_err_t A0_MAKE_MSGERR(const char* fmt, ...) {
va_list args;
va_start(args, fmt);
if (fmt) {
vsnprintf(a0_err_msg, sizeof(a0_err_msg), fmt, args); // NOLINT(clang-analyzer-valist.Uninitialized): https://bugs.llvm.org/show_bug.cgi?id=41311
va_end(args);
return A0_ERR_CUSTOM_MSG;
}
va_end(args);
return A0_OK;
}
#define A0_RETURN_SYSERR_ON_MINUS_ONE(X) \
do { \
if ((X) == -1) { \
a0_err_syscode = errno; \
return A0_ERR_SYS; \
} \
} while (0)
#define A0_RETURN_ERR_ON_ERR(X) \
do { \
a0_err_t _err = (X); \
if (_err) { \
return _err; \
} \
} while (0)
#endif // A0_SRC_ERR_MACRO_H

View File

@ -0,0 +1,111 @@
#ifndef A0_SRC_FTX_H
#define A0_SRC_FTX_H
#include "a0/err.h"
#include "a0/inline.h"
#include "a0/time.h"
#include <limits.h>
#include <linux/futex.h>
#include <stdint.h>
#include <syscall.h>
#include <time.h>
#include <unistd.h>
#include "clock.h"
#include "err_macro.h"
#ifdef __cplusplus
extern "C" {
#endif
// FUTEX_WAIT and FUTEX_WAIT_REQUEUE_PI default to CLOCK_MONOTONIC,
// but FUTEX_LOCK_PI always uses CLOCK_REALTIME.
//
// Until someone tells me otherwise, I assume this is bad decision making
// and I will instead standardize all things on CLOCK_BOOTTIME.
// Futex.
// Operations rely on the address.
// It should not be copied or moved.
typedef uint32_t a0_ftx_t;
A0_STATIC_INLINE
a0_err_t a0_futex(a0_ftx_t* uaddr,
int futex_op,
int val,
uintptr_t timeout_or_val2,
a0_ftx_t* uaddr2,
int val3) {
A0_RETURN_SYSERR_ON_MINUS_ONE(syscall(SYS_futex, uaddr, futex_op, val, timeout_or_val2, uaddr2, val3));
return A0_OK;
}
A0_STATIC_INLINE
a0_err_t a0_ftx_wait(a0_ftx_t* ftx, int confirm_val, const a0_time_mono_t* time_mono) {
if (!time_mono) {
return a0_futex(ftx, FUTEX_WAIT, confirm_val, 0, NULL, 0);
}
timespec_t ts_mono;
A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_MONOTONIC, &ts_mono));
return a0_futex(ftx, FUTEX_WAIT, confirm_val, (uintptr_t)&ts_mono, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_wake(a0_ftx_t* ftx, int cnt) {
return a0_futex(ftx, FUTEX_WAKE, cnt, 0, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_signal(a0_ftx_t* ftx) {
return a0_ftx_wake(ftx, 1);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_broadcast(a0_ftx_t* ftx) {
return a0_ftx_wake(ftx, INT_MAX);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_lock_pi(a0_ftx_t* ftx, const a0_time_mono_t* time_mono) {
if (!time_mono) {
return a0_futex(ftx, FUTEX_LOCK_PI, 0, 0, NULL, 0);
}
timespec_t ts_wall;
A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_REALTIME, &ts_wall));
return a0_futex(ftx, FUTEX_LOCK_PI, 0, (uintptr_t)&ts_wall, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_trylock_pi(a0_ftx_t* ftx) {
return a0_futex(ftx, FUTEX_TRYLOCK_PI, 0, 0, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_unlock_pi(a0_ftx_t* ftx) {
return a0_futex(ftx, FUTEX_UNLOCK_PI, 0, 0, NULL, 0);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_cmp_requeue_pi(a0_ftx_t* ftx, int confirm_val, a0_ftx_t* requeue_ftx, int max_requeue) {
return a0_futex(ftx, FUTEX_CMP_REQUEUE_PI, 1, max_requeue, requeue_ftx, confirm_val);
}
A0_STATIC_INLINE
a0_err_t a0_ftx_wait_requeue_pi(a0_ftx_t* ftx, int confirm_val, const a0_time_mono_t* time_mono, a0_ftx_t* requeue_ftx) {
if (!time_mono) {
return a0_futex(ftx, FUTEX_WAIT_REQUEUE_PI, confirm_val, 0, requeue_ftx, 0);
}
timespec_t ts_mono;
A0_RETURN_ERR_ON_ERR(a0_clock_convert(CLOCK_BOOTTIME, time_mono->ts, CLOCK_MONOTONIC, &ts_mono));
return a0_futex(ftx, FUTEX_WAIT_REQUEUE_PI, confirm_val, (uintptr_t)&ts_mono, requeue_ftx, 0);
}
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_FTX_H

View File

@ -0,0 +1,8 @@
#ifndef A0_INLINE_H
#define A0_INLINE_H
#define A0_STATIC_INLINE static inline __attribute__((always_inline))
#define A0_STATIC_INLINE_RECURSIVE static inline
#endif // A0_INLINE_H

View File

@ -0,0 +1,420 @@
#include "a0/err.h"
#include "a0/inline.h"
#include "a0/mtx.h"
#include "a0/thread_local.h"
#include "a0/tid.h"
#include "a0/time.h"
#include <errno.h>
#include <limits.h>
#include <linux/futex.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <syscall.h>
#include <time.h>
#include <unistd.h>
#include "atomic.h"
#include "clock.h"
#include "err_macro.h"
#include "ftx.h"
// TSAN is worth the pain of properly annotating our mutex.
// clang-format off
#if defined(__SANITIZE_THREAD__)
#define A0_TSAN_ENABLED
#elif defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define A0_TSAN_ENABLED
#endif
#endif
// clang-format on
const unsigned __tsan_mutex_linker_init = 1 << 0;
const unsigned __tsan_mutex_write_reentrant = 1 << 1;
const unsigned __tsan_mutex_read_reentrant = 1 << 2;
const unsigned __tsan_mutex_not_static = 1 << 8;
const unsigned __tsan_mutex_read_lock = 1 << 3;
const unsigned __tsan_mutex_try_lock = 1 << 4;
const unsigned __tsan_mutex_try_lock_failed = 1 << 5;
const unsigned __tsan_mutex_recursive_lock = 1 << 6;
const unsigned __tsan_mutex_recursive_unlock = 1 << 7;
#ifdef A0_TSAN_ENABLED
void __tsan_mutex_create(void* addr, unsigned flags);
void __tsan_mutex_destroy(void* addr, unsigned flags);
void __tsan_mutex_pre_lock(void* addr, unsigned flags);
void __tsan_mutex_post_lock(void* addr, unsigned flags, int recursion);
int __tsan_mutex_pre_unlock(void* addr, unsigned flags);
void __tsan_mutex_post_unlock(void* addr, unsigned flags);
void __tsan_mutex_pre_signal(void* addr, unsigned flags);
void __tsan_mutex_post_signal(void* addr, unsigned flags);
void __tsan_mutex_pre_divert(void* addr, unsigned flags);
void __tsan_mutex_post_divert(void* addr, unsigned flags);
#else
#define _u_ __attribute__((unused))
A0_STATIC_INLINE void _u_ __tsan_mutex_create(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_destroy(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_pre_lock(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_lock(_u_ void* addr,
_u_ unsigned flags,
_u_ int recursion) {}
A0_STATIC_INLINE int _u_ __tsan_mutex_pre_unlock(_u_ void* addr, _u_ unsigned flags) {
return 0;
}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_unlock(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_pre_signal(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_signal(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_pre_divert(_u_ void* addr, _u_ unsigned flags) {}
A0_STATIC_INLINE void _u_ __tsan_mutex_post_divert(_u_ void* addr, _u_ unsigned flags) {}
#endif
A0_THREAD_LOCAL bool a0_robust_init = false;
A0_STATIC_INLINE
void a0_robust_reset() {
a0_robust_init = 0;
}
A0_STATIC_INLINE
void a0_robust_reset_atfork() {
pthread_atfork(NULL, NULL, &a0_robust_reset);
}
static pthread_once_t a0_robust_reset_atfork_once;
typedef struct robust_list robust_list_t;
typedef struct robust_list_head robust_list_head_t;
A0_THREAD_LOCAL robust_list_head_t a0_robust_head;
A0_STATIC_INLINE
void robust_init() {
a0_robust_head.list.next = &a0_robust_head.list;
a0_robust_head.futex_offset = offsetof(a0_mtx_t, ftx);
a0_robust_head.list_op_pending = NULL;
syscall(SYS_set_robust_list, &a0_robust_head.list, sizeof(a0_robust_head));
}
A0_STATIC_INLINE
void init_thread() {
if (a0_robust_init) {
return;
}
pthread_once(&a0_robust_reset_atfork_once, a0_robust_reset_atfork);
robust_init();
a0_robust_init = true;
}
A0_STATIC_INLINE
void robust_op_start(a0_mtx_t* mtx) {
init_thread();
a0_robust_head.list_op_pending = (struct robust_list*)mtx;
a0_barrier();
}
A0_STATIC_INLINE
void robust_op_end(a0_mtx_t* mtx) {
(void)mtx;
a0_barrier();
a0_robust_head.list_op_pending = NULL;
}
A0_STATIC_INLINE
bool robust_is_head(a0_mtx_t* mtx) {
return mtx == (a0_mtx_t*)&a0_robust_head;
}
A0_STATIC_INLINE
void robust_op_add(a0_mtx_t* mtx) {
a0_mtx_t* old_first = (a0_mtx_t*)a0_robust_head.list.next;
mtx->prev = (a0_mtx_t*)&a0_robust_head;
mtx->next = old_first;
a0_barrier();
a0_robust_head.list.next = (robust_list_t*)mtx;
if (!robust_is_head(old_first)) {
old_first->prev = mtx;
}
}
A0_STATIC_INLINE
void robust_op_del(a0_mtx_t* mtx) {
a0_mtx_t* prev = mtx->prev;
a0_mtx_t* next = mtx->next;
prev->next = next;
if (!robust_is_head(next)) {
next->prev = prev;
}
}
A0_STATIC_INLINE
uint32_t ftx_tid(a0_ftx_t ftx) {
return ftx & FUTEX_TID_MASK;
}
A0_STATIC_INLINE
bool ftx_owner_died(a0_ftx_t ftx) {
return ftx & FUTEX_OWNER_DIED;
}
static const uint32_t FTX_NOTRECOVERABLE = FUTEX_TID_MASK | FUTEX_OWNER_DIED;
A0_STATIC_INLINE
bool ftx_notrecoverable(a0_ftx_t ftx) {
return (ftx & FTX_NOTRECOVERABLE) == FTX_NOTRECOVERABLE;
}
A0_STATIC_INLINE
a0_err_t a0_mtx_timedlock_robust(a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
const uint32_t tid = a0_tid();
int syserr = EINTR;
while (syserr == EINTR) {
// Can't lock if borked.
if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(ENOTRECOVERABLE);
}
// Try to lock without kernel involvement.
if (a0_cas(&mtx->ftx, 0, tid)) {
return A0_OK;
}
// Ask the kernel to lock.
syserr = A0_SYSERR(a0_ftx_lock_pi(&mtx->ftx, timeout));
}
if (!syserr) {
if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(EOWNERDEAD);
}
return A0_OK;
}
return A0_MAKE_SYSERR(syserr);
}
A0_STATIC_INLINE
a0_err_t a0_mtx_timedlock_impl(a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
// Note: __tsan_mutex_pre_lock should come here, but tsan doesn't provide
// a way to "fail" a lock. Only a trylock.
robust_op_start(mtx);
const a0_err_t err = a0_mtx_timedlock_robust(mtx, timeout);
if (!err || A0_SYSERR(err) == EOWNERDEAD) {
__tsan_mutex_pre_lock(mtx, 0);
robust_op_add(mtx);
__tsan_mutex_post_lock(mtx, 0, 0);
}
robust_op_end(mtx);
return err;
}
a0_err_t a0_mtx_timedlock(a0_mtx_t* mtx, a0_time_mono_t timeout) {
return a0_mtx_timedlock_impl(mtx, &timeout);
}
a0_err_t a0_mtx_lock(a0_mtx_t* mtx) {
return a0_mtx_timedlock_impl(mtx, NULL);
}
A0_STATIC_INLINE
a0_err_t a0_mtx_trylock_impl(a0_mtx_t* mtx) {
const uint32_t tid = a0_tid();
// Try to lock without kernel involvement.
uint32_t old = a0_cas_val(&mtx->ftx, 0, tid);
// Did it work?
if (!old) {
robust_op_add(mtx);
return A0_OK;
}
// Is the lock still usable?
if (ftx_notrecoverable(old)) {
return A0_MAKE_SYSERR(ENOTRECOVERABLE);
}
// Is the owner still alive?
if (!ftx_owner_died(old)) {
return A0_MAKE_SYSERR(EBUSY);
}
// Oh, the owner died. Ask the kernel to fix the state.
a0_err_t err = a0_ftx_trylock_pi(&mtx->ftx);
if (!err) {
robust_op_add(mtx);
if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(EOWNERDEAD);
}
return A0_OK;
}
// EAGAIN means that somebody else beat us to it.
// Anything else means we're borked.
if (A0_SYSERR(err) == EAGAIN) {
return A0_MAKE_SYSERR(EBUSY);
}
return A0_MAKE_SYSERR(ENOTRECOVERABLE);
}
a0_err_t a0_mtx_trylock(a0_mtx_t* mtx) {
__tsan_mutex_pre_lock(mtx, __tsan_mutex_try_lock);
robust_op_start(mtx);
a0_err_t err = a0_mtx_trylock_impl(mtx);
robust_op_end(mtx);
if (!err || A0_SYSERR(err) == EOWNERDEAD) {
__tsan_mutex_post_lock(mtx, __tsan_mutex_try_lock, 0);
} else {
__tsan_mutex_post_lock(mtx, __tsan_mutex_try_lock | __tsan_mutex_try_lock_failed, 0);
}
return err;
}
a0_err_t a0_mtx_consistent(a0_mtx_t* mtx) {
const uint32_t val = a0_atomic_load(&mtx->ftx);
// Why fix what isn't broken?
if (!ftx_owner_died(val)) {
return A0_MAKE_SYSERR(EINVAL);
}
// Is it yours to fix?
if (ftx_tid(val) != a0_tid()) {
return A0_MAKE_SYSERR(EPERM);
}
// Fix it!
a0_atomic_and_fetch(&mtx->ftx, ~FUTEX_OWNER_DIED);
return A0_OK;
}
a0_err_t a0_mtx_unlock(a0_mtx_t* mtx) {
const uint32_t tid = a0_tid();
const uint32_t val = a0_atomic_load(&mtx->ftx);
// Only the owner can unlock.
if (ftx_tid(val) != tid) {
return A0_MAKE_SYSERR(EPERM);
}
__tsan_mutex_pre_unlock(mtx, 0);
// If the mutex was acquired with EOWNERDEAD, the caller is responsible
// for fixing the state and marking the mutex consistent. If they did
// not mark it consistent and are unlocking... then we are unrecoverably
// borked!
uint32_t new_val = 0;
if (ftx_owner_died(val)) {
new_val = FTX_NOTRECOVERABLE;
}
robust_op_start(mtx);
robust_op_del(mtx);
// If the futex is exactly equal to tid, then there are no waiters and the
// kernel doesn't need to get involved.
if (!a0_cas(&mtx->ftx, tid, new_val)) {
// Ask the kernel to wake up a waiter.
a0_ftx_unlock_pi(&mtx->ftx);
if (new_val) {
a0_atomic_or_fetch(&mtx->ftx, new_val);
}
}
robust_op_end(mtx);
__tsan_mutex_post_unlock(mtx, 0);
return A0_OK;
}
// TODO(lshamis): Handle ENOTRECOVERABLE
A0_STATIC_INLINE
a0_err_t a0_cnd_timedwait_impl(a0_cnd_t* cnd, a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
const uint32_t init_cnd = a0_atomic_load(cnd);
// Unblock other threads to do the things that will eventually signal this wait.
a0_err_t err = a0_mtx_unlock(mtx);
if (err) {
return err;
}
__tsan_mutex_pre_lock(mtx, 0);
robust_op_start(mtx);
do {
// Priority-inheritance-aware wait until awoken or timeout.
err = a0_ftx_wait_requeue_pi(cnd, init_cnd, timeout, &mtx->ftx);
} while (A0_SYSERR(err) == EINTR);
// We need to manually lock on timeout.
// Note: We keep the timeout error.
if (A0_SYSERR(err) == ETIMEDOUT) {
a0_mtx_timedlock_robust(mtx, NULL);
}
// Someone else grabbed and mutated the resource between the unlock and wait.
// No need to wait.
if (A0_SYSERR(err) == EAGAIN) {
err = a0_mtx_timedlock_robust(mtx, NULL);
}
robust_op_add(mtx);
// If no higher priority error, check the previous owner didn't die.
if (!err) {
err = ftx_owner_died(a0_atomic_load(&mtx->ftx)) ? EOWNERDEAD : A0_OK;
}
robust_op_end(mtx);
__tsan_mutex_post_lock(mtx, 0, 0);
return err;
}
a0_err_t a0_cnd_timedwait(a0_cnd_t* cnd, a0_mtx_t* mtx, a0_time_mono_t timeout) {
// Let's not unlock the mutex if we're going to get EINVAL due to a bad timeout.
if ((timeout.ts.tv_sec < 0 || timeout.ts.tv_nsec < 0 || (!timeout.ts.tv_sec && !timeout.ts.tv_nsec) || timeout.ts.tv_nsec >= NS_PER_SEC)) {
return A0_MAKE_SYSERR(EINVAL);
}
return a0_cnd_timedwait_impl(cnd, mtx, &timeout);
}
a0_err_t a0_cnd_wait(a0_cnd_t* cnd, a0_mtx_t* mtx) {
return a0_cnd_timedwait_impl(cnd, mtx, NULL);
}
A0_STATIC_INLINE
a0_err_t a0_cnd_wake(a0_cnd_t* cnd, a0_mtx_t* mtx, uint32_t cnt) {
uint32_t val = a0_atomic_add_fetch(cnd, 1);
while (true) {
a0_err_t err = a0_ftx_cmp_requeue_pi(cnd, val, &mtx->ftx, cnt);
if (A0_SYSERR(err) != EAGAIN) {
return err;
}
// Another thread is also trying to wake this condition variable.
val = a0_atomic_load(cnd);
}
}
a0_err_t a0_cnd_signal(a0_cnd_t* cnd, a0_mtx_t* mtx) {
return a0_cnd_wake(cnd, mtx, 1);
}
a0_err_t a0_cnd_broadcast(a0_cnd_t* cnd, a0_mtx_t* mtx) {
return a0_cnd_wake(cnd, mtx, INT_MAX);
}

View File

@ -0,0 +1,57 @@
#ifndef A0_MTX_H
#define A0_MTX_H
#include "a0/err.h"
#include "a0/time.h"
#include "a0/unused.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef uint32_t a0_ftx_t;
// https://stackoverflow.com/questions/61645966/is-typedef-allowed-before-definition
struct a0_mtx_s;
typedef struct a0_mtx_s a0_mtx_t;
// Mutex implementation designed for IPC.
//
// Similar to pthread_mutex_t with the following flags fixed:
// * Process shared.
// * Robust.
// * Error checking.
// * Priority inheriting.
//
// Unlike pthread_mutex_t, timespec are expected to use CLOCK_BOOTTIME.
//
// struct a0_mtx_s "Inherits" from robust_list, which requires:
// * The first field MUST be a next pointer.
// * There must be a futex, which makes the mutex immovable.
//
// Note: a mutex MUST be unlocked before being freed or unmapped.
struct a0_mtx_s {
a0_mtx_t* next;
a0_mtx_t* prev;
a0_ftx_t ftx;
};
a0_err_t a0_mtx_lock(a0_mtx_t*) A0_WARN_UNUSED_RESULT;
a0_err_t a0_mtx_timedlock(a0_mtx_t*, a0_time_mono_t) A0_WARN_UNUSED_RESULT;
a0_err_t a0_mtx_trylock(a0_mtx_t*) A0_WARN_UNUSED_RESULT;
a0_err_t a0_mtx_consistent(a0_mtx_t*);
a0_err_t a0_mtx_unlock(a0_mtx_t*);
typedef a0_ftx_t a0_cnd_t;
a0_err_t a0_cnd_wait(a0_cnd_t*, a0_mtx_t*);
a0_err_t a0_cnd_timedwait(a0_cnd_t*, a0_mtx_t*, a0_time_mono_t);
a0_err_t a0_cnd_signal(a0_cnd_t*, a0_mtx_t*);
a0_err_t a0_cnd_broadcast(a0_cnd_t*, a0_mtx_t*);
#ifdef __cplusplus
}
#endif
#endif // A0_MTX_H

View File

@ -0,0 +1,64 @@
#include "strconv.h"
#include "a0/err.h"
#include <string.h>
static const char DECIMAL_DIGITS[] =
"00010203040506070809"
"10111213141516171819"
"20212223242526272829"
"30313233343536373839"
"40414243444546474849"
"50515253545556575859"
"60616263646566676869"
"70717273747576777879"
"80818283848586878889"
"90919293949596979899";
a0_err_t a0_u32_to_str(uint32_t val, char* buf_start, char* buf_end, char** start_ptr) {
return a0_u64_to_str(val, buf_start, buf_end, start_ptr);
}
a0_err_t a0_u64_to_str(uint64_t val, char* buf_start, char* buf_end, char** start_ptr) {
uint64_t orig_val = val;
char* ptr = buf_end;
while (val >= 10) {
ptr -= 2;
memcpy(ptr, &DECIMAL_DIGITS[2 * (val % 100)], sizeof(uint16_t));
val /= 100;
}
if (val) {
*--ptr = (char)('0' + val);
}
memset(buf_start, '0', ptr - buf_start);
ptr -= (!orig_val);
if (start_ptr) {
*start_ptr = ptr;
}
return A0_OK;
}
a0_err_t a0_str_to_u32(const char* start, const char* end, uint32_t* out) {
*out = 0;
for (const char* ptr = start; ptr < end; ptr++) {
if (*ptr < '0' || *ptr > '9') {
return A0_ERR_INVALID_ARG;
}
*out *= 10;
*out += *ptr - '0';
}
return A0_OK;
}
a0_err_t a0_str_to_u64(const char* start, const char* end, uint64_t* out) {
*out = 0;
for (const char* ptr = start; ptr < end; ptr++) {
if (*ptr < '0' || *ptr > '9') {
return A0_ERR_INVALID_ARG;
}
*out *= 10;
*out += *ptr - '0';
}
return A0_OK;
}

View File

@ -0,0 +1,31 @@
#ifndef A0_SRC_STRCONV_H
#define A0_SRC_STRCONV_H
#include "a0/err.h"
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
// Converts a uint32 or uint64 to string.
// The entire buffer will be populated, if not with given the value, then with '0'.
// Populates the buffer from the back.
// start_ptr, if not null, will be set to the point within the buffer where the number starts.
// Does NOT check for overflow.
a0_err_t a0_u32_to_str(uint32_t val, char* buf_start, char* buf_end, char** start_ptr);
a0_err_t a0_u64_to_str(uint64_t val, char* buf_start, char* buf_end, char** start_ptr);
// Converts a string to uint32 or uint64.
// The string may have leading '0's.
// Returns A0_ERR_INVALID_ARG if any character is not a digit.
// Does NOT check for overflow.
a0_err_t a0_str_to_u32(const char* start, const char* end, uint32_t* out);
a0_err_t a0_str_to_u64(const char* start, const char* end, uint64_t* out);
#ifdef __cplusplus
}
#endif
#endif // A0_SRC_STRCONV_H

View File

@ -0,0 +1,10 @@
#ifndef A0_THREAD_LOCAL_H
#define A0_THREAD_LOCAL_H
#ifdef __cplusplus
#define A0_THREAD_LOCAL thread_local
#else
#define A0_THREAD_LOCAL _Thread_local
#endif // __cplusplus
#endif // A0_THREAD_LOCAL_H

View File

@ -0,0 +1,30 @@
#include "a0/inline.h"
#include "a0/thread_local.h"
#include "a0/tid.h"
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <syscall.h>
#include <unistd.h>
A0_THREAD_LOCAL uint32_t a0_tid_cache = 0;
static pthread_once_t a0_tid_reset_atfork_once;
A0_STATIC_INLINE
void a0_tid_reset() {
a0_tid_cache = 0;
}
A0_STATIC_INLINE
void a0_tid_reset_atfork() {
pthread_atfork(NULL, NULL, &a0_tid_reset);
}
uint32_t a0_tid() {
if (!a0_tid_cache) {
a0_tid_cache = syscall(SYS_gettid);
pthread_once(&a0_tid_reset_atfork_once, a0_tid_reset_atfork);
}
return a0_tid_cache;
}

View File

@ -0,0 +1,16 @@
#ifndef A0_TID_H
#define A0_TID_H
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
uint32_t a0_tid();
#ifdef __cplusplus
}
#endif
#endif // A0_TID_H

View File

@ -0,0 +1,124 @@
#include "a0/empty.h"
#include "a0/err.h"
#include "a0/time.h"
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include "clock.h"
#include "err_macro.h"
#include "strconv.h"
const char A0_TIME_MONO[] = "a0_time_mono";
a0_err_t a0_time_mono_now(a0_time_mono_t* out) {
return a0_clock_now(CLOCK_BOOTTIME, &out->ts);
}
a0_err_t a0_time_mono_str(a0_time_mono_t time_mono, char mono_str[20]) {
// Mono time as unsigned integer with up to 20 chars: "18446744072709551615"
uint64_t ns = time_mono.ts.tv_sec * NS_PER_SEC + time_mono.ts.tv_nsec;
mono_str[19] = '\0';
return a0_u64_to_str(ns, mono_str, mono_str + 19, NULL);
}
a0_err_t a0_time_mono_parse(const char mono_str[20], a0_time_mono_t* out) {
uint64_t ns;
A0_RETURN_ERR_ON_ERR(a0_str_to_u64(mono_str, mono_str + 19, &ns));
out->ts.tv_sec = ns / NS_PER_SEC;
out->ts.tv_nsec = ns % NS_PER_SEC;
return A0_OK;
}
a0_err_t a0_time_mono_add(a0_time_mono_t time_mono, int64_t add_nsec, a0_time_mono_t* out) {
return a0_clock_add(time_mono.ts, add_nsec, &out->ts);
}
const char A0_TIME_WALL[] = "a0_time_wall";
a0_err_t a0_time_wall_now(a0_time_wall_t* out) {
A0_RETURN_SYSERR_ON_MINUS_ONE(clock_gettime(CLOCK_REALTIME, &out->ts));
return A0_OK;
}
a0_err_t a0_time_wall_str(a0_time_wall_t wall_time, char wall_str[36]) {
// Wall time in RFC 3999 Nano: "2006-01-02T15:04:05.999999999-07:00"
struct tm wall_tm;
gmtime_r(&wall_time.ts.tv_sec, &wall_tm);
strftime(&wall_str[0], 20, "%Y-%m-%dT%H:%M:%S", &wall_tm);
snprintf(&wall_str[19], 17, ".%09ld-00:00", wall_time.ts.tv_nsec);
wall_str[35] = '\0';
return A0_OK;
}
a0_err_t a0_time_wall_parse(const char wall_str[36], a0_time_wall_t* out) {
// strptime requires _GNU_SOURCE, which we don't want, so we do it our selves.
// Hard code "%Y-%m-%dT%H:%M:%S" + ".%09ld-00:00" pattern.
struct tm wall_tm = A0_EMPTY;
// %Y
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 0, wall_str + 4, (uint32_t*)&wall_tm.tm_year));
wall_tm.tm_year -= 1900;
// -
if (wall_str[4] != '-') {
return A0_ERR_INVALID_ARG;
}
// %m
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 5, wall_str + 7, (uint32_t*)&wall_tm.tm_mon));
if (wall_tm.tm_mon < 1 || wall_tm.tm_mon > 12) {
return A0_ERR_INVALID_ARG;
}
wall_tm.tm_mon--;
// -
if (wall_str[7] != '-') {
return A0_ERR_INVALID_ARG;
}
// %d
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 8, wall_str + 10, (uint32_t*)&wall_tm.tm_mday));
if (wall_tm.tm_mday < 1 || wall_tm.tm_mday > 31) {
return A0_ERR_INVALID_ARG;
}
// T
if (wall_str[10] != 'T') {
return A0_ERR_INVALID_ARG;
}
// %H
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 11, wall_str + 13, (uint32_t*)&wall_tm.tm_hour));
if (wall_tm.tm_hour > 24) {
return A0_ERR_INVALID_ARG;
}
// :
if (wall_str[13] != ':') {
return A0_ERR_INVALID_ARG;
}
// %M
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 14, wall_str + 16, (uint32_t*)&wall_tm.tm_min));
if (wall_tm.tm_min > 60) {
return A0_ERR_INVALID_ARG;
}
// :
if (wall_str[16] != ':') {
return A0_ERR_INVALID_ARG;
}
// %S
A0_RETURN_ERR_ON_ERR(a0_str_to_u32(wall_str + 17, wall_str + 19, (uint32_t*)&wall_tm.tm_sec));
if (wall_tm.tm_sec > 61) {
return A0_ERR_INVALID_ARG;
}
// .
if (wall_str[19] != '.') {
return A0_ERR_INVALID_ARG;
}
if (memcmp(wall_str + 29, "-00:00", 6) != 0) {
return A0_ERR_INVALID_ARG;
}
// Use timegm, cause it's a pain to compute months/years to seconds.
out->ts.tv_sec = timegm(&wall_tm);
return a0_str_to_u64(wall_str + 20, wall_str + 29, (uint64_t*)&out->ts.tv_nsec);
}

View File

@ -0,0 +1,97 @@
/**
* \file time.h
* \rst
*
* Mono Time
* ---------
*
* | Mono time is a number of nanoseconds from machine boottime.
* | This time cannot decrease and duration between ticks is constant.
* | This time is not related to wall clock time.
* | This time is most suitable for measuring durations.
* |
* | As a string, it is represented as a 20 char number:
* | **18446744072709551615**
* |
* | Note that this uses CLOCK_BOOTTIME under the hood, not CLOCK_MONOTONIC.
*
* Wall Time
* ---------
*
* | Wall time is an time object representing human-readable wall clock time.
* | This time can decrease and duration between ticks is not constant.
* | This time is most related to wall clock time.
* | This time is not suitable for measuring durations.
* |
* | As a string, it is represented as a 36 char RFC 3999 Nano / ISO 8601:
* | **2006-01-02T15:04:05.999999999-07:00**
*
* \endrst
*/
#ifndef A0_TIME_H
#define A0_TIME_H
#include "a0/err.h"
#include <stdint.h>
#include <time.h>
#ifdef __cplusplus
extern "C" {
#endif
/** \addtogroup TIME_MONO
* @{
*/
/// Header key for mono timestamps.
extern const char A0_TIME_MONO[];
/// Monotonic timestamp. Despite the name, uses CLOCK_BOOTTIME.
typedef struct a0_time_mono_s {
struct timespec ts;
} a0_time_mono_t;
/// Get the current mono timestamps.
a0_err_t a0_time_mono_now(a0_time_mono_t*);
/// Stringify a given mono timestamps.
a0_err_t a0_time_mono_str(a0_time_mono_t, char mono_str[20]);
/// Parse a stringified mono timestamps.
a0_err_t a0_time_mono_parse(const char mono_str[20], a0_time_mono_t*);
/// Add a duration in nanoseconds to a mono timestamp.
a0_err_t a0_time_mono_add(a0_time_mono_t, int64_t add_nsec, a0_time_mono_t*);
/** @}*/
/** \addtogroup TIME_WALL
* @{
*/
/// Header key for wall timestamps.
extern const char A0_TIME_WALL[];
/// Wall clock timestamp.
typedef struct a0_time_wall_s {
struct timespec ts;
} a0_time_wall_t;
/// Get the current wall timestamps.
a0_err_t a0_time_wall_now(a0_time_wall_t*);
/// Stringify a given wall timestamps.
a0_err_t a0_time_wall_str(a0_time_wall_t, char wall_str[36]);
/// Parse a stringified wall timestamps.
a0_err_t a0_time_wall_parse(const char wall_str[36], a0_time_wall_t*);
/** @}*/
#ifdef __cplusplus
}
#endif
#endif // A0_TRANSPORT_H

View File

@ -0,0 +1,7 @@
#ifndef A0_UNUSED_H
#define A0_UNUSED_H
#define A0_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#define A0_MAYBE_UNUSED(X) ((void)(X))
#endif // A0_UNUSED_H

View File

@ -1,135 +1,60 @@
#pragma once
#include <cstdint>
#include <cstring>
#include <pthread.h>
#include "libipc/utility/log.h"
#include "libipc/utility/scope_guard.h"
#include "libipc/mutex.h"
#include "libipc/shm.h"
#include "get_wait_time.h"
#include "sync_obj_impl.h"
#include "a0/err_macro.h"
#include "a0/mtx.h"
namespace ipc {
namespace detail {
namespace sync {
class condition {
ipc::shm::handle shm_;
pthread_cond_t *cond_ = nullptr;
pthread_cond_t *acquire_cond(char const *name) {
if (!shm_.acquire(name, sizeof(pthread_cond_t))) {
ipc::error("[acquire_cond] fail shm.acquire: %s\n", name);
return nullptr;
}
return static_cast<pthread_cond_t *>(shm_.get());
}
class condition : public sync::obj_impl<a0_cnd_t> {
public:
condition() = default;
~condition() = default;
pthread_cond_t const *native() const noexcept {
return cond_;
}
pthread_cond_t *native() noexcept {
return cond_;
}
bool valid() const noexcept {
static const char tmp[sizeof(pthread_cond_t)] {};
return (cond_ != nullptr)
&& (std::memcmp(tmp, cond_, sizeof(pthread_cond_t)) != 0);
}
bool open(char const *name) noexcept {
close();
if ((cond_ = acquire_cond(name)) == nullptr) {
return false;
}
if (shm_.ref() > 1) {
return valid();
}
::pthread_cond_destroy(cond_);
auto finally = ipc::guard([this] { close(); }); // close when failed
// init condition
int eno;
pthread_condattr_t cond_attr;
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
ipc::error("fail pthread_condattr_init[%d]\n", eno);
return false;
}
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_condattr_setpshared[%d]\n", eno);
return false;
}
*cond_ = PTHREAD_COND_INITIALIZER;
if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) {
ipc::error("fail pthread_cond_init[%d]\n", eno);
return false;
}
finally.dismiss();
return valid();
}
void close() noexcept {
if ((shm_.ref() <= 1) && cond_ != nullptr) {
int eno;
if ((eno = ::pthread_cond_destroy(cond_)) != 0) {
ipc::error("fail pthread_cond_destroy[%d]\n", eno);
}
}
shm_.release();
cond_ = nullptr;
}
bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
if (!valid()) return false;
switch (tm) {
case invalid_value: {
int eno;
if ((eno = ::pthread_cond_wait(cond_, static_cast<pthread_mutex_t *>(mtx.native()))) != 0) {
ipc::error("fail pthread_cond_wait[%d]\n", eno);
if (tm == invalid_value) {
int eno = A0_SYSERR(a0_cnd_wait(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition wait[%d]\n", eno);
return false;
}
}
break;
default: {
} else {
auto ts = detail::make_timespec(tm);
int eno;
if ((eno = ::pthread_cond_timedwait(cond_, static_cast<pthread_mutex_t *>(mtx.native()), &ts)) != 0) {
int eno = A0_SYSERR(a0_cnd_timedwait(native(), static_cast<a0_mtx_t *>(mtx.native()), {ts}));
if (eno != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
ipc::error("fail condition timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
}
return false;
}
}
break;
}
return true;
}
bool notify(ipc::sync::mutex &) noexcept {
bool notify(ipc::sync::mutex &mtx) noexcept {
if (!valid()) return false;
int eno;
if ((eno = ::pthread_cond_signal(cond_)) != 0) {
ipc::error("fail pthread_cond_signal[%d]\n", eno);
int eno = A0_SYSERR(a0_cnd_signal(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition notify[%d]\n", eno);
return false;
}
return true;
}
bool broadcast(ipc::sync::mutex &) noexcept {
bool broadcast(ipc::sync::mutex &mtx) noexcept {
if (!valid()) return false;
int eno;
if ((eno = ::pthread_cond_broadcast(cond_)) != 0) {
ipc::error("fail pthread_cond_broadcast[%d]\n", eno);
int eno = A0_SYSERR(a0_cnd_broadcast(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition broadcast[%d]\n", eno);
return false;
}
return true;

View File

@ -1,27 +1,34 @@
#pragma once
#include <cstdint>
#include <cinttypes>
#include <system_error>
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include "libipc/utility/log.h"
#include "a0/time.h"
#include "a0/err_macro.h"
namespace ipc {
namespace detail {
inline bool calc_wait_time(timespec &ts, std::uint64_t tm /*ms*/) noexcept {
timeval now;
int eno = ::gettimeofday(&now, NULL);
if (eno != 0) {
ipc::error("fail gettimeofday [%d]\n", eno);
std::int64_t add_ns = static_cast<std::int64_t>(tm * 1000000ull);
if (add_ns < 0) {
ipc::error("invalid time = " PRIu64 "\n", tm);
return false;
}
a0_time_mono_t now;
int eno = A0_SYSERR(a0_time_mono_now(&now));
if (eno != 0) {
ipc::error("fail get time[%d]\n", eno);
return false;
}
a0_time_mono_t *target = reinterpret_cast<a0_time_mono_t *>(&ts);
if ((eno = A0_SYSERR(a0_time_mono_add(now, add_ns, target))) != 0) {
ipc::error("fail get time[%d]\n", eno);
return false;
}
ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000;
ts.tv_sec = now.tv_sec + (tm / 1000) + (ts.tv_nsec / 1000000000l);
ts.tv_nsec %= 1000000000l;
return true;
}

View File

@ -1,42 +1,112 @@
#pragma once
#include <cstring>
#include <cassert>
#include <cstdint>
#include <system_error>
#include <mutex>
#include <atomic>
#include <pthread.h>
#include "libipc/platform/detail.h"
#include "libipc/utility/log.h"
#include "libipc/utility/scope_guard.h"
#include "libipc/memory/resource.h"
#include "libipc/shm.h"
#include "get_wait_time.h"
#include "sync_obj_impl.h"
#include "a0/err_macro.h"
#include "a0/mtx.h"
namespace ipc {
namespace detail {
namespace sync {
class robust_mutex : public sync::obj_impl<a0_mtx_t> {
public:
bool lock(std::uint64_t tm) noexcept {
if (!valid()) return false;
for (;;) {
auto ts = detail::make_timespec(tm);
int eno = A0_SYSERR(
(tm == invalid_value) ? a0_mtx_lock(native())
: a0_mtx_timedlock(native(), {ts}));
switch (eno) {
case 0:
return true;
case ETIMEDOUT:
return false;
case EOWNERDEAD: {
int eno2 = A0_SYSERR(a0_mtx_consistent(native()));
if (eno2 != 0) {
ipc::error("fail mutex lock[%d] -> consistent[%d]\n", eno, eno2);
return false;
}
int eno3 = A0_SYSERR(a0_mtx_unlock(native()));
if (eno3 != 0) {
ipc::error("fail mutex lock[%d] -> unlock[%d]\n", eno, eno3);
return false;
}
}
break; // loop again
default:
ipc::error("fail mutex lock[%d]\n", eno);
return false;
}
}
}
bool try_lock() noexcept(false) {
if (!valid()) return false;
int eno = A0_SYSERR(a0_mtx_timedlock(native(), {detail::make_timespec(0)}));
switch (eno) {
case 0:
return true;
case ETIMEDOUT:
return false;
case EOWNERDEAD: {
int eno2 = A0_SYSERR(a0_mtx_consistent(native()));
if (eno2 != 0) {
ipc::error("fail mutex try_lock[%d] -> consistent[%d]\n", eno, eno2);
break;
}
int eno3 = A0_SYSERR(a0_mtx_unlock(native()));
if (eno3 != 0) {
ipc::error("fail mutex try_lock[%d] -> unlock[%d]\n", eno, eno3);
break;
}
}
break;
default:
ipc::error("fail mutex try_lock[%d]\n", eno);
break;
}
throw std::system_error{eno, std::system_category()};
}
bool unlock() noexcept {
if (!valid()) return false;
int eno = A0_SYSERR(a0_mtx_unlock(native()));
if (eno != 0) {
ipc::error("fail mutex unlock[%d]\n", eno);
return false;
}
return true;
}
};
class mutex {
ipc::shm::handle *shm_ = nullptr;
robust_mutex *mutex_ = nullptr;
std::atomic<std::int32_t> *ref_ = nullptr;
pthread_mutex_t *mutex_ = nullptr;
struct curr_prog {
struct shm_data {
ipc::shm::handle shm;
robust_mutex mtx;
std::atomic<std::int32_t> ref;
struct init {
char const *name;
std::size_t size;
};
shm_data(init arg)
: shm{arg.name, arg.size}, ref{0} {}
: mtx{}, ref{0} { mtx.open(arg.name); }
};
ipc::map<ipc::string, shm_data> mutex_handles;
std::mutex lock;
@ -47,23 +117,19 @@ class mutex {
}
};
pthread_mutex_t *acquire_mutex(char const *name) {
void acquire_mutex(char const *name) {
if (name == nullptr) {
return nullptr;
return;
}
auto &info = curr_prog::get();
IPC_UNUSED_ std::lock_guard<std::mutex> guard {info.lock};
auto it = info.mutex_handles.find(name);
if (it == info.mutex_handles.end()) {
it = curr_prog::get().mutex_handles.emplace(name,
curr_prog::shm_data::init{name, sizeof(pthread_mutex_t)}).first;
curr_prog::shm_data::init{name}).first;
}
shm_ = &it->second.shm;
mutex_ = &it->second.mtx;
ref_ = &it->second.ref;
if (shm_ == nullptr) {
return nullptr;
}
return static_cast<pthread_mutex_t *>(shm_->get());
}
template <typename F>
@ -83,152 +149,53 @@ public:
mutex() = default;
~mutex() = default;
pthread_mutex_t const *native() const noexcept {
return mutex_;
a0_mtx_t const *native() const noexcept {
return valid() ? mutex_->native() : nullptr;
}
pthread_mutex_t *native() noexcept {
return mutex_;
a0_mtx_t *native() noexcept {
return valid() ? mutex_->native() : nullptr;
}
bool valid() const noexcept {
static const char tmp[sizeof(pthread_mutex_t)] {};
return (shm_ != nullptr) && (ref_ != nullptr) && (mutex_ != nullptr)
&& (std::memcmp(tmp, mutex_, sizeof(pthread_mutex_t)) != 0);
return (mutex_ != nullptr) && (ref_ != nullptr) && mutex_->valid();
}
bool open(char const *name) noexcept {
close();
if ((mutex_ = acquire_mutex(name)) == nullptr) {
acquire_mutex(name);
if (!valid()) {
return false;
}
auto self_ref = ref_->fetch_add(1, std::memory_order_relaxed);
if (shm_->ref() > 1 || self_ref > 0) {
return valid();
}
::pthread_mutex_destroy(mutex_);
auto finally = ipc::guard([this] { close(); }); // close when failed
// init mutex
int eno;
pthread_mutexattr_t mutex_attr;
if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
ipc::error("fail pthread_mutexattr_init[%d]\n", eno);
return false;
}
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno);
return false;
}
if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) {
ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno);
return false;
}
*mutex_ = PTHREAD_MUTEX_INITIALIZER;
if ((eno = ::pthread_mutex_init(mutex_, &mutex_attr)) != 0) {
ipc::error("fail pthread_mutex_init[%d]\n", eno);
return false;
}
finally.dismiss();
return valid();
ref_->fetch_add(1, std::memory_order_relaxed);
return true;
}
void close() noexcept {
if ((ref_ != nullptr) && (shm_ != nullptr) && (mutex_ != nullptr)) {
if (shm_->name() != nullptr) {
release_mutex(shm_->name(), [this] {
auto self_ref = ref_->fetch_sub(1, std::memory_order_relaxed);
if ((shm_->ref() <= 1) && (self_ref <= 1)) {
int eno;
if ((eno = ::pthread_mutex_destroy(mutex_)) != 0) {
ipc::error("fail pthread_mutex_destroy[%d]\n", eno);
}
return true;
}
return false;
if ((mutex_ != nullptr) && (ref_ != nullptr)) {
if (mutex_->name() != nullptr) {
release_mutex(mutex_->name(), [this] {
return ref_->fetch_sub(1, std::memory_order_relaxed) <= 1;
});
} else shm_->release();
} else mutex_->close();
}
shm_ = nullptr;
ref_ = nullptr;
mutex_ = nullptr;
ref_ = nullptr;
}
bool lock(std::uint64_t tm) noexcept {
if (!valid()) return false;
for (;;) {
auto ts = detail::make_timespec(tm);
int eno = (tm == invalid_value)
? ::pthread_mutex_lock(mutex_)
: ::pthread_mutex_timedlock(mutex_, &ts);
switch (eno) {
case 0:
return true;
case ETIMEDOUT:
return false;
case EOWNERDEAD: {
if (shm_->ref() > 1) {
shm_->sub_ref();
}
int eno2 = ::pthread_mutex_consistent(mutex_);
if (eno2 != 0) {
ipc::error("fail pthread_mutex_lock[%d], pthread_mutex_consistent[%d]\n", eno, eno2);
return false;
}
int eno3 = ::pthread_mutex_unlock(mutex_);
if (eno3 != 0) {
ipc::error("fail pthread_mutex_lock[%d], pthread_mutex_unlock[%d]\n", eno, eno3);
return false;
}
}
break; // loop again
default:
ipc::error("fail pthread_mutex_lock[%d]\n", eno);
return false;
}
}
return mutex_->lock(tm);
}
bool try_lock() noexcept(false) {
if (!valid()) return false;
auto ts = detail::make_timespec(0);
int eno = ::pthread_mutex_timedlock(mutex_, &ts);
switch (eno) {
case 0:
return true;
case ETIMEDOUT:
return false;
case EOWNERDEAD: {
if (shm_->ref() > 1) {
shm_->sub_ref();
}
int eno2 = ::pthread_mutex_consistent(mutex_);
if (eno2 != 0) {
ipc::error("fail pthread_mutex_timedlock[%d], pthread_mutex_consistent[%d]\n", eno, eno2);
break;
}
int eno3 = ::pthread_mutex_unlock(mutex_);
if (eno3 != 0) {
ipc::error("fail pthread_mutex_timedlock[%d], pthread_mutex_unlock[%d]\n", eno, eno3);
break;
}
}
break;
default:
ipc::error("fail pthread_mutex_timedlock[%d]\n", eno);
break;
}
throw std::system_error{eno, std::system_category()};
return mutex_->try_lock();
}
bool unlock() noexcept {
if (!valid()) return false;
int eno;
if ((eno = ::pthread_mutex_unlock(mutex_)) != 0) {
ipc::error("fail pthread_mutex_unlock[%d]\n", eno);
return false;
}
return true;
return mutex_->unlock();
}
};

View File

@ -0,0 +1,69 @@
#pragma once
#include "libipc/utility/log.h"
#include "libipc/shm.h"
#include "a0/empty.h"
namespace ipc {
namespace detail {
namespace sync {
template <typename SyncT>
class obj_impl {
public:
using sync_t = SyncT;
protected:
ipc::shm::handle shm_;
sync_t *h_ = nullptr;
sync_t *acquire_handle(char const *name) {
if (!shm_.acquire(name, sizeof(sync_t))) {
ipc::error("[acquire_handle] fail shm.acquire: %s\n", name);
return nullptr;
}
return static_cast<sync_t *>(shm_.get());
}
public:
obj_impl() = default;
~obj_impl() = default;
sync_t const *native() const noexcept {
return h_;
}
sync_t *native() noexcept {
return h_;
}
char const *name() const noexcept {
return shm_.name();
}
bool valid() const noexcept {
return h_ != nullptr;
}
bool open(char const *name) noexcept {
close();
if ((h_ = acquire_handle(name)) == nullptr) {
return false;
}
if (shm_.ref() > 1) {
return true;
}
*h_ = A0_EMPTY;
return true;
}
void close() noexcept {
shm_.release();
h_ = nullptr;
}
};
} // namespace sync
} // namespace detail
} // namespace ipc

View File

@ -7,7 +7,7 @@
#if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/win/semaphore.h"
#elif defined(IPC_OS_LINUX_)
#include "libipc/platform/linux/semaphore.h"
#include "libipc/platform/linux/semaphore_impl.h"
#else/*linux*/
# error "Unsupported platform."
#endif

View File

@ -67,22 +67,22 @@ TEST(Sync, Mutex) {
EXPECT_THROW(lock.try_lock(), std::system_error);
int i = 0;
EXPECT_TRUE(lock.lock());
i = 100;
auto t2 = std::thread{[&i] {
ipc::sync::mutex lock {"test-mutex-robust"};
EXPECT_TRUE(lock.valid());
EXPECT_FALSE(lock.try_lock());
EXPECT_TRUE(lock.lock());
i += i;
EXPECT_TRUE(lock.unlock());
}};
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(i, 100);
EXPECT_TRUE(lock.unlock());
t2.join();
EXPECT_EQ(i, 200);
// int i = 0;
// EXPECT_TRUE(lock.lock());
// i = 100;
// auto t2 = std::thread{[&i] {
// ipc::sync::mutex lock {"test-mutex-robust"};
// EXPECT_TRUE(lock.valid());
// EXPECT_FALSE(lock.try_lock());
// EXPECT_TRUE(lock.lock());
// i += i;
// EXPECT_TRUE(lock.unlock());
// }};
// std::this_thread::sleep_for(std::chrono::seconds(1));
// EXPECT_EQ(i, 100);
// EXPECT_TRUE(lock.unlock());
// t2.join();
// EXPECT_EQ(i, 200);
}
#include "libipc/semaphore.h"