starter_code.ipynb 61.3 KB

搭建一个简单的问答系统 (Building a Simple QA System)

本次项目的目标是搭建一个基于检索式的简易的问答系统,这是一个最经典的方法也是最有效的方法。

不要单独创建一个文件,所有的都在这里面编写,不要试图改已经有的函数名字 (但可以根据需求自己定义新的函数)

预估完成时间: 5-10小时

检索式的问答系统

问答系统所需要的数据已经提供,对于每一个问题都可以找得到相应的答案,所以可以理解为每一个样本数据是 <问题、答案>。 那系统的核心是当用户输入一个问题的时候,首先要找到跟这个问题最相近的已经存储在库里的问题,然后直接返回相应的答案即可(但实际上也可以抽取其中的实体或者关键词)。 举一个简单的例子:

假设我们的库里面已有存在以下几个<问题,答案>:

  • <"贪心学院主要做什么方面的业务?”, “他们主要做人工智能方面的教育”>
  • <“国内有哪些做人工智能教育的公司?”, “贪心学院”>
  • <"人工智能和机器学习的关系什么?", "其实机器学习是人工智能的一个范畴,很多人工智能的应用要基于机器学习的技术">
  • <"人工智能最核心的语言是什么?", ”Python“>
  • .....

假设一个用户往系统中输入了问题 “贪心学院是做什么的?”, 那这时候系统先去匹配最相近的“已经存在库里的”问题。 那在这里很显然是 “贪心学院是做什么的”和“贪心学院主要做什么方面的业务?”是最相近的。 所以当我们定位到这个问题之后,直接返回它的答案 “他们主要做人工智能方面的教育”就可以了。 所以这里的核心问题可以归结为计算两个问句(query)之间的相似度。

项目中涉及到的任务描述

问答系统看似简单,但其中涉及到的内容比较多。 在这里先做一个简单的解释,总体来讲,我们即将要搭建的模块包括:

  • 文本的读取: 需要从相应的文件里读取(问题,答案)
  • 文本预处理: 清洗文本很重要,需要涉及到停用词过滤等工作
  • 文本的表示: 如果表示一个句子是非常核心的问题,这里会涉及到tf-idf, Glove以及BERT Embedding
  • 文本相似度匹配: 在基于检索式系统中一个核心的部分是计算文本之间的相似度,从而选择相似度最高的问题然后返回这些问题的答案
  • 倒排表: 为了加速搜索速度,我们需要设计倒排表来存储每一个词与出现的文本
  • 词义匹配:直接使用倒排表会忽略到一些意思上相近但不完全一样的单词,我们需要做这部分的处理。我们需要提前构建好相似的单词然后搜索阶段使用
  • 拼写纠错:我们不能保证用户输入的准确,所以第一步需要做用户输入检查,如果发现用户拼错了,我们需要及时在后台改正,然后按照修改后的在库里面搜索
  • 文档的排序: 最后返回结果的排序根据文档之间余弦相似度有关,同时也跟倒排表中匹配的单词有关

项目中需要的数据:

  1. dev-v2.0.json: 这个数据包含了问题和答案的pair, 但是以JSON格式存在,需要编写parser来提取出里面的问题和答案。
  2. glove.6B: 这个文件需要从网上下载,下载地址为:https://nlp.stanford.edu/projects/glove/, 请使用d=200的词向量
  3. spell-errors.txt 这个文件主要用来编写拼写纠错模块。 文件中第一列为正确的单词,之后列出来的单词都是常见的错误写法。 但这里需要注意的一点是我们没有给出他们之间的概率,也就是p(错误|正确),所以我们可以认为每一种类型的错误都是同等概率
  4. vocab.txt 这里列了几万个英文常见的单词,可以用这个词库来验证是否有些单词被拼错
  5. testdata.txt 这里搜集了一些测试数据,可以用来测试自己的spell corrector。这个文件只是用来测试自己的程序。

在本次项目中,你将会用到以下几个工具:

第一部分:对于训练数据的处理:读取文件和预处理

  • 文本的读取: 需要从文本中读取数据,此处需要读取的文件是dev-v2.0.json,并把读取的文件存入一个列表里(list)
  • 文本预处理: 对于问题本身需要做一些停用词过滤等文本方面的处理
  • 可视化分析: 对于给定的样本数据,做一些可视化分析来更好地理解数据

1.1节: 文本的读取

把给定的文本数据读入到qlistalist当中,这两个分别是列表,其中qlist是问题的列表,alist是对应的答案列表

