用Python自己写一个分词器,python实现分词功能,隐马尔科夫模型预测问题之维特比算法(Viterbi Algorithm)的Python实现

news/2024/7/5 4:58:33

 ☕️ 本文系列文章汇总:

(1)HMM开篇:基本概念和几个要素

(2)HMM计算问题:前后向算法

         代码实现 

(3)HMM学习问题:Baum-Welch算法

          代码实现
(4)  HMM预测问题:维特比算法
本篇算法原理分析及公式推导请参考: HMM预测问题:维特比算法

目录

1. 模型参数估计

2. 维特比实现

3. 完整代码Github

4. 实例


事实上维特比算法属于隐马尔科夫模型的“应用篇”,特别是在NLP的分词领域,维特比算法无处不在。我们先需要根据HMM的学习算法来学习得到一个模型λ=(π,A,B),然后再通过这个模型,利用维特比算法对数据进行预测。本篇基于维特比算法实现一个简单的分词器,有助于大家深入理解。


1. 模型参数估计

我们先通过训练集来估计出一个模型。训练集是一堆已经分好词的文本,一行一条训练样本。在训练集中,我们的观测数据是每一个字,我们的状态是每一个字对应的分词标志,一共有4种状态:S,表示单字成词;B,表示一个分出来的词的起始字;M,表示一个分出来的词的中间字;E,表示一个分出来的词的结尾字。例如:

|什么|难过|,|只不过||一次|错过

S|BE|BE|S|BME|S|BE|BE

注意,由于我们的训练集包含了事实上包含了观测值和状态值,因此我们不需要用无监督的Baum Welch算法来学习模型,只需要简单的有监督统计方法来估计模型参数即可,这个思想主要用到《统计学习方法》中10.3.1节中提到的方法。

class Model:
    def __init__(self, trainfile, N, M, Q):
        self.trainfile = trainfile
        self.N = N
        self.M = M
        self.Pi = np.zeros(N)
        self.A = np.zeros((N, N))
        self.B = np.zeros((N, M))
        self.Q2id = {x: i for i, x in enumerate(Q)}

    def cal_rate(self):
        reader = dataloader(self.trainfile)
        for i, line in enumerate(reader):
            line = line.strip().strip('\n')
            if not line:
                continue
            word_list = line.split(' ')
            status_sequence = []
            # 计算π和B中每个元素的频数
            for j, item in enumerate(word_list):
                if len(item) == 1:
                    flag = 'S'
                else:
                    flag = 'B' + 'M' * (len(item) - 2) + 'E'
                if j == 0:
                    self.Pi[self.Q2id[flag[0]]] += 1
                for t, s in enumerate(flag):
                    self.B[self.Q2id[s]][ord(item[t])] += 1
                status_sequence.extend(flag)
            # 计算A元素的频数
            for t, s in enumerate(status_sequence):
                prev = status_sequence[t - 1]
                self.A[self.Q2id[prev]][self.Q2id[s]] += 1

    def generate_model(self):
        """
        根据课本10.3.1介绍的方法,将频数参数矩阵π、A、B转换成频率参数矩阵,
        并对每一个元素取log,将后续的乘法运算转成加法运算,方便计算。
        """
        self.cal_rate()
        norm = -2.718e+16
        denominator = sum(self.Pi)
        # 处理π
        for i, pi in enumerate(self.Pi):
            if pi == 0.:
                self.Pi[i] = norm
            else:
                self.Pi[i] = np.log(pi / denominator)
        # 处理A
        for row in range(self.A.shape[0]):
            denominator = sum(self.A[row])
            for col, a in enumerate(self.A[row]):
                if a == 0.:
                    self.A[row][col] = norm
                else:
                    self.A[row][col] = np.log(a / denominator)
        # 处理B
        for row in range(self.B.shape[0]):
            denominator = sum(self.B[row])
            for col, b in enumerate(self.B[row]):
                if b == 0.:
                    self.B[row][col] = norm
                else:
                    self.B[row][col] = np.log(b / denominator)
        return AttrDict(
            pi=self.Pi,
            A=self.A,
            B=self.B
        )

2. 维特比实现

这一部分的代码完全是按照课本中算法流程【10.5】中的步骤来的,注意矩阵的运算正确即可。

