starter_code.ipynb 70.1 KB

搭建一个简单的问答系统 (Building a Simple QA System)

本次项目的目标是搭建一个基于检索式的简易的问答系统,这是一个最经典的方法也是最有效的方法。

不要单独创建一个文件,所有的都在这里面编写,不要试图改已经有的函数名字 (但可以根据需求自己定义新的函数)

预估完成时间: 5-10小时

检索式的问答系统

问答系统所需要的数据已经提供,对于每一个问题都可以找得到相应的答案,所以可以理解为每一个样本数据是 <问题、答案>。 那系统的核心是当用户输入一个问题的时候,首先要找到跟这个问题最相近的已经存储在库里的问题,然后直接返回相应的答案即可(但实际上也可以抽取其中的实体或者关键词)。 举一个简单的例子:

假设我们的库里面已有存在以下几个<问题,答案>:

  • <"贪心学院主要做什么方面的业务?”, “他们主要做人工智能方面的教育”>
  • <“国内有哪些做人工智能教育的公司?”, “贪心学院”>
  • <"人工智能和机器学习的关系什么?", "其实机器学习是人工智能的一个范畴,很多人工智能的应用要基于机器学习的技术">
  • <"人工智能最核心的语言是什么?", ”Python“>
  • .....

假设一个用户往系统中输入了问题 “贪心学院是做什么的?”, 那这时候系统先去匹配最相近的“已经存在库里的”问题。 那在这里很显然是 “贪心学院是做什么的”和“贪心学院主要做什么方面的业务?”是最相近的。 所以当我们定位到这个问题之后,直接返回它的答案 “他们主要做人工智能方面的教育”就可以了。 所以这里的核心问题可以归结为计算两个问句(query)之间的相似度。

项目中涉及到的任务描述

问答系统看似简单,但其中涉及到的内容比较多。 在这里先做一个简单的解释,总体来讲,我们即将要搭建的模块包括:

  • 文本的读取: 需要从相应的文件里读取(问题,答案)
  • 文本预处理: 清洗文本很重要,需要涉及到停用词过滤等工作
  • 文本的表示: 如果表示一个句子是非常核心的问题,这里会涉及到tf-idf, Glove以及BERT Embedding
  • 文本相似度匹配: 在基于检索式系统中一个核心的部分是计算文本之间的相似度,从而选择相似度最高的问题然后返回这些问题的答案
  • 倒排表: 为了加速搜索速度,我们需要设计倒排表来存储每一个词与出现的文本
  • 词义匹配:直接使用倒排表会忽略到一些意思上相近但不完全一样的单词,我们需要做这部分的处理。我们需要提前构建好相似的单词然后搜索阶段使用
  • 拼写纠错:我们不能保证用户输入的准确,所以第一步需要做用户输入检查,如果发现用户拼错了,我们需要及时在后台改正,然后按照修改后的在库里面搜索
  • 文档的排序: 最后返回结果的排序根据文档之间余弦相似度有关,同时也跟倒排表中匹配的单词有关

项目中需要的数据:

  1. dev-v2.0.json: 这个数据包含了问题和答案的pair, 但是以JSON格式存在,需要编写parser来提取出里面的问题和答案。
  2. glove.6B: 这个文件需要从网上下载,下载地址为:https://nlp.stanford.edu/projects/glove/, 请使用d=200的词向量
  3. spell-errors.txt 这个文件主要用来编写拼写纠错模块。 文件中第一列为正确的单词,之后列出来的单词都是常见的错误写法。 但这里需要注意的一点是我们没有给出他们之间的概率,也就是p(错误|正确),所以我们可以认为每一种类型的错误都是同等概率
  4. vocab.txt 这里列了几万个英文常见的单词,可以用这个词库来验证是否有些单词被拼错
  5. testdata.txt 这里搜集了一些测试数据,可以用来测试自己的spell corrector。这个文件只是用来测试自己的程序。

在本次项目中,你将会用到以下几个工具:

第一部分:对于训练数据的处理:读取文件和预处理

  • 文本的读取: 需要从文本中读取数据,此处需要读取的文件是dev-v2.0.json,并把读取的文件存入一个列表里(list)
  • 文本预处理: 对于问题本身需要做一些停用词过滤等文本方面的处理
  • 可视化分析: 对于给定的样本数据,做一些可视化分析来更好地理解数据

1.1节: 文本的读取

把给定的文本数据读入到qlistalist当中,这两个分别是列表,其中qlist是问题的列表,alist是对应的答案列表

