Learning Apache Apex

Application specifications

Let's start by transforming this placeholder application into an application that counts words – the Hello World equivalent for big data processing frameworks. The functionality is easy to understand and not very important, as our focus here is on the development process.

The full source code of the modified application is available at https://github.com/tweise/apex-samples/tree/master/wordcount. Here is the modified application assembly in Application.java:

@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // Add the operators (the vertices of the DAG), each under a unique name.
  LineByLineFileInputOperator lineReader = dag.addOperator("input",
      new LineByLineFileInputOperator());
  LineSplitter parser = dag.addOperator("parser", new LineSplitter());
  UniqueCounter counter = dag.addOperator("counter", new UniqueCounter());
  GenericFileOutputOperator<Object> output = dag.addOperator("output",
      new GenericFileOutputOperator<>());
  output.setConverter(new ToStringConverter());

  // Connect the operators through their ports (the edges of the DAG).
  dag.addStream("lines", lineReader.output, parser.input);
  dag.addStream("words", parser.output, counter.data);
  dag.addStream("counts", counter.count, output.input);
}

The pipeline reads lines from a file (LineByLineFileInputOperator), splits each line into words (LineSplitter), counts the occurrences of each word (UniqueCounter), and finally writes the result to a file (GenericFileOutputOperator). Apart from LineSplitter, all of these operators are part of the Apex library. After all the operators have been added to the DAG, the pipeline is completed by connecting the operators through their ports with addStream. This is the explicit style of composing the logical DAG (as opposed to using the high-level API), hence the name compositional API. Note that ports must always be defined in their respective operators, and they are not always named input and output: the counter, for example, receives words on its data port and emits results on its count port.
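
To make the data flow concrete, here is a minimal plain-Java sketch of what the parser and counter stages compute. It does not use the Apex API and is not the LineSplitter or UniqueCounter implementation; the class name WordCountSketch, the method names, and the rule of splitting on runs of non-word characters are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the word count pipeline; no Apex classes involved.
public class WordCountSketch {

  // Stand-in for the "parser" stage: split one line into words.
  // Splitting on runs of non-word characters is an assumption.
  static List<String> splitLine(String line) {
    List<String> words = new ArrayList<>();
    for (String token : line.split("\\W+")) {
      if (!token.isEmpty()) {
        words.add(token);
      }
    }
    return words;
  }

  // Stand-in for the "counter" stage: count the occurrences of each word.
  // LinkedHashMap keeps the words in order of first appearance.
  static Map<String, Integer> countWords(List<String> lines) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String line : lines) {
      for (String word : splitLine(line)) {
        counts.merge(word, 1, Integer::sum);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("to be or not to be", "that is the question");
    // Stand-in for the "output" stage: print one word=count pair per line.
    countWords(lines).forEach((word, count) -> System.out.println(word + "=" + count));
  }
}
```

Unlike this sketch, which processes a fixed list in a single pass, the operators in the DAG handle a continuous stream of tuples passed from port to port.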