In Assignment 2, we developed parallel solutions for Triangle Counting and PageRank. In this assignment, we will improve those parallel solutions. Specifically, we will look at different task decomposition and mapping strategies for Triangle Counting and PageRank programs. You will implement and observe the effects of different strategies, and write a report that answers the specific questions listed in the Report (see repo).
To build with the additional timing information required to answer the questions in the report:
mkdir build
cd build
cmake -DCMAKE_CXX_FLAGS="-DADDITIONAL_TIMER_LOGS" ../ # for additional timer info
To build for the base performance experiments:
mkdir build
cd build
cmake ../
The strategy is specified using the --strategy command line argument. The value of --strategy will vary between 1 and 3 (more details are presented with each strategy below). Use --nWorkers to specify the number of threads for the program. Example: --nWorkers 4. Ensure that cpus-per-task is correctly specified in your slurm config file based on your requirement.
To measure the time spent on a code region, use the timer as follows:
timer t1;
t1.start();
/* ---- Code region whose time is to be measured --- */
double time_taken = t1.stop();
If you need to time a sub-section inside a loop, you can do that as follows:
double time_taken = 0.0;
timer t1;
while(true){
/* ---- Code region whose time should not be measured --- */
t1.start();
/* ---- Code region whose time is to be measured --- */
time_taken += t1.stop();
/* ---- Code region whose time should not be measured --- */
}
std::cout << "Time spent on required code region : " << time_taken << "\n";
For example, if you run with --nWorkers 4, ensure that --cpus-per-task=4 is set in your slurm job.
The test scripts are available in the repo:
$ cd $REPO
$ ls ./triangle/scripts/
$ ls ./pagerank/scripts
triangle_tester.pyc pagerank_tester.pyc
Sample input graphs are available at /scratch/assignment2/input_graphs/
(available on cs-cloud-02 and cs-cloud-04 only). The maps/ directory in your repo provides a single example, roadNet-CA.
To test with your own graph, create a file called testGraph.txt with the list of edges in the graph (one edge per line). For example,
1 2
2 3
$ /scratch/assignment2/input_graphs/SNAPtoBinary testGraph.txt testGraphConverted
This creates testGraphConverted.csr and testGraphConverted.csc files, which are the CSR and CSC representations of the graph. To run your solutions on this graph, pass --inputFile "testGraphConverted".
The programs accept the following command-line parameters:

params | description
---|---
nWorkers | Number of threads
inputFile | Path to input graph
strategy | Task decomposition/mapping strategy (1, 2, or 3)
./triangle/triangle_parallel.bin --nWorkers 4 --inputFile /scratch/assignment2/input_graphs/roadNet-CA --strategy 1
In Assignment 2, we developed a parallel solution for Triangle Counting where each thread works on (approximately) n/T
vertices, where n
is the number of vertices in the graph and T
is the number of threads. The pseudocode of this solution is shown below.
Create T threads
for each thread in parallel {
Compute the number of triangles for (approximately) n/T vertices allocated to the thread
}
triangle_count = Accumulate the triangle counts from all the threads
triangle_count = triangle_count / 3
This is the naive vertex-based task decomposition
strategy, which will be used with --strategy 1
(i.e., add --strategy 1
in your solution).
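As a reference, here is a minimal sketch of how the contiguous vertex range for each thread could be computed under this strategy; the helper name getVertexRange() and the handling of the n % T remainder are illustrative assumptions, not the required implementation:

#include <algorithm>

// Illustrative sketch: split n vertices into T near-equal contiguous ranges.
// Thread `tid` then works on vertices [start_vertex, end_vertex).
void getVertexRange(long n, int T, int tid, long &start_vertex, long &end_vertex) {
    long base = n / T;        // every thread gets at least n/T vertices
    long extra = n % T;       // the first n % T threads get one extra vertex
    start_vertex = tid * base + std::min<long>(tid, extra);
    end_vertex = start_vertex + base + (tid < extra ? 1 : 0);
}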
Question 1 in the report will require you to run your solution and analyze the results. We have provided a base solution using atomics for strategy 1; compare your implementation from Assignment 2 with the implementation provided.
./triangle/triangle_parallel.bin --nWorkers 4 --inputFile /scratch/assignment2/input_graphs/roadNet-CA --strategy 2
In this strategy, you have to distribute the m
edges in the graph such that each thread works on approximately m/T
edges, where T
is the number of workers. Below is the pseudo-code showing edge-based task decomposition:
Create T threads
for each thread in parallel {
Compute the number of triangles created by approximately m/T edges allocated to the thread
}
triangle_count = Accumulate the triangle counts from all the threads
triangle_count = triangle_count / 3
Question 2 in the report asks you to study the work distribution across the threads.
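One possible way to realize this decomposition, assuming the per-vertex out-degrees are available in an array (the function name and layout below are illustrative), is to walk the vertices and close off a thread's range once it has accumulated roughly m/T edges:

#include <vector>

// Illustrative sketch: compute contiguous vertex ranges such that each range
// covers approximately m/T edges. Thread t works on vertices [start[t], start[t+1]).
std::vector<long> edgeBasedPartition(const std::vector<long> &out_degree, long m, int T) {
    long n = (long)out_degree.size();
    std::vector<long> start(T + 1, 0);
    long target = (m + T - 1) / T;     // edge budget per thread, roughly m/T
    long v = 0;
    for (int t = 0; t < T; t++) {
        start[t] = v;
        long edges = 0;
        while (v < n && edges < target) edges += out_degree[v++];
    }
    start[T] = n;                      // the last thread absorbs any remainder
    return start;
}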
./triangle/triangle_parallel.bin --nWorkers 4 --inputFile /scratch/assignment2/input_graphs/roadNet-CA --strategy 3
Instead of allocating approximately equal number of vertices (or edges) to threads, we can dynamically allocate work to each thread whenever they are free. In this strategy, each thread dynamically gets the next vertex to be computed until all the vertices are processed. Below is the pseudo-code showing dynamic task mapping (with vertex-based decomposition):
Create T threads
for each thread in parallel {
    while(true){
        v = getNextVertexToBeProcessed()
        if(v == -1) break;
        Compute the number of triangles created by the vertex v & its outNeighbors
    }
}
triangle_count = Accumulate the triangle counts from all the threads
triangle_count = triangle_count / 3
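A common way to implement getNextVertexToBeProcessed() for this strategy, sketched below under the assumption of a shared atomic counter (this is illustrative, not the mandated design), is to hand out the next unprocessed vertex on every call and return -1 once all n vertices are claimed:

#include <atomic>

// Illustrative sketch: dynamic vertex handout via a shared atomic counter.
std::atomic<long> next_vertex{0};

long getNextVertexToBeProcessed(long n) {   // n = number of vertices in the graph
    long v = next_vertex.fetch_add(1);      // atomically claim the next vertex
    return (v < n) ? v : -1;                // -1 signals that all vertices are processed
}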
Output Format for Questions 1-3:
Your parallel solution must output the per-thread statistics (thread ids should be in the range [0, T)), such as the number of triangles counted by each thread and the partitioning time, followed by the overall triangle count and the total time taken. The exact fields and format are validated by the test script shown below.
If you partition the work using helper functions such as get_start_vertex() and get_end_vertex(), you should time them as follows:
int start_vertex = 0, end_vertex = 0;
timer t2;
double partitionTime = 0.0;
for(int tid=0; tid<number_of_workers; tid++){
t2.start();
start_vertex = get_start_vertex(tid);
end_vertex = get_end_vertex(tid);
partitionTime += t2.stop();
}
std::cout << "Partitioning time (in seconds) : " << partitionTime << "\n";
If your task decomposition/mapping is carried out in the thread function, then get the time taken for task decomposition/mapping by thread 0 and print it in the main thread.
// Thread function
void threadFunction(int tid, double* partitionTime){
int start_vertex = 0, end_vertex = 0;
timer t2;
t2.start();
start_vertex = get_start_vertex(tid);
end_vertex = get_end_vertex(tid);
double t_partitionTime = t2.stop();
if(tid == 0){
*partitionTime = t_partitionTime;
}
// --- Other code ---
}
int main(){
double partitionTime = 0.0;
// --- Other code ---
for (uint i = 0; i < num_of_workers; i++){
threads[i] = std::thread(threadFunction, i, &partitionTime);
}
// --- Other code ---
std::cout << "Partitioning time (in seconds) : " << partitionTime << "\n";
}
Please note that the output format should strictly match the expected format (including “spaces” and “commas”). You can test your code using the test script as follows. **Note: the test script only includes test data for roadNet-CA and lj, and has been validated on cs-cloud-02.**
python2.7 ../triangle/scripts/triangle_tester.pyc --execPath=$PWD/triangle/triangle_parallel.bin --scriptPath=$PWD/../triangle/scripts/triangle_evaluator.pyc --mapPath=/scratch/assignment2/input_graphs/ --map=roadNet-CA
./pagerank/pagerank_parallel.bin --nWorkers 4 --inputFile /scratch/assignment2/input_graphs/roadNet-CA --strategy 1
In Assignment 2 we developed a parallel solution for PageRank where each thread works on (approximately) n/T
vertices, where n
is the number of vertices in the graph and T
is the number of threads. The pseudocode of this solution is shown below.
Create T threads
Distribute vertices to each thread such that each thread works on approximately n/T vertices
for each thread in parallel {
    for(i=0; i<max_iterations; i++) {
        for each vertex 'u' allocated to the thread {
            edges_processed += outDegree(u) // used in output validation
            for vertex 'v' in outNeighbor(u)
                next_page_rank[v] += (current_page_rank[u]/outdegree[u])
        }
        barrier1 // barrier1_time -> cumulative time spent on this line
        for each vertex 'v' allocated to the thread {
            vertices_processed += 1 // used in output validation
            compute the new_pagerank using the accumulated values in next_page_rank[v].
            current_page_rank[v] = new_pagerank
            Reset next_page_rank[v] to 0
        }
        barrier2 // barrier2_time -> cumulative time spent on this line
    }
}
This is the naive vertex-based task decomposition
strategy, which will be used with --strategy 1
(i.e., add --strategy 1
in your solution).
Answer questions 4 and 5 in the report.
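The output format (see below) asks for the cumulative time spent on barrier1 and barrier2, as indicated by the barrier1_time comment in the pseudocode above. Below is a minimal sketch of the per-thread accumulation, assuming a C++20 std::barrier (the framework may provide its own barrier type, in which case only the wait call changes) and using the timer demonstrated earlier:

#include <barrier>

// Illustrative sketch: accumulate per-thread barrier wait times.
// 'timer' is the helper demonstrated in the timing section above; the barrier
// type and the surrounding PageRank code are assumptions, not the required design.
void pageRankThread(int tid, int max_iterations,
                    std::barrier<> &barrier1, std::barrier<> &barrier2) {
    timer b_timer;
    double barrier1_time = 0.0, barrier2_time = 0.0;
    for (int iter = 0; iter < max_iterations; iter++) {
        // --- first for loop: push current_page_rank[u]/outdegree[u] to out-neighbours ---
        b_timer.start();
        barrier1.arrive_and_wait();
        barrier1_time += b_timer.stop();
        // --- second for loop: compute new page ranks and reset next_page_rank ---
        b_timer.start();
        barrier2.arrive_and_wait();
        barrier2_time += b_timer.stop();
    }
    // barrier1_time and barrier2_time are reported per thread in the output
}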
In this strategy, you have to distribute the vertices to each thread such that each thread is allocated approximately m/T
edges, where m
is the number of edges in the graph and T
is the number of workers. Below is the pseudo-code showing edge-based task decomposition:
Create T threads
Distribute vertices to each thread such that each thread works on approximately m/T edges
for each thread in parallel {
    for(i=0; i<max_iterations; i++) {
        for each vertex 'u' allocated to the thread {
            edges_processed += outDegree(u) // used in output validation
            for vertex 'v' in outNeighbor(u)
                next_page_rank[v] += (current_page_rank[u]/outdegree[u])
        }
        barrier1
        for each vertex 'v' allocated to the thread {
            vertices_processed += 1 // used in output validation
            compute the new_pagerank using the accumulated values in next_page_rank[v].
            current_page_rank[v] = new_pagerank
            Reset next_page_rank[v] to 0
        }
        barrier2
    }
}
This strategy should be used with command-line parameter --strategy 2. Answer question 6 in the report.
Instead of allocating approximately equal number of vertices (or edges) to threads, we can dynamically allocate work to each thread whenever they are free. In this strategy, each thread dynamically gets the next vertex to be computed until all the vertices are processed. Below is the pseudo-code showing dynamic task mapping (with vertex-based decomposition):
Create T threads
for each thread in parallel {
    for(i=0; i<max_iterations; i++) {
        while(true){
            u = getNextVertexToBeProcessed();
            if(u == -1) break;
            edges_processed += outDegree(u) // used in output validation
            for vertex v in outNeighbor(u)
                next_page_rank[v] += (current_page_rank[u]/outdegree[u])
        }
        barrier1
        while(true){
            v = getNextVertexToBeProcessed();
            if(v == -1) break;
            vertices_processed += 1 // used in output validation
            compute the new_pagerank using the accumulated values in next_page_rank[v].
            current_page_rank[v] = new_pagerank
            Reset next_page_rank[v] to 0
        }
        barrier2
    }
}
This strategy should be used with command-line parameter --strategy 3. Implement the above strategy in your solution and answer questions 7 and 8 in the report.
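One subtlety of this strategy is that the vertex handout must restart from 0 in each phase of every iteration. Below is a minimal sketch of one possible implementation (an assumption on our part, not the required design), using one atomic counter per phase so that the idle counter can be reset safely by a single thread:

#include <atomic>

// Illustrative sketch: dynamic vertex handout for PageRank. Two counters are
// used so that the counter of the phase that is not currently running can be
// reset by one thread without interfering with the other threads.
std::atomic<long> next_vertex_phase1{0};
std::atomic<long> next_vertex_phase2{0};

long getNextVertexToBeProcessed(std::atomic<long> &counter, long n) {
    long v = counter.fetch_add(1);
    return (v < n) ? v : -1;            // -1 => all vertices claimed for this phase
}

// Inside the thread function, per iteration (sketched as comments):
//   ... first while loop, pulling work from next_vertex_phase1 ...
//   barrier1
//   if (tid == 0) next_vertex_phase1 = 0;   // safe: phase 2 uses the other counter
//   ... second while loop, pulling work from next_vertex_phase2 ...
//   barrier2
//   if (tid == 0) next_vertex_phase2 = 0;   // safe: the next phase 1 uses the other counter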
To reduce the time spent by each thread on getNextVertexToBeProcessed(), we will vary the task granularity so that each thread receives multiple vertices to be processed each time it calls getNextVertexToBeProcessed().
Update the dynamic load distribution logic as follows:
- Each thread processes k vertices and then calls getNextVertexToBeProcessed(). Here, k determines the granularity of the work done by each thread before requesting new work. For example,
  - If k = 1, the thread calls getNextVertexToBeProcessed() after processing each vertex.
  - If k = 1000, the thread calls getNextVertexToBeProcessed() after processing 1000 vertices.
- The getNextVertexToBeProcessed() function should return 0, k, 2k, … depending on the granularity k.
- k should be provided at run time using a command-line parameter. E.g.: --granularity 100
Below is the pseudo-code showing the logic of our parallel solution:
k = 1000 // granularity
Create T threads
for each thread in parallel {
    for(i=0; i<max_iterations; i++) {
        while(true){
            u = getNextVertexToBeProcessed()
            if(u == -1) break;
            for j = 0 to k {
                edges_processed += outDegree(u) // used in output validation
                for vertex v in outNeighbor(u)
                    next_page_rank[v] += (current_page_rank[u]/outdegree[u])
                u++
                if(u >= n) break; // n is the total number of vertices in the graph
            }
        }
        barrier1
        while(true){
            v = getNextVertexToBeProcessed()
            if(v == -1) break;
            for j = 0 to k {
                vertices_processed += 1 // used in output validation
                compute the new_pagerank using the accumulated values in next_page_rank[v].
                current_page_rank[v] = new_pagerank
                Reset next_page_rank[v] to 0
                v++
                if(v >= n) break; // n is the total number of vertices in the graph
            }
        }
        barrier2
    }
}
This strategy should be used with command-line parameter --strategy 3
in conjunction with --granularity
. Implement the above strategy in your solution and answer questions 9 and 10.
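With granularity, getNextVertexToBeProcessed() hands out the start of a chunk of k vertices rather than a single vertex. A minimal sketch under the same atomic-counter assumption as the earlier dynamic-mapping sketches (the per-iteration reset of the counter is omitted for brevity):

#include <atomic>

// Illustrative sketch: coarse-grained dynamic mapping. The shared counter
// advances in steps of k, so successive calls return 0, k, 2k, ... and -1
// once every vertex has been claimed.
std::atomic<long> next_chunk{0};

long getNextVertexToBeProcessed(long n, long k) {
    long v = next_chunk.fetch_add(k);   // claim the next chunk of k vertices
    return (v < n) ? v : -1;
}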
Your parallel solution must output the following information:
- Granularity used (print 1 if not applicable).
- For each thread (thread ids should be in the range [0, T)):
  - Number of edges processed in the first for loop across all iterations. Refer to the pseudocode of pagerank.
  - Number of vertices processed in the second for loop across all iterations. Refer to the pseudocode of pagerank.
  - Cumulative time spent on barrier1 (in seconds).
  - Cumulative time spent on barrier2 (in seconds).
  - Time spent on getNextVertexToBeProcessed() (in seconds). Print 0 for vertex-based and edge-based task decomposition.

The exact fields and format should match the sample output in sample_outputs/page_rank.output.
If you partition the work using helper functions such as get_start_vertex() and get_end_vertex(), you should time them as follows:
int start_vertex = 0, end_vertex = 0;
timer t2;
double partitionTime = 0.0;
for(int tid=0; tid<number_of_workers; tid++){
t2.start();
start_vertex = get_start_vertex(tid);
end_vertex = get_end_vertex(tid);
partitionTime += t2.stop();
}
std::cout << "Partitioning time (in seconds) : " << partitionTime << "\n";
If your task decomposition/mapping is carried out in the thread function, then get the time taken for task decomposition/mapping by thread 0 and print it in the main thread.
// Thread function
void threadFunction(int tid, double* partitionTime){
int start_vertex = 0, end_vertex = 0;
timer t2;
t2.start();
start_vertex = get_start_vertex(tid);
end_vertex = get_end_vertex(tid);
double t_partitionTime = t2.stop();
if(tid == 0){
*partitionTime = t_partitionTime;
}
// --- Other code ---
}
int main(){
double partitionTime = 0.0;
// --- Other code ---
for (uint i = 0; i < num_of_workers; i++){
threads[i] = std::thread(threadFunction, i, &partitionTime);
}
// --- Other code ---
std::cout << "Partitioning time (in seconds) : " << partitionTime << "\n";
}
The sample output can be found in sample_outputs/page_rank.output
.
Please note that the output format should strictly match the expected format (including “spaces” and “commas”). You can test your code using the test script as follows (only lj):
cd build
python2.7 ../pagerank/scripts/pagerank_tester.pyc --execPath=$PWD/pagerank/pagerank_parallel.bin --scriptPath=$PWD/../pagerank/scripts/pagerank_evaluator.pyc --mapPath=/scratch/assignment2/input_graphs/ --map=lj
@copyright Arrvindh Shriraman and Keval Vora