分词器大家都很熟悉,这里我们从源码分析下jieba分词器的实现
github地址: https://github.com/fxsjy/jieba
分析源码前,我们先了解下jieba的分词是怎么实现的
1 基于前缀词典实现的词表扫描,然后生成基于句中汉字所有可能成词的有向无环图(DAG)
2 采用动态规划计算最大概率路径,找出基于词频的最大切分组合
3 对于未登录词和连着多个单个词,采用了HMM模型,使用viterbi算法来计算隐藏序列
4 提前统计HMM的估计参数 PI/A/B 三个参数矩阵 (本文不打算深入HMM 不清楚的请自行搜索资料)
5 最后我们基于jieba实现一个自己的分词器
import jieba
sentence = '我说北京是中国首都'
data = list(jieba.cut(sentence))
1 进入cut 函数
def cut(self, sentence, cut_all=False, HMM=True, use_paddle=False):
    """Main entry point of jieba segmentation.

    NOTE: this snippet is truncated — the full upstream function continues
    after the ``cut_block`` selection (regex splitting and the yield loop).

    :param sentence: text to segment (str or bytes; strdecode normalises it)
    :param cut_all: full mode — emit every dictionary word found
    :param HMM: use the HMM model for words missing from the dictionary
    :param use_paddle: delegate to the third-party paddle segmenter
    """
    # Whether the optional paddle backend is installed and usable.
    is_paddle_installed = check_paddle_install['is_paddle_installed']
    sentence = strdecode(sentence)
    # Optionally delegate the whole job to paddle's LAC model.
    if use_paddle and is_paddle_installed:
        # if sentence is null, it will raise core exception in paddle.
        if sentence is None or len(sentence) == 0:
            return
        import jieba.lac_small.predict as predict
        results = predict.get_sent(sentence)
        for sent in results:
            if sent is None:
                continue
            yield sent
        return
    re_han = re_han_default
    re_skip = re_skip_default
    # Pick the per-block segmenter; __cut_DAG (the HMM path) is the one
    # analysed below.
    if cut_all:
        cut_block = self.__cut_all
    elif HMM:
        cut_block = self.__cut_DAG
2 进入__cut_DAG
def __cut_DAG(self, sentence):
    """Segment *sentence* along the max-probability DAG route, sending runs
    of unmatched single characters to the HMM segmenter (finalseg)."""
    # Build the DAG of all dictionary-word spans in the sentence.
    DAG = self.get_DAG(sentence)
    route = {}
    # Dynamic programming: fill `route` with the max-probability path.
    self.calc(sentence, DAG, route)
    x = 0
    # Buffer accumulating consecutive single characters; the run may later
    # be re-segmented by the HMM if it is not a dictionary word.
    buf = ''
    N = len(sentence)
    # Walk the sentence along the best route.
    while x < N:
        # End index (exclusive) of the best word starting at x.
        y = route[x][1] + 1
        # The word chosen by the max-probability path.
        l_word = sentence[x:y]
        if y - x == 1:
            # Single character: buffer it and decide later whether the run
            # needs HMM re-segmentation.
            buf += l_word
        else:
            # A multi-character dictionary word ends the single-char run.
            if buf:
                # One buffered char: emit directly, no HMM needed.
                if len(buf) == 1:
                    yield buf
                    # Reset the buffer.
                    buf = ''
                else:
                    # Several buffered chars: if the run is not a known
                    # word, let the HMM split it.
                    #
                    # Example: '我说北京是中国首都' — '我' and '说' are both
                    # single chars, so buf becomes '我说'; when '北京'
                    # arrives we land in this else, and since '我说' is not
                    # in the dictionary it goes through the HMM, while
                    # '北京' itself is emitted by `yield l_word` below.
                    if not self.FREQ.get(buf):
                        # HMM segmentation of the unknown run.
                        recognized = finalseg.cut(buf)
                        # finalseg.cut is a generator — drain it.
                        for t in recognized:
                            yield t
                    else:
                        # The run IS a dictionary entry, but its characters
                        # individually won on the DP path — emit one by one.
                        for elem in buf:
                            yield elem
                    # Reset the buffer (could be hoisted out of the if/else
                    # since both branches clear it).
                    buf = ''
            # Multi-character dictionary words are emitted as-is.
            yield l_word
        x = y
    # Flush any single-char run left at the end of the sentence;
    # same three cases as inside the loop.
    if buf:
        # Single leftover char: emit directly.
        if len(buf) == 1:
            yield buf
        elif not self.FREQ.get(buf):
            recognized = finalseg.cut(buf)
            for t in recognized:
                yield t
        else:
            for elem in buf:
                yield elem