In [1]:
def read_corpus():
    """
    读取给定的语料库,并把问题列表和答案列表分别写入到 qlist, alist 里面。 在此过程中,不用对字符换做任何的处理(这部分需要在 Part 2.3里处理)
    qlist = ["问题1", “问题2”, “问题3” ....]
    alist = ["答案1", "答案2", "答案3" ....]
    务必要让每一个问题和答案对应起来(下标位置一致)
    """
    # TODO 需要完成的代码部分 ...
    import json
    import jsonpath
    
    with open('train-v2.0.json', 'r', encoding='utf-8') as f:
        dic = json.load(f)
    
    qlist = jsonpath.jsonpath(dic,'$..question') 
    alist = jsonpath.jsonpath(dic,'$..text') 
    f.close()
    assert len(qlist) == len(alist)  # 确保长度一样
    
    return qlist, alist

1.2 理解数据(可视化分析/统计信息)

对数据的理解是任何AI工作的第一步, 需要对数据有个比较直观的认识。在这里,简单地统计一下:

  • qlist出现的总单词个数
  • 按照词频画一个histogram plot
In [2]:
# TODO: 统计一下在qlist中总共出现了多少个单词? 总共出现了多少个不同的单词(unique word)?
#       这里需要做简单的分词,对于英文我们根据空格来分词即可,其他过滤暂不考虑(只需分词)
from nltk import word_tokenize

qlist, alist = read_corpus()

qwords = []
for sentence in qlist:
    qwords.append(word_tokenize(sentence))

mydist = {}

for sentence in qwords:
    for word in sentence:
        if word not in mydist:
            mydist[word] =1
        else:
            mydist[word] +=1

word_total = len(mydist)
print (word_total)
Out [2]:
53157
In [3]:
from nltk import word_tokenize
from nltk import FreqDist
%matplotlib inline

fdist = FreqDist(mydist)
fdist.plot(20, cumulative=True)
In [5]:
# TODO: 从上面的图中能观察到什么样的现象? 这样的一个图的形状跟一个非常著名的函数形状很类似,能所出此定理吗? 
#       hint: [XXX]'s law
# 
#  zip's law

1.3 文本预处理

此部分需要做文本方面的处理。 以下是可以用到的一些方法:

    1. 停用词过滤 (去网上搜一下 "english stop words list",会出现很多包含停用词库的网页,或者直接使用NLTK自带的)
    1. 转换成lower_case: 这是一个基本的操作
    1. 去掉一些无用的符号: 比如连续的感叹号!!!, 或者一些奇怪的单词。
    1. 去掉出现频率很低的词:比如出现次数少于10,20.... (想一下如何选择阈值)
    1. 对于数字的处理: 分词完只有有些单词可能就是数字比如44,415,把所有这些数字都看成是一个单词,这个新的单词我们可以定义为 "#number"
    1. lemmazation: 在这里不要使用stemming, 因为stemming的结果有可能不是valid word。
In [4]:
# TODO: 需要做文本方面的处理。 从上述几个常用的方法中选择合适的方法给qlist做预处理(不一定要按照上面的顺序,不一定要全部使用)
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

stop_words = set(stopwords.words('english'))
wnl = WordNetLemmatizer()
f_list = []
english_punctuations = ['(', ')', '[', ']', '&', '!!', '*', '@', '#', '$', '%', '?']

def sentencesList_preprocess(sentencesList):

    for sentence in sentencesList:
        # filter stop words
        filtered = [word for word in sentence if word not in stop_words]
        
        # to lower case
        lower_case = [word.lower() for word in filtered]
        
        # lemmatization
        lemma = [wnl.lemmatize(token) for token in lower_case]
        
        # remove unused symbols
        sym = [word for word in lemma if word not in english_punctuations]
        
        f_list.append(sym)
    
    return f_list

qlist = sentencesList_preprocess(qwords)    # 更新后的问题列表
In [5]:
def sentence_preprocess(sentence):
    # filter stop words
    filtered = [word for word in sentence if word not in stop_words]
    # to lower case
    lower_case = [word.lower() for word in filtered]
    # lemmatization
    lemma = [wnl.lemmatize(token) for token in lower_case]
    # remove unused symbols
    token = [word for word in lemma if word not in english_punctuations]

    return token

第二部分: 文本的表示

当我们做完必要的文本处理之后就需要想办法表示文本了,这里有几种方式

    1. 使用tf-idf vector
    1. 使用embedding技术如word2vec, bert embedding

下面我们分别提取这三个特征来做对比。

2.1 使用tf-idf表示向量

