最近在工作中遇到一个比较棘手的问题,客户端从服务端同步数据的问题。
思考这个解决的方法还是花费了不少的精力。
前言
最近在工作中遇到一个比较棘手的问题,客户端从服务端同步数据的问题。
背景简介:客户端有N个,客户端上的同步时间,各不相同
。同步的时候,是一次获取10条数据,多批次获取。即分页获取。
在代码中存在两种同步的方式:
- 全量同步。同步过程是从服务端拉取全部的数据;依赖具有
唯一约束
的ID
来实现同步。只适用于数据量小的表,浪费网络流量。
- 增量同步。从服务器拉取
大于
客户端最新时间
的数据;依赖于时间戳
,问题时间戳不唯一
存在相同时间点下面多条数据,会出现数据遗漏,也会重复拉取数据,浪费网络流量。
本文的所使用到的解决办法,就是结合了唯一ID和时间戳,两个入参来做增量同步。本文也只做逻辑层面的说明。
模拟场景
表结构:ID 具有唯一约束, Name 姓名, UpdateTime 更新时间;现在问题的关键是ID为3,4,这两条时间点相同的数据。
假如一次只能同步一条数据,如何同步完ID 2后,再同步 ID 3。
ID |
Name |
UpdateTime |
1 |
张三 |
2018-11-10 |
2 |
李四 |
2018-12-10 |
3 |
王五 |
2018-12-10 |
4 |
赵六 |
2018-11-20 |
5 |
金七 |
2018-11-30 |
解决思路
生成新的唯一标识
通过 UpdateTime 和 ID 这两种数据,通过某种运算,生成新的数。而这个新的数
具备可排序和唯一;同时还要携带有ID
和UpdateTime
的信息。
简单表述就是,具有一个函数f: f(可排序A,可排序唯一B) = 可排序唯一C 。 C 的唯一解是 A和B。RSA加密算法
我想出了一个方法,也是生活中比较常用的方法:
- 先把 UpdateTime 转变成数字。如: 字符串 2018-12-10 -> 数字 20181210;
- 然后 UpdateTime 乘以权重,这个
权重
必须大于ID
的可能最大值。如: 20181210 * 100 = 2018121000,Max(ID)<999
- 然后再把第二部的结果,加上唯一键
ID
。如: 2018121000 + 3 = 2018121003。
这个时候,2018121003 这个数,既包含了UpdateTime
和ID
的信息,又具有可排序和唯一性。用它作为增量更新的判断点,是再好不过的了。
但是它具有很大的缺点:数字太大了,时间转化成数字,目前还是用的是天级别,如果换成毫秒级别呢。还有ID可能的最大值也够大了,如果是int64那就更没得搞了。
这个方法理论上可行,实际中不可用基本不可行,除非找到一种非常好的函数f;
PS: 我的直觉告诉我: 极可能存在这种函数,既满足我的需要,又可以克服数字很大这个问题。只是我目前不知道。
数据库表修改(不推荐)
修改数据内容
修改数据内容,使 UpdateTime
数据值唯一。缺点也比较明显:
- 脚本操作数据的情况下,或者直接sql更新。可能会,造成时间不唯一;
- 只是适用在数据量小,系统操作频率小的情况下。因为毫秒级别的时间,在绝大多数软件系统中,可以认为是唯一;
- 尤其是老旧项目,历史遗留数据如何处理。
增加字段
还有一种办法,就是在数据库中,增加一个新的字段,专门用来同步数据的时候使用。
比方说,增加字段 SyncData
int 类型。如果 UpdateTime 发生了改变,就把它更新为 SyncData = Max(SyncData) + 1
;
也就是说, SyncData
这个字段的最大值一定是最新的数据,SyncData
的降序就是 更新时间的降序。SyncData
是更新时间顺序
的充分不必要条件。
总的来说,这种办法是比较好的,但缺点也比较明显:
- 需要修改表结构,并且额外维护这个字段;
- 新增或者更新的时候,会先锁表,找出这个表的最大值,再更新,资源浪费明显。
- 如果表的数据量比较大,或者更新比较频繁时候。时间消耗较大。
我的解决方法
分页提取数据的可能情况
首先,先来分析一下,一次提取10条数据,提取的数据,存在的可能情况。再次说明前提,先时间倒序,再ID倒序。Order By UpdateTime DESC, ID DESC
可能情况如下图,可以简化为三种:
- 情景1。当前获取的数据中包含了,所有相同时间点的数据;图1,图5
- 情景2。当前获取的数据中包含了,部分相同时间点的数据;图2,图3,图4,图6,图7
- 情景3。当前获取的数据中包含了,没有相同时间点的数据;图···
其中情景1
和情景3
,可以把查询条件变为:WHERE UpdateTime > sync_time LIMIT 10
但是情景2
的情况不能使用大于>
这个条件。假如使用了大于>
这个条件,情景2
就会变成情景1
或情景3
或图3
这种情况。不是包含部分了,需要额外特别处理。
注:图3的结束点 ]
不重要,下面情景5有解释。
情景2部分情况,提取的起始点
提取的起始点:也就是说图中[
左中括号的位置,需要准确定位这个位置。
至于结束点:图中]
右中括号的位置是在哪里。这个就不重要了,因为下一次的分页提取的起始点
,就是上一次的结束点。只需要关注起始点就足够了。
而根据起始点,又可以把情景2
,再做一次简化:
- 情景4。起始点在相同时间点集合内的;图2,图4,图6,图7
- 情景5。起始点不在相同时间点集合内的;图3,
针对情景4
。这个时候,时间戳sync_time
一个入参就不够了,还额外需要唯一键ID来准确定位。可以把查询写作:WHERE UpdateTime = sync_time AND ID > sync_id LIMIT 10
。
如果查询的行数 等于 10,则是图4;小于 10,则是图2,图6,图7的情况。
针对情景5
。依旧可以使用:WHERE UpdateTime > sync_time LIMIT 10
完整的分页过程
完整的分页过程的步骤:
一、先用起始点来过滤:WHERE UpdateTime = sync_time AND ID > sync_id LIMIT 10
,查询结果行数N。如果 N=10
是图4的情况,则结束,并且直接返回结果。如果 0<= N <10
,则进行第二步,其中N=0
是图1,图3,图5,**图···**的情况;
二、再用时间戳查询:WHERE UpdateTime > sync_time LIMIT 10-N
,查询结果行数 M ,0<= M <=10-N
;这个阶段,是否同一个时间点都不重要了。只需要按着顺序
取已排序的数据就可以了;
三、把一和二的结果集合并,一并返回。
四、重复步骤一二三,直到,分页获取的最后一条数据的ID
,是服务端数据库中最新的ID;(防止存在,恰好这十条是所需要获取的最后十条)。
服务端中最新ID获取:Select Id From myTable Order by UpdateTime desc,ID desc Limit 1
;
代码实例
以下是已经使用了半年的代码,做了简化,换行的处理,方便阅读。本文重点描述在服务端,可以只看服务端的代码。
以下代码使用 C#
,Linq
来编写,如果没有学过这类语言,通过方法的英文名称也能大概猜出意思,另外,注释写的比较详细,不再额外解释。
服务端的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
public long GetDeviceLatestID()
{
long latestID = 0;
using (var context = new Entities())
{
// 倒序取 最新的ID
var _tmp = context.modules.OrderByDescending(m => m.update_date).
ThenByDescending(n => n.id).FirstOrDefault();
if (_tmp != null)
{
latestID =_tmp.id;
}
}
return latestID;
}
/// <summary>
/// 分批同步 获取Device 数据
/// </summary>
/// <param name="startDataVersion">开始的时间版本</param>
/// <param name="startID">开始的唯一标识ID</param>
/// <param name="dataCount">本次同步的数量</param>
public List<DeviceModel> GetDevices(DateTime startDataVersion, long startID, int dataCount)
{
List<DeviceModel> resultList = null;
<span class="k">using</span> <span class="p">(</span><span class="kt">var</span> <span class="n">context</span> <span class="p">=</span> <span class="k">new</span> <span class="n">Entities</span><span class="p">())</span>
<span class="p">{</span>
<span class="c1">// 关键:先确定时间点,然后再顺序取 dataCount 个。
// 先顺序排序,需要保留 deleted 信息,同步时候移除表关系
var _tmpQry = context.modules.OrderBy(m => m.update_date).ThenBy(m => m.id);
// 先取 同一个时间点的 数据
var _tmpList = _tmpQry.Where(m => m.update_date == startDataVersion && m.id > startID).
Take(dataCount).ToList();
// 如果不足 dataCount 条数据,再顺序取够 dataCount 条;
if (_tmpList.Count < dataCount)
{
var _tmpList2 = _tmpQry.Where(m => m.update_date > startDataVersion).
Take(dataCount - _tmpList.Count).ToList();
_tmpList.AddRange(_tmpList2);
}
<span class="n">resultList</span> <span class="p">=</span> <span class="n">_tmpList</span><span class="p">.</span><span class="n">Select</span><span class="p">(</span><span class="n">m</span> <span class="p">=></span> <span class="k">new</span> <span class="n">DeviceModel</span><span class="p">()</span>
<span class="p">{</span>
<span class="n">DeviceID</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">id</span><span class="p">,</span>
<span class="n">CreateDateTime</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">create_date</span><span class="p">,</span>
<span class="n">ModifyDateTime</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">update_date</span><span class="p">,</span>
<span class="n">Name</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">name</span>
<span class="p">}).</span><span class="n">ToList</span><span class="p">();</span>
<span class="p">}</span>
<span class="k">return</span> <span class="n">resultList</span><span class="p">;</span>
}
|
客户端的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
public bool SyncDevices()
{
DateTime newDataVersion = DateTime.Now;
// 从本地库,读取 最近修改时间 ,最大的 ID,
var _syncControlData = m_Storage.GetDevicesNewestLastID();
ServerModel.DeviceModel[] serverDevices = null;
<span class="k">while</span> <span class="p">(</span><span class="k">true</span><span class="p">)</span>
<span class="p">{</span>
<span class="c1">// 通过 WCF 接口获取:服务端最新的ID 和 分批获取增量数据
using (var client = new ServerModel.ResourceClient())
{
_syncControlData.FinishLatestID = client.GetDeviceLatestID(0);
serverDevices = client.GetDevices(0, _syncControlData.CurrentHandlingTime,
_syncControlData.CurrentHandlingID, 50);
}
if (serverDevices == null)
{
return false;
}
if (serverDevices.Length == 0)
{
return true; // 本次为获取到最新数据,无需同步操作。
}
<span class="c1">// 在本地库中查找,从服务器拉取的新设备。
var serverDeviceIDList = serverDevices.Select(m => m.RecordID).
Distinct().ToList();
var localDevices = m_Storage.GetDevices(serverDeviceIDList);
if (localDevices == null)
{
continue;
}
<span class="c1">// 如果本地库存在则更新,不存在则新增
// 预存同步参数,以便失败后,回滚操作
_syncControlData.SavePrevBatchIDForRollBack();
foreach (var _single in serverDevices)
{
_syncControlData.SetCurrentHandlingData(_single.ModifyDateTime,
_single.RecordID);
var model = new DeviceModel()
{
DataVersion = newDataVersion, // 同步的时间
ID = _single.RecordID, // 系统id,无意义
DeviceID = _single.DeviceID,
Name = _single.Name,
CreateDateTime = _single.CreateDateTime,
ModifyDateTime = _single.ModifyDateTime
};
<span class="k">if</span> <span class="p">(</span><span class="n">localDevices</span><span class="p">.</span><span class="n">ContainsKey</span><span class="p">(</span><span class="n">_single</span><span class="p">.</span><span class="n">RecordID</span><span class="p">))</span>
<span class="p">{</span>
<span class="c1">// 修改时先移除,然后统一 add
localDevices.Remove(_single.RecordID);
model.DataModitifyStatus = _single.Deleted ?
DataModitifyStatus.Delete : DataModitifyStatus.Modityfy;
}
else
{
if (_single.Deleted)
{
// 本地数据库不存在该数据,并且是服务端已删除的数据。则不需要执行数据库操作
continue;
}
model.DataModitifyStatus = DataModitifyStatus.Add;
}
localDevices.Add(model.ID, model);
}
<span class="kt">var</span> <span class="n">addOrUpdateExecuteResult</span> <span class="p">=</span> <span class="n">m_Storage</span><span class="p">.</span><span class="n">AddOrUpdateDevices</span><span class="p">(</span><span class="n">localDevices</span><span class="p">);</span>
<span class="k">if</span> <span class="p">(</span><span class="n">addOrUpdateExecuteResult</span> <span class="p">!=</span> <span class="n">ExecuteResult</span><span class="p">.</span><span class="n">Success</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">_syncControlData</span><span class="p">.</span><span class="n">RollBackPrevBatchID</span><span class="p">();</span> <span class="c1">// 插入数据库失败,回滚
continue;
}
if (_syncControlData.IsFinished)
{
return true; // 本次同步操作完成后,恰好同步到最新数据,则结束
}
}
}
public SyncControlModel GetDevicesNewestLastID()
{
using (var context = new Entities())
{
var _newestData = context.devices.OrderByDescending(m => m.server_modify_datetime).
ThenByDescending(n => n.id).FirstOrDefault();
var_ret = new SyncControlModel();
if (_newestData != null)
{
_ret.NewestID = _newestData.id;
_ret.NewestDateTime = _newestData.server_modify_datetime;
}
return_ret;
}
}
|
客户端的代码(同步数据的控制类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
/// <summary>
/// 本地库中最新的标记 特征,差异化更新时候使用
/// </summary>
public class SyncControlModel
{
/// <summary>
/// 同个时间点下,最大的ID
/// </summary>
public long NewestID
{
set { CurrentHandlingID = value; }
}
<span class="c1">/// <summary>
/// 最新时间
/// </summary>
public DateTime NewestDateTime
{
set { CurrentHandlingTime = value; }
}
<span class="c1">/// <summary>
/// 上一批次,处理的 最后一个ID。上一批次最后处理的ID,用来判断是否更新完成
/// </summary>
public long PrevBatchLastHandlingID { get; private set; }
<span class="c1">/// <summary>
/// 同步结束ID,即终止ID
/// </summary>
public long FinishLatestID { private get; set; }
<span class="c1">/// <summary>
/// 当前处理的 ID 唯一标识
/// </summary>
public long CurrentHandlingID { get; private set; }
<span class="c1">/// <summary>
/// 当前处理的 时间戳
/// </summary>
public DateTime CurrentHandlingTime { get; private set; }
<span class="c1">/// <summary>
/// 结束标志
/// </summary>
public bool IsFinished
{
get { return FinishLatestID == CurrentHandlingID; }
}
<span class="c1">/// <summary>
/// 保存上一批次的值,用于回滚
/// </summary>
public void SavePrevBatchIDForRollBack()
{
PrevBatchLastHandlingID = CurrentHandlingID;
}
<span class="c1">/// <summary>
/// 批次号回滚
/// </summary>
public void RollBackPrevBatchID()
{
CurrentHandlingID = PrevBatchLastHandlingID;
}
<span class="c1">/// <summary>
/// 设置当前的 处理数据,时间戳 和 ID
/// </summary>
/// <param name="_currentId"></param>
public void SetCurrentHandlingData(DateTime _currentDateTime, long _currentId)
{
CurrentHandlingTime = _currentDateTime;
CurrentHandlingID = _currentId;
}
}
|
经验总结
寻找关键信息,以及具有指标意义的数据,或者数据的组合。
- 最开始,我只执着于 UpdateTime 这个数据,甚至提出去数据库中,修改历史数据,再把 UpdateTime 加上唯一约束(以前也没有听说过在 UpdateTime 这个字段上面加唯一约束)。并且这种办法,局限性有很强,不可以通用。
- 主键ID唯一,但是它不具有时间属性。只适用于全部更新。
- 把他们两个结合起来,才算是打开了新的思路。
拆分问题,简化问题
- 把 UpdateTime 和 ID 组合使用时。妄图在一个sql里面来实现。发现无论怎么改,都会存在逻辑上面的问题;
- 没有拆分化简的时候,如果用存储过程来写的话,会非常非常复杂;
- 直到,我在脑袋里面,模拟出来可能的情况后。也就是上面的图片
同步数据的可能性
,慢慢归类,简化后;才发现。问题没有那么难,仅仅是起始点这一个小小的问题。
使用逻辑分析和哲学归纳
- 在分析数据的意义和性质的时候,偶然间使用到了归纳的方法;也就是
唯一
和可排序
;跳出了具体字段,使用场景的框架束缚,而去考虑这两种性质怎么结合的问题;
- 在逻辑分析的时候,先用排列组合,算出多少种可能性;在脑中勾画出图形,把性质相同的可能性合并化简;
- 在化简的过程中,不要仅仅着眼于查询的对象,也要去化简
查询的方法
;有点绕,打个比方,既要优化最终产品,也要去优化制作工艺;
最后,我认为我最近的逻辑分析能力,好像有比较大的提升。
- 直接得益于,常见的24种逻辑谬误的了解,【转】逻辑谬误列表(序言),在平常的生活中,说话做事,也就有了逻辑方面的意识;
- 间接可能得益于台大哲学系苑举正,苑老师讲话的视频。其实我很早以前,高中时候就喜欢哲学,《哲学的基本原理》这么枯燥的书,我居然认认真真仔仔细细的边读边想的看了三四遍。只是那时好多完全不懂,好多似懂非懂。十多年后虽然什么都不记得了,但是好像又懂了。。。感觉太玄了。。。
文章作者
Long
上次更新
2018-12-15 15:20
许可协议
原创文章,如需转载请注明文章作者和出处。
署名 4.0 国际 (CC BY 4.0)