class Viterbi:
    def __init__(self, model: dict):
        self.pi = model.pi
        self.A = model.A
        self.B = model.B

    def predict(self, datapath):
        """
        根据算法10.5中的流程计算δ和ψ
        """
        reader = dataloader(datapath)
        self.O = [line.strip().strip('\n') for line in reader]
        N = self.pi.shape[0]
        self.segs = []
        for o in self.O:
            o = [w for w in o if w]
            if not o:
                self.segs.append([])
                continue
            T = len(o)
            delta_t = np.zeros((T, N))
            psi_t = np.zeros((T, N))
            for t in range(T):
                if not t:
                    delta_t[t][:] = self.pi + self.B.T[:][ord(o[0])]  # 由于log转换,所以原先的*变成+
                    psi_t[t][:] = np.zeros((1, N))
                else:
                    deltaTemp = delta_t[t - 1] + self.A.T
                    for i in range(N):
                        delta_t[t][i] = max(deltaTemp[:][i]) + self.B[i][ord(o[t])]
                        psi_t[t][i] = np.argmax(deltaTemp[:][i])
            I = []
            """
            这里是回溯的过程,目的在于找最优路径,记得最后的路径需要反转,因为我们是从T时刻往前
            找的。
            """
            maxNode = np.argmax(delta_t[-1][:])
            I.append(int(maxNode))
            for t in range(T - 1, 0, -1):
                maxNode = int(psi_t[t][maxNode])
                I.append(maxNode)
            I.reverse()
            self.segs.append(I)

    def segment(self):
        """
        得到最优路径状态序列后,我们就可以根据状态序列对句子进行分割了
        """
        segments = []
        for i, line in enumerate(self.segs):
            curText = ""
            temp = []
            for j, w in enumerate(line):
                if w == 0:
                    temp.append(self.O[i][j])
                else:
                    if w != 3:
                        curText += self.O[i][j]
                    else:
                        curText += self.O[i][j]
                        temp.append(curText)
                        curText = ''
            segments.append(temp)
        return segments

3. 完整代码Github

import numpy as np


class AttrDict(dict):
    # 一个小trick,将结果返回成一个字典格式
    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        self.__dict__ = self


def dataloader(datapath):
    with open(datapath, 'r') as reader:
        for line in reader:
            yield line


class Model:
    def __init__(self, trainfile, N, M, Q):
        self.trainfile = trainfile
        self.N = N
        self.M = M
        self.Pi = np.zeros(N)
        self.A = np.zeros((N, N))
        self.B = np.zeros((N, M))
        self.Q2id = {x: i for i, x in enumerate(Q)}

    def cal_rate(self):
        reader = dataloader(self.trainfile)
        for i, line in enumerate(reader):
            line = line.strip().strip('\n')
            if not line:
                continue
            word_list = line.split(' ')
            status_sequence = []
            # 计算π和B中每个元素的频数
            for j, item in enumerate(word_list):
                if len(item) == 1:
                    flag = 'S'
                else:
                    flag = 'B' + 'M' * (len(item) - 2) + 'E'
                if j == 0:
                    self.Pi[self.Q2id[flag[0]]] += 1
                for t, s in enumerate(flag):
                    self.B[self.Q2id[s]][ord(item[t])] += 1
                status_sequence.extend(flag)
            # 计算A元素的频数
            for t, s in enumerate(status_sequence):
                prev = status_sequence[t - 1]
                self.A[self.Q2id[prev]][self.Q2id[s]] += 1

    def generate_model(self):
        self.cal_rate()
        norm = -2.718e+16
        denominator = sum(self.Pi)
        for i, pi in enumerate(self.Pi):
            if pi == 0.:
                self.Pi[i] = norm
            else:
                self.Pi[i] = np.log(pi / denominator)

        for row in range(self.A.shape[0]):
            denominator = sum(self.A[row])
            for col, a in enumerate(self.A[row]):
                if a == 0.:
                    self.A[row][col] = norm
                else:
                    self.A[row][col] = np.log(a / denominator)

        for row in range(self.B.shape[0]):
            denominator = sum(self.B[row])
            for col, b in enumerate(self.B[row]):
                if b == 0.:
                    self.B[row][col] = norm
                else:
                    self.B[row][col] = np.log(b / denominator)
        return AttrDict(
            pi=self.Pi,
            A=self.A,
            B=self.B
        )


class Viterbi:
    def __init__(self, model: dict):
        self.pi = model.pi
        self.A = model.A
        self.B = model.B

    def predict(self, datapath):
        reader = dataloader(datapath)
        self.O = [line.strip().strip('\n') for line in reader]
        N = self.pi.shape[0]
        self.segs = []
        for o in self.O:
            o = [w for w in o if w]
            if not o:
                self.segs.append([])
                continue
            T = len(o)
            delta_t = np.zeros((T, N))
            psi_t = np.zeros((T, N))
            for t in range(T):
                if not t:
                    delta_t[t][:] = self.pi + self.B.T[:][ord(o[0])]  # 由于log转换,所以原先的*变成+
                    psi_t[t][:] = np.zeros((1, N))
                else:
                    deltaTemp = delta_t[t - 1] + self.A.T
                    for i in range(N):
                        delta_t[t][i] = max(deltaTemp[:][i]) + self.B[i][ord(o[t])]
                        psi_t[t][i] = np.argmax(deltaTemp[:][i])
            I = []
            maxNode = np.argmax(delta_t[-1][:])
            I.append(int(maxNode))
            for t in range(T - 1, 0, -1):
                maxNode = int(psi_t[t][maxNode])
                I.append(maxNode)
            I.reverse()
            self.segs.append(I)

    def segment(self):
        segments = []
        for i, line in enumerate(self.segs):
            curText = ""
            temp = []
            for j, w in enumerate(line):
                if w == 0:
                    temp.append(self.O[i][j])
                else:
                    if w != 3:
                        curText += self.O[i][j]
                    else:
                        curText += self.O[i][j]
                        temp.append(curText)
                        curText = ''
            segments.append(temp)
        return segments