qlist中的每一个问题的字符串转换成tf-idf向量, 转换之后的结果存储在X矩阵里。 X的大小是: N* D的矩阵。 这里N是问题的个数(样本个数), D是词典库的大小

In [6]:
# TODO 
from sklearn.feature_extraction.text import TfidfVectorizer

vectorizer = TfidfVectorizer()   # 定义一个tf-idf的vectorizer

qlistnew = [' '.join(sentence) for sentence in qlist]
X_tfidf = vectorizer.fit_transform(qlistnew)  # 结果存放在X矩阵里

2.2 使用wordvec + average pooling

词向量方面需要下载: https://nlp.stanford.edu/projects/glove/ (请下载glove.6B.zip),并使用d=200的词向量(200维)。国外网址如果很慢,可以在百度上搜索国内服务器上的。 每个词向量获取完之后,即可以得到一个句子的向量。 我们通过average pooling来实现句子的向量。

In [9]:
# load Glove
def loadGlove(path):
    vocab = {}
    embedding = []
    vocab["UNK"] = 0
    embedding.append([0]*200)
    file = open(path, 'r', encoding='utf8')
    i = 1
    for line in file:
        row = line.strip().split()
        vocab[row[0]] = i
        embedding.append(row[1:])
        i += 1
    print("Finish load Glove")
    file.close()
    return vocab, embedding

path = '../glove.6b/glove.6b.200d.txt'
voc, emb = loadGlove(path)
Out [9]:
Finish load Glove
In [11]:
def sentence_to_vec(embedding, sentence):
    vec = np.zeros((200,), dtype=np.float64)
    for word in sentence:
        if word in voc:
            idx = voc[word]
            vec += embedding[idx].astype('float64')
        else:
            vec += embedding[0].astype('float64')
    vec = vec/len(sentence)
    return vec
In [12]:
def sentences_to_vec(embedding, sentences):
    vec = np.zeros((len(sentences), 200))
    for i, sentence in enumerate(sentences):
        sentence = sentence.strip().split(' ')
        vec[i] = sentence_to_vec(embedding, sentence)
                
    return vec
In [14]:
import numpy as np
emc = np.asarray(emb)
In [15]:
# TODO 基于Glove向量获取句子向量
# emb  =  # 这是 D*H的矩阵,这里的D是词典库的大小, H是词向量的大小。 这里面我们给定的每个单词的词向量,
        # 这需要从文本中读取
    
X_w2v = sentences_to_vec(emc, qlistnew)   # 初始化完emb之后就可以对每一个句子来构建句子向量了,这个过程使用average pooling来实现
# for i in voc:
#     v = emb[v]
#     res = list(cosine_similarity(emb, v)[0])
#     result = sorted(get_top_numbers(res, 10), reverse = True)

2.3 使用BERT + average pooling

最近流行的BERT也可以用来学出上下文相关的词向量(contex-aware embedding), 在很多问题上得到了比较好的结果。在这里,我们不做任何的训练,而是直接使用已经训练好的BERT embedding。 具体如何训练BERT将在之后章节里体会到。 为了获取BERT-embedding,可以直接下载已经训练好的模型从而获得每一个单词的向量。可以从这里获取: https://github.com/imgarylai/bert-embedding , 请使用bert_12_768_12 当然,你也可以从其他source获取也没问题,只要是合理的词向量。

In [9]:
import numpy as np
from bert_embedding.bert import BertEmbedding
bert_embedding = BertEmbedding(model='bert_12_768_12', dataset_name='book_corpus_wiki_en_uncased')
In [10]:
def sentences_to_vec_bert(sentences):
    X_bert = np.zeros((len(sentences), 768))
    for i in range(len(sentences)):
        emb = bert_embedding(sentence, 'avg')
        vec_list = emb[0][1]
#         v_bert = np.zeros((768,), dtype=np.float32)
        
#         for vec in vec_list:
#             v_bert += vec.astype('float32')
#         v_bert = v_bert/len(vec_list)
        v_bert = np.average(vec_list, axis=0)
        X_bert[i] = v_bert
    return X_bert
In [11]:
def sentence_to_vec_bert(sentence):
    emb = bert_embedding([sentence], 'avg')
    vec_list = emb[0][1]
    v_bert = np.sum(vec_list, axis=0)
    
    return v_bert
In [12]:
# TODO 基于BERT的句子向量计算

X_bert = sentences_to_vec_bert(qlistnew) # 每一个句子的向量结果存放在X_bert矩阵里。行数为句子的总个数,列数为一个句子embedding大小。 
In [12]:
import heapq
from sklearn.metrics.pairwise import cosine_similarity

