add ipc.cpp for implementation of ipc interfaces (TBD); restructure code

This commit is contained in:
mutouyun 2018-11-29 23:05:53 +08:00
parent 518fc000e7
commit 9540842ba7
9 changed files with 189 additions and 27 deletions

View File

@ -17,10 +17,12 @@ HEADERS += \
../include/shm.h \ ../include/shm.h \
../include/circ_elem_array.h \ ../include/circ_elem_array.h \
../include/circ_queue.h \ ../include/circ_queue.h \
../include/ipc.h ../include/ipc.h \
../include/def.h
SOURCES += \ SOURCES += \
../src/shm.cpp ../src/shm.cpp \
../src/ipc.cpp
unix { unix {

View File

@ -1,18 +1,14 @@
#pragma once #pragma once
#include <cstddef>
#include <cstdint>
#include <cstring> #include <cstring>
#include <atomic> #include <atomic>
#include <utility> #include <utility>
#include <limits>
#include <algorithm> #include <algorithm>
#include <thread> #include <thread>
#include "def.h"
namespace ipc { namespace ipc {
using byte_t = std::uint8_t;
namespace circ { namespace circ {
struct alignas(std::max_align_t) elem_array_head { struct alignas(std::max_align_t) elem_array_head {

View File

@ -6,14 +6,10 @@
#include <utility> #include <utility>
#include <algorithm> #include <algorithm>
#include "def.h"
#include "circ_elem_array.h" #include "circ_elem_array.h"
namespace ipc { namespace ipc {
enum : std::size_t {
error_count = std::numeric_limits<std::size_t>::max()
};
namespace circ { namespace circ {
template <typename T> template <typename T>
@ -24,6 +20,7 @@ public:
private: private:
array_t* elems_ = nullptr; array_t* elems_ = nullptr;
typename std::result_of<decltype(&array_t::cursor)(array_t)>::type cursor_ = 0; typename std::result_of<decltype(&array_t::cursor)(array_t)>::type cursor_ = 0;
bool connected_ = false;
public: public:
queue(void) = default; queue(void) = default;
@ -52,19 +49,31 @@ public:
std::size_t connect(void) { std::size_t connect(void) {
if (elems_ == nullptr) return error_count; if (elems_ == nullptr) return error_count;
cursor_ = elems_->cursor(); if (connected_) return error_count;
connected_ = true;
return elems_->connect(); return elems_->connect();
} }
std::size_t disconnect(void) { std::size_t disconnect(void) {
if (elems_ == nullptr) return error_count; if (elems_ == nullptr) return error_count;
if (!connected_) return error_count;
connected_ = false;
return elems_->disconnect(); return elems_->disconnect();
} }
std::size_t conn_count(void) const {
return (elems_ == nullptr) ? error_count : elems_->conn_count();
}
bool connected(void) const {
return connected_;
}
array_t* attach(array_t* arr) { array_t* attach(array_t* arr) {
if (arr == nullptr) return nullptr; if (arr == nullptr) return nullptr;
auto old = elems_; auto old = elems_;
elems_ = arr; elems_ = arr;
cursor_ = elems_->cursor();
return old; return old;
} }
@ -75,16 +84,13 @@ public:
return old; return old;
} }
std::size_t conn_count(void) const {
return (elems_ == nullptr) ? error_count : elems_->conn_count();
}
template <typename... P> template <typename... P>
void push(P&&... params) { bool push(P&&... params) {
if (elems_ == nullptr) return; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
::new (ptr) T { std::forward<P>(params)... }; ::new (ptr) T { std::forward<P>(params)... };
elems_->commit(ptr); elems_->commit(ptr);
return true;
} }
T pop(void) { T pop(void) {

20
include/def.h Normal file
View File

@ -0,0 +1,20 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include <limits>
namespace ipc {
// types
using byte_t = std::uint8_t;
// constants
enum : std::size_t {
error_count = std::numeric_limits<std::size_t>::max(),
data_length = 16
};
} // namespace ipc

View File

@ -1,9 +1,19 @@
#pragma once #pragma once
#include <vector>
#include "export.h" #include "export.h"
#include "def.h"
#include "shm.h"
namespace ipc { namespace ipc {
using shm::handle_t;
IPC_EXPORT handle_t connect (std::string const & name);
IPC_EXPORT void disconnect(handle_t h);
IPC_EXPORT bool send(handle_t h, byte_t* data, int size);
IPC_EXPORT std::vector<byte_t> recv(handle_t h);
} // namespace ipc } // namespace ipc

128
src/ipc.cpp Normal file
View File

@ -0,0 +1,128 @@
#include <unordered_map>
#include <memory>
#include <type_traits>
#include <cstring>
#include <algorithm>
#include "ipc.h"
#include "circ_queue.h"
namespace {
using namespace ipc;
using data_t = byte_t[data_length];
struct msg_t {
int remain_;
data_t data_;
};
using queue_t = circ::queue<msg_t>;
using guard_t = std::unique_ptr<std::remove_pointer_t<handle_t>, void(*)(handle_t)>;
std::unordered_map<handle_t, queue_t> h2q__;
queue_t* queue_of(handle_t h) {
if (h == nullptr) {
return nullptr;
}
auto it = h2q__.find(h);
if (it == h2q__.end()) {
return nullptr;
}
if (it->second.elems() == nullptr) {
return nullptr;
}
return &(it->second);
}
} // internal-linkage
namespace ipc {
handle_t connect(std::string const & name) {
auto h = shm::acquire(name, sizeof(queue_t));
if (h == nullptr) {
return nullptr;
}
guard_t h_guard {
h, [](handle_t h) { shm::release(h, sizeof(queue_t)); }
};
auto mem = shm::open(h);
if (mem == nullptr) {
return nullptr;
}
h2q__[h].attach(static_cast<queue_t::array_t*>(mem));
h_guard.release();
return h;
}
void disconnect(handle_t h) {
auto it = h2q__.find(h);
if (it == h2q__.end()) return;
it->second.disconnect();
shm::close(it->second.detach());
shm::release(h, sizeof(queue_t));
h2q__.erase(it);
}
bool send(handle_t h, byte_t* data, int size) {
if (data == nullptr) {
return false;
}
if (size <= 0) {
return false;
}
auto queue = queue_of(h);
if (queue == nullptr) {
return false;
}
queue_t drop_box { queue->elems() };
int offset = 0;
for (int i = 0; i < (size / static_cast<int>(data_length)); ++i, offset += data_length) {
msg_t msg {
size - offset - static_cast<int>(data_length),
{ 0 }
};
std::memcpy(msg.data_, data + offset, data_length);
drop_box.push(msg);
}
int remain = size - offset;
if (remain > 0) {
msg_t msg { remain - static_cast<int>(data_length), { 0 } };
std::memcpy(msg.data_, data + offset, static_cast<std::size_t>(remain));
drop_box.push(msg);
}
return true;
}
std::vector<byte_t> recv(handle_t h) {
std::vector<byte_t> all;
auto queue = queue_of(h);
if (queue == nullptr) {
return all;
}
if (!queue->connected()) {
queue->connect();
}
do {
auto msg = queue->pop();
auto last_size = all.size();
if (msg.remain_ > 0) {
all.resize(last_size + data_length);
std::memcpy(all.data() + last_size, msg.data_, data_length);
}
else {
// remain_ is minus & abs(remain_) < data_length
std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.remain_);
all.resize(last_size + remain);
std::memcpy(all.data() + last_size, msg.data_, remain);
break;
}
} while(1);
return all;
}
} // namespace ipc

View File

@ -1,11 +1,11 @@
#include "shm.h"
#include <sys/shm.h> #include <sys/shm.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/mman.h> #include <sys/mman.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include "shm.h"
namespace ipc { namespace ipc {
namespace shm { namespace shm {

View File

@ -1,11 +1,11 @@
#include "shm.h"
#include <windows.h> #include <windows.h>
#include <type_traits> #include <type_traits>
#include <locale> #include <locale>
#include <codecvt> #include <codecvt>
#include "shm.h"
namespace { namespace {
template <typename T = TCHAR> template <typename T = TCHAR>

View File

@ -1,7 +1,7 @@
#include "shm.h"
#include <utility> #include <utility>
#include "shm.h"
namespace ipc { namespace ipc {
namespace shm { namespace shm {