# Introduction and MapReduce

## 课程介绍

本课主要介绍分布式系统所依赖的基础设施，即在机器集群之上提供给分布式应用的一层抽象。

## 分布式系统 (distributed system) 谈论的主要话题

* 实现工具 (Implementation)
  * RPC
  * threads
  * concurrency control
* 性能 (Performance)
  * 目标：N 台机器 => N 倍处理效率，如吞吐
  * 问题：当 N 越大，越难实现相应的扩展性
    * 负载不均衡或出现拖后腿 (stragglers) 的机器
    * 无法并行的代码：初始化，信息交流
    * 共享资源的瓶颈，如网络
    * 一些无法通过横向扩展提高性能的情况：
      * web 服务器处理单个用户请求
* 容错 (Fault Tolerance)
  * 问题：成千上网的机器中总会出现部分宕机的情况
  * 目标：向应用层隐藏机器的问题
    * Availability
    * Durability
  * 方案：复制服务器 (replica servers)
* 一致性 (consistency)
  * 目标：从系统中读取的永远是最新的信息
  * 问题：
    * 中途宕机：客户端执行多步写操作、服务器更新数据
    * 复制服务器之间的网络异常
  * 观点：
    * 一致性与性能常常相互排斥

## MapReduce

### 背景

在 TB 级别数据集上进行长时间的计算，如：

* 分析网页之间的关系图结构
* 大量日志数据离线分析

### 目标

使得没有分布式系统设计经验的程序员能够轻松地将计算分布到多台机器上

### 过程

![图1 - MapReduce 执行概览](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-LNYpvpcE48I08SBeSU8%2F-LNYqqKDZ6Ei92ajfnZV%2Fmapreduce.jpg?alt=media\&token=43e9a668-3c7e-4d3c-bdb5-1765bee93dcd)

\
**Higher Level Overview**

如图 1 从左到右所示：一个 MapReduce 任务会先把输入数据分成 M 份，分别由 M 个 worker 对其执行 Map 函数，产生 R 份中间结果；再由 R 个 worker 分别拉取相应的中间结果（同时对结果按 key 排序），对其执行 Reduce 函数，产生 R 份输出结果，这些结果既可被用作另一个 MapReduce 任务的输入，也可以被合并成一个文件输出。

**Lower Level Overview**

注意：以下序号与图 1 中的序号一一对应

1. MapReduce Library 首先将输入数据分成 M 份，通常每份大小为 16 - 64 MB，然后在集群上启动相应的程序
2. 集群上启动的程序分为两种，Master 和 Workers。Master 将 M 个 Map 任务和 R 个 Reduce 任务分配给集群中的 Workers
3. 每个接收到 Map 任务的 Worker 读入自己被分配到的输入数据，使用用户定义的 Map 函数将它解析成键值对，这些键值对作为中间结果被缓存在 Worker 的内存中
4. 内存中的中间结果会按周期写入 Worker 本地硬盘，同时被分成 R 个区域，所有相应的文件地址会被发送给 Master，后者将会把这些地址告诉执行 Reduce 任务的 Worker
5. 每个接收到 Reduce 任务的 Worker 得到中间结果的文件地址后，将使用 RPC 获取自己将要处理的部分数据文件，注意到图中的箭头，每个 Worker 都将从所有 M 个 Map Worker 的硬盘中读取自己要处理的部分数据文件，也就是步骤 4 中提到的 R 个区域。当 Worker 读取完所有中间结果后将按 key 对其排序，有必要时会使用 external sort
6. 每个接收到 Reduce 任务的 Worker 将遍历排序后的中间结果，将这些键值对传递给 Reduce 函数，得到 Reduce 结果文件
7. 当所有 Map、Reduce 任务被执行完毕后，master 将唤醒用户程序，进程回到用户程序中

**Example: word count**

```
map(String key, String value):
  // key: document name
  // value: document contents
  for each word in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result))  
```

### 优势

* MapReduce 搞定许多繁琐易错的底层细节：
  * 在多台机器上启动程序
  * 跟踪任务进度
  * 在机器之间移动数据
  * 容错
* 伸缩性好
  * N 台机器 => Nx 吞吐量
    * 假设 M, R >= N，即有足够数量的 Worker
    * Map 任务之间无关，可以并行；Reduce 任务之间无关，可以并行

### 性能瓶颈

* 少量拖后腿的机器 (stragglers)
* Reduce 阶段数据读取的网络开销
* MapReduce 与 HDFS 之间的文件读写网络开销
* 硬盘读写开销

### 故障处理

**基本策略**

* 重试 (retry)
* 复制 (replicate)
* 代替 (replace)

**具体问题**

* Map Worker 宕机
  * Master 重试 （依赖于 Map 函数与 Reduce 函数遵循函数式编程风格）
* Reduce Worker 在执行 Reduce 函数或写输出结果时宕机
  * Master 重试
* Master 错误地给了两个 Worker 同一个 Map 任务
  * Master 只告诉 Reducer Worker 其中一个的结果
* Master 错误地给了两个 Worker 同一个 Reduce 任务
  * 两个 Worker 都会尝试在 HDFS 上写入相同的文件，后者的 atomic rename 可以保证原子性
* Master 宕机
  * 从 check-point 恢复，或放弃任务

### 适用任务

* 建立索引
* 推荐分析
* 建立倒排索引
* 统计

### Lab 1

[code](https://github.com/ZhengHe-MD/distributed-system-lab-codes)

## 参考

6.824 Lecture 1: [Youtube Video](https://www.youtube.com/watch?v=hBWfjkGKRas\&t=3311s\&index=2\&list=PLpl804R-ZwjKCOwWpTZ21eeaBS3uBrMfV), [Course Note](http://nil.csail.mit.edu/6.824/2018/notes/l01.txt), [Lab 1](http://nil.csail.mit.edu/6.824/2018/labs/lab-1.html)

MapReduce: Simplified Data Processing on Large Clusters [paper](https://static.googleusercontent.com/media/research.google.com/zh-CN/archive/mapreduce-osdi04.pdf)