# return the top k numbers from the list using prority queue
def get_top_numbers(tlist, k):
    max_heap = []
    l = len(tlist)
    if l<=0 or k<=0 or k>l: return None
    
    for i in tlist:
        if k > len(max_heap): heapq.heappush(max_heap, i)
        else: heapq.heappushpop(max_heap, i)
    
    return max_heap

3.1 tf-idf + 余弦相似度

我们可以直接基于计算出来的tf-idf向量,计算用户最新问题与库中存储的问题之间的相似度,从而选择相似度最高的问题的答案。这个方法的复杂度为O(N)N是库中问题的个数。

In [13]:
def get_top_results_tfidf_noindex(query):
    # TODO 需要编写
    """
    给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:
    1. 对于用户的输入 query 首先做一系列的预处理(上面提到的方法),然后再转换成tf-idf向量(利用上面的vectorizer)
    2. 计算跟每个库里的问题之间的相似度
    3. 找出相似度最高的top5问题的答案
    """
    query = word_tokenize(query)
    query= sentence_preprocess(query)
    query = ' '.join(query)
    
    q_tfidf = vectorizer.transform([query])
    res = list(cosine_similarity(q_tfidf, X_tfidf)[0])
    result = sorted(get_top_numbers(res, 5), reverse = True)
    
    top_idxs = []  # top_idxs存放相似度最高的(存在qlist里的)问题的下标 
                   # hint: 请使用 priority queue来找出top results. 思考为什么可以这么做? 
    dict_visited = {}

    for r in result:
        for i, n in enumerate(res):
            if n == r and i not in dict_visited: 
                top_idxs.append(i)
                dict_visited[i] = True

    ans = [alist[i] for i in top_idxs]
    
    return ans  # 返回相似度最高的问题对应的答案,作为TOP5答案    
In [18]:
# TODO: 编写几个测试用例,并输出结果
print (get_top_results_tfidf_noindex("when did Beyonce start becoming popular"))
print (get_top_results_tfidf_noindex("what languge does the word of 'symbiosis' come from"))
Out [18]:
['in the late 1990s', 'Particularly since the 1950s, pro wrestling events have frequently been responsible for sellout crowds at large arenas', 'single mandolins', 'single mandolins', 'molecular methods']
['Greek', '1877', 'living together', '1570s', 'glesum']

你会发现上述的程序很慢,没错! 是因为循环了所有库里的问题。为了优化这个过程,我们需要使用一种数据结构叫做倒排表。 使用倒排表我们可以把单词和出现这个单词的文档做关键。 之后假如要搜索包含某一个单词的文档,即可以非常快速的找出这些文档。 在这个QA系统上,我们首先使用倒排表来快速查找包含至少一个单词的文档,然后再进行余弦相似度的计算,即可以大大减少时间复杂度

3.2 倒排表的创建

倒排表的创建其实很简单,最简单的方法就是循环所有的单词一遍,然后记录每一个单词所出现的文档,然后把这些文档的ID保存成list即可。我们可以定义一个类似于hash_map, 比如 inverted_index = {}, 然后存放包含每一个关键词的文档出现在了什么位置,也就是,通过关键词的搜索首先来判断包含这些关键词的文档(比如出现至少一个),然后对于candidates问题做相似度比较。

In [25]:
# TODO 请创建倒排表
inverted_idx = {}  # 定一个一个简单的倒排表,是一个map结构。 循环所有qlist一遍就可以
for index, sentence in enumerate(qlist):
    for word in sentence:
        if word not in inverted_idx: inverted_idx[word] = [index]
        else: inverted_idx[word].append(index)

3.3 语义相似度

这里有一个问题还需要解决,就是语义的相似度。可以这么理解: 两个单词比如car, auto这两个单词长得不一样,但从语义上还是类似的。如果只是使用倒排表我们不能考虑到这些单词之间的相似度,这就导致如果我们搜索句子里包含了car, 则我们没法获取到包含auto的所有的文档。所以我们希望把这些信息也存下来。那这个问题如何解决呢? 其实也不难,可以提前构建好相似度的关系,比如对于car这个单词,一开始就找好跟它意思上比较类似的单词比如top 10,这些都标记为related words。所以最后我们就可以创建一个保存related words的一个map. 比如调用related_words['car']即可以调取出跟car意思上相近的TOP 10的单词。