In [62]:
import json
import matplotlib.pyplot as plt
def read_corpus():
    """
    读取给定的语料库,并把问题列表和答案列表分别写入到 qlist, alist 里面。 在此过程中,不用对字符换做任何的处理(这部分需要在 Part 2.3里处理)
    qlist = ["问题1", “问题2”, “问题3” ....]
    alist = ["答案1", "答案2", "答案3" ....]
    务必要让每一个问题和答案对应起来(下标位置一致)
    """
    # TODO 需要完成的代码部分 ...
    filename="./train-v2.0.json"
    qlist=[]
    alist=[]
    with open(filename) as f:
        data_array = json.load(f)['data']
        for data in data_array:
            paragraphs = data['paragraphs']
            for paragraph in paragraphs:
                qas = paragraph['qas']
                for qa in qas:
                    for qa in qas:
                        if 'plausible_answers' in qa:
                            qlist.append(qa['question'])
                            alist.append(qa['plausible_answers'][0]['text'])
                        else:
                            qlist.append(qa['question'])
                            alist.append(qa['answers'][0]['text'])


    with open('a_list.json','w') as file:
        json.dump(alist,file)
    assert len(qlist) == len(alist)  # 确保长度一样
    return qlist, alist

1.2 理解数据(可视化分析/统计信息)

对数据的理解是任何AI工作的第一步, 需要对数据有个比较直观的认识。在这里,简单地统计一下:

  • qlist出现的总单词个数
  • 按照词频画一个histogram plot
In [63]:
# TODO: 统计一下在qlist中总共出现了多少个单词? 总共出现了多少个不同的单词(unique word)?
#       这里需要做简单的分词,对于英文我们根据空格来分词即可,其他过滤暂不考虑(只需分词)
def count_word(qlist):
    qlist_dic={}
    word_total=0

    for line in qlist:
        words=line.strip().split()
        for item in words:
            word_total+=1
            if item in qlist_dic:
                qlist_dic[item]+=1
            else:
                qlist_dic[item]=1
    return qlist_dic,word_total
qlist,alist=read_corpus()
qlist_dic,word_total=count_word(qlist)
print (word_total,len(qlist_dic.keys()))
    
Out [63]:
10574217 76474
In [64]:
# TODO: 统计一下qlist中出现1次,2次,3次... 出现的单词个数, 然后画一个plot. 这里的x轴是单词出现的次数(1,2,3,..), y轴是单词个数。
#       从左到右分别是 出现1次的单词数,出现2次的单词数,出现3次的单词数... 
from collections import Counter
def plot_table(qlist_dic):
    words_freq=qlist_dic.values()
    words_freq_dic=Counter(words_freq)  #此时字典中的键就是词频,值就是具有相同词频的单词数量  
    x=sorted(words_freq_dic.keys()) #排序,确保词频,即X轴上递增
    y=[]
    #找到每个词频对应的单词数量
    for i in x:
        y.append(words_freq_dic[i])
    
    plt.plot(x, y, color="r", linestyle="-", marker="*", linewidth=1.0)
    plt.show()
plot_table(qlist_dic) 
In [65]:
# TODO: 从上面的图中能观察到什么样的现象? 这样的一个图的形状跟一个非常著名的函数形状很类似,能所出此定理吗? 
#     answer:  [Zip]'s law吧,就是低频的词占了绝大多数,高频词其实很少
#       就是不知道为啥叫Zip 
# 

1.3 文本预处理

此部分需要做文本方面的处理。 以下是可以用到的一些方法:

    1. 停用词过滤 (去网上搜一下 "english stop words list",会出现很多包含停用词库的网页,或者直接使用NLTK自带的)
    1. 转换成lower_case: 这是一个基本的操作
    1. 去掉一些无用的符号: 比如连续的感叹号!!!, 或者一些奇怪的单词。
    1. 去掉出现频率很低的词:比如出现次数少于10,20.... (想一下如何选择阈值)
    1. 对于数字的处理: 分词完只有有些单词可能就是数字比如44,415,把所有这些数字都看成是一个单词,这个新的单词我们可以定义为 "#number"
    1. lemmazation: 在这里不要使用stemming, 因为stemming的结果有可能不是valid word。
In [81]:
# TODO: 需要做文本方面的处理。 从上述几个常用的方法中选择合适的方法给qlist做预处理(不一定要按照上面的顺序,不一定要全部使用)
from nltk.corpus import stopwords
import string
#利用上面得到的有序词频列表x,选取百分比来确定低频值
def get_lowfreq_list(qlist_dic,percent):
    words_freq=qlist_dic.values()
    words_freq_dic=Counter(words_freq)  #此时字典中的键就是词频,值就是具有相同词频的单词数量  
    x=sorted(words_freq_dic.keys())
    
    index=float(percent)*len(x)
    index=int(index)+1#避免取0值
    low_freq=x[index]
    low_freq_list=[]
    for i,j in qlist_dic.items():
        if j<low_freq:
            low_freq_list.append(i)
    return low_freq_list
#不替换数字,也不去除低频词
def text_preprocess(input_list):
    #low_freq_list=get_lowfreq_list(qlist_dic,percent)
    stop_words = stopwords.words('english')
    new_list=[]
    for line in input_list:
        line=line.lower()
        line=''.join(c for c in line if c not in string.punctuation)#去除无用符号
        split_line=line.strip().split()
        #print(words)
        new_line=[word for word in split_line if word not in stop_words]
        filtered_line=' '.join(new_line)
        new_list.append(filtered_line)
    print(new_list[0])
    with open("qlist.json",'w')as file:
        json.dump(new_list,file)
    return new_list