进入 get_DAG
def get_DAG(self, sentence):
    """Build the DAG of dictionary-word spans for *sentence*.

    Returns ``{start: [end indices]}`` — index ``i`` appears in
    ``dag[start]`` iff ``sentence[start:i + 1]`` is a dictionary word with
    a positive count; a position with no match falls back to itself.

    Example for '我说北京是中国首都':
    {0: [0], 1: [1], 2: [2, 3], 3: [3], 4: [4], 5: [5, 6], 6: [6], 7: [7, 8], 8: [8]}
    """
    # Lazy initialisation: loads the prefix dict (self.FREQ / self.total).
    self.check_initialized()
    dag = {}
    n = len(sentence)
    for start in xrange(n):
        ends = []
        stop = start
        piece = sentence[start]
        # Keep extending the fragment while it is still a known prefix;
        # prefixes are stored with count 0, real words with count > 0.
        while stop < n and piece in self.FREQ:
            if self.FREQ[piece]:
                ends.append(stop)
            stop += 1
            piece = sentence[start:stop + 1]
        # Every position can at least stand alone as a single character.
        dag[start] = ends or [start]
    return dag
进入 gen_pfdict
def gen_pfdict(f):
    """Build the prefix dictionary from an open ``dict.txt`` file object.

    Each line holds "word count [pos]". Returns ``(freq_map, total)`` where
    ``freq_map`` maps every word to its count and every proper prefix of
    every word to 0, and ``total`` is the sum of all counts.
    """
    freq_map, total = {}, 0
    source_name = resolve_filename(f)
    for lineno, line in enumerate(f, 1):
        try:
            line = line.strip().decode('utf-8')
            # Only the word and its count are used; the POS tag is ignored.
            word, count = line.split(' ')[:2]
            count = int(count)
            freq_map[word] = count
            # Running total, needed later for probability normalisation.
            total += count
            # Register every prefix of the word with count 0 so the DAG
            # scan can stop early; this grows the table by roughly 40%.
            for size in xrange(1, len(word) + 1):
                prefix = word[:size]
                if prefix not in freq_map:
                    freq_map[prefix] = 0
        except ValueError:
            raise ValueError(
                'invalid dictionary entry in %s at Line %s: %s' % (source_name, lineno, line))
    f.close()
    return freq_map, total
进入 calc
def calc(self, sentence, DAG, route):
    """Dynamic programming over the DAG: find the segmentation with the
    highest total log probability.

    Fills ``route[idx] = (best log prob of sentence[idx:], end index of the
    best word starting at idx)``.
    """
    # Sentence length.
    N = len(sentence)
    # Base case: past the end of the sentence, log probability log(1) = 0.
    route[N] = (0, 0)
    # Raw probabilities (freq / total) are tiny for most words and would
    # underflow, so work in log space: division becomes subtraction.
    logtotal = log(self.total)
    for idx in xrange(N - 1, -1, -1):
        # Iterate right-to-left: DAG[idx] lists candidate word ends to the
        # RIGHT of idx, so route[x + 1] is already computed.
        # Unknown fragments fall back to frequency 1 (`or 1`) to keep log defined.
        route[idx] = max((log(self.FREQ.get(sentence[idx:x + 1]) or 1) -
                          logtotal + route[x + 1][0], x) for x in DAG[idx])
