Node.js: serializing objects back to source strings, executing string code dynamically, and generating a RequireJS config file when packaging with r.js
http://www.blogjava.net/wangxinsh55/archive/2016/11/01/431944.html (simone, 2016-11-01)

var path = require('path');
var fs = require('fs');
var vm = require('vm');
var os = require('os');

/**
 * Build an indentation string of the given depth.
 */
function toindent(indent) {
    var s = [];
    for (var i = 0; i < indent; i++) {
        s.push('\t');
    }
    return s.join('');
}

/**
 * Convert an array into raw source text.
 */
function array2string(arr, indent) {
    var s = ['[', os.EOL], hasprop = false;
    for (var i = 0; i < arr.length; i++) {
        if (!hasprop) {
            hasprop = true;
        }

        s.push(toindent(indent + 1));

        var item = arr[i];
        var itemtp = typeof(item);
        if (itemtp === 'object') {
            if (item instanceof Array) {
                s.push(array2string(item, indent + 1));
            } else {
                s.splice(s.length - 2, 2);
                s.push(object2strng(item, indent).trim());
            }
        } else {
            s.push(JSON.stringify(item));
        }
        s.push(',');
        s.push(os.EOL);
    }
    if (hasprop) {
        s.splice(s.length - 2, 1);
    }
    s.push(toindent(indent));
    s.push(']');
    return s.join('');
}

/**
 * Convert an object into raw source text (functions are kept as source, not dropped).
 */
function object2strng(obj, indent) {
    var s = ['{', os.EOL], hasprop = false;

    for (var o in obj) {
        if (!hasprop) {
            hasprop = true;
        }
        s.push(toindent(indent + 1));
        s.push(JSON.stringify(o));
        s.push(':');

        var tp = typeof(obj[o]);
        if (tp === 'object') {
            if (obj[o] instanceof Array) {
                s.push(array2string(obj[o], indent + 1));
            } else {
                s.push(object2strng(obj[o], indent + 1));
            }
        } else if (tp === 'function') {
            s.push(obj[o].toString());
        } else {
            s.push(JSON.stringify(obj[o]));
        }
        s.push(',');
        s.push(os.EOL);
    }
    if (hasprop) {
        s.splice(s.length - 2, 1);
    }
    s.push(toindent(indent));
    s.push('}');
    return s.join('');
}

// Extract the requirejs config string from the production code and evaluate it into an object;
// adjust the relevant values to prepare for the r.js build below; then write the config back out
// as source text to a temporary file.
var mainpath = path.resolve(process.cwd(), '../js/main.js');
var maincontent = fs.readFileSync(mainpath, 'utf-8').replace(/(requirejs\.config\()?([^)]*)(\);)?/gm, '$2');
vm.runInThisContext('var maincfg = ' + maincontent); // evaluate the extracted string into the maincfg object
maincfg.baseUrl = '/static/js/dist/lib';
var nmaincfgstr = 'requirejs.config(' + object2strng(maincfg, 0) + ');'; // regenerate the main.js config for the build below
var buildpath = path.resolve(process.cwd(), './main.js');
fs.writeFileSync(buildpath, nmaincfgstr);
console.log('write temp file main.js finished');

// The build configuration
var buildjson = {
    appDir: '../js',
    baseUrl: 'lib',
    mainConfigFile: './main.js',
    dir: '../js/dist',
    modules: [{
        'name': '../main',
        include: []
    }]
};
for (var p in maincfg.paths) { // pull in every dependency module so it gets bundled into main.js
    buildjson.modules[0].include.push(p);
}

var buildpath = path.resolve(process.cwd(), './build_main.json');
fs.writeFileSync(buildpath, object2strng(buildjson, 0)); // write the build config file
console.log('write temp file build_main.json finished');


Then write a batch file, build.bat:
@echo off
node build.js
node r.js -o build_main.json
@pause
Run it and you are done.

Java keytool certificate tool: a usage summary
http://www.blogjava.net/wangxinsh55/archive/2016/10/20/431905.html (simone, 2016-10-20)
http://www.micmiu.com/lang/java/keytool-start-guide/



keytool is Java's certificate management tool. It stores keys and certificates in a file called a keystore. A keystore holds two kinds of entries: key entities (a secret key, or a private key with its paired public key for asymmetric encryption) and trusted certificate entries, which contain only a public key.
Commonly used keytool parameters in the JDK (they differ between versions; see the official documentation linked in the appendix):

  • -genkey creates a default file ".keystore" in the user's home directory, plus an alias named mykey containing the user's public key, private key and certificate (if no location is specified, the keystore goes to the user's default system directory)
  • -alias sets the alias; every entry in a keystore is associated with a unique alias, which is usually case-insensitive
  • -keystore specifies the keystore file name (the generated data then no longer goes into the .keystore file)
  • -keyalg specifies the key algorithm (e.g. RSA or DSA; the default is DSA)
  • -validity specifies how many days the generated certificate is valid (default 90)
  • -keysize specifies the key length (default 1024)
  • -storepass specifies the keystore password (needed to read keystore information)
  • -keypass specifies the password of the alias entry (the private key password)
  • -dname specifies the certificate issuer information: "cn=first and last name, ou=organizational unit, o=organization, l=city or locality, st=state or province, c=two-letter country code"
  • -list shows the certificates in a keystore: keytool -list -v -keystore <keystore> -storepass <password>
  • -v prints detailed certificate information
  • -export exports the certificate of an alias to a file: keytool -export -alias <alias to export> -keystore <keystore> -file <path and name of exported certificate> -storepass <password>
  • -file specifies the file name to export to
  • -delete removes an entry from the keystore: keytool -delete -alias <alias to delete> -keystore <keystore> -storepass <password>
  • -printcert shows an exported certificate: keytool -printcert -file g:\sso\michael.crt
  • -keypasswd changes the password of an entry: keytool -keypasswd -alias <alias> -keypass <old password> -new <new password> -storepass <keystore password> -keystore sage
  • -storepasswd changes the keystore password: keytool -storepasswd -keystore g:\sso\michael.keystore (keystore to change) -storepass pwdold (old password) -new pwdnew (new password)
  • -import imports a signed digital certificate into the keystore: keytool -import -alias <alias for the imported entry> -keystore <keystore> -file <certificate to import>
Contents:
  1. Generating a certificate
  2. Viewing a certificate
  3. Exporting a certificate
  4. Appendix
1. Generating a certificate
Press Win+R, type cmd and hit Enter to open a command prompt, then run:

keytool -genkey -alias michaelkey -keyalg RSA -keysize 1024 -keypass michaelpwd -validity 365 -keystore g:\sso\michael.keystore -storepass michaelpwd2
(screenshot omitted)
2. Viewing a certificate

By default, -list prints the MD5 fingerprint of the certificate. With the -v option the certificate is printed in human-readable form; with -rfc it is printed in a printable encoding format.

With -v:

keytool -list -v -keystore g:\sso\michael.keystore -storepass michaelpwd2

With -rfc:

keytool -list -rfc -keystore g:\sso\michael.keystore -storepass michaelpwd2

3. Exporting and viewing a certificate
Export the certificate:

keytool -export -alias michaelkey -keystore g:\sso\michael.keystore -file g:\sso\michael.crt -storepass michaelpwd2

View the exported certificate:

keytool -printcert -file g:\sso\michael.crt
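The keystore created above can also be read from Java code. A minimal sketch, reusing the keystore path, store password and alias from the keytool commands above (JKS is assumed as the store type, which is keytool's traditional default):

import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.Certificate;

public class ReadKeystore {
    public static void main(String[] args) throws Exception {
        KeyStore ks = KeyStore.getInstance("JKS");
        // Same keystore file and store password as in the keytool examples above.
        try (FileInputStream in = new FileInputStream("g:/sso/michael.keystore")) {
            ks.load(in, "michaelpwd2".toCharArray());
        }
        // Read back the certificate stored under the michaelkey alias and print it.
        Certificate cert = ks.getCertificate("michaelkey");
        System.out.println(cert);
    }
}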
4. Appendix
Official keytool documentation:
  • jdk1.4.2 :http://docs.oracle.com/javase/1.4.2/docs/tooldocs/windows/keytool.html
  • jdk1.6    :http://docs.oracle.com/javase/6/docs/technotes/tools/windows/keytool.html
  • jdk1.7    :http://docs.oracle.com/javase/7/docs/technotes/tools/windows/keytool.html


Mobile web debugging with Chrome remote debugging
http://www.blogjava.net/wangxinsh55/archive/2016/10/12/431884.html (simone, 2016-10-12)
http://www.cnblogs.com/terrylin/p/4606277.html


Genymotion: working around extremely slow virtual image downloads
http://www.blogjava.net/wangxinsh55/archive/2016/10/11/431881.html (simone, 2016-10-11)
http://blog.csdn.net/qing666888/article/details/51622762

Genymotion claims to be the fastest Android emulator around, but its servers are overseas, so downloading the Android images is painfully slow.

After "Add new device" the download is so slow that it often fails.


Two workarounds:

Option 1:

1. Set an HTTP proxy: under Settings -> Network, configure your own HTTP proxy and port.



Option 2:

1. Find the download URL and fetch it directly with a download manager such as Thunder (Xunlei).

     When a download fails or crawls, press Win+R, enter %appdata%, go up one level (Alt+Up), open the local folder, then genymobile, and look at the genymotion.log file inside.

Look for a line like:

[Genymotion] [Debug] Downloading file

"http://files2.genymotion.com/dists/4.1.1/ova/genymotion_vbox86p_4.1.1_151117_133208.ova"


Take that http://file........ova image URL and download it with Thunder (or its offline download feature); it finishes quickly.


2. Copy the downloaded file into C:\Users\<your user>\AppData\Local\Genymobile\Genymotion\ova, overwriting the randomly named image there. That is exactly the image Genymotion had just started downloading.


3. Repeat the download steps in the GUI; you will find the image no longer needs downloading. Once it is verified and installed it appears in the device list.

Click Start to launch the emulator and you are ready to go.



Getting Sublime Text 3 to work with Chinese input methods on Ubuntu MATE
http://www.blogjava.net/wangxinsh55/archive/2016/08/19/431643.html (simone, 2016-08-19)

This procedure was verified on a system that already had Sogou Pinyin for Linux and Sublime Text 3 installed.


Save the following code to a file named sublime_imfix.c in your home directory (~):


#include <gtk/gtkimcontext.h>

void gtk_im_context_set_client_window (GtkIMContext *context,
                                       GdkWindow    *window)
{
    GtkIMContextClass *klass;

    g_return_if_fail (GTK_IS_IM_CONTEXT (context));

    klass = GTK_IM_CONTEXT_GET_CLASS (context);

    if (klass->set_client_window)
        klass->set_client_window (context, window);

    g_object_set_data (G_OBJECT (context), "window", window);

    if (!GDK_IS_WINDOW (window))
        return;

    int width = gdk_window_get_width (window);
    int height = gdk_window_get_height (window);

    if (width != 0 && height != 0)
        gtk_im_context_focus_in (context);
}



Compile it into a shared library, libsublime-imfix.so:

cd ~

gcc -shared -o libsublime-imfix.so sublime_imfix.c `pkg-config --libs --cflags gtk+-2.0` -fPIC


If the compilation fails, some libraries are probably missing; install them with:

sudo apt-get install build-essential
sudo apt-get install libgtk2.0-dev

Then copy libsublime-imfix.so into the sublime_text installation directory:

sudo mv libsublime-imfix.so /opt/sublime_text/



Edit the sublime_text.desktop file.
Note: sublime_text.desktop differs between versions; adjust the path to match your installation.
sudo vim /usr/share/applications/sublime_text.desktop

[Desktop Entry]
Version=1.0
Type=Application
Name=Sublime Text
GenericName=Text Editor
Comment=Sophisticated text editor for code, markup and prose
Exec=/usr/bin/subl %F        # change the executable path to /usr/bin/subl (the subl wrapper modified earlier)
Terminal=false
MimeType=text/plain;
Icon=sublime-text
Categories=TextEditor;Development;
StartupNotify=true
Actions=Window;Document;

[Desktop Action Window]
Name=New Window
Exec=/usr/bin/subl -n       # change the executable path to /usr/bin/subl (the subl wrapper modified earlier)
OnlyShowIn=Unity;

[Desktop Action Document]
Name=New File
Exec=/usr/bin/subl new_file    # change the executable path to /usr/bin/subl (the subl wrapper modified earlier)
OnlyShowIn=Unity;

If you now launch Sublime Text by running /usr/bin/subl from a terminal, the Chinese input method should work.
Opening a file from the right-click menu, however, still won't bring up the input method. To fix that,
open Control Center -> Main Menu -> under the Applications tree find Programming, locate "Sublime Text", and change its command to
/usr/bin/subl %F



Java and HTTP, part 2: using a proxy
http://www.blogjava.net/wangxinsh55/archive/2016/08/02/431421.html (simone, 2016-08-02)
http://www.aneasystone.com/archives/2015/12/java-and-http-using-proxy.html

In the previous post we covered two ways to simulate HTTP requests: HttpURLConnection and HttpClient. Both have worked fine so far and basically give us the GET/POST simulation we need. For a crawler, being able to send HTTP requests, fetch pages and parse their content is already 80% of the job. The remaining 20% will just cost us the other 80% of the time :-)

This post covers the most important piece of that remaining 20%: how to use an HTTP proxy in Java. Proxies are an essential crawling technique. If you crawl other people's pages at scale you inevitably put load on their site, and if you push too hard you will get banned. To avoid that, you either spread your program over many machines, which is expensive if money and resources are limited, or you use proxies: grab a batch from the web, free or paid, or buy a few cheap VPSes and run your own proxy servers. I may write a separate post about building your own proxies; for now, assume we already have a batch of proxy servers and look at how to use them.

1. A simple HTTP proxy

Let's start with the simplest case. There are plenty of free proxies online; just search for "free proxy" or "HTTP proxy". (Their safety has been widely discussed and surveyed. If the data your crawler fetches is not sensitive you can ignore the issue, but if your crawler does anything like simulated logins, I recommend running your own proxy servers rather than using public free ones.)

1.1 Using a proxy with HttpURLConnection

HttpURLConnection's openConnection() method accepts a Proxy parameter:

Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 9876));
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection(proxy);

That's it, as simple as that!

Note also that the first argument of the Proxy constructor is the enum value Proxy.Type.HTTP; obviously, changing it to Proxy.Type.SOCKS switches to a SOCKS proxy.
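For completeness, here is a small end-to-end sketch of a GET request through such a proxy (the proxy address and the target URL are placeholders, not values from the article):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;

public class ProxyGetDemo {
    public static void main(String[] args) throws Exception {
        // Route the connection through a local HTTP proxy.
        Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 9876));
        URL url = new URL("http://www.example.com/");
        HttpURLConnection con = (HttpURLConnection) url.openConnection(proxy);
        con.setRequestMethod("GET");

        System.out.println("Response code: " + con.getResponseCode());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(con.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}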

1.2 Using a proxy with HttpClient

HttpClient is very flexible, so there are several ways to connect through a proxy. The simplest is this:

HttpHost proxy = new HttpHost("127.0.0.1", 9876, "http");
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpclient.execute(proxy, request);

This is almost identical to the request code from the previous post; the only difference is the extra first argument to httpclient.execute(), an HttpHost, which we set to our proxy.

Note that although new HttpHost(), like new Proxy() above, lets you specify a protocol, HttpClient unfortunately does not support the SOCKS protocol out of the box. Code like this:

HttpHost proxy = new HttpHost("127.0.0.1", 1080, "socks");

fails immediately with an unsupported-scheme exception:

org.apache.http.conn.UnsupportedSchemeException: socks protocol is not supported

If you need SOCKS support in HttpClient, it can be implemented via HttpClient's ConnectionSocketFactory class.

This approach is simple, just one extra argument, but look at the comment in the HttpClient source:

/*
* @param target    the target host for the request.
*                  implementations may accept null
*                  if they can still determine a route, for example
*                  to a default target or by inspecting the request.
* @param request   the request to execute
*/

The first parameter, target, is not actually the proxy; its real role is "the target host for the request". That is a bit vague. What counts as the target host of a request? Does a proxy? Normally the target host should be the site the URL points at. If a proxy does not count, why does setting target to the proxy still work? I don't know yet; it needs a closer look at the HttpClient source.

Besides the approach above (my own, not recommended), the HttpClient site provides a couple of examples, which I record here as the recommended style.

The first uses a per-request RequestConfig, as follows:

CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet request = new HttpGet(url);

request.setConfig(
    RequestConfig.custom()
        .setProxy(new HttpHost("45.32.21.237", 8888, "http"))
        .build()
);

CloseableHttpResponse response = httpclient.execute(request);

The second uses a route planner, as follows:

HttpHost proxy = new HttpHost("127.0.0.1", 9876, "http");
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
CloseableHttpClient httpclient = HttpClients.custom()
        .setRoutePlanner(routePlanner)
        .build();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpclient.execute(request);

2. Using the system proxy settings

When debugging an HTTP crawler we often need to switch proxies for testing, and sometimes the easiest option is to use the system proxy settings. Back when I did .NET projects, programs picked up the proxy configured in the Internet settings by default. Unfortunately, "system" here means the JVM, not the operating system; I have not found a simple way to make a Java program use the Windows proxy settings directly.

Still, the system proxy is easy to use. There are two ways to configure the JVM proxy settings:

2.1 System.setProperty

Java's System class is not just for System.out.println(); it has many useful static methods and properties, and System.setProperty() is one of the most commonly used.

HTTP, HTTPS and SOCKS proxies can be configured separately like this:

// HTTP proxy, only proxies HTTP requests
System.setProperty("http.proxyHost", "127.0.0.1");
System.setProperty("http.proxyPort", "9876");

// HTTPS proxy, only proxies HTTPS requests
System.setProperty("https.proxyHost", "127.0.0.1");
System.setProperty("https.proxyPort", "9876");

// SOCKS proxy, supports both HTTP and HTTPS requests
// Note: if you set a SOCKS proxy, do not also set an HTTP/HTTPS proxy
System.setProperty("socksProxyHost", "127.0.0.1");
System.setProperty("socksProxyPort", "1080");

Three things to note:

  1. The HTTP/HTTPS proxy takes precedence: if you set both an HTTP/HTTPS proxy and a SOCKS proxy, the SOCKS proxy has no effect.
  2. For historical reasons, socksProxyHost and socksProxyPort have no dot in the middle.
  3. The HTTP and HTTPS settings can be merged into one pair:

// proxy both HTTP and HTTPS requests
System.setProperty("proxyHost", "127.0.0.1");
System.setProperty("proxyPort", "9876");

2.2 JVM command-line arguments

Instead of calling System.setProperty(), the same settings can be passed as JVM command-line arguments. In Eclipse:

  1. Open Window -> Preferences -> Java -> Installed JREs -> Edit
  2. In "Default VM arguments" enter: -DproxyHost=127.0.0.1 -DproxyPort=9876

2.3 Picking up the system proxy

Either method above sets the system proxy; how do we make the program use it automatically?

HttpURLConnection needs no changes at all; it uses the system proxy by default. HttpClient, however, does not. To make it use the system proxy, combine SystemDefaultRoutePlanner with ProxySelector, for example:

SystemDefaultRoutePlanner routePlanner = new SystemDefaultRoutePlanner(ProxySelector.getDefault());
CloseableHttpClient httpclient = HttpClients.custom()
        .setRoutePlanner(routePlanner)
        .build();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpclient.execute(request);




Complex regular expression applications
http://www.blogjava.net/wangxinsh55/archive/2016/07/26/431335.html (simone, 2016-07-26)

class ReqParam(queryString: String, val encode: String = "GBK") : HashMap<String, String>() {

    init {
        queryString.split("&+".toRegex()).filter { it.contains("=") }.forEach {
            val kv = it.split("(?".toRegex())   // the full regex was truncated in the source
            put(kv[0], URLDecoder.decode(kv[1], encode))
        }
    }
}

The Kotlin code above splits a URL query string into k=v pairs and extracts the values.

fun main(args: Array<String>) {
    val domain = "fu.area.duxiu.com"
    val subdomain = domain.replace(Regex(""".+((\.\w+){2})"""), "$1")
    println(subdomain)
}

Extracting the parent domain.

public static String cookieDomain(String domain) {
    if (domain.matches("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)")) { // for an IP address, the cookie domain is the IP itself
        return domain;
    }
    return domain.substring(domain.indexOf(".")); // for a subdomain, prepend "." to the parent domain
}

Detecting an IP address.


Maven dependency packaging plugins
http://www.blogjava.net/wangxinsh55/archive/2016/07/20/431252.html (simone, 2016-07-20)

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.2</version>
    <configuration>
        <createDependencyReducedPom>false</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <artifactSet>
                    <includes>
                        <include>org.apache.activemq:activemq-mqtt</include>
                    </includes>
                </artifactSet>
                <transformers>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.duxiu.demo.app.ApplicationKt</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>


This configuration unpacks the sources of all dependency packages and repackages them into the artifact.
For a war, the whole site is unpacked and repackaged.
If a class exists both on the classpath and in a dependency jar, the classpath version replaces the one from the dependency.


<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>com.duxiu.demo.app.ApplicationKt</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

This only unpacks and repackages the dependency jars; static files and the like are not included.


<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>appassembler-maven-plugin</artifactId>
    <version>1.10</version>
    <configuration>

        <platforms>
            <platform>windows</platform>
            <platform>unix</platform>
        </platforms>

        <assembleDirectory>${project.build.directory}/mall</assembleDirectory>

        <repositoryName>lib</repositoryName>

        <binFolder>bin</binFolder>

        <configurationDirectory>conf</configurationDirectory>

        <copyConfigurationDirectory>true</copyConfigurationDirectory>

        <configurationSourceDirectory>src/main/resources</configurationSourceDirectory>

        <repositoryLayout>flat</repositoryLayout>
        <encoding>UTF-8</encoding>
        <logsDirectory>logs</logsDirectory>
        <tempDirectory>tmp</tempDirectory>
        <programs>
            <program>
                <id>mall</id>

                <mainClass>com.duxiu.demo.app.ApplicationKt</mainClass>
                <jvmSettings>
                    <extraArguments>
                        <extraArgument>-server</extraArgument>
                        <extraArgument>-Xmx2g</extraArgument>
                        <extraArgument>-Xms2g</extraArgument>
                    </extraArguments>
                </jvmSettings>
            </program>
        </programs>
    </configuration>
</plugin>

Assembles the application and generates .bat/.sh launcher scripts.


<plugin>
    <artifactId>maven-antrun-plugin</artifactId>
    <executions>
        <execution>
            <id>move-main-class</id>
            <phase>compile</phase>
            <configuration>
                <tasks>
                    <move todir="${project.build.directory}/${project.artifactId}-${version}/com/duxiu/demo/app">
                        <fileset dir="${project.build.directory}/classes/com/duxiu/demo/app">
                            <include name="*.class" />
                        </fileset>
                    </move>
                </tasks>
            </configuration>
            <goals>
                <goal>run</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Moves certain files inside the package to a given location during packaging.

Embedded Java EE development with embedded Tomcat: starting Tomcat
http://www.blogjava.net/wangxinsh55/archive/2016/07/18/431229.html (simone, 2016-07-18)
https://www.iflym.com/index.php/code/use-embeded-tomcat-to-javaee-start-tomcat.html

Yesterday I looked into embedding Tomcat into the main program, instead of copying a web project into Tomcat to run it as before. The motivation is deployment at customer sites: updates had to be applied by hand, copying code into Tomcat and restarting. With embedded Tomcat the project and Tomcat are decoupled: at update time a custom launcher first runs the automated update, and only then starts Tomcat (or another Java EE container) to run the project.

The net effect is a change in how the project runs. Previously Tomcat was the center and started or stopped the project; now our launcher is the center and is responsible for starting and stopping the project, much like today's typical client-server programs with their own startup scripts that initialize the environment, apply updates and so on before finally starting the application.

This post explains how to start and stop embedded Tomcat from code. Most articles online do this with Tomcat 5.x; here I use the latest Tomcat 7, which publishes a dedicated org.apache.tomcat.embed package for standalone embedded development. The Maven dependencies are:

<dependency>
    <groupId>org.apache.tomcat.embed</groupId>
    <artifactId>tomcat-embed-core</artifactId>
    <version>7.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.tomcat</groupId>
    <artifactId>tomcat-util</artifactId>
    <version>7.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.tomcat.embed</groupId>
    <artifactId>tomcat-embed-jasper</artifactId>
    <version>7.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.tomcat.embed</groupId>
    <artifactId>tomcat-embed-logging-juli</artifactId>
    <version>7.0.2</version>
</dependency>

We use the core package from embed, the jasper package for compiling JSPs, the util package, and the logging-juli package for logging. Now the code:

// set the working directory
String catalina_home = "D:/";
Tomcat tomcat = new Tomcat();
tomcat.setHostname("localhost");
tomcat.setPort(startPort);
// the base dir is not used for much, but Tomcat needs somewhere to write things
tomcat.setBaseDir(catalina_home);

The Tomcat class is the new bootstrap entry point. Before Tomcat 7 an Embedded class was used; since Tomcat 7 it is deprecated in favour of Tomcat. We then set the host name, the port and a base directory. The base directory can be anywhere; Tomcat mainly needs it to record things such as the work directory, logs (if logging is configured) and temporary files.

// set the directory that holds the applications
tomcat.getHost().setAppBase("E:/");
// add an AprLifecycleListener
StandardServer server = (StandardServer) tomcat.getServer();
AprLifecycleListener listener = new AprLifecycleListener();
server.addLifecycleListener(listener);
// register the shutdown port
tomcat.getServer().setPort(shutdownPort);
This code first sets the appBase where the project code lives (in a normal Tomcat setup this is the webapps directory), then adds a listener that takes care of things such as APR/native support and IPv6 configuration (it can be ignored), and finally registers a shutdown port: sending data to that port shuts Tomcat down (more on that below).

// load the context
StandardContext standardContext = new StandardContext();
standardContext.setPath("/aa");      // the context path
standardContext.setDocBase("aa");    // the directory holding the application files
standardContext.addLifecycleListener(new Tomcat.DefaultWebXmlListener());
// mark the context as configured
standardContext.addLifecycleListener(new Tomcat.FixContextListener());
standardContext.setSessionCookieName("t-session");
tomcat.getHost().addChild(standardContext);

Here we add the context to the host ourselves. Tomcat provides Tomcat.addWebapp() to add an application, but because we need a custom session cookie name, we add the application with code similar to what addWebapp() does internally.
Two listeners above deserve attention. DefaultWebXmlListener makes Tomcat load the default configuration, such as the JSP servlet and the long list of MIME type mappings, so we don't have to write all of that ourselves; it mirrors what conf/web.xml in a normal Tomcat installation provides, only in code. FixContextListener marks the context as configured once deployment is done (otherwise Tomcat complains at startup that the context has not been configured).
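For comparison, if the custom session cookie name were not needed, the block above could be replaced by the convenience method just mentioned; a sketch using the same paths as above:

// Equivalent shortcut: addWebapp registers the default web.xml listener
// and marks the context as configured for us.
Context ctx = tomcat.addWebapp("/aa", "E:/aa");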
Once the configuration is in place, start Tomcat:

tomcat.start();
tomcat.getServer().await();

This starts Tomcat and makes it listen on the shutdown port. Without the last line the program would simply exit; with await(), Tomcat keeps listening for a shutdown event and the process only ends after one arrives. So to stop this Tomcat, just send something to the shutdown port:

private static void shutdown() throws Exception {
    Socket socket = new Socket("localhost", shutdownPort);
    OutputStream stream = socket.getOutputStream();
    for (int i = 0; i < shutdown.length(); i++)
        stream.write(shutdown.charAt(i));
    stream.flush();
    stream.close();
    socket.close();
}

That is enough to shut Tomcat down.

Looking at the code as a whole, it is essentially a programmatic version of a basic server.xml (conf/server.xml in a Tomcat installation): configure the connector port and the shutdown port, configure the host, add a context, then start and wait. Compare it with the settings in server.xml and the code above becomes easy to follow.



Secure Kafka Java producer with Kerberos
http://www.blogjava.net/wangxinsh55/archive/2016/07/05/431097.html (simone, 2016-07-05)
(excerpt; read the full article at the link above)

Kerberos configuration
http://www.blogjava.net/wangxinsh55/archive/2016/07/05/431095.html (simone, 2016-07-05)
https://www.zybuluo.com/xtccc/note/176298


After the Kerberos software is installed, a few configuration files come into play, for example:
/etc/krb5.conf
/var/kerberos/krb5kdc/kdc.conf



/etc/krb5.conf

Use the command man krb5.conf to read the documentation for this file.

Here is a template:

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = EXAMPLE.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 EXAMPLE.COM = {
  kdc = example.com
  admin_server = example.com
 }

[domain_realm]
 .example.com = EXAMPLE.COM
 example.com = EXAMPLE.COM



Notes on the important settings:
[realms].kdc : the name of the host running a KDC for that realm.
[realms].admin_server : identifies the host where the administration server is running. Typically this is the master Kerberos server.
[domain_realm] : provides a translation from a hostname to the Kerberos realm name for the service provided by that host.



Ubuntu Kerberos configuration
http://www.blogjava.net/wangxinsh55/archive/2016/07/05/431096.html (simone, 2016-07-05)
http://www.blogjava.net/ivanwan/archive/2012/12/19/393221.html
    https://help.ubuntu.com/10.04/serverguide/kerberos.html

    kerberos

    kerberos is a network authentication system based on the principal of a trusted third party. the other two parties being the user and the service the user wishes to authenticate to. not all services and applications can use kerberos, but for those that can, it brings the network environment one step closer to being single sign on (sso).

    this section covers installation and configuration of a kerberos server, and some example client configurations.

    overview

    if you are new to kerberos there are a few terms that are good to understand before setting up a kerberos server. most of the terms will relate to things you may be familiar with in other environments:

    • principal: any users, computers, and services provided by servers need to be defined as kerberos principals.

    • instances: are used for service principals and special administrative principals.

    • realms: the unique realm of control provided by the kerberos installation. usually the dns domain converted to uppercase (example.com).

    • key distribution center: (kdc) consist of three parts, a database of all principals, the authentication server, and the ticket granting server. for each realm there must be at least one kdc.

    • ticket granting ticket: issued by the authentication server (as), the ticket granting ticket (tgt) is encrypted in the user's password which is known only to the user and the kdc.

    • ticket granting server: (tgs) issues service tickets to clients upon request.

    • tickets: confirm the identity of the two principals. one principal being a user and the other a service requested by the user. tickets establish an encryption key used for secure communication during the authenticated session.

    • keytab files: are files extracted from the kdc principal database and contain the encryption key for a service or host.

    to put the pieces together, a realm has at least one kdc, preferably two for redundancy, which contains a database of principals. when a user principal logs into a workstation, configured for kerberos authentication, the kdc issues a ticket granting ticket (tgt). if the user supplied credentials match, the user is authenticated and can then request tickets for kerberized services from the ticket granting server (tgs). the service tickets allow the user to authenticate to the service without entering another username and password.

    kerberos server

    installation

    before installing the kerberos server a properly configured dns server is needed for your domain. since the kerberos realm by convention matches the domain name, this section uses the example.com domain configured in .

    also, kerberos is a time sensitive protocol. so if the local system time between a client machine and the server differs by more than five minutes (by default), the workstation will not be able to authenticate. to correct the problem all hosts should have their time synchronized using the network time protocol (ntp). for details on setting up ntp see .

    the first step in installing a kerberos realm is to install the krb5-kdc and krb5-admin-server packages. from a terminal enter:

    sudo apt-get install krb5-kdc krb5-admin-server 

    you will be asked at the end of the install to supply a name for the kerberos and admin servers, which may or may not be the same server, for the realm.

    next, create the new realm with the kdb5_newrealm utility:

    sudo krb5_newrealm 

    configuration

    the questions asked during installation are used to configure the /etc/krb5.conf file. if you need to adjust the key distribution center (kdc) settings simply edit the file and restart the krb5-kdc daemon.

    1. now that the kdc running an admin user is needed. it is recommended to use a different username from your everyday username. using the kadmin.local utility in a terminal prompt enter:

      sudo kadmin.local authenticating as principal root/admin@example.com with password. kadmin.local: addprinc steve/admin warning: no policy specified for steve/admin@example.com; defaulting to no policy enter password for principal "steve/admin@example.com":  re-enter password for principal "steve/admin@example.com":  principal "steve/admin@example.com" created. kadmin.local: quit 

      in the above example steve is the principal, /admin is an instance, and @example.com signifies the realm. the "every day" principal would be steve@example.com, and should have only normal user rights.


      replace example.com and steve with your realm and admin username.

    2. next, the new admin user needs to have the appropriate access control list (acl) permissions. the permissions are configured in the /etc/krb5kdc/kadm5.acl file:

      steve/admin@example.com        * 

      this entry grants steve/admin the ability to perform any operation on all principals in the realm.

    3. now restart the krb5-admin-server for the new acl to take affect:

      sudo /etc/init.d/krb5-admin-server restart 
    4. the new user principal can be tested using the kinit utility:

      kinit steve/admin steve/admin@example.com's password: 

      after entering the password, use the klist utility to view information about the ticket granting ticket (tgt):

      klist credentials cache: file:/tmp/krb5cc_1000         principal: steve/admin@example.com    issued           expires          principal jul 13 17:53:34  jul 14 03:53:34  krbtgt/example.com@example.com 

      you may need to add an entry into the /etc/hosts for the kdc. for example:

      192.168.0.1   kdc01.example.com       kdc01 

      replacing 192.168.0.1 with the ip address of your kdc.

    5. in order for clients to determine the kdc for the realm some dns srv records are needed. add the following to /etc/named/db.example.com:

      _kerberos._udp.example.com.     in srv 1  0 88  kdc01.example.com. _kerberos._tcp.example.com.     in srv 1  0 88  kdc01.example.com. _kerberos._udp.example.com.     in srv 10 0 88  kdc02.example.com.  _kerberos._tcp.example.com.     in srv 10 0 88  kdc02.example.com.  _kerberos-adm._tcp.example.com. in srv 1  0 749 kdc01.example.com. _kpasswd._udp.example.com.      in srv 1  0 464 kdc01.example.com. 

      replace example.com, kdc01, and kdc02 with your domain name, primary kdc, and secondary kdc.

      see for detailed instructions on setting up dns.

    your new kerberos realm is now ready to authenticate clients.

    secondary kdc

    once you have one key distribution center (kdc) on your network, it is good practice to have a secondary kdc in case the primary becomes unavailable.

    1. first, install the packages, and when asked for the kerberos and admin server names enter the name of the primary kdc:

      sudo apt-get install krb5-kdc krb5-admin-server 
    2. once you have the packages installed, create the secondary kdc's host principal. from a terminal prompt, enter:

      kadmin -q "addprinc -randkey host/kdc02.example.com" 

      after, issuing any kadmin commands you will be prompted for your username/admin@example.com principal password.

    3. extract the keytab file:

      kadmin -q "ktadd -k keytab.kdc02 host/kdc02.example.com" 
    4. there should now be a keytab.kdc02 in the current directory, move the file to /etc/krb5.keytab:

      sudo mv keytab.kdc02 /etc/krb5.keytab 

      if the path to the keytab.kdc02 file is different adjust accordingly.

      also, you can list the principals in a keytab file, which can be useful when troubleshooting, using the klist utility:

      sudo klist -k /etc/krb5.keytab 
    5. next, there needs to be a kpropd.acl file on each kdc that lists all kdcs for the realm. for example, on both primary and secondary kdc, create /etc/krb5kdc/kpropd.acl:

      host/kdc01.example.com@example.com host/kdc02.example.com@example.com 
    6. create an empty database on the secondary kdc:

      sudo kdb5_util -s create 
    7. now start the kpropd daemon, which listens for connections from the kprop utility. kprop is used to transfer dump files:

      sudo kpropd -s 
    8. from a terminal on the primary kdc, create a dump file of the principal database:

      sudo kdb5_util dump /var/lib/krb5kdc/dump 
    9. extract the primary kdc's keytab file and copy it to /etc/krb5.keytab:

      kadmin -q "ktadd -k keytab.kdc01 host/kdc01.example.com" sudo mv keytab.kdc01 /etc/kr5b.keytab 

      make sure there is a host for kdc01.example.com before extracting the keytab.

    10. using the kprop utility push the database to the secondary kdc:

      sudo kprop -r example.com -f /var/lib/krb5kdc/dump kdc02.example.com 

      there should be a succeeded message if the propagation worked. if there is an error message check /var/log/syslog on the secondary kdc for more information.

      you may also want to create a cron job to periodically update the database on the secondary kdc. for example, the following will push the database every hour:

      # m h  dom mon dow   command 0 * * * * /usr/sbin/kdb5_util dump /var/lib/krb5kdc/dump && /usr/sbin/kprop -r example.com -f /var/lib/krb5kdc/dump kdc02.example.com 
    11. back on the secondary kdc, create a stash file to hold the kerberos master key:

      sudo kdb5_util stash 
    12. finally, start the krb5-kdc daemon on the secondary kdc:

      sudo /etc/init.d/krb5-kdc start 

    the secondary kdc should now be able to issue tickets for the realm. you can test this by stopping the krb5-kdc daemon on the primary kdc, then use kinit to request a ticket. if all goes well you should receive a ticket from the secondary kdc.

    kerberos linux client

    this section covers configuring a linux system as a kerberos client. this will allow access to any kerberized services once a user has successfully logged into the system.

    installation

    in order to authenticate to a kerberos realm, the krb5-user and libpam-krb5 packages are needed, along with a few others that are not strictly necessary but make life easier. to install the packages enter the following in a terminal prompt:

    sudo apt-get install krb5-user libpam-krb5 libpam-ccreds auth-client-config 

    the auth-client-config package allows simple configuration of pam for authentication from multiple sources, and the libpam-ccreds will cache authentication credentials allowing you to login in case the key distribution center (kdc) is unavailable. this package is also useful for laptops that may authenticate using kerberos while on the corporate network, but will need to be accessed off the network as well.

    configuration

    to configure the client in a terminal enter:

    sudo dpkg-reconfigure krb5-config 

    you will then be prompted to enter the name of the kerberos realm. also, if you don't have dns configured with kerberos srv records, the menu will prompt you for the hostname of the key distribution center (kdc) and realm administration server.

    the dpkg-reconfigure adds entries to the /etc/krb5.conf file for your realm. you should have entries similar to the following:

[libdefaults]
        default_realm = EXAMPLE.COM
...
[realms]
        EXAMPLE.COM = {
                kdc = 192.168.0.1
                admin_server = 192.168.0.1
        }

    you can test the configuration by requesting a ticket using the kinit utility. for example:

    kinit steve@example.com password for steve@example.com: 

    when a ticket has been granted, the details can be viewed using klist:

    klist ticket cache: file:/tmp/krb5cc_1000 default principal: steve@example.com  valid starting     expires            service principal 07/24/08 05:18:56  07/24/08 15:18:56  krbtgt/example.com@example.com         renew until 07/25/08 05:18:57   kerberos 4 ticket cache: /tmp/tkt1000 klist: you have no tickets cached 

    next, use the auth-client-config to configure the libpam-krb5 module to request a ticket during login:

    sudo auth-client-config -a -p kerberos_example 

You should now receive a ticket upon successful login authentication.
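On the Java side, a service can obtain the same kind of ticket through JAAS and the Krb5LoginModule. A minimal sketch; the jaas.conf path, the entry name KrbLogin and the keytab location are illustrative assumptions (not from this guide), and the principal is the steve example used above:

// Hypothetical jaas.conf referenced below:
// KrbLogin {
//   com.sun.security.auth.module.Krb5LoginModule required
//   useKeyTab=true
//   keyTab="/etc/krb5.keytab"
//   principal="steve@EXAMPLE.COM"
//   storeKey=true
//   doNotPrompt=true;
// };
import javax.security.auth.Subject;
import javax.security.auth.login.LoginContext;

public class KerberosLoginDemo {
    public static void main(String[] args) throws Exception {
        // Point the JVM at the Kerberos and JAAS configuration files.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "jaas.conf");

        // Authenticate against the KDC using the keytab; no password prompt needed.
        LoginContext lc = new LoginContext("KrbLogin");
        lc.login();

        Subject subject = lc.getSubject();
        System.out.println("Authenticated principals: " + subject.getPrincipals());

        lc.logout();
    }
}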

    resources

    • for more information on kerberos see the site.

    • the page has more details.

    • o'reilly's is a great reference when setting up kerberos.

    • also, feel free to stop by the #ubuntu-server irc channel on if you have kerberos questions.



Kerberos installation and configuration
http://www.blogjava.net/wangxinsh55/archive/2016/07/05/431094.html (simone, 2016-07-05)
http://blog.csdn.net/caizhongda/article/details/7947722
Installation steps:
1. Download krb5-1.9
http://web.mit.edu/kerberos/dist/krb5/1.9/krb5-1.9-signed.tar

2. Unpack
tar -xvf krb5-1.9.signed.tar
This produces krb5-1.9.tar.gz and krb5-1.9.tar.gz.asc.
Continue with: tar zxvf krb5-1.9.tar.gz

3. Build
cd krb5-1.9/src
./configure
make
make install

4. Configure /etc/krb5.conf
This is the main Kerberos configuration file, and it must live in /etc:

[libdefaults]
      default_realm = 360buy.com

[realms]
      360buy.com = {
            kdc = m1.360buy.com
            admin_server = m1.360buy.com
            default_domain = 360buy.com
      }

[logging]
      kdc = FILE:/data/logs/krb5/krb5kdc.log
      admin_server = FILE:/data/logs/krb5/kadmin.log
      default = FILE:/data/logs/krb5/krb5lib.log

default_realm in [libdefaults] is the realm used when none is given explicitly.
[logging] sets the log file locations.
[realms] is the most important, and trickiest, Kerberos concept: the realm is the scope managed by the KDC; it may or may not match the DNS domain name.

5. Configure /usr/local/var/krb5kdc/kdc.conf
No install prefix was chosen above, so the default location is /usr/local/var/krb5kdc:

[kdcdefaults]
      kdc_ports = 750,88

[realms]
      360buy.com = {
            database_name = /usr/local/var/krb5kdc/principal
            admin_keytab = /usr/local/var/krb5kdc/kadm5.keytab
            acl_file = /usr/local/var/krb5kdc/kadm5.acl
            key_stash_file = /usr/local/var/krb5kdc/.k5.360buy.com
            kdc_ports = 750,88
            max_life = 10h 0m 0s
            max_renewable_life = 7d 0h 0m 0s
      }

6. Create a Kerberos database

/usr/local/sbin/kdb5_util create -r 360buy.com -s

You will be asked for a database password. This creates /usr/local/var/krb5kdc/principal to hold the database files.

7. Log into Kerberos

/usr/local/sbin/kadmin.local

1) List principals
listprincs

2) Add a principal
addprinc admin/admin@360buy.com

3) Delete a principal
delprinc

4) Create a keytab file:

ktadd -k /usr/local/var/krb5kdc/kadm5.keytab kadmin/admin kadmin/changepw

ktadd can be used to add further principals to the keytab.
Note that the kadm5.keytab path must match the one in kdc.conf.


8. Restart the krb5kdc and kadmind processes
/usr/local/sbin/kadmind
/usr/local/sbin/krb5kdc

9. Update /etc/hosts
Add the corresponding hosts:
192.168.101.201 m1.360buy.com kdc
192.168.101.202 m2.360buy.com client
and adjust the hostnames accordingly.


10. Test a ticket request on the KDC server

/usr/local/sbin/kadmin.local
kadmin.local: addprinc winston@360buy.com
Set a password when prompted, then exit.

su winston
$ kinit winston@360buy.com
Enter the password you just created.

$ klist to view the ticket you obtained.

11. Install Kerberos on the client
It also needs to be compiled, but only /etc/krb5.conf has to be configured,
with the same content as on the server.

12. Test a ticket request against the KDC server
su winston
$ kinit winston@360buy.com
Enter the password you just created.


addprinc -randkey hdfs/sl.360buy.com@360buy.com
ktadd -norandkey -k hdfs.keytab hdfs/s1.360buy.com host/master.360buy.com

Apache Kafka Security 101
http://www.blogjava.net/wangxinsh55/archive/2016/06/30/431061.html (simone, 2016-06-30)
(excerpt; read the full article at the link above)

A global unique ID generator for sharded MySQL (split databases and tables)
http://www.blogjava.net/wangxinsh55/archive/2016/06/28/431035.html (simone, 2016-06-28)
http://lztian.com/blog/5921.html

MySQL's AUTO_INCREMENT feature can be used to generate reliable unique IDs.

Table definition; the key points are AUTO_INCREMENT and the UNIQUE KEY:

CREATE TABLE `tickets64` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `stub` char(1) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`),
  UNIQUE KEY `stub` (`stub`)
) ENGINE=MyISAM

When an ID is needed, use REPLACE INTO to obtain a value; combined with the UNIQUE KEY in the table definition, a single row is enough to serve as the ID generator:

REPLACE INTO tickets64 (stub) VALUES ('a');
SELECT LAST_INSERT_ID();

Through this mechanism MySQL guarantees that the ID is unique and increasing, and it works under high concurrency. The official description: https://dev.mysql.com/doc/refman/5.0/en/information-functions.html

It is multi-user safe because multiple clients can issue the UPDATE statement and
get their own sequence value with the SELECT statement (or mysql_insert_id()),
without affecting or being affected by other clients that generate their own sequence values.

Note that if the client is PHP, you cannot use mysql_insert_id() to fetch the ID; see "Problems with mysql_insert_id() on BIGINT auto-increment columns": http://kaifage.com/notes/99/mysql-insert-id-issue-with-bigint-ai-field.html.
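From Java/JDBC the BIGINT pitfall above does not apply, because the driver hands back the generated key as a long. A minimal sketch of a ticket fetcher (connection URL and credentials are placeholders, and the MySQL JDBC driver is assumed to be on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TicketServer {
    // Placeholder connection settings; adjust to your environment.
    private static final String URL = "jdbc:mysql://127.0.0.1:3306/test";

    public static long nextId() throws Exception {
        try (Connection conn = DriverManager.getConnection(URL, "root", "password");
             Statement st = conn.createStatement()) {
            // REPLACE INTO always touches exactly one row thanks to the UNIQUE KEY on stub,
            // and bumps the AUTO_INCREMENT counter in the process.
            st.executeUpdate("REPLACE INTO tickets64 (stub) VALUES ('a')",
                             Statement.RETURN_GENERATED_KEYS);
            try (ResultSet rs = st.getGeneratedKeys()) {
                rs.next();
                return rs.getLong(1);   // the freshly generated 64-bit id
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("next id = " + nextId());
    }
}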

Flickr adopted this scheme: http://code.flickr.net/2010/02/08/ticket-servers-distributed-unique-primary-keys-on-the-cheap/

Related:

    http://www.zhihu.com/question/30674667

    http://my.oschina.net/u/142836/blog/174465



Modifying static final constants in Java
http://www.blogjava.net/wangxinsh55/archive/2016/06/28/431031.html (simone, 2016-06-28)
http://ljhzzyx.blog.163.com/blog/static/3838031220141011111435161/

In Java a variable marked final cannot be modified, but reflection can be used to modify it anyway. A simple example is given at http://stackoverflow.com/questions/2474017/using-reflection-to-change-static-final-file-separatorchar-for-unit-testing
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class EverythingIsTrue {
    static void setFinalStatic(Field field, Object newValue) throws Exception {
        field.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
        field.set(null, newValue);
    }
    public static void main(String args[]) throws Exception {
        setFinalStatic(Boolean.class.getField("FALSE"), true);
        System.out.format("Everything is %s", false); // "Everything is true"
    }
}
The key points are calling setAccessible(true) and clearing the FINAL bit from the modifiers. Modifiers are handled through java.lang.reflect.Modifier, described in detail at
http://blog.csdn.net/xiao__gui/article/details/8141216
Modifier's isPublic, isPrivate, isStatic and similar methods tell you whether a given modifier is present:
boolean isStatic = Modifier.isStatic(field.getModifiers());
if (isStatic) {
    System.out.println(field.get(null).toString());
}
Because the field here is static, field.get(null) can take null as its argument (or a class literal such as A.class); no instance is needed. Looking at the source of java.lang.reflect.Modifier, modifiers are encoded as bits of an int. The article above gives an example:

For public static, the corresponding integer is binary 1001, i.e. 9:

                 ... | native | transient | volatile | synchronized | final | static | protected | private | public
public static        |   0    |     0     |    0     |      0       |   0   |   1    |     0     |    0    |   1

The full definitions in the source are:
public static final int PUBLIC           = 0x00000001;
public static final int PRIVATE          = 0x00000002;
public static final int PROTECTED        = 0x00000004;
public static final int STATIC           = 0x00000008;
public static final int FINAL            = 0x00000010;
public static final int SYNCHRONIZED     = 0x00000020;
public static final int VOLATILE         = 0x00000040;
public static final int TRANSIENT        = 0x00000080;
public static final int NATIVE           = 0x00000100;
public static final int INTERFACE        = 0x00000200;
public static final int ABSTRACT         = 0x00000400;
public static final int STRICT           = 0x00000800;
From these values, the complete order from high bit to low bit is:
strict, abstract, interface, native, transient, volatile, synchronized, final, static, protected, private, public
Now field.getModifiers() & ~Modifier.FINAL makes sense: ~Modifier.FINAL sets the FINAL bit to 0 and every other bit to 1, so ANDing it with field.getModifiers() clears the FINAL flag from the field's modifiers.
While experimenting I ran into one issue. The final variable is assigned with field.set(); if field.get() is called before it, in this order:
field.get(null).toString();
...
field.set(null, newValue);
then the assignment to the final variable fails, even with setAccessible(true). The exact reason is unclear; tracing through the source would be needed to find out.

Also note that for primitive types such as int, long, boolean and for String, compiler optimization means that most places using the constant keep the original value inlined. For example:
if (index > MAX_FORMAT_RECORDS_INDEX) {
    index = MAX_FORMAT_RECORDS_INDEX;
}
with MAX_FORMAT_RECORDS_INDEX being final is compiled into:
if (index > 100) {
    index = 100;
}
System.out.println(Bean.INT_VALUE);
// is optimized at compile time into:
System.out.println(100);
So normal code still sees the original value; to read the modified final constant you must go through field.get(null).
Overall, changing final constants of primitive types is of limited use; for non-primitive constants it can be genuinely useful.


MySQL table partitioning: partitioning by date
http://www.blogjava.net/wangxinsh55/archive/2016/06/07/430822.html (simone, 2016-06-07)
http://blog.sina.com.cn/s/blog_888269b20100w7kf.html

MySQL 5.1 has reached beta, and the official site has been publishing articles about it. With partitioning, MySQL can store very large data volumes. Today I saw a follow-up article on the MySQL site about partitioning by date, which is extremely handy for time-sensitive data. Below is a summary of that article.

A wrong way to partition by date

The most intuitive approach is to partition directly on a date in year-month-day format:

mysql>  CREATE TABLE rms (d DATE)
    ->  PARTITION BY RANGE (d)
    -> (PARTITION p0 VALUES LESS THAN ('1995-01-01'),
    ->  PARTITION p1 VALUES LESS THAN ('2010-01-01'));


The example above partitions a table directly on a "Y-m-d" value. Unfortunately what seems obvious does not work; you get an error:

ERROR 1064 (42000): VALUES value must be of same type as partition function near '),
partition p1 values less than ('2010-01-01'))' at line 3

That partitioning attempt fails, and it is clearly wasteful anyway; an experienced DBA would partition on an integer value:

mysql> CREATE TABLE part_date1
    ->      (  c1 int default NULL,
    ->  c2 varchar(30) default NULL,
    ->  c3 date default NULL) engine=myisam
    ->      PARTITION BY RANGE (cast(date_format(c3,'%Y%m%d') as signed))
    -> (PARTITION p0 VALUES LESS THAN (19950101),
    -> PARTITION p1 VALUES LESS THAN (19960101) ,
    -> PARTITION p2 VALUES LESS THAN (19970101) ,
    -> PARTITION p3 VALUES LESS THAN (19980101) ,
    -> PARTITION p4 VALUES LESS THAN (19990101) ,
    -> PARTITION p5 VALUES LESS THAN (20000101) ,
    -> PARTITION p6 VALUES LESS THAN (20010101) ,
    -> PARTITION p7 VALUES LESS THAN (20020101) ,
    -> PARTITION p8 VALUES LESS THAN (20030101) ,
    -> PARTITION p9 VALUES LESS THAN (20040101) ,
    -> PARTITION p10 VALUES LESS THAN (20100101),
    -> PARTITION p11 VALUES LESS THAN MAXVALUE );
Query OK, 0 rows affected (0.01 sec)


Done? Keep analyzing.

mysql> EXPLAIN PARTITIONS
    -> SELECT COUNT(*) FROM part_date1 WHERE
    ->      c3 > date '1995-01-01' AND c3 < date '1995-12-31'\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: part_date1
   partitions: p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
         type: ALL
possible_keys: NULL
          key: NULL
      key_len: NULL
          ref: NULL
         rows: 8100000
        extra: Using where
1 row in set (0.00 sec)


Infuriatingly, MySQL does a full table scan for this query instead of pruning by our date partitions. The article explains that the optimizer does not recognize this style of date partitioning; it spent a lot of space luring us down the wrong path.

    正确的日期分区例子

    mysql优化器支持以下两种内置的日期函数进行分区:

    • to_days()
    • year()

    看个例子:

    code:
    1. mysql> create table part_date3
    2.     ->      (  c1 int default null,
    3.     ->  c2 varchar(30) default null,
    4.     ->  c3 date default null) engine=myisam
    5.     ->      partition by range (to_days(c3))
    6.     -> (partition p0 values less than (to_days('1995-01-01')),
    7.     -> partition p1 values less than (to_days('1996-01-01')) ,
    8.     -> partition p2 values less than (to_days('1997-01-01')) ,
    9.     -> partition p3 values less than (to_days('1998-01-01')) ,
    10.     -> partition p4 values less than (to_days('1999-01-01')) ,
    11.     -> partition p5 values less than (to_days('2000-01-01')) ,
    12.     -> partition p6 values less than (to_days('2001-01-01')) ,
    13.     -> partition p7 values less than (to_days('2002-01-01')) ,
    14.     -> partition p8 values less than (to_days('2003-01-01')) ,
    15.     -> partition p9 values less than (to_days('2004-01-01')) ,
    16.     -> partition p10 values less than (to_days('2010-01-01')),
    17.     -> partition p11 values less than maxvalue );
    18. query ok, 0 rows affected (0.00 sec)

     

    以to_days()函数分区成功,我们分析一下看看:

    code:
    1. mysql> explain partitions
    2.     -> select count(*) from part_date3 where
    3.     ->      c3 > date '1995-01-01' and c3 < date '1995-12-31'\g
    4. *************************** 1. row ***************************
    5.            id: 1
    6.   select_type: simple
    7.         table: part_date3
    8.    partitions: p1
    9.          type: all
    10. possible_keys: null
    11.           key: null
    12.       key_len: null
    13.           ref: null
    14.          rows: 808431
    15.         extra: using where
    16. 1 row in set (0.00 sec)

     

    可以看到,优化器这次不负众望,仅仅在p1分区进行查询。在这种情况下查询,真的能够带来提升查询效率么?下面分别对这次建立的part_date3和之前分区失败的part_date1做一个查询对比:

    code:
    1. mysql> select count(*) from part_date3 where
    2.     ->      c3 > date '1995-01-01' and c3 < date '1995-12-31';
    3. +----------+
    4. | count(*) |
    5. +----------+
    6. |   805114 |
    7. +----------+
    8. 1 row in set (4.11 sec)
    9.  
    10. mysql> select count(*) from part_date1 where
    11.     ->      c3 > date '1995-01-01' and c3 < date '1995-12-31';
    12. +----------+
    13. | count(*) |
    14. +----------+
    15. |   805114 |
    16. +----------+
    17. 1 row in set (40.33 sec)

     

    as the numbers show, with correct partitioning the query takes about 4 seconds, while with the broken partitioning it takes about 40 seconds (effectively the same as having no partitions at all), so the query time drops by roughly 90%. be sure to define partitions in a form the optimizer can prune on, and always verify the pruning with explain partitions afterwards; that is the only way to get a real performance gain.
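    if you want to bake that explain check into a test instead of eyeballing the mysql client, a minimal jdbc sketch along these lines can assert that pruning actually happens (the jdbc url, credentials and driver availability are assumptions, and explain partitions is the mysql 5.1-era syntax used above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    /** Minimal pruning check: run EXPLAIN PARTITIONS and print which partitions the query touches. */
    public class PartitionPruningCheck {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test", "root", "secret"); // placeholder connection details
                 Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery(
                     "explain partitions select count(*) from part_date3 " +
                     "where c3 > date '1995-01-01' and c3 < date '1995-12-31'")) {
                while (rs.next()) {
                    // a pruned plan lists only the partitions that can hold matching rows (e.g. just p1)
                    System.out.println("partitions used: " + rs.getString("partitions"));
                }
            }
        }
    }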


    注意:

    在mysql5.1中建立分区表的语句中,只能包含下列函数:
    abs()
    ceiling() and floor() (these two can only be used for partitioning when the column they are applied to is an integer type), for example:

    mysql> create table t (c float) partition by list( floor(c) )(
        -> partition p0 values in (1,3,5),
        -> partition p1 values in (2,4,6)
        -> );
    error 1491 (hy000): the partition function returns the wrong type

    mysql> create table t (c int) partition by list( floor(c) )(
        -> partition p0 values in (1,3,5),
        -> partition p1 values in (2,4,6)
        -> );
    query ok, 0 rows affected (0.01 sec)

    day()
    dayofmonth()
    dayofweek()
    dayofyear()
    datediff()
    extract()
    hour()
    microsecond()
    minute()
    mod()
    month()
    quarter()
    second()
    time_to_sec()
    to_days()
    weekday()
    year()
    yearweek()



    max mqtt connections (simone, wed, 01 jun 2016 08:15 gmt)
    post: http://www.blogjava.net/wangxinsh55/archive/2016/06/01/430732.html
    source: http://stackoverflow.com/questions/29358313/max-mqtt-connections?answertab=votes#tab-top



    i have a need to create a server farm that can handle 5 million connections, 5 million topics (one per client), process 300k messages/sec.

    i tried to see what various message brokers were capable of, so i am currently using two rhel ec2 instances (r3.4xlarge) so that there are plenty of resources available. to save you looking it up, each has 16 vcpus and 122gb of ram. i am nowhere near that limit in usage.

    i am unable to pass the 600k connections limit. since there doesn't seem to be any o/s limitation (plenty of ram/cpu/etc.) on either the client nor the server what is limiting me?

    i have edited /etc/security/limits.conf as follows:

    * soft  nofile  20000000
    * hard  nofile  20000000
    * soft  nproc   20000000
    * hard  nproc   20000000
    root soft  nofile 20000000
    root hard  nofile 20000000

    i have edited /etc/sysctl.conf as follows:

    net.ipv4.ip_local_port_range = 1024 65535
    net.ipv4.tcp_tw_reuse = 1
    net.ipv4.tcp_mem = 5242880 5242880 5242880
    net.ipv4.tcp_tw_recycle = 1
    fs.file-max = 20000000
    fs.nr_open = 20000000
    net.ipv4.tcp_syncookies = 0
    net.ipv4.tcp_max_syn_backlog = 10000
    net.ipv4.tcp_synack_retries = 3
    net.core.somaxconn = 65536
    net.core.netdev_max_backlog = 100000
    net.core.optmem_max = 20480000

    for apollo: export apollo_ulimit=20000000

    for activemq:

    activemq_opts="$activemq_opts -dorg.apache.activemq.usededicatedtaskrunner=false"
    activemq_opts_memory="-xms50g -xmx115g"

    i created 20 additional private addresses for eth0 on the client, then assigned them: ip addr add 11.22.33.44/24 dev eth0

    i am fully aware of the 65k port limits which is why i did the above.

    • for activemq i got to: 574309
    • for apollo i got to: 592891
    • for rabbit i got to 90k, but the logging was awful and i couldn't figure out how to go higher, although i know it's possible.
    • for hive i got to trial limit of 1000. awaiting a license
    • ibm wants to trade the cost of my house to use them - nah!
        
    asked mar 30 '15 at 23:52

    • comment (mar 31 '15): can't really tell how to increase the throughput, but some other brokers seem capable of extreme throughput / client counts.
    • comment (apr 2 '15): did you try mosquitto?
    • comment (apr 2 '15, asker): trying hive, apollo, mosquitto, activemq, rabbitmq.

    1 answer

    accepted answer

    answer: while doing this i realized that i had a misspelling in my client setting within /etc/sysctl.conf file for: net.ipv4.ip_local_port_range

    i am now able to connect 956,591 mqtt clients to my apollo server in 188sec.


    more info: trying to isolate if this is an o/s connection limitation or a broker, i decided to write a simple client/server.

    the server:

        ServerSocket server = new ServerSocket(1884);
        Socket client = null;
        while (true) {
            client = server.accept();
            clients.add(client);   // clients is a list holding every accepted connection
        }

    the client:

        while (true) {
            InetAddress clientIpToBindTo = getNextClientVip(); // rotate through the extra local addresses
            Socket client = new Socket(hostname, 1884, clientIpToBindTo, 0);
            clients.add(client);
        }

    with 21 ips, i would expect (65535 - 1024) * 21 = 1,354,731 to be the boundary. in reality i am able to achieve 1,231,734

    [root@ip ec2-user]# cat /proc/net/sockstat
    sockets: used 1231734
    tcp: inuse 5 orphan 0 tw 0 alloc 1231307 mem 2
    udp: inuse 4 mem 1
    udplite: inuse 0
    raw: inuse 0
    frag: inuse 0 memory 0

    so the socket/kernel/io stuff is worked out.
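    for reference, a self-contained version of that client/server socket-count test might look roughly like the following sketch (the port, the server hostname and the list of extra client ips are assumptions, and error handling is omitted):

    import java.net.InetAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;

    public class ConnectionCountTest {

        /** server side: accept connections forever and hold them open. */
        static void runServer(int port) throws Exception {
            List<Socket> clients = new ArrayList<>();
            try (ServerSocket server = new ServerSocket(port)) {
                while (true) {
                    clients.add(server.accept());
                    if (clients.size() % 10000 == 0) {
                        System.out.println("accepted: " + clients.size());
                    }
                }
            }
        }

        /** client side: open sockets, rotating across extra local ips to dodge the per-ip ephemeral port limit. */
        static void runClient(String serverHost, int port, String[] localIps) throws Exception {
            List<Socket> clients = new ArrayList<>();
            int i = 0;
            while (true) {
                InetAddress local = InetAddress.getByName(localIps[i++ % localIps.length]);
                clients.add(new Socket(serverHost, port, local, 0)); // 0 = let the os pick an ephemeral port
                if (clients.size() % 10000 == 0) {
                    System.out.println("connected: " + clients.size());
                }
            }
        }
    }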

    i am still unable to achieve this using any broker.

    again just after my client/server test this is the kernel settings.

    client:

    [root@ip ec2-user]# sysctl -p
    net.ipv4.ip_local_port_range = 1024 65535
    net.ipv4.tcp_tw_reuse = 1
    net.ipv4.tcp_mem = 5242880 5242880 15242880
    net.ipv4.tcp_tw_recycle = 1
    fs.file-max = 20000000
    fs.nr_open = 20000000

    [root@ip ec2-user]# cat /etc/security/limits.conf
    * soft  nofile  2000000
    * hard  nofile  2000000
    root soft  nofile 2000000
    root hard  nofile 2000000

    server:

    [root@ ec2-user]# sysctl -p
    net.ipv4.tcp_tw_reuse = 1
    net.ipv4.tcp_mem = 5242880 5242880 5242880
    net.ipv4.tcp_tw_recycle = 1
    fs.file-max = 20000000
    fs.nr_open = 20000000
    net.ipv4.tcp_syncookies = 0
    net.ipv4.tcp_max_syn_backlog = 1000000
    net.ipv4.tcp_synack_retries = 3
    net.core.somaxconn = 65535
    net.core.netdev_max_backlog = 1000000
    net.core.optmem_max = 20480000


    hdfs配置kerberos认证 (configuring kerberos authentication for hdfs; simone, tue, 31 may 2016 09:24 gmt)
    post: http://www.blogjava.net/wangxinsh55/archive/2016/05/31/430718.html (full text at the source post only)

    spark history server配置使用 (configuring and using the spark history server; simone, thu, 26 may 2016 06:12 gmt)
    post: http://www.blogjava.net/wangxinsh55/archive/2016/05/26/430665.html
    source: http://www.cnblogs.com/luogankun/p/3981645.html

    spark history server产生背景

    以standalone运行模式为例,在运行spark application的时候,spark会提供一个webui列出应用程序的运行时信息;但该webui随着application的完成(成功/失 败)而关闭,也就是说,spark application运行完(成功/失败)后,将无法查看application的历史记录;

    spark history server就是为了应对这种情况而产生的,通过配置可以在application执行的过程中记录下了日志事件信息,那么在application执行 结束后,webui就能重新渲染生成ui界面展现出该application在执行过程中的运行时信息;

    spark运行在yarn或者mesos之上,通过spark的history server仍然可以重构出一个已经完成的application的运行时参数信息(假如application运行的事件日志信息已经记录下来);

     

    配置&使用spark history server

    以默认配置的方式启动spark history server:

    cd $SPARK_HOME/sbin
    ./start-history-server.sh

    报错:

    starting org.apache.spark.deploy.history.historyserver, logging to /home/spark/software/source/compile/deploy_spark/sbin/../logs/spark-spark-org.apache.spark.deploy.history.historyserver-1-hadoop000.out
    failed to launch org.apache.spark.deploy.history.historyserver:
            at org.apache.spark.deploy.history.fshistoryprovider.(fshistoryprovider.scala:44)
            ... 6 more

    需要在启动时指定目录:

    start-history-server.sh hdfs://hadoop000:8020/directory

    hdfs://hadoop000:8020/directory可以配置在配置文件中,那么在启动history-server时就不需要指定,后续介绍怎么配置;

    注:该目录需要事先在hdfs上创建好,否则history-server启动报错。

    启动完成之后可以通过webui访问,默认端口是18080:http://hadoop000:18080

    默认界面列表信息是空的,下面截图是我跑了几次spark-sql测试后出现的。

     

    history server相关的配置参数描述

    1) spark.history.updateinterval
      默认值:10
      以秒为单位,更新日志相关信息的时间间隔

    2)spark.history.retainedapplications
      默认值:50
      在内存中保存application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,当再次访问已被删除的应用信息时需要重新构建页面。

    3)spark.history.ui.port
      默认值:18080
      historyserver的web端口

    4)spark.history.kerberos.enabled
      默认值:false
      是否使用kerberos方式登录访问historyserver,对于持久层位于安全集群的hdfs上是有用的,如果设置为true,就要配置下面的两个属性

    5)spark.history.kerberos.principal
      默认值:用于historyserver的kerberos主体名称

    6)spark.history.kerberos.keytab
      用于historyserver的kerberos keytab文件位置

    7)spark.history.ui.acls.enable
      默认值:false
      授权用户查看应用程序信息的时候是否检查acl。如果启用,只有应用程序所有者和spark.ui.view.acls指定的用户可以查看应用程序信息;否则,不做任何检查

    8)spark.eventlog.enabled
      默认值:false
      是否记录spark事件,用于应用程序在完成后重构webui

    9)spark.eventlog.dir
      默认值:file:///tmp/spark-events
      保存日志相关信息的路径,可以是hdfs://开头的hdfs路径,也可以是file://开头的本地路径,都需要提前创建

    10)spark.eventlog.compress
      默认值:false
      是否压缩记录spark事件,前提spark.eventlog.enabled为true,默认使用的是snappy

    以spark.history开头的需要配置在spark-env.sh中的spark_history_opts,以spark.eventlog开头的配置在spark-defaults.conf

     

    我在测试过程中的配置如下:

    spark-defaults.conf

    spark.eventlog.enabled  true
    spark.eventlog.dir      hdfs://hadoop000:8020/directory
    spark.eventlog.compress true

    spark-env.sh

    export spark_history_opts="-dspark.history.ui.port=7777 -dspark.history.retainedapplications=3 -dspark.history.fs.logdirectory=hdfs://hadoop000:8020/directory"

    参数描述:

    spark.history.ui.port=7777  调整webui访问的端口号为7777

    spark.history.fs.logdirectory=hdfs://hadoop000:8020/directory  配置了该属性后,在start-history-server.sh时就无需再显示的指定路径

    spark.history.retainedapplications=3   指定保存application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除

     

    调整参数后启动start-history-server.sh

    start-history-server.sh 

    访问webui: http://hadoop000:7777

     

    在使用spark history server的过程中产生的几个疑问:

    疑问1:spark.history.fs.logdirectory和spark.eventlog.dir指定目录有啥区别?

    经测试后发现:

    spark.eventlog.dir:application在运行过程中所有的信息均记录在该属性指定的路径下;

    spark.history.fs.logdirectory:spark history server页面只展示该指定路径下的信息;

    比如:spark.eventlog.dir刚开始时指定的是hdfs://hadoop000:8020/directory,而后修改成hdfs://hadoop000:8020/directory2

    那么spark.history.fs.logdirectory如果指定的是hdfs://hadoop000:8020/directory,就只能显示出该目录下的所有application运行的日志信息;反之亦然。

     

    疑问2:spark.history.retainedapplications=3 貌似没生效??????

    the history server will list all applications. it will just retain a max number of them in memory. that option does not control how many applications are shown, it controls how much memory the hs will need.

    note: this parameter is not the number of application entries shown on the page, but the number of applications whose rebuilt ui is kept in memory; entries already in memory are simply read and rendered when the page is accessed.

    比如说该参数配置了10个,那么内存中就最多只能存放10个applicaiton的日志信息,当第11个加入时,第一个就会被踢除,当再次访问第1个application的页面信息时就需要重新读取指定路径上的日志信息来渲染展示页面。 
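    as a toy illustration of that retain-n-in-memory behaviour (this is not spark's actual code, just a sketch of the eviction policy using a standard lru map):

    import java.util.LinkedHashMap;
    import java.util.Map;

    /** keeps at most maxEntries rendered application uis in memory; least recently accessed is evicted first. */
    class RetainedApplications<K, V> extends LinkedHashMap<K, V> {
        private final int maxEntries;

        RetainedApplications(int maxEntries) {
            super(16, 0.75f, true); // accessOrder = true gives lru ordering
            this.maxEntries = maxEntries;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // evicting only drops the cached, rendered ui; the event logs on hdfs are untouched,
            // so the evicted application can be re-read and re-rendered on its next access
            return size() > maxEntries;
        }
    }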

    详见官方文档:http://spark.apache.org/docs/latest/monitoring.html



    spark on yarn中spark.yarn.jar属性的使用 (using the spark.yarn.jar property with spark on yarn; simone, thu, 26 may 2016 06:11 gmt)
    post: http://www.blogjava.net/wangxinsh55/archive/2016/05/26/430664.html
    source: http://www.cnblogs.com/luogankun/p/4191796.html

    今天在测试spark-sql运行在yarn上的过程中,无意间从日志中发现了一个问题:

    spark-sql --master yarn
    14/12/29 15:23:17 info client: requesting a new application from cluster with 1 nodemanagers
    14/12/29 15:23:17 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container)
    14/12/29 15:23:17 info client: will allocate am container, with 896 mb memory including 384 mb overhead
    14/12/29 15:23:17 info client: setting up container launch context for our am
    14/12/29 15:23:17 info client: preparing resources for our am container
    14/12/29 15:23:17 info client: uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0093/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
    14/12/29 15:23:18 info client: setting up the launch environment for our am container

    再开启一个spark-sql命令行,从日志中再次发现:

    14/12/29 15:24:03 info client: requesting a new application from cluster with 1 nodemanagers
    14/12/29 15:24:03 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container)
    14/12/29 15:24:03 info client: will allocate am container, with 896 mb memory including 384 mb overhead
    14/12/29 15:24:03 info client: setting up container launch context for our am
    14/12/29 15:24:03 info client: preparing resources for our am container
    14/12/29 15:24:03 info client: uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0094/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
    14/12/29 15:24:05 info client: setting up the launch environment for our am container

    然后查看hdfs上的文件:

    hadoop fs -ls hdfs://hadoop000:8020/user/spark/.sparkstaging/
    drwx------   - spark supergroup          0 2014-12-29 15:23 hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0093
    drwx------   - spark supergroup          0 2014-12-29 15:24 hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0094

    每个application都会上传一个spark-assembly-x.x.x-snapshot-hadoopx.x.x-cdhx.x.x.jar的jar包,影响hdfs的性能以及占用hdfs的空间。

     

    在spark文档(http://spark.apache.org/docs/latest/running-on-yarn.html)中发现spark.yarn.jar属性,将spark-assembly-xxxxx.jar存放在hdfs://hadoop000:8020/spark_lib/下

    在spark-defaults.conf添加属性配置:

    spark.yarn.jar hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar

    再次启动spark-sql --master yarn观察日志:

    14/12/29 15:39:02 info client: requesting a new application from cluster with 1 nodemanagers
    14/12/29 15:39:02 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container)
    14/12/29 15:39:02 info client: will allocate am container, with 896 mb memory including 384 mb overhead
    14/12/29 15:39:02 info client: setting up container launch context for our am
    14/12/29 15:39:02 info client: preparing resources for our am container
    14/12/29 15:39:02 info client: source and destination file systems are the same. not copying hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
    14/12/29 15:39:02 info client: setting up the launch environment for our am container

    观察hdfs上文件

    hadoop fs -ls hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0097

    该application对应的目录下没有spark-assembly-xxxxx.jar了,从而节省assembly包上传的过程以及hdfs空间占用。

     

    我在测试过程中遇到了类似如下的错误:

    application application_xxxxxxxxx_yyyy failed 2 times due to am container for application_xxxxxxxxx_yyyy 

    exited with exitcode: -1000 due to: java.io.filenotfoundexception: file /tmp/hadoop-spark/nm-local-dir/filecache does not exist

    在/tmp/hadoop-spark/nm-local-dir路径下创建filecache文件夹即可解决报错问题。



    benchmarking apache kafka: 2 million writes per second (on three cheap machines) (simone, thu, 26 may 2016 05:53 gmt)
    post: http://www.blogjava.net/wangxinsh55/archive/2016/05/26/430663.html
    source: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

    i wrote a blog post about how linkedin uses apache kafka as a central publish-subscribe log for integrating data between applications, stream processing, and hadoop data ingestion.

    to actually make this work, though, this "universal log" has to be a cheap abstraction. if you want to use a system as a central data hub it has to be fast, predictable, and easy to scale so you can dump all your data onto it. my experience has been that systems that are fragile or expensive inevitably develop a wall of protective process to prevent people from using them; a system that scales easily often ends up as a key architectural building block just because using it is the easiest way to get things built.

    i've always liked the benchmarks of cassandra that show it doing a million writes per second on three hundred machines on ec2 and google compute engine. i'm not sure why, maybe it is a dr. evil thing, but doing a million of anything per second is fun.

    in any case, one of the nice things about a kafka log is that, as we'll see, it is cheap. a million writes per second isn't a particularly big thing. this is because a log is a much simpler thing than a database or key-value store. indeed our production clusters take tens of millions of reads and writes per second all day long and they do so on pretty modest hardware.

    but let's do some benchmarking and take a look.

    kafka in 30 seconds

    to help understand the benchmark, let me give a quick review of what kafka is and a few details about how it works. kafka is a distributed messaging system originally built at linkedin and now an apache project used by a variety of companies.

    the general setup is quite simple. producers send records to the cluster which holds on to these records and hands them out to consumers:

    the key abstraction in kafka is the topic. producers publish their records to a topic, and consumers subscribe to one or more topics. a kafka topic is just a sharded write-ahead log. producers append records to these logs and consumers subscribe to changes. each record is a key/value pair. the key is used for assigning the record to a log partition (unless the publisher specifies the partition directly).
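    as a rough illustration of that keyed-record producer side, a minimal sketch with the java producer client might look like this (the broker address, topic name and serializers are assumptions, not part of the original post):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleKeyedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    // records with the same key always land in the same partition of the topic
                    producer.send(new ProducerRecord<>("my-topic", "key-" + (i % 2), "value-" + i));
                }
            }
        }
    }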

    here is a simple example of a single producer and consumer reading and writing from a two-partition topic.

    this picture shows a producer process appending to the logs for the two partitions, and a consumer reading from the same logs. each record in the log has an associated entry number that we call the offset. this offset is used by the consumer to describe its position in each of the logs.

    these partitions are spread across a cluster of machines, allowing a topic to hold more data than can fit on any one machine.

    note that unlike most messaging systems the log is always persistent. messages are immediately written to the filesystem when they are received. messages are not deleted when they are read but retained with some configurable sla (say a few days or a week). this allows usage in situations where the consumer of data may need to reload data. it also makes it possible to support space-efficient publish-subscribe as there is a single shared log no matter how many consumers; in traditional messaging systems there is usually a queue per consumer, so adding a consumer doubles your data size. this makes kafka a good fit for things outside the bounds of normal messaging systems such as acting as a pipeline for offline data systems such as hadoop. these offline systems may load only at intervals as part of a periodic etl cycle, or may go down for several hours for maintenance, during which time kafka is able to buffer even tbs of unconsumed data if needed.

    kafka also replicates its logs over multiple servers for fault-tolerance. one important architectural aspect of our replication design, in contrast to other messaging systems, is that replication is not an exotic bolt-on that requires complex configuration, only to be used in very specialized cases. instead replication is assumed to be the default: we treat un-replicated data as a special case where the replication factor happens to be one.

    producers get an acknowledgement back when they publish a message containing the record's offset. the first record published to a partition is given the offset 0, the second record 1, and so on in an ever-increasing sequence. consumers consume data from a position specified by an offset, and they save their position in a log by committing periodically: saving this offset in case that consumer instance crashes and another instance needs to resume from its position.

    okay, hopefully that all made sense (if not, you can read a more complete introduction to kafka ).

    this benchmark

    this test is against trunk, as i made some improvements to the performance tests for this benchmark. but nothing too substantial has changed since the last full release, so you should see similar results with the most recent release. i am also using our newly re-written java producer, which offers much improved throughput over the previous producer client.

    i've followed the basic template of this very nice benchmark write-up, but i covered scenarios and options that were more relevant to kafka.

    one quick philosophical note on this benchmark. for benchmarks that are going to be publicly reported, i like to follow a style i call "lazy benchmarking". when you work on a system, you generally have the know-how to tune it to perfection for any particular use case. this leads to a kind of benchmarketing where you heavily tune your configuration to your benchmark or worse have a different tuning for each scenario you test. i think the real test of a system is not how it performs when perfectly tuned, but rather how it performs "off the shelf". this is particularly true for systems that run in a multi-tenant setup with dozens or hundreds of use cases where tuning for each use case would be not only impractical but impossible. as a result, i have pretty much stuck with default settings, both for the server and the clients. i will point out areas where i suspect the result could be improved with a little tuning, but i have tried to resist the temptation to do any fiddling myself to improve the results.

    i have posted , so it should be possible to replicate results on your own gear if you are interested.

    the setup

    for these tests, i had six machines each has the following specs

    • intel xeon 2.5 ghz processor with six cores
    • six 7200 rpm sata drives
    • 32gb of ram
    • 1gb ethernet

    the kafka cluster is set up on three of the machines. the six drives are directly mounted with no raid (jbod style). the remaining three machines i use for zookeeper and for generating load.

    a three machine cluster isn't very big, but since we will only be testing up to a replication factor of three, it is all we need. as should be obvious, we can always add more partitions and spread data onto more machines to scale our cluster horizontally.

    this hardware is actually not linkedin's normal kafka hardware. our kafka machines are more closely tuned to running kafka, but are less in the spirit of "off-the-shelf" i was aiming for with these tests. instead, i borrowed these from one of our hadoop clusters, which runs on probably the cheapest gear of any of our persistent systems. hadoop usage patterns are pretty similar to kafka's, so this is a reasonable thing to do.

    okay, without further ado, the results!

    producer throughput

    these tests will stress the throughput of the producer. no consumers are run during these tests, so all messages are persisted but not read (we'll test cases with both producer and consumer in a bit). since we have recently rewritten our producer, i am testing this new code.

    single producer thread, no replication

    821,557 records/sec
    (78.3 mb/sec)

    for this first test i create a topic with six partitions and no replication. then i produce 50 million small (100 byte) records as quickly as possible from a single thread.

    the reason for focusing on small records in these tests is that it is the harder case for a messaging system (generally). it is easy to get good throughput in mb/sec if the messages are large, but much harder to get good throughput when the messages are small, as the overhead of processing each message dominates.

    throughout this benchmark, when i am reporting mb/sec, i am reporting just the value size of the record times the requests per second; none of the other overhead of the request is included. so the actual network usage is higher than what is reported. for example with a 100 byte message we would also transmit about 22 bytes of overhead per message (for an optional key, size delimiting, a message crc, the record offset, and attributes flag), as well as some overhead for the request (including the topic, partition, required acknowledgements, etc). this makes it a little harder to see where we hit the limits of the nic, but this seems a little more reasonable than including our own overhead bytes in throughput numbers. so, in the above result, we are likely saturating the 1 gigabit nic on the client machine.

    one immediate observation is that the raw numbers here are much higher than people expect, especially for a persistent storage system. if you are used to random-access data systems, like a database or key-value store, you will generally expect maximum throughput around 5,000 to 50,000 queries-per-second, as this is close to the speed that a good rpc layer can do remote requests. we exceed this due to two key design principles:

    1. we work hard to ensure we do linear disk i/o. the six cheap disks these servers have gives an aggregate throughput of 822 mb/sec of linear disk i/o. this is actually well beyond what we can make use of with only a 1 gigabit network card. many messaging systems treat persistence as an expensive add-on that decimates performance and should be used only sparingly, but this is because they are not able to do linear i/o.
    2. at each stage we work on batching together small bits of data into larger network and disk i/o operations. for example, in the new producer we use a "group commit"-like mechanism to ensure that any record sends initiated while another i/o is in progress get grouped together. for more on understanding the importance of batching, check out this presentation by david patterson on why .

    if you are interested in the details you can read a little more about this in our .

    single producer thread, 3x asynchronous replication

    786,980 records/sec
    (75.1 mb/sec)

    this test is exactly the same as the previous one except that now each partition has three replicas (so the total data written to network or disk is three times higher). each server is doing both writes from the producer for the partitions for which it is a master, as well as fetching and writing data for the partitions for which it is a follower.

    replication in this test is asynchronous. that is, the server acknowledges the write as soon as it has written it to its local log without waiting for the other replicas to also acknowledge it. this means, if the master were to crash, it would likely lose the last few messages that had been written but not yet replicated. this makes the message acknowledgement latency a little better at the cost of some risk in the case of server failure.

    the key take away i would like people to have from this is that replication can be fast. the total cluster write capacity is, of course, 3x less with 3x replication (since each write is done three times), but the throughput is still quite good per client. high performance replication comes in large part from the efficiency of our consumer (the replicas are really nothing more than a specialized consumer) which i will discuss in the consumer section.

    single producer thread, 3x synchronous replication

    421,823 records/sec
    (40.2 mb/sec)

    this test is the same as above except that now the master for a partition waits for acknowledgement from the full set of in-sync replicas before acknowledging back to the producer. in this mode, we guarantee that messages will not be lost as long as one in-sync replica remains.

    synchronous replication in kafka is not fundamentally very different from asynchronous replication. the leader for a partition always tracks the progress of the follower replicas to monitor their liveness, and we never give out messages to consumers until they are fully acknowledged by replicas. with synchronous replication we just wait to respond to the producer request until the followers have replicated it.

    this additional latency does seem to affect our throughput. since the code path on the server is very similar, we could probably ameliorate this impact by tuning the batching to be a bit more aggressive and allowing the client to buffer more outstanding requests. however, in spirit of avoiding special case tuning, i have avoided this.
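    for reference, the kind of tuning hinted at here lives in a handful of standard producer settings; a hedged sketch follows (the values are illustrative only, not the ones used in this benchmark):

    import java.util.Properties;

    /** illustrative knobs for the batching / outstanding-request trade-off discussed above. */
    public class ProducerTuning {
        public static Properties tunedProducerProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092"); // assumed broker address
            props.put("acks", "all"); // wait for the full in-sync replica set (the synchronous case above)
            props.put("batch.size", String.valueOf(64 * 1024)); // larger batches amortize per-request overhead
            props.put("linger.ms", "5"); // wait a few milliseconds so batches can fill
            props.put("max.in.flight.requests.per.connection", "5"); // allow more outstanding requests
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            return props;
        }
    }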

    three producers, 3x async replication

    2,024,032 records/sec
    (193.0 mb/sec)

    our single producer process is clearly not stressing our three node cluster. to add a little more load, i'll now repeat the previous async replication test, but now use three producer load generators running on three different machines (running more processes on the same machine won't help as we are saturating the nic). then we can look at the aggregate throughput across these three producers to get a better feel for the cluster's aggregate capacity.

    producer throughput versus stored data

    one of the hidden dangers of many messaging systems is that they work well only as long as the data they retain fits in memory. their throughput falls by an order of magnitude (or more) when data backs up and isn't consumed (and hence needs to be stored on disk). this means things may be running fine as long as your consumers keep up and the queue is empty, but as soon as they lag, the whole messaging layer backs up with unconsumed data. the backup causes data to go to disk which in turns causes performance to drop to a rate that means messaging system can no longer keep up with incoming data and either backs up or falls over. this is pretty terrible, as in many cases the whole purpose of the queue was to handle such a case gracefully.

    since kafka always persists messages the performance is o(1) with respect to unconsumed data volume.

    to test this experimentally, let's run our throughput test over an extended period of time and graph the results as the stored dataset grows:

    this graph actually does show some variance in performance, but no impact due to data size: we perform just as well after writing a tb of data, as we do for the first few hundred mbs.

    the variance seems to be due to linux's i/o management facilities that batch data and then flush it periodically. this is something we have tuned for a little better on our production kafka setup. some notes on tuning i/o are available .

    consumer throughput

    okay now let's turn our attention to consumer throughput.

    note that the replication factor will not affect the outcome of this test as the consumer only reads from one replica regardless of the replication factor. likewise, the acknowledgement level of the producer also doesn't matter as the consumer only ever reads fully acknowledged messages (even if the producer doesn't wait for full acknowledgement). this is to ensure that any message the consumer sees will always be present after a leadership handoff (if the current leader fails).

    single consumer

    940,521 records/sec
    (89.7 mb/sec)

    for the first test, we will consume 50 million messages in a single thread from our 6 partition 3x replicated topic.

    kafka's consumer is very efficient. it works by fetching chunks of log directly from the filesystem. it uses the sendfile api to transfer this directly through the operating system without the overhead of copying this data through the application. this test actually starts at the beginning of the log, so it is doing real read i/o. in a production setting, though, the consumer reads almost exclusively out of the os pagecache, since it is reading data that was just written by some producer (so it is still cached). in fact, if you run iostat on a production server you actually see that there are no physical reads at all even though a great deal of data is being consumed.

    making consumers cheap is important for what we want kafka to do. for one thing, the replicas are themselves consumers, so making the consumer cheap makes replication cheap. in addition, this makes handling out data an inexpensive operation, and hence not something we need to tightly control for scalability reasons.
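    for completeness, a minimal consumer sketch with the modern java consumer api (which post-dates the tooling used in this article; broker address, group id and topic name are assumptions):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleLogConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092"); // assumed broker address
            props.put("group.id", "benchmark-readers");    // assumed consumer group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // the offset is simply the consumer's position in that partition's log
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }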

    three consumers

    2,615,968 records/sec
    (249.5 mb/sec)

    let's repeat the same test, but run three parallel consumer processes, each on a different machine, and all consuming the same topic.

    as expected, we see near linear scaling (not surprising because consumption in our model is so simple).

    producer and consumer

    795,064 records/sec
    (75.8 mb/sec)

    the above tests covered just the producer and the consumer running in isolation. now let's do the natural thing and run them together. actually, we have technically already been doing this, since our replication works by having the servers themselves act as consumers.

    all the same, let's run the test. for this test we'll run one producer and one consumer on a six partition 3x replicated topic that begins empty. the producer is again using async replication. the throughput reported is the consumer throughput (which is, obviously, an upper bound on the producer throughput).

    as we would expect, the results we get are basically the same as we saw in the producer only case—the consumer is fairly cheap.

    effect of message size

    i have mostly shown performance on small 100 byte messages. smaller messages are the harder problem for a messaging system as they magnify the overhead of the bookkeeping the system does. we can show this by just graphing throughput in both records/second and mb/second as we vary the record size.

    so, as we would expect, this graph shows that the raw count of records we can send per second decreases as the records get bigger. but if we look at mb/second, we see that the total byte throughput of real user data increases as messages get bigger:

    we can see that with the 10 byte messages we are actually cpu bound by just acquiring the lock and enqueuing the message for sending—we are not able to actually max out the network. however, starting with 100 bytes, we are actually seeing network saturation (though the mb/sec continues to increase as our fixed-size bookkeeping bytes become an increasingly small percentage of the total bytes sent).

    end-to-end latency

    2 ms (median)
    3 ms (99th percentile)
    14 ms (99.9th percentile)

    we have talked a lot about throughput, but what is the latency of message delivery? that is, how long does it take a message we send to be delivered to the consumer? for this test, we will create producer and consumer and repeatedly time how long it takes for a producer to send a message to the kafka cluster and then be received by our consumer.

    note that, kafka only gives out messages to consumers when they are acknowledged by the full in-sync set of replicas. so this test will give the same results regardless of whether we use sync or async replication, as that setting only affects the acknowledgement to the producer.
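    a rough way to reproduce that measurement is to stamp each record with its send time and compare on receipt; a sketch under the same assumptions as the earlier snippets (producer and consumer constructed as above, consumer already subscribed to the topic):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** sends records carrying their send timestamp and reports end-to-end delay on the consumer side. */
    public class EndToEndLatency {
        static void run(KafkaProducer<String, String> producer,
                        KafkaConsumer<String, String> consumer,
                        String topic, int messages) {
            for (int i = 0; i < messages; i++) {
                producer.send(new ProducerRecord<>(topic, Long.toString(System.nanoTime())));
                producer.flush(); // send one record at a time so we measure delivery, not batching
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    long latencyMs = (System.nanoTime() - Long.parseLong(record.value())) / 1_000_000;
                    System.out.println("end-to-end latency: " + latencyMs + " ms");
                }
            }
        }
    }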

    replicating this test

    if you want to try out these benchmarks on your own machines, you can. as i said, i mostly just used our pre-packaged performance testing tools that ship with kafka and mostly stuck with the default configs both for the server and for the clients. however, you can see more details of the configuration and commands .




    kafka 高性能吞吐揭秘 (what makes kafka's throughput so high; simone, thu, 26 may 2016 05:52 gmt)
    post: http://www.blogjava.net/wangxinsh55/archive/2016/05/26/430662.html
    source: http://umeng.baijia.baidu.com/article/227913

    本文将针对kafka性能方面进行简单分析,首先简单介绍一下kafka的架构和涉及到的名词:

    1.    topic:用于划分message的逻辑概念,一个topic可以分布在多个broker上。

    2.    partition:是kafka中横向扩展和一切并行化的基础,每个topic都至少被切分为1个partition。

    3.    offset:消息在partition中的编号,编号顺序不跨partition。

    4.    consumer:用于从broker中取出/消费message。

    5.    producer:用于往broker中发送/生产message。

    6.    replication:kafka支持以partition为单位对message进行冗余备份,每个partition都可以配置至少1个replication(当仅1个replication时即仅该partition本身)。

    7.    leader:每个replication集合中的partition都会选出一个唯一的leader,所有的读写请求都由leader处理。其他replicas从leader处把数据更新同步到本地,过程类似大家熟悉的mysql中的binlog同步。

    8.    broker:kafka中使用broker来接受producer和consumer的请求,并把message持久化到本地磁盘。每个cluster当中会选举出一个broker来担任controller,负责处理partition的leader选举,协调partition迁移等工作。

    9.    isr(in-sync replica):是replicas的一个子集,表示目前alive且与leader能够“catch-up”的replicas集合。由于读写都是首先落到leader上,所以一般来说通过同步机制从leader上拉取数据的replica都会和leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该replica踢出isr。每个partition都有它自己独立的isr。

    以上几乎是我们在使用kafka的过程中可能遇到的所有名词,同时也无一不是最核心的概念或组件,感觉到从设计本身来说,kafka还是足够简洁的。这次本文围绕kafka优异的吞吐性能,逐个介绍一下其设计与实现当中所使用的各项“黑科技”。

    broker

    不同于redis和memcacheq等内存消息队列,kafka的设计是把所有的message都要写入速度低容量大的硬盘,以此来换取更强的存储能力。实际上,kafka使用硬盘并没有带来过多的性能损失,“规规矩矩”的抄了一条“近道”。

    首先,说“规规矩矩”是因为kafka在磁盘上只做sequence i/o,由于消息系统读写的特殊性,这并不存在什么问题。关于磁盘i/o的性能,引用一组kafka官方给出的测试数据(raid-5,7200rpm):

    sequence i/o: 600mb/s

    random i/o: 100kb/s

    所以通过只做sequence i/o的限制,规避了磁盘访问速度低下对性能可能造成的影响。

    接下来我们再聊一聊kafka是如何“抄近道的”。

    首先,kafka重度依赖底层操作系统提供的pagecache功能。当上层有写操作时,操作系统只是将数据写入pagecache,同时标记page属性为dirty。当读操作发生时,先从pagecache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上pagecache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收pagecache的代价又很小,所以现代的os都支持pagecache。

    使用pagecache功能同时可以避免在jvm内部缓存数据,jvm为我们提供了强大的gc能力,同时也引入了一些问题不适用与kafka的设计。

    ·         如果在heap内管理缓存,jvm的gc线程会频繁扫描heap空间,带来不必要的开销。如果heap过大,执行一次full gc对系统的可用性来说将是极大的挑战。

    ·         所有在在jvm内的对象都不免带有一个object overhead(千万不可小视),内存的有效空间利用率会因此降低。

    ·         所有的in-process cache在os中都有一份同样的pagecache。所以通过将缓存只放在pagecache,可以至少让可用缓存空间翻倍。

    ·         如果kafka重启,所有的in-process cache都会失效,而os管理的pagecache依然可以继续使用。

    pagecache还只是第一步,kafka为了进一步的优化性能还采用了sendfile技术。在解释sendfile之前,首先介绍一下传统的网络i/o操作流程,大体上分为以下4步。

    1.    os 从硬盘把数据读到内核区的pagecache。

    2.    用户进程把数据从内核区copy到用户区。

    3.    然后用户进程再把数据写入到socket,数据流入内核区的socket buffer上。

    4.    os 再把数据从buffer中copy到网卡的buffer上,这样完成一次发送。

    整个过程共经历两次context switch,四次system call。同一份数据在内核buffer与用户buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是sendfile所解决的问题,经过sendfile优化后,整个i/o过程就变成了下面这个样子。
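    the diagram from the original post is missing here, but the idea is that sendfile lets the kernel move bytes from the pagecache straight into the socket buffer, skipping steps 2 and 3 entirely. in java this facility is exposed as filechannel.transferto; a minimal sketch (the file path, host and port are assumptions):

    import java.io.FileInputStream;
    import java.net.InetSocketAddress;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;

    public class SendfileDemo {
        public static void main(String[] args) throws Exception {
            try (FileChannel file = new FileInputStream("/tmp/segment.log").getChannel(); // assumed log segment
                 SocketChannel socket = SocketChannel.open(new InetSocketAddress("consumer-host", 9092))) {
                long position = 0;
                long remaining = file.size();
                while (remaining > 0) {
                    // transferTo maps to sendfile(2) on linux: bytes flow pagecache -> socket
                    // without ever entering user space, which is what makes kafka's consumers cheap
                    long sent = file.transferTo(position, remaining, socket);
                    position += sent;
                    remaining -= sent;
                }
            }
        }
    }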

    通过以上的介绍不难看出,kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。如果producer和consumer之间生产和消费进度上配合得当,完全可以实现数据交换零i/o。这也就是我为什么说kafka使用“硬盘”并没有带来过多性能损失的原因。下面是我在生产环境中采到的一些指标。

    (20 brokers, 75 partitions per broker, 110k msg/s)

    此时的集群只有写,没有读操作。10m/s左右的send的流量是partition之间进行replicate而产生的。从recv和writ的速率比较可以看出,写盘是使用asynchronous batch的方式,底层os可能还会进行磁盘写顺序优化。而在有read request进来的时候分为两种情况,第一种是内存中完成数据交换。

    send流量从平均10m/s增加到了到平均60m/s,而磁盘read只有不超过50kb/s。pagecache降低磁盘i/o效果非常明显。

    接下来是读一些收到了一段时间,已经从内存中被换出刷写到磁盘上的老数据。

    其他指标还是老样子,而磁盘read已经飚高到40 mb/s。此时全部的数据都已经是走硬盘了(对硬盘的顺序读取os层会进行prefill pagecache的优化)。依然没有任何性能问题。

    tips

    1.    kafka官方并不建议通过broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响。

    2.    可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能。

    a.    脏页率超过第一个指标会启动pdflush开始flush dirty pagecache。

    b.    脏页率超过第二个指标会阻塞所有的写操作来进行flush。

    c.    根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio。

    partition

    partition是kafka可以很好的横向扩展和提供高并发处理以及实现replication的基础。

    扩展性方面。首先,kafka允许partition在集群内的broker之间任意移动,以此来均衡可能存在的数据倾斜问题。其次,partition支持自定义的分区算法,例如可以将同一个key的所有消息都路由到同一个partition上去。 同时leader也可以在in-sync的replica中迁移。由于针对某一个partition的所有读写请求都是只由leader来处理,所以kafka会尽量把leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中。

    并发方面。任意partition在某一个时刻只能被一个consumer group内的一个consumer消费(反过来一个consumer则可以同时消费多个partition),kafka非常简洁的offset机制最小化了broker和consumer之间的交互,这使kafka并不会像同类其他消息队列一样,随着下游consumer数目的增加而成比例的降低性能。此外,如果多个consumer恰巧都是消费时间序上很相近的数据,可以达到很高的pagecache命中率,因而kafka可以非常高效的支持高并发读操作,实践中基本可以达到单机网卡上限。

    不过,partition的数量并不是越多越好,partition的数量越多,平均到每一个broker上的数量也就越多。考虑到broker宕机(network failure, full gc)的情况下,需要由controller来为所有宕机的broker上的所有partition重新选举leader,假设每个partition的选举消耗10ms,如果broker上有500个partition,那么在进行选举的5s的时间里,对上述partition的读写操作都会触发leadernotavailableexception。

    再进一步,如果挂掉的broker是整个集群的controller,那么首先要进行的是重新任命一个broker作为controller。新任命的controller要从zookeeper上获取所有partition的meta信息,获取每个信息大概3-5ms,那么如果有10000个partition这个时间就会达到30s-50s。而且不要忘记这只是重新启动一个controller花费的时间,在这基础上还要再加上前面说的选举leader的时间 -_-!!!!!!

    此外,在broker端,对producer和consumer都使用了buffer机制。其中buffer的大小是统一配置的,数量则与partition个数相同。如果partition个数过多,会导致producer和consumer的buffer内存占用过大。

    tips

    1.    partition的数量尽量提前预分配,虽然可以在后期动态增加partition,但是会冒着可能破坏message key和partition之间对应关系的风险。

    2.    replica的数量不要过多,如果条件允许尽量把replica集合内的partition分别调整到不同的rack。

    3.    尽一切努力保证每次停broker时都可以clean shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。

    producer

    kafka的研发团队表示在0.8版本里用java重写了整个producer,据说性能有了很大提升。我还没有亲自对比试用过,这里就不做数据对比了。本文结尾的扩展阅读里提到了一套我认为比较好的对照组,有兴趣的同学可以尝试一下。

    其实在producer端的优化大部分消息系统采取的方式都比较单一,无非也就化零为整、同步变异步这么几种。

    kafka系统默认支持messageset,把多条message自动地打成一个group后发送出去,均摊后拉低了每次通信的rtt。而且在组织messageset的同时,还可以把数据重新排序,从爆发流式的随机写入优化成较为平稳的线性写入。

    此外,还要着重介绍的一点是,producer支持end-to-end的压缩。数据在本地压缩后放到网络上传输,在broker一般不解压(除非指定要deep-iteration),直至消息被consume之后在客户端解压。

    当然用户也可以选择自己在应用层上做压缩和解压的工作(毕竟kafka目前支持的压缩算法有限,只有gzip和snappy),不过这样做反而会意外的降低效率!!!! kafka的end-to-end压缩与messageset配合在一起工作效果最佳,上面的做法直接割裂了两者间联系。至于道理其实很简单,压缩算法中一条基本的原理“重复的数据量越多,压缩比越高”。无关于消息体的内容,无关于消息体的数量,大多数情况下输入数据量大一些会取得更好的压缩比。

    不过kafka采用messageset也导致在可用性上一定程度的妥协。每次发送数据时,producer都是send()之后就认为已经发送出去了,但其实大多数情况下消息还在内存的messageset当中,尚未发送到网络,这时候如果producer挂掉,那就会出现丢数据的情况。

    为了解决这个问题,kafka在0.8版本的设计借鉴了网络当中的ack机制。如果对性能要求较高,又能在一定程度上允许message的丢失,那就可以设置request.required.acks=0 来关闭ack,以全速发送。如果需要对发送的消息进行确认,就需要设置request.required.acks为1或-1,那么1和-1又有什么区别呢?这里又要提到前面聊的有关replica数量问题。如果配置为1,表示消息只需要被leader接收并确认即可,其他的replica可以进行异步拉取无需立即进行确认,在保证可靠性的同时又不会把效率拉得很低。如果设置为-1,表示消息要commit到该partition的isr集合中的所有replica后,才可以返回ack,消息的发送会更安全,而整个过程的延迟会随着replica的数量正比增长,这里就需要根据不同的需求做相应的优化。
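    the request.required.acks setting discussed above belongs to the 0.8-era producer; with the current java producer the equivalent knob is acks. a hedged sketch of the three levels (the broker address is an assumption):

    import java.util.Properties;

    /** the three acknowledgement levels discussed above, expressed as modern producer configs. */
    public class AckModes {
        static Properties producerProps(String acks) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092"); // assumed broker address
            // "0"  : fire and forget; fastest, but messages can be lost if the producer dies
            // "1"  : the leader acknowledges after its local write; replicas catch up asynchronously
            // "all": the leader waits for every replica in the isr; safest, latency grows with replica count ("-1" is a synonym)
            props.put("acks", acks);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }
    }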

    tips

    1.    producer的线程不要配置过多,尤其是在mirror或者migration中使用的时候,会加剧目标集群partition消息乱序的情况(如果你的应用场景对消息顺序很敏感的话)。

    2.    0.8版本的request.required.acks默认是0(同0.7)。

    consumer

    consumer端的设计大体上还算是比较常规的。

    ·         通过consumer group,可以支持生产者消费者和队列访问两种模式。

    ·         consumer api分为high level和low level两种。前一种重度依赖zookeeper,所以性能差一些且不自由,但是超省心。第二种不依赖zookeeper服务,无论从自由度和性能上都有更好的表现,但是所有的异常(leader迁移、offset越界、broker宕机等)和offset的维护都需要自行处理。

    ·         大家可以关注下不日发布的0.9 release。开发人员又用java重写了一套consumer。把两套api合并在一起,同时去掉了对zookeeper的依赖。据说性能有大幅度提升哦~~

    tips

    强烈推荐使用low level api,虽然繁琐一些,但是目前只有这个api可以对error数据进行自定义处理,尤其是处理broker异常或由于unclean shutdown导致的corrupted data时,否则无法skip只能等着“坏消息”在broker上被rotate掉,在此期间该replica将会一直处于不可用状态。

    扩展阅读

    sendfile: https://www.ibm.com/developerworks/cn/java/j-zerocopy/

    so what’s wrong with 1975 programming: https://www.varnish-cache.org/trac/wiki/architectnotes

    benchmarking: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines




    java实现gif图片缩放与剪切功能 (gif scaling and cropping in java; simone, mon, 23 may 2016 06:40 gmt)
    post: http://www.blogjava.net/wangxinsh55/archive/2016/05/23/430621.html
    source: http://www.open-open.com/lib/view/open1394859355853.html
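    before the full utility class from the linked post (reproduced below), here is a hedged usage sketch; the file paths are assumptions and the class and method names follow the listing below, which was lower-cased by the blog engine:

    public class imageutil2demo {
        public static void main(String[] args) throws Exception {
            // crop a 200x200 region starting at (10, 10); for animated gifs every frame is cropped
            String cropped = imageutil2.cutimage("/tmp/input.gif", "/tmp/cropped", 10, 10, 200, 200);

            // scale the cropped result down to 100x100, again frame by frame for gifs
            String zoomed = imageutil2.zoom(cropped, "/tmp/zoomed", 100, 100);

            System.out.println("cropped: " + cropped + ", zoomed: " + zoomed);
        }
    }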
    package com.pinker.util;
    import java.awt.color;
    import java.awt.graphics;
    import java.awt.image;
    import java.awt.image.bufferedimage;
    import java.io.file;
    import java.io.ioexception;
    import java.util.arrays;
    import java.util.iterator;

    import javax.imageio.iioimage;
    import javax.imageio.imageio;
    import javax.imageio.imagereader;
    import javax.imageio.imagewriter;
    import javax.imageio.stream.imageinputstream;
    import javax.imageio.stream.imageoutputstream;
     
    /**
     * 图像裁剪以及压缩处理工具类
     *
     * 主要针对动态的gif格式图片裁剪之后,只出现一帧动态效果的现象提供解决方案
     *
     * 提供依赖三方包的解决方案(针对gif格式数据特征一一解析,进行编码解码操作)
     * 提供基于jdk image i/o 的解决方案(jdk探索失败)
     */
    public class imageutil2 {
     
        public enum image_format{
            bmp("bmp"),
            jpg("jpg"),
            wbmp("wbmp"),
            jpeg("jpeg"),
            png("png"),
            gif("gif");
             
            private string value;
            image_format(string value){
                this.value = value;
            }
            public string getvalue() {
                return value;
            }
            public void setvalue(string value) {
                this.value = value;
            }
        }
         
         
        /**
         * 获取图片格式
         * @param file   图片文件
         * @return    图片格式
         */
        public static string getimageformatname(file file)throws ioexception{
            string formatname = null;
             
            imageinputstream iis = imageio.createimageinputstream(file);
            iterator imagereader =  imageio.getimagereaders(iis);
            if(imagereader.hasnext()){
                imagereader reader = imagereader.next();
                formatname = reader.getformatname();
            }
     
            return formatname;
        }
         
    /*************************  基于三方包的解决方案    *****************************/
        /**
         * 剪切图片
         *
         * @param source        待剪切图片路径
         * @param targetpath    裁剪后保存路径(默认为源路径)
         * @param x                起始横坐标
         * @param y                起始纵坐标
         * @param width            剪切宽度
         * @param height        剪切高度         
         *
         * @returns            裁剪后保存路径(图片后缀根据图片本身类型生成)    
         * @throws ioexception
         */
        public static string cutimage(string sourcepath , string targetpath , int x , int y , int width , int height) throws ioexception{
            file file = new file(sourcepath);
            if(!file.exists()) {
                throw new ioexception("not found the image:" sourcepath);
            }
            if(null == targetpath || targetpath.isempty()) targetpath = sourcepath;
             
            string formatname = getimageformatname(file);
            if(null == formatname) return targetpath;
            formatname = formatname.tolowercase();
             
            // 防止图片后缀与图片本身类型不一致的情况
            string pathprefix = getpathwithoutsuffix(targetpath);
            targetpath = pathprefix + formatname;
             
            // gif需要特殊处理
            if(image_format.gif.getvalue().equals(formatname)){
                gifdecoder decoder = new gifdecoder();  
                int status = decoder.read(sourcepath);  
                if (status != gifdecoder.status_ok) {  
                    throw new ioexception("read image " sourcepath " error!");  
                }
     
                animatedgifencoder encoder = new animatedgifencoder();
                encoder.start(targetpath);
                encoder.setrepeat(decoder.getloopcount());  
                for (int i = 0; i < decoder.getframecount(); i++) {  
                    encoder.setdelay(decoder.getdelay(i));  
                    bufferedimage childimage = decoder.getframe(i);
                    bufferedimage image = childimage.getsubimage(x, y, width, height);
                    encoder.addframe(image);  
                }  
                encoder.finish();
            }else{
                bufferedimage image = imageio.read(file);
                image = image.getsubimage(x, y, width, height);
                imageio.write(image, formatname, new file(targetpath));
            }
            
            return targetpath;
        }
         
        /**
         * 压缩图片
         * @param sourcepath       待压缩的图片路径
         * @param targetpath    压缩后图片路径(默认为初始路径)
         * @param width            压缩宽度
         * @param height        压缩高度
         *
         * @returns                   裁剪后保存路径(图片后缀根据图片本身类型生成)    
         * @throws ioexception
         */
        public static string zoom(string sourcepath , string targetpath, int width , int height) throws ioexception{
            file file = new file(sourcepath);
            if(!file.exists()) {
                throw new ioexception("not found the image :" sourcepath);
            }
            if(null == targetpath || targetpath.isempty()) targetpath = sourcepath;
             
            string formatname = getimageformatname(file);
            if(null == formatname) return targetpath;
            formatname = formatname.tolowercase();
             
            // 防止图片后缀与图片本身类型不一致的情况
            string pathprefix = getpathwithoutsuffix(targetpath);
            targetpath = pathprefix + formatname;
             
            // gif需要特殊处理
            if(image_format.gif.getvalue().equals(formatname)){
                gifdecoder decoder = new gifdecoder();  
                int status = decoder.read(sourcepath);  
                if (status != gifdecoder.status_ok) {  
                    throw new ioexception("read image " sourcepath " error!");  
                }
     
                animatedgifencoder encoder = new animatedgifencoder();
                encoder.start(targetpath);
                encoder.setrepeat(decoder.getloopcount());  
                for (int i = 0; i < decoder.getframecount(); i++) {  
                    encoder.setdelay(decoder.getdelay(i));  
                    bufferedimage image = zoom(decoder.getframe(i), width , height);
                    encoder.addframe(image);  
                }  
                encoder.finish();
            }else{
                bufferedimage image = imageio.read(file);
                bufferedimage zoomimage = zoom(image , width , height);
                imageio.write(zoomimage, formatname, new file(targetpath));
            }
             
            return targetpath;
        }
         
    /*********************** 基于jdk 解决方案     ********************************/
         
        /**
         * 读取图片
         * @param file 图片文件
         * @return     图片数据
         * @throws ioexception
         */
        public static bufferedimage[] readerimage(file file) throws ioexception{
            bufferedimage sourceimage = imageio.read(file);
            bufferedimage[] images = null;
            imageinputstream iis = imageio.createimageinputstream(file);
            iterator imagereaders = imageio.getimagereaders(iis);
            if(imagereaders.hasnext()){
                imagereader reader = imagereaders.next();
                reader.setinput(iis);
                int imagenumber = reader.getnumimages(true);
                images = new bufferedimage[imagenumber];
                for (int i = 0; i < imagenumber; i++) {
                    bufferedimage image = reader.read(i);
                    if(sourceimage.getwidth() > image.getwidth() || sourceimage.getheight() > image.getheight()){
                        image = zoom(image, sourceimage.getwidth(), sourceimage.getheight());
                    }
                    images[i] = image;
                }
                reader.dispose();
                iis.close();
            }
            return images;
        }
         
        /**
         * 根据要求处理图片
         *
         * @param images    图片数组
         * @param x            横向起始位置
         * @param y         纵向起始位置
         * @param width      宽度    
         * @param height    宽度
         * @return            处理后的图片数组
         * @throws exception
         */
        public static bufferedimage[] processimage(bufferedimage[] images , int x , int y , int width , int height) throws exception{
            if(null == images){
                return images;
            }
            bufferedimage[] oldimages = images;
            images = new bufferedimage[images.length];
            for (int i = 0; i < oldimages.length; i++) {
                bufferedimage image = oldimages[i];
                images[i] = image.getsubimage(x, y, width, height);
            }
            return images;
        }
         
        /**
         * 写入处理后的图片到file
         *
         * 图片后缀根据图片格式生成
         *
         * @param images        处理后的图片数据
         * @param formatname     图片格式
         * @param file            写入文件对象
         * @throws exception
         */
        public static void writerimage(bufferedimage[] images ,  string formatname , file file) throws exception{
            iterator imagewriters = imageio.getimagewritersbyformatname(formatname);
            if(imagewriters.hasnext()){
                imagewriter writer = imagewriters.next();
                string filename = file.getname();
                int index = filename.lastindexof(".");
                if(index > 0){
                    filename = filename.substring(0, index + 1) + formatname;
                }
                string pathprefix = getfileprefixpath(file.getpath());
                file outfile = new file(pathprefix + filename);
                imageoutputstream ios = imageio.createimageoutputstream(outfile);
                writer.setoutput(ios);
                 
                if(writer.canwritesequence()){
                    writer.preparewritesequence(null);
                    for (int i = 0; i < images.length; i++) {
                        bufferedimage childimage = images[i];
                        iioimage image = new iioimage(childimage, null , null);
                        writer.writetosequence(image, null);
                    }
                    writer.endwritesequence();
                }else{
                    for (int i = 0; i < images.length; i++) {
                        writer.write(images[i]);
                    }
                }
                 
                writer.dispose();
                ios.close();
            }
        }
         
        /**
         * 剪切格式图片
         *
     * 基于jdk image i/o 解决方案
         *
         * @param sourcefile        待剪切图片文件对象
         * @param destfile                  裁剪后保存文件对象
         * @param x                    剪切横向起始位置
         * @param y                 剪切纵向起始位置
         * @param width              剪切宽度    
         * @param height            剪切宽度
         * @throws exception
         */
        public static void cutimage(file sourcefile , file destfile, int x , int y , int width , int height) throws exception{
            // 读取图片信息
            bufferedimage[] images = readerimage(sourcefile);
            // 处理图片
            images = processimage(images, x, y, width, height);
            // 获取文件后缀
            string formatname = getimageformatname(sourcefile);
            destfile = new file(getpathwithoutsuffix(destfile.getpath()) formatname);
     
            // 写入处理后的图片到文件
            writerimage(images, formatname , destfile);
        }
         
         
         
        /**
         * List the image formats supported by the current JRE.
         */
        public static void getOSSupportsStandardImageFormat() {
            String[] readerFormatName = ImageIO.getReaderFormatNames();
            String[] readerSuffixName = ImageIO.getReaderFileSuffixes();
            String[] readerMIMEType = ImageIO.getReaderMIMETypes();
            System.out.println("========================= OS supports reader ========================");
            System.out.println("OS supports reader format name :  " + Arrays.asList(readerFormatName));
            System.out.println("OS supports reader suffix name :  " + Arrays.asList(readerSuffixName));
            System.out.println("OS supports reader MIME type :  " + Arrays.asList(readerMIMEType));

            String[] writerFormatName = ImageIO.getWriterFormatNames();
            String[] writerSuffixName = ImageIO.getWriterFileSuffixes();
            String[] writerMIMEType = ImageIO.getWriterMIMETypes();

            System.out.println("========================= OS supports writer ========================");
            System.out.println("OS supports writer format name :  " + Arrays.asList(writerFormatName));
            System.out.println("OS supports writer suffix name :  " + Arrays.asList(writerSuffixName));
            System.out.println("OS supports writer MIME type :  " + Arrays.asList(writerMIMEType));
        }
         
        /**
         * Scale an image to the given size.
         *
         * @param sourceImage image to scale
         * @param width       target width
         * @param height      target height
         */
        private static BufferedImage zoom(BufferedImage sourceImage, int width, int height) {
            BufferedImage zoomImage = new BufferedImage(width, height, sourceImage.getType());
            Image image = sourceImage.getScaledInstance(width, height, Image.SCALE_SMOOTH);
            Graphics gc = zoomImage.getGraphics();
            gc.setColor(Color.WHITE);
            gc.drawImage(image, 0, 0, null);
            return zoomImage;
        }
         
        /**
         * Get the directory part of a file's path.
         *
         * The returned path does not contain the file name.
         *
         * @param file current file object
         * @return the path without the file name
         * @throws IOException
         */
        public static String getFilePrefixPath(File file) throws IOException {
            String path = null;
            if (!file.exists()) {
                throw new IOException("not found the file !");
            }
            String fileName = file.getName();
            path = file.getPath().replace(fileName, "");
            return path;
        }
         
        /**
         * Get the directory part of a file path.
         *
         * The returned path does not contain the file name.
         *
         * @param path current file path
         * @return the path without the file name
         * @throws Exception
         */
        public static String getFilePrefixPath(String path) throws Exception {
            if (null == path || path.isEmpty()) throw new Exception("The file path is empty!");
            int index = path.lastIndexOf(File.separator);
            if (index > 0) {
                path = path.substring(0, index + 1);
            }
            return path;
        }
         
        /**
         * Get a file path without its suffix (the trailing dot is kept).
         *
         * @param src source path
         * @return the path up to and including the last dot
         */
        public static String getPathWithoutSuffix(String src) {
            String path = src;
            int index = path.lastIndexOf(".");
            if (index > 0) {
                path = path.substring(0, index + 1);
            }
            return path;
        }
         
        /**
         * Get the file name of a path.
         *
         * @param filePath file path
         * @return the file name
         * @throws IOException
         */
        public static String getFileName(String filePath) throws IOException {
            File file = new File(filePath);
            if (!file.exists()) {
                throw new IOException("not found the file !");
            }
            return file.getName();
        }
         
        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            // list the image formats supported by this JRE
            //ImageCutterUtil.getOSSupportsStandardImageFormat();

            try {
                // crop origin and size (in client/display coordinates)
                int x = 100;
                int y = 75;
                int width = 100;
                int height = 100;
                // size of the client view the coordinates refer to
                int clientWidth = 300;
                int clientHeight = 250;

                File file = new File("c:\\1.jpg");
                BufferedImage image = ImageIO.read(file);
                double destWidth = image.getWidth();
                double destHeight = image.getHeight();

                if (destWidth < width || destHeight < height)
                    throw new Exception("The source image is smaller than the requested cut size!");

                double widthRatio = destWidth / clientWidth;
                double heightRatio = destHeight / clientHeight;

                x = Double.valueOf(x * widthRatio).intValue();
                y = Double.valueOf(y * heightRatio).intValue();
                width = Double.valueOf(width * widthRatio).intValue();
                height = Double.valueOf(height * heightRatio).intValue();

                System.out.println("crop size  x:" + x + ",y:" + y + ",width:" + width + ",height:" + height);

                /************************ Solution based on a third-party library *************************/
                String formatName = getImageFormatName(file);
                String pathSuffix = "." + formatName;
                String pathPrefix = getFilePrefixPath(file);
                String targetPath = pathPrefix + System.currentTimeMillis() + pathSuffix;
                targetPath = ImageUtil2.cutImage(file.getPath(), targetPath, x, y, width, height);

                String bigTargetPath = pathPrefix + System.currentTimeMillis() + pathSuffix;
                ImageUtil2.zoom(targetPath, bigTargetPath, 100, 100);

                String smallTargetPath = pathPrefix + System.currentTimeMillis() + pathSuffix;
                ImageUtil2.zoom(targetPath, smallTargetPath, 50, 50);

                /************************ Solution based on JDK Image I/O (the JDK approach did not pan out) *************************/
    //                File destFile = new File(targetPath);
    //                ImageCutterUtil.cutImage(file, destFile, x, y, width, height);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    simone 2016-05-23 14:40
    Implementing a Distributed Lock with Redis (simone, Thu, 12 May 2016 09:52 GMT)
    http://www.blogjava.net/wangxinsh55/archive/2016/05/12/430470.html

    Sources:
    http://chaopeng.me/blog/2014/01/26/redis-lock.html
    http://blog.csdn.net/ugg/article/details/41894947
    http://www.jeffkit.info/2011/07/1000/

    Redis has a family of commands whose names end in NX, short for "Not eXists"; SETNX, for example, reads as "SET if Not eXists". These commands are very handy, and here we use SETNX to build a distributed lock.

    Implementing a distributed lock with SETNX

    SETNX makes a distributed lock very easy to implement. Say a client wants to acquire a lock named foo; it issues:

    SETNX lock.foo <current Unix time + lock timeout + 1>

    •  If the reply is 1, the client has acquired the lock; the value stored under lock.foo is a timestamp marking the key as locked, and the client later releases the lock with DEL lock.foo.
    •  If the reply is 0, another client already holds the lock; the caller can give up, retry, or wait until the holder finishes or the lock times out.

    Dealing with deadlocks

    The logic above has a problem: what if the client holding the lock crashes and never releases it? We can detect this through the timestamp stored under the key: if the current time is already greater than the value of lock.foo, the lock has expired and may be reused.

    When that happens, however, we must not simply DEL the key and SETNX it again. If several clients detect the expired lock at the same time and all try to grab it, a race condition appears. Consider this scenario:

    1.  C0 timed out but still holds the lock; C1 and C2 read lock.foo, check the timestamp, and both find it expired.
    2.  C1 sends DEL lock.foo.
    3.  C1 sends SETNX lock.foo and succeeds.
    4.  C2 sends DEL lock.foo.
    5.  C2 sends SETNX lock.foo and succeeds.

    Now both C1 and C2 hold the lock — a serious problem!

    Fortunately this can be avoided. Here is what a well-behaved client C3 does:

    1. C3 sends SETNX lock.foo to acquire the lock. Since C0 still holds it, Redis replies 0.
    2. C3 sends GET lock.foo to check whether the lock has expired. If it has not, C3 waits or retries.
    3. If it has expired, C3 tries to take the lock with:
      GETSET lock.foo <current Unix time + lock timeout + 1>
    4. If the timestamp returned by GETSET is still expired, C3 has acquired the lock as hoped.
    5. If another client, say C4, was a little faster and ran the same operation just before C3, then the timestamp C3 gets back is not expired, so C3 has not acquired the lock and must wait or retry. Note that although C3 did not get the lock, it overwrote the expiry time C4 had set; the error this introduces is tiny and can safely be ignored.

    Note: to make the algorithm more robust, the client holding the lock should check once more that its lock has not expired before issuing DEL, because the client may have been suspended by a slow operation; by the time it finishes, the lock may already have expired and been taken by someone else, and in that case it must not unlock.

    Example pseudocode

    Based on the above, here is a small piece of fake code describing the full locking flow:

    # get lock
    lock = 0
    while lock != 1:
        timestamp = current Unix time + lock timeout + 1
        lock = SETNX lock.foo timestamp
        if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)):
            break
        else:
            sleep(10ms)

    # do your job
    do_job()

    # release
    if now() < GET lock.foo:
        DEL lock.foo

    Yes, to make this logic reusable, Python users will immediately reach for a decorator, and Java users will think of, well, AOP and annotations. Use whatever feels comfortable — just don't copy the code around.
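    Pulling the steps above together, here is a minimal Java sketch of the SETNX + GETSET lock, assuming the Jedis client; the key name, timeout and class/method names are illustrative and not part of the original post:

    import redis.clients.jedis.Jedis;

    public class RedisLockSketch {
        private static final String LOCK_KEY = "lock.foo";   // hypothetical key
        private static final long LOCK_TIMEOUT_MS = 5000;    // hypothetical timeout

        /** Try to acquire the lock; returns the expiry timestamp we wrote, or -1 on failure. */
        public static long tryLock(Jedis jedis) {
            long expiresAt = System.currentTimeMillis() + LOCK_TIMEOUT_MS + 1;
            if (jedis.setnx(LOCK_KEY, String.valueOf(expiresAt)) == 1) {
                return expiresAt;                             // nobody held the lock
            }
            String current = jedis.get(LOCK_KEY);
            if (current != null && Long.parseLong(current) < System.currentTimeMillis()) {
                // lock looks expired: GETSET and check that the old value is still expired
                long newExpiresAt = System.currentTimeMillis() + LOCK_TIMEOUT_MS + 1;
                String old = jedis.getSet(LOCK_KEY, String.valueOf(newExpiresAt));
                if (old != null && Long.parseLong(old) < System.currentTimeMillis()) {
                    return newExpiresAt;                      // we won the race
                }
            }
            return -1;                                        // someone else holds the lock
        }

        /** Release the lock only if it is still ours and has not expired (see the note above). */
        public static void unlock(Jedis jedis, long expiresAt) {
            String current = jedis.get(LOCK_KEY);
            if (current != null && Long.parseLong(current) == expiresAt
                    && System.currentTimeMillis() < expiresAt) {
                jedis.del(LOCK_KEY);
            }
        }
    }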



    Background
    Many Internet products have scenarios that need locking, such as flash sales, globally increasing IDs, forum floor numbering, and so on. Most solutions are built on a database, while Redis is a single-process, single-threaded server that turns concurrent access into serial access through its command queue, so multiple clients' connections to Redis do not compete with each other. Redis also offers commands such as SETNX and GETSET that make a distributed lock easy to build.

    The Redis commands involved
    Implementing a distributed lock with Redis relies on a few important commands, introduced below.

    SETNX (SET if Not eXists)
    Syntax:
    SETNX key value
    Behavior:
    Sets key to value and returns 1 only when key does not exist; if key already exists, SETNX does nothing and returns 0.

    GETSET
    Syntax:
    GETSET key value
    Behavior:
    Sets key to value and returns the old value stored at key; returns an error if key exists but does not hold a string, and nil if key does not exist.

    GET
    Syntax:
    GET key
    Behavior:
    Returns the string value associated with key, or the special value nil if key does not exist.

    DEL
    Syntax:
    DEL key [key …]
    Behavior:
    Deletes the given keys; keys that do not exist are ignored.

    Quality over quantity: these four commands are all the distributed lock relies on. The implementation details still deserve careful thought, because in a distributed, concurrent, multi-process setting any slip leads to deadlock and holds up every process. A sketch of how the four commands map onto a Java client follows below.
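    For reference, a small sketch of how these four commands look from Java, assuming the Jedis client; host, port and values are placeholders:

    import redis.clients.jedis.Jedis;

    public class RedisCommandDemo {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                long created = jedis.setnx("foo.lock", "1700000000000");  // 1 if the key was created, 0 if it existed
                String old   = jedis.getSet("foo.lock", "1700000005000"); // set the new value, return the old one
                String value = jedis.get("foo.lock");                     // null if the key does not exist
                long removed = jedis.del("foo.lock");                     // number of keys actually deleted
                System.out.println(created + " " + old + " " + value + " " + removed);
            }
        }
    }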

    Acquiring the lock

    SETNX can be used directly as the lock operation. To lock the keyword foo, a client tries:
    SETNX foo.lock <current Unix time + lock timeout + 1>

    If the reply is 1, the client holds the lock and can proceed; when it is done it releases the lock with
    DEL foo.lock

    If the reply is 0, foo is already locked by another client. For a non-blocking lock the caller can simply return; for a blocking call it enters a retry loop until it either obtains the lock or times out. That is the ideal picture; reality is harsher: locking with SETNX alone has race conditions and, in certain situations, leads to deadlock.

    Dealing with deadlocks

    With the scheme above, if the client holding the lock runs too long, is killed, or crashes for some other reason, the lock is never released and we get a deadlock. The lock therefore needs an expiry check: when locking, we store the current timestamp as the value, and by comparing it with the current time we can decide that a lock older than some threshold has expired, so no lock is held forever. Under heavy concurrency, however, if several clients detect the expired lock at the same time and crudely DEL it and SETNX it again, a race appears and more than one client may end up holding the lock:

    C1 acquires the lock and crashes. C2 and C3 call SETNX, get 0, read foo.lock's timestamp, and both find the lock expired.
    C2 sends DEL foo.lock.
    C2 sends SETNX foo.lock and obtains the lock.
    C3 sends DEL foo.lock — at this point it is actually deleting C2's lock.
    C3 sends SETNX foo.lock and obtains the lock.

    Now C2 and C3 both hold the lock; with higher concurrency even more clients could. So DEL must not be used directly when a lock has merely timed out. Fortunately we have GETSET. Suppose there is another client C4; here is how GETSET avoids the problem:

    C1 acquires the lock and crashes. C2 and C3 call SETNX, get 0, then GET foo.lock's timestamp T1 and find the lock expired.
    C4 sends GETSET to foo.lock:
    GETSET foo.lock <current Unix time + lock timeout + 1>
    and receives foo.lock's old timestamp T2.

    If T1 = T2, C4 got the old timestamp back and therefore holds the lock.
    If T1 != T2, some other client C5 ran GETSET just before C4, so C4 does not get the lock and can only sleep and try again in the next round.

    The only remaining question is whether the new timestamp C4 wrote into foo.lock affects the lock. Since C4 and C5 run almost at the same moment, the values they write into foo.lock are both valid expiry times, so the lock is unaffected.
    To make the lock even more robust, the client that obtained it should, before running the critical section, call GET again to fetch T1 and compare it with the T0 it wrote, in case the lock was unexpectedly released by a DEL somewhere else. All of the above is easy to find in other references. What clients actually face is far messier than a plain crash: a client may be blocked for a long time and then issue its DEL while the lock is already in another client's hands; careless handling leads to deadlock; and an unreasonable sleep setting can crush Redis under high concurrency. The most common remaining questions are these:

    Which path should be taken when GET returns nil?

    Option 1: treat it as a timeout
    C1 acquires the lock, finishes, and DELs it. Just before that DEL, C2 calls SETNX on foo.lock with timestamp T0, sees that someone holds the lock, and moves on to GET.
    C2 sends GET foo.lock and receives T1 (nil).
    C2 compares T0 > T1 + expire, decides the lock has expired, and enters the GETSET flow.
    C2 calls GETSET on foo.lock with T0 and receives the previous value T2.
    If T2 = T1, C2 has the lock; if T2 != T1, it does not.

    Option 2: loop back into the SETNX path
    C1 acquires the lock, finishes, and DELs it. Just before that DEL, C2 calls SETNX on foo.lock with timestamp T0, sees that someone holds the lock, and moves on to GET.
    C2 sends GET foo.lock and receives T1 (nil).
    C2 loops and retries SETNX in the next round.

    Both options look workable, but the first one is logically wrong. A nil from GET means the lock was deleted, not that it timed out, so the code should go back to SETNX to acquire it. The problem with option 1 is that the normal locking path should be SETNX, yet here a released lock ends up being handled through GETSET; with a careless condition this causes deadlock. Sadly I hit exactly that, and how I hit it is explained by the next question.

    What should happen when GETSET returns nil?

    C1 and C2 both call GET; C1 receives T1. C3, whose network is faster, quickly acquires the lock and then DELs it, so C2 receives T2 (nil). Both C1 and C2 enter the timeout-handling path.
    C1 sends GETSET to foo.lock and receives T11 (nil).
    C1 compares T1 with T11, sees that they differ, and concludes it did not get the lock.
    C2 sends GETSET to foo.lock and receives T22 (the timestamp written by C1).
    C2 compares T2 with T22, sees that they differ, and concludes it did not get the lock.

    Now neither C1 nor C2 believes it holds the lock, although C1 actually does: its logic never considered GETSET returning nil and blindly compared the GET and GETSET values. Why does this happen? First, with multiple clients, the commands sent over one client's connection look consecutive to that client, but by the time they reach the Redis server a large number of commands from other clients (DEL, SETNX, …) may have been interleaved between them. Second, the clocks of the clients may not be strictly synchronized.

    Timestamp issues

    Because the value of foo.lock is a timestamp, keeping the lock correct across multiple clients requires the servers' clocks to be synchronized; a client with a skewed clock will misjudge lock expiry and reintroduce the race. Whether the lock has expired also depends strictly on the timestamp's precision. If the precision is one second, then locking, doing the work, and unlocking will usually all finish within the same second, and the cases above become easy to trigger. It is therefore best to raise the precision to milliseconds, which is enough to keep a millisecond-level lock safe.

    Open issues with this distributed lock

    1. A timeout is mandatory: if the client holding the lock crashes, the lock must expire, otherwise no other client can ever acquire it and everything deadlocks.
    2. With multiple clients the timestamps cannot be strictly consistent, so under certain conditions locks may overlap; the mechanism has to tolerate such small-probability events.
    3. Lock only the critical section. A good habit is to prepare the resources first (for example the database connection), then acquire the lock, do the work, and release it, keeping the time the lock is held as short as possible.
    4. Whether to re-check the lock while holding it: if you depend strictly on the lock's state, check it at the key steps. In our tests, however, each check cost a few milliseconds under high concurrency while our whole critical section took less than 10 ms, so Wanke chose not to add the check.
    5. Sleeping is an art: to reduce the pressure on Redis, the retry loop must sleep between attempts, but how long to sleep has to be worked out from your Redis QPS and the time the lock is held.
    6. As for why Redis MULTI, EXPIRE, WATCH and similar mechanisms are not used here, the references explain the reasons.

    Lock benchmark data

    Without sleep
    First variant: no sleep between lock retries. Times for a single request (lock, execute, unlock):

    [benchmark screenshot omitted in the original]

    Locking and unlocking are both very fast. We then stress the method with

    ab -n1000 -c100 'http://sandbox6.wanke.etao.com/test/test_sequence.php?tbpm=t'
    i.e. ab with concurrency 100 and 1000 requests in total.

    [benchmark screenshot omitted in the original]

    The time to acquire the lock and the execution time while holding it both grow, and deleting the lock takes close to 10 ms. Why?
    1. While holding the lock our logic issues further Redis calls, and under high concurrency Redis slows down noticeably.
    2. Deleting the lock goes from 0.2 ms to 9.8 ms, a performance drop of almost 50x.
    Under these conditions the benchmark reaches 49 QPS, and the QPS turns out to depend on the total request volume: with concurrency 100 and only 100 requests in total we get just over 110 QPS. Now with sleep enabled:

    With sleep

    For a single request the performance matches the no-sleep variant. Under the same stress-test conditions,

    acquiring the lock takes noticeably longer, while releasing it becomes clearly shorter, only half of the no-sleep case (the longer execution time comes from re-creating the database connection during execution). Comparing the Redis command load of the two runs,

    the tall, narrow part of the chart is the no-sleep run and the short, wide part the sleep run; the load drops by roughly 50%. The downside of sleeping is an obvious QPS drop, only 35 under our test conditions, with some requests timing out. Weighing everything up we still chose the sleep approach, mainly to avoid crushing Redis under high concurrency — we have been bitten by that before, so sleep it is.

    References



    http://www.blogjava.net/caojianhua/archive/2013/01/28/394847.html


    Preamble

    Redis is a powerful data-structure store, and it is easy to optimize it for a given business model. My current job is to clean up our existing Java server framework and separate the network layer from the logic layer so that both can scale horizontally. Although the logic layer is built on Akka and is lock-free as far as possible, locks across JVMs are still unavoidable, so I needed a distributed lock.

    The official recipe

    The official documentation gives an implementation along these lines:

    • C4 sends SETNX lock.foo in order to acquire the lock.
    • The crashed client C3 still holds it, so Redis will reply with 0 to C4.
    • C4 sends GET lock.foo to check if the lock expired. If it is not, it will sleep for some time and retry from the start.
    • Instead, if the lock is expired because the Unix time at lock.foo is older than the current Unix time, C4 tries to perform: GETSET lock.foo (current Unix timestamp + lock timeout + 1)
    • Because of the GETSET semantics, C4 can check if the old value stored at key is still an expired timestamp. If it is, the lock was acquired.
    • If another client, for instance C5, was faster than C4 and acquired the lock with the GETSET operation, the C4 GETSET operation will return a non-expired timestamp. C4 will simply restart from the first step. Note that even if C4 set the key a bit in the future this is not a problem.

    With the officially recommended GETSET approach, however, the client that loses the race can indeed tell that it lost, but it has also overwritten the winner's timestamp — and the direct consequence is that the lock holder can no longer unlock!

    A Lua-based implementation

    The problem with the official recipe is that the get-check-set sequence cannot be made atomic with stand-alone Redis commands, so I decided to use Redis's Lua scripting and implement get-check-set as a Lua script.

    Locking:

    • script params: lock_key, current_timestamp, lock_timeout
    • SETNX lock_key (current_timestamp + lock_timeout); if that fails, SET lock_key (current_timestamp + lock_timeout) if current_timestamp > value
    • the client saves current_timestamp (lock_create_timestamp)

    Unlocking:

    • script params: lock_key, lock_create_timestamp, lock_timeout
    • DELETE if lock_create_timestamp + lock_timeout == value

    The implementation (Lua):

    --- lock

    local now = tonumber(ARGV[1])
    local timeout = tonumber(ARGV[2])
    local to = now + timeout
    local locked = redis.call('setnx', KEYS[1], to)
    if (locked == 1) then
        return 0
    end
    local kt = redis.call('type', KEYS[1]);
    if (kt['ok'] ~= 'string') then
        return 2
    end
    local keyValue = tonumber(redis.call('get', KEYS[1]))
    if (now > keyValue) then
        redis.call('set', KEYS[1], to)
        return 0
    end
    return 1

    --- unlock

    local begin = tonumber(ARGV[1])
    local timeout = tonumber(ARGV[2])
    local kt = redis.call('type', KEYS[1]);
    if (kt['ok'] == 'string') then
        local keyValue = tonumber(redis.call('get', KEYS[1]))
        if ((keyValue - begin) == timeout) then
            redis.call('del', KEYS[1])
            return 0
        end
    end
    return 1
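    One way to drive scripts like these from Java is Jedis' EVAL support. The sketch below inlines a condensed version of the lock script above (the wrong-type check is dropped for brevity); the class and method names are illustrative:

    import java.util.Arrays;
    import java.util.Collections;
    import redis.clients.jedis.Jedis;

    public class LuaLockSketch {
        private static final String LOCK_SCRIPT =
            "local now = tonumber(ARGV[1]) " +
            "local timeout = tonumber(ARGV[2]) " +
            "local to = now + timeout " +
            "if redis.call('setnx', KEYS[1], to) == 1 then return 0 end " +
            "local v = tonumber(redis.call('get', KEYS[1])) " +
            "if now > v then redis.call('set', KEYS[1], to) return 0 end " +
            "return 1";

        /** Returns true when the script reports 0, i.e. the lock was acquired atomically. */
        public static boolean lock(Jedis jedis, String key, long nowMillis, long timeoutMillis) {
            Object r = jedis.eval(LOCK_SCRIPT,
                    Collections.singletonList(key),
                    Arrays.asList(String.valueOf(nowMillis), String.valueOf(timeoutMillis)));
            return Long.valueOf(0L).equals(r);
        }
    }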

    Known issues

    A Redis-based distributed lock is a single point of failure. Our traffic, however, is nowhere near the level that would bring down a Redis instance dedicated to locking.



    Spring 动态注册类 (simone, Wed, 23 Mar 2016 02:47 GMT)
    http://www.blogjava.net/wangxinsh55/archive/2016/03/23/429774.html

    import com.duxiu.modules.beetlsql.BeetlSqlDao;
    import org.beetl.sql.core.SQLManager;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.support.DefaultListableBeanFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.core.io.Resource;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    import org.springframework.core.io.support.ResourcePatternResolver;

    import java.io.IOException;
    import java.util.Objects;
    import java.util.stream.Stream;

    public class DaoFactoryBean implements ApplicationContextAware {

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            ConfigurableApplicationContext context = (ConfigurableApplicationContext) applicationContext;
            DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) context.getBeanFactory();
            ResourcePatternResolver rpr = new PathMatchingResourcePatternResolver(applicationContext);
            SQLManager sqlManager = applicationContext.getBean(SQLManager.class);
            try {
                Resource[] resources = rpr.getResources("classpath:com/duxiu/**/*.class");
                // turn each class-file resource into a fully qualified class name
                Stream.of(resources).map(f -> {
                    try {
                        return f.getURI().getPath().split("(classes/)|(!/)")[1].replace("/", ".").replace(".class", "");
                    } catch (IOException e) {
                        e.printStackTrace();
                        return null;
                    }
                }).filter(Objects::nonNull).forEach(f -> {
                    try {
                        Class<?> aClass = Class.forName(f);
                        // only classes annotated with @BeetlSqlDao are candidates for registration
                        boolean match = Stream.of(aClass.getAnnotations()).anyMatch(c -> c instanceof BeetlSqlDao);
                        if (match && !beanFactory.containsBean(aClass.getSimpleName())) {
                            System.out.println(sqlManager.getMapper(aClass));
                            //beanFactory.registerSingleton(aClass.getSimpleName(), sqlManager.getMapper(aClass));
                        }
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println(applicationContext.getBean(SQLManager.class));
            /*if (!beanFactory.containsBean(beanName)) {
                BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(beanClass);
                beanDefinitionBuilder.addPropertyValue("host", host);
                beanDefinitionBuilder.addPropertyValue("port", port);
                beanDefinitionBuilder.addPropertyValue("database", database);
                beanDefinitionBuilder.setInitMethodName("init");
                beanDefinitionBuilder.setDestroyMethodName("destroy");
                beanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());
                logger.info("add {} to bean container.", beanName);
            }*/
        }
    }
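    For completeness, one way to wire the factory above so that setApplicationContext() actually runs at startup is plain Java config; the configuration class name below is illustrative, not from the original post:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class DaoFactoryConfig {

        @Bean
        public DaoFactoryBean daoFactoryBean() {
            // Spring calls setApplicationContext() on this bean because it
            // implements ApplicationContextAware, which triggers the classpath scan above.
            return new DaoFactoryBean();
        }
    }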


    模块化利器: 一篇文章掌握RequireJS常用知识 (simone, Fri, 04 Mar 2016 07:27 GMT)
    http://www.blogjava.net/wangxinsh55/archive/2016/03/04/429538.html
    原文: http://www.cnblogs.com/lyzg/p/4865502.html

    通 过本文,你可以对模块化开发和amd规范有一个较直观的认识,并详细地学习requirejs这个模块化开发工具的常见用法。本文采取循序渐进的方式,从 理论到实践,从requirejs官方api文档中,总结出在使用requirejs过程中最常用的一些用法,并对文档中不够清晰具体的内容,加以例证和 分析,希望本文的内容对你的能力提升有实质性的帮助。

    1. 模块化

    相信每个前端开发人员在刚开始接触js编程时,都写过类似下面这样风格的代码:

     
     

    这些代码的特点是:

    • 到处可见的全局变量
    • 大量的函数
    • 内嵌在html元素上的各种js调用

    当然这些代码本身在实现功能上并没有错误,但是从代码的可重用性,健壮性以及可维护性来说,这种编程方式是有问题的,尤其是在页面逻辑较为复杂的应用中,这些问题会暴露地特别明显:

    • 全局变量极易造成命名冲突
    • 函数式编程非常不利于代码的组织和管理
    • 内嵌的js调用很不利于代码的维护,因为html代码有的时候是十分臃肿和庞大的

    所以当这些问题出现的时候,js大牛们就开始寻找去解决这些问题的究极办法,于是模块化开发就出现了。正如模块化这个概念的表面意思一样,它要求在 编写代码的时候,按层次,按功能,将独立的逻辑,封装成可重用的模块,对外提供直接明了的调用接口,内部实现细节完全私有,并且模块之间的内部实现在执行 期间互不干扰,最终的结果就是可以解决前面举例提到的问题。一个简单遵循模块化开发要求编写的例子:

    //module.js
    var Student = function (name) {
            return name && {
                getName: function () {
                    return name;
                }
            };
        },
        Course = function (name) {
            return name && {
                getName: function () {
                    return name;
                }
            };
        },
        Controller = function () {
            var data = {};
            return {
                add: function (stu, cour) {
                    var stuName = stu && stu.getName(),
                        courName = cour && cour.getName(),
                        current,
                        _filter = function (e) {
                            return e === courName;
                        };

                    if (!stuName || !courName) return;

                    current = data[stuName] = data[stuName] || [];

                    if (current.filter(_filter).length === 0) {
                        current.push(courName);
                    }
                },
                list: function (stu) {
                    var stuName = stu && stu.getName(),
                        current = data[stuName];
                    current && console.log(current.join(';'));
                }
            };
        };

    //main.js
    var stu = new Student('lyzg'),
        c = new Controller();

    c.add(stu, new Course('javascript'));
    c.add(stu, new Course('html'));
    c.add(stu, new Course('css'));
    c.list(stu);

    以上代码定义了三个模块分别表示学生,课程和控制器,然后在main.js中调用了controller提供的add和list接口,为lyzg这个学生添加了三门课程,然后在控制台显示了出来。运行结果如下:

    javascript;html;css

    通过上例,可以看出模块化的代码结构和逻辑十分清晰,代码看起来十分优雅,另外由于逻辑都通过模块拆分,所以达到了解耦的目的,代码的功能也会比较 健壮。不过上例使用的这种模块化开发方式也并不是没有问题,这个问题就是它还是把模块引用如student这些直接添加到了全局空间下,虽然通过模块减少 了很多全局空间的变量和函数,但是模块引用本身还是要依赖全局空间,才能被调用,当模块较多,或者有引入第三方模块库时,仍然可能造成命名冲突的问题,所 以这种全局空间下的模块化开发的方式并不是最完美的方式。目前常见的模块化开发方式,全局空间方式是最基本的一种,另外常见的还有遵循amd规范的开发方 式,遵循cmd规范的开发方式,和ecmascript 6的开发方式。需要说明的是,cmd和es6跟本文的核心没有关系,所以不会在此介绍,后面的内容主要介绍amd以及实现了amd规范的 requirejs。

    2. amd规范

    正如上文提到,实现模块化开发的方式,另外常见的一种就是遵循AMD规范的实现方式,不过AMD规范并不是具体的实现方式,而仅仅是模块化开发的一种解决方案,你可以把它理解成模块化开发的一些接口声明,如果你要实现一个遵循该规范的模块化开发工具,就必须实现它预先定义的API。比如它要求在加载模块时,必须使用如下的API调用方式:

    require([module], callback) 其中: [module]:是一个数组,里面的成员就是要加载的模块; callback:是模块加载完成之后的回调函数

    所有遵循amd规范的模块化工具,都必须按照它的要求去实现,比如requirejs这个库,就是完全遵循amd规范实现的,所以在利用 requirejs加载或者调用模块时,如果你事先知道amd规范的话,你就知道该怎么用requirejs了。规范的好处在于,不同的实现却有相同的调 用方式,很容易切换不同的工具使用,至于具体用哪一个实现,这就跟各个工具的各自的优点跟项目的特点有关系,这些都是在项目开始选型的时候需要确定的。目 前requirejs不是唯一实现了amd规范的库,像dojo这种更全面的js库也都有amd的实现。

    最后对amd全称做一个解释,译为:异步模块定义。异步强调的是,在加载模块以及模块所依赖的其它模块时,都采用异步加载的方式,避免模块加载阻塞了网页的渲染进度。相比传统的异步加载,amd工具的异步加载更加简便,而且还能实现按需加载,具体解释在下一部分说明。

    3. javascript的异步加载和按需加载

    html中的script标签在加载和执行过程中会阻塞网页的渲染,所以一般要求尽量将script标签放置在body元素的底部,以便加快页面显示的速度,还有一种方式就是通过异步加载的方式来加载js,这样可以避免js文件对html渲染的阻塞。

    第1种异步加载的方式是直接利用脚本生成script标签的方式:

    (function() {
        var s = document.createElement('script');
        s.type = 'text/javascript';
        s.async = true;
        s.src = 'http://yourdomain.com/script.js';
        var x = document.getElementsByTagName('script')[0];
        x.parentNode.insertBefore(s, x);
    })();

    这段代码,放置在script标记内部,然后该script标记添加到body元素的底部即可。

    第2种方式是借助script的属性:defer和async,defer这个属性在ie浏览器和早起的火狐浏览器中支持,async在支持 html5的浏览器上都支持,只要有这两个属性,script就会以异步的方式来加载,所以script在html中的位置就不重要了:

     
     

    这种方式下,所有异步js在执行的时候还是按顺序执行的,不然就会存在依赖问题,比如如果上例中的main.js依赖foo.js和bar.js, 但是main.js先执行的话就会出错了。虽然从来理论上这种方式也算不错了,但是不够好,因为它用起来很繁琐,而且还有个问题就是页面需要添加多个 script标记以及没有办法完全做到按需加载。

    js的按需加载分两个层次,第一个层次是只加载这个页面可能被用到的js,第二个层次是在只在用到某个js的时 候才去加载。传统地方式很容易做到第一个层次,但是不容易做到第二个层次,虽然我们可以通过合并和压缩工具,将某个页面所有的js都添加到一个文件中去, 最大程度减少资源请求量,但是这个js请求到客户端以后,其中有很多内容可能都用不上,要是有个工具能够做到在需要的时候才去加载相关js就完美解决问题 了,比如requirejs。

    4. requirejs常用用法总结

    前文多次提及RequireJS,本部分将对它的常用用法详细说明,它的官方地址是: http://requirejs.org/ ,你可以到该地址去下载最新版RequireJS文件。RequireJS作为目前使用最广泛的AMD工具,它的主要优点是:

    • 完全支持模块化开发
    • 能将非amd规范的模块引入到requirejs中使用
    • 异步加载js
    • 完全按需加载依赖模块,模块文件只需要压缩混淆,不需要合并
    • 错误调试
    • 插件支持

    4.01 如何使用requirejs

    使用方式很简单,只要一个script标记就可以在网页中加载RequireJS:

    <script defer async="true" src="require.js"></script>

    由于这里用到了defer和async这两个异步加载的属性,所以require.js是异步加载的,你把这个script标记放置在任何地方都没有问题。

    4.02 如何利用requirejs加载并执行当前网页的逻辑js

    4.01解决的仅仅是RequireJS的使用问题,但它仅仅是一个js库,是一个被当前页面的逻辑所利用的工具,真正实现网页功能逻辑的是我们要利用RequireJS编写的主js,这个主js(假设这些代码都放置在main.js文件中)又该如何利用rj来加载执行呢?方式如下:

    <script data-main="scripts/main.js" defer async="true" src="require.js"></script>

    对比4.01,你会发现script标记多了一个data-main,rj用这个配置当前页面的主js,你要把逻辑都写在这个main.js里面。 当rj自身加载执行后,就会再次异步加载main.js。这个main.js是当前网页所有逻辑的入口,理想情况下,整个网页只需要这一个script标 记,利用rj加载依赖的其它文件,如jquery等。

     

    4.03 main.js怎么写

    假设项目的目录结构为:

    main.js是跟当前页面相关的主js,app文件夹存放本项目自定义的模块,lib存放第三方库。

    html中按4.02的方式配置rj。main.js的代码如下:

    require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) {     //use foo bar app do sth });

    在这段js中,我们利用rj提供的require方法,加载了三个模块,然后在这个三个模块都加载成功之后执行页面逻辑。require方法有2个 参数,第一个参数是数组类型的,实际使用时,数组的每个元素都是一个模块的module id,第二个参数是一个回调函数,这个函数在第一个参数定义的所有模块都加载成功后回调,形参的个数和顺序分别与第一个参数定义的模块对应,比如第一个模 块时lib/foo,那么这个回调函数的第一个参数就是foo这个模块的引用,在回调函数中我们使用这些形参来调用各个模块的方法,由于回调是在各模块加 载之后才调用的,所以这些模块引用肯定都是有效的。

    从以上这个简短的代码,你应该已经知道该如何使用rj了。

    4.04 rj的baseurl和module id

    在介绍rj如何去解析依赖的那些模块js的路径时,必须先弄清楚baseurl和module id这两个概念。

    html中的base元素可以定义当前页面内部任何http请求的url前缀部分,rj的baseurl跟这个base元素起的作用是类似的,由于 rj总是动态地请求依赖的js文件,所以必然涉及到一个js文件的路径解析问题,rj默认采用一种baseurl moduleid的解析方式,这个解析方式后续会举例说明。这个baseurl非常重要,rj对它的处理遵循如下规则:

    • 在没有使用data-main和config的情况下,baseurl默认为当前页面的目录
    • 在有data-main的情况下,main.js前面的部分就是baseurl,比如上面的scripts/
    • 在有config的情况下,baseurl以config配置的为准

    上述三种方式,优先级由低到高排列。

    data-main的使用方式,你已经知道了,config该如何配置,如下所示:

    require.config({     baseurl: 'scripts' });

    这个配置必须放置在main.js的最前面。data-main与config配置同时存在的时候,以config为准,由于rj的其它配置也是在这个位置配置的,所以4.03中的main.js可以改成如下结构,以便将来的扩展:

    require.config({     baseurl: 'scripts' });  require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) {     // use foo bar app do sth });

     

    关于module id,就是在require方法以及后续的define方法里,用在依赖数组这个参数里,用来标识一个模块的字符串。上面代码中的['lib/foo', 'app/bar', 'app/app']就是一个依赖数组,其中的每个元素都是一个module id。值得注意的是,module id并不一定是该module 相关js路径的一部分,有的module id很短,但可能路径很长,这跟rj的解析规则有关。下一节详细介绍。

    4.05 rj的文件解析规则

    rj默认按baseurl module id的规则,解析文件,并且它默认要加载的文件都是js,所以你的module id里面可以不包含.js的后缀,这就是为啥你看到的module id都是lib/foo, app/bar这种形式了。有三种module id,不适用这种规则:

    • 以/开头,如/lib/jquey.js
    • 以.js结尾,如test.js
    • 包含http或https,如

    假如main.js如下使用:

    require.config({     baseurl: 'scripts' });  require(['/lib/foo', 'test.js', 'http://cdn.baidu.com/js/jquery'], function(foo, bar, app) {     // use foo bar app do sth });

    这三个module 都不会根据baseurl module id的规则来解析,而是直接用module id来解析,等效于下面的代码:

    <script src="/lib/foo.js"></script>
    <script src="test.js"></script>
    <script src="http://cdn.baidu.com/js/jquery.js"></script>

    各种module id解析举例:

    例1,项目结构如下:

    main.js如下:

    require.config({     baseurl: 'scripts' });  require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) {     // use foo bar app do sth });

    baseurl为:scripts目录

    moduleid为:lib/foo, app/bar, app/app

    根据baseurl moduleid,以及自动补后缀.js,最终这三个module的js文件路径为:

    scripts/lib/foo.js scripts/app/bar.js scripts/app/app.js

    例2,项目结构同例1:

    main.js改为:

    require.config({     baseurl: 'scripts/lib',     paths: {      app: '../app'     } });   require(['foo', 'app/bar', 'app/app'], function(foo, bar, app) {     // use foo bar app do sth });

    这里出现了一个新的配置paths,它的作用是针对module id中特定的部分,进行转义,如以上代码中对app这个部分,转义为../app,这表示一个相对路径,相对位置是baseurl所指定的目录,由项目结 构可知,../app其实对应的是scirpt/app目录。正因为有这个转义的存在,所以以上代码中的app/bar才能被正确解析,否则还按 baseurl moduleid的规则,app/bar不是应该被解析成scripts/lib/app/bar.js吗,但实际并非如此,app/bar被解析成 scripts/app/bar.js,其中起关键作用的就是paths的配置。通过这个举例,可以看出module id并不一定是js文件路径中的一部分,paths的配置对于路径过程的js特别有效,因为可以简化它的module id。

    另外第一个模块的id为foo,同时没有paths的转义,所以根据解析规则,它的文件路径时:scripts/lib/foo.js。

    paths的配置中只有当模块位于baseurl所指定的文件夹的同层目录,或者更上层的目录时,才会用到../这种相对路径。

    例3,项目结果同例1,main.js同例2:

    这里要说明的问题稍微特殊,不以main.js为例,而以app.js为例,且app依赖bar,当然config还是需要在main.js中定义的,由于这个问题在定义模块的时候更加常见,所以用define来举例,假设app.js模块如下定义:

    define(['./bar'], function(bar) {      return {           dosth: function() {               bar.dosth();           }      } });

    上面的代码通过define定义了一个模块,这个define函数后面介绍如何定义模块的时候再来介绍,这里简单了解。这里这种用法的第一个参数跟 require函数一样,是一个依赖数组,第二个参数是一个回调,也是在所有依赖加载成功之后调用,这个回调的返回值会成为这个模块的引用被其它模块所使 用。

    这里要说的问题还是跟解析规则相关的,如果完全遵守rj的解析规则,这里的依赖应该配置成app/bar才是正确的,但由于app.js与 bar.js位于同一个目录,所以完全可利用./这个同目录的相对标识符来解析js,这样的话只要app.js已经加载成功了,那么去同目录下找 bar.js就肯定能找到了。这种配置在定义模块的时候非常有意义,这样你的模块就不依赖于放置这些模块的文件夹名称了。

    4.06 rj的异步加载

    rj不管是require方法还是define方法的依赖模块都是异步加载的,所以下面的代码不一定能解析到正确的js文件:

     
    //main.js
    require.config({ paths: { foo:
    'libs/foo-1.1.3' } });
    //other.js
    require( ['foo'], function( foo ) { //foo is undefined });

    由于main.js是异步加载的,所以other.js会比它先加载,但是rj的配置存在于main.js里面,所以在加载other.js读不到rj的配置,在other.js执行的时候解析出来的foo的路径就会变成scripts/foo.js,而正确路径应该是scripts/libs/foo-1.1.3.js。

    尽管rj的依赖是异步加载的,但是已加载的模块在多次依赖的时候,不会再重新加载:

    define(['require', 'app/bar', 'app/app'], function(require) {     var bar= require("app/bar");     var app= require("app/app");     //use bar and app do sth });

    上面的代码,在callback定义的时候,只用了一个形参,这主要是为了减少形参的数量,避免整个回调的签名很长。依赖的模块在回调内部可以直接用require(moduleid)的参数得到,由于在回调执行前,依赖的模块已经加载,所以此处调用不会再重新加载。但是如果此处获取一个并不在依赖数组中出现的module id,require很有可能获取不到该模块引用,因为它可能需要重新加载,如果它没有在其它模块中被加载过的话。

    4.07 rj官方推荐的js文件组织结构

    rj建议,文件组织尽量扁平,不要多层嵌套,最理想的是跟项目相关的放在一个文件夹,第三方库放在一个文件夹,如下所示:

    4.08 使用define定义模块

    amd规定的模块定义规范为:

    define(id?, dependencies?, factory);  其中: id: 模块标识,可以省略。 dependencies: 所依赖的模块,可以省略。 factory: 模块的实现,或者一个javascript对象

    关于第一个参数,本文不会涉及,因为rj建议所有模块都不要使用第一个参数,如果使用第一个参数定义的模块成为命名模块,不适用第一个参数的模块成为匿名模块,命名模块如果更名,所有依赖它的模块都得修改!第二个参数是依赖数组,跟require一样,如果没有这个参数,那么定义的就是一个无依赖的模块;最后一个参数是回调或者是一个简单对象,在模块加载完毕后调用,当然没有第二个参数,最后一个参数也会调用。

    本部分所举例都采用如下项目结构:

    1. 定义简单对象模块:

    app/bar.js

    define({  bar:'i am bar.' });

    利用main.js测试:

    require.config({     baseurl: 'scripts/lib',     paths: {         app: '../app'     } });  require(['app/bar'], function(bar) {     console.log(bar);// {bar: 'i am bar.'} });

    2. 定义无依赖的模块:

    app/nodec.js:

    define(function () {     return {         nodec: "yes, i don't need dependence."     } });

    利用main.js测试:

    require.config({     baseurl: 'scripts/lib',     paths: {         app: '../app'     } });  require(['app/nodec'], function(nodec) {     console.log(nodec);// {nodec: yes, i don't need dependence.'} });

    3. 定义依赖其它模块的模块:

    app/dec.js:

    define(['jquery'], function($){     //use $ do sth ...     return {        usejq: true     } });

    利用main.js测试:

    require.config({     baseurl: 'scripts/lib',     paths: {         app: '../app'     } });  require(['app/dec'], function(dec) {     console.log(dec);//{usejq: true} });

    4. 循环依赖:

    当一个模块foo的依赖数组中存在bar,bar模块的依赖数组中存在foo,就会形成循环依赖,稍微修改下bar.js和foo.js如下。

    app/bar.js:

    define(['foo'],function(foo){  return {   name: 'bar',   hi: function(){    console.log('hi! '   foo.name);   }  } });

    lib/foo.js:

    define(['app/bar'],function(bar){  return {   name: 'foo',   hi: function(){    console.log('hi! '   bar.name);   }  } });

    利用main.js测试:

    require.config({     baseurl: 'scripts/lib',     paths: {         app: '../app'     } });   require(['app/bar', 'foo'], function(bar, foo) {     bar.hi();     foo.hi(); });

    运行结果:

    如果改变main.js中require部分的依赖顺序,结果:

    循环依赖导致两个依赖的module之间,始终会有一个在获取另一个的时候,得到undefined。解决方法是,在定义module的时候,如果用到循环依赖的时候,在define内部通过require重新获取。main.js不变,bar.js改成:

    define(['require', 'foo'], function(require, foo) {     return {         name: 'bar',         hi: function() {          foo = require('foo');             console.log('hi! '   foo.name);         }     } });

    foo.js改成:

    define(['require', 'app/bar'], function(require, bar) {     return {         name: 'foo',         hi: function() {          bar = require('app/bar');             console.log('hi! '   bar.name);         }     } });

    利用上述代码,重新执行,结果是:

    模块定义总结:不管模块是用回调函数定义还是简单对象定义,这个模块输出的是一个引用,所以这个引用必须是有效的,你的回调不能返回undefined,但是不局限于对象类型,还可以是数组,函数,甚至是基本类型,只不过如果返回对象,你能通过这个对象组织更多的接口。

    4.09 内置的rj模块

    再看看这个代码:

    define(['require', 'app/bar'], function(require) {     return {         name: 'foo',         hi: function() {             var bar = require('app/bar');             console.log('hi! '   bar.name);         }     } });

    依赖数组中的require这个moduleid对应的是一个内置模块,利用它加载模块,怎么用你已经看到了,比如在main.js中,在define中。另外一个内置模块是module,这个模块跟rj的另外一个配置有关,具体用法请在第5大部分去了解。

    4.10 其它rj有用功能

    1. 生成相对于模块的url地址

    define(["require"], function(require) {     var cssurl = require.to; });

    这个功能在你想要动态地加载一些文件的时候有用,注意要使用相对路径。

    2. 控制台调试

    require("module/name").callsomefunction()

    假如你想在控制台中查看某个模块都有哪些方法可以调用,如果这个模块已经在页面加载的时候通过依赖被加载过后,那么就可以用以上代码在控制台中做各种测试了。

    5. requirejs常用配置总结

    在rj的配置中,前面已经接触到了baseurl,paths,另外几个常用的配置是:

    • shim
    • config
    • enforcedefine
    • urlargs

    5.01 shim

    为那些没有使用define()来声明依赖关系、设置模块的"浏览器全局变量注入"型脚本做依赖和导出配置。

    例1:利用exports将模块的全局变量引用与requirejs关联

    项目结构如图:

    main.js如下:

    require.config({     baseurl: 'scripts/lib',     paths: {         app: '../app'     },     shim: {      underscore: {       exports: '_'      }     } });  require(['underscore'], function(_) {     // 现在可以通过_调用underscore的api了 });

    如你所见,rj在shim中添加了一个对underscore这个模块的配置,并通过exports属性指定该模块暴露的全局变量,以便rj能够对这些模块统一管理。

    例2:利用deps配置js模块的依赖

    项目结构如图:

    main.js如下:

    require.config({     baseurl: 'scripts/lib',     paths: {         app: '../app'     },     shim: {      backbone: {         deps: ['underscore', 'jquery'],         exports: 'backbone'      }     } });  require(['backbone'], function(backbone) {     //use backbone's api });

    由于backbone这个组件依赖jquery和underscore,所以可以通过deps属性配置它的依赖,这样backbone将会在另外两个模块加载完毕之后才会加载。

    例3:jquery等库插件配置方法

    代码举例如下:

    requirejs.config({     shim: {         'jquery.colorize': {             deps: ['jquery'],             exports: 'jquery.fn.colorize'         },         'jquery.scroll': {             deps: ['jquery'],             exports: 'jquery.fn.scroll'         },         'backbone.layoutmanager': {             deps: ['backbone']             exports: 'backbone.layoutmanager'         }     } });

     

    5.02 config

    常常需要将配置信息传给一个模块。这些配置往往是application级别的信息,需要一个手段将它们向下传递给模块。在requirejs中,基于requirejs.config()的config配置项来实现。要获取这些信息的模块可以加载特殊的依赖“module”,并调用module.config()。

    例1:在requirejs.config()中定义config,以供其它模块使用

    requirejs.config({     config: {         'bar': {             size: 'large'         },         'baz': {             color: 'blue'         }     } });

    如你所见,config属性中的bar这一节是在用于module id为bar这个模块的,baz这一节是用于module id为baz这个模块的。具体使用以bar.js举例:

    define(['module'], function(module) {     //will be the value 'large'var size = module.config().size; });

    前面提到过,rj的内置模块除了require还有一个module,用法就在此处,通过它可以来加载config的内容。

    5.03 enforcedefine

    如果设置为true,则当一个脚本不是通过define()定义且不具备可供检查的shim导出字串值时,就会抛出错误。这个属性可以强制要求所有rj依赖或加载的模块都要通过define或者shim被rj来管理,同时它还有一个好处就是用于错误检测。

    5.04 urlargs

    requirejs获取资源时附加在url后面的额外的query参数。作为浏览器或服务器未正确配置时的“cache bust”手段很有用。使用cache bust配置的一个示例:

    urlargs: "bust="   (new date()).gettime()

    6. 错误处理

    6.01 加载错误的捕获

    ie中捕获加载错误不完美:

    • ie 6-8中的script.onerror无效。没有办法判断是否加载一个脚本会导致404错;更甚地,在404中依然会触发state为complete的onreadystatechange事件。
    • ie 9 中script.onerror有效,但有一个bug:在执行脚本之后它并不触发script.onload事件句柄。因此它无法支持匿名amd模块的标准方法。所以script.onreadystatechange事件仍被使用。但是,state为complete的onreadystatechange事件会在script.onerror函数触发之前触发。

    所以为了支持在ie中捕获加载错误,需要配置enforcedefine为true,这不得不要求你所有的模块都用define定义,或者用shim配置rj对它的引用。

    注意:如果你设置了enforcedefine: true,而且你使用data-main=""来加载你的主js模块,则该主js模块必须调用define()而不是require()来加载其所需的代码。主js模块仍然可调用require/requirejs来设置config值,但对于模块加载必须使用define()。比如原来的这段就会报错:

    require.config({  enforcedefine: true,     baseurl: 'scripts/lib',     paths: {         app: '../app'     },     shim: {      backbone: {       deps: ['underscore', 'jquery'],             exports: 'backbone'      }     } }); require(['backbone'], function(backbone) {     console.log(backbone); });

    把最后三行改成:

    define(['backbone'], function(backbone) {     console.log(backbone); });

    才不会报错。

    6.02 paths备错

    requirejs.config({     //to get timely, correct error triggers in ie, force a define/shim exports check.     enforcedefine: true,     paths: {         jquery: [             'http://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min',             //if the cdn location fails, load from this location             'lib/jquery'         ]     } });  //later require(['jquery'], function ($) { });

    上述代码先尝试加载cdn版本,如果出错,则退回到本地的lib/jquery.js。

    注意: paths备错仅在模块id精确匹配时工作。这不同于常规的paths配置,常规配置可匹配模块id的任意前缀部分。备错主要用于非常的错误恢复,而不是常规的path查找解析,因为那在浏览器中是低效的。

    6.03 全局 requirejs.onerror

    为了捕获在局域的errback中未捕获的异常,你可以重载requirejs.onerror():

    requirejs.onerror = function (err) {     console.log(err.requiretype);     if (err.requiretype === 'timeout') {         console.log('modules: '   err.requiremodules);     }      throw err; };

    (完)



    浅谈Spark应用程序的性能调优 (simone, Wed, 02 Mar 2016 06:12 GMT)
    http://www.blogjava.net/wangxinsh55/archive/2016/03/02/429506.html

    spark是基于内存的分布式计算引擎,以处理的高效和稳定著称。然而在实际的应用开发过程中,开发者还是会遇到种种问题,其中一大类就是和性能相关。在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能。

    分布式计算引擎在调优方面有四个主要关注方向,分别是cpu、内存、网络开销和i/o,其具体调优目标如下:

    1. 提高cpu利用率。
    2. 避免oom。
    3. 降低网络开销。
    4. 减少i/o操作。

    第1章 数据倾斜

    数据倾斜意味着某一个或某几个partition中的数据量特别的大,这意味着完成针对这几个partition的计算需要耗费相当长的时间。

    如 果大量数据集中到某一个partition,那么这个partition在计算的时候就会成为瓶颈。图1是spark应用程序执行并发的示意图,在 spark中,同一个应用程序的不同stage是串行执行的,而同一stage中的不同task可以并发执行,task数目由partition数来决 定,如果某一个partition的数据量特别大,则相应的task完成时间会特别长,由此导致接下来的stage无法开始,整个job完成的时间就会非 常长。

    要避免数据倾斜的出现,一种方法就是选择合适的key,或者是自己定义相关的partitioner。在spark中block使用 了bytebuffer来存储数据,而bytebuffer能够存储的最大数据量不超过2gb。如果某一个key有大量的数据,那么在调用cache或 persist函数时就会碰到spark-1476这个异常。
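    As a rough illustration of the second option, here is a minimal custom Partitioner in Java; the class name and partition count are placeholders, and note that a partitioner alone cannot split one extremely hot key — for that the key itself is usually salted (e.g. a random suffix appended) before the shuffle and merged back afterwards:

    import org.apache.spark.Partitioner;

    public class DemoPartitioner extends Partitioner {
        private final int partitions;

        public DemoPartitioner(int partitions) {
            this.partitions = partitions;
        }

        @Override
        public int numPartitions() {
            return partitions;
        }

        @Override
        public int getPartition(Object key) {
            // keep the result non-negative even for Integer.MIN_VALUE hash codes
            int h = (key == null) ? 0 : key.hashCode();
            return ((h % partitions) + partitions) % partitions;
        }
    }

    A pair RDD would then opt in with something like pairRDD.partitionBy(new DemoPartitioner(32)).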

    下面列出的这些api会导致shuffle操作,是数据倾斜可能发生的关键点所在
    1. groupbykey
    2. reducebykey
    3. aggregatebykey
    4. sortbykey
    5. join
    6. cogroup
    7. cartesian
    8. coalesce
    9. repartition
    10. repartitionandsortwithinpartitions


    图1: spark任务并发模型

    trait RDDWrapper[T] {
      def rdd: RDD[T]
    }

    // TODO View bounds are deprecated, should use context bounds
    // Might need to change ClassManifest for ClassTag in Spark 1.0.0
    case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
      // Here we use a single Long to try to ensure the sort is balanced,
      // but for really large dataset, we may want to consider
      // using a tuple of many Longs or even a GUID
      def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
        rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
          .grouped(numPartitions).map(t => (t._1._1, t._2))
    }

    case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
      def grouped(size: Int): RDD[T] = {
        // TODO Version where withIndex is cached
        val withIndex = rdd.mapPartitions(_.zipWithIndex)

        val startValues =
          withIndex.mapPartitionsWithIndex((i, iter) =>
            Iterator((i, iter.toIterable.last))).toArray().toList
            .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

        withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
          case (value, index) => (startValues(i) + index.toLong, value)
        })
        .partitionBy(new Partitioner {
          def numPartitions: Int = size
          def getPartition(key: Any): Int =
            (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
        })
        .map(_._2)
      }
    }

    定义隐式的转换

    object RDDConversions {
      implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] =
        new DemoRDD[T](rdd)
      implicit def toDemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
        rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd)
      implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
    }

    在spark-shell中就可以使用了

    import RDDConversions._

    yourRDD.grouped(5)

    第2章 减少网络通信开销

    spark 的shuffle过程非常消耗资源,shuffle过程意味着在相应的计算节点,要先将计算结果存储到磁盘,后续的stage需要将上一个stage的结 果再次读入。数据的写入和读取意味着disk i/o操作,与内存操作相比,disk i/o操作是非常低效的。

    使用iostat来查看disk i/o的使用情况,disk i/o操作频繁一般会伴随着cpu load很高。

    如果数据和计算节点都在同一台机器上,那么可以避免网络开销,否则还要加上相应的网络开销。 使用iftop来查看网络带宽使用情况,看哪几个节点之间有大量的网络传输。
    图2是spark节点间数据传输的示意图,spark task的计算函数是通过akka通道由driver发送到executor上,而shuffle的数据则是通过netty网络接口来实现。由于akka 通道中参数spark.akka.framesize决定了能够传输消息的最大值,所以应该避免在spark task中引入超大的局部变量。

    图2: spark节点间的数据传输

    第1节 选择合适的并发数

    为了提高spark应用程序的效率,尽可能的提升cpu的利用率。并发数应该是可用cpu物理核数的两倍。在这里,并发数过低,cpu得不到充分的利用,并发数过大,由于spark是每一个task都要分发到计算结点,所以任务启动的开销会上升。

    并发数的修改,通过配置参数来改变spark.default.parallelism,如果是sql的话,可能通过修改spark.sql.shuffle.partitions来修改。
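    A small sketch of setting these two knobs from the Java API, with placeholder values:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ParallelismConfig {
        public static JavaSparkContext build() {
            SparkConf conf = new SparkConf()
                    .setAppName("tuning-demo")
                    .set("spark.default.parallelism", "16")        // RDD API shuffles
                    .set("spark.sql.shuffle.partitions", "16");    // Spark SQL shuffles
            return new JavaSparkContext(conf);
        }
    }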

    第1项 repartition vs. coalesce

    repartition和coalesce都能实现数据分区的动态调整,但需要注意的是repartition会导致shuffle操作,而coalesce不会。

    第2节 reducebykey vs. groupby

    groupby操作应该尽可能的避免,第一是有可能造成大量的网络开销,第二是可能导致oom。以wordcount为例来演示reducebykey和groupby的差异

    reduceByKey
    sc.textFile("README.md").map(l => l.split(",")).map(w => (w, 1)).reduceByKey(_ + _)


    图3:reducebykey的shuffle过程

    shuffle过程如图2所示

    groupByKey
    sc.textFile("README.md").map(l => l.split(",")).map(w => (w, 1)).groupByKey.map(r => (r._1, r._2.sum))


    图4:groupbykey的shuffle过程

    建议: 尽可能使用reducebykey, aggregatebykey, foldbykey和combinebykey
    假设有一rdd如下所示,求每个key的均值

    val data = sc.parallelize(List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)))

    方法一:reducebykey

    data.map(r => (r._1, (r._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(r => (r._1, r._2._1 / r._2._2)).foreach(println)

    方法二:combinebykey

    data.combineByKey(value => (value, 1),
        (x: (Double, Int), value: Double) => (x._1 + value, x._2 + 1),
        (x: (Double, Int), y: (Double, Int)) => (x._1 + y._1, x._2 + y._2))

    第3节 broadcasthashjoin vs. shufflehashjoin

    在join过程中,经常会遇到大表和小表的join. 为了提高效率可以使用broadcasthashjoin, 预先将小表的内容广播到各个executor, 这样将避免针对小表的shuffle过程,从而极大的提高运行效率。

    其实broadcasthashjoin核心就是利用了broadcast函数,如果理解清楚broadcast的优点,就能比较好的明白broadcasthashjoin的优势所在。

    以下是一个简单使用broadcast的示例程序。

    val lst = 1 to 100 toList
    val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2)
    val broadcastLst = sc.broadcast(lst)
    exampleRDD.filter(i => broadcastLst.value.contains(i)).collect.foreach(println)

    第4节 map vs. mappartitions

    有时需要将计算结果存储到外部数据库,势必会建立到外部数据库的连接。应该尽可能的让更多的元素共享同一个数据连接而不是每一个元素的处理时都去建立数据库连接。
    在这种情况下,mapPartitions和foreachPartition将比map操作高效得多,下面给出一个基于JDBC的示意。
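    A sketch of the "one connection per partition" pattern using foreachPartition, assuming a JDBC data source; the URL, credentials and table are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import org.apache.spark.api.java.JavaRDD;

    public class PartitionWriteDemo {
        public static void save(JavaRDD<String> lines) {
            lines.foreachPartition(rows -> {
                // one connection for the whole partition instead of one per element
                try (Connection conn = DriverManager.getConnection("jdbc:mysql://db-host/demo", "user", "pwd");
                     PreparedStatement ps = conn.prepareStatement("INSERT INTO words(word) VALUES (?)")) {
                    while (rows.hasNext()) {
                        ps.setString(1, rows.next());
                        ps.executeUpdate();
                    }
                }
            });
        }
    }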

    第5节 数据就地读取

    移动计算的开销远远低于移动数据的开销。

    spark中每个task都需要相应的输入数据,因此输入数据的位置对于task的性能变得很重要。按照数据获取的速度来区分,由快到慢分别是:

    1.process_local
    2.node_local
    3.rack_local

    spark在task执行的时候会尽优先考虑最快的数据获取方式,如果想尽可能的在更多的机器上启动task,那么可以通过调低spark.locality.wait的值来实现, 默认值是3s。

    除了HDFS,Spark能够支持的数据源越来越多,如Cassandra、HBase、MongoDB等知名的NoSQL数据库,随着Elasticsearch的日渐兴起,Spark和Elasticsearch组合起来提供高速的查询解决方案也成为一种有益的尝试。

    上述提到的外部数据源面临的一个相同问题就是如何让spark快速读取其中的数据, 尽可能的将计算结点和数据结点部署在一起是达到该目标的基本方法,比如在部署hadoop集群的时候,可以将hdfs的datanode和spark worker共享一台机器。

    以cassandra为例,如果spark的部署和cassandra的机器有部分重叠,那么在读取cassandra中数据的时候,通过调低spark.locality.wait就可以在没有部署cassandra的机器上启动spark task。

    对于cassandra, 可以在部署cassandra的机器上部署spark worker,需要注意的是cassandra的compaction操作会极大的消耗cpu,因此在为spark worker配置cpu核数时,需要将这些因素综合在一起进行考虑。

    这一部分的代码逻辑可以参考源码tasksetmanager::addpendingtask

    private def addpendingtask(index: int, readding: boolean = false) {   // utility method that adds `index` to a list only if readding=false or it's not already there   def addto(list: arraybuffer[int]) {     if (!readding || !list.contains(index)) {       list  = index     }   }    for (loc <- tasks(index).preferredlocations) {     loc match {       case e: executorcachetasklocation =>         addto(pendingtasksforexecutor.getorelseupdate(e.executorid, new arraybuffer))       case e: hdfscachetasklocation => {         val exe = sched.getexecutorsaliveonhost(loc.host)         exe match {           case some(set) => {             for (e <- set) {               addto(pendingtasksforexecutor.getorelseupdate(e, new arraybuffer))             }             loginfo(s"pending task $index has a cached location at ${e.host} "                 ", where there are executors "   set.mkstring(","))           }           case none => logdebug(s"pending task $index has a cached location at ${e.host} "                 ", but there are no executors alive there.")         }       }       case _ => unit     }     addto(pendingtasksforhost.getorelseupdate(loc.host, new arraybuffer))     for (rack <- sched.getrackforhost(loc.host)) {       addto(pendingtasksforrack.getorelseupdate(rack, new arraybuffer))     }   }    if (tasks(index).preferredlocations == nil) {     addto(pendingtaskswithnoprefs)   }    if (!readding) {     allpendingtasks  = index  // no point scanning this whole list to find the old task there   } }

    如果准备让spark支持新的存储源,进而开发相应的rdd,与位置相关的部分就是自定义getpreferredlocations函数,以elasticsearch-hadoop中的esrdd为例,其代码实现如下。

    override def getPreferredLocations(split: Partition): Seq[String] = {
      val esSplit = split.asInstanceOf[EsPartition]
      val ip = esSplit.esPartition.nodeIp
      if (ip != null) Seq(ip) else Nil
    }

    第6节 序列化

    使用好的序列化算法能够提高运行速度,同时能够减少内存的使用。

    spark在shuffle的时候要将数据先存储到磁盘中,存储的内容是经过序列化的。序列化的过程牵涉到两大基本考虑的因素,一是序列化的速度,二是序列化后内容所占用的大小。

    kryoserializer与默认的javaserializer相比,在序列化速度和序列化结果的大小方面都具有极大的优势。所以建议在应用程序配置中使用kryoserializer.

    spark.serializer        org.apache.spark.serializer.KryoSerializer

    默认的cache没有对缓存的对象进行序列化,使用的storagelevel是memory_only,这意味着要占用比较大的内存。可以通过指定persist中的参数来对缓存内容进行序列化。

    exampleRDD.persist(MEMORY_ONLY_SER)

    需要特别指出的是persist函数是等到job执行的时候才会将数据缓存起来,属于延迟执行; 而unpersist函数则是立即执行,缓存会被立即清除。
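    A sketch of enabling Kryo and caching in serialized form from the Java API; the app name, file path and variable names are placeholders:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;

    public class SerializationConfig {
        public static void demo() {
            SparkConf conf = new SparkConf()
                    .setAppName("kryo-demo")
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> exampleRDD = sc.textFile("README.md");
            exampleRDD.persist(StorageLevel.MEMORY_ONLY_SER());  // serialized cache, filled lazily
            exampleRDD.count();                                   // first action materializes the cache
            exampleRDD.unpersist();                               // eagerly drops the cache
        }
    }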

    作者简介:许鹏, 《apache spark源码剖析》作者,关注于大数据实时搜索和实时流数据处理,对elasticsearch, storm及drools多有研究,现就职于携程。



    Play Framework dist 打包时将非项目中的文件也打包进去 (simone, Fri, 26 Feb 2016 08:46 GMT)
    http://www.blogjava.net/wangxinsh55/archive/2016/02/26/429448.html
    来源: http://stackoverflow.com/questions/12231862/how-to-make-play-framework-dist-command-adding-some-files-folders-to-the-final

    Play uses sbt-native-packager, which supports the inclusion of arbitrary files by adding them to the mappings:

    mappings in Universal ++= (baseDirectory.value / "scripts" * "*" get) map
        (x => x -> ("scripts/" + x.getName))

    The syntax assumes Play 2.2.x.

    val jdk8 = new File("d:\\jdk\\jdk8\\jre1_8_0_40")
    mappings in Universal ++= (jdk8 ** "*" get) map (x => x -> ("jre8/" + jdk8.relativize(x).getOrElse(x.getName)))


    利用中文数据跑Google开源项目word2vec (simone, Wed, 13 Jan 2016 05:49 GMT)
    http://www.blogjava.net/wangxinsh55/archive/2016/01/13/429028.html
    原文: http://www.cnblogs.com/hebin/p/3507609.html

    一直听说word2vec在处理词与词的相似度的问题上效果十分好,最近自己也上手跑了跑Google开源的代码(https://code.google.com/p/word2vec/)。

    1、语料

    首先准备数据:采用网上博客上推荐的全网新闻数据(sogouca),大小为2.1g。 

    从ftp上下载数据包sogouca.tar.gz:
    1 wget ftp://ftp.labs.sogou.com/data/sogouca/sogouca.tar.gz --ftp-user=hebin_hit@foxmail.com --ftp-password=4fqlsydncrdxvndi -r

    解压数据包:

    gzip -d SogouCA.tar.gz
    tar -xvf SogouCA.tar

    再将生成的txt文件归并到sogouca.txt中,取出其中包含content的行并转码,得到语料corpus.txt,大小为2.7g。

    cat *.txt > SogouCA.txt
    cat SogouCA.txt | iconv -f gbk -t utf-8 -c | grep "<content>" > corpus.txt

    2、分词

    ansj对corpus.txt进行分词,得到分词结果resultbig.txt,大小为3.1g。

    分词工具ansj参见 
    在分词工具seg_tool目录下先编译再执行得到分词结果resultbig.txt,内含426221个词,次数总计572308385个。
     分词结果:
      
    3、用word2vec工具训练词向量
    1 nohup ./word2vec -train resultbig.txt -output vectors.bin -cbow 0 -size 200 -window 5 -negative 0 -hs 1 -sample 1e-3 -threads 12 -binary 1 &

    vectors.bin是word2vec处理resultbig.txt后生成的词的向量文件,在实验室的服务器上训练了1个半小时。

    4、分析
    4.1 计算相似的词:
    1 ./distance vectors.bin

     ./distance可以看成计算词与词之间的距离,把词看成向量空间上的一个点,distance看成向量空间上点与点的距离。

    下面是一些例子: 

    4.2 潜在的语言学规律

    在对demo-analogy.sh修改后得到下面几个例子:
    法国的首都是巴黎,英国的首都是伦敦, vector("法国") - vector("巴黎") + vector("英国") --> vector("伦敦")

    4.3 聚类

    将经过分词后的语料resultbig.txt中的词聚类并按照类别排序:

    nohup ./word2vec -train resultbig.txt -output classes.txt -cbow 0 -size 200 -window 5 -negative 0 -hs 1 -sample 1e-3 -threads 12 -classes 500 &
    sort classes.txt -k 2 -n > classes_sorted_sogouca.txt

    例如:

    4.4 短语分析

    先利用经过分词的语料resultbig.txt中得出包含词和短语的文件sogouca_phrase.txt,再训练该文件中词与短语的向量表示。

    ./word2phrase -train resultbig.txt -output sogouca_phrase.txt -threshold 500 -debug 2
    ./word2vec -train sogouca_phrase.txt -output vectors_sogouca_phrase.bin -cbow 0 -size 300 -window 10 -negative 0 -hs 1 -sample 1e-3 -threads 12 -binary 1

    下面是几个计算相似度的例子:

    5、参考链接

    1. word2vec:tool for computing continuous distributed representations of words,

    2. 用中文把玩google开源的deep-learning项目word2vec,

    3. 利用word2vec对关键词进行聚类,

    6、后续准备仔细阅读的文献:

    [1] Tomas Mikolov, Kai Chen, Greg Corrado, and Jeffrey Dean. Efficient Estimation of Word Representations in Vector Space. In Proceedings of Workshop at ICLR, 2013.
    [2] Tomas Mikolov, Ilya Sutskever, Kai Chen, Greg Corrado, and Jeffrey Dean. Distributed Representations of Words and Phrases and their Compositionality. In Proceedings of NIPS, 2013.
    [3] Tomas Mikolov, Wen-tau Yih, and Geoffrey Zweig. Linguistic Regularities in Continuous Space Word Representations. In Proceedings of NAACL HLT, 2013.

    [4] Collobert R, Weston J, Bottou L, et al. Natural Language Processing (Almost) from Scratch. The Journal of Machine Learning Research, 2011, 12: 2493-2537.

     


