最近在工作中遇到一个比较棘手的问题,客户端从服务端同步数据的问题。
思考这个解决的方法还是花费了不少的精力。

前言

最近在工作中遇到一个比较棘手的问题,客户端从服务端同步数据的问题。
背景简介:客户端有N个,客户端上的同步时间,各不相同。同步的时候,是一次获取10条数据,多批次获取。即分页获取。
在代码中存在两种同步的方式:

  1. 全量同步。同步过程是从服务端拉取全部的数据;依赖具有唯一约束ID来实现同步。只适用于数据量小的表,浪费网络流量。
  2. 增量同步。从服务器拉取大于客户端最新时间的数据;依赖于时间戳,问题时间戳不唯一存在相同时间点下面多条数据,会出现数据遗漏,也会重复拉取数据,浪费网络流量。

本文的所使用到的解决办法,就是结合了唯一ID时间戳,两个入参来做增量同步。本文也只做逻辑层面的说明。

模拟场景

表结构:ID 具有唯一约束, Name 姓名, UpdateTime 更新时间;现在问题的关键是ID为3,4,这两条时间点相同的数据。
假如一次只能同步一条数据,如何同步完ID 2后,再同步 ID 3。

ID Name UpdateTime
1 张三 2018-11-10
2 李四 2018-12-10
3 王五 2018-12-10
4 赵六 2018-11-20
5 金七 2018-11-30

解决思路

生成新的唯一标识

通过 UpdateTime 和 ID 这两种数据,通过某种运算,生成新的数。而这个新的数具备可排序唯一;同时还要携带有IDUpdateTime的信息。
简单表述就是,具有一个函数f: f(可排序A,可排序唯一B) = 可排序唯一C 。 C 的唯一解是 A和B。RSA加密算法

我想出了一个方法,也是生活中比较常用的方法:

  1. 先把 UpdateTime 转变成数字。如: 字符串 2018-12-10 -> 数字 20181210;
  2. 然后 UpdateTime 乘以权重,这个权重必须大于ID的可能最大值。如: 20181210 * 100 = 2018121000,Max(ID)<999
  3. 然后再把第二部的结果,加上唯一键ID。如: 2018121000 + 3 = 2018121003。

这个时候,2018121003 这个数,既包含了UpdateTimeID的信息,又具有可排序唯一性。用它作为增量更新的判断点,是再好不过的了。
但是它具有很大的缺点:数字太大了,时间转化成数字,目前还是用的是级别,如果换成毫秒级别呢。还有ID可能的最大值也够大了,如果是int64那就更没得搞了。

这个方法理论上可行,实际中不可用基本不可行,除非找到一种非常好的函数f;
PS: 我的直觉告诉我: 极可能存在这种函数,既满足我的需要,又可以克服数字很大这个问题。只是我目前不知道。

数据库表修改(不推荐)

修改数据内容

修改数据内容,使 UpdateTime 数据值唯一。缺点也比较明显:

  1. 脚本操作数据的情况下,或者直接sql更新。可能会,造成时间不唯一;
  2. 只是适用在数据量小,系统操作频率小的情况下。因为毫秒级别的时间,在绝大多数软件系统中,可以认为是唯一;
  3. 尤其是老旧项目,历史遗留数据如何处理。

增加字段

还有一种办法,就是在数据库中,增加一个新的字段,专门用来同步数据的时候使用。
比方说,增加字段 SyncData int 类型。如果 UpdateTime 发生了改变,就把它更新为 SyncData = Max(SyncData) + 1;
也就是说, SyncData 这个字段的最大值一定是最新的数据,SyncData的降序就是 更新时间的降序。SyncData更新时间顺序充分不必要条件

总的来说,这种办法是比较好的,但缺点也比较明显:

  1. 需要修改表结构,并且额外维护这个字段;
  2. 新增或者更新的时候,会先锁表,找出这个表的最大值,再更新,资源浪费明显。
  3. 如果表的数据量比较大,或者更新比较频繁时候。时间消耗较大。

我的解决方法

分页提取数据的可能情况

首先,先来分析一下,一次提取10条数据,提取的数据,存在的可能情况。再次说明前提,先时间倒序,再ID倒序。Order By UpdateTime DESC, ID DESC
可能情况如下图,可以简化为三种:

  1. 情景1。当前获取的数据中包含了,所有相同时间点的数据;图1,图5
  2. 情景2。当前获取的数据中包含了,部分相同时间点的数据;图2,图3,图4,图6,图7
  3. 情景3。当前获取的数据中包含了,没有相同时间点的数据;图···

