Naïve Bayes model on SageMaker notebooks using Apache Spark
In the previous section, Classifying text with language models, we saw how to train a model with scikit-learn on a SageMaker notebook instance. This is feasible for datasets as small as the ones we collected from Twitter. What if, instead, we had hundreds of terabytes of tweet data? For starters, we would not be able to store the data on a single machine. Even if we could, it would likely take too long to train on such a large dataset. Apache Spark solves this problem for us by implementing ML algorithms that can read data from distributed storage (such as AWS S3) and distribute the computation across many machines. AWS provides a product called Elastic MapReduce (EMR) that is capable of launching and managing clusters on which we can perform ML at scale.
Many ML algorithms require several passes over the data (although this is not the case for Naive Bayes). Apache Spark provides a way to cache datasets in memory so that we can efficiently run algorithms that need several passes over the data, such as logistic regression or decision trees, which we will see in the following chapters. We will show how to launch EMR clusters in Chapter 4, Predicting User Behavior with Tree-Based Methods; in this section, however, we will show how similar working with Apache Spark is to working with scikit-learn. In fact, many of the interfaces in Apache Spark (such as pipelines, Transformers, and Estimators) were inspired by scikit-learn.
Apache Spark supports four main languages: R, Python, Scala, and Java. In this book, we will use the Python flavor, also called PySpark. Even though our Spark code will run on a single machine (that is, on our SageMaker notebook instance), it could run on multiple machines without any code changes if our data was larger and we had a Spark cluster (in Chapter 4, Predicting User Behavior with Tree-Based Methods, we will dive into creating Spark clusters with EMR).
In Spark, the first thing we need to do is create a Spark context, and then an SQL context that allows SQL-like manipulation of data:
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext('local', 'test')
sql = SQLContext(sc)
Since we will run Spark locally (on a single machine), we specify local. However, if we were to run this on a cluster, we would need to specify the master address of the cluster instead. Spark works with abstractions called DataFrames, which allow us to manipulate huge tables of data using SQL-like operations.
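As an aside, more recent versions of Spark expose a single SparkSession entry point that wraps both the context and the SQL functionality, and the master address can be set directly on it. The following is a minimal sketch of that alternative setup (the cluster URLs in the comment are placeholders); we will stick with the SparkContext/SQLContext pair in the rest of this section:
from pyspark.sql import SparkSession

# Local mode; on a cluster we would pass its master address instead,
# for example "yarn" or "spark://<master-host>:7077" (placeholder values)
spark = SparkSession.builder \
    .master("local") \
    .appName("test") \
    .getOrCreate()

# The underlying SparkContext is still accessible if needed
sc = spark.sparkContext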
Our first task will be to define DataFrames for our raw data:
from pyspark.sql.functions import lit
dems_df = sql.read.text("file://" + SRC_PATH + 'dem.txt')
gop_df = sql.read.text("file://" + SRC_PATH + 'gop.txt')
corpus_df = dems_df.select("value", lit(1).alias("label")) \
    .union(gop_df.select("value", lit(0).alias("label")))
In the two read.text calls, we create a DataFrame for each source of raw tweets. We then create corpus_df, which contains both sources, adding the label by creating a column with a literal of 1 for Democrats and 0 for Republicans:
>>> corpus_df.select("*").limit(2).show()
+--------------------+-----+
|               value|label|
+--------------------+-----+
|This ruling is th...|    1|
|No president shou...|    1|
+--------------------+-----+
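Because corpus_df behaves like an SQL table, we can also run quick aggregations on it. For example, the following optional line counts how many tweets we have for each label:
corpus_df.groupBy("label").count().show()  # number of tweets per label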
Spark works in a lazy fashion: transformations such as select and union only build an execution plan, and no actual processing happens until an action (such as the show and count calls above) forces it. The next step in our flow is to split the DataFrame into training and testing sets:
train_df, test_df = corpus_df.randomSplit([0.75, 0.25])
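Note that randomSplit samples rows at random, so the exact split changes between runs unless we pass a seed. Also, since the training DataFrame will be read several times in the rest of the flow, this is a natural place to apply the in-memory caching discussed at the beginning of the section. Both tweaks are optional; a possible variation (the seed value is arbitrary) looks like this:
# Reproducible split (the seed value is arbitrary)
train_df, test_df = corpus_df.randomSplit([0.75, 0.25], seed=42)

# Keep the training split cached in memory across passes
train_df.cache()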
Now, we are ready to train our model. Spark supports the same concept of pipelines as scikit-learn. We will build a pipeline with the necessary transformations for our model. It's very similar to our previous example, except that Spark has two separate stages for tokenization and stopword removal:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover
tokenizer = Tokenizer(inputCol="value", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="words_cleaned")
vectorizer = CountVectorizer(inputCol="words_cleaned", outputCol="features")
cleaning_pipeline = Pipeline(stages = [tokenizer, stop_words_remover, vectorizer])
cleaning_pipeline_model = cleaning_pipeline.fit(corpus_df)
cleaned_training_df = cleaning_pipeline_model.transform(train_df)
cleaned_testing_df = cleaning_pipeline_model.transform(test_df)
As you can see in the preceding code, we defined a pipeline with all the necessary stages to clean the data. Each stage transforms the DataFrame (which originally has only two columns: value, the raw tweet text, and label) and adds more columns.
In the following output, the relevant columns used at training time are features (a sparse vector representing the BoW, exactly like in our scikit-learn example) and label:
>>> cleaned_training_df.show(n=3)
+------------+-----+------------+-------------+--------------------+
|       value|label|       words|words_cleaned|            features|
+------------+-----+------------+-------------+--------------------+
| #Tuesday...|    1|[#tuesday...| [#tuesday...|(3025,[63,1398,18...|
| #WorldAI...|    1|[#worlda....| [#worldai...|(3025,[37,75,155,...|
| @Tony4W....|    1|[.@tony4w...| [.@tony4w...|(3025,[41,131,160...|
+------------+-----+------------+-------------+--------------------+
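The 3025 that appears in the features column is the size of the vocabulary learned by CountVectorizer (the sparse vector format is size, indices, values). If we want to inspect that vocabulary, the fitted pipeline exposes its stages; the following optional snippet assumes the stage order we defined previously (the CountVectorizerModel is the third stage):
vectorizer_model = cleaning_pipeline_model.stages[2]  # fitted CountVectorizerModel
print(len(vectorizer_model.vocabulary))               # vocabulary size
print(vectorizer_model.vocabulary[:10])               # first ten terms in the vocabulary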
By specifying the features and label columns to the NaiveBayes classifier, we can train a model:
from pyspark.ml.classification import NaiveBayes
naive_bayes = NaiveBayes(featuresCol="features", labelCol="label")
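Spark's NaiveBayes defaults to the multinomial variant with a Laplace smoothing value of 1.0. If we wanted those choices to be explicit (or to change them), we could construct the estimator as follows; this sketch just spells out the defaults:
naive_bayes = NaiveBayes(featuresCol="features", labelCol="label",
                         smoothing=1.0, modelType="multinomial")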
The fitted model is a transformer that can provide predictions for each row of a DataFrame; here, we apply it to our testing DataFrame:
naive_bayes_model = naive_bayes.fit(cleaned_training_df)
predictions_df = naive_bayes_model.transform(cleaned_testing_df)
>>> predictions_df.select("features", "label", "prediction").limit(3).show()
+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(3025,[1303,1858,...|    1|       1.0|
|(3025,[1,20,91,13...|    1|       1.0|
|(3025,[16,145,157...|    1|       1.0|
+--------------------+-----+----------+
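Besides prediction, the model also appends rawPrediction and probability columns, which are useful when we want per-class probabilities rather than only the predicted label:
predictions_df.select("probability", "prediction").limit(3).show()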
Similar to our previous example, we can evaluate the accuracy of our model. Using the MulticlassClassificationEvaluator class and specifying the actual and predicted label columns, we can obtain the accuracy:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions_df)
The output is 0.93, which is similar to the result we obtained with scikit-learn.
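Accuracy is not the only metric the evaluator supports; for instance, setting metricName to f1 returns the F1 score instead. We can also get a simple confusion-matrix-style breakdown using plain DataFrame operations. The following optional snippet shows both:
# F1 score instead of accuracy
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")
print(f1_evaluator.evaluate(predictions_df))

# Counts of actual versus predicted labels (a simple confusion matrix)
predictions_df.groupBy("label", "prediction").count().show()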