我们在开发中经常遇到对方的接口请求有频率限制,比如当前接口每秒的请求不能大于100,不能大于1000,对于这样的,我们作为请求方,如何做限定?
using StackExchange.Redis;
using Volo.Abp.DependencyInjection;
namespace Test.Handler
{
/// <summary>
/// 12306售票系统思路Demo 直接使用单例模式,便于测试
/// </summary>
public class HaveSeaHandler : ISingletonDependency
{
private readonly HaveSeatRedisHanlder _appCache;
/// <summary>
/// 班次 添加了日期的,精确命中 251011表示日期 K12表示列车号 02表示这一列车今天的班次
/// </summary>
private string globalCarNo { get; set; } = "251011:K12:02";
/// <summary>
/// 座位数
/// </summary>
private int globalSeatCount { get; set; } = 100;
/// <summary>
/// 站点数
/// </summary>
private int globalStationCount { get; set; } = 10;
/// <summary>
///
/// </summary>
/// <param name="appCache"></param>
public HaveSeaHandler(HaveSeatRedisHanlder appCache)
{
_appCache = appCache;
}
/// <summary>
/// 初始化
/// </summary>
/// <param name="seatCount">座位总数</param>
/// <param name="stationCount">站点总数</param>
public async Task StartAsync(string carNo, int seatCount = 100, int stationCount = 10)
{
globalCarNo = carNo;
globalSeatCount = seatCount;
globalStationCount = stationCount;
var sites = new List<SortedSetEntry>();// [] { };
for (var k = 0; k < globalSeatCount; k++)
{
sites.Add(new SortedSetEntry(k, 0));
}
//从哪个站点开始
await _appCache.SortedSetAddAsync($"{globalCarNo}:sorted:{globalStationCount}", sites.ToArray());
//记录全站座位拥有数
await _appCache.HashSetAsync($"{globalCarNo}:count", stationCount.ToString(), seatCount);
}
/// <summary>
/// 添加节点
/// </summary>
/// <param name="seatNo">座位</param>
/// <param name="startStation">开始站点</param>
/// <param name="stationTotal">站点总数,比如从A到B</param>
/// <returns></returns>
private async Task AddSection(int seatNo, int startStation, int stationTotal)
{
//后续可以考虑合并提交,原子操作
await SectionHashIncrementAsync(stationTotal);
//记录 X个站点的 哪个座位 从哪个站点开始
await _appCache.SortedSetAddAsync($"{globalCarNo}:sorted:{stationTotal}", seatNo, startStation);
}
/// <summary>
/// 退票 回撤
/// </summary>
/// <param name="seatNo"></param>
/// <param name="startStation"></param>
/// <param name="stationTotal"></param>
/// <returns></returns>
public async Task RefundSeat(int seatNo, int startStation, int stationTotal)
{
await AddSection(seatNo, startStation, stationTotal);
}
/// <summary>
/// 记录 从哪个站点开始 到多少站的座位列表 可以计算剩余座位
/// </summary>
/// <param name="seatNo"></param>
/// <param name="startStation"></param>
/// <param name="stationTotal"></param>
/// <returns></returns>
private async Task AddSeatAsync(int seatNo, int startStation, int stationTotal)
{
//记录 X个站点的 哪个座位 从哪个站点开始
await _appCache.SortedSetAddAsync($"{globalCarNo}:station:{startStation}", seatNo, stationTotal);
}
/// <summary>
/// 移除位置信息
/// </summary>
/// <param name="seatNo"></param>
/// <param name="startStation"></param>
/// <param name="stationTotal"></param>
/// <returns></returns>
private async Task RemoveSeatAsync(int seatNo, int startStation, int stationTotal)
{
//TODO:感觉有问题 比如这个座位可以有多个score!
//站点5 座位11 3个站点
//站点5 座位11 4个站点? 只会计算最近一个!
//那么查询站点A到站点B的座位数 就是 总的+ 这个获取的量 会小于实际的座位数!
//记录 X个站点的 哪个座位 从哪个站点开始
await _appCache.SortedSetRemoveAsync($"{globalCarNo}:station:{startStation}", seatNo.ToString());
//未能处理包含问题!
}
/// <summary>
/// 读取站点剩余票数 比如站点5开始到站点8的剩余票数(total=8-5=3)
/// </summary>
/// <param name="stationStart"></param>
/// <param name="stationTotal"></param>
/// <returns></returns>
public async Task<int> ReadLessAsync(int stationStart, int stationTotal)
{
var stationMin = ( stationTotal);//最小要多少个站
//全站剩余数量
var fullCount = await _appCache.SortedSetLengthAsync($"{globalCarNo}:sorted:{globalStationCount}");
//非全站剩余数量 基于score查询 就是查询某一个站点开始,多少个站以上的数量 那肯定包含你需要的!
var lessCount = await _appCache.SortedSetLengthAsync($"{globalCarNo}:station:{stationStart}", stationMin, globalStationCount);
return (int)(fullCount + lessCount);
}
/// <summary>
/// 标记多少站点,有多少座位空余
/// </summary>
/// <param name="stationTotal"></param>
/// <returns></returns>
private async Task SectionHashIncrementAsync(int stationTotal)
{
var key = $"{globalCarNo}:count";
//记录 段数有的个数 比如3站长度的 有多少座位 5站长度的有多少座位 如果没有,则从更长的地方截断
await _appCache.HashIncrementAsync(key, stationTotal);
}
/// <summary>
/// 标记多少站点,有多少座位空余
/// </summary>
/// <param name="stationTotal"></param>
/// <returns></returns>
private async Task SectionHashDecrementAsync(int stationTotal)
{
var key = $"{globalCarNo}:count";
//记录 段数有的个数 比如3站长度的 有多少座位 5站长度的有多少座位 如果没有,则从更长的地方截断
await _appCache.HashDecrementAsync(key, stationTotal);
}
/// <summary>
/// 获取一个座位
/// </summary>
/// <param name="startStation">从哪个站点开始</param>
/// <param name="stationTotal">要做几个站</param>
/// <returns></returns>
public async Task<int> GetSeatAsync(int startStation, int stationTotal)
{
var key1 = $"{globalCarNo}:sorted:{stationTotal}";
var seatNo = await ReadSeatCurrent(stationTotal, startStation, stationTotal);
if (seatNo > 0)
{
return seatNo;
}
//第一个没有命中,就要找下一个的了,站点大于当前的
if (stationTotal < globalStationCount)
{
//站点合计
var stas = new List<int>();
//HashEntry[]
var fields = await _appCache.HashReadAllAsync($"{globalCarNo}:count");
if (fields != null)
{
if (fields?.Any() == true)
{
foreach (var item in fields)
{
if (int.TryParse(item.Value, out var num))
{
if (num > 0)
{
//item.Name;
if (int.TryParse(item.Name, out var start))
{
if (start > stationTotal)
{
stas.Add(start);
}
}
}
}
}
}
}
if (stas.Any() == true)
{
//遍历获取... .. .
foreach (var item in stas)
{
var seat = await ReadSeatCurrent(item, startStation, stationTotal);
if (seat > 0)
{
return seat;
}
}
}
}
//从全站获取 下面代码 未完待续
seatNo = await ReadSeatCurrent(globalStationCount, startStation, stationTotal);
if (seatNo > 0)
{
//这里有问题,开头,结尾都减去了...
return seatNo;
}
return -1;
}
/// <summary>
/// 从某一个站点段中获取座位
/// 下面代码有问题,比如从8个站点的数据中,我从2站点做到4站点,则要分2段另存
/// </summary>
/// <param name="sectionLong">从几段站点库获取,示例:8,表示里面的都是8个连续站点的数据,从哪个站点开始则要看score,filed是座位号</param>
/// <param name="startStation">从哪个站开始(用户要从哪个站开始乘坐)</param>
/// <param name="stationTotal">要做几个站(用户本次旅行要做的总站数)</param>
/// <returns>返回座位号</returns>
private async Task<int> ReadSeatCurrent(int sectionLong, int startStation, int stationTotal)
{
//8里面都是有8个站以上的座位 4起始站 3要做的站点数
var key1 = $"{globalCarNo}:sorted:{sectionLong}";// 10(数据最大10个站) 3(从第3个站点开始) 5(要做5各站)
// K12:stored:8 表示都是有连续8站的数据 里面的数据是分布的座位
// 重新计算开始站点,比如当前是8站点段位 你需要第13站点-第15站点,则至少是要多少站点开头13-(8-2)的
// 修正计算逻辑:需要找到起始站点 <= startStation 且能够覆盖所需行程的座位段
var currentStart = startStation - (sectionLong - stationTotal);// 4-(8-3) = -1??? 3-(10-5)=-2
if (currentStart < 0) currentStart = 0;//0 0..3
// 从有序排序中,获取站点数大于等于currentStart的座位,比如上面段数是8,这里currentStart为5表示返回第一个,开始站点是5的座位,5+8,表示这个座位空闲的站点是5-13
var read = await _appCache.SortedSetStartPopScoreAsync(key1, currentStart);
if (read != null && read.HasValue)
{
// 00111 11111 11000
// 00001 11111 11110
// 00000 11111 11111
// 座位
int.TryParse(read.Value.Element, out var seatNo);
int.TryParse(read.Value.Score.ToString(), out var readStart);
Console.WriteLine($"从段位:{sectionLong} 获取座位 起始站:{startStation} 站点数:{stationTotal} 获取到开始站:{readStart}");
// 获取到了座位 则这个段少了一个座位,所以减去库存
await SectionHashDecrementAsync(sectionLong);
//var boolHalf = false;//是否拆分了
var readEnd = readStart + sectionLong; // 这一段的末站 10
var userEnd = startStation + stationTotal; // 用户行程的末站 5
//Console.WriteLine($"获取到的起始站:{readStart} 用户起始站:{startStation} ");
// 处理前面的多余段位
if (readStart < startStation)
{
//boolHalf = true;
// 前面有一段需要回收
var frontSegmentLength = startStation - readStart;
Console.WriteLine($"前面剩余 开始:{readStart} 长度:{frontSegmentLength}");
await AddSection(seatNo, readStart, frontSegmentLength);
await AddSeatAsync(seatNo, readStart, frontSegmentLength);
}
// 处理后面的多余段位
if (readEnd > userEnd)
{
//boolHalf = true;
// 尾巴还有一段需要回收
var tailSegmentStart = userEnd;
var tailSegmentLength = readEnd - userEnd;
Console.WriteLine($"后面剩余 开始:{tailSegmentStart} 长度:{tailSegmentLength}");
await AddSection(seatNo, tailSegmentStart, tailSegmentLength);
await AddSeatAsync(seatNo, tailSegmentStart, tailSegmentLength);
}
await RemoveSeatAsync(seatNo, startStation, stationTotal);
//if (seatNo > 0 && !boolHalf)
//{
// //获取的就是完整剩余的,直接减少库存量
// await SectionHashDecrementAsync(stationTotal);
//}
return seatNo;
}
return 0;
}
}
}
对应的Redis
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using StackExchange.Redis;
using Volo.Abp.DependencyInjection;
namespace Test.Handler
{
/// <summary>
/// 封装 Redis 相关操作的方法类。
/// </summary>
public class HaveSeatRedisHanlder : ISingletonDependency
{
private readonly RedisConfig _redisconfig;
private readonly Lazy<ConnectionMultiplexer>[] _connectionPool;
private int _currentConnectionIndex = 0;
private readonly object _lock = new object();
/// <summary>
/// Key的前缀
/// </summary>
private string PrefixHead = string.Empty;
/// <summary>
///
/// </summary>
/// <param name="options"></param>
public HaveSeatRedisHanlder(IOptions<RedisConfig> options)
{
_redisconfig = options.Value;
PrefixHead = _redisconfig.Prefix;
// 创建4个连接的池
_connectionPool = new Lazy<ConnectionMultiplexer>[2];
for (int i = 0; i < _connectionPool.Length; i++)
{
var config = ConfigurationOptions.Parse(_redisconfig.MainConnection);
config.ClientName = $"{_redisconfig.ClientName}_{i}"; // 为每个连接添加序号
_connectionPool[i] = new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(config));
}
}
/// <summary>
///
/// </summary>
/// <returns></returns>
private IDatabase GetDatabase()
{
lock (_lock)
{
// 轮询选择连接
var connection = _connectionPool[_currentConnectionIndex];
_currentConnectionIndex = (_currentConnectionIndex + 1) % _connectionPool.Length;
return connection.Value.GetDatabase();
}
}
/// <summary>
///
/// </summary>
private IDatabase _database { get { return GetDatabase(); } }
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
private string setKey(string key)
{
return PrefixHead + key;
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public async Task<List<HashKeyValue>> HashGetAllAsync(string key)
{
var hentry = await _database.HashGetAllAsync(setKey(key));
var result = new List<HashKeyValue>();
foreach (var h in hentry)
{
result.Add(new HashKeyValue()
{
Key = h.Name,
Value = h.Value
});
}
return result;
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public async Task<HashEntry[]> HashReadAllAsync(string key)
{
var hentry = await _database.HashGetAllAsync(setKey(key));
return hentry;
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="vals"></param>
public async Task HashSetAsync(string key, HashEntry[] vals)
{
await _database.HashSetAsync(setKey(key), vals);
}
/// <summary>
/// 写入单个值
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task HashSetAsync(string key, string field, RedisValue value)
{
await _database.HashSetAsync(setKey(key), field, value);
}
public string HashGet(string key, string sunkey)
{
return _database.HashGet(setKey(key), sunkey);
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="sunkey"></param>
/// <returns></returns>
public async Task<string> HashGetAsync(string key, string sunkey)
{
return await _database.HashGetAsync(setKey(key), sunkey);
}
public async Task HashSetAsync(string key, List<HashKeyValue> vals)
{
var list = new List<HashEntry>();
foreach (var item in vals)
{
list.Add(new HashEntry(item.Key, item.Value) { });
}
await _database.HashSetAsync(setKey(key), list.ToArray());
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <returns></returns>
public async Task HashDeleteAsync(string key, string field)
{
await _database.HashDeleteAsync(setKey(key), field);
}
/// <summary>
/// 叠加
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="num"></param>
/// <returns></returns>
public async Task<long> HashIncrementAsync(string key, string field, int num = 1)
{
return await _database.HashIncrementAsync(setKey(key), field, num);
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="num"></param>
/// <returns></returns>
public async Task HashIncrementAsync(string key, RedisValue field, long num = 1)
{
await _database.HashIncrementAsync(setKey(key), field, num);
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="num"></param>
/// <returns></returns>
public async Task HashDecrementAsync(string key, RedisValue field, long num = 1)
{
await _database.HashDecrementAsync(setKey(key), field, num);
}
/// <summary>
/// 写入有序排序
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="score"></param>
public async Task SortedSetAddAsync(string key, RedisValue field, int score)
{
await _database.SortedSetAddAsync(setKey(key), field, score);
}
/// <summary>
/// 批量写入有序排序
/// </summary>
/// <param name="key"></param>
/// <param name="fields"></param>
public async Task SortedSetAddAsync(string key, SortedSetEntry[] fields)
{
await _database.SortedSetAddAsync(setKey(key), fields);
}
/// <summary>
/// 按照分数升序,获取第一个分数 >= 指定分数的数据并删除
/// </summary>
/// <param name="key"></param>
/// <param name="minScore">最低分数</param>
/// <returns></returns>
public async Task<SortedSetEntry?> SortedSetStartPopScoreAsync(string key, int minScore)
{
var luaScript = @"
local members = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], '+inf', 'WITHSCORES', 'LIMIT', 0, 1)
if #members == 0 then
return nil
end
redis.call('ZREM', KEYS[1], members[1])
return {members[1], members[2]}
";
var result = await _database.ScriptEvaluateAsync(luaScript,
new RedisKey[] { setKey(key) },
new RedisValue[] { minScore });
if (result.IsNull)
{
return null;
}
var resultArray = (RedisResult[])result;
if (resultArray.Length >= 2)
{
string element = (string)resultArray[0];
double redisScore = (double)resultArray[1];
// 将double转换为int(根据您的需求)
int scoreValue = (int)redisScore;
return new SortedSetEntry(element, scoreValue);
}
return null;
}
/// <summary>
/// 递减
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="num"></param>
/// <returns></returns>
public async Task<long> HashDecrementAsync(string key, string field, int num = 1)
{
return await _database.HashDecrementAsync(setKey(key), field, num);
}
/// <summary>
/// 设置某个
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="time"></param>
/// <returns></returns>
public async Task SortedSetAddAsync(string key, string field, long time)
{
await _database.SortedSetAddAsync(setKey(key), field, time);
}
/// <summary>
/// 读取SortedSet的列表 注意数量
/// </summary>
/// <param name="key"></param>
/// <param name="size"></param>
/// <returns></returns>
public async Task<List<HashEntry>> SortedSetScanAsync(string key, int size = 100)
{
var list = new List<HashEntry>();
var read = _database.SortedSetScanAsync(setKey(key), pageSize: size);
await foreach (var entry in read)
{
list.Add(new HashEntry(entry.Element, entry.Score));
}
return list;
}
/// <summary>
/// 更新某一项
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <param name="score"></param>
/// <returns></returns>
public async Task SortedSetUpdateAsync(string key, RedisValue field, double score)
{
await _database.SortedSetUpdateAsync(setKey(key), field, score);
}
/// <summary>
/// 删除某个
/// </summary>
/// <param name="key"></param>
/// <param name="field"></param>
/// <returns></returns>
public async Task SortedSetRemoveAsync(string key, string field)
{
await _database.SortedSetRemoveAsync(setKey(key), field);
}
/// <summary>
/// 获取min-max之间的总个数
/// </summary>
/// <param name="key"></param>
/// <param name="min"></param>
/// <param name="max"></param>
/// <returns></returns>
public async Task<int> SortedSetLengthAsync(string key, int min, int max = 2000)
{
//var count = await _database.SortedSetLengthByValueAsync(setKey(key), min, max, exclude: Exclude.None);
//这个才是按照score获取
var count = await _database.SortedSetLengthAsync(setKey(key), min, max, exclude: Exclude.None);
return (int)count;
}
/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public async Task<long> SortedSetLengthAsync(string key)
{
var count = await _database.SortedSetLengthAsync(setKey(key));
return count;
}
/// <summary>
/// 按照一定顺序读取 这个基于数据
/// </summary>
/// <param name="key"></param>
/// <param name="timestamp"></param>
/// <returns></returns>
public async Task<List<string>> SortedSetRangeByScoreAsync(string key, long timestamp, Order order = Order.Ascending)
{
var strs = new List<string>();
var reads = await _database.SortedSetRangeByScoreAsync(setKey(key), timestamp, 0, order: order, take: 100);
if (reads != null && reads.Any())
{
foreach (var item in reads)
{
strs.Add(item.ToString());
}
}
return strs;
}
/// <summary>
/// 基于排序获取 这个基于排名
/// </summary>
/// <param name="key"></param>
/// <param name="startIndex"></param>
/// <param name="endIndex"></param>
/// <param name="order"></param>
/// <returns></returns>
public async Task<SortedSetEntry[]> SortedSetRangeByRankWithScoresAsync(string key, int startIndex = 0, int endIndex = -1, Order order = Order.Descending)
{
var reads = await _database.SortedSetRangeByRankWithScoresAsync(setKey(key), startIndex, endIndex, order);
return reads;
}
/// <summary>
/// 返回最近的一个!并删除,升序排序,也就是获取最小的一个
/// </summary>
/// <param name="skey"></param>
/// <param name="_time">时间戳</param>
/// <returns></returns>
public async Task<string> SortedSetReadMiniFirstRemoveAsync(string skey, long _time)
{
var reads = await _database.SortedSetRangeByScoreAsync(setKey(skey), 0, _time, take: 1);
if (reads != null && reads.Length > 0)
{
var _field = reads[0].ToString();
await _database.SortedSetRemoveAsync(setKey(skey), _field);
return _field;
}
return String.Empty;
}
#region Set模块部分
/// <summary>
///
/// </summary>
/// <param name="skey"></param>
/// <param name="val"></param>
/// <returns></returns>
public async Task<bool> SetAddAsync(string skey, RedisValue val)
{
return await _database.SetAddAsync(setKey(skey), val);
}
/// <summary>
///
/// </summary>
/// <param name="skey"></param>
/// <param name="filed"></param>
/// <returns></returns>
public async Task<bool> SetContainsAsync(string skey, RedisValue filed)
{
return await _database.SetContainsAsync(setKey(skey), filed);
}
/// <summary>
///
/// </summary>
/// <param name="skey"></param>
/// <param name="filed"></param>
/// <returns></returns>
public async Task<bool> SetRemoveAsync(string skey, RedisValue filed)
{
return await _database.SetRemoveAsync(setKey(skey), filed);
}
/// <summary>
///
/// </summary>
/// <param name="skey"></param>
/// <returns></returns>
public async Task<RedisValue> SetRandomMemberAsync(string skey)
{
return await _database.SetRandomMemberAsync(setKey(skey));
//return val;
}
/// <summary>
///
/// </summary>
/// <param name="skey"></param>
/// <returns></returns>
public async Task<long> SetLengthAsync(string skey)
{
return (await _database.SetLengthAsync(setKey(skey)));
}
#endregion
}
}
比如你查询站点A到站点B的剩余票
其实他包含了,还有全票的
然后是站点A开始到大于等于站点B的所有票的集合!
那么如果站点有一个站点1小于站点A,且终点站大于B,则这个也是可以算在内的
要实现这个,就需要遍历查询了,优点不划算!
是否可以直观的看到剩余票,目前是使用redis记录的,如果遍历查询的话是可以查看到哪些剩余票的!
我们希望尽量减少遍历!
贴代码
贴代码
已经升级了,主要是left join的查询的时候没有过滤,其实这个问题,多租户也是一样的!