其中情景1情景3,可以把查询条件变为:WHERE UpdateTime > sync_time LIMIT 10
但是情景2的情况不能使用大于>这个条件。假如使用了大于>这个条件,情景2就会变成情景1情景3图3这种情况。不是包含部分了,需要额外特别处理。
注:图3的结束点 ]不重要,下面情景5有解释。
同步数据的可能性

情景2部分情况,提取的起始点

提取的起始点:也就是说图中[左中括号的位置,需要准确定位这个位置。
至于结束点:图中]右中括号的位置是在哪里。这个就不重要了,因为下一次的分页提取的起始点,就是上一次的结束点。只需要关注起始点就足够了。

而根据起始点,又可以把情景2,再做一次简化:

  1. 情景4。起始点相同时间点集合内的;图2,图4,图6,图7
  2. 情景5。起始点不在相同时间点集合内的;图3,

针对情景4。这个时候,时间戳sync_time一个入参就不够了,还额外需要唯一键ID来准确定位。可以把查询写作:WHERE UpdateTime = sync_time AND ID > sync_id LIMIT 10
如果查询的行数 等于 10,则是图4;小于 10,则是图2,图6,图7的情况。

针对情景5。依旧可以使用:WHERE UpdateTime > sync_time LIMIT 10

完整的分页过程

完整的分页过程的步骤:
一、先用起始点来过滤:WHERE UpdateTime = sync_time AND ID > sync_id LIMIT 10,查询结果行数N。如果 N=10图4的情况,则结束,并且直接返回结果。如果 0<= N <10 ,则进行第二步,其中N=0图1图3图5,**图···**的情况;

二、再用时间戳查询:WHERE UpdateTime > sync_time LIMIT 10-N,查询结果行数 M ,0<= M <=10-N;这个阶段,是否同一个时间点都不重要了。只需要按着顺序已排序的数据就可以了;

三、把一和二的结果集合并,一并返回。

四、重复步骤一二三,直到,分页获取的最后一条数据的ID,是服务端数据库中最新的ID;(防止存在,恰好这十条是所需要获取的最后十条)。

服务端中最新ID获取:Select Id From myTable Order by UpdateTime desc,ID desc Limit 1;

代码实例

以下是已经使用了半年的代码,做了简化,换行的处理,方便阅读。本文重点描述在服务端,可以只看服务端的代码
以下代码使用 C#Linq来编写,如果没有学过这类语言,通过方法的英文名称也能大概猜出意思,另外,注释写的比较详细,不再额外解释。

服务端的代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public long GetDeviceLatestID()
{
    long latestID = 0;
    using (var context = new Entities())
    {
        // 倒序取 最新的ID
        var _tmp = context.modules.OrderByDescending(m => m.update_date).
                ThenByDescending(n => n.id).FirstOrDefault();
        if (_tmp != null)
        {
latestID =_tmp.id;
        }
    }
    return latestID;
}

/// <summary> /// 分批同步 获取Device 数据 /// </summary> /// <param name="startDataVersion">开始的时间版本</param> /// <param name="startID">开始的唯一标识ID</param> /// <param name="dataCount">本次同步的数量</param> public List<DeviceModel> GetDevices(DateTime startDataVersion, long startID, int dataCount) { List<DeviceModel> resultList = null;

<span class="k">using</span> <span class="p">(</span><span class="kt">var</span> <span class="n">context</span> <span class="p">=</span> <span class="k">new</span> <span class="n">Entities</span><span class="p">())</span>
<span class="p">{</span>
    <span class="c1">// 关键:先确定时间点,然后再顺序取 dataCount 个。

// 先顺序排序,需要保留 deleted 信息,同步时候移除表关系 var _tmpQry = context.modules.OrderBy(m => m.update_date).ThenBy(m => m.id); // 先取 同一个时间点的 数据 var _tmpList = _tmpQry.Where(m => m.update_date == startDataVersion && m.id > startID). Take(dataCount).ToList(); // 如果不足 dataCount 条数据,再顺序取够 dataCount 条; if (_tmpList.Count < dataCount) { var _tmpList2 = _tmpQry.Where(m => m.update_date > startDataVersion). Take(dataCount - _tmpList.Count).ToList(); _tmpList.AddRange(_tmpList2); }

    <span class="n">resultList</span> <span class="p">=</span> <span class="n">_tmpList</span><span class="p">.</span><span class="n">Select</span><span class="p">(</span><span class="n">m</span> <span class="p">=&gt;</span> <span class="k">new</span> <span class="n">DeviceModel</span><span class="p">()</span>
    <span class="p">{</span>
        <span class="n">DeviceID</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">id</span><span class="p">,</span>
        <span class="n">CreateDateTime</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">create_date</span><span class="p">,</span>
        <span class="n">ModifyDateTime</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">update_date</span><span class="p">,</span>
        <span class="n">Name</span> <span class="p">=</span> <span class="n">m</span><span class="p">.</span><span class="n">name</span>
    <span class="p">}).</span><span class="n">ToList</span><span class="p">();</span>
<span class="p">}</span>
<span class="k">return</span> <span class="n">resultList</span><span class="p">;</span>

}


