为什么批量请求要尽可能的合并操作 -凯发k8网页登录

记录工作/学习的点点滴滴。

为什么批量请求要尽可能的合并操作

前言

线上情况:

  1. 线上redis集群,多个twemproxy代理(nutcracker),lvs dr路由均衡调度
  2. 客户端使用jedis操作redis集群,一个程序进程实例使用原先1024个工作线程处理请求,若干个进程实例
  3. 一天超过22亿次请求,网络一般情况下,一天超过上万个连接失败异常
  4. 运维同学告知,lvs压力较大

改进工作:

  1. 工作线程由原先1024改用16个
  2. 每个线程每次最多操作1000个redis命令批量提交

实际效果:

  1. 一天不到一亿次的请求量
  2. lvs压力大减
  3. cpu压力降低到原先1/3以下
  4. 单个请求抽样调研平均减少1-90毫秒时间(尤其是跨机房处理)

redis支持批量提交

原生支持批量操作方式

一般命令前缀若添加上m字符串,表示支持多个、批量命令提交了。

显式的...

mset key value [key value ...]
msetnx key value [key value ...]
hmget key field [field ...]
hmset key field value [field value ...]

一般方式的...

hdel key field [field ...]
srem key member [member ...]
rpush key value [value ...]
......

更多,请参考:

pipeline管道方式

官方文档:

  1. redis client把所有命令一起打包发送到redis server,然后阻塞等待处理结果
  2. redis server必须在处理完所有命令前先缓存起所有命令的处理结果
  3. 打包的命令越多,缓存消耗内存也越多
  4. 不是打包的命令越多越好
  5. 实际环境需要根据命令执行时间等各种因素选择合并命令的个数,以及测试效果等

java队列支持

一般业务、接入前端请求量过大,生产者速度过快,这时候使用队列暂时缓存会比较好一些,消费者直接直接从队列获取任务,通过队列让生产者和消费者进行分离这也是业界普通采用的方式。

监控队列

有的时候,若可以监控一下队列消费情况,可以监控一下,就很直观。同事为队列添加了一个监控线程,清晰明了了解队列消费情况。

示范

示范使用了redis pipeline,线程池,准备数据,生产者-消费者队列,队列监控等,消费完毕,程序关闭。

/**
 * 以下测试在jedis 2.6下测试通过
 * 
 * @author nieyong
 * 
 */
public class testjedispipeline {
    private static final int num = 512;
    private static final int max = 1000000; // 100w
    private static jedispool redispool;
    private static final executorservice pool = executors.newcachedthreadpool();
    protected static final blockingqueue queue = new arrayblockingqueue(
            max); // 100w
    private static boolean finished = false;
    static {
        jedispoolconfig config = new jedispoolconfig();
        config.setmaxactive(64);
        config.setmaxidle(64);
        try {
            redispool = new jedispool(config, "192.168.192.8", 6379, 10000,
                    null, 0);
        } catch (exception e) {
            system.err.println("init msg redis factory error! "   e.tostring());
        }
    }
    public static void main(string[] args) throws interruptedexception {
        system.out.println("prepare test data 100w");
        preparetestdata();
        system.out.println("prepare test data done!");
        // 生产者,模拟请求100w次
        pool.execute(new runnable() {
            @override
            public void run() {
                for (int i = 0; i < max; i  ) {
                    if (i % 3 == 0) {
                        queue.offer("del_key key_"   i);
                    } else {
                        queue.offer("get_key key_"   i);
                    }
                }
            }
        });
        // cpu核数*2 个工作者线程
        int threadnum = 2 * runtime.getruntime().availableprocessors();
        for (int i = 0; i < threadnum; i  )
            pool.execute(new consumertask());
        pool.execute(new monitortask());
        thread.sleep(10 * 1000);// 10sec
        system.out.println("going to shutdown server ...");
        setfinished(true);
        pool.shutdown();
        pool.awaittermination(1, timeunit.milliseconds);
        system.out.println("colse!");
    }
    private static void preparetestdata() {
        jedis redis = redispool.getresource();
        pipeline pipeline = redis.pipelined();
        for (int i = 0; i < max; i  ) {
            pipeline.set("key_"   i, (i * 2   1)   "");
            if (i % (num * 2) == 0) {
                pipeline.sync();
            }
        }
        pipeline.sync();
        redispool.returnresource(redis);
    }
    // queue monitor,生产者-消费队列监控
    private static class monitortask implements runnable {
        @override
        public void run() {
            while (!thread.interrupted() && !isfinished()) {
                system.out.println("queue.size = "   queue.size());
                try {
                    thread.sleep(500); // 0.5 second
                } catch (interruptedexception e) {
                    break;
                }
            }
        }
    }
    // consumer,消费者
    private static class consumertask implements runnable {
        @override
        public void run() {
            while (!thread.interrupted() && !isfinished()) {
                if (queue.isempty()) {
                    try {
                        thread.sleep(100);
                    } catch (interruptedexception e) {
                    }
                    continue;
                }
                list tasks = new arraylist(num);
                queue.drainto(tasks, num);
                if (tasks.isempty()) {
                    continue;
                }
                jedis jedis = redispool.getresource();
                pipeline pipeline = jedis.pipelined();
                try {
                    list> resultlist = new arraylist>(
                            tasks.size());
                    list waitdeletelist = new arraylist(
                            tasks.size());
                    for (string task : tasks) {
                        string key = task.split(" ")[1];
                        if (task.startswith("get_key")) {
                            resultlist.add(pipeline.get(key));
                            waitdeletelist.add(key);
                        } else if (task.startswith("del_key")) {
                            pipeline.del(key);
                        }
                    }
                    pipeline.sync();
                    // 处理返回列表
                    for (int i = 0; i < resultlist.size(); i  ) {
                        resultlist.get(i).get();
                        // handle value here ...
                        // system.out.println("get value "   value);
                    }
                    // 读取完毕,直接删除之
                    for (string key : waitdeletelist) {
                        pipeline.del(key);
                    }
                    pipeline.sync();
                } catch (exception e) {
                    redispool.returnbrokenresource(jedis);
                } finally {
                    redispool.returnresource(jedis);
                }
            }
        }
    }
    private static boolean isfinished(){
        return finished;
    }
    private static void setfinished(boolean bool){
        finished = bool;
    }
}

