Source code analysis of jieba word segmentation, and building your own tokenizer from scratch


Everyone is familiar with tokenizers; here we walk through the source code to see how the jieba tokenizer is implemented.

GitHub address: https://github.com/fxsjy/jieba

Before reading the source, let's outline how jieba's segmentation works:

1 Scan the sentence against a prefix dictionary and build a directed acyclic graph (DAG) of every word that the characters in the sentence can possibly form.

2 Use dynamic programming to compute the maximum-probability path, i.e. the segmentation with the highest word-frequency-based probability.

3 For out-of-vocabulary words and runs of consecutive single characters, fall back to an HMM and use the Viterbi algorithm to decode the hidden state sequence.

4 The HMM parameters, the three matrices PI/A/B, are estimated ahead of time (this post does not go deep into HMM theory; please look it up if you are unfamiliar).

5 Finally, following jieba's design, we implement a tokenizer of our own.

import jieba
sentence = '我说上海是中国首都'
data = list(jieba.cut(sentence))

1 Step into the cut function

def cut(self, sentence, cut_all=False, HMM=True, use_paddle=False):
    # check whether the third-party paddle segmenter is installed
    is_paddle_installed = check_paddle_install['is_paddle_installed']
    sentence = strdecode(sentence)
    # if paddle mode is requested and paddle is available, delegate to paddle
    if use_paddle and is_paddle_installed:
        # if sentence is null, it will raise core exception in paddle.
        if sentence is None or len(sentence) == 0:
            return
        import jieba.lac_small.predict as predict
        results = predict.get_sent(sentence)
        for sent in results:
            if sent is None:
                continue
            yield sent
        return
    re_han = re_han_default
    re_skip = re_skip_default
    if cut_all:
        cut_block = self.__cut_all
    elif HMM:
        # the branch we analyse: DAG segmentation with the HMM fallback
        cut_block = self.__cut_DAG
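
For reference, here is a quick usage sketch of the modes this dispatcher chooses between (standard jieba API; outputs omitted here):

import jieba

s = '我说北京是中国首都'
print(list(jieba.cut(s)))                # default: precise mode, HMM enabled for OOV words
print(list(jieba.cut(s, HMM=False)))     # precise mode, dictionary only
print(list(jieba.cut(s, cut_all=True)))  # full mode: every word the dictionary can form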

2 Step into __cut_DAG

def __cut_DAG(self, sentence):
    # build the directed acyclic graph (DAG) for the sentence
    DAG = self.get_DAG(sentence)
    route = {}
    # compute the maximum-probability path
    self.calc(sentence, DAG, route)
    x = 0
    # temporary buffer holding single characters that may need further processing
    buf = ''
    N = len(sentence)
    # walk through the sentence
    while x < N:
        # end position of the best word starting at x, taken from the max-probability route
        y = route[x][1] + 1
        # the word on the maximum-probability path
        l_word = sentence[x:y]
        if y - x == 1:
            # single character: stash it in buf; later we decide whether
            # the accumulated run needs to be re-segmented with the HMM
            buf += l_word
        else:
            # l_word is a multi-character word
            if buf:
                # buf holds a single character: emit it directly, no HMM needed
                if len(buf) == 1:
                    yield buf
                    # clear buf
                    buf = ''
                else:
                    # the characters accumulated in buf do not form a dictionary word,
                    # so hand them to the HMM
                    """
                        Worth tracing an example to make this clearer:
                        e.g. 我说北京是中国首都
                        The first two tokens 我 and 说 are single characters, so they go through
                        the if-branch above and buf becomes '我说'.
                        When 北京 comes along we enter this else-branch; if 我说 is not in the
                        dictionary it is handed to the HMM for segmentation,
                        北京 is emitted directly by the yield l_word below, and buf is cleared.
                    """
                    if not self.FREQ.get(buf):
                        # HMM segmentation
                        recognized = finalseg.cut(buf)
                        # finalseg.cut is a generator, so iterate and yield its pieces
                        for t in recognized:
                            yield t
                    else:
                        # buf is a dictionary word: still emit its characters one by one,
                        # because each character on its own had the highest probability
                        for elem in buf:
                            yield elem
                    # clear buf (this could actually sit outside the if/else,
                    # since both branches need it)
                    buf = ''
            # a word that is already in the dictionary is emitted as-is
            yield l_word
        x = y
    # if buf still holds unprocessed characters, handle them the same way
    if buf:
        # a single character is emitted directly; the rest mirrors the if/else above
        if len(buf) == 1:
            yield buf
        elif not self.FREQ.get(buf):
            recognized = finalseg.cut(buf)
            for t in recognized:
                yield t
        else:
            for elem in buf:
                yield elem
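
The HMM fallback used above, finalseg.cut, can also be called on its own; a minimal sketch using the buf value from the example in the comments:

from jieba import finalseg

buf = '我说'  # the run of single characters accumulated in buf above
print(list(finalseg.cut(buf)))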

Step into get_DAG

def get_DAG(self, sentence):
    # make sure the tokenizer has been initialized
    # initialization mainly loads the prefix dictionary into self.FREQ and self.total,
    # which are used later for the probability computation; see gen_pfdict below
    self.check_initialized()
    DAG = {}
    N = len(sentence)
    # walk the sentence and build the DAG
    # e.g. for 我说北京是中国首都:
    # DAG: {0: [0], 1: [1], 2: [2, 3], 3: [3], 4: [4], 5: [5, 6], 6: [6], 7: [7, 8], 8: [8]}
    for k in xrange(N):
        tmplist = []
        i = k
        frag = sentence[k]
        while i < N and frag in self.FREQ:
            if self.FREQ[frag]:
                tmplist.append(i)
            i += 1
            frag = sentence[k:i + 1]
        if not tmplist:
            tmplist.append(k)
        DAG[k] = tmplist
    return DAG
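
To make the prefix scan concrete, here is a small standalone sketch with a toy prefix dictionary (the counts are made up, not jieba's real values):

def toy_get_DAG(sentence, FREQ):
    # FREQ maps every word AND every word prefix; prefixes that are not
    # real words carry a count of 0 (see gen_pfdict below)
    DAG = {}
    N = len(sentence)
    for k in range(N):
        tmplist = []
        i = k
        frag = sentence[k]
        while i < N and frag in FREQ:
            if FREQ[frag]:
                tmplist.append(i)
            i += 1
            frag = sentence[k:i + 1]
        if not tmplist:
            tmplist.append(k)
        DAG[k] = tmplist
    return DAG

FREQ = {'北': 10, '北京': 50, '京': 5, '是': 100}  # toy counts
print(toy_get_DAG('北京是', FREQ))  # {0: [0, 1], 1: [1], 2: [2]}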

Step into gen_pfdict

def gen_pfdict(f):
    # f is the dict.txt file: each line stores a word, its count and its part of speech
    lfreq = {}
    ltotal = 0
    f_name = resolve_filename(f)
    for lineno, line in enumerate(f, 1):
        try:
            line = line.strip().decode('utf-8')
            # each line holds word, count and part of speech; take the word and its count
            word, freq = line.split(' ')[:2]
            freq = int(freq)
            lfreq[word] = freq
            # accumulate the total count for the later probability computation
            ltotal += freq
            # store every prefix of the word
            for ch in xrange(len(word)):
                wfrag = word[:ch + 1]
                if wfrag not in lfreq:
                    # prefixes that are not dictionary words themselves get a count of 0
                    # (this grows the dictionary by roughly 40%)
                    lfreq[wfrag] = 0
        except ValueError:
            raise ValueError(
                'invalid dictionary entry in %s at Line %s: %s' % (f_name, lineno, line))
    f.close()
    return lfreq, ltotal
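
A small sketch of what gen_pfdict builds for a couple of toy dictionary lines (counts invented for illustration):

lines = ['北京 100 ns', '中国首都 3 n']  # toy dict.txt lines
lfreq, ltotal = {}, 0
for line in lines:
    word, freq = line.split(' ')[:2]
    freq = int(freq)
    lfreq[word] = freq
    ltotal += freq
    for ch in range(len(word)):
        wfrag = word[:ch + 1]
        if wfrag not in lfreq:
            lfreq[wfrag] = 0  # prefix placeholder so get_DAG can keep extending frag
print(lfreq)   # {'北京': 100, '北': 0, '中国首都': 3, '中': 0, '中国': 0, '中国首': 0}
print(ltotal)  # 103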

Step into calc

def calc(self, sentence, DAG, route):
    # dynamic programming: compute the maximum-probability segmentation path
    # sentence length
    N = len(sentence)
    # route[idx] = (best log-probability from idx to the end of the sentence,
    #               end index of the best word starting at idx)
    route[N] = (0, 0)
    # many words have very small counts, so count / total would be a tiny probability;
    # log probabilities are used instead, which also turns division into subtraction
    logtotal = log(self.total)
    for idx in xrange(N - 1, -1, -1):
        # compute the best path backwards: DAG[idx] lists the possible end positions
        # of words starting at idx, so route[idx] only depends on positions to its right
        route[idx] = max((log(self.FREQ.get(sentence[idx:x + 1]) or 1) -
                          logtotal + route[x + 1][0], x) for x in DAG[idx])
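
A worked toy run of this recurrence, reusing the toy FREQ and DAG from the get_DAG sketch above:

from math import log

sentence = '北京是'
FREQ = {'北': 10, '北京': 50, '京': 5, '是': 100}  # toy counts
total = sum(FREQ.values())
DAG = {0: [0, 1], 1: [1], 2: [2]}                  # output of the toy get_DAG above

route = {len(sentence): (0, 0)}
logtotal = log(total)
for idx in range(len(sentence) - 1, -1, -1):
    route[idx] = max((log(FREQ.get(sentence[idx:x + 1]) or 1) - logtotal + route[x + 1][0], x)
                     for x in DAG[idx])
print(route)
# route[0] picks x = 1, i.e. the cut 北京 / 是: both candidate paths have the same raw
# count product (10 * 5 * 100 == 50 * 100), but the two-word path divides by total one
# time fewer, so its log probability is higher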

Finally, let's look at the HMM segmentation

def __cut(sentence):
    global emit_P
    # decode the most likely hidden state sequence with the Viterbi algorithm
    prob, pos_list = viterbi(sentence, 'BMES', start_P, trans_P, emit_P)
    begin, nexti = 0, 0
    # print pos_list, sentence
    # walk the sentence and turn the hidden state sequence back into words
    for i, char in enumerate(sentence):
        pos = pos_list[i]
        if pos == 'B':
            begin = i
        elif pos == 'E':
            yield sentence[begin:i + 1]
            nexti = i + 1
        elif pos == 'S':
            yield char
            nexti = i + 1
    if nexti < len(sentence):
        yield sentence[nexti:]
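
A tiny illustration of how a B/M/E/S tag sequence is turned back into words (the tag sequence here is assumed, not an actual Viterbi output):

def tags_to_words(sentence, pos_list):
    # same assembly logic as __cut above, just without the Viterbi step
    begin, nexti = 0, 0
    for i, pos in enumerate(pos_list):
        if pos == 'B':
            begin = i
        elif pos == 'E':
            yield sentence[begin:i + 1]
            nexti = i + 1
        elif pos == 'S':
            yield sentence[i]
            nexti = i + 1
    if nexti < len(sentence):
        yield sentence[nexti:]

print(list(tags_to_words('我说北京', ['S', 'S', 'B', 'E'])))  # ['我', '说', '北京']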

And finally, the viterbi function

def viterbi(obs, states, start_p, trans_p, emit_p):
    # HMM decoding / inference: given λ = (A, B, π) and the observation sequence O1, O2, ..., Ot
    # (the observed characters), compute the hidden sequence, i.e. a sequence over B/M/E/S
    V = [{}]  # tabular
    path = {}
    # initialize V and path
    # V[t][y] stores the maximum log-probability of being in state y at time t
    # path stores, for each state, the best state path ending there
    for y in states:  # init
        # log probabilities, so multiplication becomes addition
        V[0][y] = start_p[y] + emit_p[y].get(obs[0], MIN_FLOAT)
        path[y] = [y]
    # for each time step t, compute the best probability of being in each state
    for t in xrange(1, len(obs)):
        V.append({})
        newpath = {}
        # iterate over all states B/M/E/S
        for y in states:
            # probability of state y emitting the current character
            em_p = emit_p[y].get(obs[t], MIN_FLOAT)
            # take the maximum over all previous states that can transition into y;
            # B/M/E/S follow fixed rules, so not every state can precede y and
            # only the states listed in PrevStatus need to be checked
            # PrevStatus = {
            #     'B': 'ES',
            #     'M': 'MB',
            #     'S': 'SE',
            #     'E': 'BM'
            # }
            (prob, state) = max(
                [(V[t - 1][y0] + trans_p[y0].get(y, MIN_FLOAT) + em_p, y0) for y0 in PrevStatus[y]])
            V[t][y] = prob
            # extend the best path ending in this state
            newpath[y] = path[state] + [y]
        # keep the best paths for the next step
        path = newpath
    # the last character of a sentence must be tagged E or S, so only those two states are considered at the end
    (prob, state) = max((V[len(obs) - 1][y], y) for y in 'ES')

    return (prob, path[state])
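
To see the shape of the inputs and output, here is a toy decode with hand-made log-probability tables (these numbers are invented, not jieba's real prob_start/prob_trans/prob_emit), assuming the viterbi function above has been copied into the same scratch module:

from math import log

MIN_FLOAT = -3.14e100
PrevStatus = {'B': 'ES', 'M': 'MB', 'S': 'SE', 'E': 'BM'}
start_p = {'B': log(0.6), 'S': log(0.4), 'M': MIN_FLOAT, 'E': MIN_FLOAT}
trans_p = {'B': {'E': log(0.7), 'M': log(0.3)},
           'M': {'E': log(0.6), 'M': log(0.4)},
           'S': {'B': log(0.5), 'S': log(0.5)},
           'E': {'B': log(0.5), 'S': log(0.5)}}
emit_p = {'B': {'北': log(0.4)}, 'M': {}, 'E': {'京': log(0.5)}, 'S': {'是': log(0.6)}}

prob, tags = viterbi('北京是', 'BMES', start_p, trans_p, emit_p)
print(tags)  # ['B', 'E', 'S'] with these made-up numbers, i.e. 北京 / 是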

OK, that covers the core of the source code. Now, with jieba as a blueprint, can we implement a tokenizer of our own?

Let's build one step by step.

Step 1: build the dictionary from a corpus

import os
from collections import Counter

"""
    根据语料
    生成dict 文件
"""


class GenerateDict:
    def __init__(self, data_files):
        self.data_files = data_files
        self._parse_file()

    def _parse_file(self):
        if os.path.exists('dict.txt'):
            return
        if len(self.data_files) == 0:
            return
        counter = Counter()
        for file in self.data_files:
            if not os.path.exists(file):
                continue
            f = open(file, encoding='utf-8')
            lines = f.readlines()
            for line in lines:
                word_arr = line.strip().split()
                counter.update(word_arr)
            f.close()
        if len(counter) == 0:
            return
        sorted_arr = sorted(counter.items(), key=lambda item: item[1], reverse=True)
        out_file = open('dict.txt', 'w', encoding='utf-8')
        for word, num in sorted_arr:
            line = word + ' ' + str(num) + '\n'
            out_file.write(line)
        out_file.close()


if __name__ == '__main__':
    data_files = ['data.txt']
    GenerateDict(data_files)
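
For reference, a toy sketch of the corpus format assumed here (data.txt is expected to be pre-segmented text, one sentence per line, words separated by whitespace) and the dict.txt it would produce:

# toy data.txt (already segmented, spaces between words):
#   我 说 北京 是 中国 首都
#   北京 是 一 座 城市
#
# resulting dict.txt (word + count, sorted by count, descending):
#   北京 2
#   是 2
#   我 1
#   说 1
#   中国 1
#   首都 1
#   一 1
#   座 1
#   城市 1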

Step 2: since we use an HMM, we need to estimate the parameters λ=(A,B,π) ourselves and store them

"""
生成 HMM 参数
(A, B, pi) 发射概率、转移概率、初始概率
"""

import os
from math import log
import json

MIN_FLOAT = -3.14e100


class GenerateProb:
    def __init__(self, data_files):
        self.data_files = data_files
        self.STATUS = 'BMES'
        # initial state probabilities
        self.PI = {}
        # emission probabilities
        self.A = {}
        # transition probabilities
        self.B = {}
        self._parse_file()

    def _parse_file(self):
        if len(self.data_files) == 0:
            return
        for file in self.data_files:
            if not os.path.exists(file):
                continue
            f = open(file, encoding='utf-8')
            lines = f.readlines()
            for line in lines:
                word_arr = line.strip().split()
                labels = []
                for index, word in enumerate(word_arr):
                    if len(word) == 1:
                        label = 'S'
                    else:
                        label = 'B' + 'M' * (len(word) - 2) + 'E'
                    # the first word of each line contributes to the initial distribution PI
                    if index == 0:
                        key = label[0]
                        if key in self.PI:
                            self.PI[key] += 1
                        else:
                            self.PI[key] = 1
                    # count emissions: for each tag in the label, count the character it emits
                    for i, item in enumerate(label):
                        ch = word[i]
                        if item not in self.A:
                            self.A[item] = {}
                        if ch not in self.A[item]:
                            self.A[item][ch] = 1
                        else:
                            self.A[item][ch] += 1

                    labels.extend(label)
                # count transitions: how often each state follows the previous one
                for i in range(1, len(labels)):
                    pre_status = labels[i - 1]
                    cur_status = labels[i]
                    if pre_status not in self.B:
                        self.B[pre_status] = {}
                    if cur_status not in self.B[pre_status]:
                        self.B[pre_status][cur_status] = 1
                    else:
                        self.B[pre_status][cur_status] += 1
            f.close()
        # turn the raw counts of PI, A and B into log probabilities
        self.__calc_params()

    def __calc_params(self):
        # normalize PI
        total = 0.
        for key, value in self.PI.items():
            total += value
        for state in self.STATUS:
            if state not in self.PI:
                self.PI[state] = MIN_FLOAT
            else:
                self.PI[state] = log(self.PI[state] / total)
        # normalize B (transitions)
        for key, item in self.B.items():
            total = 0.
            for inner_key, value in item.items():
                total += value
            for inner_key, value in item.items():
                item[inner_key] = log(value / total)

        # normalize A (emissions)
        for key, item in self.A.items():
            total = 0.
            for inner_key, value in item.items():
                total += value
            for inner_key, value in item.items():
                item[inner_key] = log(value / total)


if __name__ == '__main__':
    data_files = ['data.txt']
    prob = GenerateProb(data_files)
    data = {
        'PI': prob.PI,
        'B': prob.B,
        'A': prob.A
    }
    if not os.path.exists('prob.json'):
        print('write prob to json')
        with open('prob.json', 'w') as f:
            json.dump(data, f)
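
The core of the parameter counting above is labelling each word with B/M/E/S tags; a quick sketch of that mapping:

def word_to_tags(word):
    # single-character word -> 'S'; otherwise B, then (len - 2) M's, then E
    if len(word) == 1:
        return 'S'
    return 'B' + 'M' * (len(word) - 2) + 'E'

print(word_to_tags('我'))        # S
print(word_to_tags('北京'))      # BE
print(word_to_tags('中国首都'))  # BMME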

Step 3: implement the tokenizer itself

from math import log
import hmm


class Tokenizer:
    def __init__(self):
        self.initialized = False
        self.FREQ, self.total = {}, 0
        self.initialize()

    def initialize(self):
        freq_dict = {}
        total = 0
        f = open('dict.txt', encoding='utf-8')
        lines = f.readlines()
        for line in lines:
            line = line.strip()
            word_freq = line.split()
            if len(word_freq) < 2:
                continue
            word, freq = word_freq[0:2]
            freq = int(freq)
            freq_dict[word] = freq
            total += freq
            for ch in range(len(word)):
                sub_word = word[:ch + 1]
                if sub_word not in freq_dict:
                    freq_dict[sub_word] = 0
        f.close()
        self.FREQ, self.total = freq_dict, total
        self.initialized = True

    def check_initialized(self):
        if not self.initialized:
            self.initialize()

    def get_DAG(self, sentence):
        self.check_initialized()
        DAG = {}
        N = len(sentence)
        for k in range(N):
            tmplist = []
            i = k
            frag = sentence[k]
            while i < N and frag in self.FREQ:
                if self.FREQ[frag]:
                    tmplist.append(i)
                i += 1
                frag = sentence[k:i + 1]
            if not tmplist:
                tmplist.append(k)
            DAG[k] = tmplist
        return DAG

    def calc(self, sentence, DAG, route):
        N = len(sentence)
        route[N] = (0, 0)
        logtotal = log(self.total)
        for idx in range(N - 1, -1, -1):
            route[idx] = max((log(self.FREQ.get(sentence[idx:x + 1]) or 1) -
                              logtotal + route[x + 1][0], x) for x in DAG[idx])

    def __cut_DAG(self, sentence):
        DAG = self.get_DAG(sentence)
        route = {}
        self.calc(sentence, DAG, route)
        x = 0
        N = len(sentence)
        result = []
        buf = ''
        while x < N:
            y = route[x][1] + 1
            tmp_word = sentence[x:y]
            if y - x == 1:
                buf += tmp_word
            else:
                if buf:
                    if len(buf) == 1:
                        result.append(buf)
                    else:
                        if buf not in self.FREQ:
                            # fall back to HMM segmentation
                            recognized = hmm.cut(buf)
                            for elem in recognized:
                                result.append(elem)
                        else:
                            for elem in buf:
                                result.append(elem)
                    buf = ''
                result.append(tmp_word)
            x = y
        if buf:
            if len(buf) == 1:
                result.append(buf)
            elif buf not in self.FREQ:
                # fall back to HMM segmentation
                recognized = hmm.cut(buf)
                for elem in recognized:
                    result.append(elem)
            else:
                for elem in buf:
                    result.append(elem)
        return result

    def cut(self, sentence, HMM=True):
        if HMM:
            cut_block = self.__cut_DAG
        else:
            raise NotImplementedError('only the HMM-backed DAG cut is implemented')

        return cut_block(sentence)


tk = Tokenizer()
cut = tk.cut

Step 4: implement the HMM segmentation module

import os
import json

MIN_FLOAT = -3.14e100


# load start_P, trans_P, emit_P
if os.path.exists('../prob.json'):
    f = open('../prob.json')
    data = json.load(f)
    start_P = data['PI']
    trans_P = data['B']
    emit_P = data['A']
else:
    from .prob_start import P as start_P
    from .prob_trans import P as trans_P
    from .prob_emit import P as emit_P

# B - begin
# M - Mid
# S - Single
# E - End
STATES = 'BMES'
# the possible previous states for each state, e.g. E -> B and S -> B are allowed, but B -> B and M -> B are not
PrevStatus = {
    'B': 'ES',
    'M': 'MB',
    'S': 'SE',
    'E': 'BM'
}


def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]  # tabular
    path = {}
    for y in states:  # init
        V[0][y] = start_p[y] + emit_p[y].get(obs[0], MIN_FLOAT)
        path[y] = [y]
    for t in range(1, len(obs)):
        V.append({})
        newpath = {}
        for y in states:
            em_p = emit_p[y].get(obs[t], MIN_FLOAT)
            (prob, state) = max(
                [(V[t - 1][y0] + trans_p[y0].get(y, MIN_FLOAT) + em_p, y0) for y0 in PrevStatus[y]])
            V[t][y] = prob
            newpath[y] = path[state] + [y]
        path = newpath

    (prob, state) = max((V[len(obs) - 1][y], y) for y in 'ES')

    return prob, path[state]


def cut(sentence):
    global emit_P
    prob, pos_list = viterbi(sentence, STATES, start_P, trans_P, emit_P)
    begin, nexti = 0, 0
    for i, char in enumerate(sentence):
        pos = pos_list[i]
        if pos == 'B':
            begin = i
        elif pos == 'E':
            yield sentence[begin:i + 1]
            nexti = i + 1
        elif pos == 'S':
            yield char
            nexti = i + 1
    if nexti < len(sentence):
        yield sentence[nexti:]

Finally, test it

import jieba
import tokenizer

sentence = '我说北京是中国首都'

data = list(jieba.cut(sentence))
print('jieba:', data)
data = tokenizer.cut(sentence)
print('tokenizer:', data)

Output: comparing the two results, our tokenizer's segmentation is quite close to jieba's, which is pretty decent.

Of course, this is only a rough implementation; feel free to add features and modify it to suit your own needs.


Finally, the code is available on GitHub:

https://github.com/under-the-moon/myjieba

All of the above is original content; please credit the source when reposting.