最后分析下 HMM分词
def __cut(sentence):
    """HMM segmentation: decode the BMES tag sequence with Viterbi and cut
    *sentence* at word boundaries (B..E spans and S singletons)."""
    global emit_P
    # Best hidden-state sequence for the observed characters.
    prob, pos_list = viterbi(sentence, 'BMES', start_P, trans_P, emit_P)
    begin, nexti = 0, 0
    # print pos_list, sentence
    # Convert the tag sequence back into words.
    for i, char in enumerate(sentence):
        pos = pos_list[i]
        if pos == 'B':
            # A new word starts here.
            begin = i
        elif pos == 'E':
            # The word that began at `begin` ends here.
            yield sentence[begin:i + 1]
            nexti = i + 1
        elif pos == 'S':
            # Single-character word.
            yield char
            nexti = i + 1
    # Emit trailing chars whose word was never closed (tags end in B/M).
    if nexti < len(sentence):
        yield sentence[nexti:]
最后分析 viterbi方法
def viterbi(obs, states, start_p, trans_p, emit_p):
    """HMM decoding: given λ = (A, B, PI) and the observation sequence
    (the sentence's characters), return the log probability and the most
    likely hidden BMES state sequence."""
    # V[t][y]: max log prob of any state path ending in state y at time t.
    V = [{}]  # tabular
    # path[y]: the best state path found so far that ends in state y.
    path = {}
    # Initialisation for t = 0.
    for y in states:  # init
        # Log probabilities throughout, so multiplication becomes addition.
        V[0][y] = start_p[y] + emit_p[y].get(obs[0], MIN_FLOAT)
        path[y] = [y]
    # For each time step t, compute the best score of each state.
    for t in xrange(1, len(obs)):
        V.append({})
        newpath = {}
        # Consider every state in BMES.
        for y in states:
            # Emission log prob of the observed char under state y.
            em_p = emit_p[y].get(obs[t], MIN_FLOAT)
            # Maximise over predecessors of y. BMES has structure — not
            # every state can precede every other — so only PrevStatus[y]
            # is scanned:
            # PrevStatus = {
            #     'B': 'ES',
            #     'M': 'MB',
            #     'S': 'SE',
            #     'E': 'BM'
            # }
            (prob, state) = max(
                [(V[t - 1][y0] + trans_p[y0].get(y, MIN_FLOAT) + em_p, y0) for y0 in PrevStatus[y]])
            V[t][y] = prob
            # Extend the best predecessor's path with y.
            newpath[y] = path[state] + [y]
        # Keep only the best path per state for this time step.
        path = newpath
    # The last char of a sentence must be tagged E or S, so the final
    # argmax only scans those two states.
    (prob, state) = max((V[len(obs) - 1][y], y) for y in 'ES')
    return (prob, path[state])
OK 基本源码已经分析完了, 现在我们回想下 基本jieba我们是不是也能实现一个自己的分词?
接下来我们开始搭建一个自己的分词
第一步: 根据语库资料得到词库
import os
from collections import Counter
"""
根据语料
生成dict 文件
"""
class GenerateDict:
    """Build a jieba-style ``dict.txt`` ("word count" per line, most
    frequent first) from whitespace-tokenised corpus files."""

    def __init__(self, data_files):
        """:param data_files: list of corpus file paths; each line holds
        words separated by whitespace."""
        self.data_files = data_files
        self._parse_file()

    def _parse_file(self):
        """Count word frequencies across all corpus files and write them to
        ``dict.txt``. No-op when the output already exists or there is
        nothing to count."""
        if os.path.exists('dict.txt'):
            return
        if not self.data_files:
            return
        counter = Counter()
        for path in self.data_files:
            if not os.path.exists(path):
                continue
            # Context manager guarantees the handle is closed even on error
            # (the original leaked it on exceptions); iterating the file
            # also avoids loading it whole via readlines().
            with open(path, encoding='utf-8') as f:
                for line in f:
                    counter.update(line.strip().split())
        if not counter:
            return
        with open('dict.txt', 'w', encoding='utf-8') as out:
            # most_common() already yields (word, count) sorted by count,
            # descending — no manual sorted() needed.
            for word, num in counter.most_common():
                out.write('%s %d\n' % (word, num))
if __name__ == '__main__':
    # Example corpus; running this script (re)creates dict.txt from it.
    data_files = ['data.txt']
    GenerateDict(data_files)