qlist,alist=read_corpus()
new_list= text_preprocess(qlist)    # 更新后的问题列表
Out [81]:
beyonce start becoming popular

第二部分: 文本的表示

当我们做完必要的文本处理之后就需要想办法表示文本了,这里有几种方式

    1. 使用tf-idf vector
    1. 使用embedding技术如word2vec, bert embedding

下面我们分别提取这三个特征来做对比。

2.1 使用tf-idf表示向量

qlist中的每一个问题的字符串转换成tf-idf向量, 转换之后的结果存储在X矩阵里。 X的大小是: N* D的矩阵。 这里N是问题的个数(样本个数), D是词典库的大小

In [82]:
# TODO 
from sklearn.feature_extraction.text import TfidfVectorizer
def Tfidf_vec(qlist):
    vectorizer = TfidfVectorizer()# 定义一个tf-idf的vectorizer
    X = vectorizer.fit_transform(qlist)  
    # 结果存放在X矩阵里
    return X


#print(Tfidf_vec(qlist)[0])

2.2 使用wordvec + average pooling

词向量方面需要下载: https://nlp.stanford.edu/projects/glove/ (请下载glove.6B.zip),并使用d=200的词向量(200维)。国外网址如果很慢,可以在百度上搜索国内服务器上的。 每个词向量获取完之后,即可以得到一个句子的向量。 我们通过average pooling来实现句子的向量。

# TODO 基于Glove向量获取句子向量
def get_glove_vec(qlist):
    import numpy as np
    with open("glove.6B.200d.txt","r",encoding="utf-8") as glovefile:
        word2vec={}
        for i,each_line in enumerate(glovefile):
            each_vec=each_line.strip().split(' ')
            word2vec[each_vec[0]]=[float(i) for i in each_vec[1:]]
    with open("glove.json",'w')as file:
        json.dump(word2vec,file)
        
    print(word2vec['of'])
    X_w2v=[]    
    for line in qlist:
        words=line.strip().split()
        line_vec=[]
        count_word=0
        for word in words:
            if word in word2vec: 
                count_word+=1
                line_vec.append(word2vec[word])
        X_w2v.append(np.sum(np.array(line_vec),axis=0) /count_word)  
        #print(X_w2v)
             # 初始化完emb之后就可以对每一个句子来构建句子向量了,这个过程使用average pooling来实现
    X_w2v=np.asarray(X_w2v)
    return X_w2v
get_glove_vec(qlist)

2.3 使用BERT + average pooling

最近流行的BERT也可以用来学出上下文相关的词向量(contex-aware embedding), 在很多问题上得到了比较好的结果。在这里,我们不做任何的训练,而是直接使用已经训练好的BERT embedding。 具体如何训练BERT将在之后章节里体会到。 为了获取BERT-embedding,可以直接下载已经训练好的模型从而获得每一个单词的向量。可以从这里获取: https://github.com/imgarylai/bert-embedding , 请使用bert_12_768_12 当然,你也可以从其他source获取也没问题,只要是合理的词向量。

# TODO 基于BERT的句子向量计算
!pip install bert_embedding
def get_bert_vec(qlist):
    import numpy as np
    from bert_embedding import BertEmbedding
    bert_embedding = BertEmbedding(model='bert_12_768_12', dataset_name='book_corpus_wiki_en_cased')
    bert=[]
    for line in qlist:
        result = bert_embedding([line])
        item=result[0]
        if len(item[0])>0:
            bert.append(np.sum(np.array(item[1]),axis=0)/len(item[0]))
    X_bert =np.asarray(bert)   # 每一个句子的向量结果存放在X_bert矩阵里。行数为句子的总个数,列数为一个句子embedding大小。
    print(X_bert[0])
    return X_bert
get_bert_vec(qlist)

第三部分: 相似度匹配以及搜索

在这部分里,我们需要把用户每一个输入跟知识库里的每一个问题做一个相似度计算,从而得出最相似的问题。但对于这个问题,时间复杂度其实很高,所以我们需要结合倒排表来获取相似度最高的问题,从而获得答案。

3.1 tf-idf + 余弦相似度

我们可以直接基于计算出来的tf-idf向量,计算用户最新问题与库中存储的问题之间的相似度,从而选择相似度最高的问题的答案。这个方法的复杂度为O(N)N是库中问题的个数。

import heapq as hp
from sklearn.metrics.pairwise import cosine_similarity
#处理查询句子
def movestopwords(sentence):
    from nltk.corpus import stopwords
    # 英文的大写全部改成小写
    q_list_lower = sentence.lower()    
    # 把问题拆解成一个一个单词
    word_tokens =q_list_lower.strip().split()         
    # 过滤掉停用词
    ## 增加stopwords
    stop_words = set(stopwords.words('english'))
    stop_words.update(['.', ',', '"', "'", '?', '!', ':', ';', '(', ')', '[', ']', '{', '}'])
    filtered_words = [word for word in word_tokens if word not in stop_words]
    sent=" ".join(filtered_words)
    return sent