那这个related_words又如何构建呢? 在这里我们仍然使用Glove向量,然后计算一下俩俩的相似度(余弦相似度)。之后对于每一个词,存储跟它最相近的top 10单词,最终结果保存在related_words里面。 这个计算需要发生在离线,因为计算量很大,复杂度为O(V*V), V是单词的总数。

这个计算过程的代码请放在related.py的文件里,然后结果保存在related_words.txt里。 我们在使用的时候直接从文件里读取就可以了,不用再重复计算。所以在此notebook里我们就直接读取已经计算好的结果。 作业提交时需要提交related.pyrelated_words.txt文件,这样在使用的时候就不再需要做这方面的计算了。

In [26]:
# TODO 读取语义相关的单词
def get_related_words(file):
    related_words = {}
    for line in open(file, mode='r', encoding='utf-8'):
        item = line.split(",")
        word, s_list = item[0], [value for value in item[1].strip().split()]
        related_words[word] = s_list
    return related_words

related_words = get_related_words('related_words.txt') # 直接放在文件夹的根目录下,不要修改此路径。

3.4 利用倒排表搜索

在这里,我们使用倒排表先获得一批候选问题,然后再通过余弦相似度做精准匹配,这样一来可以节省大量的时间。搜索过程分成两步:

  • 使用倒排表把候选问题全部提取出来。首先,对输入的新问题做分词等必要的预处理工作,然后对于句子里的每一个单词,从related_words里提取出跟它意思相近的top 10单词, 然后根据这些top词从倒排表里提取相关的文档,把所有的文档返回。 这部分可以放在下面的函数当中,也可以放在外部。
  • 然后针对于这些文档做余弦相似度的计算,最后排序并选出最好的答案。

可以适当定义自定义函数,使得减少重复性代码

In [27]:
def get_inverted_index_sentence(query):
    query = word_tokenize(query)
    query= sentence_preprocess(query)
    
    r_list = []
    for q in query:
        if q in related_words: 
            for word in related_words[q]:
                r_list.append(word)
    
    total_list = query
    for word in r_list:
        total_list.append(word)
    
    idx_list = []    
    for word in total_list:
        if word in inverted_idx:
            indx = inverted_idx[word]
            idx_list.extend(indx)
    return query, idx_list
In [28]:
def get_top_index(result):
    top_idxs = []
    dict_visited = {}
    top_result = sorted(get_top_numbers(result, 5), reverse = True)
    
    for r in top_result:
        for i, n in enumerate(result):
            if n == r and i not in dict_visited: 
                top_idxs.append(i)
                dict_visited[i] = True
    return top_idxs
In [23]:
def get_top_results_tfidf(query):
    """
    给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:
    1. 利用倒排表来筛选 candidate (需要使用related_words). 
    2. 对于候选文档,计算跟输入问题之间的相似度
    3. 找出相似度最高的top5问题的答案
    """
    query, sentence_idxs = get_inverted_index_sentence(query)
    query = ' '.join(query)
    q_tfidf = vectorizer.transform([query])
    
    
    X_tfidf_idx = []
    for indx in sentence_idxs:
        X_tfidf_idx.append(X_tfidf[indx].toarray()[0])
    
    res = list(cosine_similarity(q_tfidf, X_tfidf_idx)[0])
    
    top_idxs = []   
    top_idxs = get_top_index(res)
    
    ans = [alist[i] for i in top_idxs]
    
    return ans   # 返回相似度最高的问题对应的答案,作为TOP5答案
In [24]:
test_query1 = "when did Beyonce start becoming popular"
test_query2 = "what languge does the word of symbiosis come from"

print (get_top_results_tfidf(test_query1))
print (get_top_results_tfidf(test_query2))
Out [24]:
['in the late 1990s', 'Beck', 'Polish Great Emigration', 'six', 'the Karmapa', 'November 12, 2015']
['Darlette Johnson', 'flute and violin', 'Chopin Family Parlour', '2001: A Space Odyssey', 'Houston', 'Saxon Palace.']
In [33]:
def get_top_results_w2v(query):
    """
    给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:
    1. 利用倒排表来筛选 candidate (需要使用related_words). 
    2. 对于候选文档,计算跟输入问题之间的相似度
    3. 找出相似度最高的top5问题的答案
    """
        
    query, sentence_idxs = get_inverted_index_sentence(query)
    query = ' '.join(query)
    q_w2v = sentence_to_vec(emc,query)
    
    
    q_w2v_idx = []
    for indx in sentence_idxs:
        q_w2v_idx.append(X_w2v[indx])
    
    res = list(cosine_similarity(np.array([q_w2v]), np.array([q_w2v_idx])[0])[0])
    
    top_idxs = []   
    top_idxs = get_top_index(res)
    
    ans = [alist[i] for i in top_idxs]
    
    return ans  # 返回相似度最高的问题对应的答案,作为TOP5答案
