Flipper Loop

Introduction

The flipperLoop function repeatedly pushes the system towards consensus on a single value. In each round it samples the network, queries the sampled nodes in parallel, and checks the responses for a majority.

Parameters:

  1. Node* u: The primary node initiating the consensus round.

  2. int value0: The initial value proposed for consensus.

  3. const std::vector<Node*>& network: The entire network of nodes.

Internal Variables:

  1. value: Current value being considered.

  2. lastValue: Last value that was considered.

  3. cnt: Counter of consecutive rounds in which the same majority value has been identified.

  4. e: Efficiency factor, a dynamic parameter to determine sampling size and other related parameters.

  5. d: Map to keep track of votes for different values.

  6. checkpointInterval: Interval at which the value is stored as a checkpoint.

  7. lastCheckpoint: Last stored checkpoint value.

Dynamic Variables:

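The updateParams function adjusts e. The following parameters are intended to be derived from e; the code listing in this document does not compute them and uses a fixed sample size of 5:

  1. alpha: Calculated from e, used to influence other parameters.

  2. beta: Calculated from e, used to influence other parameters.

  3. k: Calculated from e, determines the sampling size.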
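
The formulas that map e to alpha, beta, and k are not given here. The sketch below shows one way such a derivation could look. The struct Params, the function deriveParams, the meanings attached to alpha and beta, the base values, and the scaling rules are all assumptions made for illustration, not the author's definitions.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Hypothetical container for the three derived parameters.
struct Params {
    double alpha; // assumed meaning: fraction of the sample that must agree
    int beta;     // assumed meaning: consecutive successful rounds required
    int k;        // sample size
};

// Hypothetical derivation. Every base value and scaling rule is an assumption.
// e is expected to lie in [0.5, 1.5], the range updateParams clamps it to.
Params deriveParams(double e) {
    assert(e > 0.0);
    const int baseK = 5;    // the fixed sample size used in flipperLoop
    const int baseBeta = 5; // the acceptance threshold used in flipperLoop

    Params p;
    // A larger e samples more nodes but needs fewer confirming rounds.
    p.k = std::max(1, static_cast<int>(std::lround(baseK * e)));
    p.beta = std::max(1, static_cast<int>(std::lround(baseBeta / e)));
    // Keep the agreement fraction between one half and one.
    p.alpha = std::min(1.0, std::max(0.5, 0.5 + 0.1 * e));
    return p;
}
```

Under these assumed rules, deriveParams(1.0) reproduces the constants the loop currently hard-codes (k = 5, beta = 5).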

Mechanism:

  1. Initialization: Set initial parameters and dynamic variables.

  2. Invalid State Check: If value is 2 (the marker for an invalid state), the loop skips the rest of the current iteration.

  3. Pre-prepare Phase: The primary node u proposes a value (proposedValue).

  4. Prepare Phase:

    • The network is sampled to select a subset K of nodes.

    • These nodes are queried in parallel for their values, resulting in a set of values P.

  5. Commit Phase:

    • Majority check is performed on the set P using majorityVote. If a majority value is found, it updates the count and current proposed value.

    • If a majority is not found, the counter cnt is reset.

  6. Reply Phase:

    • If the counter cnt reaches a threshold (e.g., 5), the current value is accepted, displayed, and the loop exits.

  7. Dynamic Adjustment:

    • The efficiency factor e is adjusted to improve the consensus process's efficiency and responsiveness.

    • If there are frequent changes in the primary node (view changes) or if the consensus process is too slow, the efficiency factor is adjusted downward or upward, respectively. This ensures the system adapts to its operational context.

  8. Feedback Loop:

    • The flags frequentViewChanges and slowConsensus are placeholders, both hard-coded to false in updateParams. In a real-world application they would be populated from system observations. They determine how the efficiency factor e is adjusted.

  9. Checkpoint Mechanism:

    • At regular intervals (checkpointInterval), the current value is stored as a checkpoint (lastCheckpoint).

  10. View Change Mechanism:

    • If more than 1/3 of the network suspects the primary node, a view change is initiated and the primary node is changed. (Note: the code does not implement the selection of a new primary or the handling of the view change.)
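
The one-third rule for view changes can be written as a small predicate. This is a minimal sketch; shouldChangeView is a hypothetical name and does not appear in the listings.

```cpp
#include <cassert>
#include <cstddef>

// True when strictly more than one third of the nodes suspect the primary.
// For integer counts, 3 * suspecting > total gives the same answer as
// suspecting > total / 3, but states the bound without a division.
bool shouldChangeView(std::size_t suspecting, std::size_t total) {
    assert(suspecting <= total);
    return 3 * suspecting > total;
}
```

With 9 nodes, 3 suspecting nodes are exactly one third and do not trigger a view change; 4 do.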

Loops:

The entire mechanism continually tries to reach consensus on the proposed value. The loop iterates until the cnt threshold is reached for a particular value, indicating that consensus has been achieved on that value.
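
The termination rule can be studied in isolation by replaying a sequence of per-round majority results. The sketch below is an illustration under assumptions: roundsUntilAccept is a hypothetical helper, and it starts with no previous majority, whereas flipperLoop starts from value0.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Each entry of majorities is the majority value of one round, or -1 when the
// round produced no majority (the same sentinel majorityVote returns).
// Returns the 1-based round in which a value is accepted, or -1 if the
// threshold is never reached.
int roundsUntilAccept(const std::vector<int>& majorities, int threshold) {
    assert(threshold > 0);
    int lastValue = -1; // no majority seen yet
    int cnt = 0;
    for (std::size_t i = 0; i < majorities.size(); ++i) {
        const int m = majorities[i];
        if (m == -1) {
            cnt = 0;          // no majority: the streak is broken
        } else if (m == lastValue) {
            ++cnt;            // same majority as before: extend the streak
        } else {
            cnt = 1;          // a different majority starts a new streak
        }
        if (m != -1) {
            lastValue = m;
        }
        if (cnt >= threshold) {
            return static_cast<int>(i) + 1;
        }
    }
    return -1;
}
```

A single round without a majority resets the streak, so the sequence 1, 1, (none), 1, 1, 1, 1, 1 is accepted in round 8 with a threshold of 5, not in round 6.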

1. Sampling:

sampling.h:

#ifndef SAMPLING_H
#define SAMPLING_H

#include <vector>
#include <chrono>
#include <unordered_map>

class Node;

std::vector<Node*> sample(const std::vector<Node*>& network, int k);

#endif // SAMPLING_H

sampling.cpp:

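#include "sampling.h"
#include "node_and_adjustments.h" // Node is defined here
#include <random>

// Time at which each node (keyed by id) was last sampled.
std::unordered_map<int, std::chrono::steady_clock::time_point> lastSampled;

// Draws k nodes from network. Sampling is with replacement, so a node may
// appear more than once. Nodes that have not been sampled for a while get a
// slightly higher weight.
std::vector<Node*> sample(const std::vector<Node*>& network, int k) {
    std::vector<Node*> sampledNodes;
    if (network.empty() || k <= 0) {
        return sampledNodes; // nothing to sample
    }

    std::vector<double> weights;
    weights.reserve(network.size());

    auto now = std::chrono::steady_clock::now();
    for (const Node* node : network) {
        auto it = lastSampled.find(node->id);
        if (it == lastSampled.end()) {
            weights.push_back(1.0); // never sampled: base weight
        } else {
            auto duration = std::chrono::duration_cast<std::chrono::seconds>(now - it->second).count();
            weights.push_back(1.0 + duration * 0.01); // +1% per second since last sampled
        }
    }

    // Seed the generator once and reuse it across calls.
    static std::mt19937 gen(std::random_device{}());
    std::discrete_distribution<> dist(weights.begin(), weights.end());

    for (int i = 0; i < k; ++i) {
        int selectedIndex = dist(gen);
        sampledNodes.push_back(network[selectedIndex]);
        lastSampled[network[selectedIndex]->id] = now;
    }

    return sampledNodes;
}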
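
The sample function draws from std::discrete_distribution in a loop, so the same node can be selected more than once. If a set of distinct nodes is wanted, one option is to remove each picked entry before the next draw. The following is a sketch of that alternative, not part of the original design; sampleDistinct is a hypothetical name, and it works on indices and weights only, so it stays independent of Node.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

// Draws up to k distinct indices in [0, weights.size()), each draw
// proportional to the weights of the entries that are still available.
// Assumes every weight is strictly positive, as in sample().
std::vector<std::size_t> sampleDistinct(std::vector<double> weights,
                                        std::size_t k, std::mt19937& gen) {
    for (double w : weights) {
        assert(w > 0.0);
    }
    std::vector<std::size_t> candidates(weights.size());
    std::iota(candidates.begin(), candidates.end(), std::size_t{0});

    std::vector<std::size_t> picked;
    k = std::min(k, candidates.size());
    while (picked.size() < k) {
        std::discrete_distribution<std::size_t> dist(weights.begin(), weights.end());
        const auto pos = static_cast<std::ptrdiff_t>(dist(gen));
        picked.push_back(candidates[static_cast<std::size_t>(pos)]);
        // Remove the picked entry so it cannot be drawn again.
        candidates.erase(candidates.begin() + pos);
        weights.erase(weights.begin() + pos);
    }
    return picked;
}
```

Rebuilding the distribution on every draw costs O(n) per pick, which is acceptable for small samples such as k = 5.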

2. Parallel Queries:

parallel_queries.h:

#ifndef PARALLEL_QUERIES_H
#define PARALLEL_QUERIES_H

#include <vector>

class Node;

std::vector<int> parallel_query(const std::vector<Node*>& K, int value);

#endif // PARALLEL_QUERIES_H

parallel_queries.cpp:

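#include "parallel_queries.h"
#include "node_and_adjustments.h" // Node is defined here
#include <chrono>
#include <cstdlib>
#include <future>
#include <thread>

// Queries every node in K concurrently and returns the responses in the
// same order as K. The response is mocked as value + (id % 5).
std::vector<int> parallel_query(const std::vector<Node*>& K, int value) {
    std::vector<int> results;
    std::vector<std::future<int>> futures;
    results.reserve(K.size());
    futures.reserve(K.size());

    for (Node* node : K) {
        // Draw the simulated latency (50-99 ms) on the calling thread,
        // because rand() is not guaranteed to be thread-safe.
        const int delayMs = std::rand() % 50 + 50;
        futures.push_back(std::async(std::launch::async, [node, value, delayMs]() -> int {
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
            return value + (node->id % 5);
        }));
    }

    // Block until every query has answered.
    for (auto& fut : futures) {
        results.push_back(fut.get());
    }

    return results;
}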

3. Node and Dynamic Adjustments:

node_and_adjustments.h:

#ifndef NODE_AND_ADJUSTMENTS_H
#define NODE_AND_ADJUSTMENTS_H

#include <string>
#include <unordered_map>

enum MessageType {
    PRE_PREPARE,
    PREPARE,
    COMMIT
};

struct Message {
    MessageType type;
    int value;
    std::string signature;
};

class Node {
public:
    int id;
    Node(int id);
    std::string signMessage(const std::string& message);
    bool verifySignature(const std::string& message, const std::string& signature);
};

// Dynamic adjustments, efficiency factor, etc.
void updateParams(double& e, std::unordered_map<int, int>& d, int& value);

bool isSuperMajority(int votes, int total);

#endif // NODE_AND_ADJUSTMENTS_H

node_and_adjustments.cpp:

#include "node_and_adjustments.h"
#include <algorithm> // std::min, std::clamp (C++17)
#include <cmath>

Node::Node(int id): id(id) {}

std::string Node::signMessage(const std::string& message) {
    return "signature_for_" + message; // Mock signature
}
    
bool Node::verifySignature(const std::string& message, const std::string& signature) {
    return signMessage(message) == signature;
}

// Adjusts the efficiency factor e in place and keeps it within
// [lowerThreshold, upperThreshold]. d and value are not used yet.
void updateParams(double& e, std::unordered_map<int, int>& d, int& value) {
    static const double upperThreshold = 1.5;
    static const double lowerThreshold = 0.5;
    static const double adjustmentRate = 1.01;
    static const double decreaseRate = 0.99;
    static const double smoothingFactor = 0.1;

    (void)d;
    (void)value;

    // Baseline growth, capped at the upper threshold
    e = std::min(upperThreshold, e * adjustmentRate);

    // Feedback from the system
    bool frequentViewChanges = false; // Placeholder: This should be determined based on actual system feedback
    bool slowConsensus = false;       // Placeholder: This should be determined based on actual system feedback

    if (frequentViewChanges) {
        e *= decreaseRate;   // back off when the primary changes often
    } else if (slowConsensus) {
        e *= adjustmentRate; // push harder when consensus is slow
    } else {
        e *= adjustmentRate; // the default case also raises e
    }

    // Blend e with its capped value, then clamp to the allowed range
    e = (1 - smoothingFactor) * e + smoothingFactor * std::min(upperThreshold, e);
    e = std::clamp(e, lowerThreshold, upperThreshold);
}

bool isSuperMajority(int votes, int total) {
    return votes > (2 * total) / 3;
}
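
Because votes and total are integers, the truncating division in isSuperMajority still implements a strict "more than two thirds" test. The sketch below checks this against a division-free form. isSuperMajorityExact and formsAgree are hypothetical helpers added for the comparison.

```cpp
#include <cassert>

// Same definition as in node_and_adjustments.cpp, repeated so the sketch
// is self-contained.
bool isSuperMajority(int votes, int total) {
    return votes > (2 * total) / 3;
}

// Division-free form: votes / total > 2 / 3  <=>  3 * votes > 2 * total.
bool isSuperMajorityExact(int votes, int total) {
    return 3 * votes > 2 * total;
}

// Returns true if both forms agree for every 0 <= votes <= total <= maxTotal.
bool formsAgree(int maxTotal) {
    assert(maxTotal >= 0);
    for (int total = 0; total <= maxTotal; ++total) {
        for (int votes = 0; votes <= total; ++votes) {
            if (isSuperMajority(votes, total) != isSuperMajorityExact(votes, total)) {
                return false;
            }
        }
    }
    return true;
}
```

For example, 2 of 3 votes is exactly two thirds and is rejected by both forms, while 3 of 4 is accepted.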

4. Flipper Loop and Remaining Elements:

flipper_loop.h:

#ifndef FLIPPER_LOOP_H
#define FLIPPER_LOOP_H

#include "node_and_adjustments.h"
#include "sampling.h"
#include "parallel_queries.h"
#include <utility>
#include <vector>

// Majority voting function signature
std::pair<int, bool> majorityVote(const std::vector<int>& vec);

void flipperLoop(Node* u, int value0, const std::vector<Node*>& network);

#endif // FLIPPER_LOOP_H

helpers.h:

#ifndef HELPERS_H
#define HELPERS_H

#include "node_and_adjustments.h"
#include <vector>
#include <unordered_map>

// Mock functions for sending and receiving messages
void sendMessage(Node* receiver, Message msg);
Message receiveMessage(Node* sender);

#endif // HELPERS_H

flipper_loop.cpp:

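#include "flipper_loop.h"
#include "helpers.h"
#include <unordered_map>
#include <iostream>

// Returns {value, true} if one value is held by more than half of vec,
// otherwise {-1, false}.
std::pair<int, bool> majorityVote(const std::vector<int>& vec) {
    std::unordered_map<int, int> freqMap;
    for(int el : vec) {
        freqMap[el]++;
    }

    const int total = static_cast<int>(vec.size());
    for(const auto& [key, count] : freqMap) {
        // A supermajority (more than 2/3) is always also a simple majority,
        // so the simple-majority test alone decides the result.
        if(2 * count > total) {
            return {key, true};
        }
    }
    return {-1, false};
}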

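void flipperLoop(Node* u, int value0, const std::vector<Node*>& network) {
    int value = value0, lastValue = value0, cnt = 0;
    double e = 1.0;
    std::unordered_map<int, int> d = {{0, 0}, {1, 0}};

    // Checkpoint state is declared outside the loop so it persists across rounds
    const int checkpointInterval = 10;
    int lastCheckpoint = value0;
    int round = 0;

    while(true) {
        ++round;

        if(value == 2) {
            // Invalid state. Nothing else changes value here, so a bare
            // `continue` would spin forever. Assumption: restart from the
            // initial proposal, or give up if that proposal is itself invalid.
            if(value0 == 2) {
                std::cerr << "Invalid initial value" << std::endl;
                return;
            }
            value = lastValue = value0;
            cnt = 0;
            continue;
        }

        // Pre-prepare Phase: Proposed by the primary node
        auto proposedValue = value;

        // Sampling and Querying (Prepare Phase)
        auto K = sample(network, 5);
        auto P = parallel_query(K, proposedValue);

        // Majority Check (Commit Phase)
        auto [majElement, isMajority] = majorityVote(P);

        // Count consecutive rounds with the same majority; reset if no majority
        cnt = isMajority ? (majElement == lastValue ? cnt + 1 : 1) : 0;

        // Update values
        if(isMajority) {
            lastValue = majElement; // remembered for the next round's comparison
            d[majElement]++;
            if(d[majElement] > d[value]) {
                value = majElement;
            }
        }

        // Accept value condition (Reply Phase)
        if(cnt >= 5) {
            std::cout << "Accept value: " << value << std::endl;
            return;
        }

        // Dynamic adjustment of efficiency factor
        updateParams(e, d, value);

        // Checkpoint mechanism (example, may need more details in actual implementation)
        if (round % checkpointInterval == 0) {
            lastCheckpoint = value;
        }
        (void)lastCheckpoint; // stored only; nothing reads it yet

        // Simplified view change mechanism for demonstration
        std::size_t faultyCount = 0;
        for (Node* node : network) {
            // Placeholder: should be true when this node suspects the primary u
            bool suspectsPrimary = false;
            (void)node;
            if (suspectsPrimary) {
                faultyCount++;
            }
        }
        if (3 * faultyCount > network.size()) {
            // Initiate view change.
            // Placeholder: selecting a new primary is not specified, so u is left unchanged.
            (void)u;
        }
    }
}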

// Mock implementations
void sendMessage(Node* receiver, Message msg) {
    // Mock sending a message to a node
}

Message receiveMessage(Node* sender) {
    // Mock receiving a message from a node
    return Message{PRE_PREPARE, 0, ""};
}
