Understanding Pulsar C++ Client’s Custom Future and Promise Implementation

This article explores the Pulsar C++ client’s hand‑crafted Future and Promise classes, comparing them with C++11’s std::future/std::promise, illustrating thread creation with pthread and std::thread, and detailing the internal design, callbacks, synchronization, and usage patterns for asynchronous programming.

360 Zhihui Cloud Developer

Introduction

While reading the Pulsar C++ client for work, the author found that, although the client is written in C++11, it implements its own Future and Promise classes instead of using std::future and std::promise. This article walks through that implementation, found in lib/Future.h, as a compact example of how Future and Promise can be built in C++.

C++11 Concurrency API

C++11 provides support for concurrent programming, including std::thread and the std::future / std::promise pair for task‑based design.

Example using pthread to sum two integers and return the result as a decimal string:

#include <pthread.h>
#include <iostream>
#include <string>
using namespace std;
struct Package {
  int x, y;            // input
  string result;  // output

  void calculate() { result = std::to_string(x + y); }
};
int main(int argc, char* argv[]) {
  pthread_t tid;
  Package package{1, 3, ""};
  pthread_create(&tid, nullptr,
      [](void* param) -> void* {
        static_cast<Package*>(param)->calculate();
        return nullptr;
      }, &package);
  pthread_join(tid, nullptr);
  cout << "result: " << package.result << endl;
  return 0;
}

The equivalent implementation with std::thread simplifies the code:

#include <functional>  // std::ref
#include <iostream>    // std::cout
#include <string>
#include <thread>
using namespace std;
int main(int argc, char* argv[]) {
  string result;
  thread t([](int x, int y, string& s) { s = to_string(x + y); }, 1, 3, ref(result));
  t.join();
  cout << "result: " << result << endl;
  return 0;
}

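The major improvement is that the thread entry point is no longer restricted to the low‑level void*(void*) signature: std::thread accepts any callable with any number of arguments. Arguments are copied into the new thread by default, which is why the example wraps result in ref(). The callable may have any return type, but std::thread discards the returned value.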
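
The following is a minimal sketch of those two points, using only standard library calls. The names appendSum and runInThread are made up for illustration and do not come from the Pulsar client.

```cpp
#include <functional>
#include <string>
#include <thread>

// Hypothetical helper: writes x + y into `out` and also returns it.
// std::thread ignores the returned value.
std::string appendSum(int x, int y, std::string& out) {
  out = std::to_string(x + y);
  return out;
}

// Runs appendSum on a new thread and hands back what the thread wrote.
std::string runInThread(int x, int y) {
  std::string result;
  // std::ref is required here: std::thread passes its stored copies as
  // rvalues, which cannot bind to the non-const reference parameter, so
  // the call would not compile without it.
  std::thread t(appendSum, x, y, std::ref(result));
  t.join();  // after join(), the write to `result` is visible here
  return result;
}
```

The only channel back to the caller is the reference parameter, which is the limitation the next section addresses.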

How to Obtain Return Values

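std::thread provides no way to retrieve the callable's return value. A pthread entry point, by contrast, returns a void* that pthread_join hands back to the caller. Returning anything larger than a pointer means allocating the result on the heap, so the caller must remember to free it or the memory leaks.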

#include <pthread.h>
#include <iostream>
#include <string>
using namespace std;
struct Input {
  int x, y;
  string sum() const { return to_string(x + y); }
};
int main(int argc, char* argv[]) {
  Input input{1, 3};
  pthread_t tid;
  pthread_create(&tid, nullptr,
      [](void* param) -> void* {
        return new string(static_cast<Input*>(param)->sum());
      }, &input);
  void* result;
  pthread_join(tid, &result);
  cout << "result: " << *static_cast<string*>(result) << endl;
  delete static_cast<string*>(result);
  return 0;
}
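
One way to reduce the leak risk, sketched here under the assumption that the result is always a heap-allocated std::string, is to hand the raw pointer to a std::unique_ptr as soon as pthread_join returns. The names sumEntry and sumWithPthread are hypothetical.

```cpp
#include <pthread.h>

#include <memory>
#include <string>

struct Input {
  int x, y;
};

// Thread entry point: allocates the result on the heap and returns it
// through the void* channel that pthread_join exposes.
static void* sumEntry(void* param) {
  const Input* in = static_cast<const Input*>(param);
  return new std::string(std::to_string(in->x + in->y));
}

// Hypothetical wrapper: joins the thread and immediately transfers the
// raw pointer to a unique_ptr, so the string is freed on every path.
std::string sumWithPthread(int x, int y) {
  Input input{x, y};
  pthread_t tid;
  if (pthread_create(&tid, nullptr, sumEntry, &input) != 0) {
    return "";  // thread could not be started; nothing was allocated
  }
  void* raw = nullptr;
  pthread_join(tid, &raw);
  std::unique_ptr<std::string> owned(static_cast<std::string*>(raw));
  return *owned;  // copy out; `owned` deletes the heap string on return
}
```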

std::future

std::future addresses the inability of std::thread to return a value. An example using std::async:

#include <future>
#include <iostream>
#include <string>
using namespace std;
int main(int argc, char* argv[]) {
  auto future_ = async([](int x, int y) { return to_string(x + y); }, 1, 3);
  cout << "result: " << future_.get() << endl;
  return 0;
}

async creates an asynchronous task; get() blocks until the task finishes and returns the result. To avoid blocking indefinitely, std::future also offers wait_for() and wait_until(), which wait with a timeout.
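
A minimal sketch of the timeout path, assuming the task is launched with std::launch::async. The helper name sumOrTimeout and its delay parameter are invented for the example.

```cpp
#include <chrono>
#include <future>
#include <string>
#include <thread>

// Hypothetical helper: computes x + y on another thread after `delay`,
// but waits at most `timeout` for the answer.
std::string sumOrTimeout(int x, int y, std::chrono::milliseconds delay,
                         std::chrono::milliseconds timeout) {
  // std::launch::async forces a real thread; with the default policy the
  // task may be deferred, and wait_for would then report `deferred`.
  std::future<std::string> f =
      std::async(std::launch::async, [=]() -> std::string {
        std::this_thread::sleep_for(delay);
        return std::to_string(x + y);
      });
  if (f.wait_for(timeout) == std::future_status::ready) {
    return f.get();  // does not block: the result is already available
  }
  // The destructor of a future obtained from std::async still waits for
  // the task, so this function returns only after `delay` has elapsed.
  return "timeout";
}
```

The timeout bounds how long the caller waits for the value, not how long the task runs.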

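std::promise

std::promise complements std::future: the worker thread publishes a result (or an exception) through the promise and can then continue doing other work, while the caller retrieves the result from the associated future. A typical usage: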

#include <chrono>
#include <future>
#include <iostream>
#include <string>
#include <thread>
using namespace std;
int main(int argc, char* argv[]) {
  string input;
  cout << "input: ";
  cin >> input;
  promise<int> p;
  thread t([&p](const string& s) {
    try {
      int number = stoi(s);
      p.set_value(number);
      this_thread::sleep_for(chrono::seconds(1));
      cout << "thread do something else..." << endl;
    } catch (...) {
      p.set_exception(current_exception());
    }
  }, input);
  try {
    cout << "result: " << p.get_future().get() << endl;
  } catch (const exception& e) {
    cout << "Failed to parse " << input << " to integer: " << e.what() << endl;
  }
  t.join();
  return 0;
}

Running the program yields either the parsed integer with a subsequent message from the thread, or an error message if parsing fails.

Pulsar C++ Client’s Future

The client’s Future differs from std::future<T> by using two template parameters: Result (error code) and Type (actual value). Internally it holds a shared_ptr to an InternalState struct containing a mutex, condition variable, result, value, completion flag, and a list of listener callbacks.

template <typename Result, typename Type>
class Future {
public:
  // ...
private:
  std::shared_ptr<InternalState<Result, Type>> state_;
};

template <typename Result, typename Type>
struct InternalState {
  std::mutex mutex;
  std::condition_variable condition;
  Result result;
  Type value;
  bool complete;
  std::list<std::function<void(Result, const Type&)>> listeners;
};

Listeners are callbacks that receive the result code and the value when the asynchronous task completes. The addListener method registers a callback, executing it immediately if the task is already complete or storing it for later invocation.

Future& addListener(ListenerCallback callback) {
  InternalState<Result, Type>* state = state_.get();
  Lock lock(state->mutex);
  if (state->complete) {
    lock.unlock();
    callback(state->result, state->value);
  } else {
    state->listeners.push_back(callback);
  }
  return *this;
}

The get method blocks until complete becomes true, then returns the result code and writes the value to the provided reference.

Result get(Type& result) {
  InternalState<Result, Type>* state = state_.get();
  Lock lock(state->mutex);
  if (!state->complete) {
    while (!state->complete) {
      state->condition.wait(lock);
    }
  }
  result = state->value;
  return state->result;
}
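
The wait loop in get is the standard condition-variable pattern. As a rough sketch, the same logic can be written with the predicate overload of std::condition_variable::wait. CompletionFlag and produceAndWait are hypothetical names standing in for the mutex, condition, and complete members described above.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the mutex / condition / complete trio.
struct CompletionFlag {
  std::mutex mutex;
  std::condition_variable condition;
  bool complete = false;
  int value = 0;

  // Producer side: publish the value, then wake every waiter.
  void finish(int v) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      value = v;
      complete = true;
    }
    condition.notify_all();
  }

  // Consumer side: the predicate overload re-checks `complete` after
  // every wakeup, which is equivalent to the explicit while loop.
  int waitForValue() {
    std::unique_lock<std::mutex> lock(mutex);
    condition.wait(lock, [this] { return complete; });
    return value;
  }
};

// Starts a producer thread and blocks until it has finished.
int produceAndWait(int v) {
  CompletionFlag flag;
  std::thread producer([&flag, v] { flag.finish(v); });
  int got = flag.waitForValue();
  producer.join();
  return got;
}
```

Re-checking the flag in a loop matters because a condition variable may wake up spuriously.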

Pulsar C++ Client’s Promise

The Promise class is a friend of Future and shares the same InternalState. It provides methods to set the value, set a failure code, and query completion; the listing below shows only the two setters.

template <typename Result, typename Type>
class Promise {
public:
  Promise() : state_(std::make_shared<InternalState<Result, Type>>()) {}
  Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
  bool setValue(const Type& value) {
    InternalState<Result, Type>* state = state_.get();
    Lock lock(state->mutex);
    if (state->complete) return false;
    state->value = value;
    state->result = Result();
    state->complete = true;
    for (auto& cb : state->listeners) cb(state->result, state->value);
    state->listeners.clear();
    state->condition.notify_all();
    return true;
  }
  bool setFailed(Result result) {
    InternalState<Result, Type>* state = state_.get();
    Lock lock(state->mutex);
    if (state->complete) return false;
    state->result = result;
    state->complete = true;
    for (auto& cb : state->listeners) cb(state->result, state->value);
    state->listeners.clear();
    state->condition.notify_all();
    return true;
  }
private:
  std::shared_ptr<InternalState<Result, Type>> state_;
};

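When a value or error code is set, all registered listeners are invoked on the thread that called setValue or setFailed, and any threads blocked in Future::get are then awakened. In this listing the listeners run while the mutex is still held, so a listener must not call back into the same Future or Promise.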
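
To see the pieces work together, here is a condensed, self-contained sketch that follows the listings above. It is not the Pulsar source: the Lock typedef, the public Future constructor, the completeWith helper that merges setValue and setFailed, and the demoListeners function are assumptions made to keep the example short.

```cpp
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

typedef std::unique_lock<std::mutex> Lock;  // assumed definition of Lock

template <typename Result, typename Type>
struct InternalState {
  std::mutex mutex;
  std::condition_variable condition;
  Result result{};
  Type value{};
  bool complete = false;
  std::list<std::function<void(Result, const Type&)>> listeners;
};

template <typename Result, typename Type>
class Future {
 public:
  typedef std::function<void(Result, const Type&)> ListenerCallback;
  typedef std::shared_ptr<InternalState<Result, Type>> StatePtr;
  explicit Future(StatePtr state) : state_(std::move(state)) {}

  Future& addListener(ListenerCallback callback) {
    Lock lock(state_->mutex);
    if (state_->complete) {
      lock.unlock();  // already done: run the callback right away
      callback(state_->result, state_->value);
    } else {
      state_->listeners.push_back(std::move(callback));
    }
    return *this;
  }

  Result get(Type& out) {
    Lock lock(state_->mutex);
    state_->condition.wait(lock, [this] { return state_->complete; });
    out = state_->value;
    return state_->result;
  }

 private:
  StatePtr state_;
};

template <typename Result, typename Type>
class Promise {
 public:
  Promise() : state_(std::make_shared<InternalState<Result, Type>>()) {}
  Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
  bool setValue(const Type& value) { return completeWith(Result(), value); }
  bool setFailed(Result result) { return completeWith(result, Type()); }

 private:
  // Hypothetical helper shared by both setters. As in the listing above,
  // listeners are called while the lock is held, then waiters are woken.
  bool completeWith(Result result, const Type& value) {
    Lock lock(state_->mutex);
    if (state_->complete) return false;
    state_->result = result;
    state_->value = value;
    state_->complete = true;
    for (auto& cb : state_->listeners) cb(state_->result, state_->value);
    state_->listeners.clear();
    state_->condition.notify_all();
    return true;
  }
  std::shared_ptr<InternalState<Result, Type>> state_;
};

// Usage: one listener registered before completion, one after, then get().
std::vector<std::string> demoListeners() {
  std::vector<std::string> log;
  Promise<int, std::string> promise;
  auto future = promise.getFuture();
  future.addListener([&log](int, const std::string& v) { log.push_back("early:" + v); });
  std::thread producer([promise]() mutable { promise.setValue("42"); });
  producer.join();
  future.addListener([&log](int, const std::string& v) { log.push_back("late:" + v); });
  std::string value;
  int code = future.get(value);
  log.push_back("get:" + std::to_string(code) + ":" + value);
  return log;
}
```

The early listener runs on the producer thread inside setValue, the late listener runs immediately on the calling thread because the state is already complete, and get returns without blocking.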

Reinventing the Wheel?

The Pulsar client’s implementation covers only a subset of the standard library’s std::future / std::promise functionality, using error codes instead of exceptions. Its distinctive feature is the support for registering callbacks, enabling both asynchronous and synchronous usage patterns. An example wrapper class IntegerParser demonstrates how to provide both async and sync APIs using the custom Future/Promise pair.

enum class ErrorCode { kSuccess = 0, kInvalidFormat, kUserCallbackException };

class IntegerParser {
public:
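  // Note: promise_ is a member, so each IntegerParser can complete only one
  // parse, and the object must outlive the detached thread.
  Future<ErrorCode, int> parseAsync(const std::string& input,
      std::function<void(ErrorCode, int)> callback) {
    // Capture input by value: the detached thread may run after the
    // caller's string has been destroyed.
    std::thread t([input, this] {
      try {
        int number = std::stoi(input);
        promise_.setValue(number);
      } catch (...) {
        promise_.setFailed(ErrorCode::kInvalidFormat);
      }
    });
    t.detach();
    // Capture callback by value: the listener may be invoked after
    // parseAsync has returned and its parameter no longer exists.
    return promise_.getFuture().addListener(
        [callback](ErrorCode code, const int& result) {
          try { callback(code, result); }
          catch (const std::exception& e) {
            std::cerr << "[ERROR] User's callback throws: " << e.what() << std::endl;
          }
        });
  }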

  ErrorCode parse(const std::string& input, int& result) {
    ErrorCode parse_error = ErrorCode::kSuccess;
    auto fut = parseAsync(input, [&parse_error](ErrorCode err, const int&) { parse_error = err; });
    fut.get(result);
    return parse_error;
  }
private:
  Promise<ErrorCode, int> promise_;
};

Overall, the Pulsar client’s Future/Promise design mirrors C++11’s standard facilities while adding callback registration and error‑code‑based error handling.