客户端的代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public bool SyncDevices()
{
    DateTime newDataVersion = DateTime.Now;
    // 从本地库,读取 最近修改时间 ,最大的 ID,
    var _syncControlData = m_Storage.GetDevicesNewestLastID();
    ServerModel.DeviceModel[] serverDevices = null;
<span class="k">while</span> <span class="p">(</span><span class="k">true</span><span class="p">)</span>
<span class="p">{</span>
    <span class="c1">// 通过 WCF 接口获取:服务端最新的ID 和 分批获取增量数据

using (var client = new ServerModel.ResourceClient()) { _syncControlData.FinishLatestID = client.GetDeviceLatestID(0); serverDevices = client.GetDevices(0, _syncControlData.CurrentHandlingTime, _syncControlData.CurrentHandlingID, 50); } if (serverDevices == null) { return false; } if (serverDevices.Length == 0) { return true; // 本次为获取到最新数据,无需同步操作。 }

    <span class="c1">// 在本地库中查找,从服务器拉取的新设备。

var serverDeviceIDList = serverDevices.Select(m => m.RecordID). Distinct().ToList(); var localDevices = m_Storage.GetDevices(serverDeviceIDList); if (localDevices == null) { continue; }

    <span class="c1">// 如果本地库存在则更新,不存在则新增

// 预存同步参数,以便失败后,回滚操作 _syncControlData.SavePrevBatchIDForRollBack(); foreach (var _single in serverDevices) { _syncControlData.SetCurrentHandlingData(_single.ModifyDateTime, _single.RecordID); var model = new DeviceModel() { DataVersion = newDataVersion, // 同步的时间 ID = _single.RecordID, // 系统id,无意义 DeviceID = _single.DeviceID, Name = _single.Name, CreateDateTime = _single.CreateDateTime, ModifyDateTime = _single.ModifyDateTime };

        <span class="k">if</span> <span class="p">(</span><span class="n">localDevices</span><span class="p">.</span><span class="n">ContainsKey</span><span class="p">(</span><span class="n">_single</span><span class="p">.</span><span class="n">RecordID</span><span class="p">))</span>
        <span class="p">{</span>
            <span class="c1">// 修改时先移除,然后统一 add

localDevices.Remove(_single.RecordID); model.DataModitifyStatus = _single.Deleted ? DataModitifyStatus.Delete : DataModitifyStatus.Modityfy; } else { if (_single.Deleted) { // 本地数据库不存在该数据,并且是服务端已删除的数据。则不需要执行数据库操作 continue; } model.DataModitifyStatus = DataModitifyStatus.Add; } localDevices.Add(model.ID, model); }

    <span class="kt">var</span> <span class="n">addOrUpdateExecuteResult</span> <span class="p">=</span> <span class="n">m_Storage</span><span class="p">.</span><span class="n">AddOrUpdateDevices</span><span class="p">(</span><span class="n">localDevices</span><span class="p">);</span>
    <span class="k">if</span> <span class="p">(</span><span class="n">addOrUpdateExecuteResult</span> <span class="p">!=</span> <span class="n">ExecuteResult</span><span class="p">.</span><span class="n">Success</span><span class="p">)</span>
    <span class="p">{</span>
        <span class="n">_syncControlData</span><span class="p">.</span><span class="n">RollBackPrevBatchID</span><span class="p">();</span>  <span class="c1">// 插入数据库失败,回滚

continue; } if (_syncControlData.IsFinished) { return true; // 本次同步操作完成后,恰好同步到最新数据,则结束 } } }

public SyncControlModel GetDevicesNewestLastID() { using (var context = new Entities()) { var _newestData = context.devices.OrderByDescending(m => m.server_modify_datetime). ThenByDescending(n => n.id).FirstOrDefault(); var_ret = new SyncControlModel(); if (_newestData != null) { _ret.NewestID = _newestData.id; _ret.NewestDateTime = _newestData.server_modify_datetime; } return_ret; } }


客户端的代码(同步数据的控制类)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/// <summary>
/// 本地库中最新的标记 特征,差异化更新时候使用
/// </summary>
public class SyncControlModel
{
    /// <summary>
    /// 同个时间点下,最大的ID
    /// </summary>
    public long NewestID
    {
        set { CurrentHandlingID = value; }
    }
<span class="c1">/// &lt;summary&gt;

/// 最新时间 /// </summary> public DateTime NewestDateTime { set { CurrentHandlingTime = value; } }

<span class="c1">/// &lt;summary&gt;

/// 上一批次,处理的 最后一个ID。上一批次最后处理的ID,用来判断是否更新完成 /// </summary> public long PrevBatchLastHandlingID { get; private set; }

<span class="c1">/// &lt;summary&gt;

/// 同步结束ID,即终止ID /// </summary> public long FinishLatestID { private get; set; }

<span class="c1">/// &lt;summary&gt;

/// 当前处理的 ID 唯一标识 /// </summary> public long CurrentHandlingID { get; private set; }

<span class="c1">/// &lt;summary&gt;

/// 当前处理的 时间戳 /// </summary> public DateTime CurrentHandlingTime { get; private set; }

<span class="c1">/// &lt;summary&gt;

/// 结束标志 /// </summary> public bool IsFinished { get { return FinishLatestID == CurrentHandlingID; } }

<span class="c1">/// &lt;summary&gt;

/// 保存上一批次的值,用于回滚 /// </summary> public void SavePrevBatchIDForRollBack() { PrevBatchLastHandlingID = CurrentHandlingID; }

<span class="c1">/// &lt;summary&gt;

/// 批次号回滚 /// </summary> public void RollBackPrevBatchID() { CurrentHandlingID = PrevBatchLastHandlingID; }

<span class="c1">/// &lt;summary&gt;

/// 设置当前的 处理数据,时间戳 和 ID /// </summary> /// <param name="_currentId"></param> public void SetCurrentHandlingData(DateTime _currentDateTime, long _currentId) { CurrentHandlingTime = _currentDateTime; CurrentHandlingID = _currentId; } }

经验总结

寻找关键信息,以及具有指标意义的数据,或者数据的组合

  • 最开始,我只执着于 UpdateTime 这个数据,甚至提出去数据库中,修改历史数据,再把 UpdateTime 加上唯一约束(以前也没有听说过在 UpdateTime 这个字段上面加唯一约束)。并且这种办法,局限性有很强,不可以通用。
  • 主键ID唯一,但是它不具有时间属性。只适用于全部更新。
  • 把他们两个结合起来,才算是打开了新的思路。

拆分问题,简化问题

  • 把 UpdateTime 和 ID 组合使用时。妄图在一个sql里面来实现。发现无论怎么改,都会存在逻辑上面的问题;
  • 没有拆分化简的时候,如果用存储过程来写的话,会非常非常复杂;
  • 直到,我在脑袋里面,模拟出来可能的情况后。也就是上面的图片同步数据的可能性,慢慢归类,简化后;才发现。问题没有那么难,仅仅是起始点这一个小小的问题。

使用逻辑分析哲学归纳

  • 在分析数据的意义和性质的时候,偶然间使用到了归纳的方法;也就是唯一可排序;跳出了具体字段,使用场景的框架束缚,而去考虑这两种性质怎么结合的问题;
  • 在逻辑分析的时候,先用排列组合,算出多少种可能性;在脑中勾画出图形,把性质相同的可能性合并化简;
  • 在化简的过程中,不要仅仅着眼于查询的对象,也要去化简查询的方法;有点绕,打个比方,既要优化最终产品,也要去优化制作工艺;

最后,我认为我最近的逻辑分析能力,好像有比较大的提升。

  • 直接得益于,常见的24种逻辑谬误的了解,【转】逻辑谬误列表(序言),在平常的生活中,说话做事,也就有了逻辑方面的意识;
  • 间接可能得益于台大哲学系苑举正,苑老师讲话的视频。其实我很早以前,高中时候就喜欢哲学,《哲学的基本原理》这么枯燥的书,我居然认认真真仔仔细细的边读边想的看了三四遍。只是那时好多完全不懂,好多似懂非懂。十多年后虽然什么都不记得了,但是好像又懂了。。。感觉太玄了。。。