def get_top_results_tfidf_noindex(query):
    # TODO 需要编写
    """
    给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:
    1. 对于用户的输入 query 首先做一系列的预处理(上面提到的方法),然后再转换成tf-idf向量(利用上面的vectorizer)
    2. 计算跟每个库里的问题之间的相似度
    3. 找出相似度最高的top5问题的答案
    """
    import heapq 
    with open('qlist.json','r')as file:
        q_list=file.read()
        qlist=json.loads(q_list)
    vectorizer = TfidfVectorizer()
    qlist_vec=vectorizer.fit_transform(qlist)
    #print(qlist_vec[0])
    sent=movestopwords(query)
    query_vec=vectorizer.transform([sent])
    #print(query_vec[0])
    #计算相似度最高的前5个
    
    all_result=cosine_similarity(qlist_vec,query_vec)[0]
    print(all_result)
    cos_list=list(all_result[0])
    top_idxs =map(cos_list.index, heapq.nlargest(5, cos_list)) 
    #print(top_idxs)
    top_answer= []  # top_idxs存放相似度最高的(存在qlist里的)问题的下标 
                   # hint: 请使用 priority queue来找出top results. 思考为什么可以这么做?
    with open('a_list.json','r')as file:
        a_list=file.read()
        alist=json.loads(a_list)
    for index in top_idxs:
        top_answer.append(alist[index])
    return top_answer  # 返回相似度最高的问题对应的答案,作为TOP5答案    
# TODO: 编写几个测试用例,并输出结果
sent1=movestopwords("when did Beyonce start singing")
print (get_top_results_tfidf_noindex(sent1))
sent2=movestopwords("what is machine learning")
print (get_top_results_tfidf_noindex(sent2))

你会发现上述的程序很慢,没错! 是因为循环了所有库里的问题。为了优化这个过程,我们需要使用一种数据结构叫做倒排表。 使用倒排表我们可以把单词和出现这个单词的文档做关键。 之后假如要搜索包含某一个单词的文档,即可以非常快速的找出这些文档。 在这个QA系统上,我们首先使用倒排表来快速查找包含至少一个单词的文档,然后再进行余弦相似度的计算,即可以大大减少时间复杂度

3.2 倒排表的创建

倒排表的创建其实很简单,最简单的方法就是循环所有的单词一遍,然后记录每一个单词所出现的文档,然后把这些文档的ID保存成list即可。我们可以定义一个类似于hash_map, 比如 inverted_index = {}, 然后存放包含每一个关键词的文档出现在了什么位置,也就是,通过关键词的搜索首先来判断包含这些关键词的文档(比如出现至少一个),然后对于candidates问题做相似度比较。

def create_inverted_index(qlist_dic):
    word_total = [item.lower() for item in qlist_dic.keys()]
    inverted_idx = {}  # 定一个一个简单的倒排表,是一个map结构。 循环所有qlist一遍就可以
    with open('q_list.json') as file:
        q_list=file.read()
        qlist=json.loads(q_list)

    for index,i in enumerate(word_total):
        tmp=[]
        j=0
        while j<len(qlist):
            field=qlist[j]
            split_field=field.split()
            if i in split_field:
                tmp.append(j)
            j+=1
        inverted_idx[i]=tmp
            
    with open('inverted_idx.json','w') as file_object:
        json.dump(inverted_idx,file_object)
    return inverted_idx

3.3 语义相似度

这里有一个问题还需要解决,就是语义的相似度。可以这么理解: 两个单词比如car, auto这两个单词长得不一样,但从语义上还是类似的。如果只是使用倒排表我们不能考虑到这些单词之间的相似度,这就导致如果我们搜索句子里包含了car, 则我们没法获取到包含auto的所有的文档。所以我们希望把这些信息也存下来。那这个问题如何解决呢? 其实也不难,可以提前构建好相似度的关系,比如对于car这个单词,一开始就找好跟它意思上比较类似的单词比如top 10,这些都标记为related words。所以最后我们就可以创建一个保存related words的一个map. 比如调用related_words['car']即可以调取出跟car意思上相近的TOP 10的单词。

那这个related_words又如何构建呢? 在这里我们仍然使用Glove向量,然后计算一下俩俩的相似度(余弦相似度)。之后对于每一个词,存储跟它最相近的top 10单词,最终结果保存在related_words里面。 这个计算需要发生在离线,因为计算量很大,复杂度为O(V*V), V是单词的总数。

这个计算过程的代码请放在related.py的文件里,然后结果保存在related_words.txt里。 我们在使用的时候直接从文件里读取就可以了,不用再重复计算。所以在此notebook里我们就直接读取已经计算好的结果。 作业提交时需要提交related.pyrelated_words.txt文件,这样在使用的时候就不再需要做这方面的计算了。

