Understanding Pulsar C++ Client’s Custom Future and Promise Implementation
This article explores the Pulsar C++ client’s hand‑crafted Future and Promise classes, comparing them with C++11’s std::future/std::promise, illustrating thread creation with pthread and std::thread, and detailing the internal design, callbacks, synchronization, and usage patterns for asynchronous programming.
Introduction
Due to work requirements, the author recently read the Pulsar C++ client implementation and discovered that, although the client is written with C++11, it implements its own Future and Promise classes. This article explains a simple implementation of Future and Promise in C++ by examining the source code in lib/Future.h.
C++11 Concurrency API
C++11 provides support for concurrent programming, including std::thread and the std::future / std::promise pair for task‑based design.
Example using pthread to sum two integers and return the result as a decimal string:
#include <pthread.h>
#include <iostream>
#include <string>
using namespace std;
struct Package {
    int x, y;       // input
    string result;  // output
    void calculate() { result = std::to_string(x + y); }
};
int main(int argc, char* argv[]) {
    pthread_t tid;
    Package package{1, 3, ""};
    pthread_create(&tid, nullptr,
                   [](void* param) -> void* {
                       static_cast<Package*>(param)->calculate();
                       return nullptr;
                   }, &package);
    pthread_join(tid, nullptr);
    cout << "result: " << package.result << endl;
    return 0;
}
The equivalent implementation with std::thread simplifies the code:
#include <functional>  // std::ref
#include <iostream>
#include <string>
#include <thread>
using namespace std;
int main(int argc, char* argv[]) {
    string result;
    thread t([](int x, int y, string& s) { s = to_string(x + y); }, 1, 3, ref(result));
    t.join();
    cout << "result: " << result << endl;
    return 0;
}
The major improvement is that the thread function is no longer restricted to the low-level void*(void*) signature: std::thread accepts any callable together with any number of arguments. The callable's return value, however, is ignored.
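To make the point about callables concrete, the following sketch starts a thread on a member function. The Adder type and the addOnThread helper are hypothetical names invented for this illustration. It also shows that whatever the callable returns is discarded, so the result has to travel through a reference argument.

```cpp
#include <functional>
#include <string>
#include <thread>

// Hypothetical helper type, used only for this illustration.
struct Adder {
    int base;
    // The int returned here is discarded by std::thread;
    // the caller only sees what is written to `out`.
    int addTo(int x, std::string& out) const {
        out = std::to_string(base + x);
        return base + x;
    }
};

// Runs Adder::addTo on a new thread and returns what the thread wrote to `out`.
std::string addOnThread(int base, int x) {
    Adder adder{base};
    std::string out;
    // Member function pointer first, then the object, then the arguments.
    // std::ref is needed because std::thread copies its arguments by default.
    std::thread t(&Adder::addTo, &adder, x, std::ref(out));
    t.join();
    return out;
}
```

Calling addOnThread(1, 3) yields the string "4", the same result as the lambda version.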
How to Obtain Return Values
While std::thread cannot directly return a value, pthread can return a void* that holds the result, which requires manual memory management and introduces potential leaks.
#include <pthread.h>
#include <iostream>
#include <string>
using namespace std;
struct Input {
    int x, y;
    string sum() const { return to_string(x + y); }
};
int main(int argc, char* argv[]) {
    Input input{1, 3};
    pthread_t tid;
    pthread_create(&tid, nullptr,
                   [](void* param) -> void* {
                       return new string(static_cast<Input*>(param)->sum());
                   }, &input);
    void* result;
    pthread_join(tid, &result);
    cout << "result: " << *static_cast<string*>(result) << endl;
    delete static_cast<string*>(result);
    return 0;
}
std::future
std::future addresses the inability of std::thread to return a value. An example using std::async:
#include <future>
#include <iostream>
#include <string>
using namespace std;
int main(int argc, char* argv[]) {
    auto future_ = async([](int x, int y) { return to_string(x + y); }, 1, 3);
    cout << "result: " << future_.get() << endl;
    return 0;
}
std::async creates an asynchronous task (with the default launch policy, the implementation may instead defer the task until get() is called); get() blocks until the task finishes and returns the result. std::future also provides wait_for and wait_until, which bound the waiting time and avoid blocking indefinitely.
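The timeout control can be sketched with std::future::wait_for. The helper name sumWithTimeout and its delay parameter are assumptions made for this example. std::launch::async is passed explicitly so that the task really runs on another thread and wait_for can only report ready or timeout.

```cpp
#include <chrono>
#include <future>
#include <string>
#include <thread>

// Hypothetical helper: computes x + y on another thread after an artificial
// delay and waits at most `timeout` for the result.
std::string sumWithTimeout(int x, int y,
                           std::chrono::milliseconds delay,
                           std::chrono::milliseconds timeout) {
    auto fut = std::async(std::launch::async, [=] {
        std::this_thread::sleep_for(delay);  // simulate slow work
        return std::to_string(x + y);
    });
    if (fut.wait_for(timeout) == std::future_status::ready) {
        return fut.get();  // does not block: the result is already available
    }
    // Caveat: the destructor of a future obtained from std::async still
    // blocks until the task finishes, so this function returns only after `delay`.
    return "timeout";
}
```

The caveat in the last comment matters in practice: wait_for limits how long the caller waits for the result, but it does not cancel the task.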
std::promise
std::promise complements std::future when the caller wants the thread to continue doing work after producing a result. A typical usage:
#include <chrono>
#include <future>
#include <iostream>
#include <string>
#include <thread>
using namespace std;
int main(int argc, char* argv[]) {
    string input;
    cout << "input: ";
    cin >> input;
    promise<int> p;
    thread t([&p](const string& s) {
        try {
            int number = stoi(s);
            p.set_value(number);
            this_thread::sleep_for(chrono::seconds(1));
            cout << "thread do something else..." << endl;
        } catch (...) {
            p.set_exception(current_exception());
        }
    }, input);
    try {
        cout << "result: " << p.get_future().get() << endl;
    } catch (const exception& e) {
        cout << "Failed to parse " << input << " to integer: " << e.what() << endl;
    }
    t.join();
    return 0;
}
Running the program yields either the parsed integer with a subsequent message from the thread, or an error message if parsing fails.
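Because the program above reads from standard input, both outcomes are easier to try in a non-interactive form. The following is a minimal sketch; parseOrFallback is a hypothetical helper name. It shows that an exception stored with set_exception is rethrown by get() on the waiting side.

```cpp
#include <exception>
#include <future>
#include <string>
#include <thread>

// Hypothetical helper: parses `s` on a worker thread and reports the outcome
// through a promise. Returns the parsed value, or `fallback` if std::stoi threw.
int parseOrFallback(const std::string& s, int fallback) {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::thread t([&p, &s] {
        try {
            p.set_value(std::stoi(s));
        } catch (...) {
            // Hand the exception over to whoever calls get() on the future.
            p.set_exception(std::current_exception());
        }
    });
    int value = fallback;
    try {
        value = f.get();  // rethrows the exception stored by set_exception
    } catch (const std::exception&) {
        // Parsing failed: keep the fallback value.
    }
    t.join();  // p and s outlive the thread because we join before returning
    return value;
}
```

For example, parseOrFallback("42", -1) returns 42, while parseOrFallback("abc", -1) returns -1.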
Pulsar C++ Client’s Future
The client’s Future differs from std::future<T> by using two template parameters: Result (error code) and Type (actual value). Internally it holds a shared_ptr to an InternalState struct containing a mutex, condition variable, result, value, completion flag, and a list of listener callbacks.
template <typename Result, typename Type>
class Future {
public:
    // ...
private:
    std::shared_ptr<InternalState<Result, Type>> state_;
};
template <typename Result, typename Type>
struct InternalState {
    std::mutex mutex;
    std::condition_variable condition;
    Result result;
    Type value;
    bool complete;
    std::list<std::function<void(Result, const Type&)>> listeners;
};
Listeners are callbacks that receive the result code and the value when the asynchronous task completes. The addListener method registers a callback, executing it immediately if the task is already complete or storing it for later invocation.
Future& addListener(ListenerCallback callback) {
    InternalState<Result, Type>* state = state_.get();
    Lock lock(state->mutex);
    if (state->complete) {
        lock.unlock();
        callback(state->result, state->value);
    } else {
        state->listeners.push_back(callback);
    }
    return *this;
}
The get method blocks until complete becomes true, then returns the result code and writes the value to the provided reference. Lock in these excerpts has to be std::unique_lock<std::mutex> (or an alias of it), because it is passed to std::condition_variable::wait.
Result get(Type& result) {
    InternalState<Result, Type>* state = state_.get();
    Lock lock(state->mutex);
    if (!state->complete) {
        while (!state->complete) {
            state->condition.wait(lock);
        }
    }
    result = state->value;
    return state->result;
}
Pulsar C++ Client’s Promise
The Promise class is a friend of Future and shares the same InternalState. It provides methods to set the value, set a failure code, and query completion.
template <typename Result, typename Type>
class Promise {
public:
    Promise() : state_(std::make_shared<InternalState<Result, Type>>()) {}
    Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
    bool setValue(const Type& value) {
        InternalState<Result, Type>* state = state_.get();
        Lock lock(state->mutex);
        if (state->complete) return false;
        state->value = value;
        state->result = Result();
        state->complete = true;
        for (auto& cb : state->listeners) cb(state->result, state->value);
        state->listeners.clear();
        state->condition.notify_all();
        return true;
    }
    bool setFailed(Result result) {
        InternalState<Result, Type>* state = state_.get();
        Lock lock(state->mutex);
        if (state->complete) return false;
        state->result = result;
        state->complete = true;
        for (auto& cb : state->listeners) cb(state->result, state->value);
        state->listeners.clear();
        state->condition.notify_all();
        return true;
    }
private:
    std::shared_ptr<InternalState<Result, Type>> state_;
};
When a value or error code is set, all registered listeners are invoked, and any threads blocked in Future::get are awakened.
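The pieces described so far can be put together into one compilable unit. What follows is a condensed sketch of the pattern, not the code from lib/Future.h: the sketch namespace and the completeWith helper are assumptions of this example. It also makes one deliberate change relative to the listings above: listeners are moved out of the shared state and invoked after the mutex is released, so that a listener may safely call back into the same future.

```cpp
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <utility>

namespace sketch {  // hypothetical namespace, for illustration only

template <typename Result, typename Type>
struct InternalState {
    std::mutex mutex;
    std::condition_variable condition;
    Result result{};
    Type value{};
    bool complete = false;
    std::list<std::function<void(Result, const Type&)>> listeners;
};

template <typename Result, typename Type>
class Promise;

template <typename Result, typename Type>
class Future {
public:
    using Listener = std::function<void(Result, const Type&)>;

    // Runs the callback right away if the state is complete, otherwise stores it.
    Future& addListener(Listener callback) {
        std::unique_lock<std::mutex> lock(state_->mutex);
        if (state_->complete) {
            lock.unlock();  // result and value no longer change once complete
            callback(state_->result, state_->value);
        } else {
            state_->listeners.push_back(std::move(callback));
        }
        return *this;
    }

    // Blocks until the state is complete, copies the value out, returns the code.
    Result get(Type& out) {
        std::unique_lock<std::mutex> lock(state_->mutex);
        state_->condition.wait(lock, [this] { return state_->complete; });
        out = state_->value;
        return state_->result;
    }

private:
    friend class Promise<Result, Type>;
    explicit Future(std::shared_ptr<InternalState<Result, Type>> state)
        : state_(std::move(state)) {}
    std::shared_ptr<InternalState<Result, Type>> state_;
};

template <typename Result, typename Type>
class Promise {
public:
    Promise() : state_(std::make_shared<InternalState<Result, Type>>()) {}
    Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
    bool setValue(const Type& value) { return completeWith(Result(), value); }
    bool setFailed(Result result) { return completeWith(result, Type()); }

private:
    // Completes the state at most once; returns false on any later attempt.
    bool completeWith(Result result, const Type& value) {
        std::unique_lock<std::mutex> lock(state_->mutex);
        if (state_->complete) return false;
        state_->result = result;
        state_->value = value;
        state_->complete = true;
        auto listeners = std::move(state_->listeners);
        state_->listeners.clear();
        lock.unlock();  // do not hold the mutex while running user callbacks
        state_->condition.notify_all();
        for (auto& cb : listeners) cb(result, value);
        return true;
    }
    std::shared_ptr<InternalState<Result, Type>> state_;
};

}  // namespace sketch
```

With sketch::Promise<int, int> p, a listener added through p.getFuture() runs once p.setValue(7) is called, a later p.setFailed(1) returns false, and get() then returns the default result code 0 together with the value 7.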
Reinventing the Wheel?
The Pulsar client’s implementation covers only a subset of the standard library’s std::future / std::promise functionality, using error codes instead of exceptions. Its distinctive feature is the support for registering callbacks, enabling both asynchronous and synchronous usage patterns. An example wrapper class IntegerParser demonstrates how to provide both async and sync APIs using the custom Future/Promise pair.
enum class ErrorCode { kSuccess = 0, kInvalidFormat, kUserCallbackException };
class IntegerParser {
public:
    Future<ErrorCode, int> parseAsync(const std::string& input,
                                      std::function<void(ErrorCode, int)> callback) {
        // Use one promise per call: a promise can be completed only once.
        Promise<ErrorCode, int> promise;
        // The listener may run after parseAsync returns, so it keeps its own
        // copy of the callback instead of a reference to the parameter.
        auto future = promise.getFuture().addListener(
            [callback](ErrorCode code, const int& result) {
                try { callback(code, result); }
                catch (const std::exception& e) {
                    std::cerr << "[ERROR] User's callback throws: " << e.what() << std::endl;
                }
            });
        // The detached thread may outlive the caller's string, so it copies the
        // input. The copied promise shares the same InternalState.
        std::thread t([input, promise]() mutable {
            try {
                int number = std::stoi(input);
                promise.setValue(number);
            } catch (...) {
                promise.setFailed(ErrorCode::kInvalidFormat);
            }
        });
        t.detach();
        return future;
    }
    ErrorCode parse(const std::string& input, int& result) {
        // The synchronous API is the asynchronous one followed by a blocking get().
        auto fut = parseAsync(input, [](ErrorCode, int) {});
        return fut.get(result);
    }
};
Overall, the Pulsar client’s Future/Promise design mirrors C++11’s standard facilities while adding callback registration and error‑code‑based error handling.
360 Zhihui Cloud Developer
360 Zhihui Cloud is an enterprise open service platform that aims to "aggregate data value and empower an intelligent future," leveraging 360's extensive product and technology resources to deliver platform services to customers.