第二步 由于使用的HMM 我们需要自己统计参数λ=(A,B,π)并存储下来
"""
生成 HMM 参数
(A, B, pi) 发射概率、转移概率、初始概率
"""
import os
from math import log
import json
# Log-probability floor standing in for log(0) (events never observed).
MIN_FLOAT = -3.14e100


class GenerateProb:
    """Estimate HMM parameters λ = (A, B, PI) from a pre-segmented corpus.

    States are BMES (Begin / Middle / End / Single).  After construction:
    - PI[state]            : initial-state log probability
    - A[state][char]       : emission log probability
    - B[prev][next]        : transition log probability
    All tables hold log probabilities; unseen initial states get MIN_FLOAT.
    """

    def __init__(self, data_files):
        """:param data_files: corpus files, one whitespace-segmented
        sentence per line."""
        self.data_files = data_files
        self.STATUS = 'BMES'
        # Initial-state counts, converted to log probs by __calc_params.
        self.PI = {}
        # Emission counts: state -> {char: count}.
        self.A = {}
        # Transition counts: prev state -> {next state: count}.
        self.B = {}
        self._parse_file()

    def _parse_file(self):
        """Count initial / emission / transition events over every corpus
        line, then convert the raw counts to log probabilities."""
        if not self.data_files:
            return
        for path in self.data_files:
            if not os.path.exists(path):
                continue
            # with-statement: the original never closed the handle on error.
            with open(path, encoding='utf-8') as f:
                for line in f:
                    words = line.strip().split()
                    labels = []
                    for index, word in enumerate(words):
                        # Tag each char: S for singletons, else B M* E.
                        if len(word) == 1:
                            label = 'S'
                        else:
                            label = 'B' + 'M' * (len(word) - 2) + 'E'
                        # The first char of the line contributes to PI.
                        if index == 0:
                            first = label[0]
                            self.PI[first] = self.PI.get(first, 0) + 1
                        # Emission counts: state -> observed character.
                        # (The original assigned a dead `{}` that was
                        # immediately overwritten with 1 — removed.)
                        for i, state in enumerate(label):
                            ch = word[i]
                            bucket = self.A.setdefault(state, {})
                            bucket[ch] = bucket.get(ch, 0) + 1
                        labels.extend(label)
                    # Transition counts over the line's full tag sequence.
                    for i in range(1, len(labels)):
                        bucket = self.B.setdefault(labels[i - 1], {})
                        bucket[labels[i]] = bucket.get(labels[i], 0) + 1
        # Turn all raw counts into log probabilities.
        self.__calc_params()

    def __calc_params(self):
        """Convert the PI / A / B count tables to log probabilities.

        States never seen at sentence start get MIN_FLOAT (≈ log 0)."""
        total = float(sum(self.PI.values()))
        for state in self.STATUS:
            if state not in self.PI:
                self.PI[state] = MIN_FLOAT
            else:
                self.PI[state] = log(self.PI[state] / total)
        self._normalize(self.B)
        self._normalize(self.A)

    @staticmethod
    def _normalize(table):
        """Normalise each inner counts dict of *table* to log probs in place."""
        for counts in table.values():
            subtotal = float(sum(counts.values()))
            for key in counts:
                counts[key] = log(counts[key] / subtotal)
if __name__ == '__main__':
    # Train on the example corpus and persist the three matrices;
    # hmm.py loads them back as start_P / trans_P / emit_P.
    data_files = ['data.txt']
    prob = GenerateProb(data_files)
    data = {
        'PI': prob.PI,
        'B': prob.B,
        'A': prob.A
    }
    if not os.path.exists('prob.json'):
        print('write prob to json')
        with open('prob.json', 'w') as f:
            json.dump(data, f)
