租约-代码实践

news/2024/7/5 1:51:55

租约

本文主要根据租约的基本原理,采用go语言实践一下,租约的基本流程。

租约设计概述

用例模型

租约的主要机制就是为了保证在分布式环境下使得各个客户端使用的数据保持强一致性,每个客户端在查询服务器数据的时候,都在服务端存在一个租约信息,如果服务端还有租约没有到期,则客户端提交的数据修改阻塞到所有的租约过期菜可进行操作。基本的用例场景描述分为如下几种。

客户端查询数据用例
用例名称客户端查询数据
主要参与者客户端,服务端
涉及关注点客户端:客户端需要知道的数据在客户端本地缓存没有找到,需要向服务端请求查询数据
前置条件服务器正常运行
后置条件
基本流程情况1:当前没有阻塞的客户端需要修改数据,此时就直接返回数据;情况2:当前服务端已经存在客户端需要修改数据的请求,此时需要阻塞等待该客户端完成修改之后,将修改之后的数据进行返回
客户端修改数据
用例名称客户端修改数据
主要参与者客户端,服务端
涉及关注点客户端:客户端需要将要需要的数据发送到服务端
前置条件服务器正常运行
后置条件
基本流程情况1:当前客户端修改数据的请求发送到服务端的时候,此时服务端没有租约则直接阻塞其他读请求并修改数据,修改完成之后返回数据;情况2:当前服务端还有租约未到期,此时阻塞所有的读请求并阻塞所有后续的修改请求,等到所有租约到期之后,此时修改完成数据,完成之后再返回数据,其他则阻塞的请求继续执行
时序图
根据用例模型查询数据与修改数据的时序图
服务端客户端A客户端B客户端C连接服务端返回连接成功连接服务端返回连接成功连接服务端返回连接成功查询数据设置A的租约并返回数据查询数据设置B的租约并返回数据查询数据设置C的租约并返回数据修改数据因为此时A B C三个租约未到期等到租约到期待到租约全部到期并修改数据后返回数据服务端客户端A客户端B客户端C
代码设计

代码设计过程中,由于水平限制,代码的设计过程会有瑕疵。尽量按照有关面向对象的设计原则来。主要包含了三个文件,分别为server.go(服务端代码主要逻辑)、lease_library.go(客户端调用内容的实现)、protocol.go(传输内容的序列号与反序列化)、get_op.go(查询的测试代码)、set_op(修改值的测试代码)。代码的目的主要是为了实践一下流程,所以协议的编写与支持的命令都相对简单,当前对go语言的编写不熟练导致代码中异常的处理会不规范,后续再深入学习并修改。

