This is a project I am doing for a course. The project’s goal is to simulate computing PageRank in a Pregel-like system with multithreading in Java.
Some Basic Information
Pregel: A System for Large-Scale Graph Processing
Review for Java Threads
Important Details about Pregel
compute() would be executed at each active vertex in every superstep.
compute() is allowed to query information about the current vertex and its edges and to send messages to other vertices.
compute() can inspect and modify the values of out-edges using methods supplied by the out-edge iterator.
- The values associated with the vertex and its edges are the only per-vertex state that persists across supersteps.
- Vertices communicate directly with one another by sending messages.
- A message consists of a message value and the name of the destination vertex.
- Messages sent to vertex V in superstep S are available via an iterator at superstep S + 1.
- There is no guarantee on order of messages in the iterator, but it is guaranteed that messages will be delivered and that they will not be duplicated.
- Each vertex can provide a value to an aggregator in superstep S; the system combines those values using a reduction operator.
- The resulting value is made available to all vertices in superstep S + 1.
- Define a new aggregator by creating a subclass of the predefined Aggregator class and specifying how the aggregated value is initialized from the first input value and how multiple partially aggregated values are reduced to one.
In the PageRank task, I am considering using an instance of an Aggregator subclass (maybe a function is enough for this project) to aggregate message values sent to the same vertex.
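A minimal sketch of what such an aggregator could look like in Java. The names SimpleAggregator, init, reduce, accept, and SumAggregator are hypothetical and chosen for illustration; they are not Pregel's actual API. The sketch only shows the two pieces described above: initializing from the first input value and reducing partial values to one.

```java
import java.util.List;

// Hypothetical aggregator base class; names are illustrative, not Pregel's API.
abstract class SimpleAggregator<T> {
    private T value;              // partially aggregated value so far
    private boolean empty = true; // true until the first input arrives

    // How the aggregate is initialized from the first input value.
    protected abstract T init(T first);

    // How two partially aggregated values are reduced to one.
    protected abstract T reduce(T left, T right);

    public void accept(T input) {
        if (empty) {
            value = init(input);
            empty = false;
        } else {
            value = reduce(value, input);
        }
    }

    // Returns null if no value has been supplied.
    public T result() {
        return value;
    }
}

// Sums the message values addressed to the same vertex.
class SumAggregator extends SimpleAggregator<Double> {
    @Override protected Double init(Double first) { return first; }
    @Override protected Double reduce(Double left, Double right) { return left + right; }
}

class AggregatorDemo {
    // Aggregates a batch of message values; an empty batch sums to 0.0.
    static double sum(List<Double> messages) {
        SumAggregator agg = new SumAggregator();
        for (double m : messages) {
            agg.accept(m);
        }
        Double r = agg.result();
        return r == null ? 0.0 : r;
    }

    public static void main(String[] args) {
        System.out.println(sum(List.of(0.25, 0.5, 0.125)));
    }
}
```

For a project of this size, the static sum method alone would probably be enough; the subclass form only matters if more than one kind of reduction is needed.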
- The state of each vertex contains:
- current value
- a list of outgoing edges
- a queue containing incoming messages
- a flag specifying whether the vertex is active
When the worker performs a superstep, it loops through all vertices and calls compute(), passing it the current value, an iterator to the incoming messages, and an iterator to the outgoing edges.
- Two copies of the active vertex flags and the incoming message queue exist: one for the current superstep and one for the next superstep
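The vertex state and its two copies of the flag and queue can be sketched roughly as follows. The class and member names (VertexState, deliver, setHalted, advanceSuperstep) are assumptions made for this example, and edges are simplified to destination vertex ids without edge values.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical per-vertex state; all names here are illustrative.
class VertexState {
    double value;                                        // current value
    final List<Integer> outEdges = new ArrayList<>();    // destination vertex ids
    Queue<Double> currentMessages = new ArrayDeque<>();  // readable in this superstep
    Queue<Double> nextMessages = new ArrayDeque<>();     // filled for the next superstep
    boolean activeNow = true;                            // flag for the current superstep
    boolean activeNext = false;                          // flag for the next superstep

    // Called by senders during a superstep. Synchronized because several
    // vertex threads may deliver to the same destination concurrently.
    synchronized void deliver(double message) {
        nextMessages.add(message);
        activeNext = true; // an incoming message reactivates the vertex
    }

    // Called by the vertex itself at the end of compute().
    synchronized void setHalted(boolean votedToHalt) {
        if (!votedToHalt) {
            activeNext = true; // stays active unless it voted to halt
        }
    }

    // Called once between supersteps, while no vertex is computing:
    // the "next" copies become the "current" copies.
    synchronized void advanceSuperstep() {
        currentMessages = nextMessages;
        nextMessages = new ArrayDeque<>();
        activeNow = activeNext;
        activeNext = false;
    }
}
```

Keeping the two copies separate means a message sent during one superstep can never be read before the following superstep starts.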
Primarily, the master is responsible for coordinating the activities of workers. Most master operations, including input, output, computation, and saving and resuming from checkpoints, are terminated at barriers.
An aggregator computes a single global value by applying an aggregation function to a set of values that the user supplies. When a worker executes a superstep for any partition of the graph, the worker combines all of the values supplied to an aggregator instance into a single value.
- Initialize edges; create
- Start all vertices’
- While the number of vertices voting to halt is smaller than
superstep += 1
- Send a message to all vertices to start the next
voteCount is smaller than
- Count votes received and add it to
- Count votes received and add it to
- Count number of -1 in
- Upon start, halt
- Wait for a message from the Master that indicates the start of the next
- Get the aggregated message from the in-message-queue.
compute(), where the vote message would be computed or generated.
- Pause thread and wait for next superstep to start
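The master and vertex loops above could be sketched like this. Note that this sketch swaps in java.util.concurrent.CyclicBarrier for the explicit start messages: the barrier action stands in for the master, and barrier.await() stands in for pausing until the next superstep. The stopping condition (all vertices voted to halt) and the haltAfter parameter are assumptions for the demo, as are all names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class SuperstepLoopDemo {
    // Runs one thread per vertex in lockstep and returns the index of the last superstep.
    // haltAfter is a stand-in for a real convergence test: every vertex votes
    // to halt once the superstep index reaches it.
    static int run(int numVertices, int haltAfter) {
        AtomicInteger voteCount = new AtomicInteger();
        AtomicInteger superstep = new AtomicInteger();
        AtomicBoolean done = new AtomicBoolean(false);

        // The barrier action plays the master's role: it runs exactly once per
        // superstep, after every vertex thread has arrived at the barrier.
        CyclicBarrier barrier = new CyclicBarrier(numVertices, () -> {
            if (voteCount.get() == numVertices) {
                done.set(true);              // everyone voted to halt
            } else {
                voteCount.set(0);            // reset votes
                superstep.incrementAndGet(); // start the next superstep
            }
        });

        Runnable vertex = () -> {
            try {
                while (!done.get()) {
                    // compute() would run here and decide the vote.
                    if (superstep.get() >= haltAfter) {
                        voteCount.incrementAndGet();
                    }
                    barrier.await(); // pause until all vertices finish this superstep
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };

        Thread[] threads = new Thread[numVertices];
        for (int i = 0; i < numVertices; i++) {
            threads[i] = new Thread(vertex);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return superstep.get();
    }

    public static void main(String[] args) {
        System.out.println("last superstep: " + run(4, 2));
    }
}
```

One thread per vertex only scales to small graphs; for larger inputs, a thread per partition of vertices would be the more realistic variant of the same loop.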
Message is used for holding message information. It has several attributes:
type indicates whether this message is a vote or a message passing a weight;
to marks the sender and receiver of this message;
value holds the weight and is null if the message is a vote;
isVote is a flag marking vote messages;
vote indicates whether the vertex is voting to halt or not.
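A sketch of how such a Message class might be laid out. The separate from field, the Type enum, and the factory methods ofWeight and ofVote are my assumptions for illustration; only type, to, value, isVote, and vote come from the list above.

```java
// Hypothetical Message holder; the from field, the Type enum, and the
// factory method names are assumptions made for this sketch.
final class Message {
    enum Type { VOTE, WEIGHT }

    final Type type;      // vote message or weight-passing message
    final int from;       // sender vertex id (assumed field)
    final int to;         // receiver vertex id
    final Double value;   // weight; null when the message is a vote
    final boolean isVote; // true for vote messages
    final boolean vote;   // true if voting to halt; only meaningful for votes

    private Message(Type type, int from, int to, Double value, boolean vote) {
        this.type = type;
        this.from = from;
        this.to = to;
        this.value = value;
        this.isVote = (type == Type.VOTE);
        this.vote = vote;
    }

    // A message that passes a weight from one vertex to another.
    static Message ofWeight(int from, int to, double value) {
        return new Message(Type.WEIGHT, from, to, value, false);
    }

    // A vote message; it carries no weight, so value is null.
    static Message ofVote(int from, int to, boolean halt) {
        return new Message(Type.VOTE, from, to, null, halt);
    }
}
```

Since isVote is derived from type here, one of the two could be dropped; both are kept only to mirror the attribute list.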
- Calculate new weight:
- Calculate message value:
- Send it to the destinations of all its outgoing edges
- Calculate error:
- Vote to halt (1 for voting to halt,
- Wait on
Note that initially, every vertex’s weight would be set to . Also, in the very first
error shouldn’t be computed because there is no value available.
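The compute steps above might look roughly like this in Java. This is a sketch under stated assumptions: it uses the standard damped PageRank update with a damping factor of 0.85, an absolute difference as the error, and a threshold epsilon for voting. These formulas and constants, and all names, are my assumptions for illustration.

```java
import java.util.List;

class PageRankStep {
    static final double DAMPING = 0.85; // assumed damping factor

    // Outcome of one compute() call for a single vertex.
    static final class Result {
        final double newWeight;
        final double messageValue; // value sent along each outgoing edge
        final boolean voteToHalt;

        Result(double newWeight, double messageValue, boolean voteToHalt) {
            this.newWeight = newWeight;
            this.messageValue = messageValue;
            this.voteToHalt = voteToHalt;
        }
    }

    static Result compute(int superstep, double oldWeight, List<Double> incoming,
                          int outDegree, int numVertices, double epsilon) {
        double newWeight = oldWeight;
        boolean voteToHalt = false;
        if (superstep > 0) {
            // Sum the message values received from the previous superstep.
            double sum = 0.0;
            for (double m : incoming) {
                sum += m;
            }
            // Assumed update: standard damped PageRank.
            newWeight = (1 - DAMPING) / numVertices + DAMPING * sum;
            // Assumed error measure: absolute change in weight.
            double error = Math.abs(newWeight - oldWeight);
            voteToHalt = error < epsilon;
        }
        // In the very first superstep no error is computed; the vertex only
        // shares its initial weight with its neighbours.
        double messageValue = outDegree == 0 ? 0.0 : newWeight / outDegree;
        return new Result(newWeight, messageValue, voteToHalt);
    }

    public static void main(String[] args) {
        Result r = compute(0, 0.25, List.of(), 2, 4, 1e-6);
        System.out.println(r.newWeight + " " + r.messageValue + " " + r.voteToHalt);
    }
}
```

A vertex with no outgoing edges sends nothing in this sketch, so its weight leaks out of the graph; handling such dangling vertices is left out here.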