Я пишу свою собственную библиотеку, то есть для асинхронного сетевого взаимодействия. а затем хочу написать свой собственный HTTP-прокси-сервер для балансировки нагрузки подключений к внутренним серверам с его помощью.
когда я использую docker работая в компании, я думал, что «если apache есть REST API для добавления / удаления прокси, его можно динамически изменять во время выполнения без перезагрузки / перезапуска демона. «(Это может быть немного глупая идея …)
Как бы то ни было, я задумал его создать, пришла более дурацкая идея.
«Если я могу обобщить, это обрабатывает некоторый поток данных, то есть поток».
Я искал разные вещи. И я обнаружил, что это называется ReactiveX, но есть часть, которая мне немного не понравилась. поэтому я решил написать все, что было похоже на это сам. По моему мнению, внутреннее представление данных как потока такое же, как и у RX. Но важные различия — это направление, в котором движется поток данных, и его процессор. И инкапсуляция потока обработки.
1. Обобщение Iterator и Iteratable.
Для единой обработки определенных данных требовалось source который предоставляет эти данные. и почти все может быть iterator. например socketс accept() функция. И самое главное — быть совместимым со стандартом C ++. Затем многие вещи могут быть интегрированы без моей реализации или встраивания в какую-либо структуру.
/**
* struct last_iterator_t.
* Represents the end of iteration.
* e.g. iterator == end_of_iterator
*/
struct last_iterator_t { };
constexpr last_iterator_t end_of_iterator = {};
namespace _ {
/**
* struct iteratable_t<_iteratable>
* Generalize an iteratable to one interface.
* All of `iteratable_t<type>` will copy source container.
* To prevent copy, use with `std::move` or wrap it as `std::shard_ptr`.
*/
template<typename _iteratable>
struct iteratable_t {
static_assert(
traits::is_iteratable<_iteratable>,
"iteratable_t<type> can work with only iteratable containers!"
);
using trait = traits::iterator_of<_iteratable>;
using state_t = iteratable_state_t<typename trait::value_t, trait>;
using iterator_t = state_iterator_t<typename trait::value_t, state_t>;
/* source container or source sequence something: */
instrusive<_iteratable> source;
/* constructor of this iteratable.*/
iteratable_t() { }
iteratable_t(_iteratable source) : source(std::move(source)) { }
iteratable_t(const iteratable_t& other) : source(other.source) { }
iteratable_t(iteratable_t&& other) : source(std::move(other.source)) { }
/* assignment operators. */
inline iteratable_t& operator =(const iteratable_t& other) { source = other.source; return *this; }
inline iteratable_t& operator =(iteratable_t&& other) { source = std::move(other); return *this; }
/* get begining of iteratable. */
inline iterator_t begin() const {
SEQ_INIT_ASSERT((bool) source, "tried to access uninitialized iteratable<type>!"); /* : non default initializable. */
return iterator_t(state_t(trait::begin_of(*source), trait::end_of(*source)));
}
/* get ending of iteratable. */
inline last_iterator_t end() const { return end_of_iterator; }
};
}
/**
* using iteratable_t<iteratable>.
* Facade declaration for using _::iteratable_t<iteratable> class.
*/
template<typename _iteratable>
using iteratable_t = typename traits::anti_recursive<_::iteratable_t<typename std::decay<_iteratable>::type>>::type;
Но, как известно, ничего нет Asynchronous в Iterator. Не подходит для асинхронного режима. Поэтому я постарался предоставить немного нестандартных методов. И благодаря статическому полиморфизму они объединяются в асинхронное состояние.
/**
* struct state_iterator_t<value_type, state_type>
* Adapts state-type to iterator.
*/
template<typename value_type, typename state_type>
struct state_iterator_t {
mutable instrusive<state_type> state;
static_assert(
traits::state_iterators::has_end_of<state_type>,
"state_type must implement `bool is_end_of()` method!"
);
static_assert(
traits::state_iterators::has_forward<state_type>,
"state_type must implement `bool forward()` method!"
);
static_assert(
traits::state_iterators::has_value<state_type>,
"state_type must implement `value_type value()` method!"
);
/**
* delegated functor for invoking eligible polymorphic methods.
* @note Polys marked with ^ must be implemented.
*
* 1. ensure: initiate if the state isn't initiated. -> calls `init` method once in entire iteration.
* 2. end_of ^: determines end of the state.
* 3. forward ^: shift the input to forward.
* 4. value ^: acquire the current value of input.
* 5. pending: test the input is available without blocking.
* 6. wait: wait if the input is unavailable yet.
*
* the life-cycle(flow) of state_type is:
*
* 1. instantiating prototype of state_type by iteratable.
* 2. then, integrated into manipulatables or operatables.
* 3. cloning state_type from prototype when `begin` of iteratable method called.
* 4. --- then, moved to manipulator, loop or worker (consumer) ---
* 5. the consumer checks end of iterator first and runs iterator.
*
* This flow is slightly different depending on the subject
* that operates the Iterator, but the big frame is as follows.
*
* --> state_type::init() once,
* while (it != seq::end_of_iterator) {
* --> state_type::is_end_of() <-- RETRY
* --> state_type::pending()
* => on true: returns immediately.
* --> state_type::wait(-1)
* => on true: returns immediately.
* --> goto RETRY.
*
* value_type value = *it;
* --> state_type::value().
*
* ... (external code) ...
*
* ++it;
* --> state_type::forward().
* }
*
* @note in debug mode binaries, the is_end_of method is called every
* time the iterator move forward to the next sequence or accessing current value,
* checking pending values, waiting incoming values.
*/
polys::state_iterator_poly_ensure<state_type> _ensure;
polys::state_iterator_poly_end_of<state_type> _end_of;
polys::state_iterator_poly_forward<state_type> _forward;
polys::state_iterator_poly_value<state_type> _value;
polys::state_iterator_poly_pending<state_type> _pending;
polys::state_iterator_poly_wait<state_type> _wait;
/* constructors */
state_iterator_t() { }
state_iterator_t(const state_type& o) : state(o) { }
state_iterator_t(state_type&& o) : state(std::move(o)) { }
/* cloning/shifting constructors */
state_iterator_t(const state_iterator_t& o)
: state(o.state), _ensure(o._ensure),
_end_of(o._end_of), _forward(o._forward), _value(o._value),
_pending(o._pending), _wait(o._wait) { }
state_iterator_t(state_iterator_t&& o)
: state(std::move(o.state)), _ensure(std::move(o._ensure)),
_end_of(std::move(o._end_of)), _forward(std::move(o._forward)),
_value(std::move(o._value)), _pending(std::move(o._pending)),
_wait(std::move(o._wait)) { }
/* assignment operators */
inline state_iterator_t& operator =(const state_iterator_t& o) {
state = o.state; _ensure = o._ensure;
_end_of = o._end_of; _forward = o._forward;
_value = o._value; _pending = o._pending; _wait = o._wait;
return *this;
}
inline state_iterator_t& operator =(state_iterator_t&& o) {
state = std::move(o.state); _ensure = std::move(o._ensure);
_end_of = std::move(o._end_of); _forward = std::move(o._forward);
_value = std::move(o._value); _pending = std::move(o._pending);
_wait = std::move(o._wait);
return *this;
}
/* called if iterated by standard-way. */
inline bool __end_of() const {
_ensure(state);
if (!_end_of(state)) {
if (_pending(state))
return false;
while (!_end_of(state)) {
if (_wait(state, -1))
return _end_of(state);
}
}
return true;
}
/* redirect methods to compile-time polymorphic methods */
inline void init() { return _ensure(state); }
inline bool end_of() const { return _end_of(state); }
inline value_type value() const { return _value(state); }
inline bool forward() { return _forward(state); }
inline bool pending() const { return _pending(state); }
inline bool wait(int timeout) const { return _wait(state, timeout); }
/* checking end of iteration or not. */
inline bool operator ==(const last_iterator_t&) const { return __end_of(); }
inline bool operator !=(const last_iterator_t&) const { return !__end_of(); }
/* iterate forward to. */
inline state_iterator_t& operator ++() { _ensure(state); _forward(state); return *this; }
inline state_iterator_t operator ++(int) { _ensure(state); state_iterator_t clone(*this); _forward(state); return clone; }
/* get value of current iteration. */
inline value_type operator *() const { _ensure(state); return _value(state); }
};
И это Iteratable всегда должно быть Replay включено. Таким образом, он внутренне обернут исходными данными в iterable_t<...> класс, реализует anti_recursive, и блокирует рекурсивную упаковку. ниже приведен пример, который генерирует числовую последовательность.
namespace iteratables {
/**
* struct numeric_range_t<number_type, step_type>.
* generates numeric sequence.
*/
template<typename number_type, typename step_type>
struct numeric_range_t {
static_assert(
std::is_integral<number_type>::value || std::is_floating_point<number_type>::value ||
std::is_pointer<number_type>::value || traits::can_offset_incremental<number_type>,
"numeric_range_t<num_type, ?> requires numeric offset type!"
);
static_assert(
std::is_integral<step_type>::value ||
(!std::is_pointer<number_type>::value && !traits::can_offset_incremental<number_type> &&
std::is_floating_point<step_type>::value) ,
"numeric_range_t<?, step_type> requires numeric stepping offset type!"
);
SEQ_DECLARE_ITERATABLE(state_t, _init);
struct state_t {
number_type next, last;
step_type step;
bool revert;
state_t(number_type&& next, number_type&& last, step_type&& step)
: next(std::move(next)), last(std::move(last)), step(std::move(step)), revert(step < 0)
{
SEQ_RANGE_ASSERT(step != 0, "step amount for numeric_range_t<type> never be zero!");
}
inline bool is_end_of() const {
return revert ? next <= last : next >= last;
}
inline bool forward() {
if (!is_end_of()) {
next += step;
return true;
}
return false;
}
inline number_type value() const {
return next;
}
} _init;
numeric_range_t(number_type&& next, number_type&& last, step_type&& step)
: _init(std::move(next), std::move(last), std::move(step)) { }
};
}
/**
* range ( start, end [, increasement ] )
* generates numeric sequence in given range.
*/
template<typename number_type,typename step_type = int>
inline auto range(number_type from, number_type to, step_type step = 1) {
return iteratable_t<iteratables::numeric_range_t<number_type, step_type>>(
iteratables::numeric_range_t<number_type, step_type>(std::move(from), std::move(to), std::move(step)));
}
/**
* infinite( start [, increasement ] )
* generates numeric sequence until overflow.
*/
template<typename number_type, typename step_type = int>
inline auto infinite(number_type from, step_type step = 1) {
static_assert(
std::is_integral<number_type>::value || std::is_floating_point<number_type>::value,
"infinite range (until overflow) can accept only integer types and floating points!"
);
number_type to = step < 0 ? std::numeric_limits<number_type>::min() - step : std::numeric_limits<number_type>::max() - step;
return iteratable_t<iteratables::numeric_range_t<number_type, step_type>>(
iteratables::numeric_range_t<number_type, step_type>(std::move(from), std::move(to), std::move(step)));
}
И этот код можно использовать как:
for (int i: range(0, 100)) {
std::cout << i << "n";
}
2. Обобщение Operators, который управляет последовательностью данных.
После обобщения iterator и iteratable, Мне нужно было обобщить больше, чем для написания манипуляторов. Этот процесс никогда не должен быть сложным или трудным, и не нужно вводить много кода. Итак, определил правило подробнее. И что должно быть реализовано в качестве ядра, так это возможность распознавать и комбинировать друг друга без передачи аргументов шаблона. Но этого не должно быть с virtual table для performance.
namespace _ {
/**
* struct operatable_t<operator>.
* Operator takes iteratables and handles them, and then,
* internally creates progressive or bypass action which met its purpose.
*/
template<typename operatable = void>
struct operatable_t {
instrusive<operatable> source;
/**
* take iteratable and decide returnning action by its purpose.
*/
template<typename iteratable>
inline auto take(iteratable&& input)
-> iteratable_t<typename traits::action_of<operatable, iteratable>::action_type>
{
static_assert(
traits::is_iteratable<iteratable>,
"take(iteratable) can only take iteratable!"
);
SEQ_INIT_ASSERT((bool) source, "tried to interact with uninitialized operator!");
return iteratable_t<typename traits::action_of<operatable, iteratable>::action_type>((*source).take(std::move(input)));
}
/* constructor of this operatable_t. */
operatable_t() { }
operatable_t(operatable source) : source(std::move(source)) { }
operatable_t(const operatable_t& o) : source(o.source) { }
operatable_t(operatable_t& o) : source(std::move(o.source)) { }
/* assignment operators. */
inline operatable_t& operator =(const operatable_t& other) { source = other.source; return *this; }
inline operatable_t& operator =(operatable_t&& other) { source = std::move(other); return *this; }
};
}
template<typename operatable>
using operatable_t = typename traits::anti_recursive<_::operatable_t<typename std::decay<operatable>::type>>::type;
/**
* Pipe operator.
* This operator is for putting the iteratable into operatable.
*/
template<typename iteratable, typename operatable>
inline auto operator >> (_::iteratable_t<iteratable> source, _::operatable_t<operatable> dest)
-> decltype(std::declval<operatable_t<operatable>>().take(std::declval<iteratable_t<iteratable>&&>())) {
return dest.take(iteratable_t<iteratable>(std::move(source)));
}
bypass проходит iteratable сам к executor, который является реализацией. progressive передает индивидуальные данные, перечисляя их iteratable. Это не значит что progressive немедленно iterate все данные заранее. Только при попытке получить данные из progressive результат iteratable, он обрабатывает один за другим и выдает результат.
/**
* struct exec_progressive_action_t<executor_type, iteratable>
* Handle input and return its result progressively.
*/
template<typename executor_type, typename iteratable>
struct exec_progressive_action_t {
using trait = traits::iterator_of<iteratable>;
using return_type = traits::executor::progressive_return_of<executor_type, typename trait::value_t>;
using decayed_return_type = typename std::decay<return_type>::type;
using decayed_iteratable = typename std::decay<iteratable>::type;
SEQ_DECLARE_ITERATABLE(state_t, _init);
struct state_t {
executor_type executor;
decayed_iteratable input;
instrusive<typename trait::begin_t, true> next;
instrusive<typename trait::end_t, true> last;
instrusive<decayed_return_type, true> buffer;
state_t(executor_type& executor, decayed_iteratable&& input)
: executor(executor), input(std::move(input)) { }
inline void init() {
next = input.begin();
last = input.end();
(*next).init();
}
inline bool is_end_of() { return (*next).end_of(); }
inline bool pending() { return (*next).pending(); }
inline bool wait(int timeout) { return (*next).wait(timeout); }
inline decayed_return_type& value() {
if (!buffer) {
auto _value = (*next).value();
buffer = executor.handle(std::move(_value));
}
return *buffer;
}
inline bool forward() {
if ((*next).forward()) {
buffer.unset();
return true;
}
return false;
}
};
state_t _init;
exec_progressive_action_t(executor_type& executor, decayed_iteratable&& input)
: _init(executor, std::move(input)) { }
};
Ниже приведен пример кода operatable.
namespace operatables {
/**
* struct flat_map<handler_type>.
* maps the input interatable to given functor.
*/
template<typename handler_type>
struct flat_map {
using functor_is = traits::action_functor_of<handler_type>;
using return_type = typename functor_is::return_type;
using decayed_type = typename std::decay<return_type>::type;
static_assert(
!functor_is::is_bypass || traits::is_iteratable<typename functor_is::return_type>,
"`bypass` action should return iteratable type!"
);
struct executor_t {
handler_type handler;
instrusive<decayed_type, false> buffer;
executor_t(handler_type&& handler)
: handler(std::move(handler)) { }
template<typename iteratable>
inline decayed_type&& bypass(iteratable&& input) {
return std::move(*(buffer = handler(input)));
}
template<typename input_type>
inline decayed_type&& handle(input_type&& input) {
return std::move(*(buffer = handler(input)));
}
};
SEQ_DECLARE_TAKE_BOTHWAY(executor_t, _init, functor_is::is_bypass);
executor_t _init;
flat_map(handler_type&& handler)
: _init(std::move(handler)) { }
};
}
/**
* flat_map( handler functor ).
* maps the input iteratable to given functor.
* 1. progressive mode: []( data-type ...) { return processed-data. };
* 2. bypass mode: []( dynamic_iterator<data-type> ... ) { return iteratable; };
*/
template<typename handler_type>
inline auto flat_map(handler_type&& handler)
-> operatable_t<operatables::flat_map<handler_type>> {
return operatable_t<operatables::flat_map<handler_type>>(
operatables::flat_map<handler_type>(std::move(handler)));
}
А затем его использование:
auto zero_to_one = range(0, 5)
>> flat_map([](int v) {
return v / 2.0;
})
>> flat_map([](dynamic_iteratable<double> values) {
std::vector<int> s;
for(auto k : values) {
s.push_back(k);
for (int v : range(0, (int)k + 5)) {
s.push_back(v);
}
}
return values;
});
for (auto i : zero_to_one) {
std::cout << i << "n";
}
Аргумент второй flat_map звонок, причина dynamic_iteratable<type> является, iteratable_t<...> шаблон будет преобразован в нечитаемый человеком и не может быть набран вручную слишком долго. например iteratable_t<operatables::flat_map<lambda [] ->124ab...>::iteratable<iteratables::numeric_range_t<int, int, int>>..............>.
Значит, его нужно завернуть в более короткий шрифт.
/**
* struct dynamic_iteratable<data_type>.
* Wraps compile-time generated iteratable to dynamic instance.
*/
template<typename data_type>
struct dynamic_iteratable {
struct state_t;
struct iteratable_trans;
struct iterator_trans;
using iteratable_ptr = trans_ptr<iteratable_trans>;
using iterator_ptr = trans_ptr<iterator_trans>;
struct iterator_trans {
instrusive<data_type, true> temp;
void* _object;
/*
* Delegates to:
*
* inline void init() { return _ensure(state); }
* inline bool is_end_of() const { return _end_of(state); }
* inline value_type value() const { return _value(state); }
* inline bool forward() { return _forward(state); }
* inline bool pending() const { return _pending(state); }
* inline bool wait(int timeout) const { return _wait(state, timeout); }
*/
iterator_ptr (*_clone)(const iterator_ptr&);
void (*_init)(const iterator_ptr&);
bool (*_end_of)(const iterator_ptr&);
data_type& (*_value)(const iterator_ptr&);
bool (*_forward)(const iterator_ptr&);
bool (*_pending)(const iterator_ptr&);
bool (*_wait)(const iterator_ptr&, int);
};
struct iteratable_trans {
void* _object;
iterator_ptr (*begin)(const iteratable_ptr&);
};
SEQ_DECLARE_ITERATABLE(state_t, _init);
/* transparent pointers to original iteratable. */
struct state_t {
mutable iteratable_ptr transparent;
mutable iterator_ptr iterator;
inline void init() {
SEQ_INIT_ASSERT(transparent, "tryied to iterate uninitialized dynamic iterator!");
iterator = transparent->begin(transparent);
iterator->_init(iterator);
}
inline bool is_end_of() const { return iterator->_end_of(iterator); }
inline data_type& value() const { return iterator->_value(iterator); }
inline bool forward() { return iterator->_forward(iterator); }
inline bool pending() const { return iterator->_pending(iterator); }
inline bool wait(int timeout) const { return iterator->_wait(iterator, timeout); }
};
template<typename iteratable>
dynamic_iteratable(iteratable&& _iteratable) {
using qualified_iteratable = iteratable_t<iteratable>;
using begin_type = typename traits::iterator_of<qualified_iteratable>::begin_t;
static_assert(
traits::is_iteratable<iteratable>,
"not iteratables can't be wrapped to dynamic iteratable!"
);
struct func_proxy {
static iterator_ptr clone(const iterator_ptr& p) {
iterator_trans* newbie = new iterator_trans(*p.get());
newbie->_object = new begin_type(*((begin_type*)p->_object));
return iterator_ptr(newbie, [](iterator_trans* p) {
delete ((begin_type*)(p->_object));
});
}
static void init(const iterator_ptr& p) { ((begin_type*)(p->_object))->init(); };
static bool end_of(const iterator_ptr& p) { return ((begin_type*)(p->_object))->end_of(); };
static data_type& value(const iterator_ptr& p) { return *(p->temp = ((begin_type*)(p->_object))->value()); }
static bool forward(const iterator_ptr& p) { return ((begin_type*)(p->_object))->forward(); };
static bool pending(const iterator_ptr& p) { return ((begin_type*)(p->_object))->pending(); };
static bool wait(const iterator_ptr& p, int v) { return ((begin_type*)(p->_object))->wait(v); };
};
iteratable_trans* data = new iteratable_trans();
data->_object = new qualified_iteratable(std::move(_iteratable));
data->begin = [](const iteratable_ptr& ptr) {
auto* _iteratable = ((qualified_iteratable*)ptr->_object);
iterator_trans* iter = new iterator_trans();
iterator_ptr shareable(iter, [](iterator_trans* p) {
delete ((begin_type*)(p->_object));
});
iter->_object = new begin_type(_iteratable->begin());
iter->_clone = func_proxy::clone;
iter->_init = func_proxy::init;
iter->_end_of = func_proxy::end_of;
iter->_value = func_proxy::value;
iter->_forward = func_proxy::forward;
iter->_pending = func_proxy::pending;
iter->_wait = func_proxy::wait;
return shareable;
};
_init.transparent = iteratable_ptr(data,
[](iteratable_trans* p) { delete ((qualified_iteratable*)(p->_object)); });
}
state_t _init;
};
3. Идеи для минимизации waiting и polling.
Как вы могли заметить, если вы посмотрите на часть кода, который я показал в приведенном выше примере, у них нет ожидаемых handles предоставленный kernel.
Итак, я придумал идею. Я еще не занимаюсь его реализацией. Причина, по которой я пишу эту длинную статью, состоит в том, чтобы получить совет по поводу идеи и получить отзывы о вышеупомянутых обобщениях.
Идея, о которой я думаю:
- Только один ожидающий
handleраспределяется по потоку. Это может быть доступноTLS. (не решил) - В
handleвыходит из своего состояния всякий раз, когда сообщается, что какой-либоasyncзапрос из этого потока был выполнен. - И
iteratableкто создалasync requestвыпущенasync token, и когда он будет завершен, токен будет возвращен. - Когда все
asynchronous tokensвыданные для потока возвращаются, ожидающиеhandlesвозвращаются вconcurrent queue.
Однако весь этот процесс основан на предпосылке, что реакция на asynchronous request должно быть iteratable.
Могу я получить совет по этому поводу?
