Lab 5: MapReduce


Overview

A distributed system is a system whose components are located on different networked computers, which communicate and coordinate their actions by passing messages to one another. A well-designed distributed system provides not only efficiency and scalability, but also fault tolerance and economy. Hence, distributed systems are becoming more and more important in building today's cloud and big-data systems.

In this lab, you will implement, in C, some of the most important infrastructure of distributed computing, and thus gain a deeper understanding of the rationale behind the design and implementation of distributed computing systems. First, you will learn what remote procedure calls (RPCs) are and use them to build some interesting services. Second, you will implement MapReduce, one of the most influential distributed computing infrastructures. Third, you will implement a sharded key-value store; similar systems power many popular websites.

You should download this code to start this lab.

Hand-in Procedure

When you have finished the lab, zip your code files with the file name ID-lab-5.zip (e.g., SA19225789-lab-5.zip) and submit it to the Online Teaching Platform. The deadline is 23:59 on Nov. 12, 2023 (Beijing time). Late submissions will NOT be accepted.


Part A: Remote Procedure Calls (RPC)

Remote procedure call (RPC) is a communication protocol that allows a program to call a procedure or service on another computer over a network: the caller invokes a function on one machine, the function actually runs on another machine, and the result is returned to the caller. This mechanism is particularly suitable for distributed systems, allowing interaction between different systems and languages.

1. Software Setup

gorpc is an RPC framework written in the Go language. It provides a simple API that makes it easy to integrate RPC functionality into Go projects. gorpc uses efficient serialization and deserialization methods, as well as optimized network communication protocols, to deliver high-performance RPC services. In addition, gorpc supports a variety of transport protocols and serialization formats.

To use gorpc, you first need to install Go in your environment:

    css-lab@tiger:~$ sudo apt install golang
    css-lab@tiger:~$ go version
    go version go1.18.1 linux/amd64
    
If you see the above information, the installation is successful.

2. Services and RPCs

In this part, we will create a square server using RPC. The client will send a number, and the server will square the number and return the result to the client.
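
As a point of reference, the server-side computation is conceptually just a function that takes the number sent by the client and returns its square. The sketch below is only illustrative; the actual argument handling and reply plumbing are defined by the RPC code in partA/square.

    /* Illustrative sketch only: the real argument/reply types come from
     * the RPC code in partA/square; here we assume plain integers. */
    int square(int x) {
        return x * x;   /* the client sends x, the server replies with x*x */
    }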

Exercise 1: Read the code in the partA/square folder and fill in the missing code in square-client.c and square-server.c.

Next, we will implement a feature to count the number of words in a string. The string client will send "hello, world" to the string server, and the server will count the number of words and return the result.
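
A minimal word-counting routine might look like the sketch below. It assumes words are separated by whitespace and is meant only to illustrate the logic needed inside string-service.c.

    #include <ctype.h>

    /* Sketch: count maximal runs of non-whitespace characters.
     * For the input "hello, world" this returns 2. */
    static int count_words(const char *s) {
        int count = 0, in_word = 0;
        for (; *s != '\0'; s++) {
            if (isspace((unsigned char)*s)) {
                in_word = 0;
            } else if (!in_word) {
                in_word = 1;
                count++;
            }
        }
        return count;
    }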

Exercise 2: Read the provided code in string-service.c and complete the string-count functions.

Similarly, we can implement RPC-based cloud data structures. For example, you can design a stack and some of its operations on the server side and call them from the client side.
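
For reference, the server-side state could be as simple as a fixed-capacity array plus a top index. The sketch below shows one plausible shape of the data structure; the actual interface is declared in stack-server.h.

    /* A possible in-memory stack kept on the server side (sketch only). */
    #define STACK_CAP 1024

    struct stack {
        int data[STACK_CAP];
        int top;                 /* number of elements currently stored */
    };

    static int stack_push(struct stack *s, int v) {
        if (s->top >= STACK_CAP) return -1;   /* stack is full */
        s->data[s->top++] = v;
        return 0;
    }

    static int stack_pop(struct stack *s, int *out) {
        if (s->top == 0) return -1;           /* stack is empty */
        *out = s->data[--s->top];
        return 0;
    }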

Exercise 3: Complete the code in stack-server.h and stack-server.c. Make sure your code passes the tests in stack-client.c.

3. Concurrency and Locks

In a distributed scenario, a race condition is a situation where multiple distributed components attempt to access and modify a shared resource simultaneously, leading to unpredictable results. To avoid this, locks can be used to ensure that only one component can access or modify the resource at any given time. In this way, locks provide a mechanism to ensure that operations on shared resources are atomic, thereby eliminating potential race conditions.

When multiple threads try to modify a shared counter at the same time, data inconsistency may result if there is no appropriate synchronization mechanism. Specifically, two or more threads may read the same value of the counter at the same time, and then perform increment or decrement operations based on this value. In this way, their modifications may overwrite each other, resulting in the final count value not being the expected sum. This situation is often called a race condition.
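
The standard fix is to wrap every access to the shared counter in a pthread mutex, so that the read-modify-write of counter++ becomes atomic. The sketch below shows the pattern; variable names are illustrative and need not match the code in partA/count.

    #include <pthread.h>

    /* Sketch: a shared counter protected by a mutex. */
    static long counter = 0;
    static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

    static void *worker(void *arg) {
        (void)arg;
        for (int i = 0; i < 100000; i++) {
            pthread_mutex_lock(&counter_lock);
            counter++;                        /* critical section */
            pthread_mutex_unlock(&counter_lock);
        }
        return NULL;
    }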

Exercise 1: Compile and run the code in the partA/count folder. What is your output? What is the expected output? How is this output generated?
Exercise 2: Modify the code in the partA/count folder to protect the shared variable with a mutex. Compile and run your code; what is the output now?

When multiple clients or multiple threads access the server, the stack in the RPC implementation is also subject to data races, so we need some mechanism to keep its data safe.

Exercise 3: Improve the code in stack-server.h and stack-server.c. Make sure your code passes the tests in constack-client.c.

Part B: MapReduce

In this part, you need to build a MapReduce library for distributed computing, just as the Google MapReduce paper describes. To make the task manageable, you will build three versions of MapReduce incrementally (a sketch of the user-supplied Map and Reduce callbacks follows the list):

  1. a sequential MapReduce where there is just a single thread to perform all Map tasks then Reduce tasks;
  2. a concurrent MapReduce where there are N workers to perform Map tasks, M workers to perform Reduce tasks, and 1 master to coordinate all tasks, all on the same server; and
  3. a distributed MapReduce where the workers might run on distinct servers.
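
Conceptually, each version is built around two user-supplied callbacks. The word-count sketch below only illustrates what such callbacks might look like; the real prototypes are given in the lab's header files, and emit() is a hypothetical helper that records a key/value pair.

    #include <stdio.h>

    /* Hypothetical helper assumed by this sketch. */
    void emit(const char *key, const char *value);

    void map(const char *key, const char *value) {
        /* split `value` into words and call emit(word, "1") for each one */
        (void)key; (void)value;
    }

    void reduce(const char *key, const char **values, int nvalues) {
        /* for word count, every value is "1", so the total is nvalues */
        char buf[32];
        (void)values;
        snprintf(buf, sizeof buf, "%d", nvalues);
        emit(key, buf);
    }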

1. Sequential MapReduce

Check the file-struct.h file for the details of the function declarations and, in conjunction with the two diagrams below, understand the data structures of single-threaded MapReduce.

Exercise 1: Complete the map_phase and reduce_phase functions in the seq-mr.c file to implement the functionality of single-threaded MapReduce.

Hint: You may need a dataset for MapReduce. You can generate raw data in the Utils directory by running the following command:
    css-lab@tiger:~$ python3 generate-words.py 10 1M

2. Concurrent MapReduce

Sequential MapReduce involves the use of a single thread to sequentially execute Map and Reduce functions. In this section, you will be implementing Map and Reduce using a multithreaded model, and you will need to design appropriate data structures to address data synchronization challenges in concurrent processing.
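
One common design is a master-side task queue protected by a mutex and a condition variable: worker threads block until a task is available, and map tasks are all drained before reduce tasks are enqueued. The sketch below shows only the queue skeleton, with hypothetical names and no capacity checking.

    #include <pthread.h>

    /* Sketch of a shared task queue (names are hypothetical).
     * The master calls add_task(); worker threads call take_task(). */
    struct task_queue {
        int tasks[256];
        int head, tail;
        pthread_mutex_t lock;
        pthread_cond_t  not_empty;
    };

    static void add_task(struct task_queue *q, int task_id) {
        pthread_mutex_lock(&q->lock);
        q->tasks[q->tail++ % 256] = task_id;   /* no overflow check, for brevity */
        pthread_cond_signal(&q->not_empty);
        pthread_mutex_unlock(&q->lock);
    }

    static int take_task(struct task_queue *q) {
        pthread_mutex_lock(&q->lock);
        while (q->head == q->tail)             /* wait until a task arrives */
            pthread_cond_wait(&q->not_empty, &q->lock);
        int task_id = q->tasks[q->head++ % 256];
        pthread_mutex_unlock(&q->lock);
        return task_id;
    }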

Exercise 2: Design your data structures and implement thread communication and synchronization for concurrent processing, to complete MapReduce in a multithreaded single-machine environment.

3. Distributed MapReduce

In practical environments, the processes responsible for Map and Reduce tasks, known as workers, are often distributed across multiple servers. Workers initiate connection requests to the master to request tasks, and the master assigns Map or Reduce tasks to the workers, thus completing distributed MapReduce.
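
Whatever RPC layer you use, the worker-master protocol typically reduces to a pair of messages: a task request from the worker and a task assignment from the master. The message layout below is purely hypothetical; design your own as you see fit.

    /* Hypothetical worker/master RPC messages (sketch only). */
    enum task_kind { TASK_MAP, TASK_REDUCE, TASK_WAIT, TASK_DONE };

    struct task_request {
        int worker_id;           /* who is asking for work */
        int finished_task_id;    /* previously completed task, or -1 */
    };

    struct task_reply {
        enum task_kind kind;     /* map, reduce, wait, or all tasks done */
        int  task_id;
        char input_file[256];    /* file to map, or reduce partition name */
        int  n_reduce;           /* number of reduce partitions */
    };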

Exercise 3: Implement distributed MapReduce using gorpc remote procedure calls. You need to design appropriate data structures to mitigate common conflicts and synchronization issues in a distributed context.

Part C: Sharded KV Stores

In this part, you will build a distributed sharded key-value (KV) storage system. A key-value storage system is conceptually a map from a key to its corresponding value. However, today's popular websites (e.g., Weibo) and applications (e.g., WeChat) may have billions of users, so it is impossible for a single server either to store all user data or to serve all requests. A natural idea is therefore to split the user data and store the pieces on a group of servers; each piece is often called a shard, and a server storing such data is called a sharded KV store.

1. Sharded KV Stores

To use sharded KV storage, the client should maintain a configuration C telling it which sharded KV server to talk to in order to access the value associated with a given key. More precisely, suppose we have n servers with addresses addr_i, 1<=i<=n; then we can maintain, on each client, the following configuration C

    C: [(addr_1, [l_1, h_1]), (addr_2, [l_2, h_2]), ..., (addr_n, [l_n, h_n])]

recording the address of each sharding server along with its range of hash values. Hence, for any key "k", if its hash value hash(k) falls in the range [l_i, h_i], then its data resides on the sharding server addr_i.

To better understand this idea, let us study an illustrative example. Suppose we have four sharding servers with addresses addr_0 to addr_3 and the following configuration:

    C: [(addr_0, [0, 249]),
        (addr_1, [250, 499]),
        (addr_2, [500, 749]),
        (addr_3, [750, 999])]

Hence, for a sample username "ZhangSan", if we have

    hash("ZhangSan") % 1000 == 347

then we should talk to the server at address addr_1, to access its values.
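
In code, the lookup is simply a scan of the configuration for the entry whose range contains hash(k) % 1000. The sketch below assumes the 1000-slot hash space of the example and a hypothetical hash() function; with the configuration above, a hash value of 347 yields addr_1.

    /* Sketch of the client-side lookup over a configuration table. */
    struct config_entry {
        const char *addr;
        int lo, hi;              /* inclusive hash range [lo, hi] */
    };

    static const char *lookup_server(const struct config_entry *conf,
                                     int n, unsigned hash_value) {
        int h = (int)(hash_value % 1000);
        for (int i = 0; i < n; i++)
            if (h >= conf[i].lo && h <= conf[i].hi)
                return conf[i].addr;
        return NULL;             /* no server covers this hash value */
    }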

Exercise 1: Implement the configuration.

A sharded KV store S maps a key "k" to a list of values v_1, ..., v_n associated with the key:

    S: k -> [v_1, v_2, ..., v_n]

Both the key k and the values v_i, 1<=i<=n, are strings. For example, when k is a username on a social-media platform like WeChat, the values v_1 to v_n are all the posts that user has published in "Moments".

A sharded KV store supports a service with the following APIs (a sketch of one possible server-side layout follows the list):

    Get(k)      : retrieve the values associated with the key "k"
    Put(k, v)   : associate "k" to "v" (will clear "k"'s existing values)
    Append(k, v): append the value "v" to "k"'s value list; acts as Put(k, v) if "k" does not exist
    Delete(k)   : remove the key "k" and all values associated with it
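
On the server side, one simple representation maps each key to a growable array of value strings; the four operations then manipulate that array. The layout below is only a sketch with illustrative names; the operations are left as prototypes.

    /* Sketch of a per-shard store: each key owns a list of values. */
    struct kv_entry {
        char  *key;
        char **values;           /* values[0..nvalues-1] */
        int    nvalues;
        struct kv_entry *next;   /* e.g., chaining in a hash-table bucket */
    };

    char **kv_get(const char *key, int *nvalues);         /* Get(k)       */
    void   kv_put(const char *key, const char *value);    /* Put(k, v)    */
    void   kv_append(const char *key, const char *value); /* Append(k, v) */
    void   kv_delete(const char *key);                    /* Delete(k)    */
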
Exercise 2: Implement sharded KV stores.

2. Sharding Master

While you have built a sharded KV store running on multiple servers, its key limitation is that the store is static and hence non-scalable: servers cannot be added or removed. In this part of the lab, you will implement a sharding master, which manages all sharding servers and supports server additions and removals. At the same time, key-value pairs should be moved between sharding servers to reflect configuration changes. For example, when a server S is removed, all key-value pairs on S should be moved to other servers before S is removed, so that the data remain available despite the removal.

In this part of the lab, you should concentrate on the sharding master implementation; you do not need to handle data movement between sharding servers yet. You will implement data movement in the next part of the lab.

The sharding master maintains the aforementioned server configuration C:

    C: [(addr_1, [l_1, h_1]), (addr_2, [l_2, h_2]), ..., (addr_n, [l_n, h_n])]

The sharding master also runs a service supporting the following APIs:

    GetConfig()       : retrieve the current configuration
    AddServer(addr)   : add a new server with address "addr"
    RemoveServer(addr): remove a server with address "addr"

It should be noted that these APIs change the configuration C. For example, given a configuration:

    C: [(addr_0, [0, 249]),
        (addr_1, [250, 499]),
        (addr_2, [500, 749]),
        (addr_3, [750, 999])]

A call to the RPC AddServer(addr_4) will result in the following configuration:

    C: [(addr_0, [0, 199]),
        (addr_1, [200, 399]),
        (addr_2, [400, 599]),
        (addr_3, [600, 799]),
        (addr_4, [800, 999])]

Next, a call to RemoveServer(addr_2) will result in the following configuration (a sketch of the range rebalancing follows it):

    C: [(addr_0, [0, 249]),
        (addr_1, [250, 499]),
        (addr_3, [500, 749]),
        (addr_4, [750, 999])]
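
The rebalancing in both examples simply re-splits the hash space [0, 999] evenly among the current servers. A sketch of that computation is shown below, assuming the same 1000-slot hash space and the config_entry record sketched in Part C.1.

    /* The { addr, lo, hi } record sketched earlier in Part C.1. */
    struct config_entry { const char *addr; int lo, hi; };

    /* Sketch: split the hash space [0, space-1] evenly among n servers,
     * giving any remainder to the last server. With space = 1000, n = 5
     * reproduces the AddServer example; n = 4 reproduces RemoveServer. */
    static void rebalance(struct config_entry *conf, int n, int space) {
        int width = space / n;
        for (int i = 0; i < n; i++) {
            conf[i].lo = i * width;
            conf[i].hi = (i == n - 1) ? space - 1 : (i + 1) * width - 1;
        }
    }
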
Exercise 3: Implement the code for the master configuration as well as the master APIs.

The configuration C is essentially a global property: not only the sharding master but also each sharding server S should maintain a copy. At the same time, the configuration C changes constantly as sharding servers are added or removed. Hence, each sharding server S should keep its local copy of the configuration up-to-date by invoking the sharding master's GetConfig() RPC periodically.
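
One way for a sharding server to keep its copy fresh is a background thread that calls GetConfig() on a fixed interval and installs the result under a lock. The sketch below assumes hypothetical helpers rpc_get_config() and install_config(); the refresh interval is arbitrary.

    #include <unistd.h>

    struct config;                               /* opaque local configuration */
    struct config *rpc_get_config(void);         /* hypothetical GetConfig() wrapper */
    void install_config(struct config *fresh);   /* hypothetical locked update */

    /* Sketch of a background refresher thread run on each sharding server. */
    static void *config_refresher(void *arg) {
        (void)arg;
        for (;;) {
            struct config *fresh = rpc_get_config();
            if (fresh != NULL)
                install_config(fresh);           /* replace the local copy under a lock */
            sleep(1);                            /* refresh once per second (arbitrary) */
        }
        return NULL;
    }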

Exercise 4: Implement the configuration updating.

3. KV Shuffling

In this part of the lab, you will implement KV shuffling between sharding servers to move KV pairs. Specifically, when a new server S is added to the configuration, KV pairs on other servers might move to the new server S, according to the new configuration. Conversely, when an existing server S is removed from the configuration, KV pairs on S should be moved to other sharding servers before S leaves.

Exercise 5: Implement KV shuffling.


This completes this lab. Remember to zip your homework with the file name ID-lab-5.zip (e.g., SA19225789-lab-5.zip) and submit it to the Online Teaching Platform.

Happy hacking!