Commit 7a4b913d by 20200203048

Commit message: first

# encoding: utf-8
import matplotlib.pyplot as plt
import json
import io
import sys
import re
sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf8')  # switch stdout to UTF-8 so non-ASCII output prints correctly
from sklearn import feature_extraction
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
import numpy as np
from bert_embedding import BertEmbedding
import queue as Q
from scipy.spatial.distance import cosine
def read_corpus():
"""
读取给定的语料库,并把问题列表和答案列表分别写入到 qlist, alist 里面。 在此过程中,不用对字符换做任何的处理(这部分需要在 Part 2.3里处理)
qlist = ["问题1", “问题2”, “问题3” ....]
alist = ["答案1", "答案2", "答案3" ....]
务必要让每一个问题和答案对应起来(下标位置一致)
"""
# TODO 需要完成的代码部分 ...
f_path = "train-v2.0.json"
with open(f_path,'r',encoding='utf-8') as f:
data = json.load(f)
data = data["data"]
qlist = []
alist = []
for d in data:
print(d["title"])
for x in d["paragraphs"]:
for qa in x["qas"]:
answer_key = "answers"
if qa["is_impossible"]:
answer_key = "plausible_answers"
qlist.append(qa["question"])
alist.append(qa[answer_key][0]["text"])
assert len(qlist) == len(alist)  # make sure the two lists have the same length
return qlist, alist
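# For reference, a minimal sketch of the JSON nesting that read_corpus() assumes for
# train-v2.0.json (SQuAD 2.0). Only the fields accessed above are shown; the literal
# values are made-up illustrations:
# {"data": [
#    {"title": "...",
#     "paragraphs": [
#       {"qas": [
#          {"question": "...",
#           "is_impossible": false,
#           "answers": [{"text": "..."}],
#           "plausible_answers": [{"text": "..."}]}
#       ]}
#     ]}
# ]}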
def statistics(l):
result = {}
for sentence in l:
words = sentence.split(" ")
for word in words:
if word in result:
result[word] += 1
else:
result[word] = 1
sorted_items = sorted(result.items(),key=lambda x: -x[1])
return sorted_items
def clean(sentence):
result = []
for word in sentence.split(" "):
# stop-word removal could be added here (see the worked example after this function)
# drop tokens that consist only of punctuation
if re.match(r"^[\!@#$%^&*\(\)\\\{\}\[\]:\"\';\<\>\,\./\-\+=_【】;‘:“《》,。?、\?]+$",word):
continue
# strip punctuation characters from the token
word = re.sub(r"[\!@#$%^&*\(\)\\\{\}\[\]:\"\';\<\>\,\./\-\+=_【】;‘:“《》,。?、\?]+","",word)
# lowercase
word = word.lower()
# low-frequency-word filtering could be added here
# replace runs of digits with a placeholder token
word = re.sub(r"[0-9]+","#number",word)
# lemmatization could be added here
result.append(word)
return " ".join(result)
def readWord2Vec(fd):
word2id = dict()
id2word = dict()
word2vec = []
for line in fd.readlines():
l = line.strip()
ele = l.split()
if len(ele) != 201:
continue
print(line)
word = ele[0]
if word not in word2id:
word2id[word] = len(word2id)
id2word[len(id2word)] = word
word2vec.append(ele[1:])
word2vec = np.array(word2vec,dtype='float32')
return word2id,id2word,word2vec
def get_top_results_tfidf_noindex(query,X,y):
# TODO: to be implemented
"""
Given a user query, return the 5 most likely matching questions. This requires:
1. Preprocess the query with the methods above, then turn it into a tf-idf vector (using the vectorizer above).
2. Compute the similarity between the query and every question in the corpus.
3. Return the answers of the 5 most similar questions.
"""
all_cosine = {}
queue = Q.PriorityQueue()
for i in range(len(X)):
x = X[i]
cos = cosine(query,x)
if cos not in all_cosine:
all_cosine[cos] = []
queue.put(cos)
all_cosine[cos].append(i)
top_idxs = []  # top_idxs holds the indices (into qlist) of the most similar questions
# hint: use a priority queue to find the top results; the sketch after this function shows why this works
top = 5
while len(top_idxs) < top and not queue.empty():
cos = queue.get()
top_idxs.extend(all_cosine[cos])
result = []
for idx in top_idxs:
result.append(y[idx])
return top_idxs,result  # return the answers of the most similar questions as the TOP-5 answers
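# Why a priority queue is enough here: scipy's cosine() returns a *distance* (smaller means
# more similar), so repeatedly popping the smallest distance from a min-heap yields the most
# similar questions first without sorting all N distances. A minimal equivalent sketch using
# the standard library's heapq (the function name and variables below are illustrative, not
# part of the pipeline above):
import heapq
def top5_by_heap(query_vec, X_matrix):
    distances = [(cosine(query_vec, X_matrix[i]), i) for i in range(len(X_matrix))]
    # nsmallest maintains a heap of size 5: roughly O(N log 5) instead of O(N log N)
    return [idx for _, idx in heapq.nsmallest(5, distances)]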
def getTFIDF(query,vocab,idf):
result = [ 0. for _ in range(len(vocab)) ]
for word in query.split(" "):
if word not in vocab:
continue
result[vocab[word]] += 1
result = np.array(result)
result = result * idf
denominator = np.sum(result**2)**0.5
if denominator == 0: return np.array(result,dtype='float32')  # guard: no query word appears in the vocabulary
return np.array(result/denominator,dtype='float32')
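# getTFIDF mirrors what the fitted sklearn objects do for a single query: raw term counts
# scaled by idf_, then L2-normalised (TfidfTransformer's defaults). A hedged sanity check,
# assuming ``vectorizer`` and ``transformer`` are the fitted objects from the __main__ block:
#   manual = getTFIDF(query, vectorizer.vocabulary_, transformer.idf_)
#   viasklearn = transformer.transform(vectorizer.transform([query])).toarray()[0]
#   np.allclose(manual, viasklearn)  # expected True when both sides tokenise the query the
#   same way; CountVectorizer's token_pattern differs from a plain split, so tokens such as
#   "#number" can cause small discrepancies.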
def getInversedIndexedTable(l):
result = {}
for i in range(len(l)):
sentence = l[i]
for word in sentence.split(" "):
if word not in result:
result[word] = set()
result[word].add(i)
return result
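# getInversedIndexedTable is defined but not wired into the __main__ flow below. A minimal
# sketch of how the inverted index could shrink the search space before any cosine similarity
# is computed (the function name below is illustrative):
def candidate_indices(query, inverted_index):
    # union of the posting sets of the query words: only questions that share at least one
    # word with the query can have a non-zero tf-idf similarity
    candidates = set()
    for word in clean(query).split(" "):
        candidates |= inverted_index.get(word, set())
    return candidates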
if __name__ == "__main__":
qlist,alist = read_corpus()
sorted_items = statistics(qlist)
word_total = len(sorted_items)
print(word_total)
max_freq = sorted_items[0][1]
x = { i+1:0 for i in range(max_freq)}
for items in sorted_items:
x[items[1]] += 1
# print(list(x.items())[:5])
# print(list(x.values())[:5])
# print(len(x))
# print(len(list(x.values())))
# limited = 100
# plt.hist(x=np.array(list(x.values())[:limited]), bins=[i+1 for i in range(limited)], color='#0504aa')
# plt.xlabel('Value')
# plt.ylabel('Frequency')
# plt.show()
for i in range(len(qlist)):
q = qlist[i]
qlist[i] = clean(q)
print(qlist[:5])
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(qlist[:1000])
tfidf_vocab = vectorizer.vocabulary_
transformer = TfidfTransformer()
tfidf = transformer.fit_transform(X)
X_tfidf = tfidf.toarray()
idf = transformer.idf_
print(idf)
print(X_tfidf[:5])
print(X_tfidf[0])
# glovefile = open("glove.6B.200d.txt","r",encoding="utf-8")
# word2id,id2word,word2vec = readWord2Vec(glovefile)
# print(list(word2id.items())[:5])
# sentenceVecs = []
# for q in qlist[:1000]:
# words = q.split(" ")
# sentenceVec = np.zeros(200).reshape((1,200))
# count = 0
# for word in words:
# count += 1
# if word not in word2id:
# continue
# sentenceVec += word2vec[word2id[word]].reshape((1,200))
# if count != 0:
# sentenceVec = sentenceVec/count
# sentenceVecs.append(sentenceVec)
# print(np.sum(sentenceVecs[:5]))
#
# bert_embedding = BertEmbedding(model='bert_12_768_12')
# result_bert = bert_embedding(qlist[:1000])
# X_bert = []
# for ele in result_bert:
# sentence_embedding = ele[1]
# X_bert.append(sentence_embedding)
# print(X_bert[:5])
test_sentence = "when did beyonce start becoming popular"
tfidf_emb = getTFIDF(test_sentence, tfidf_vocab,idf)
top_idxs,ir_answer = get_top_results_tfidf_noindex(tfidf_emb,X_tfidf,alist[:1000])
print(ir_answer)
sim_question = []
for idx in top_idxs:
sim_question.append(qlist[idx])
print(sim_question)
# encoding: utf-8
channel = {}
with open("spell-errors.txt","r",encoding="utf-8") as f:
for line in f.readlines():
eles = line.strip().split(":")
if len(eles) <= 1:
print(line)
continue
c = eles[0]
errors_dict = {}
errors = eles[1].split(",")
for s in errors:
s = s.strip()
errors_dict[s] = 1/len(errors)
channel[c] = errors_dict
print(channel)
# encoding: utf-8
import numpy as np
from lm import getProb1
import json
channel = {}
with open("spell-errors.txt","r",encoding="utf-8") as f:
for line in f.readlines():
eles = line.strip().split(":")
if len(eles) <= 1:
print(line)
continue
c = eles[0]
errors_dict = {}
errors = eles[1].split(",")
for s in errors:
s = s.strip()
errors_dict[s] = 1/len(errors)
channel[c] = errors_dict
#print(channel)
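# spell-errors.txt is parsed as "correct: misspelling1, misspelling2, ...", one correct word
# per line, and every listed misspelling gets the same probability 1/len(errors).
# Illustrative example (not necessarily a real line of the file):
#   "raining: rainning, raning"  ->  channel["raining"] == {"rainning": 0.5, "raning": 0.5}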
vocab = set()
file_path = "spell-errors.txt"
with open(file_path,"r",encoding="utf-8") as f:
for line in f.readlines():
eles = line.strip().split(":")
if len(eles) <= 1:
continue
vocab.add(eles[0])
def generate_candidates(word):
candidates = {word}
for _ in [1,2]:
store = set()
for cand in candidates:
tmp = generate_candidates_with_one_ED(cand)
for t in tmp:
if t != word:
store.add(t)
candidates = store
result = set()
for cand in candidates:
if cand in vocab:
result.add(cand)
return result
def generate_candidates_with_one_ED(word):
candidates = set()
# insertion: add one character at every position, including after the last character
alternative = " " + word + " "
for i in range(1,len(alternative)):
for j in range(ord("a"),ord("a") + 26):
c = chr(j)
candidate = alternative[:i] + c + alternative[i:]
candidate = candidate.strip()
candidates.add(candidate)
# deletion: remove one character at each position
for i in range(1,len(alternative)-1):
candidate = alternative[:i] + alternative[i+1:]
candidate = candidate.strip()
candidates.add(candidate)
# substitution: replace each character with a different lowercase letter
for i in range(1,len(alternative)-1):
for j in range(ord("a"),ord("a") + 26):
c = chr(j)
if c == alternative[i]:
continue
candidate = alternative[:i] + c + alternative[i+1:]
candidate = candidate.strip()
candidates.add(candidate)
return candidates
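# For a word of length n the edits above produce 26*(n+1) insertions, n deletions and
# 25*n substitutions (transpositions are not generated); e.g. for a 2-letter word that is
# 78 + 2 + 50 = 130 raw strings before the set removes duplicates.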
words = ["reserve",
"reverse"]
for word in words:
print(word,word in vocab)
cands = generate_candidates(word)
print(cands)
single_path = "single.json"
dual_path = "dual.json"
with open(single_path,"r",encoding="utf-8") as f:
single = json.load(f)
with open(dual_path,"r",encoding="utf-8") as f:
dual = json.load(f)
def spell_corrector(line):
# 1. Tokenise ``line`` into ``tokens``.
# 2. For each token, check whether it is in the vocabulary. A token that is not in the vocabulary
#    is treated as a spelling error and is corrected with the ``noisy channel model`` described
#    above, picking the best-scoring candidate (see the note after this function).
tokens = [ token for token in line.split(" ") if str(token).isalnum() ]
optimal = []
for i in range(len(tokens)):
token = tokens[i]
if token not in vocab:
cands = generate_candidates(token)
if len(cands) != 0:
info = {"token":token,"idx":i,"correct":"","prob":0.0}
for cand in cands:
# p(s|c)
if token not in channel[cand]:
psc = 1 / (len(vocab) + len(channel[cand]))
else:
psc = channel[cand][token]
pc = getProb1(" ".join(tokens[:i]+[cand]+tokens[i+1:]),single,dual)
prob = 10**(np.log10(psc)+np.log10(pc))  # combine channel model and language model in log10 space
if info["prob"] < prob:
info["prob"] = prob
info["correct"] = cand
optimal.append(info)
for info in optimal:
tokens[info["idx"]] = info["correct"]
print(optimal)
newline = " ".join(tokens)
return newline  # the corrected line; if the input had no errors then newline == line
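# The noisy channel scoring used above, written out: for a misspelled token s the chosen
# correction is
#   c* = argmax_c P(s|c) * P(c)
# where P(s|c) comes from the ``channel`` dictionary built from spell-errors.txt and P(c) is
# the bigram language-model probability of the sentence with c substituted in (getProb1).
# The loop maximises the equivalent log10 P(s|c) + log10 P(c).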
sens = ["am rienind the book ","everything will become goad"]
for sen in sens :
newsen = spell_corrector(sen)
print(newsen)
# encoding: utf-8
from nltk.corpus import reuters
import re
import numpy as np
import json
import os
def clean(sentence):
result = []
for word in sentence:
# stop-word removal could be added here
# drop tokens that consist only of punctuation
if re.match(r"^[\!@#$%^&*\(\)\\\{\}\[\]:\"\';\<\>\,\./\-\+=_【】;‘:“《》,。?、\?]+$",word):
continue
# strip punctuation characters from the token
word = re.sub(r"[\!@#$%^&*\(\)\\\{\}\[\]:\"\';\<\>\,\./\-\+=_【】;‘:“《》,。?、\?]+","",word)
# lowercase
word = word.lower()
# low-frequency-word filtering could be added here
# replace runs of digits with a placeholder token
word = re.sub(r"[0-9]+","#number",word)
# lemmatization could be added here
result.append(word)
return result
def create(corpus):
dual = {}
single = {}
single["<START>"] = len(corpus)
single["<END>"] = len(corpus)
for sen in corpus:
sen = clean(sen)
for i in range(len(sen)):
word = sen[i]
if word not in single:
single[word] = 0
single[word] += 1
if i == 0 :
key = "{pre}##{cur}".format(pre="<START>",cur=word)
else:
key = "{pre}##{cur}".format(pre=sen[i-1],cur=word)
if key not in dual:
dual[key] = 0
dual[key] += 1
if len(sen) != 0:
key = "{pre}##{cur}".format(pre=sen[-1],cur="<END>")
if key not in dual:
dual[key] = 0
dual[key] += 1
return single,dual
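# A small worked example of create() on a toy, already-tokenised corpus:
#   corpus = [["I", "like", "movies"]]
#   single == {"<START>": 1, "<END>": 1, "i": 1, "like": 1, "movies": 1}
#   dual   == {"<START>##i": 1, "i##like": 1, "like##movies": 1, "movies##<END>": 1}
# i.e. ``single`` holds unigram counts and ``dual`` holds bigram counts keyed as "prev##cur".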
def getProb(query,single,dual):
if type(query) == type(str()):
query = clean(query.split(" "))
vocab_size = len(single)
result = 0
for i in range(len(query)):
word = query[i]
denominator = 0
if i == 0:
pre = "<START>"
else:
pre = query[i-1]
if pre in single:
denominator += single[pre]
key = "{pre}##{cur}".format(pre=pre,cur=word)
numerator = 0
if key in dual:
numerator += dual[key]
if denominator == 0 or numerator == 0:
prob = (numerator + 1.0) / (denominator + vocab_size)
else:
prob = numerator / denominator
result += np.log10(prob)
if len(query) != 0:
word = "<END>"
denominator = 0
pre = query[-1]  # the <END> transition conditions on the last word of the query
if pre in single:
denominator += single[pre]
key = "{pre}##{cur}".format(pre=pre,cur=word)
numerator = 0
if key in dual:
numerator += dual[key]
if denominator == 0 or numerator == 0:
prob = (numerator + 1.0) / (denominator + vocab_size)
else:
prob = numerator / denominator
result += np.log10(prob)
result = result/(len(query)+2)
return 10**result
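# getProb scores a query with an averaged log bigram model, roughly
#   log10 P(query) ~= (1 / (len(query) + 2)) * sum_i log10 P(w_i | w_{i-1})
# with w_0 = <START> and a final transition into <END>, falling back to the smoothed
# estimate (count(w_{i-1}, w_i) + 1) / (count(w_{i-1}) + V) whenever the bigram or the
# preceding unigram is unseen (V = len(single)); returning 10**result gives a per-token
# geometric-mean probability, which makes sentences of different lengths comparable.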
def getProb1(query,single,dual):
if type(query) == type(str()):
query = clean(query.split(" "))
vocab_size = len(single)
result = 0
if len(query) != 0:
numerator = 0
if query[0] in single:
numerator += single[query[0]]
# unigram estimate for the first word, moved into log10 space to match the bigram terms below
# (add-one smoothing avoids log10(0) when the word is unseen)
result += np.log10((numerator + 1.0) / (vocab_size + 1.0))
for i in range(1,len(query)):
word = query[i]
denominator = 0
if i == 0:
pre = "<START>"
else:
pre = query[i-1]
if pre in single:
denominator += single[pre]
key = "{pre}##{cur}".format(pre=pre,cur=word)
numerator = 0
if key in dual:
numerator += dual[key]
if denominator == 0 or numerator == 0:
prob = (numerator + 1.0) / (denominator + vocab_size)
else:
prob = numerator / denominator
result += np.log10(prob)
if result != 0:
result = result/len(query)
result = 10**result
return result
if __name__ == '__main__':
single_path = "single.json"
dual_path = "dual.json"
if not os.path.exists(single_path) or not os.path.exists(dual_path):
# read the corpus data
categories = reuters.categories()
corpus = reuters.sents(categories=categories)
single,dual = create(corpus)
with open(single_path,"w",encoding="utf-8") as f:
json.dump(single,f,ensure_ascii=False)
with open(dual_path,"w",encoding="utf-8") as f:
json.dump(dual,f,ensure_ascii=False)
else:
with open(single_path,"r",encoding="utf-8") as f:
single = json.load(f)
with open(dual_path,"r",encoding="utf-8") as f:
dual = json.load(f)
sens = ["They told Reuter correspondents in Asian capitals a U . S . Move against Japan might boost protectionist sentiment in the U . S . And lead to curbs on American imports of their products .","x y z", "world true good","car vechicle movies film music",
"i like movie"]
for sen in sens:
print(sen,getProb(sen,single,dual),getProb1(sen,single,dual))