前言
线上情况:
- 线上redis集群,多个twemproxy代理(nutcracker),lvs dr路由均衡调度
- 客户端使用jedis操作redis集群,一个程序进程实例使用原先1024个工作线程处理请求,若干个进程实例
- 一天超过22亿次请求,网络一般情况下,一天超过上万个连接失败异常
- 运维同学告知,lvs压力较大
改进工作:
- 工作线程由原先1024改用16个
- 每个线程每次最多操作1000个redis命令批量提交
实际效果:
- 一天不到一亿次的请求量
- lvs压力大减
- cpu压力降低到原先1/3以下
- 单个请求抽样调研平均减少1-90毫秒时间(尤其是跨机房处理)
redis支持批量提交
原生支持批量操作方式
一般命令前缀若添加上m字符串,表示支持多个、批量命令提交了。
显式的...
mset key value [key value ...]
msetnx key value [key value ...]
hmget key field [field ...]
hmset key field value [field value ...]
一般方式的...
hdel key field [field ...]
srem key member [member ...]
rpush key value [value ...]
......
更多,请参考:
pipeline管道方式
官方文档:
- redis client把所有命令一起打包发送到redis server,然后阻塞等待处理结果
- redis server必须在处理完所有命令前先缓存起所有命令的处理结果
- 打包的命令越多,缓存消耗内存也越多
- 不是打包的命令越多越好
- 实际环境需要根据命令执行时间等各种因素选择合并命令的个数,以及测试效果等
java队列支持
一般业务、接入前端请求量过大,生产者速度过快,这时候使用队列暂时缓存会比较好一些,消费者直接直接从队列获取任务,通过队列让生产者和消费者进行分离这也是业界普通采用的方式。
监控队列
有的时候,若可以监控一下队列消费情况,可以监控一下,就很直观。同事为队列添加了一个监控线程,清晰明了了解队列消费情况。
示范
示范使用了redis pipeline,线程池,准备数据,生产者-消费者队列,队列监控等,消费完毕,程序关闭。
/**
* 以下测试在jedis 2.6下测试通过
*
* @author nieyong
*
*/
public class testjedispipeline {
private static final int num = 512;
private static final int max = 1000000; // 100w
private static jedispool redispool;
private static final executorservice pool = executors.newcachedthreadpool();
protected static final blockingqueue queue = new arrayblockingqueue(
max); // 100w
private static boolean finished = false;
static {
jedispoolconfig config = new jedispoolconfig();
config.setmaxactive(64);
config.setmaxidle(64);
try {
redispool = new jedispool(config, "192.168.192.8", 6379, 10000,
null, 0);
} catch (exception e) {
system.err.println("init msg redis factory error! " e.tostring());
}
}
public static void main(string[] args) throws interruptedexception {
system.out.println("prepare test data 100w");
preparetestdata();
system.out.println("prepare test data done!");
// 生产者,模拟请求100w次
pool.execute(new runnable() {
@override
public void run() {
for (int i = 0; i < max; i ) {
if (i % 3 == 0) {
queue.offer("del_key key_" i);
} else {
queue.offer("get_key key_" i);
}
}
}
});
// cpu核数*2 个工作者线程
int threadnum = 2 * runtime.getruntime().availableprocessors();
for (int i = 0; i < threadnum; i )
pool.execute(new consumertask());
pool.execute(new monitortask());
thread.sleep(10 * 1000);// 10sec
system.out.println("going to shutdown server ...");
setfinished(true);
pool.shutdown();
pool.awaittermination(1, timeunit.milliseconds);
system.out.println("colse!");
}
private static void preparetestdata() {
jedis redis = redispool.getresource();
pipeline pipeline = redis.pipelined();
for (int i = 0; i < max; i ) {
pipeline.set("key_" i, (i * 2 1) "");
if (i % (num * 2) == 0) {
pipeline.sync();
}
}
pipeline.sync();
redispool.returnresource(redis);
}
// queue monitor,生产者-消费队列监控
private static class monitortask implements runnable {
@override
public void run() {
while (!thread.interrupted() && !isfinished()) {
system.out.println("queue.size = " queue.size());
try {
thread.sleep(500); // 0.5 second
} catch (interruptedexception e) {
break;
}
}
}
}
// consumer,消费者
private static class consumertask implements runnable {
@override
public void run() {
while (!thread.interrupted() && !isfinished()) {
if (queue.isempty()) {
try {
thread.sleep(100);
} catch (interruptedexception e) {
}
continue;
}
list tasks = new arraylist(num);
queue.drainto(tasks, num);
if (tasks.isempty()) {
continue;
}
jedis jedis = redispool.getresource();
pipeline pipeline = jedis.pipelined();
try {
list> resultlist = new arraylist>(
tasks.size());
list waitdeletelist = new arraylist(
tasks.size());
for (string task : tasks) {
string key = task.split(" ")[1];
if (task.startswith("get_key")) {
resultlist.add(pipeline.get(key));
waitdeletelist.add(key);
} else if (task.startswith("del_key")) {
pipeline.del(key);
}
}
pipeline.sync();
// 处理返回列表
for (int i = 0; i < resultlist.size(); i ) {
resultlist.get(i).get();
// handle value here ...
// system.out.println("get value " value);
}
// 读取完毕,直接删除之
for (string key : waitdeletelist) {
pipeline.del(key);
}
pipeline.sync();
} catch (exception e) {
redispool.returnbrokenresource(jedis);
} finally {
redispool.returnresource(jedis);
}
}
}
}
private static boolean isfinished(){
return finished;
}
private static void setfinished(boolean bool){
finished = bool;
}
}
代码作为示范。若线上则需要处理一些异常等。
小结
若能够批量请求进行合并操作,自然可以节省很多的网络带宽、cpu等资源。有类似问题的同学,不妨考虑一下。