# TODO 读取语义相关的单词
def get_related_words(file):
    
    return related_words

related_words = get_related_words('related_words.txt') # 直接放在文件夹的根目录下,不要修改此路径。

3.4 利用倒排表搜索

在这里,我们使用倒排表先获得一批候选问题,然后再通过余弦相似度做精准匹配,这样一来可以节省大量的时间。搜索过程分成两步:

  • 使用倒排表把候选问题全部提取出来。首先,对输入的新问题做分词等必要的预处理工作,然后对于句子里的每一个单词,从related_words里提取出跟它意思相近的top 10单词, 然后根据这些top词从倒排表里提取相关的文档,把所有的文档返回。 这部分可以放在下面的函数当中,也可以放在外部。
  • 然后针对于这些文档做余弦相似度的计算,最后排序并选出最好的答案。

可以适当定义自定义函数,使得减少重复性代码

#第一大模块
#处理查询句子
def q_movestopwords(sentence):
    from nltk.corpus import stopwords
    # 英文的大写全部改成小写
    q_list_lower = sentence.lower()    

    # 把问题拆解成一个一个单词
    word_tokens =q_list_lower.strip().split()
    # 过滤掉停用词
    ## 增加stopwords
    stop_words = set(stopwords.words('english'))
    stop_words.update(['.', ',', '"', "'", '?', '!', ':', ';', '(', ')', '[', ']', '{', '}'])
    filtered_words = [word for word in word_tokens if word not in stop_words]
    filtered_sent=" ".join(filtered_words)

    return filtered_words,filtered_sent



#第二大模块
#取得查询语句意义相近的词
def get_related_words(filtered_list):
    import json
    # 读取word的相似词字典.json
    with open('related_words.json', 'r') as fp:
        related_words_dic = json.load(fp)
        
    simi_list = []
    for w in filtered_list:
        temp = related_words_dic[w]
        temp_list = temp[0]
        simi_list += temp_list
    return simi_list

#取得相近的词在整个问题列表中的索引
def get_inverted_idx(simi_list):
    import json
    # 读取word的倒排表字典.json
    with open('inverted_idx.json', 'r') as fp:
        inverted_idx_dic = json.load(fp)
    
    # 查找问题的index列表
    question_list_idnex = []
    for word in simi_list:
    
        # 只找字典中有的单词进行查找
        if word in inverted_idx_dic:
            question_list_idnex += inverted_idx_dic[word]

    # 清除重复的问题的index        
    index_q = list(set(question_list_idnex))
    #print(index_q)
    return index_q

#找到问题与问题集相似的所有问题
def get_total_q(index_q):
    import json
    with open('q_list.json','r')as file:
        tem_qlist=file.read()
        qlist=json.loads(tem_qlist)
        #print(qlist.shape)
    q_total_list = []
    for q in index_q:
        q_total_list.append(qlist[q])
    #print(q_total_list)
    return q_total_list

 
#找出前k个余弦相似度最高的问题
def get_cos_list(query,q_total_list,k):
    import heapq
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    vectorizer = TfidfVectorizer()
    #query_list=query.strip().split()
    q_list_tfidf=vectorizer.fit_transform(q_total_list)
    sent_tfidf=vectorizer.transform([query])
    all_result=cosine_similarity(sent_tfidf,q_list_tfidf)
    #排序并记录最相似问题的索引
    cos_list=list(all_result[0])
    max_index =map(cos_list.index, heapq.nlargest(k, cos_list)) 
    return max_index
#找出最相似的问题    
def get_top_answer(max_index,q_total_clean_list):
    import json
    with open('a_list.json', 'r') as file:
        answers = file.read()
        alist = json.loads(answers)
    #由于记录最相似词在问题列表中的位置的索引表和问题的句子向量一一对应,故通过后者在列表中位置问题qlist的索引
    top_index=[]
    for i in max_index:
        if i <len(q_total_clean_list):
            top_index.append(q_total_clean_list[i])
     
    top_answer=[]    
    for i in top_index:
        top_answer.append([alist[i]])
    return top_answer
        
#合并所有流程
def get_top_results_tfidf(query):
    import time
    # 找出与问题与问题集中相似的问题
    filtered_words,filtered_sent=q_movestopwords(query)
    simi_list=get_related_words(filtered_words)
    q_total_index=get_inverted_idx(simi_list)
    q_total_list=get_total_q(q_total_index)   
 
    #取得新问题对相关问题的相似度
    start_time=time.time()
    max_index = get_cos_list(filtered_sent,q_total_list,5)
    end_time=time.time()
    print(end_time-start_time)
    #取得最接近的答案list
    top_answer=get_top_answer(max_index,q_total_index)
    return top_answer
