def mapper_get_words(self, _, line):
    # yield each word in the line
    for word in WORD_RE.findall(line):
        yield (word.lower(), 1)
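Because the input protocol is RawValueProtocol, the key is always None and the value is the text of the line.
The function discards the key and yields a (word, 1) tuple for every word on every line. Because the internal protocol is JSONProtocol, each part of the output is serialized as JSON. The serialized key and value are written to stdout (standard output), separated by a tab and terminated by a newline. For example, the pair ("mrjob", 1) is written as "mrjob", then a tab, then 1.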
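To make this wire format concrete, here is a minimal sketch that imitates the serialization with Python's standard json module. It does not call mrjob itself. The helper names encode_pair and map_line and the sample line are made up for illustration, and the WORD_RE pattern is an assumption, since the mapper above references it without showing its definition.

```python
import json
import re

# Assumed pattern; the mapper above uses WORD_RE without defining it.
WORD_RE = re.compile(r"[\w']+")


def encode_pair(key, value):
    """Hypothetical helper: JSON-encode key and value, joined by a tab."""
    return json.dumps(key) + "\t" + json.dumps(value)


def map_line(line):
    """Imitate mapper_get_words for one line and return the encoded lines."""
    return [encode_pair(word.lower(), 1) for word in WORD_RE.findall(line)]


# Made-up input line, used only to show the shape of the output.
lines_out = map_line("mrjob is a Python library")
for out in lines_out:
    print(out)
```

Each printed line holds a JSON string, a tab, and a JSON number, which is the shape described above.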
def combiner_count_words(self, word, counts):
    # sum the words we've seen so far
    yield (word, sum(counts))

def reducer_count_words(self, word, counts):
    # send all (num_occurrences, word) pairs to the same reducer.
    # num_occurrences is so we can easily use Python's max() function.
    yield None, (sum(counts), word)
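In both cases, JSONProtocol deserializes the bytes into (word, counts) key-value pairs, and the output is serialized as JSON in the same way (because both are followed by another step). The output looks just like the first mapper's output, except that the counts have been summed.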
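The decode, group, and sum sequence can be sketched in plain Python as follows. This is only an illustration under stated assumptions: in a real job the framework groups values by key before calling the combiner, and the helper names decode_line and sum_counts, as well as the sample lines, are hypothetical.

```python
import json
from collections import defaultdict


def decode_line(line):
    """Hypothetical helper: split on the tab and JSON-decode both halves."""
    raw_key, raw_value = line.split("\t", 1)
    return json.loads(raw_key), json.loads(raw_value)


def sum_counts(encoded_lines):
    """Group decoded pairs by word and sum the counts, as the combiner does."""
    grouped = defaultdict(list)
    for line in encoded_lines:
        word, count = decode_line(line)
        grouped[word].append(count)
    return {word: sum(counts) for word, counts in grouped.items()}


# Made-up mapper output: "mrjob" appears twice, "is" once.
mapper_output = ['"mrjob"\t1', '"is"\t1', '"mrjob"\t1']
totals = sum_counts(mapper_output)
for word, total in totals.items():
    # Re-encode in the same tab-separated JSON form for the next step.
    print(json.dumps(word) + "\t" + json.dumps(total))
```

The two "mrjob" lines collapse into a single line carrying the summed count, while the line format stays the same.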
# discard the key; it is just None
def reducer_find_max_word(self, _, word_count_pairs):
    # each item of word_count_pairs is (count, word),
    # so yielding one results in key=counts, value=word
    yield max(word_count_pairs)
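Because every input to this step has the same key (None), a single task receives all of the lines. Again, JSONProtocol handles the deserialization and produces the arguments passed to reducer_find_max_word().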
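As a small illustration of why putting the count first makes max() work, the sketch below decodes a few made-up values and picks the largest. It uses only the standard json module, and the sample values are hypothetical. After JSON decoding, each (count, word) tuple arrives as a two-element list.

```python
import json

# Made-up values as the reducer might see them after JSON decoding:
# each (count, word) tuple has become a two-element list.
raw_values = ['[2, "mrjob"]', '[1, "is"]', '[5, "the"]']
word_count_pairs = (json.loads(v) for v in raw_values)

# Lists compare element by element, so max() ranks by count first
# and only looks at the word to break a tie.
best = max(word_count_pairs)
count, word = best
print(count, word)
```

Yielding that single two-element item is what gives key=count and value=word in the final output.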
# get input as raw strings
INPUT_PROTOCOL = RawValueProtocol
# pass data internally with pickle
INTERNAL_PROTOCOL = PickleProtocol
# write output as JSON
OUTPUT_PROTOCOL = JSONProtocol
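One possible reason to pick pickle for internal data is that JSON cannot represent every Python type. The sketch below compares round trips through the standard json and pickle modules. It illustrates the underlying encodings only, not mrjob's protocol classes, and the sample value is made up.

```python
import json
import pickle

value = (3, "mrjob")  # a tuple, such as a mapper might yield

# JSON has no tuple type, so the round trip returns a list.
via_json = json.loads(json.dumps(value))

# pickle preserves the original Python type.
via_pickle = pickle.loads(pickle.dumps(value))

print(type(via_json).__name__, type(via_pickle).__name__)
```

If later steps depend on receiving tuples or other Python-specific types, that difference matters. The trade-off is that pickled data can only be read by Python.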
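If you need more complex behavior, you can override the input_protocol(), internal_protocol(), or output_protocol() method and return a protocol object instance. Here is an example from the documentation on defining command-line options (https://pythonhosted.org/mrjob/g ... tml#writing-cl-opts):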
class CommandLineProtocolJob(MRJob):
    def configure_options(self):
        super(CommandLineProtocolJob, self).configure_options()
        self.add_passthrough_option(
            '--output-format', default='raw', choices=['raw', 'json'],
            help="Specify the output format of the job")