1.3.2 开发邮件分类器
通常按如下步骤开发邮件分类器:获取文件、解析单词、训练、评分、评判、分类、预测与验证。
1.获取文件
先编写测试代码。在setUp中准备好测试用数据并做好相关准备工作,编写读取文件的测试代码,函数名格式为test_xxx,此处为test_get_files_from_dir。
import unittest class TestEmailClassifier(unittest.TestCase): def setUp(self): # 内容为:notin book please PLEASE self.tmp_file = '../data/tmp/tmp.txt' #文件1内容:spam buy buy this book http www self.spam_dir = '../data/test_spam/' #文件1内容:ham this is ham please refer this book #文件2内容:ham please refer this book self.ham_dir = '../data/test_ham/' self.clf = EmailClassifier(self.spam_dir, self.ham_dir) def test_get_files_from_dir(self): a_bad = EmailClassifier.get_files_from_dir(self.spam_dir) a_good = EmailClassifier.get_files_from_dir(self.ham_dir) self.assertEqual(len(a_bad), 1) self.assertEqual(len(a_good), 2) # 主函数 if __name__ == '__main__': # 方式一:python -m unittest TestEmailClassifier #unittest.main() # 方式二 suite = unittest.defaultTestLoader.loadTestsFromTestCase( TestEmailClassifier) unittest.TextTestRunner().run(suite)
再编写功能代码:
import os import glob class EmailClassifier: @staticmethod def get_files_from_dir(path): return glob.glob(os.path.join(path, '*.*'))
运行上述测试主函数,保证代码测试通过。
2.解析单词
接上述代码,在TestEmailClassifier类中编写测试代码:
# 测试文本解析功能:全部小写,取集合 # expectation : e ;a = actual def test_get_words_from_file(self): e = set(['notin', 'book', 'please']) a = EmailClassifier.get_words_from_file(self.tmp_file) self.assertSetEqual(a, e)
再编写功能代码:
import re @staticmethod def get_words_from_file(file): with io.open(file, 'r') as f: c = re.findall('\w+', f.read().lower()) return set(c)
运行测试主函数,保证代码测试通过。
3.训练
朴素贝叶斯训练:统计单词。先编写测试代码:
def test_train(self): self.clf.train() self.assertEqual(self.clf.total_count['spam'], 6) self.assertEqual(self.clf.total_count['ham'], 11) self.assertEqual(self.clf.training['ham']['please'], 2) self.assertEqual(self.clf.training['spam']['buy'], 1)
再编写功能代码:
from collections import defaultdict class EmailClassifier: ''' spam:垃圾邮件 ham: 正常邮件 ''' def __init__(self, spam_dir, ham_dir): self.CAT = ['ham', 'spam'] self.spam_list = EmailClassifier.get_files_from_dir(spam_dir) self.ham_list = EmailClassifier.get_files_from_dir(ham_dir) # 记录每个类别下每个单词的计数 self.training = {c: defaultdict(float) for c in self.CAT} # 记录每个类别单词总数 self.total_count = {self.CAT[0]: 0, self.CAT[1]: 0} def train(self): # 单词统计 for t in zip(self.CAT, [self.ham_list, self.spam_list]): for s in t[1]: words = EmailClassifier.get_words_from_file(s) self.total_count[t[0]] += len(words) for ww in words: self.training[t[0]][ww] += 1
运行测试主函数,保证代码测试通过。
4.评分
测试代码:输入邮件文件得出其评分。
def test_score(self): a = { 'ham': round(2 / 3 * 1 / 12 * 3 / 12 * 3 / 12, 7) 'spam': round(1 / 3 * 1 / 7 * 2 / 7 * 1 / 7, 7) } e = self.clf.score(self.tmp_file) self.assertDictEqual(a, e)
功能代码:为了避免因预测集中出现训练集中未出现的单词而导致归零的情况,在统计时对分子、分母各加1。
# 增加先验 def __init__(self, spam_dir, ham_dir): p_ham = len(self.ham_list) / (len(self.ham_list) + len(self.spam_list)) self.P = {self.CAT[0]: p_ham, self.CAT[1]: 1 - p_ham} def score(self, email_file): if self.total_count[self.CAT[0]] == 0 or self.total_count[ self.CAT[1]] == 0: self.train() result = self.P.copy() for ww in EmailClassifier.get_words_from_file(email_file): for cc in self.CAT: v = self.training[cc][ww] p = (v + 1) / (self.total_count[cc] + 1) result[cc] *= p
运行测试主函数,保证代码测试通过。
5.评判
测试代码:输入得分字典,进行贝叶斯决策。
def test_judge(self): t = self.clf.score(self.tmp_file) e = {'ham':0.0034722} a = self.clf.judge(t) self.assertDictEqual(a, e)
功能代码:
@staticmethod def judge(score_dict): '''二分类''' keys = list(score_dict.keys()) if score_dict[keys[0]] >= score_dict[keys[1]]: return {keys[0]: score_dict[keys[0]]} else: return {keys[1]: score_dict[keys[1]]}
运行测试主函数,保证代码测试通过。
6.分类
测试代码:输入邮件文件,得出分类结果。
def test_classify(self): e = {'ham':0.0034722} a = self.clf.classify(self.tmp_file) self.assertDictEqual(a, e)
功能代码:
def classify(self, email_file): score = self.score(email_file) return self.judge(score)
运行测试主函数,保证代码测试通过。最终的测试通过提示如图1-8所示。
图1-8 unittest测试通过
在测试和开发结束后,就完成了核心功能点的开发,后续要检查功能或微调,最后将测试代码和功能代码一并归档提交。只要保证测试通过,就能保证原功能正常,这极大方便了后续迭代和维护。
7.预测与验证
查看分类器的效果:将准备好的数据分为训练集和验证集,二者的比率为8∶2,并分别将其放入不同的文件夹中处理。
#训练集 s = '../data/processed/spam_400/' h = '../data/processed/ham_1120/' clf = EmailClassifier(s, h) clf.train() #预测功能代码 def predict(clf, ham_dir, spam_dir): ham_list = EmailClassifier.get_files_form_dir(ham_dir) spam_list = EmailClassifier.get_files_form_dir(spam_dir) tp,fp,tn,fn = 0,0,0,0 for hh in ham_list: t = clf.classify(hh) if 'ham' in t: tn += 1 else: print(t) fn += 1 for ss in spam_list: t = clf.classify(ss) if 'spam' in t: tp += 1 else: print(t) fp += 1 accuracy = (tp + tn) / (tp+tn+fp+fn) print('accuracy:{}'.format(accuracy)) return tp,fp,tn,fn # 验证集上预测查看效果 v_s = '../data/validate/spam/' v_h = '../data/validate/ham/' predict(clf,h,s)
重复上述步骤会发现,最终得到的结果分类完全正确,这主要是score函数中连乘导致精度溢出所致。可以实验每项乘以1000进行微调:
p = 1000*(v + 1) / (self.total_count[cc] + 1)
同时,调整测试代码,保证在一定精度下测试代码全部通过,此情况下精度为0.98。朴素贝叶斯邮件分类器在数据量较少的情况下也能有不错的表现,一般随着数据量的增多,效果越来越好。