代码作为示范。若线上则需要处理一些异常等。

小结

若能够批量请求进行合并操作,自然可以节省很多的网络带宽、cpu等资源。有类似问题的同学,不妨考虑一下。

posted on 2014-11-09 22:08 nieyong 阅读(16060) 评论(17)     所属分类: socket

# re: 为什么批量请求要尽可能的合并操作 2014-11-10 09:34

经过楼主的讲解,我现在才明白为什么批量请求要尽可能的合并操作  回复     

# re: 为什么批量请求要尽可能的合并操作[未登录] 2014-11-11 13:36

好专业啊...每个月有一谝  回复     

# re: 为什么批量请求要尽可能的合并操作 2014-11-13 14:37

线程数要根据cpu的情况而决定的,一台4核的机器开40个线程就是蛋疼。同步、context switch的开销已经超过了线程带来的优势。如果不合并,仅仅减少线程数,性能也会有所优化。  回复     

# re: 为什么批量请求要尽可能的合并操作 2014-11-16 09:12

支持博主分享,欢迎到我的小店、、、、  回复     

# gank开黑吧 2014-11-16 22:13

gank开黑吧 赞一下博主  回复     

# re: 为什么批量请求要尽可能的合并操作 2014-11-16 23:37

看了楼主的讲解,我现在才大致明白  回复     

# re: 为什么批量请求要尽可能的合并操作 2014-11-17 14:32

不错的文章,学习了  回复     

# 武冈seo 2014-11-25 22:57

文章很实用,学习了,到时实践下  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-01-24 15:09

请教一下,我们使用pipeline的方式后,出现了在一些闲时,内存暴涨。然后kill掉twemproxy之后就降下来了。然后查了相关的资料,把pipeline的数量降到500,甚至20了,仍然出现。而且11台机器中,有一些机器经常出现,但是最近经常出现的不出现,从没出现的又出现这情况了。不知道您是否有遇到过,如果解决。  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-01-26 09:39 nieyong

@黎洪鑫
没有遇见过类似问题,爱莫能助。
因为pipeline是一个阻塞请求-响应过程,这一点很重要;另外网络机房拥塞会导致非常大的延迟,具体情况就是请求发出去,等待很长时间响应。若是机房网络延迟问题,可以考虑把pipeline异步提交,不要阻塞当前线程。
以上都是建议,仅供参考!  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-01-26 10:46

多谢了,我先做一下升级看看情况会不会改善。@nieyong
  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-05-28 10:05

@nieyong
可以考虑把pipeline异步提交,不要阻塞当前线程 ;

这个异步是指?不是很明白  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-05-28 17:04 nieyong

@tinsang
把较为耗时任务委派到其它线程处理,当前业务线程继续忙别的。  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-05-28 19:01

@nieyong
那我明白你的意思了  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-05-28 19:02

@nieyong
pipeline阻塞了,那其他请求redis不是一样被阻塞了?  回复     

# re: 为什么批量请求要尽可能的合并操作 2015-05-29 09:52 nieyong

@tinsang
针对单台redis而言,单线程模型。一旦pipeline阻塞了,其它请求会被阻塞住。可考虑单线程操作管道,一个一个批处理。  回复     

# re: 为什么批量请求要尽可能的合并操作 2016-05-16 17:45

private static boolean finished = false;
finished变量应该为volatile。
楼主这么牛逼的人不应该犯这种小错误 ^_^
好文章,学习了~  回复     


只有注册用户后才能发表评论。


网站导航:
              
相关文章:
 

公告

所有文章皆为原创,若转载请标明出处,谢谢~

新浪微博,欢迎关注:

导航

2014年11月
2627282930311
2345678
101112131415
161718192122
23242526272829
30123456

统计

常用链接

留言簿(58)

随笔分类(129)

随笔档案(149)

个人收藏

最新随笔

搜索

最新评论

阅读排行榜

评论排行榜

网站地图