服务端代码
package mainimport ("bytes""fmt""log""net""sync""time"
)func init() {log.SetFlags(log.Ltime|log.Lshortfile)
}var StoreValues map[string]stringtype ServerLeases struct {mutex sync.Mutexcond *sync.CondwaitChangeFlag boolclientNums intmodfiyChan chan bool
}func(self *ServerLeases) getWaitStatus()bool{return self.waitChangeFlag
}func(self *ServerLeases) updateWaitStatus(status bool){self.waitChangeFlag = status
}func(self *ServerLeases) checkLeases(){self.mutex.Lock()self.clientNums -= 1log.Println("check leases  ", self.clientNums)if self.waitChangeFlag == true && self.clientNums == 0 {log.Println("send chan to block chan modfiyChan")self.modfiyChan <- true}self.mutex.Unlock()
}func(self *ServerLeases) addLeasesTimer(){self.mutex.Lock()self.clientNums += 1time.AfterFunc(12*time.Second, self.checkLeases)log.Println("current clientNums ", self.clientNums)self.mutex.Unlock()
}func(self *ServerLeases) waitLeasesTimeout(){self.cond.L.Lock()self.cond.Wait()self.cond.L.Unlock()
}func(self *ServerLeases) notifyWaitLeasesClients(){self.cond.L.Lock()self.cond.Broadcast()self.cond.L.Unlock()
}var serverLeases ServerLeasestype ClientServer struct {conn net.ConnrecvData stringsendData []bytewriteChan chan bool
}func(self *ClientServer) init(c net.Conn){self.conn = cself.recvData = ""self.sendData = []byte("")self.writeChan = make(chan bool)// 开启协程启动执行 一个读 一个写go self.HandleReadEvent()go self.SendData()
}func(self *ClientServer) Dispatch(params map[string]interface{}){if params == nil {return}// 发送回客户端var sendMap map[string]interface{}sendMap = make(map[string]interface{})method, ok := params["method"]if ok != true {return}// 检查服务端状态 加锁if method == "query" {serverLeases.mutex.Lock()wait_status := serverLeases.getWaitStatus()log.Println("get query wait status ", wait_status)// 添加到租约列表中// 如果当前有修改操作则阻塞当前查询操作if wait_status == true {log.Println("wait .... and release lock")serverLeases.mutex.Unlock()serverLeases.waitLeasesTimeout()log.Println("notify from and  continue")} else {serverLeases.mutex.Unlock()}serverLeases.addLeasesTimer()sendMap["data"] = StoreValues} else if method == "modfiy" {// 修改操作则检查当前是否有客户端有租约serverLeases.mutex.Lock()serverLeases.updateWaitStatus(true)serverLeases.mutex.Unlock()log.Println("wait all leases expire  ", serverLeases)if serverLeases.clientNums > 0 {log.Println("wait modfiy chan")<- serverLeases.modfiyChan}log.Println("modfiy chan start  ")// 修改数据serverLeases.mutex.Lock()log.Println("-----------------------")log.Println("modfiy data ", params)datas := params["data"]var modfiyData map[string]interface{}modfiyData = make(map[string]interface{})modfiyData = datas.(map[string]interface{})for k, v := range modfiyData{StoreValues[k] = v.(string)}log.Println("modify after ", StoreValues)sendMap["data"] = StoreValuesserverLeases.updateWaitStatus(false)serverLeases.mutex.Unlock()// 唤醒 其他等待协程serverLeases.notifyWaitLeasesClients()}sendData := SerializeProtocol(sendMap)// 更新发送缓冲区var buffer bytes.Bufferbuffer.Write(self.sendData)buffer.Write(sendData)self.sendData = buffer.Bytes()self.writeChan <- true
}func(self *ClientServer) HandleRecv(){// 清空接受缓冲区var result map[string]interface{}self.recvData, result = DeserializeProtocol(self.recvData)// 处理业务逻辑if result == nil {return}self.Dispatch(result)
}func(self *ClientServer) SendData(){for {_, ok := <-self.writeChanif ok == false{log.Println("writeChan error ", ok)return}for len(self.sendData) > 0 {n, err := self.conn.Write(self.sendData)if err != nil {fmt.Println("client write error ", err)self.Close()return}self.sendData = self.sendData[n:]}}
}func(self *ClientServer) HandleReadEvent(){var recvByte = make([]byte, 20)for {// 优先发送数据返回n, err := self.conn.Read(recvByte)if err != nil {fmt.Println("client recv error ", err)self.Close()return}self.recvData += string(recvByte[:n])self.HandleRecv()}
}func(self *ClientServer) Close(){self.conn.Close()close(self.writeChan)
}func main() {fmt.Println("start")StoreValues = make(map[string]string)StoreValues["store1"] = "stor1_value"StoreValues["store2"] = "stor2_value"var new_mutex sync.MutexserverLeases = ServerLeases{}serverLeases.mutex = new_mutexserverLeases.cond = sync.NewCond(&sync.Mutex{})serverLeases.modfiyChan = make(chan bool)serverLeases.clientNums = 0serverLeases.waitChangeFlag = falseserver, err := net.Listen("tcp", "127.0.0.1:7070")if err != nil{fmt.Println("listen error :", err)return}for {conn, err := server.Accept()if err != nil{fmt.Println("accept error: ", err)continue}c := ClientServer{}c.init(conn)}
}

服务端代码量不多,其主要的实现思想,就是将每一个新加入的conn包装成一个结构体,该结构体ClientServer通过该结构体来实现数据的查询与修改,接着在服务端实现了一个全局的管理租约的ServerLeases结构体,所有加入的连接都通过该结构体的条件变量来保证在阻塞之后能够被唤醒,并通过加锁来实现数据的修改的原子性,并为每个已经发送的租约设置定时回调,保证租约过期后通过阻塞的需要修改数据的协程依次完成事件通知。

协议代码
package mainimport ("bytes""encoding/json""fmt""strconv""strings"
)/*protocollen\r\n{"method":"test","data":"val"}通过长度与json来进行数据的传入与业务逻辑的处理*/var PREFIX = "\r\n"func SerializeProtocol(data map[string]interface{})[]byte{// 序列化result, err := json.Marshal(data)if err != nil {fmt.Println("error encoding json ", err)return []byte("")}length := len(result)var buffer bytes.Bufferbuffer.Write([]byte(strconv.Itoa(length)+PREFIX))buffer.Write(result)return buffer.Bytes()
}func DeserializeProtocol(val string)(left string, result map[string]interface{}){// 解析  各种异常需要待处理left = valif len(val) == 0{return}pos := strings.Index(val, PREFIX)if pos == -1 {fmt.Println("not found")return}len_str := val[:pos]length, err := strconv.Atoi(len_str)if err != nil {fmt.Println("length con error ", err)return}data := val[(pos + len(PREFIX)):]if data == "" {fmt.Println("string have not enougth length")return}if len(data) >= length {cur_recv := []byte(data[:length])left = data[length:]result = make(map[string]interface{})if ok := json.Valid(cur_recv); ok{err = json.Unmarshal([]byte(data), &result)if err != nil{fmt.Println("json ummarshal error", err)return}}}return
}

协议的设计相对简单,主要的设计格式都是通过头部保存数据长度通过\r\n来做分割符,数据内容就是json的字符串内容,所有保证了通过json可以保证较大的灵活度。当前支持的命令就只有query和modfiy两个处理过程。

客户端代码
package mainimport ("bytes""errors""fmt""log""net""sync""time"
)func init() {log.SetFlags(log.Ltime|log.Lshortfile)
}type LeaseClient struct {conn net.ConnmsgChan chan boolcacheData map[string]interface{}mutex sync.MutexrecvData stringsendData []byte
}func(self *LeaseClient) HandleReadEvent(){var recvByte = make([]byte, 20)for {// 优先发送数据返回n, err := self.conn.Read(recvByte)if err != nil {fmt.Println("client recv error ", err)return}self.recvData += string(recvByte[:n])self.HandleRecv()}
}func(self *LeaseClient) HandleRecv(){// 清空接受缓冲区var result map[string]interface{}self.recvData, result = DeserializeProtocol(self.recvData)// 处理业务逻辑if result == nil {return}self.cacheData = result["data"].(map[string]interface{})self.msgChan <- true
}func(self *LeaseClient) sendDataToServer(sendBuffer []byte){var buffer bytes.Bufferbuffer.Write(self.sendData)buffer.Write(sendBuffer)self.sendData = buffer.Bytes()for len(self.sendData) > 0 {n, err := self.conn.Write(self.sendData)if err != nil {fmt.Println("client write error ", err)return}self.sendData = self.sendData[n:]}
}func(self *LeaseClient) queryData(){var sendData map[string]interface{}sendData = make(map[string]interface{})sendData["method"] = "query"sendByte := SerializeProtocol(sendData)self.sendDataToServer(sendByte)
}func(self *LeaseClient) get(key string)(value interface{}, err error){self.mutex.Lock()defer self.mutex.Unlock()var ok boolif len(self.cacheData) == 0 {// 请求远端服务端数据self.queryData()log.Println("wait from server data")<- self.msgChan// 设置本地缓存过期时间self.addExpireTimer()log.Println("get from server ", self.cacheData)}value, ok = self.cacheData[key]if ok == true {return}err = errors.New("not found")return
}func(self *LeaseClient) set(key string, value string)error{// 先过期本地缓存数据self.expireCacheData()// 想远端发送修改数据var sendData map[string]interface{}sendData = make(map[string]interface{})sendData["method"] = "modfiy"params := make(map[string]string)params[key] = valuesendData["data"] = paramssendByte := SerializeProtocol(sendData)self.sendDataToServer(sendByte)<- self.msgChanreturn nil
}func(self *LeaseClient) expireCacheData(){self.mutex.Lock()defer self.mutex.Unlock()log.Println("expire cache data ", self.cacheData)self.cacheData = make(map[string]interface{})
}func(self *LeaseClient) addExpireTimer(){time.AfterFunc(10*time.Second, self.expireCacheData)
}func GetNewLeaseClient(address string)(leaseClient *LeaseClient,err error){if address == "" || len(address) == 0 {err = errors.New("address not valid")return}leaseClient = &LeaseClient{}conn, err := net.DialTimeout("tcp", address, 2*time.Second)if err != nil {fmt.Println("connect to server error ", err)return}leaseClient.conn = connleaseClient.msgChan = make(chan bool)leaseClient.cacheData = make(map[string]interface{})leaseClient.recvData = ""leaseClient.sendData = []byte("")go leaseClient.HandleReadEvent()return
}

客户端的代码设计,主要是生成一个LeaseClient结构体并连接远端的服务器,然后就一直监听服务端的数据,并更新到保存的cacheData中,从而然调用方能够在其中获取数据,如果该cacheData没有数据则需要向服务端请求数据,等待服务端将数据。如果在get的过程中,检测到本地cacheData为空则阻塞等待从服务端请求完成之后再返回。在获取服务端的数据返回之后,也会设置一个租约时间,如果租约时间到了则清空本地缓存数据重新向服务端查询数据。

查询数据代码示例
package mainimport ("math/rand""time""log"
)func main() {var value interface{}leaseClient, err := GetNewLeaseClient("127.0.0.1:7070")if err != nil{log.Println("new error ", err)return}for {var sleepTime intsleepTime = rand.Intn(10)log.Println("sleep time ", sleepTime)time.Sleep(time.Duration(sleepTime)*time.Second)value, err = leaseClient.get("store2")if err != nil {log.Println("get error ", value, err)}log.Println("final get value  ", value)}
}

主要就是随机的根据休眠一个时间之后,然后再去查询客户端数据。

修改数据代码示例
package mainimport ("math/rand""strconv""time""log"
)func init() {log.SetFlags(log.Ltime|log.Lshortfile)
}func main() {//var value interface{}leaseClient, err := GetNewLeaseClient("127.0.0.1:7070")if err != nil{log.Println("new error ", err)return}for {var sleepTime intsleepTime = rand.Intn(10)log.Println("sleep time ", sleepTime)time.Sleep(time.Duration(sleepTime)*time.Second)value := strconv.Itoa(rand.Int())leaseClient.set("client_set_key", "client_set_value_"+value)log.Println("current  cacheData ", leaseClient.cacheData)}
}

修改也是随机休眠一个时间之后再想服务端提交数据去修改。

代码运行演示

此时为了展示效果,在本地启动一个服务端,两个查询客户端,一个修改数据客户端

go run server.go protocol.go
go run lease_library.go protocol.go get_op.go
go run set_op.go protocol.go lease_library.go

此时查看服务端的日志输入如下;

16:53:14 server.go:42: send chan to block chan modfiyChan
16:53:14 server.go:132: modfiy chan start  
16:53:14 server.go:136: -----------------------
16:53:14 server.go:137: modfiy data  map[data:map[client_set_key:client_set_value_3510942875414458836] method:modfiy]
16:53:14 server.go:145: modify after  map[client_set_key:client_set_value_3510942875414458836 store1:stor1_value store2:stor2_value]
16:53:15 server.go:108: get query wait status  false
16:53:15 server.go:52: current clientNums  1
16:53:15 server.go:108: get query wait status  false
16:53:15 server.go:52: current clientNums  2
16:53:22 server.go:127: wait all leases expire   {{0 0} 0xc00005c0c0 true 2 0xc0000b6060}
16:53:22 server.go:129: wait modfiy chan
16:53:26 server.go:108: get query wait status  true
16:53:26 server.go:112: wait .... and release lock
16:53:26 server.go:108: get query wait status  true
16:53:26 server.go:112: wait .... and release lock
16:53:27 server.go:40: check leases   1
16:53:27 server.go:40: check leases   0
16:53:27 server.go:42: send chan to block chan modfiyChan
16:53:27 server.go:132: modfiy chan start  
16:53:27 server.go:136: -----------------------
16:53:27 server.go:137: modfiy data  map[data:map[client_set_key:client_set_value_4324745483838182873] method:modfiy]
16:53:27 server.go:145: modify after  map[client_set_key:client_set_value_4324745483838182873 store1:stor1_value store2:stor2_value]
16:53:27 server.go:115: notify from and  continue
16:53:27 server.go:52: current clientNums  1
16:53:27 server.go:115: notify from and  continue
16:53:27 server.go:52: current clientNums  2
16:53:28 server.go:127: wait all leases expire   {{0 0} 0xc00005c0c0 true 2 0xc0000b6060}
16:53:28 server.go:129: wait modfiy chan
16:53:39 server.go:40: check leases   1
16:53:39 server.go:40: check leases   0
16:53:39 server.go:42: send chan to block chan modfiyChan
16:53:39 server.go:132: modfiy chan start  
16:53:39 server.go:136: -----------------------
16:53:39 server.go:137: modfiy data  map[data:map[client_set_key:client_set_value_2703387474910584091] method:modfiy]
16:53:39 server.go:145: modify after  map[client_set_key:client_set_value_2703387474910584091 store1:stor1_value store2:stor2_value]
16:53:41 server.go:108: get query wait status  false
16:53:41 server.go:52: current clientNums  1
16:53:41 server.go:108: get query wait status  false
16:53:41 server.go:52: current clientNums  2
16:53:46 server.go:127: wait all leases expire   {{0 0} 0xc00005c0c0 true 2 0xc0000b6060}
16:53:46 server.go:129: wait modfiy chan
16:53:53 server.go:40: check leases   1
16:53:53 server.go:40: check leases   0
16:53:53 server.go:42: send chan to block chan modfiyChan

从日志中可以看出,在客户端需要修改数据的时候wait all leases expire等待所有的租约到期,接着就是等到所有日期到期之后,上次查询的客户端都notify from and continue继续执行读操作,从而保证了修改的数据是全局一致的。

在客户端查询的日志输出如下;

16:54:06 lease_library.go:165: expire cache data  map[client_set_key:client_set_value_2015796113853353331 store1:stor1_value store2:stor2_value]
16:54:10 lease_library.go:126: wait from server data
16:54:10 lease_library.go:130: get from server  map[client_set_key:client_set_value_3328451335138149956 store1:stor1_value store2:stor2_value]
16:54:10 get_op.go:27: final get value   stor2_value
16:54:10 get_op.go:20: sleep time  8
16:54:18 get_op.go:27: final get value   stor2_value
16:54:18 get_op.go:20: sleep time  0
16:54:18 get_op.go:27: final get value   stor2_value
16:54:18 get_op.go:20: sleep time  5

从日志中可知,获取的本地数据会过期,过期之后会从server端获取数据,获取完成之后,如果在有效的租约内,再次查询数据的时候,就是直接从本地的缓存中获取并没有访问远端服务器。

修改数据的日志输出如下;

16:51:49 set_op.go:24: sleep time  1
16:51:50 lease_library.go:165: expire cache data  map[]
16:51:59 set_op.go:30: current  cacheData  map[client_set_key:client_set_value_8674665223082153551 store1:stor1_value store2:stor2_value]
16:51:59 set_op.go:24: sleep time  7
16:52:06 lease_library.go:165: expire cache data  map[client_set_key:client_set_value_8674665223082153551 store1:stor1_value store2:stor2_value]
16:52:13 set_op.go:30: current  cacheData  map[client_set_key:client_set_value_4037200794235010051 store1:stor1_value store2:stor2_value]
16:52:13 set_op.go:24: sleep time  1
16:52:14 lease_library.go:165: expire cache data  map[client_set_key:client_set_value_4037200794235010051 store1:stor1_value store2:stor2_value]
16:52:25 set_op.go:30: current  cacheData  map[client_set_key:client_set_value_6334824724549167320 store1:stor1_value store2:stor2_value]
16:52:25 set_op.go:24: sleep time  5

修改数据的客户端日志输出主要就是休眠不定时的时间,然后再去将要修改的数据发送到服务端,通过响应的时间可以看到一个修改到响应的总共的耗时每次都不一样,有的是7秒,有的是11秒,这其中主要的原因就是在服务端有发放的租约还未到期等待服务端所有的租约到期然后再讲数据返回到本地。

总结

本文主要就是简单的实践了租约的基本流程,主要就是根据租约的原理来代码实践一下,在本文的示例代码中有很多不完善的地方,仅仅是为了演示租约的基本原理,并基本熟悉一下go语言的编写过程,后续会继续学习了解。由于本人才疏学浅,如有错误请批评指正。


http://lihuaxi.xjx100.cn/news/257856.html

相关文章

每天学一点flash(15) xml的一些常见写法

今天下了大雨来了&#xff0c;什么地方去不了&#xff0c;只好将想写的东西都记载下来。 一些常见的一些xml写法&#xff0c;收集目的就是为了代码调试方便&#xff1a; 一&#xff0e;简单数组单值形 <?xml version"1.0" encoding"UTF-8"?> <i…

Yann LeCun开怼谷歌研究:目标传播早就有了,你们创新在哪里?

视学算法报道机器之心编辑部在昨日的学术圈&#xff0c;图灵奖得主Yann LeCun对谷歌的一项研究发起了质疑。前段时间&#xff0c;谷歌 AI在其新研究《LocoProp: Enhancing BackProp via Local Loss Optimization》中提出了一种用于多层神经网络的通用层级损失构造框架LocoProp&…

什么是SESSION?(三)

本微信图文通过一个利用数据库的方式存储Session的例子&#xff0c;深入介绍了对Session机制的理解。本微信图文由钟锦提供。

C#趣味程序---个位数为6,且能被3整出的五位数

using System;namespace ConsoleApplication1 {class Program{static void Main(string[] args){int count 0;int k;for (int i 1000; i < 9999; i){k i * 10 6;if (k % 3 0){Console.WriteLine(k);count;}}Console.WriteLine(count); }} }

详解Linux内核IO技术栈

欢迎关注方志朋的博客&#xff0c;回复”666“获面试宝典在开始正式的讨论前&#xff0c;我先抛出几个问题&#xff1a;谈到磁盘时&#xff0c;常说的HDD磁盘和SSD磁盘最大的区别是什么&#xff1f;这些差异会影响我们的系统设计吗&#xff1f;单线程写文件有点慢&#xff0c;那…

App调用safar

/调用safar打开网页 [[UIApplication sharedApplication] openURL:[NSURL URLWithString:"http://www.cnblogs.com/foxmin"]]; 调用app store (省略号后面加的是产品的id等一些参数) // [[UIApplication sharedApplication] openURL:[NSURL URLWithString:"i…

南大最新综述论文:基于模型的强化学习

点击上方“视学算法”&#xff0c;选择加"星标"或“置顶”重磅干货&#xff0c;第一时间送达来源&#xff1a;专知强化学习(RL)通过与环境交互的试错过程来解决顺序决策问题。虽然RL在允许大量试错的复杂电子游戏中取得了杰出的成功&#xff0c;但在现实世界中犯错总…

【HDOJ】3275 Light

这就是个简单线段树延迟标记。因为对bool使用了~而不是&#xff01;&#xff0c;wa了一下午找不到原因。 1 /* 3275 */2 #include <iostream>3 #include <sstream>4 #include <string>5 #include <map>6 #include <queue>7 #include <set>…