In [34]:
test_query1 = "when did Beyonce start becoming popular"
test_query2 = "what languge does the word of symbiosis come from"

print (get_top_results_w2v(test_query1))
print (get_top_results_w2v(test_query2))
Out [34]:
['Top 20 Hot 100 Songwriters', 'Diana Ross.', '1664', "a canon at one beat's distance", 'Gautama Buddha']
['50', 'Missy Elliott and Alicia Keys', 'GamePro and EGM', 'The Women Behind The Music', 'Oujda, Tangier and Erfoud']
In [56]:
def get_top_results_bert(query):
    """
    给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点:
    1. 利用倒排表来筛选 candidate (需要使用related_words). 
    2. 对于候选文档,计算跟输入问题之间的相似度
    3. 找出相似度最高的top5问题的答案
    """
    
    query, sentence_idxs = get_inverted_index_sentence(query)
    query = ' '.join(query)
    q_bert = sentence_to_vec_bert(query)
     
    
    
    q_bert_idx = []
    for indx in sentence_idxs:
        q_bert_idx.append(X_bert[indx])
    
    res = list(cosine_similarity(np.array([q_bert]), np.array([q_bert_idx])[0])[0])
    
    top_idxs = []   
    top_idxs = get_top_index(res)
    
    ans = [alist[i] for i in top_idxs]
    
    return ans  # 返回相似度最高的问题对应的答案,作为TOP5答案
test_query1 = "when did Beyonce start becoming popular"
test_query2 = "what language does the word of symbiosis come from"

print (get_top_results_bert(test_query1))
print (get_top_results_bert(test_query2))
In [35]:
# TODO: 编写几个测试用例,并输出结果

test_query1 = "when did Beyonce start becoming popular"
test_query2 = "what languge does the word of symbiosis come from"

print (get_top_results_tfidf(test_query1))
print (get_top_results_w2v(test_query1))
# print (get_top_results_bert(test_query1))

print (get_top_results_tfidf(test_query2))
print (get_top_results_w2v(test_query2))
# print (get_top_results_bert(test_query2))
Out [35]:
['in the late 1990s', 'Beck', 'Polish Great Emigration', 'six', 'the Karmapa', 'November 12, 2015']
['Top 20 Hot 100 Songwriters', 'Diana Ross.', '1664', "a canon at one beat's distance", 'Gautama Buddha']
['Darlette Johnson', 'flute and violin', 'Chopin Family Parlour', '2001: A Space Odyssey', 'Houston', 'Saxon Palace.']
['50', 'Missy Elliott and Alicia Keys', 'GamePro and EGM', 'The Women Behind The Music', 'Oujda, Tangier and Erfoud']

4. 拼写纠错

其实用户在输入问题的时候,不能期待他一定会输入正确,有可能输入的单词的拼写错误的。这个时候我们需要后台及时捕获拼写错误,并进行纠正,然后再通过修正之后的结果再跟库里的问题做匹配。这里我们需要实现一个简单的拼写纠错的代码,然后自动去修复错误的单词。

这里使用的拼写纠错方法是课程里讲过的方法,就是使用noisy channel model。 我们回想一下它的表示:

$c^* = \text{argmax}{c\in candidates} ~~p(c|s) = \text{argmax}{c\in candidates} ~~p(s|c)p(c)$