#利用前面定义好的将文档转为glove向量的函数,query的glove表示单独处理
def get_w2v_list(filtered_sent,q_total_list,k):
    import heapq
    import numpy as np
    q_list_w2v=get_glove_vec(q_total_list)
    with open("glove.json",'r')as file:
        glove=file.read()
        glove_dic=json.loads(glove)
    #获取query的glove向量表示
    query=filtered_sent.strip().split()
    query_len=len(query)
    query_vec=[]
    for word in query:
        if word in glove_dic: 
            query_vec.append(glove_dic[word])
    final_vec=np.sum(np.array(query_vec))/query_len    
    all_result=cosine_similarity(final_vec,q_list_w2v)
    #排序并记录最相似问题的索引
    cos_list=list(all_result[0])
    max_index =map(cos_list.index, heapq.nlargest(k, cos_list)) 
    return max_index
#复用已有函数,只改变获取文档向量表示的函数
def get_top_results_w2v(query):
    import time
    # 找出与问题与问题集中相似的问题
    filtered_words,filtered_sent=q_movestopwords(query)
    simi_list=get_related_words(filtered_words)
    q_total_index=get_inverted_idx(simi_list)
    q_total_list=get_total_q(q_total_index)   
 
    #取得新问题对相关问题的相似度
    start_time=time.time()
    max_index = get_w2v_list(filtered_sent,q_total_list,5)
    end_time=time.time()
    print(end_time-start_time)
    #取得最接近的答案list
    top_answer=get_top_answer(max_index,q_total_index)
    return top_answer
 
#利用前面定义好的将文档转为bert向量的函数,query的glove表示单独处理
def get_bert_list(filtered_sent,q_total_list,k):
    import heapq
    q_list_bert=get_bert_vec(q_total_list)
    import numpy as np
    from bert_embedding import BertEmbedding
    bert_embedding = BertEmbedding(model='bert_12_768_12', dataset_name='book_corpus_wiki_en_cased')
    
    query=filtered_sent.strip().split()
    result = bert_embedding(query)
    item=result[0]
    query_bert=np.sum(np.array(item[1],axis=0))/len(item[0])
    all_result=cosine_similarity(query_bert,q_list_bert)
    #排序并记录最相似问题的索引
    cos_list=list(all_result[0])
    max_index =map(cos_list.index, heapq.nlargest(k, cos_list)) 
    return max_index
#复用已有函数,只改变获取文档向量表示的函数
def get_top_results_bert(query):
    import time
    # 找出与问题与问题集中相似的问题
    filtered_words,filtered_sent=q_movestopwords(query)
    simi_list=get_related_words(filtered_words)
    q_total_index=get_inverted_idx(simi_list)
    q_total_list=get_total_q(q_total_index)   
 
    #取得新问题对相关问题的相似度
    start_time=time.time()
    max_index = get_bert_list(filtered_sent,q_total_list,5)
    end_time=time.time()
    print(end_time-start_time)
    #取得最接近的答案list
    top_answer=get_top_answer(max_index,q_total_index)
    return top_answer
 
# TODO: 编写几个测试用例,并输出结果

test_query1 = "when did Beyonce start singing"
test_query2 = "what is machine learning"

print (get_top_results_tfidf(test_query1))
print (get_top_results_w2v(test_query1))
print (get_top_results_bert(test_query1))

print (get_top_results_tfidf(test_query2))
print (get_top_results_w2v(test_query2))
print (get_top_results_bert(test_query2))

4. 拼写纠错

其实用户在输入问题的时候,不能期待他一定会输入正确,有可能输入的单词的拼写错误的。这个时候我们需要后台及时捕获拼写错误,并进行纠正,然后再通过修正之后的结果再跟库里的问题做匹配。这里我们需要实现一个简单的拼写纠错的代码,然后自动去修复错误的单词。

这里使用的拼写纠错方法是课程里讲过的方法,就是使用noisy channel model。 我们回想一下它的表示:

$c^* = \text{argmax}{c\in candidates} ~~p(c|s) = \text{argmax}{c\in candidates} ~~p(s|c)p(c)$