4. 实例

if __name__ == '__main__':
    # 因为我们的观测值是用编码来表示汉字的,这里设置观测值为65536就是为了能最大限度覆盖所有可能出
    # 现的汉字。
    trainer = Model(N=4, M=65536, Q=['S', 'B', 'M', 'E'], trainfile='train.txt')
    model = trainer.generate_model()
    segment = Viterbi(model)
    segment.predict('test.txt')
    print(segment.segment())

我们的训练集大概长这样:

 给一条测试数据:

分词后:

[['他', '强调', ',', '党校', '始终', '不', '变', '的', '初心', '就', '是', '为', '党育', '才', '、', '为', '党', '献策', '。', '各级', '党校', '要', '坚守', '这个', '初心', ',锐', '意', '进', '取', '、', '奋发', '有', '为', ',', '为', '全', '面建', '设社', '会', '主义现', '代化国', '家', '、', '全面', '推进', '中华', '民族', '伟大', '复兴', '作', '出', '新', '的', '贡献', '。']]

可以看出,这是一般非常粗糙的分词器,虽然有些词分的不准,但是总体上还是可以的。由于我们的模型参数估计方法不是自发的学习过程,所以对于语料的依赖特别强,语料中没见过的词,就可能分错。


http://lihuaxi.xjx100.cn/news/955264.html

相关文章

树状数组讲解

树状数组 文章目录树状数组引入例题AcWing241.楼兰图腾思路代码AcWing 242. 一个简单的整数问题思路代码AcWing 244. 谜一样的牛思路代码总结引入 树状数组主要维护的是这样一个数据结构: tr[x]表示以x为终点的长度为lowbit(x)的前缀和 对于树状数组主要就两种操作 …

边缘计算开源项目解读——kubeedge mappers实现

0 背景 本文重点解读kubeedge项目中的mapper模块。该模块位于kubeedge的edgecore的南向边缘侧,主要对接入kubeedge的终端设备,进行协议的适配和转换,使其可以和边缘设备通信,转换后的协议是我们前面描述的mqtt协议,当然…

【数据库理论】研究 ANSI/ISO SQL 标准、事务、数据异常与隔离级别

文章目录 一、前言二、简介研究的演进过程三、一致性级别与锁机制3.1 定义 1 :一致性的非正规定义3.2 定义 2 :一致性的锁协议定义3.3 定义 3 :一致性的正式定义四、《ANSI SQL-92 标准》[^1] 定义的现象与隔离级别4.1 ANSI (ISO/IEC) SQL 标准简介4.2 ANSI SQL 定义的现象4…

密码传输和存储,如何保证数据安全?

本文从一个输入密码登录场景说起,详细介绍了密码传输过程的改进和思路,最后展现出一个相对安全的传输和存储方案。点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达场景在互联网项目中,我们经常会遇到以下…

中心频率法确认VMD的K值的python实现

变分模态分解VMD1.1原理 VMD算法是利用VMD是一种新的信号分解方法,就是求解变分问题。首先构造变分问题,将信号f分解为K个IMF分量,且每个模态分量都有各自的中心频率,同时求解模态的估计带宽之和最小值,约束条件为所有模态分量之和与原始信号相等。 算法认为所有分量都是集…

解锁大厂思维,剖析《阿里巴巴开发手册》之Integer缓存问题引发的分析

小熊学Java网站:https://javaxiaobear.gitee.io/,每周持续更新干货,建议收藏! 1、引言 阿里巴巴Java开发手册在第一章节,编程规约中OOP规约的第7条提到: **【强制】**所有整型包装类对象之间值的比较&…

Android Handler机制(三) Looper源码分析

一. 简介 我们接上一篇文章:Android Handler机制(二) Handler 实现原理 继续分析Looper Looper 的职责很单一,就是单纯的从 MessageQueue 中取出消息分发给消息对应 的宿主 Handler,因此它的代码不多(400行左右) . Looper 是线程独立的且每个线程只能存在…

关于如何合理设置线程池参数解决方案

关于如何合理设置线程池参数解决方案&#xff08;ThreadPoolExecutor&#xff09; 线程池参数有哪些 我们直接来看构造方法 ... public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6,ThreadFactory var7, Rejecte…