第三步 然后实现自己的分词
from math import log
import hmm
class Tokenizer:
    """A minimal jieba-style tokenizer.

    Pipeline: build a DAG of dictionary-word spans (get_DAG), pick the
    max-probability segmentation by dynamic programming (calc), and hand
    runs of unmatched single characters to the HMM segmenter (hmm.cut).
    """

    def __init__(self):
        self.initialized = False
        # FREQ: word -> count (0 marks a pure prefix); total: sum of counts.
        self.FREQ, self.total = {}, 0
        self.initialize()

    def initialize(self):
        """Load ``dict.txt`` ("word count" per line) into the prefix dict.

        Every prefix of each word is also stored with count 0 so that
        get_DAG can stop scanning as soon as a fragment is not a prefix of
        any dictionary word.
        """
        freq_dict = {}
        total = 0
        # Context manager: the original left the file open on parse errors.
        with open('dict.txt', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) < 2:
                    # Malformed line: skip instead of crashing.
                    continue
                word, freq = parts[0], int(parts[1])
                freq_dict[word] = freq
                total += freq
                for end in range(1, len(word) + 1):
                    prefix = word[:end]
                    if prefix not in freq_dict:
                        freq_dict[prefix] = 0
        self.FREQ, self.total = freq_dict, total
        self.initialized = True

    def check_initialized(self):
        # Lazy (re-)initialisation guard, mirroring jieba's API.
        if not self.initialized:
            self.initialize()

    def get_DAG(self, sentence):
        """Return ``{start: [possible word-end indices]}`` for *sentence*.

        Index i appears in DAG[k] iff sentence[k:i+1] is a dictionary word
        with count > 0; a position with no match falls back to itself.
        """
        self.check_initialized()
        DAG = {}
        N = len(sentence)
        for k in range(N):
            tmplist = []
            i = k
            frag = sentence[k]
            # Extend the fragment while it is still a known prefix.
            while i < N and frag in self.FREQ:
                if self.FREQ[frag]:
                    tmplist.append(i)
                i += 1
                frag = sentence[k:i + 1]
            if not tmplist:
                # No dictionary word starts here: the char stands alone.
                tmplist.append(k)
            DAG[k] = tmplist
        return DAG

    def calc(self, sentence, DAG, route):
        """Fill *route* with the max-log-probability path, right to left.

        route[idx] = (best log prob of sentence[idx:], best word end).
        Unknown fragments fall back to frequency 1 so log() stays defined.
        """
        N = len(sentence)
        route[N] = (0, 0)
        # Log space turns probability products into sums (no underflow).
        logtotal = log(self.total)
        for idx in range(N - 1, -1, -1):
            route[idx] = max((log(self.FREQ.get(sentence[idx:x + 1]) or 1) -
                              logtotal + route[x + 1][0], x) for x in DAG[idx])

    def __cut_DAG(self, sentence):
        """Segment *sentence* along the best route; buffer runs of single
        characters and let the HMM re-segment runs that are not real words."""
        DAG = self.get_DAG(sentence)
        route = {}
        self.calc(sentence, DAG, route)
        x = 0
        N = len(sentence)
        result = []
        buf = ''
        while x < N:
            y = route[x][1] + 1
            tmp_word = sentence[x:y]
            if y - x == 1:
                # Single char: defer — it may merge with neighbours via HMM.
                buf += tmp_word
            else:
                if buf:
                    if len(buf) == 1:
                        result.append(buf)
                    # BUGFIX: test FREQ.get(buf) instead of `buf in FREQ` —
                    # prefixes are stored with count 0, so bare membership
                    # wrongly kept unknown runs away from the HMM (this
                    # matches jieba's own `not self.FREQ.get(buf)` check).
                    elif not self.FREQ.get(buf):
                        # Unknown multi-char run: HMM segmentation.
                        for elem in hmm.cut(buf):
                            result.append(elem)
                    else:
                        # Known word whose chars individually won the DP
                        # path: emit them one by one.
                        for elem in buf:
                            result.append(elem)
                    buf = ''
                result.append(tmp_word)
            x = y
        # Flush whatever single-char run is left at the sentence end.
        if buf:
            if len(buf) == 1:
                result.append(buf)
            elif not self.FREQ.get(buf):
                for elem in hmm.cut(buf):
                    result.append(elem)
            else:
                for elem in buf:
                    result.append(elem)
        return result

    def cut(self, sentence, HMM=True):
        """Tokenize *sentence* into a list of words.

        Only the HMM-backed path is implemented; HMM=False raises.
        """
        if HMM:
            cut_block = self.__cut_DAG
        else:
            # BUGFIX: `NotImplemented` is a constant, not an exception —
            # raising it produced a TypeError instead of the intended error.
            raise NotImplementedError("error not implement")
        return cut_block(sentence)