这里的candidates指的是针对于错误的单词的候选集,这部分我们可以假定是通过edit_distance来获取的(比如生成跟当前的词距离为1/2的所有的valid 单词。 valid单词可以定义为存在词典里的单词。 c代表的是正确的单词, s代表的是用户错误拼写的单词。 所以我们的目的是要寻找出在candidates里让上述概率最大的正确写法c

$p(s|c)$,这个概率我们可以通过历史数据来获得,也就是对于一个正确的单词$c$, 有百分之多少人把它写成了错误的形式1,形式2... 这部分的数据可以从spell_errors.txt里面找得到。但在这个文件里,我们并没有标记这个概率,所以可以使用uniform probability来表示。这个也叫做channel probability。

$p(c)$,这一项代表的是语言模型,也就是假如我们把错误的$s$,改造成了$c$, 把它加入到当前的语句之后有多通顺?在本次项目里我们使用bigram来评估这个概率。 举个例子: 假如有两个候选 c1,c2 c_1, c_2, 然后我们希望分别计算出这个语言模型的概率。 由于我们使用的是bigram, 我们需要计算出两个概率,分别是当前词前面和后面词的bigram概率。 用一个例子来表示:

给定: We are go to school tomorrow, 对于这句话我们希望把中间的go替换成正确的形式,假如候选集里有个,分别是going, went, 这时候我们分别对这俩计算如下的概率: p(goingare)p(togoing) p(going|are)p(to|going)p(wentare)p(towent) p(went|are)p(to|went), 然后把这个概率当做是$p(c)$的概率。 然后再跟channel probability结合给出最终的概率大小。

那这里的$p(are|going)$这些bigram概率又如何计算呢?答案是训练一个语言模型! 但训练一个语言模型需要一些文本数据,这个数据怎么找? 在这次项目作业里我们会用到nltk自带的reuters的文本类数据来训练一个语言模型。当然,如果你有资源你也可以尝试其他更大的数据。最终目的就是计算出bigram概率。

4.1 训练一个语言模型

在这里,我们使用nltk自带的reuters数据来训练一个语言模型。 使用add-one smoothing

from nltk.corpus import reuters
import numpy as np
import codecs
# 读取语料库的数据
categories = reuters.categories()
corpus = reuters.sents(categories=categories)

word2index = {}
index2word = {}
corpus2 = []

for sentence in corpus:
    corpus2.append(['<s> '] + sentence + [' </s>'])

for sentence in corpus2:
    for word in sentence:
        word = word.lower()
        if word in word2index: continue
        index2word[len(word2index)] = word
        word2index[word] = len(word2index)

word_count = len(word2index)
uni_count = np.zeros(word_count)
bi_count = np.zeros((word_count, word_count))

for sentence in corpus2:
    for i, word in enumerate(sentence):
        word = word.lower()
        uni_count[word2index[word]] += 1
        if i <len(sentence) -1:
            pre = word2index[word]
            curr = word2index[sentence[i+1].lower()]
            bi_count[pre, curr] +=1

bigram = np.zeros((word_count, word_count))

for i in range(word_count):
    for j in range(word_count):
        if bi_count[i,j]==0:
            bigram[i,j] = 1.0 / (uni_count[i] + word_count)
        else:
            bigram[i,j] = (1.0 + bi_count[i,j]) / (word_count + uni_count[i])
def checkCount(pre,word):
    if pre.lower() in word2index and word.lower() in word2index:
        return bigram[word2index[pre.lower()],word2index[word.lower()]]
    else:
        return 0.0

4.2 构建Channel Probs

基于spell_errors.txt文件构建channel probability, 其中$channel[c][s]$表示正确的单词$c$被写错成$s$的概率。

# TODO 构建channel probability  
channel = {}

spell_error_dict = {}
for line in open('spell-errors.txt'):
    word = line.split(":")
    c_word = word[0] # correct word is the key
    spell_error_dict[c_word] = [e_word.strip( )for e_word in word[1].strip().split(",")]

# TODO
for c_word in spell_error_dict:
    if c_word not in channel:
        channel[c_word] = {}
    for e_word in spell_error_dict[c_word]:
        channel[c_word][e_word] = 1/len(spell_error_dict[c_word])
    
# print(channel)   

4.3 根据错别字生成所有候选集合

给定一个错误的单词,首先生成跟这个单词距离为1或者2的所有的候选集合。 这部分的代码我们在课程上也讲过,可以参考一下。

alphabet = "abcdefghijklmnopqrstuvwxyz"
def known(words):
    return set(w for w in words if w in word2index)
def edits1(word):
    n = len(word)
    #删除
    s1 = [word[0:i] + word[i+1:] for i in range(n)]
    #调换相连的两个字母
    s2 = [word[0:i] + word[i+1] + word[i] + word[i+2:] for i in range(n-1)]
    #replace
    s3 = [word[0:i] + c + word[i + 1:] for i in range(n) for c in alphabet]
    #插入
    s4 = [word[0:i] + c + word[i:] for i in range(n + 1) for c in alphabet]
    edit1_words = set(s1 + s2 + s3 + s4)

    if word in edit1_words:
        edit1_words.remove(word)

    edit1_words = known(edit1_words)
    return edit1_words
def edits2(word, edit1_words):
    edit2_words = set(e2 for e1 in edit1_words for e2 in edits1(e1))
    
    if word in edit2_words:
        edit2_words.remove(word)
    
    edit2_words = known(edit2_words)
    return edit2_words
def generate_candidates(word):
    # 基于拼写错误的单词,生成跟它的编辑距离为1或者2的单词,并通过词典库的过滤。
    # 只留写法上正确的单词。 
    
    word_edit1 = edits1(word)
    word_edit2 = edits2(word, word_edit1)
    
    words = word_edit1 | word_edit2
    
    return words

4.4 给定一个输入,如果有错误需要纠正

给定一个输入query, 如果这里有些单词是拼错的,就需要把它纠正过来。这部分的实现可以简单一点: 对于query分词,然后把分词后的每一个单词在词库里面搜一下,假设搜不到的话可以认为是拼写错误的! 人如果拼写错误了再通过channelbigram来计算最适合的候选。

import numpy as np
from queue import PriorityQueue
def spell_corrector(line):
    # 1. 首先做分词,然后把``line``表示成``tokens``
    # 2. 循环每一token, 然后判断是否存在词库里。如果不存在就意味着是拼写错误的,需要修正。 
    #    修正的过程就使用上述提到的``noisy channel model``, 然后从而找出最好的修正之后的结果。  
    
    corrected_words = []
    tokens = []
    tokens = ['<s>']+word_tokenize(line)+['</s>']
    for i, token in enumerate(tokens):
        if i == len(tokens)-1: break
        if token.lower() not in word2index:
            pre, nxt = tokens[i-1].lower(), tokens[i+1].lower()
            token = word_corrector(token, pre, nxt)
            corrected_words.append(token)
        else: corrected_words.append(token)
    newline = ' '.join(corrected_words)
    
    return newline   # 修正之后的结果,假如用户输入没有问题,那这时候``newline = line``
def word_corrector(word, pre_word, next_word):
    candidates = generate_candidates(word)
    correctors = PriorityQueue()
    
    if len(candidates) == 0: return word
    
    for candidate in candidates:
        if candidate in channel and word in channel[candidate] and candidate in word2index:
            bi_pre = checkCount(pre_word, candidate)
            bi_nxt = checkCount(candidate, next_word)
            p = np.log(channel[candidate][word] + 0.001) + bi_pre + bi_nxt
            correctors.put((-1*p, candidate))
    
    if correctors.empty(): return word
    
    return correctors.get()[1]
sentence = spell_corrector("when did Beyonce start beeome popular?")
print(sentence)
test_query1 = "What counted for more of the poplation change"  # 拼写错误的
test_query2 = "What counted for more of the population chenge"  # 拼写错误的

test_query1 = spell_corrector(test_query1)
test_query2 = spell_corrector(test_query2)

print(test_query1)
print(test_query2)

4.5 基于拼写纠错算法,实现用户输入自动矫正

首先有了用户的输入query, 然后做必要的处理把句子转换成tokens的形状,然后对于每一个token比较是否是valid, 如果不是的话就进行下面的修正过程。

test_query1 = "when did Beyonce starte becoming popular?"  # 拼写错误的
test_query2 = "What counted for more of the population chenge"  # 拼写错误的

test_query1 = spell_corector(test_query1)
test_query2 = spell_corector(test_query2)

print (get_top_results_tfidf(test_query1))
print (get_top_results_w2v(test_query1))
print (get_top_results_bert(test_query1))

print (get_top_results_tfidf(test_query2))
print (get_top_results_w2v(test_query2))
print (get_top_results_bert(test_query2))

附录

在本次项目中我们实现了一个简易的问答系统。基于这个项目,我们其实可以有很多方面的延伸。

  • 在这里,我们使用文本向量之间的余弦相似度作为了一个标准。但实际上,我们也可以基于基于包含关键词的情况来给一定的权重。比如一个单词跟related word有多相似,越相似就意味着相似度更高,权重也会更大。
  • 另外 ,除了根据词向量去寻找related words也可以提前定义好同义词库,但这个需要大量的人力成本。
  • 在这里,我们直接返回了问题的答案。 但在理想情况下,我们还是希望通过问题的种类来返回最合适的答案。 比如一个用户问:“明天北京的天气是多少?”, 那这个问题的答案其实是一个具体的温度(其实也叫做实体),所以需要在答案的基础上做进一步的抽取。这项技术其实是跟信息抽取相关的。
  • 对于词向量,我们只是使用了average pooling, 除了average pooling,我们也还有其他的经典的方法直接去学出一个句子的向量。
  • 短文的相似度分析一直是业界和学术界一个具有挑战性的问题。在这里我们使用尽可能多的同义词来提升系统的性能。但除了这种简单的方法,可以尝试其他的方法比如WMD,或者适当结合parsing相关的知识点。

好了,祝你好运!