机器学习:软件工程方法与实现
上QQ阅读APP看书,第一时间看更新

1.3.2 开发邮件分类器

通常按如下步骤开发邮件分类器:获取文件、解析单词、训练、评分、评判、分类、预测与验证。

1.获取文件

先编写测试代码。在setUp中准备好测试用数据并做好相关准备工作,编写读取文件的测试代码,函数名格式为test_xxx,此处为test_get_files_from_dir。


import unittest
class TestEmailClassifier(unittest.TestCase):
    def setUp(self):
        # 内容为:notin book please PLEASE
        self.tmp_file = '../data/tmp/tmp.txt'
        #文件1内容:spam buy buy this book http www
        self.spam_dir = '../data/test_spam/'
        #文件1内容:ham this is ham please refer this book
        #文件2内容:ham please refer this book
        self.ham_dir = '../data/test_ham/'
        self.clf = EmailClassifier(self.spam_dir, self.ham_dir)
def test_get_files_from_dir(self):
        a_bad = EmailClassifier.get_files_from_dir(self.spam_dir)
        a_good = EmailClassifier.get_files_from_dir(self.ham_dir)
        self.assertEqual(len(a_bad), 1)
        self.assertEqual(len(a_good), 2)
# 主函数
if __name__ == '__main__':
# 方式一:python -m unittest TestEmailClassifier
#unittest.main()
# 方式二
suite = unittest.defaultTestLoader.loadTestsFromTestCase(
    TestEmailClassifier)
    unittest.TextTestRunner().run(suite)

再编写功能代码:


import os
import glob
class EmailClassifier:
    @staticmethod
    def get_files_from_dir(path):
        return glob.glob(os.path.join(path, '*.*'))

运行上述测试主函数,保证代码测试通过。

2.解析单词

接上述代码,在TestEmailClassifier类中编写测试代码:


# 测试文本解析功能:全部小写,取集合
# expectation : e ;a = actual
def test_get_words_from_file(self):
    e = set(['notin', 'book', 'please'])
    a = EmailClassifier.get_words_from_file(self.tmp_file)
    self.assertSetEqual(a, e)

再编写功能代码:


import re
@staticmethod
def get_words_from_file(file):
    with io.open(file, 'r') as f:
        c = re.findall('\w+', f.read().lower())
    return set(c)

运行测试主函数,保证代码测试通过。

3.训练

朴素贝叶斯训练:统计单词。先编写测试代码:


def test_train(self):
    self.clf.train()
    self.assertEqual(self.clf.total_count['spam'], 6)
    self.assertEqual(self.clf.total_count['ham'], 11)
    self.assertEqual(self.clf.training['ham']['please'], 2)
    self.assertEqual(self.clf.training['spam']['buy'], 1)

再编写功能代码:


from collections import defaultdict
class EmailClassifier:
    '''
    spam:垃圾邮件
    ham: 正常邮件
    '''
    def __init__(self, spam_dir, ham_dir):
        self.CAT = ['ham', 'spam']
        self.spam_list = EmailClassifier.get_files_from_dir(spam_dir)
        self.ham_list = EmailClassifier.get_files_from_dir(ham_dir)
        # 记录每个类别下每个单词的计数
        self.training = {c: defaultdict(float) for c in self.CAT}
        # 记录每个类别单词总数
        self.total_count = {self.CAT[0]: 0, self.CAT[1]: 0}
    def train(self):
        # 单词统计
        for t in zip(self.CAT, [self.ham_list, self.spam_list]):
            for s in t[1]:
                words = EmailClassifier.get_words_from_file(s)
                self.total_count[t[0]] += len(words)
                for ww in words:
                    self.training[t[0]][ww] += 1

运行测试主函数,保证代码测试通过。

4.评分

测试代码:输入邮件文件得出其评分。