# Module-level singleton; exposing `cut` mirrors jieba's module-level API.
# NOTE(review): this runs at import time and requires dict.txt to exist.
tk = Tokenizer()
cut = tk.cut
第四步 实现HMM分词
import os
import json
# Log-probability floor standing in for log(0): unseen emissions/transitions.
MIN_FLOAT = -3.14e100
# Load the trained parameters start_P (PI), trans_P (B), emit_P (A),
# falling back to the bundled probability tables when prob.json is absent.
# NOTE(review): the '../prob.json' relative path depends on the process
# working directory — confirm it matches how the script is launched.
if os.path.exists('../prob.json'):
    f = open('../prob.json')
    data = json.load(f)
    start_P = data['PI']
    trans_P = data['B']
    emit_P = data['A']
else:
    from .prob_start import P as start_P
    from .prob_trans import P as trans_P
    from .prob_emit import P as emit_P
# Hidden states: B(egin), M(iddle), E(nd), S(ingle).
STATES = 'BMES'
# Legal predecessors of each state — e.g. B can only follow E or S,
# because a new word cannot begin in the middle of another word.
PrevStatus = {
    'B': 'ES',
    'M': 'MB',
    'S': 'SE',
    'E': 'BM'
}
def viterbi(obs, states, start_p, trans_p, emit_p):
    """Return (best log probability, best state path) for *obs*.

    Standard Viterbi decoding in log space: candidate predecessors of each
    state are restricted to PrevStatus[state], and unseen emissions or
    transitions fall back to MIN_FLOAT.
    """
    # score[t][s]: best log-prob of any path ending in state s at time t.
    score = [{s: start_p[s] + emit_p[s].get(obs[0], MIN_FLOAT) for s in states}]
    # paths[s]: the best path found so far that ends in state s.
    paths = {s: [s] for s in states}
    for t in range(1, len(obs)):
        observed = obs[t]
        score.append({})
        updated = {}
        for cur in states:
            emit_lp = emit_p[cur].get(observed, MIN_FLOAT)
            # Maximise over the legal predecessors of `cur` only.
            best_lp, best_prev = max(
                (score[t - 1][prev] + trans_p[prev].get(cur, MIN_FLOAT) + emit_lp, prev)
                for prev in PrevStatus[cur])
            score[t][cur] = best_lp
            updated[cur] = paths[best_prev] + [cur]
        paths = updated
    # A sentence's last char must be tagged E or S — restrict the argmax.
    final_lp, final_state = max((score[len(obs) - 1][s], s) for s in 'ES')
    return final_lp, paths[final_state]
def cut(sentence):
    """Yield the words of *sentence* by decoding its BMES tag sequence.

    B opens a word, E closes it, S is a one-character word; M characters
    are implicitly consumed by the enclosing B..E span.
    """
    # NOTE: the original declared `global emit_P` although nothing is ever
    # assigned to it in this function — the dead declaration is removed.
    prob, pos_list = viterbi(sentence, STATES, start_P, trans_P, emit_P)
    begin, nexti = 0, 0
    for i, char in enumerate(sentence):
        pos = pos_list[i]
        if pos == 'B':
            # A new word starts here.
            begin = i
        elif pos == 'E':
            # The word that began at `begin` ends here.
            yield sentence[begin:i + 1]
            nexti = i + 1
        elif pos == 'S':
            # Single-character word.
            yield char
            nexti = i + 1
    # Emit trailing chars whose word was never closed (tags end in B/M).
    if nexti < len(sentence):
        yield sentence[nexti:]
最后测试
import jieba
import tokenizer
sentence = '我说北京是中国首都'
data = list(jieba.cut(sentence))
print('jieba:', data)
data = tokenizer.cut(sentence)
print('tokenizer:', data)
输出结果:
从结果上可以看出来: 效果还算不错
当然这只是个大概功能,大家也可以按照自己的想法进行功能添加和修改。
最后附上代码的github地址:
https://github.com/under-the-moon/myjieba
以上均为原创,转载请添加来源!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)