这里的candidates指的是针对于错误的单词的候选集,这部分我们可以假定是通过edit_distance来获取的(比如生成跟当前的词距离为1/2的所有的valid 单词。 valid单词可以定义为存在词典里的单词。 c代表的是正确的单词, s代表的是用户错误拼写的单词。 所以我们的目的是要寻找出在candidates里让上述概率最大的正确写法c

$p(s|c)$,这个概率我们可以通过历史数据来获得,也就是对于一个正确的单词$c$, 有百分之多少人把它写成了错误的形式1,形式2... 这部分的数据可以从spell_errors.txt里面找得到。但在这个文件里,我们并没有标记这个概率,所以可以使用uniform probability来表示。这个也叫做channel probability。

$p(c)$,这一项代表的是语言模型,也就是假如我们把错误的$s$,改造成了$c$, 把它加入到当前的语句之后有多通顺?在本次项目里我们使用bigram来评估这个概率。 举个例子: 假如有两个候选 c1,c2 c_1, c_2, 然后我们希望分别计算出这个语言模型的概率。 由于我们使用的是bigram, 我们需要计算出两个概率,分别是当前词前面和后面词的bigram概率。 用一个例子来表示:

给定: We are go to school tomorrow, 对于这句话我们希望把中间的go替换成正确的形式,假如候选集里有个,分别是going, went, 这时候我们分别对这俩计算如下的概率: p(goingare)p(togoing) p(going|are)p(to|going)p(wentare)p(towent) p(went|are)p(to|went), 然后把这个概率当做是$p(c)$的概率。 然后再跟channel probability结合给出最终的概率大小。

那这里的$p(are|going)$这些bigram概率又如何计算呢?答案是训练一个语言模型! 但训练一个语言模型需要一些文本数据,这个数据怎么找? 在这次项目作业里我们会用到nltk自带的reuters的文本类数据来训练一个语言模型。当然,如果你有资源你也可以尝试其他更大的数据。最终目的就是计算出bigram概率。

4.1 训练一个语言模型

在这里,我们使用nltk自带的reuters数据来训练一个语言模型。 使用add-one smoothing

from nltk.corpus import reuters

# 读取语料库的数据
categories = reuters.categories()
corpus = reuters.sents(categories=categories)

# 文本整理成list和把所有不重复单词整理成list
all_word_list = []
all_word = []

for file in  reuters.fileids():
    file_word = reuters.words(file)
    all_word += list(file_word)
    all_word_list += [list(file_word)]
# 去除重复单词
total_word = list(set(all_word))


import json
# total_word和All_word资料整理成json
file_name_1 = 'all_word_list.json'
file_name_2 = 'total_word.json'
file_name_3 = 'all_word.json'

with open(file_name_1,'w') as file_object:
    json.dump(all_word_list, file_object)
with open(file_name_2,'w') as file_object:
    json.dump(total_word, file_object)
with open(file_name_3,'w') as file_object:
    json.dump(all_word, file_object) 
#计算bigram的几率值

# 循环所有的语料库并构建bigram probability. bigram[word1][word2]: 在word1出现的情况下下一个是word2的概率。 


def get_bigram_pro(word1,word2,all_word_list = None,total_word = None):
    if all_word_list == None:
        import json
        # 读取所有问题
        with open('all_word_list.json','r') as file:
            all_word_list = json.load(file)
    if total_word == None:
        import json
        # 读取所有不重复单字
        with open('total_word.json','r') as file:
            total_word = json.load(file)
    # 所有文檔内文字列表
    from nltk.util import ngrams
    text_bigrams = [ngrams(sent, 2) for sent in all_word_list]
    text_unigrams = [ngrams(sent, 1) for sent in all_word_list]
    
    # 计算数量
    from nltk.lm import NgramCounter
    ngram_counts = NgramCounter(text_bigrams + text_unigrams)  
    
    #计算几率用add-one smoothing
    word_count = ngram_counts[word1]
    join_count = ngram_counts[[word1]][word2]
    total_word_length = len(total_word)
    
    bigram_probability = (join_count + 1) / ( word_count + total_word_length)
    return bigram_probability



4.2 构建Channel Probs

基于spell_errors.txt文件构建channel probability, 其中$channel[c][s]$表示正确的单词$c$被写错成$s$的概率。

channel = {}

spell_error_dict = {}

for line in open('spell-errors.txt'):
    item = line.split(":")
    word = item[0].strip()
    spell_error_list = [word.strip( )for word in item[1].strip().split(",")]
    spell_error_dict[word] = spell_error_list
# print_format("spell_error_dict", spell_error_dict)

for key in spell_error_dict:
    if key not in channel:
        channel[key] = {}
        for value in spell_error_dict[key]:
            channel[key][value] = 1 / len(spell_error_dict[key])
print(channel['raining']['rainning'])

4.3 根据错别字生成所有候选集合

给定一个错误的单词,首先生成跟这个单词距离为1或者2的所有的候选集合。 这部分的代码我们在课程上也讲过,可以参考一下。

    #找出所有英文字
def word_lower(text):
    import re
    return re.findall('[a-z]+',text.lower())

def known(words):
    # 打开单词列表
    import json 
    with open('total_word.json','r') as file:
        words_N = json.load(file)    
    
    return set(w for w in words if w in words_N)
#获取编辑距离为1的单词
def edits1(word):
    # 替换的英文字
    alphabet = "abcdefghijklmnopqrstuvwxyz"
    
    n = len(word)
    #删除
    s1 = [word[0:i]+word[i+1:] for i in range(n)]
    #调换相邻的两个字母
    s2 = [word[0:i]+word[i+1]+word[i]+word[i+2:] for i in range(n-1)]
    #replace
    s3 = [word[0:i]+c+word[i+1:] for i in range(n) for c in alphabet]
    #插入
    s4 = [word[0:i]+c+word[i:] for i in range(n+1) for c in alphabet]
    edits1_words = set(s1+s2+s3+s4)
    edits1_words.remove(word)
    edits1_words = known(edits1_words)
    return edits1_words

#2
def edits2(word):
    edits2_words = set(e2 for e1 in edits1(word) for e2 in edits1(e1))
    #edits2_words.remove(word)
    edits2_words = known(edits2_words)
    return edits2_words

def generate_candidates(word):
    #大写变小写
    word_lower_clean = word_lower(word)
    word_clean = word_lower_clean[0]
    
    # 打开单词列表
    import json 
    with open('total_word.json','r') as file:
        words_N = json.load(file)
    
    # 纠错
    if word_clean not in words_N:
        candidates = edits1(word_clean) or edits2(word_clean)
        return candidates
    else:
        return None
    

4.4 给定一个输入,如果有错误需要纠正

给定一个输入query, 如果这里有些单词是拼错的,就需要把它纠正过来。这部分的实现可以简单一点: 对于query分词,然后把分词后的每一个单词在词库里面搜一下,假设搜不到的话可以认为是拼写错误的! 人如果拼写错误了再通过channelbigram来计算最适合的候选。

def get_bigram_chnnel_pro(tokens , index, all_word_list, total_word):
    # 前后文的单字
    front_word = tokens[index-1]
    now_word = tokens[index]
    after_word = tokens[index+1]
    
    #前后文的几率
    front_pro = get_bigram_pro(now_word , front_word , all_word_list , total_word)
    after_pro = get_bigram_pro(now_word , after_word , all_word_list , total_word)
    
    #计算bigram几率
    bigram_pro = front_pro * after_pro
    
    return bigram_pro
def spell_corector(line):
    # 1. 首先做分词,然后把``line``表示成``tokens``
    # 2. 循环每一token, 然后判断是否存在词库里。如果不存在就意味着是拼写错误的,需要修正。 
    #    修正的过程就使用上述提到的``noisy channel model``, 然后从而找出最好的修正之后的结果。
    
    # 句子先tokens,数据清洗
    line_clean = line.replace('?','')
    tokens = line_clean.split()
    
    
    #读取要使用的单字清单
    import json
    # 读取所有不重复单字
    with open('all_word_list.json','r') as file:
        all_word_list = json.load(file)
    # 读取还有所有问题
    with open('total_word.json','r') as file:
        total_word = json.load(file)
    
    # 逐个单字检查
    t = 0
    sentence = []
    
    while t < len(tokens):
        if tokens[t] not in total_word:

            # 找出备选相似词
            simility_list = list(generate_candidates(tokens[t]))
            
            # 列出备选项的几率列表,并找出几率最高的单词
            simility_pro_list = [get_bigram_chnnel_pro(tokens , t , all_word_list, total_word) for i in simility_list]
            
            # 避免专有名词影响,若找不到相似词,当专有名词
            if simility_pro_list == []:
                sentence += [tokens[t]]
            else:
                word_index = simility_pro_list.index(max(simility_pro_list))
                correct_word = simility_list[word_index]
            
                # 组成list
                sentence += [correct_word]
        
        else:
            sentence += [tokens[t]]
    
        t += 1

        # 将单词组合成句子
        newline = ' '.join(sentence)
    
    
    return newline 

4.5 基于拼写纠错算法,实现用户输入自动矫正

首先有了用户的输入query, 然后做必要的处理把句子转换成tokens的形状,然后对于每一个token比较是否是valid, 如果不是的话就进行下面的修正过程。

test_query1 = "when did beyonce staart singing"  # 拼写错误的
test_query2 = "what is machin learning"  # 拼写错误的

test_query1 = spell_corector(test_query1)
test_query2 = spell_corector(test_query2)

print (get_top_results_tfidf(test_query1))
print (get_top_results_w2v(test_query1))
print (get_top_results_bert(test_query1))

print (get_top_results_tfidf(test_query2))
print (get_top_results_w2v(test_query2))
print (get_top_results_bert(test_query2))

附录

在本次项目中我们实现了一个简易的问答系统。基于这个项目,我们其实可以有很多方面的延伸。

  • 在这里,我们使用文本向量之间的余弦相似度作为了一个标准。但实际上,我们也可以基于基于包含关键词的情况来给一定的权重。比如一个单词跟related word有多相似,越相似就意味着相似度更高,权重也会更大。
  • 另外 ,除了根据词向量去寻找related words也可以提前定义好同义词库,但这个需要大量的人力成本。
  • 在这里,我们直接返回了问题的答案。 但在理想情况下,我们还是希望通过问题的种类来返回最合适的答案。 比如一个用户问:“明天北京的天气是多少?”, 那这个问题的答案其实是一个具体的温度(其实也叫做实体),所以需要在答案的基础上做进一步的抽取。这项技术其实是跟信息抽取相关的。
  • 对于词向量,我们只是使用了average pooling, 除了average pooling,我们也还有其他的经典的方法直接去学出一个句子的向量。
  • 短文的相似度分析一直是业界和学术界一个具有挑战性的问题。在这里我们使用尽可能多的同义词来提升系统的性能。但除了这种简单的方法,可以尝试其他的方法比如WMD,或者适当结合parsing相关的知识点。

好了,祝你好运!