def test_score(self):
    a = {
        'ham': round(2 / 3 * 1 / 12 * 3 / 12 * 3 / 12, 7)
        'spam': round(1 / 3 * 1 / 7 * 2 / 7 * 1 / 7, 7)
    }
    e = self.clf.score(self.tmp_file)
    self.assertDictEqual(a, e)

功能代码:为了避免因预测集中出现训练集中未出现的单词而导致归零的情况,在统计时对分子、分母各加1。


# 增加先验
def __init__(self, spam_dir, ham_dir):
        p_ham = len(self.ham_list) / (len(self.ham_list) + len(self.spam_list))
        self.P = {self.CAT[0]: p_ham, self.CAT[1]: 1 - p_ham}
    def score(self, email_file):
        if self.total_count[self.CAT[0]] == 0 or self.total_count[
            self.CAT[1]] == 0:
        self.train()
    result = self.P.copy()
    for ww in EmailClassifier.get_words_from_file(email_file):
        for cc in self.CAT:
            v = self.training[cc][ww]
            p = (v + 1) / (self.total_count[cc] + 1)
            result[cc] *= p

运行测试主函数,保证代码测试通过。

5.评判

测试代码:输入得分字典,进行贝叶斯决策。


def test_judge(self):
    t = self.clf.score(self.tmp_file)
    e = {'ham':0.0034722}
    a = self.clf.judge(t)
    self.assertDictEqual(a, e)

功能代码:


@staticmethod
    def judge(score_dict):
        '''二分类'''
        keys = list(score_dict.keys())
        if score_dict[keys[0]] >= score_dict[keys[1]]:
            return {keys[0]: score_dict[keys[0]]}
        else:
            return {keys[1]: score_dict[keys[1]]}

运行测试主函数,保证代码测试通过。

6.分类

测试代码:输入邮件文件,得出分类结果。


def test_classify(self):
    e = {'ham':0.0034722}
    a = self.clf.classify(self.tmp_file)
    self.assertDictEqual(a, e)

功能代码:


def classify(self, email_file):
    score = self.score(email_file)
    return self.judge(score)

运行测试主函数,保证代码测试通过。最终的测试通过提示如图1-8所示。

图1-8 unittest测试通过

在测试和开发结束后,就完成了核心功能点的开发,后续要检查功能或微调,最后将测试代码和功能代码一并归档提交。只要保证测试通过,就能保证原功能正常,这极大方便了后续迭代和维护。

7.预测与验证

查看分类器的效果:将准备好的数据分为训练集和验证集,二者的比率为8∶2,并分别将其放入不同的文件夹中处理。


#训练集
s = '../data/processed/spam_400/'
h = '../data/processed/ham_1120/'
clf = EmailClassifier(s, h)
clf.train()
#预测功能代码
def predict(clf, ham_dir, spam_dir):
    ham_list = EmailClassifier.get_files_form_dir(ham_dir)
    spam_list = EmailClassifier.get_files_form_dir(spam_dir)
    tp,fp,tn,fn = 0,0,0,0
    for hh in ham_list:
        t = clf.classify(hh)
        if 'ham' in t:
            tn += 1
        else:
            print(t)
            fn += 1  
    for ss in spam_list:
        t = clf.classify(ss)
        if 'spam' in t:
            tp += 1
        else:
            print(t)
            fp += 1
    accuracy = (tp + tn) / (tp+tn+fp+fn)
    print('accuracy:{}'.format(accuracy))
    return tp,fp,tn,fn
# 验证集上预测查看效果
v_s = '../data/validate/spam/'
v_h = '../data/validate/ham/'
predict(clf,h,s)

重复上述步骤会发现,最终得到的结果分类完全正确,这主要是score函数中连乘导致精度溢出所致。可以实验每项乘以1000进行微调:


p = 1000*(v + 1) / (self.total_count[cc] + 1)

同时,调整测试代码,保证在一定精度下测试代码全部通过,此情况下精度为0.98。朴素贝叶斯邮件分类器在数据量较少的情况下也能有不错的表现,一般随着数据量的增多,效果越来越好。