搭建一个简单的问答系统 (Building a Simple QA System)
本次项目的目标是搭建一个基于检索式的简易的问答系统,这是一个最经典的方法也是最有效的方法。
不要单独创建一个文件,所有的都在这里面编写,不要试图改已经有的函数名字 (但可以根据需求自己定义新的函数)
预估完成时间
: 5-10小时
检索式的问答系统
问答系统所需要的数据已经提供,对于每一个问题都可以找得到相应的答案,所以可以理解为每一个样本数据是 <问题、答案>
。 那系统的核心是当用户输入一个问题的时候,首先要找到跟这个问题最相近的已经存储在库里的问题,然后直接返回相应的答案即可(但实际上也可以抽取其中的实体或者关键词)。 举一个简单的例子:
假设我们的库里面已有存在以下几个<问题,答案>:
- <"贪心学院主要做什么方面的业务?”, “他们主要做人工智能方面的教育”>
- <“国内有哪些做人工智能教育的公司?”, “贪心学院”>
- <"人工智能和机器学习的关系什么?", "其实机器学习是人工智能的一个范畴,很多人工智能的应用要基于机器学习的技术">
- <"人工智能最核心的语言是什么?", ”Python“>
- .....
假设一个用户往系统中输入了问题 “贪心学院是做什么的?”, 那这时候系统先去匹配最相近的“已经存在库里的”问题。 那在这里很显然是 “贪心学院是做什么的”和“贪心学院主要做什么方面的业务?”是最相近的。 所以当我们定位到这个问题之后,直接返回它的答案 “他们主要做人工智能方面的教育”就可以了。 所以这里的核心问题可以归结为计算两个问句(query)之间的相似度。
项目中涉及到的任务描述
问答系统看似简单,但其中涉及到的内容比较多。 在这里先做一个简单的解释,总体来讲,我们即将要搭建的模块包括:
- 文本的读取: 需要从相应的文件里读取
(问题,答案)
- 文本预处理: 清洗文本很重要,需要涉及到
停用词过滤
等工作 - 文本的表示: 如果表示一个句子是非常核心的问题,这里会涉及到
tf-idf
,Glove
以及BERT Embedding
- 文本相似度匹配: 在基于检索式系统中一个核心的部分是计算文本之间的
相似度
,从而选择相似度最高的问题然后返回这些问题的答案 - 倒排表: 为了加速搜索速度,我们需要设计
倒排表
来存储每一个词与出现的文本 - 词义匹配:直接使用倒排表会忽略到一些意思上相近但不完全一样的单词,我们需要做这部分的处理。我们需要提前构建好
相似的单词
然后搜索阶段使用 - 拼写纠错:我们不能保证用户输入的准确,所以第一步需要做用户输入检查,如果发现用户拼错了,我们需要及时在后台改正,然后按照修改后的在库里面搜索
- 文档的排序: 最后返回结果的排序根据文档之间
余弦相似度
有关,同时也跟倒排表中匹配的单词有关
项目中需要的数据:
dev-v2.0.json
: 这个数据包含了问题和答案的pair, 但是以JSON格式存在,需要编写parser来提取出里面的问题和答案。glove.6B
: 这个文件需要从网上下载,下载地址为:https://nlp.stanford.edu/projects/glove/, 请使用d=200的词向量spell-errors.txt
这个文件主要用来编写拼写纠错模块。 文件中第一列为正确的单词,之后列出来的单词都是常见的错误写法。 但这里需要注意的一点是我们没有给出他们之间的概率,也就是p(错误|正确),所以我们可以认为每一种类型的错误都是同等概率
vocab.txt
这里列了几万个英文常见的单词,可以用这个词库来验证是否有些单词被拼错testdata.txt
这里搜集了一些测试数据,可以用来测试自己的spell corrector。这个文件只是用来测试自己的程序。
在本次项目中,你将会用到以下几个工具:
sklearn
。具体安装请见:http://scikit-learn.org/stable/install.html sklearn包含了各类机器学习算法和数据处理工具,包括本项目需要使用的词袋模型,均可以在sklearn工具包中找得到。jieba
,用来做分词。具体使用方法请见 https://github.com/fxsjy/jiebabert embedding
: https://github.com/imgarylai/bert-embeddingnltk
:https://www.nltk.org/index.html
第一部分:对于训练数据的处理:读取文件和预处理
文本的读取
: 需要从文本中读取数据,此处需要读取的文件是dev-v2.0.json
,并把读取的文件存入一个列表里(list)文本预处理
: 对于问题本身需要做一些停用词过滤等文本方面的处理可视化分析
: 对于给定的样本数据,做一些可视化分析来更好地理解数据
1.1节: 文本的读取
把给定的文本数据读入到qlist
和alist
当中,这两个分别是列表,其中qlist
是问题的列表,alist
是对应的答案列表
def read_corpus(): """ 读取给定的语料库,并把问题列表和答案列表分别写入到 qlist, alist 里面。 在此过程中,不用对字符换做任何的处理(这部分需要在 Part 2.3里处理) qlist = ["问题1", “问题2”, “问题3” ....] alist = ["答案1", "答案2", "答案3" ....] 务必要让每一个问题和答案对应起来(下标位置一致) """ # TODO 需要完成的代码部分 ... import json import jsonpath with open('train-v2.0.json', 'r', encoding='utf-8') as f: dic = json.load(f) qlist = jsonpath.jsonpath(dic,'$..question') alist = jsonpath.jsonpath(dic,'$..text') f.close() assert len(qlist) == len(alist) # 确保长度一样 return qlist, alist
1.2 理解数据(可视化分析/统计信息)
对数据的理解是任何AI工作的第一步, 需要对数据有个比较直观的认识。在这里,简单地统计一下:
- 在
qlist
出现的总单词个数 - 按照词频画一个
histogram
plot
# TODO: 统计一下在qlist中总共出现了多少个单词? 总共出现了多少个不同的单词(unique word)? # 这里需要做简单的分词,对于英文我们根据空格来分词即可,其他过滤暂不考虑(只需分词) from nltk import word_tokenize qlist, alist = read_corpus() qwords = [] for sentence in qlist: qwords.append(word_tokenize(sentence)) mydist = {} for sentence in qwords: for word in sentence: if word not in mydist: mydist[word] =1 else: mydist[word] +=1 word_total = len(mydist) print (word_total)
53157
from nltk import word_tokenize from nltk import FreqDist %matplotlib inline fdist = FreqDist(mydist) fdist.plot(20, cumulative=True)
# TODO: 从上面的图中能观察到什么样的现象? 这样的一个图的形状跟一个非常著名的函数形状很类似,能所出此定理吗? # hint: [XXX]'s law # # zip's law
1.3 文本预处理
此部分需要做文本方面的处理。 以下是可以用到的一些方法:
- 停用词过滤 (去网上搜一下 "english stop words list",会出现很多包含停用词库的网页,或者直接使用NLTK自带的)
- 转换成lower_case: 这是一个基本的操作
- 去掉一些无用的符号: 比如连续的感叹号!!!, 或者一些奇怪的单词。
- 去掉出现频率很低的词:比如出现次数少于10,20.... (想一下如何选择阈值)
- 对于数字的处理: 分词完只有有些单词可能就是数字比如44,415,把所有这些数字都看成是一个单词,这个新的单词我们可以定义为 "#number"
- lemmazation: 在这里不要使用stemming, 因为stemming的结果有可能不是valid word。
# TODO: 需要做文本方面的处理。 从上述几个常用的方法中选择合适的方法给qlist做预处理(不一定要按照上面的顺序,不一定要全部使用) from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer stop_words = set(stopwords.words('english')) wnl = WordNetLemmatizer() f_list = [] english_punctuations = ['(', ')', '[', ']', '&', '!!', '*', '@', '#', '$', '%', '?'] def sentencesList_preprocess(sentencesList): for sentence in sentencesList: # filter stop words filtered = [word for word in sentence if word not in stop_words] # to lower case lower_case = [word.lower() for word in filtered] # lemmatization lemma = [wnl.lemmatize(token) for token in lower_case] # remove unused symbols sym = [word for word in lemma if word not in english_punctuations] f_list.append(sym) return f_list qlist = sentencesList_preprocess(qwords) # 更新后的问题列表
def sentence_preprocess(sentence): # filter stop words filtered = [word for word in sentence if word not in stop_words] # to lower case lower_case = [word.lower() for word in filtered] # lemmatization lemma = [wnl.lemmatize(token) for token in lower_case] # remove unused symbols token = [word for word in lemma if word not in english_punctuations] return token
第二部分: 文本的表示
当我们做完必要的文本处理之后就需要想办法表示文本了,这里有几种方式
- 使用
tf-idf vector
- 使用
- 使用embedding技术如
word2vec
,bert embedding
等
- 使用embedding技术如
下面我们分别提取这三个特征来做对比。
2.1 使用tf-idf表示向量
把qlist
中的每一个问题的字符串转换成tf-idf
向量, 转换之后的结果存储在X
矩阵里。 X
的大小是: N* D
的矩阵。 这里N
是问题的个数(样本个数),
D
是词典库的大小
# TODO from sklearn.feature_extraction.text import TfidfVectorizer vectorizer = TfidfVectorizer() # 定义一个tf-idf的vectorizer qlistnew = [' '.join(sentence) for sentence in qlist] X_tfidf = vectorizer.fit_transform(qlistnew) # 结果存放在X矩阵里
2.2 使用wordvec + average pooling
词向量方面需要下载: https://nlp.stanford.edu/projects/glove/ (请下载glove.6B.zip
),并使用d=200
的词向量(200维)。国外网址如果很慢,可以在百度上搜索国内服务器上的。 每个词向量获取完之后,即可以得到一个句子的向量。 我们通过average pooling
来实现句子的向量。
# load Glove def loadGlove(path): vocab = {} embedding = [] vocab["UNK"] = 0 embedding.append([0]*200) file = open(path, 'r', encoding='utf8') i = 1 for line in file: row = line.strip().split() vocab[row[0]] = i embedding.append(row[1:]) i += 1 print("Finish load Glove") file.close() return vocab, embedding path = '../glove.6b/glove.6b.200d.txt' voc, emb = loadGlove(path)
Finish load Glove
def sentence_to_vec(embedding, sentence): vec = np.zeros((200,), dtype=np.float64) for word in sentence: if word in voc: idx = voc[word] vec += embedding[idx].astype('float64') else: vec += embedding[0].astype('float64') vec = vec/len(sentence) return vec
def sentences_to_vec(embedding, sentences): vec = np.zeros((len(sentences), 200)) for i, sentence in enumerate(sentences): sentence = sentence.strip().split(' ') vec[i] = sentence_to_vec(embedding, sentence) return vec
import numpy as np emc = np.asarray(emb)
# TODO 基于Glove向量获取句子向量 # emb = # 这是 D*H的矩阵,这里的D是词典库的大小, H是词向量的大小。 这里面我们给定的每个单词的词向量, # 这需要从文本中读取 X_w2v = sentences_to_vec(emc, qlistnew) # 初始化完emb之后就可以对每一个句子来构建句子向量了,这个过程使用average pooling来实现
# for i in voc: # v = emb[v] # res = list(cosine_similarity(emb, v)[0]) # result = sorted(get_top_numbers(res, 10), reverse = True)
2.3 使用BERT + average pooling
最近流行的BERT也可以用来学出上下文相关的词向量(contex-aware embedding), 在很多问题上得到了比较好的结果。在这里,我们不做任何的训练,而是直接使用已经训练好的BERT embedding。 具体如何训练BERT将在之后章节里体会到。 为了获取BERT-embedding,可以直接下载已经训练好的模型从而获得每一个单词的向量。可以从这里获取: https://github.com/imgarylai/bert-embedding , 请使用bert_12_768_12
当然,你也可以从其他source获取也没问题,只要是合理的词向量。
import numpy as np from bert_embedding.bert import BertEmbedding bert_embedding = BertEmbedding(model='bert_12_768_12', dataset_name='book_corpus_wiki_en_uncased')
def sentences_to_vec_bert(sentences): X_bert = np.zeros((len(sentences), 768)) for i in range(len(sentences)): emb = bert_embedding(sentence, 'avg') vec_list = emb[0][1] # v_bert = np.zeros((768,), dtype=np.float32) # for vec in vec_list: # v_bert += vec.astype('float32') # v_bert = v_bert/len(vec_list) v_bert = np.average(vec_list, axis=0) X_bert[i] = v_bert return X_bert
def sentence_to_vec_bert(sentence): emb = bert_embedding([sentence], 'avg') vec_list = emb[0][1] v_bert = np.sum(vec_list, axis=0) return v_bert
# TODO 基于BERT的句子向量计算 X_bert = sentences_to_vec_bert(qlistnew) # 每一个句子的向量结果存放在X_bert矩阵里。行数为句子的总个数,列数为一个句子embedding大小。
import heapq from sklearn.metrics.pairwise import cosine_similarity # return the top k numbers from the list using prority queue def get_top_numbers(tlist, k): max_heap = [] l = len(tlist) if l<=0 or k<=0 or k>l: return None for i in tlist: if k > len(max_heap): heapq.heappush(max_heap, i) else: heapq.heappushpop(max_heap, i) return max_heap
3.1 tf-idf + 余弦相似度
我们可以直接基于计算出来的tf-idf
向量,计算用户最新问题与库中存储的问题之间的相似度,从而选择相似度最高的问题的答案。这个方法的复杂度为O(N)
, N
是库中问题的个数。
def get_top_results_tfidf_noindex(query): # TODO 需要编写 """ 给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点: 1. 对于用户的输入 query 首先做一系列的预处理(上面提到的方法),然后再转换成tf-idf向量(利用上面的vectorizer) 2. 计算跟每个库里的问题之间的相似度 3. 找出相似度最高的top5问题的答案 """ query = word_tokenize(query) query= sentence_preprocess(query) query = ' '.join(query) q_tfidf = vectorizer.transform([query]) res = list(cosine_similarity(q_tfidf, X_tfidf)[0]) result = sorted(get_top_numbers(res, 5), reverse = True) top_idxs = [] # top_idxs存放相似度最高的(存在qlist里的)问题的下标 # hint: 请使用 priority queue来找出top results. 思考为什么可以这么做? dict_visited = {} for r in result: for i, n in enumerate(res): if n == r and i not in dict_visited: top_idxs.append(i) dict_visited[i] = True ans = [alist[i] for i in top_idxs] return ans # 返回相似度最高的问题对应的答案,作为TOP5答案
# TODO: 编写几个测试用例,并输出结果 print (get_top_results_tfidf_noindex("when did Beyonce start becoming popular")) print (get_top_results_tfidf_noindex("what languge does the word of 'symbiosis' come from"))
['in the late 1990s', 'Particularly since the 1950s, pro wrestling events have frequently been responsible for sellout crowds at large arenas', 'single mandolins', 'single mandolins', 'molecular methods'] ['Greek', '1877', 'living together', '1570s', 'glesum']
你会发现上述的程序很慢,没错! 是因为循环了所有库里的问题。为了优化这个过程,我们需要使用一种数据结构叫做倒排表
。 使用倒排表我们可以把单词和出现这个单词的文档做关键。 之后假如要搜索包含某一个单词的文档,即可以非常快速的找出这些文档。 在这个QA系统上,我们首先使用倒排表来快速查找包含至少一个单词的文档,然后再进行余弦相似度的计算,即可以大大减少时间复杂度
。
3.2 倒排表的创建
倒排表的创建其实很简单,最简单的方法就是循环所有的单词一遍,然后记录每一个单词所出现的文档,然后把这些文档的ID保存成list即可。我们可以定义一个类似于hash_map
, 比如 inverted_index = {}
, 然后存放包含每一个关键词的文档出现在了什么位置,也就是,通过关键词的搜索首先来判断包含这些关键词的文档(比如出现至少一个),然后对于candidates问题做相似度比较。
# TODO 请创建倒排表 inverted_idx = {} # 定一个一个简单的倒排表,是一个map结构。 循环所有qlist一遍就可以 for index, sentence in enumerate(qlist): for word in sentence: if word not in inverted_idx: inverted_idx[word] = [index] else: inverted_idx[word].append(index)
3.3 语义相似度
这里有一个问题还需要解决,就是语义的相似度。可以这么理解: 两个单词比如car, auto这两个单词长得不一样,但从语义上还是类似的。如果只是使用倒排表我们不能考虑到这些单词之间的相似度,这就导致如果我们搜索句子里包含了car
, 则我们没法获取到包含auto的所有的文档。所以我们希望把这些信息也存下来。那这个问题如何解决呢? 其实也不难,可以提前构建好相似度的关系,比如对于car
这个单词,一开始就找好跟它意思上比较类似的单词比如top 10,这些都标记为related words
。所以最后我们就可以创建一个保存related words
的一个map
. 比如调用related_words['car']
即可以调取出跟car
意思上相近的TOP 10的单词。
那这个related_words
又如何构建呢? 在这里我们仍然使用Glove
向量,然后计算一下俩俩的相似度(余弦相似度)。之后对于每一个词,存储跟它最相近的top 10单词,最终结果保存在related_words
里面。 这个计算需要发生在离线,因为计算量很大,复杂度为O(V*V)
, V是单词的总数。
这个计算过程的代码请放在related.py
的文件里,然后结果保存在related_words.txt
里。 我们在使用的时候直接从文件里读取就可以了,不用再重复计算。所以在此notebook里我们就直接读取已经计算好的结果。 作业提交时需要提交related.py
和related_words.txt
文件,这样在使用的时候就不再需要做这方面的计算了。
# TODO 读取语义相关的单词 def get_related_words(file): related_words = {} for line in open(file, mode='r', encoding='utf-8'): item = line.split(",") word, s_list = item[0], [value for value in item[1].strip().split()] related_words[word] = s_list return related_words related_words = get_related_words('related_words.txt') # 直接放在文件夹的根目录下,不要修改此路径。
3.4 利用倒排表搜索
在这里,我们使用倒排表先获得一批候选问题,然后再通过余弦相似度做精准匹配,这样一来可以节省大量的时间。搜索过程分成两步:
- 使用倒排表把候选问题全部提取出来。首先,对输入的新问题做分词等必要的预处理工作,然后对于句子里的每一个单词,从
related_words
里提取出跟它意思相近的top 10单词, 然后根据这些top词从倒排表里提取相关的文档,把所有的文档返回。 这部分可以放在下面的函数当中,也可以放在外部。 - 然后针对于这些文档做余弦相似度的计算,最后排序并选出最好的答案。
可以适当定义自定义函数,使得减少重复性代码
def get_inverted_index_sentence(query): query = word_tokenize(query) query= sentence_preprocess(query) r_list = [] for q in query: if q in related_words: for word in related_words[q]: r_list.append(word) total_list = query for word in r_list: total_list.append(word) idx_list = [] for word in total_list: if word in inverted_idx: indx = inverted_idx[word] idx_list.extend(indx) return query, idx_list
def get_top_index(result): top_idxs = [] dict_visited = {} top_result = sorted(get_top_numbers(result, 5), reverse = True) for r in top_result: for i, n in enumerate(result): if n == r and i not in dict_visited: top_idxs.append(i) dict_visited[i] = True return top_idxs
def get_top_results_tfidf(query): """ 给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点: 1. 利用倒排表来筛选 candidate (需要使用related_words). 2. 对于候选文档,计算跟输入问题之间的相似度 3. 找出相似度最高的top5问题的答案 """ query, sentence_idxs = get_inverted_index_sentence(query) query = ' '.join(query) q_tfidf = vectorizer.transform([query]) X_tfidf_idx = [] for indx in sentence_idxs: X_tfidf_idx.append(X_tfidf[indx].toarray()[0]) res = list(cosine_similarity(q_tfidf, X_tfidf_idx)[0]) top_idxs = [] top_idxs = get_top_index(res) ans = [alist[i] for i in top_idxs] return ans # 返回相似度最高的问题对应的答案,作为TOP5答案
test_query1 = "when did Beyonce start becoming popular" test_query2 = "what languge does the word of symbiosis come from" print (get_top_results_tfidf(test_query1)) print (get_top_results_tfidf(test_query2))
['in the late 1990s', 'Beck', 'Polish Great Emigration', 'six', 'the Karmapa', 'November 12, 2015'] ['Darlette Johnson', 'flute and violin', 'Chopin Family Parlour', '2001: A Space Odyssey', 'Houston', 'Saxon Palace.']
def get_top_results_w2v(query): """ 给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点: 1. 利用倒排表来筛选 candidate (需要使用related_words). 2. 对于候选文档,计算跟输入问题之间的相似度 3. 找出相似度最高的top5问题的答案 """ query, sentence_idxs = get_inverted_index_sentence(query) query = ' '.join(query) q_w2v = sentence_to_vec(emc,query) q_w2v_idx = [] for indx in sentence_idxs: q_w2v_idx.append(X_w2v[indx]) res = list(cosine_similarity(np.array([q_w2v]), np.array([q_w2v_idx])[0])[0]) top_idxs = [] top_idxs = get_top_index(res) ans = [alist[i] for i in top_idxs] return ans # 返回相似度最高的问题对应的答案,作为TOP5答案
test_query1 = "when did Beyonce start becoming popular" test_query2 = "what languge does the word of symbiosis come from" print (get_top_results_w2v(test_query1)) print (get_top_results_w2v(test_query2))
['Top 20 Hot 100 Songwriters', 'Diana Ross.', '1664', "a canon at one beat's distance", 'Gautama Buddha'] ['50', 'Missy Elliott and Alicia Keys', 'GamePro and EGM', 'The Women Behind The Music', 'Oujda, Tangier and Erfoud']
def get_top_results_bert(query): """ 给定用户输入的问题 query, 返回最有可能的TOP 5问题。这里面需要做到以下几点: 1. 利用倒排表来筛选 candidate (需要使用related_words). 2. 对于候选文档,计算跟输入问题之间的相似度 3. 找出相似度最高的top5问题的答案 """ query, sentence_idxs = get_inverted_index_sentence(query) query = ' '.join(query) q_bert = sentence_to_vec_bert(query) q_bert_idx = [] for indx in sentence_idxs: q_bert_idx.append(X_bert[indx]) res = list(cosine_similarity(np.array([q_bert]), np.array([q_bert_idx])[0])[0]) top_idxs = [] top_idxs = get_top_index(res) ans = [alist[i] for i in top_idxs] return ans # 返回相似度最高的问题对应的答案,作为TOP5答案
test_query1 = "when did Beyonce start becoming popular" test_query2 = "what language does the word of symbiosis come from" print (get_top_results_bert(test_query1)) print (get_top_results_bert(test_query2))
# TODO: 编写几个测试用例,并输出结果 test_query1 = "when did Beyonce start becoming popular" test_query2 = "what languge does the word of symbiosis come from" print (get_top_results_tfidf(test_query1)) print (get_top_results_w2v(test_query1)) # print (get_top_results_bert(test_query1)) print (get_top_results_tfidf(test_query2)) print (get_top_results_w2v(test_query2)) # print (get_top_results_bert(test_query2))
['in the late 1990s', 'Beck', 'Polish Great Emigration', 'six', 'the Karmapa', 'November 12, 2015'] ['Top 20 Hot 100 Songwriters', 'Diana Ross.', '1664', "a canon at one beat's distance", 'Gautama Buddha'] ['Darlette Johnson', 'flute and violin', 'Chopin Family Parlour', '2001: A Space Odyssey', 'Houston', 'Saxon Palace.'] ['50', 'Missy Elliott and Alicia Keys', 'GamePro and EGM', 'The Women Behind The Music', 'Oujda, Tangier and Erfoud']
4. 拼写纠错
其实用户在输入问题的时候,不能期待他一定会输入正确,有可能输入的单词的拼写错误的。这个时候我们需要后台及时捕获拼写错误,并进行纠正,然后再通过修正之后的结果再跟库里的问题做匹配。这里我们需要实现一个简单的拼写纠错的代码,然后自动去修复错误的单词。
这里使用的拼写纠错方法是课程里讲过的方法,就是使用noisy channel model。 我们回想一下它的表示:
$c^* = \text{argmax}{c\in candidates} ~~p(c|s) = \text{argmax}{c\in candidates} ~~p(s|c)p(c)$
这里的candidates
指的是针对于错误的单词的候选集,这部分我们可以假定是通过edit_distance来获取的(比如生成跟当前的词距离为1/2的所有的valid 单词。 valid单词可以定义为存在词典里的单词。 c
代表的是正确的单词, s
代表的是用户错误拼写的单词。 所以我们的目的是要寻找出在candidates
里让上述概率最大的正确写法c
。
$p(s|c)$,这个概率我们可以通过历史数据来获得,也就是对于一个正确的单词$c$, 有百分之多少人把它写成了错误的形式1,形式2... 这部分的数据可以从spell_errors.txt
里面找得到。但在这个文件里,我们并没有标记这个概率,所以可以使用uniform probability来表示。这个也叫做channel probability。
$p(c)$,这一项代表的是语言模型,也就是假如我们把错误的$s$,改造成了$c$, 把它加入到当前的语句之后有多通顺?在本次项目里我们使用bigram来评估这个概率。 举个例子: 假如有两个候选 , 然后我们希望分别计算出这个语言模型的概率。 由于我们使用的是bigram
, 我们需要计算出两个概率,分别是当前词前面和后面词的bigram
概率。 用一个例子来表示:
给定: We are go to school tomorrow
, 对于这句话我们希望把中间的go
替换成正确的形式,假如候选集里有个,分别是going
, went
, 这时候我们分别对这俩计算如下的概率: 和 , 然后把这个概率当做是$p(c)$的概率。 然后再跟channel probability
结合给出最终的概率大小。
那这里的$p(are|going)$这些bigram概率又如何计算呢?答案是训练一个语言模型! 但训练一个语言模型需要一些文本数据,这个数据怎么找? 在这次项目作业里我们会用到nltk
自带的reuters
的文本类数据来训练一个语言模型。当然,如果你有资源你也可以尝试其他更大的数据。最终目的就是计算出bigram
概率。
4.1 训练一个语言模型
在这里,我们使用nltk
自带的reuters
数据来训练一个语言模型。 使用add-one smoothing
from nltk.corpus import reuters import numpy as np import codecs # 读取语料库的数据 categories = reuters.categories() corpus = reuters.sents(categories=categories) word2index = {} index2word = {} corpus2 = [] for sentence in corpus: corpus2.append(['<s> '] + sentence + [' </s>']) for sentence in corpus2: for word in sentence: word = word.lower() if word in word2index: continue index2word[len(word2index)] = word word2index[word] = len(word2index) word_count = len(word2index) uni_count = np.zeros(word_count) bi_count = np.zeros((word_count, word_count)) for sentence in corpus2: for i, word in enumerate(sentence): word = word.lower() uni_count[word2index[word]] += 1 if i <len(sentence) -1: pre = word2index[word] curr = word2index[sentence[i+1].lower()] bi_count[pre, curr] +=1 bigram = np.zeros((word_count, word_count)) for i in range(word_count): for j in range(word_count): if bi_count[i,j]==0: bigram[i,j] = 1.0 / (uni_count[i] + word_count) else: bigram[i,j] = (1.0 + bi_count[i,j]) / (word_count + uni_count[i])
def checkCount(pre,word): if pre.lower() in word2index and word.lower() in word2index: return bigram[word2index[pre.lower()],word2index[word.lower()]] else: return 0.0
4.2 构建Channel Probs
基于spell_errors.txt
文件构建channel probability
, 其中$channel[c][s]$表示正确的单词$c$被写错成$s$的概率。
# TODO 构建channel probability channel = {} spell_error_dict = {} for line in open('spell-errors.txt'): word = line.split(":") c_word = word[0] # correct word is the key spell_error_dict[c_word] = [e_word.strip( )for e_word in word[1].strip().split(",")] # TODO for c_word in spell_error_dict: if c_word not in channel: channel[c_word] = {} for e_word in spell_error_dict[c_word]: channel[c_word][e_word] = 1/len(spell_error_dict[c_word]) # print(channel)
4.3 根据错别字生成所有候选集合
给定一个错误的单词,首先生成跟这个单词距离为1或者2的所有的候选集合。 这部分的代码我们在课程上也讲过,可以参考一下。
alphabet = "abcdefghijklmnopqrstuvwxyz"
def known(words): return set(w for w in words if w in word2index)
def edits1(word): n = len(word) #删除 s1 = [word[0:i] + word[i+1:] for i in range(n)] #调换相连的两个字母 s2 = [word[0:i] + word[i+1] + word[i] + word[i+2:] for i in range(n-1)] #replace s3 = [word[0:i] + c + word[i + 1:] for i in range(n) for c in alphabet] #插入 s4 = [word[0:i] + c + word[i:] for i in range(n + 1) for c in alphabet] edit1_words = set(s1 + s2 + s3 + s4) if word in edit1_words: edit1_words.remove(word) edit1_words = known(edit1_words) return edit1_words
def edits2(word, edit1_words): edit2_words = set(e2 for e1 in edit1_words for e2 in edits1(e1)) if word in edit2_words: edit2_words.remove(word) edit2_words = known(edit2_words) return edit2_words
def generate_candidates(word): # 基于拼写错误的单词,生成跟它的编辑距离为1或者2的单词,并通过词典库的过滤。 # 只留写法上正确的单词。 word_edit1 = edits1(word) word_edit2 = edits2(word, word_edit1) words = word_edit1 | word_edit2 return words
4.4 给定一个输入,如果有错误需要纠正
给定一个输入query
, 如果这里有些单词是拼错的,就需要把它纠正过来。这部分的实现可以简单一点: 对于query
分词,然后把分词后的每一个单词在词库里面搜一下,假设搜不到的话可以认为是拼写错误的! 人如果拼写错误了再通过channel
和bigram
来计算最适合的候选。
import numpy as np from queue import PriorityQueue def spell_corrector(line): # 1. 首先做分词,然后把``line``表示成``tokens`` # 2. 循环每一token, 然后判断是否存在词库里。如果不存在就意味着是拼写错误的,需要修正。 # 修正的过程就使用上述提到的``noisy channel model``, 然后从而找出最好的修正之后的结果。 corrected_words = [] tokens = [] tokens = ['<s>']+word_tokenize(line)+['</s>'] for i, token in enumerate(tokens): if i == len(tokens)-1: break if token.lower() not in word2index: pre, nxt = tokens[i-1].lower(), tokens[i+1].lower() token = word_corrector(token, pre, nxt) corrected_words.append(token) else: corrected_words.append(token) newline = ' '.join(corrected_words) return newline # 修正之后的结果,假如用户输入没有问题,那这时候``newline = line``
def word_corrector(word, pre_word, next_word): candidates = generate_candidates(word) correctors = PriorityQueue() if len(candidates) == 0: return word for candidate in candidates: if candidate in channel and word in channel[candidate] and candidate in word2index: bi_pre = checkCount(pre_word, candidate) bi_nxt = checkCount(candidate, next_word) p = np.log(channel[candidate][word] + 0.001) + bi_pre + bi_nxt correctors.put((-1*p, candidate)) if correctors.empty(): return word return correctors.get()[1]
sentence = spell_corrector("when did Beyonce start beeome popular?") print(sentence)
test_query1 = "What counted for more of the poplation change" # 拼写错误的 test_query2 = "What counted for more of the population chenge" # 拼写错误的 test_query1 = spell_corrector(test_query1) test_query2 = spell_corrector(test_query2) print(test_query1) print(test_query2)
4.5 基于拼写纠错算法,实现用户输入自动矫正
首先有了用户的输入query
, 然后做必要的处理把句子转换成tokens的形状,然后对于每一个token比较是否是valid, 如果不是的话就进行下面的修正过程。
test_query1 = "when did Beyonce starte becoming popular?" # 拼写错误的 test_query2 = "What counted for more of the population chenge" # 拼写错误的 test_query1 = spell_corector(test_query1) test_query2 = spell_corector(test_query2) print (get_top_results_tfidf(test_query1)) print (get_top_results_w2v(test_query1)) print (get_top_results_bert(test_query1)) print (get_top_results_tfidf(test_query2)) print (get_top_results_w2v(test_query2)) print (get_top_results_bert(test_query2))
附录
在本次项目中我们实现了一个简易的问答系统。基于这个项目,我们其实可以有很多方面的延伸。
- 在这里,我们使用文本向量之间的余弦相似度作为了一个标准。但实际上,我们也可以基于基于包含关键词的情况来给一定的权重。比如一个单词跟related word有多相似,越相似就意味着相似度更高,权重也会更大。
- 另外 ,除了根据词向量去寻找
related words
也可以提前定义好同义词库,但这个需要大量的人力成本。 - 在这里,我们直接返回了问题的答案。 但在理想情况下,我们还是希望通过问题的种类来返回最合适的答案。 比如一个用户问:“明天北京的天气是多少?”, 那这个问题的答案其实是一个具体的温度(其实也叫做实体),所以需要在答案的基础上做进一步的抽取。这项技术其实是跟信息抽取相关的。
- 对于词向量,我们只是使用了
average pooling
, 除了average pooling,我们也还有其他的经典的方法直接去学出一个句子的向量。 - 短文的相似度分析一直是业界和学术界一个具有挑战性的问题。在这里我们使用尽可能多的同义词来提升系统的性能。但除了这种简单的方法,可以尝试其他的方法比如WMD,或者适当结合parsing相关的知识点。
好了,祝你好运!