Artificial Intelligence for Big Data

Pipeline

A Pipeline is a sequence of stages, where every stage is either a transformer or an estimator. The stages run in order, and the input dataset is transformed as it passes through each one. For a transformer stage, the transform() method is called on the dataset; for an estimator stage, the fit() method is called to create a transformer, which is then applied to the dataset.
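The stage contract described above can be sketched in plain Scala without any Spark dependency. This is a toy model only: the names Stage, Transformer, Estimator, Doubler, MeanCenterer, and fitStages are hypothetical and are not Spark's classes, and the dataset is simplified to a sequence of numbers.

```scala
// Toy model of pipeline stages; all names here are illustrative, not Spark's API.
object PipelineSketch {
  type Data = Seq[Double]

  sealed trait Stage
  trait Transformer extends Stage { def transform(data: Data): Data }
  trait Estimator extends Stage { def fit(data: Data): Transformer }

  // A transformer stage: doubles every value.
  object Doubler extends Transformer {
    def transform(data: Data): Data = data.map(_ * 2)
  }

  // An estimator stage: learns the mean of its input and returns
  // a transformer that subtracts that mean.
  object MeanCenterer extends Estimator {
    def fit(data: Data): Transformer = {
      val mean = data.sum / data.size
      new Transformer { def transform(d: Data): Data = d.map(_ - mean) }
    }
  }

  // Runs the stages in order. A transformer is applied directly; an estimator
  // is fitted first, and the transformer it creates is applied instead.
  // The result holds one fitted transformer per original stage.
  def fitStages(stages: Seq[Stage], input: Data): Seq[Transformer] = {
    val (fitted, _) = stages.foldLeft((Vector.empty[Transformer], input)) {
      case ((acc, data), t: Transformer) => (acc :+ t, t.transform(data))
      case ((acc, data), e: Estimator) =>
        val model = e.fit(data)
        (acc :+ model, model.transform(data))
    }
    fitted
  }

  // Fit on training values, then push a new value through the fitted stages.
  val fitted: Seq[Transformer] = fitStages(Seq(Doubler, MeanCenterer), Seq(1.0, 2.0, 3.0))
  val output: Data = fitted.foldLeft(Seq(4.0): Data)((d, t) => t.transform(d))
}
```

In this sketch the estimator sees the already doubled training values (2, 4, 6), so the mean it learns depends on the stage before it. That is why the order of stages matters.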

The DataFrame that is output from one stage is the input for the next stage. The pipeline itself is also an estimator, so calling its fit() method produces a PipelineModel, which is a transformer. The PipelineModel contains the same number of stages as the original pipeline, with each estimator replaced by the transformer it produced during fitting. Because the same PipelineModel is applied to both, the training and test data pass through identical feature-processing steps.

For instance, consider a pipeline with three stages: Tokenizer, which splits each sentence into words using Tokenizer.transform(); HashingTF, which uses HashingTF.transform() to represent those words as a feature vector, because ML algorithms operate on numeric vectors rather than strings; and NaiveBayes, an estimator that is used for prediction.

We can save the fitted model to an HDFS location using the save() method, so that in the future we can load it with the load() method and use it for prediction on a new dataset. The loaded model passes newDataset through all the stages of the pipeline and returns it with a prediction column added:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.NaiveBayes

val df = spark.createDataFrame(Seq(
  ("This is the Transformer", 1.0),
  ("Transformer is pipeline component", 0.0)
)).toDF("text", "label")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")

val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")

val nb = new NaiveBayes().setModelType("multinomial")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, nb))
val model = pipeline.fit(df)
model.save("/HDFSlocation/Path/")
val loadModel = PipelineModel.load("/HDFSlocation/Path/")

// newDataset is assumed to be an existing DataFrame with a "text" column
val predictedData = loadModel.transform(newDataset)
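The listing above assumes that a SparkSession named spark and a DataFrame named newDataset already exist. As a rough, self-contained sketch of the same save, load, and predict round trip, the program below makes several assumptions that are not in the original: a local master (local[*]), a temporary local directory standing in for the HDFS location, a one-row newDataset invented for illustration, and write.overwrite().save() in place of save() so that the pre-created temporary directory can be reused. The object name SavedPipelineSketch is hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

object SavedPipelineSketch {
  // Returns (number of stages in the loaded model, column names of the output).
  def run(): (Int, Seq[String]) = {
    // Assumption: a local session for experimentation, not a cluster setup.
    val spark = SparkSession.builder()
      .appName("saved-pipeline-sketch")
      .master("local[*]")
      .getOrCreate()
    try {
      val df = spark.createDataFrame(Seq(
        ("This is the Transformer", 1.0),
        ("Transformer is pipeline component", 0.0)
      )).toDF("text", "label")

      val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
      val hashingTF = new HashingTF()
        .setNumFeatures(1000)
        .setInputCol(tokenizer.getOutputCol)
        .setOutputCol("features")
      val nb = new NaiveBayes().setModelType("multinomial")

      val model = new Pipeline().setStages(Array(tokenizer, hashingTF, nb)).fit(df)

      // Assumption: a temporary local directory stands in for the HDFS path.
      val path = Files.createTempDirectory("pipeline-model").toString
      model.write.overwrite().save(path)
      val loadModel = PipelineModel.load(path)

      // Hypothetical unlabeled input; only the "text" column is needed.
      val newDataset = spark.createDataFrame(Seq(
        Tuple1("Pipeline is a sequence of stages")
      )).toDF("text")

      val predictedData = loadModel.transform(newDataset)
      predictedData.select("text", "prediction").show(false)

      (loadModel.stages.length, predictedData.columns.toSeq)
    } finally {
      spark.stop()
    }
  }
}
```

Calling SavedPipelineSketch.run() should report three stages in the loaded model, matching the three stages of the original pipeline, and an output that carries a prediction column alongside the intermediate words and features columns.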