@echo off
node build.js
node r.js -o build_main.json
@pause
keytool 是一个java数据证书的管理工具 ,keytool将密钥(key)和证书(certificates)存在一个称为keystore的文件中在keystore里,包含两种数据:密钥实体(key entity)-密钥(secret key)或者是私钥和配对公钥(采用非对称加密)可信任的证书实体(trusted certificate entries)-只包含公钥.
jdk中keytool常用参数说明(不同版本有差异,详细可参见【附录】中的官方文档链接):
缺省情况下,-list 命令打印证书的 md5 指纹。而如果指定了 -v 选项,将以可读格式打印证书,如果指定了 -rfc 选项,将以可打印的编码格式输出证书。
genymotion号称模拟器中运行最快的,但是服务器在国外,android镜像下载起来那个速度就不想说了。
add new device后下载速度太慢了,容易失败
解决方法如下:
方法一:
1、设置http代理,在setting->network,自己设置http proxy和port,
方法二:
1、找到下载链接,直接用迅雷拖下来
遇到下载失败或者下载太慢,win r打开运行框,输入 %appdata%, 再点击上一步(alt ↑ ),找到local文件夹里的genymobile,打开 查看里面的genymotion.log文件,
找到类似下面的文字
[genymotion] [debug] downloading file
"http://files2.genymotion.com/dists/4.1.1/ova/genymotion_vbox86p_4.1.1_151117_133208.ova"
将http://file........ova 这个虚拟镜像地址直接用迅雷极速版下载,或者使用迅雷离线下载等功能很快能完成下载
2、把下载的文件复制到c:\users\用户主目录\appdata\local\genymobile\genymotion\ova 中覆盖里面以随机数命名的对应镜像。实际上就是刚才看到genymotion软件刚刚点击下载的那个镜像,
3、重新按照刚刚下载操作gui的下载步骤,你会发现对应的镜像已经可以使用了不需要下载了,验证安装后即会显示在设备列表中。
点击start ,启动模拟器,开始使用
本经验目前在已有搜狗输入法 for linux和sublime text 3的情况下安装成功。
将上一步的代码编译成共享库libsublime-imfix.so,命令
cd ~
gcc -shared -o libsublime-imfix.so sublime_imfix.c `pkg-config --libs --cflags gtk -2.0` -fpic
sudo apt-get install build-essential
sudo apt-get install libgtk2.0-dev
然后将libsublime-imfix.so拷贝到sublime_text所在文件夹
sudo mv libsublime-imfix.so /opt/sublime_text/
修改sublime-text-2.desktop注意:sublime_text.desktop不同版本有所不同,请调整为自己安装版本的路径sudo vim /usr/share/applications/sublime_text.desktop
在上一篇博客中,我们分别介绍了两种方法来进行 http 的模拟请求:httpurlconnection
和 httpclient
,到目前为止这两种方法都工作的很好,基本上可以实现我们需要的 get/post 方法的模拟。对于一个爬虫来说,能发送 http 请求,能获取页面数据,能解析网页内容,这相当于已经完成 80% 的工作了。只不过对于剩下的这 20% 的工作,还得花费我们另外 80% 的时间 :-)
在这篇博客里,我们将介绍剩下 20% 的工作中最为重要的一项:如何在 java 中使用 http 代理,代理也是爬虫技术中的重要一项。你如果要大规模的爬别人网页上的内容,必然会对人家的网站造成影响,如果你太拼了,就会遭人查封。要防止别人查封我 们,我们要么将自己的程序分布到大量机器上去,但是对于资金和资源有限的我们来说这是很奢侈的;要么就使用代理技术,从网上捞一批代理,免费的也好收费的 也好,或者购买一批廉价的 vps 来搭建自己的代理服务器。关于如何搭建自己的代理服务器,后面有时间的话我再写一篇关于这个话题的博客。现在有了一大批代理服务器之后,就可以使用我们这 篇博客所介绍的技术了。
我们先从最简单的开始,网上有很多免费代理,直接上百度搜索 “免费代理” 或者 “http 代理” 就能找到很多(虽然网上能找到大量的免费代理,但它们的安全性已经有很多文章讨论过了,也有专门的文章对此进行调研的,譬如,我在这里就不多作说明,如果你的爬虫爬取的信息并没有什么特别的隐私问题,可以忽略之,如果你的爬虫涉及一些例如模拟登录之类的功能,考虑到安全性,我建议你还是不要使用网上公开的免费代理,而是搭建自己的代理服务器比较靠谱)。
httpurlconnection 的 openconnection()
方法可以传入一个 proxy 参数,如下:
1 2 3 | proxy proxy = new proxy(proxy.type.http, new inetsocketaddress( "127.0.0.1" , 9876 )); url obj = new ; httpurlconnection con = (httpurlconnection) obj.openconnection(proxy); |
ok 了,就这么简单!
不仅如此,我们注意到 proxy 构造函数的第一个参数为枚举类型 proxy.type.http
,那么很显然,如果将其修改为 proxy.type.socks
即可以使用 socks 代理。
由于 httpclient
非常灵活,使用 httpclient 来连接代理有很多不同的方法。最简单的方法莫过于下面这样:
1 2 3 4 | httphost proxy = new httphost( "127.0.0.1" , 9876 , "http" ); closeablehttpclient httpclient = httpclients.createdefault(); httpget request = new httpget(url); closeablehttpresponse response = httpclient.execute(proxy, request); |
和上一篇中使用 httpclient 发送请求的代码几乎一样,只是 httpclient.execute()
方法多加了一个参数,第一参数为 httphost
类型,我们这里设置成我们的代理即可。
这里要注意一点的是,虽然这里的 new httphost()
和上面的 new proxy()
一样,也是可以指定协议类型的,但是遗憾的是 httpclient 默认是不支持 socks 协议的,如果我们使用下面的代码:
1 | httphost proxy = new httphost( "127.0.0.1" , 1080 , "socks" ); |
将会直接报协议不支持异常:
org.apache.http.conn.unsupportedschemeexception: socks protocol is not supported
如果希望 httpclient 支持 socks 代理,可以参看这里: 通过 httpclient 提供的 connectionsocketfactory 类来实现。
虽然使用这种方式很简单,只需要加个参数就可以了,但是其实看 httpclient 的代码注释,如下:
1 2 3 4 5 6 7 | /* * @param target the target host for the request. * implementations may accept * if they can still determine a route, for example * to a default target or by inspecting the request. * @param request the request to execute */ |
可以看到第一个参数 target 并不是代理,它的真实作用是 执行请求的目标主机,这个解释有点模糊,什么叫做 执行请求的目标主机?代理算不算执行请求的目标主机呢?因为按常理来讲,执行请求的目标主机 应该是要请求 url 对应的站点才对。如果不算的话,为什么这里将 target 设置成代理也能正常工作?这个我也不清楚,还需要进一步研究下 httpclient 的源码来深入了解下。
除了上面介绍的这种方式(自己写的,不推荐使用)来使用代理之外,httpclient 凯发k8网页登录官网还提供了几个示例,我将其作为推荐写法记录在此。
第一种写法是使用 ,如下:
1 2 3 4 5 6 7 8 9 10 | closeablehttpclient httpclient = httpclients.createdefault(); httpget request = new httpget(url); request.setconfig( requestconfig.custom() .setproxy( new httphost( "45.32.21.237" , 8888 , "http" )) .build() ); closeablehttpresponse response = httpclient.execute(request); |
第二种写法是使用 ,如下:
1 2 3 4 5 6 7 | httphost proxy = new httphost( "127.0.0.1" , 9876 , "http" ); defaultproxyrouteplanner routeplanner = new defaultproxyrouteplanner(proxy); closeablehttpclient httpclient = httpclients.custom() .setrouteplanner(routeplanner) .build(); httpget request = new httpget(url); closeablehttpresponse response = httpclient.execute(request); |
我们在调试 http 爬虫程序时,常常需要切换代理来测试,有时候直接使用系统自带的代理配置将是一种简单的方法。以前在做 .net 项目时,程序默认使用 internet 网络设置中配的代理,遗憾的是,我这里说的系统代理配置指的 jvm 系统,而不是操作系统,我还没找到简单的方法来让 java 程序直接使用 windows 系统下的代理配置。
尽管如此,系统代理使用起来还是很简单的。一般有下面两种方式可以设置 jvm 的代理配置:
java 中的 system
类不仅仅是用来给我们 system.out.println()
打印信息的,它其实还有很多静态方法和属性可以用。其中 system.setproperty()
就是比较常用的一个。
可以通过下面的方式来分别设置 http 代理,https 代理和 socks 代理:
1 2 3 4 5 6 7 8 9 10 11 12 | // http 代理,只能代理 http 请求 system.setproperty( "http.proxyhost" , "127.0.0.1" ); system.setproperty( "http.proxyport" , "9876" ); // https 代理,只能代理 https 请求 system.setproperty( "https.proxyhost" , "127.0.0.1" ); system.setproperty( "https.proxyport" , "9876" ); // socks 代理,支持 http 和 https 请求 // 注意:如果设置了 socks 代理就不要设 http/https 代理 system.setproperty( "socksproxyhost" , "127.0.0.1" ); system.setproperty( "socksproxyport" , "1080" ); |
这里有三点要说明:
socksproxyhost
和 socksproxyport
中间没有小数点1 2 3 | // 同时支持代理 http/https 请求 system.setproperty( "proxyhost" , "127.0.0.1" ); system.setproperty( "proxyport" , "9876" ); |
可以使用 system.setproperty()
方法来设置系统代理,也可以直接将这些参数通过 jvm 的命令行参数来指定。如果你使用的是 eclipse ,可以按下面的步骤来设置:
-dproxyhost=127.0.0.1 -dproxyport=9876
上面两种方法都可以设置系统,下面要怎么在程序中自动使用系统代理呢?
对于 httpurlconnection
类来说,程序不用做任何变动,它会默认使用系统代理。但是 httpclient
默认是不使用系统代理的,如果想让它默认使用系统代理,可以通过 systemdefaultrouteplanner
和 proxyselector
来设置。示例代码如下:
1 2 3 4 5 6 | systemdefaultrouteplanner routeplanner = new systemdefaultrouteplanner(proxyselector.getdefault()); closeablehttpclient httpclient = httpclients.custom() .setrouteplanner(routeplanner) .build(); httpget request = new httpget(url); closeablehttpresponse response = httpclient.execute(request); |
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.manifestresourcetransformer">
<mainclass>com.duxiu.demo.app.applicationktmainclass>
transformer>
transformers>
<plugin>
<groupid>org.apache.maven.pluginsgroupid>
<artifactid>maven-assembly-pluginartifactid>
<configuration>
<descriptorrefs>
<descriptorref>jar-with-dependenciesdescriptorref>
descriptorrefs>
<archive>
<manifest>
<mainclass>com.duxiu.demo.app.applicationktmainclass>
manifest>
archive>
configuration>
<executions>
<execution>
<phase>packagephase>
<goals>
<goal>singlegoal>
goals>
execution>
executions>
plugin>
<plugin>
<groupid>org.codehaus.mojogroupid>
<artifactid>appassembler-maven-pluginartifactid>
<version>1.10version>
<configuration>
<platforms>
<platform>windowsplatform>
<platform>unixplatform>
platforms>
<assembledirectory>${project.build.directory}/mallassembledirectory>
<repositoryname>librepositoryname>
<binfolder>binbinfolder>
<configurationdirectory>confconfigurationdirectory>
<copyconfigurationdirectory>truecopyconfigurationdirectory>
<configurationsourcedirectory>src/main/resourcesconfigurationsourcedirectory>
<repositorylayout>flatrepositorylayout>
<encoding>utf-8encoding>
<logsdirectory>logslogsdirectory>
<tempdirectory>tmptempdirectory>
<programs>
<program>
<id>mallid>
<mainclass>com.duxiu.demo.app.applicationktmainclass>
<jvmsettings>
<extraarguments>
<extraargument>-serverextraargument>
<extraargument>-xmx2gextraargument>
<extraargument>-xms2gextraargument>
extraarguments>
jvmsettings>
program>
programs>
configuration>
plugin>
<plugin>
<artifactid>maven-antrun-pluginartifactid>
<executions>
<execution>
<id>move-main-classid>
<phase>compilephase>
<configuration>
<tasks>
<move todir="${project.build.directory}/${project.artifactid}-${version}/com/duxiu/demo/app">
<fileset dir="${project.build.directory}/classes/com/duxiu/demo/app">
<include name="*.class" />
fileset>
move>
tasks>
configuration>
<goals>
<goal>rungoal>
goals>
execution>
executions>
plugin>
这样做的最终效果就是修改了项目的运行方式。原先的运行方式是以tomcat为中心,由tomcat来启动和终止项目,现在是由我们的启动程序 为中心,由启动程序来负责启动和终止项目。就相当于现在流行的cs程序一样,有单独的启动脚本,在启动时进行环境预初始化,更新程序以及其它操作,待完成 之后再进行最终的项目启动。
这篇主要讲解如何使用embeded tomcat在代码中进行启动和终止。网上的一般文章均为tomca5.x来做,这里使用了最新的tomcat7,因为tomcat7为embeded开 发,单独发布了org.apache.tomcat.embed包,以进行独立的embed开发。以下是相应的maven包
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 | < dependency > < groupid >org.apache.tomcat.embed
|
使用了embed包中的core包,以及用于编译jsp的jasper包,然后是工具类以及进行上场记录的logging-juli包。开始写代码:
1 2 3 4 5 6 7 | //设置工作目录 string catalina_home = "d:/" ; tomcat tomcat = new tomcat(); tomcat.sethostname( "localhost" ); tomcat.setport(startport); //设置工作目录,其实没什么用,tomcat需要使用这个目录进行写一些东西 tomcat.setbasedir(catalina_home); |
上面使用了tomcat类来进行启动类,在tomcat7以前均是使用一个叫embed类来进行启动,在tomcat7之后,embed类被不建 议使用,而建议使用新的tomcat类来进行启动了。然后设置主机名,端口,再设置一个工作目录。这个工作目录可以是任意目录,主要是tomcat需要这 个目录来记录一些东西,比如记录word信息,日志信息(如果配置了日志的话),以及临时文件存储等。
1 2 3 4 5 6 7 8 | //设置程序的目录信息 tomcat.gethost().setappbase( "e:/" ); // add aprlifecyclelistener standardserver server = (standardserver) tomcat.getserver(); aprlifecyclelistener listener = new aprlifecyclelistener(); server.addlifecyclelistener(listener); //注册关闭端口以进行关闭 tomcat.getserver().setport(shutdownport); |
上面的代码,首先设置我们的项目程序所在的appbase,即放项目代码的地方。在通常的tomcat配置中,这个目录一般是webapps。接 着设置一个listener,这个listener主要是负责启动一些比如html native支持程序以及ipv6等信息配置(可以忽略)。接着是配置一个关闭的注册端口,当向这个端口发送信息时,就可以达到关闭tomcat的目的 (后面会讲)。
1 2 3 4 5 6 7 8 9 | //加载上下文 standardcontext standardcontext = new standardcontext(); standardcontext.setpath( "/aa" ); //contextpath standardcontext.setdocbase( "aa" ); //文件目录位置 standardcontext.addlifecyclelistener( new tomcat.defaultwebxmllistener()); //保证已经配置好了。 standardcontext.addlifecyclelistener( new tomcat.fixcontextlistener()); standardcontext.setsessioncookiename( "t-session" ); tomcat.gethost().addchild(standardcontext); |
我们单独使用了一个context来为这个host添加上下文,tomcat本身提供一个方法tomcat.addweb方法来添加项目包,不过 由于这里需要单独设置一个tomcat的sessionname,所以使用与与tomcat.addweb实现类似的方法来添加一个项目包。
以上代码中有两个需要注意的listener,一个是defaultwebxmllistener,这个是由tomcat加载一些默认的配置信 息,比如jspservlet,以及一些繁复的mime/type信息;加上这个,就不需要我们自己去写这么多的配置,因为每个项目都需要这些。这个配置 与tomcat目录下的conf/web.xml中的配置一样,只不过这里是代码化了。第二个是fixcontextlistener,这个主要是在项目 部署完后,将这个上下文设置为configured,表示已经配置好了(不然,tomcat启动时会报错,即相应上下文还未配置好)。
配置ok了之后,就是启动tomcat了:
1 2 | tomcat.start(); tomcat.getserver().await(); |
启动tomcat,并让tomcat在关闭端口上监听。如果没有最后一句,程序将直接结束,保证监听之后,tomcat将一直监听关闭事件,待有关闭事件之后才结束当前程序。所以如果想要关闭当前的tomcat,只需要向关闭端口发送一些信息即可:
1 2 3 4 5 6 7 8 9 | private static void shutdown() throws exception { socket socket = new socket( "localhost" , shutdownport); outputstream stream = socket.getoutputstream(); for ( int i = 0 ;i < shutdown.length();i ) stream.write(shutdown.charat(i)); stream.flush(); stream.close(); socket.close(); } |
这样即可达到关闭tomcat的目的。
实际上看整个项目代码,项目代码的运行,就是一个配置一个基础的server.xml(即tomcat目录下的 conf/server.xml),先配置运行端口,关闭监听端口;然后配置运行的host以及添加一个上下文context,最后就开始运行并开始监 听。对照这个程序,再看一下server.xml中的配置信息,就很容易明白以上这段代码了。
在安装好kerberos的软件之后,会用到几个配置文件,例如
/etc/krb5.conf
/var/kerberos/krb5kdc/kdc.conf
可以用命令man krb5.conf
来查看关于该配置文件的说明
先看一下该文件的模板:
[logging]
default = file:/var/log/krb5libs.log
kdc = file:/var/log/krb5kdc.log
admin_server = file:/var/log/kadmind.log
[libdefaults]
default_realm = example.com
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
[realms]
example.com = {
kdc = example.com
admin_server = example.com
}
[domain_realm]
.example.com = example.com
example.com = example.com
关于几个重要配置项的说明
[realms].kdc
: the name of the host running a kdc for that realm.
[realms].admin_server
: identifies the host where the administration server is running. typically this is the master kerberos server.
[domain_realm]
: provides a translation from a hostname to the kerberos realm name for the service provided by that host.
kerberos is a network authentication system based on the principal of a trusted third party. the other two parties being the user and the service the user wishes to authenticate to. not all services and applications can use kerberos, but for those that can, it brings the network environment one step closer to being single sign on (sso).
this section covers installation and configuration of a kerberos server, and some example client configurations.
if you are new to kerberos there are a few terms that are good to understand before setting up a kerberos server. most of the terms will relate to things you may be familiar with in other environments:
principal: any users, computers, and services provided by servers need to be defined as kerberos principals.
instances: are used for service principals and special administrative principals.
realms: the unique realm of control provided by the kerberos installation. usually the dns domain converted to uppercase (example.com).
key distribution center: (kdc) consist of three parts, a database of all principals, the authentication server, and the ticket granting server. for each realm there must be at least one kdc.
ticket granting ticket: issued by the authentication server (as), the ticket granting ticket (tgt) is encrypted in the user's password which is known only to the user and the kdc.
ticket granting server: (tgs) issues service tickets to clients upon request.
tickets: confirm the identity of the two principals. one principal being a user and the other a service requested by the user. tickets establish an encryption key used for secure communication during the authenticated session.
keytab files: are files extracted from the kdc principal database and contain the encryption key for a service or host.
to put the pieces together, a realm has at least one kdc, preferably two for redundancy, which contains a database of principals. when a user principal logs into a workstation, configured for kerberos authentication, the kdc issues a ticket granting ticket (tgt). if the user supplied credentials match, the user is authenticated and can then request tickets for kerberized services from the ticket granting server (tgs). the service tickets allow the user to authenticate to the service without entering another username and password.
before installing the kerberos server a properly configured dns server is needed for your domain. since the kerberos realm by convention matches the domain name, this section uses the example.com domain configured in .
also, kerberos is a time sensitive protocol. so if the local system time between a client machine and the server differs by more than five minutes (by default), the workstation will not be able to authenticate. to correct the problem all hosts should have their time synchronized using the network time protocol (ntp). for details on setting up ntp see .
the first step in installing a kerberos realm is to install the krb5-kdc and krb5-admin-server packages. from a terminal enter:
sudo apt-get install krb5-kdc krb5-admin-server
you will be asked at the end of the install to supply a name for the kerberos and admin servers, which may or may not be the same server, for the realm.
next, create the new realm with the kdb5_newrealm utility:
sudo krb5_newrealm
the questions asked during installation are used to configure the /etc/krb5.conf
file. if you need to adjust the key distribution center (kdc) settings simply edit the file and restart the krb5-kdc daemon.
now that the kdc running an admin user is needed. it is recommended to use a different username from your everyday username. using the kadmin.local utility in a terminal prompt enter:
sudo kadmin.localauthenticating as principal root/admin@example.com with password. kadmin.local:
addprinc steve/admin
warning: no policy specified for steve/admin@example.com; defaulting to no policy enter password for principal "steve/admin@example.com": re-enter password for principal "steve/admin@example.com": principal "steve/admin@example.com" created. kadmin.local:
quit
in the above example steve is the principal, /admin is an instance, and @example.com signifies the realm. the "every day" principal would be steve@example.com, and should have only normal user rights.
replace example.com and steve with your realm and admin username. |
next, the new admin user needs to have the appropriate access control list (acl) permissions. the permissions are configured in the /etc/krb5kdc/kadm5.acl
file:
steve/admin@example.com *
this entry grants steve/admin the ability to perform any operation on all principals in the realm.
now restart the krb5-admin-server for the new acl to take affect:
sudo /etc/init.d/krb5-admin-server restart
the new user principal can be tested using the kinit utility:
kinit steve/admin steve/admin@example.com's password:
after entering the password, use the klist utility to view information about the ticket granting ticket (tgt):
klist credentials cache: file:/tmp/krb5cc_1000 principal: steve/admin@example.com issued expires principal jul 13 17:53:34 jul 14 03:53:34 krbtgt/example.com@example.com
you may need to add an entry into the /etc/hosts
for the kdc. for example:
192.168.0.1 kdc01.example.com kdc01
replacing 192.168.0.1 with the ip address of your kdc.
in order for clients to determine the kdc for the realm some dns srv records are needed. add the following to /etc/named/db.example.com
:
_kerberos._udp.example.com. in srv 1 0 88 kdc01.example.com. _kerberos._tcp.example.com. in srv 1 0 88 kdc01.example.com. _kerberos._udp.example.com. in srv 10 0 88 kdc02.example.com. _kerberos._tcp.example.com. in srv 10 0 88 kdc02.example.com. _kerberos-adm._tcp.example.com. in srv 1 0 749 kdc01.example.com. _kpasswd._udp.example.com. in srv 1 0 464 kdc01.example.com.
replace example.com, kdc01, and kdc02 with your domain name, primary kdc, and secondary kdc. |
see for detailed instructions on setting up dns.
your new kerberos realm is now ready to authenticate clients.
once you have one key distribution center (kdc) on your network, it is good practice to have a secondary kdc in case the primary becomes unavailable.
first, install the packages, and when asked for the kerberos and admin server names enter the name of the primary kdc:
sudo apt-get install krb5-kdc krb5-admin-server
once you have the packages installed, create the secondary kdc's host principal. from a terminal prompt, enter:
kadmin -q "addprinc -randkey host/kdc02.example.com"
after, issuing any kadmin commands you will be prompted for your username/admin@example.com principal password. |
extract the keytab file:
kadmin -q "ktadd -k keytab.kdc02 host/kdc02.example.com"
there should now be a keytab.kdc02
in the current directory, move the file to /etc/krb5.keytab
:
sudo mv keytab.kdc02 /etc/krb5.keytab
if the path to the |
also, you can list the principals in a keytab file, which can be useful when troubleshooting, using the klist utility:
sudo klist -k /etc/krb5.keytab
next, there needs to be a kpropd.acl
file on each kdc that lists all kdcs for the realm. for example, on both primary and secondary kdc, create /etc/krb5kdc/kpropd.acl
:
host/kdc01.example.com@example.com host/kdc02.example.com@example.com
create an empty database on the secondary kdc:
sudo kdb5_util -s create
now start the kpropd daemon, which listens for connections from the kprop utility. kprop is used to transfer dump files:
sudo kpropd -s
from a terminal on the primary kdc, create a dump file of the principal database:
sudo kdb5_util dump /var/lib/krb5kdc/dump
extract the primary kdc's keytab file and copy it to /etc/krb5.keytab
:
kadmin -q "ktadd -k keytab.kdc01 host/kdc01.example.com" sudo mv keytab.kdc01 /etc/kr5b.keytab
make sure there is a host for kdc01.example.com before extracting the keytab. |
using the kprop utility push the database to the secondary kdc:
sudo kprop -r example.com -f /var/lib/krb5kdc/dump kdc02.example.com
there should be a succeeded message if the propagation worked. if there is an error message check |
you may also want to create a cron job to periodically update the database on the secondary kdc. for example, the following will push the database every hour:
# m h dom mon dow command 0 * * * * /usr/sbin/kdb5_util dump /var/lib/krb5kdc/dump && /usr/sbin/kprop -r example.com -f /var/lib/krb5kdc/dump kdc02.example.com
back on the secondary kdc, create a stash file to hold the kerberos master key:
sudo kdb5_util stash
finally, start the krb5-kdc daemon on the secondary kdc:
sudo /etc/init.d/krb5-kdc start
the secondary kdc should now be able to issue tickets for the realm. you can test this by stopping the krb5-kdc daemon on the primary kdc, then use kinit to request a ticket. if all goes well you should receive a ticket from the secondary kdc.
this section covers configuring a linux system as a kerberos client. this will allow access to any kerberized services once a user has successfully logged into the system.
in order to authenticate to a kerberos realm, the krb5-user and libpam-krb5 packages are needed, along with a few others that are not strictly necessary but make life easier. to install the packages enter the following in a terminal prompt:
sudo apt-get install krb5-user libpam-krb5 libpam-ccreds auth-client-config
the auth-client-config package allows simple configuration of pam for authentication from multiple sources, and the libpam-ccreds will cache authentication credentials allowing you to login in case the key distribution center (kdc) is unavailable. this package is also useful for laptops that may authenticate using kerberos while on the corporate network, but will need to be accessed off the network as well.
to configure the client in a terminal enter:
sudo dpkg-reconfigure krb5-config
you will then be prompted to enter the name of the kerberos realm. also, if you don't have dns configured with kerberos srv records, the menu will prompt you for the hostname of the key distribution center (kdc) and realm administration server.
the dpkg-reconfigure adds entries to the /etc/krb5.conf
file for your realm. you should have entries similar to the following:
[libdefaults] default_realm = example.com ... [realms] example.com = } kdc = 192.168.0.1 admin_server = 192.168.0.1 }
you can test the configuration by requesting a ticket using the kinit utility. for example:
kinit steve@example.com password for steve@example.com:
when a ticket has been granted, the details can be viewed using klist:
klist ticket cache: file:/tmp/krb5cc_1000 default principal: steve@example.com valid starting expires service principal 07/24/08 05:18:56 07/24/08 15:18:56 krbtgt/example.com@example.com renew until 07/25/08 05:18:57 kerberos 4 ticket cache: /tmp/tkt1000 klist: you have no tickets cached
next, use the auth-client-config to configure the libpam-krb5 module to request a ticket during login:
sudo auth-client-config -a -p kerberos_example
you will should now receive a ticket upon successful login authentication.
for more information on kerberos see the site.
the page has more details.
o'reilly's is a great reference when setting up kerberos.
also, feel free to stop by the #ubuntu-server irc channel on if you have kerberos questions.
借用mysql 的 auto_increment 特性可以产生唯一的可靠id。
表定义,关键在于auto_increment,和unique key的设置:
1 2 3 4 5 6 | create table `tickets64` ( `id` bigint (20) unsigned not null auto_increment, `stub` char (1) not null default '' , primary key (`id`), unique key `stub` (`stub`) ) engine=myisam |
需要使用时,巧用replace into语法来获取值,结合表定义的unique key,确保了一条记录就可以满足id生成器的需求:
1 2 | replace into tickets64 (stub) values ( 'a' ); select last_insert_id(); |
以上方式中,通过mysql的机制,可以确保此id的唯一和自增,且适用于多并发的场景。官方对此的描述:https://dev.mysql.com/doc/refman/5.0/en/information-functions.html
1 2 3 | it is multi-user safe because multiple clients can issue the update statement and get their own sequence value with the select statement (or mysql_insert_id()), without affecting or being affected by other clients that generate their own sequence values. |
需要注意的是,若client采用php,则不能使用mysql_insert_id()获取id,原因见《mysql_insert_id() 在bigint型ai字段遇到的问题》:http://kaifage.com/notes/99/mysql-insert-id-issue- with-bigint-ai-field.html。
flickr 采取了此方案: http://code.flickr.net/2010/02/08/ticket-servers-distributed-unique-primary-keys-on-the-cheap/
相关:
http://www.zhihu.com/question/30674667
http://my.oschina.net/u/142836/blog/174465
public static,对应的整数就是二进制的:1001,也就是9。如下:
…… | native | transient | volatile | synchronized | final | static | protected | private | public |
| 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 |
mysql 5.1已经到了beta版,官方网站上也陆续有一些文章介绍,比如上次看到的。在使用分区的前提下,可以用mysql实现非常大的数据量存储。今天在mysql的站上又看到一篇进阶的文章 —— 。如果能够实现按日期分区,这对某些时效性很强的数据存储是相当实用的功能。下面是从这篇文章中摘录的一些内容。
最直观的方法,就是直接用年月日这种日期格式来进行常规的分区:
上面的例子中,就是直接用"y-m-d"的格式来对一个table进行分区,可惜想当然往往不能奏效,会得到一个错误信息:
error 1064 (42000): values value must be of same type as partition function near '),
partition p1 values less than ('2010-01-01'))' at line 3
上述分区方式没有成功,而且明显的不经济,老练的dba会用整型数值来进行分区:
搞定?接着往下分析
万恶的mysql居然对上面的sql使用全表扫描,而不是按照我们的日期分区分块查询。原文中解释到的优化器并不认这种日期形式的分区,花了大量的篇幅来引诱俺走上歧路,过分。
mysql优化器支持以下两种内置的日期函数进行分区:
看个例子:
以to_days()函数分区成功,我们分析一下看看:
可以看到,优化器这次不负众望,仅仅在p1分区进行查询。在这种情况下查询,真的能够带来提升查询效率么?下面分别对这次建立的part_date3和之前分区失败的part_date1做一个查询对比:
可以看到,分区正确的话query花费时间为4秒,而分区错误则花费时间40秒(相当于没有分区),效率有90%的提升!所以我们千万要正确的使用分区功能,分区后务必用explain验证,这样才能获得真正的性能提升。
注意:
在mysql5.1中建立分区表的语句中,只能包含下列函数:
abs()
ceiling() and floor() (在使用这2个函数的建立分区表的前提是使用函数的分区键是int类型),例如
mysql> create table t (c float) partition by list( floor(c) )( -> partition p0 values in (1,3,5), -> partition p1 values in (2,4,6) -> );; error 1491 (hy000): the partition function returns the wrong typemysql> create table t (c int) partition by list( floor(c) )( -> partition p0 values in (1,3,5), -> partition p1 values in (2,4,6) -> ); query ok, 0 rows affected (0.01 sec)
day()
dayofmonth()
dayofweek()
dayofyear()
datediff()
extract()
hour()
microsecond()
minute()
mod()
month()
quarter()
second()
time_to_sec()
to_days()
weekday()
year()
yearweek()
4 3 | i have a need to create a server farm that can handle 5 million connections, 5 million topics (one per client), process 300k messages/sec. i tried to see what various message brokers were capable so i am currently using two rhel ec2 instances (r3.4xlarge) to make lots of available resources. so you do not need to look it up, it has 16vcpu, 122gb ram. i am nowhere near that limit in usage. i am unable to pass the 600k connections limit. since there doesn't seem to be any o/s limitation (plenty of ram/cpu/etc.) on either the client nor the server what is limiting me? i have edited /etc/security/limits.conf as follows:
i have edited /etc/sysctl.conf as follows:
for apollo: export apollo_ulimit=20000000 for activemq:
i created 20 additional private addresses for eth0 on the client, then assigned them: ip addr add 11.22.33.44/24 dev eth0 i am fully aware of the 65k port limits which is why i did the above.
| ||||||||||||
|
2 | answer: while doing this i realized that i had a misspelling in my client setting within /etc/sysctl.conf file for: net.ipv4.ip_local_port_range i am now able to connect 956,591 mqtt clients to my apollo server in 188sec. more info: trying to isolate if this is an o/s connection limitation or a broker, i decided to write a simple client/server. the server:
the client:
with 21 ips, i would expect 65535-1024*21 = 1354731 to be the boundary. in reality i am able to achieve 1231734
so the socket/kernel/io stuff is worked out. i am still unable to achieve this using any broker. again just after my client/server test this is the kernel settings. client:
server:
|
spark history server产生背景
以standalone运行模式为例,在运行spark application的时候,spark会提供一个webui列出应用程序的运行时信息;但该webui随着application的完成(成功/失 败)而关闭,也就是说,spark application运行完(成功/失败)后,将无法查看application的历史记录;
spark history server就是为了应对这种情况而产生的,通过配置可以在application执行的过程中记录下了日志事件信息,那么在application执行 结束后,webui就能重新渲染生成ui界面展现出该application在执行过程中的运行时信息;
spark运行在yarn或者mesos之上,通过spark的history server仍然可以重构出一个已经完成的application的运行时参数信息(假如application运行的事件日志信息已经记录下来);
配置&使用spark history server
以默认配置的方式启动spark history server:
cd $spark_home/sbin start-history-server.sh
报错:
starting org.apache.spark.deploy.history.historyserver, logging to /home/spark/software/source/compile/deploy_spark/sbin/../logs/spark-spark-org.apache.spark.deploy.history.historyserver-1-hadoop000.out failed to launch org.apache.spark.deploy.history.historyserver: at org.apache.spark.deploy.history.fshistoryprovider.(fshistoryprovider.scala:44) ... 6 more
需要在启动时指定目录:
start-history-server.sh hdfs://hadoop000:8020/directory
hdfs://hadoop000:8020/directory可以配置在配置文件中,那么在启动history-server时就不需要指定,后续介绍怎么配置;
注:该目录需要事先在hdfs上创建好,否则history-server启动报错。
启动完成之后可以通过webui访问,默认端口是18080:http://hadoop000:18080
默认界面列表信息是空的,下面截图是我跑了几次spark-sql测试后出现的。
history server相关的配置参数描述
1) spark.history.updateinterval
默认值:10
以秒为单位,更新日志相关信息的时间间隔
2)spark.history.retainedapplications
默认值:50
在内存中保存application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,当再次访问已被删除的应用信息时需要重新构建页面。
3)spark.history.ui.port
默认值:18080
historyserver的web端口
4)spark.history.kerberos.enabled
默认值:false
是否使用kerberos方式登录访问historyserver,对于持久层位于安全集群的hdfs上是有用的,如果设置为true,就要配置下面的两个属性
5)spark.history.kerberos.principal
默认值:用于historyserver的kerberos主体名称
6)spark.history.kerberos.keytab
用于historyserver的kerberos keytab文件位置
7)spark.history.ui.acls.enable
默认值:false
授权用户查看应用程序信息的时候是否检查acl。如果启用,只有应用程序所有者和spark.ui.view.acls指定的用户可以查看应用程序信息;否则,不做任何检查
8)spark.eventlog.enabled
默认值:false
是否记录spark事件,用于应用程序在完成后重构webui
9)spark.eventlog.dir
默认值:file:///tmp/spark-events
保存日志相关信息的路径,可以是hdfs://开头的hdfs路径,也可以是file://开头的本地路径,都需要提前创建
10)spark.eventlog.compress
默认值:false
是否压缩记录spark事件,前提spark.eventlog.enabled为true,默认使用的是snappy
以spark.history开头的需要配置在spark-env.sh中的spark_history_opts,以spark.eventlog开头的配置在spark-defaults.conf
我在测试过程中的配置如下:
spark-defaults.conf
spark.eventlog.enabled true spark.eventlog.dir hdfs://hadoop000:8020/directory spark.eventlog.compress true
spark-env.sh
export spark_history_opts="-dspark.history.ui.port=7777 -dspark.history.retainedapplications=3 -dspark.history.fs.logdirectory=hdfs://had oop000:8020/directory"
参数描述:
spark.history.ui.port=7777 调整webui访问的端口号为7777
spark.history.fs.logdirectory=hdfs://hadoop000:8020/directory 配置了该属性后,在start-history-server.sh时就无需再显示的指定路径
spark.history.retainedapplications=3 指定保存application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除
调整参数后启动start-history-server.sh
start-history-server.sh
访问webui: http://hadoop000:7777
在使用spark history server的过程中产生的几个疑问:
疑问1:spark.history.fs.logdirectory和spark.eventlog.dir指定目录有啥区别?
经测试后发现:
spark.eventlog.dir:application在运行过程中所有的信息均记录在该属性指定的路径下;
spark.history.fs.logdirectory:spark history server页面只展示该指定路径下的信息;
比如:spark.eventlog.dir刚开始时指定的是hdfs://hadoop000:8020/directory,而后修改成hdfs://hadoop000:8020/directory2
那么spark.history.fs.logdirectory如果指定的是hdfs://hadoop000:8020/directory,就只能显示出该目录下的所有application运行的日志信息;反之亦然。
疑问2:spark.history.retainedapplications=3 貌似没生效??????
the history server will list all applications. it will just retain a max number of them in memory. that option does not control how many applications are show, it controls how much memory the hs will need.
注意:该参数并不是也页面中显示的application的记录数,而是存放在内存中的个数,内存中的信息在访问页面时直接读取渲染既可;
比如说该参数配置了10个,那么内存中就最多只能存放10个applicaiton的日志信息,当第11个加入时,第一个就会被踢除,当再次访问第1个application的页面信息时就需要重新读取指定路径上的日志信息来渲染展示页面。
详见官方文档:http://spark.apache.org/docs/latest/monitoring.html
今天在测试spark-sql运行在yarn上的过程中,无意间从日志中发现了一个问题:
spark-sql --master yarn
14/12/29 15:23:17 info client: requesting a new application from cluster with 1 nodemanagers 14/12/29 15:23:17 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container) 14/12/29 15:23:17 info client: will allocate am container, with 896 mb memory including 384 mb overhead 14/12/29 15:23:17 info client: setting up container launch context for our am 14/12/29 15:23:17 info client: preparing resources for our am container 14/12/29 15:23:17 info client: uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0093/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar 14/12/29 15:23:18 info client: setting up the launch environment for our am container
再开启一个spark-sql命令行,从日志中再次发现:
14/12/29 15:24:03 info client: requesting a new application from cluster with 1 nodemanagers 14/12/29 15:24:03 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container) 14/12/29 15:24:03 info client: will allocate am container, with 896 mb memory including 384 mb overhead 14/12/29 15:24:03 info client: setting up container launch context for our am 14/12/29 15:24:03 info client: preparing resources for our am container 14/12/29 15:24:03 info client: uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0094/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar 14/12/29 15:24:05 info client: setting up the launch environment for our am container
然后查看hdfs上的文件:
hadoop fs -ls hdfs://hadoop000:8020/user/spark/.sparkstaging/
drwx------ - spark supergroup 0 2014-12-29 15:23 hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0093 drwx------ - spark supergroup 0 2014-12-29 15:24 hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0094
每个application都会上传一个spark-assembly-x.x.x-snapshot-hadoopx.x.x-cdhx.x.x.jar的jar包,影响hdfs的性能以及占用hdfs的空间。
在spark文档(http://spark.apache.org/docs/latest/running-on-yarn.html)中发现spark.yarn.jar属性,将spark-assembly-xxxxx.jar存放在hdfs://hadoop000:8020/spark_lib/下
在spark-defaults.conf添加属性配置:
spark.yarn.jar hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
再次启动spark-sql --master yarn观察日志:
14/12/29 15:39:02 info client: requesting a new application from cluster with 1 nodemanagers 14/12/29 15:39:02 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container) 14/12/29 15:39:02 info client: will allocate am container, with 896 mb memory including 384 mb overhead 14/12/29 15:39:02 info client: setting up container launch context for our am 14/12/29 15:39:02 info client: preparing resources for our am container 14/12/29 15:39:02 info client: source and destination file systems are the same. not copying hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar 14/12/29 15:39:02 info client: setting up the launch environment for our am container
观察hdfs上文件
hadoop fs -ls hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0097
该application对应的目录下没有spark-assembly-xxxxx.jar了,从而节省assembly包上传的过程以及hdfs空间占用。
我在测试过程中遇到了类似如下的错误:
application application_xxxxxxxxx_yyyy failed 2 times due to am container for application_xxxxxxxxx_yyyy
exited with exitcode: -1000 due to: java.io.filenotfoundexception: file /tmp/hadoop-spark/nm-local-dir/filecache does not exist
在/tmp/hadoop-spark/nm-local-dir路径下创建filecache文件夹即可解决报错问题。
wrote a about how linkedin uses as a central publish-subscribe log for integrating data between applications, stream processing, and hadoop data ingestion.
to actually make this work, though, this "universal log" has to be a cheap abstraction. if you want to use a system as a central data hub it has to be fast, predictable, and easy to scale so you can dump all your data onto it. my experience has been that systems that are fragile or expensive inevitably develop a wall of protective process to prevent people from using them; a system that scales easily often ends up as a key architectural building block just because using it is the easiest way to get things built.
i've always liked the benchmarks of cassandra that show it doing a million writes per second on three hundred machines on and . i'm not sure why, maybe it is a thing, but doing of anything per second is fun.
in any case, one of the nice things about a kafka log is that, as we'll see, it is cheap. a million writes per second isn't a particularly big thing. this is because a log is a much simpler thing than a database or key-value store. indeed our production clusters take tens of millions of reads and writes per second all day long and they do so on pretty modest hardware.
but let's do some benchmarking and take a look.
to help understand the benchmark, let me give a quick review of what kafka is and a few details about how it works. kafka is a distributed messaging system originally built at linkedin and now part of the and by a .
the general setup is quite simple. producers send records to the cluster which holds on to these records and hands them out to consumers:
the key abstraction in kafka is the topic. producers publish their records to a topic, and consumers subscribe to one or more topics. a kafka topic is just a sharded write-ahead log. producers append records to these logs and consumers subscribe to changes. each record is a key/value pair. the key is used for assigning the record to a log partition (unless the publisher specifies the partition directly).
here is a simple example of a single producer and consumer reading and writing from a two-partition topic.
this picture shows a producer process appending to the logs for the two partitions, and a consumer reading from the same logs. each record in the log has an associated entry number that we call the offset. this offset is used by the consumer to describe it's position in each of the logs.
these partitions are spread across a cluster of machines, allowing a topic to hold more data than can fit on any one machine.
note that unlike most messaging systems the log is always persistent. messages are immediately written to the filesystem when they are received. messages are not deleted when they are read but retained with some configurable sla (say a few days or a week). this allows usage in situations where the consumer of data may need to reload data. it also makes it possible to support space-efficient publish-subscribe as there is a single shared log no matter how many consumers; in traditional messaging systems there is usually a queue per consumer, so adding a consumer doubles your data size. this makes kafka a good fit for things outside the bounds of normal messaging systems such as acting as a pipeline for offline data systems such as hadoop. these offline systems may load only at intervals as part of a periodic etl cycle, or may go down for several hours for maintenance, during which time kafka is able to buffer even tbs of unconsumed data if needed.
kafka also replicates its logs over multiple servers for fault-tolerance. one important architectural aspect of our , in contrast to other messaging systems, is that replication is not an exotic bolt-on that requires complex configuration, only to be used in very specialized cases. instead replication is assumed to be the default: we treat un-replicated data as a special case where the replication factor happens to be one.
producers get an acknowledgement back when they publish a message containing the record's offset. the first record published to a partition is given the offset 0, the second record 1, and so on in an ever-increasing sequence. consumers consume data from a position specified by an offset, and they save their position in a log by committing periodically: saving this offset in case that consumer instance crashes and another instance needs to resume from it's position.
okay, hopefully that all made sense (if not, you can read a more complete introduction to kafka ).
this test is against trunk, as i made some improvements to the performance tests for this benchmark. but nothing too substantial has changed since the last full release, so you should see similar results with . i am also using our newly re-written , which offers much improved throughput over the previous producer client.
i've followed the basic template of this very nice , but i covered scenarios and options that were more relevant to kafka.
one quick philosophical note on this benchmark. for benchmarks that are going to be publicly reported, i like to follow a style i call "lazy benchmarking". when you work on a system, you generally have the know-how to tune it to perfection for any particular use case. this leads to a kind of benchmarketing where you heavily tune your configuration to your benchmark or worse have a different tuning for each scenario you test. i think the real test of a system is not how it performs when perfectly tuned, but rather how it performs "off the shelf". this is particularly true for systems that run in a multi-tenant setup with dozens or hundreds of use cases where tuning for each use case would be not only impractical but impossible. as a result, i have pretty much stuck with default settings, both for the server and the clients. i will point out areas where i suspect the result could be improved with a little tuning, but i have tried to resist the temptation to do any fiddling myself to improve the results.
i have posted , so it should be possible to replicate results on your own gear if you are interested.
for these tests, i had six machines each has the following specs
the kafka cluster is set up on three of the machines. the six drives are directly mounted with no raid (jbod style). the remaining three machines i use for zookeeper and for generating load.
a three machine cluster isn't very big, but since we will only be testing up to a replication factor of three, it is all we need. as should be obvious, we can always add more partitions and spread data onto more machines to scale our cluster horizontally.
this hardware is actually not linkedin's normal kafka hardware. our kafka machines are more closely tuned to running kafka, but are less in the spirit of "off-the-shelf" i was aiming for with these tests. instead, i borrowed these from one of our hadoop clusters, which runs on probably the cheapest gear of any of our persistent systems. hadoop usage patterns are pretty similar to kafka's, so this is a reasonable thing to do.
okay, without further ado, the results!
these tests will stress the throughput of the producer. no consumers are run during these tests, so all messages are persisted but not read (we'll test cases with both producer and consumer in a bit). since we have recently rewritten our producer, i am testing this new code.
821,557 records/sec |
(78.3 mb/sec) |
for this first test i create a topic with six partitions and no replication. then i produce 50 million small (100 byte) records as quickly as possible from a single thread.
the reason for focusing on small records in these tests is that it is the harder case for a messaging system (generally). it is easy to get good throughput in mb/sec if the messages are large, but much harder to get good throughput when the messages are small, as the overhead of processing each message dominates.
throughout this benchmark, when i am reporting mb/sec, i am reporting just the value size of the record times the request per second, none of the other overhead of the request is included. so the actually network usage is higher than what is reported. for example with a 100 byte message we would also transmit about 22 bytes of overhead per message (for an optional key, size delimiting, a message crc, the record offset, and attributes flag), as well as some overhead for the request (including the topic, partition, required acknowledgements, etc). this makes it a little harder to see where we hit the limits of the nic, but this seems a little more reasonable then including our own overhead bytes in throughput numbers. so, in the above result, we are likely saturating the 1 gigabit nic on the client machine.
one immediate observation is that the raw numbers here are much higher than people expect, especially for a persistent storage system. if you are used to random-access data systems, like a database or key-value store, you will generally expect maximum throughput around 5,000 to 50,000 queries-per-second, as this is close to the speed that a good rpc layer can do remote requests. we exceed this due to two key design principles:
if you are interested in the details you can read a little more about this in our .
786,980 records/sec |
(75.1 mb/sec) |
this test is exactly the same as the previous one except that now each partition has three replicas (so the total data written to network or disk is three times higher). each server is doing both writes from the producer for the partitions for which it is a master, as well as fetching and writing data for the partitions for which it is a follower.
replication in this test is asynchronous. that is, the server acknowledges the write as soon as it has written it to its local log without waiting for the other replicas to also acknowledge it. this means, if the master were to crash, it would likely lose the last few messages that had been written but not yet replicated. this makes the message acknowledgement latency a little better at the cost of some risk in the case of server failure.
the key take away i would like people to have from this is that replication can be fast. the total cluster write capacity is, of course, 3x less with 3x replication (since each write is done three times), but the throughput is still quite good per client. high performance replication comes in large part from the efficiency of our consumer (the replicas are really nothing more than a specialized consumer) which i will discuss in the consumer section.
421,823 records/sec |
(40.2 mb/sec) |
this test is the same as above except that now the master for a partition waits for acknowledgement from the full set of in-sync replicas before acknowledging back to the producer. in this mode, we guarantee that messages will not be lost as long as one in-sync replica remains.
synchronous replication in kafka is not fundamentally very different from asynchronous replication. the leader for a partition always tracks the progress of the follower replicas to monitor their liveness, and we never give out messages to consumers until they are fully acknowledged by replicas. with synchronous replication we just wait to respond to the producer request until the followers have replicated it.
this additional latency does seem to affect our throughput. since the code path on the server is very similar, we could probably ameliorate this impact by tuning the batching to be a bit more aggressive and allowing the client to buffer more outstanding requests. however, in spirit of avoiding special case tuning, i have avoided this.2,024,032 records/sec |
(193.0 mb/sec) |
our single producer process is clearly not stressing our three node cluster. to add a little more load, i'll now repeat the previous async replication test, but now use three producer load generators running on three different machines (running more processes on the same machine won't help as we are saturating the nic). then we can look at the aggregate throughput across these three producers to get a better feel for the cluster's aggregate capacity.
one of the hidden dangers of many messaging systems is that they work well only as long as the data they retain fits in memory. their throughput falls by an order of magnitude (or more) when data backs up and isn't consumed (and hence needs to be stored on disk). this means things may be running fine as long as your consumers keep up and the queue is empty, but as soon as they lag, the whole messaging layer backs up with unconsumed data. the backup causes data to go to disk which in turns causes performance to drop to a rate that means messaging system can no longer keep up with incoming data and either backs up or falls over. this is pretty terrible, as in many cases the whole purpose of the queue was to handle such a case gracefully.
since kafka always persists messages the performance is o(1) with respect to unconsumed data volume.
to test this experimentally, let's run our throughput test over an extended period of time and graph the results as the stored dataset grows:
this graph actually does show some variance in performance, but no impact due to data size: we perform just as well after writing a tb of data, as we do for the first few hundred mbs.
the variance seems to be due to linux's i/o management facilities that batch data and then flush it periodically. this is something we have tuned for a little better on our production kafka setup. some notes on tuning i/o are available .
okay now let's turn our attention to consumer throughput.
note that the replication factor will not effect the outcome of this test as the consumer only reads from one replica regardless of the replication factor. likewise, the acknowledgement level of the producer also doesn't matter as the consumer only ever reads fully acknowledged messages, (even if the producer doesn't wait for full acknowledgement). this is to ensure that any message the consumer sees will always be present after a leadership handoff (if the current leader fails).
940,521 records/sec |
(89.7 mb/sec) |
for the first test, we will consume 50 million messages in a single thread from our 6 partition 3x replicated topic.
kafka's consumer is very efficient. it works by fetching chunks of log directly from the filesystem. it uses the to transfer this directly through the operating system without the overhead of copying this data through the application. this test actually starts at the beginning of the log, so it is doing real read i/o. in a production setting, though, the consumer reads almost exclusively out of the os pagecache, since it is reading data that was just written by some producer (so it is still cached). in fact, if you run i/o stat on a production server you actually see that there are no physical reads at all even though a great deal of data is being consumed.
making consumers cheap is important for what we want kafka to do. for one thing, the replicas are themselves consumers, so making the consumer cheap makes replication cheap. in addition, this makes handling out data an inexpensive operation, and hence not something we need to tightly control for scalability reasons.
2,615,968 records/sec |
(249.5 mb/sec) |
let's repeat the same test, but run three parallel consumer processes, each on a different machine, and all consuming the same topic.
as expected, we see near linear scaling (not surprising because consumption in our model is so simple).
795,064 records/sec |
(75.8 mb/sec) |
the above tests covered just the producer and the consumer running in isolation. now let's do the natural thing and run them together. actually, we have technically already been doing this, since our replication works by having the servers themselves act as consumers.
all the same, let's run the test. for this test we'll run one producer and one consumer on a six partition 3x replicated topic that begins empty. the producer is again using async replication. the throughput reported is the consumer throughput (which is, obviously, an upper bound on the producer throughput).
as we would expect, the results we get are basically the same as we saw in the producer only case—the consumer is fairly cheap.
i have mostly shown performance on small 100 byte messages. smaller messages are the harder problem for a messaging system as they magnify the overhead of the bookkeeping the system does. we can show this by just graphing throughput in both records/second and mb/second as we vary the record size.
so, as we would expect, this graph shows that the raw count of records we can send per second decreases as the records get bigger. but if we look at mb/second, we see that the total byte throughput of real user data increases as messages get bigger:we can see that with the 10 byte messages we are actually cpu bound by just acquiring the lock and enqueuing the message for sending—we are not able to actually max out the network. however, starting with 100 bytes, we are actually seeing network saturation (though the mb/sec continues to increase as our fixed-size bookkeeping bytes become an increasingly small percentage of the total bytes sent).
2 ms (median) |
3 ms (99th percentile) |
14 ms (99.9th percentile) |
we have talked a lot about throughput, but what is the latency of message delivery? that is, how long does it take a message we send to be delivered to the consumer? for this test, we will create producer and consumer and repeatedly time how long it takes for a producer to send a message to the kafka cluster and then be received by our consumer.
note that, kafka only gives out messages to consumers when they are acknowledged by the full in-sync set of replicas. so this test will give the same results regardless of whether we use sync or async replication, as that setting only affects the acknowledgement to the producer.
if you want to try out these benchmarks on your own machines, you can. as i said, i mostly just used our pre-packaged performance testing tools that ship with kafka and mostly stuck with the default configs both for the server and for the clients. however, you can see more details of the configuration and commands .
本文将针对kafka性能方面进行简单分析,首先简单介绍一下kafka的架构和涉及到的名词:
1. topic:用于划分message的逻辑概念,一个topic可以分布在多个broker上。
2. partition:是kafka中横向扩展和一切并行化的基础,每个topic都至少被切分为1个partition。
3. offset:消息在partition中的编号,编号顺序不跨partition。
4. consumer:用于从broker中取出/消费message。
5. producer:用于往broker中发送/生产message。
6. replication:kafka支持以partition为单位对message进行冗余备份,每个partition都可以配置至少1个replication(当仅1个replication时即仅该partition本身)。
7. leader:每个replication集合中的partition都会选出一个唯一的leader,所有的读写请求都由leader处理。其他replicas从leader处把数据更新同步到本地,过程类似大家熟悉的mysql中的binlog同步。
8. broker:kafka中使用broker来接受producer和consumer的请求,并把message持久化到本地磁盘。每个cluster当中会选举出一个broker来担任controller,负责处理partition的leader选举,协调partition迁移等工作。
9. isr(in-sync replica):是replicas的一个子集,表示目前alive且与leader能够“catch-up”的replicas集合。由于读写都是首先落到leader上,所以一般来说通过同步机制从leader上拉取数据的replica都会和leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该replica踢出isr。每个partition都有它自己独立的isr。
以上几乎是我们在使用kafka的过程中可能遇到的所有名词,同时也无一不是最核心的概念或组件,感觉到从设计本身来说,kafka还是足够简洁的。这次本文围绕kafka优异的吞吐性能,逐个介绍一下其设计与实现当中所使用的各项“黑科技”。
broker
不同于redis和memcacheq等内存消息队列,kafka的设计是把所有的message都要写入速度低容量大的硬盘,以此来换取更强的存储能力。实际上,kafka使用硬盘并没有带来过多的性能损失,“规规矩矩”的抄了一条“近道”。
首先,说“规规矩矩”是因为kafka在磁盘上只做sequence i/o,由于消息系统读写的特殊性,这并不存在什么问题。关于磁盘i/o的性能,引用一组kafka官方给出的测试数据(raid-5,7200rpm):
sequence i/o: 600mb/s
random i/o: 100kb/s
所以通过只做sequence i/o的限制,规避了磁盘访问速度低下对性能可能造成的影响。
接下来我们再聊一聊kafka是如何“抄近道的”。
首先,kafka重度依赖底层操作系统提供的pagecache功能。当上层有写操作时,操作系统只是将数据写入pagecache,同时标记page属性为dirty。当读操作发生时,先从pagecache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上pagecache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收pagecache的代价又很小,所以现代的os都支持pagecache。
使用pagecache功能同时可以避免在jvm内部缓存数据,jvm为我们提供了强大的gc能力,同时也引入了一些问题不适用与kafka的设计。
· 如果在heap内管理缓存,jvm的gc线程会频繁扫描heap空间,带来不必要的开销。如果heap过大,执行一次full gc对系统的可用性来说将是极大的挑战。
· 所有在在jvm内的对象都不免带有一个object overhead(千万不可小视),内存的有效空间利用率会因此降低。
· 所有的in-process cache在os中都有一份同样的pagecache。所以通过将缓存只放在pagecache,可以至少让可用缓存空间翻倍。
· 如果kafka重启,所有的in-process cache都会失效,而os管理的pagecache依然可以继续使用。
pagecache还只是第一步,kafka为了进一步的优化性能还采用了sendfile技术。在解释sendfile之前,首先介绍一下传统的网络i/o操作流程,大体上分为以下4步。
1. os 从硬盘把数据读到内核区的pagecache。
2. 用户进程把数据从内核区copy到用户区。
3. 然后用户进程再把数据写入到socket,数据流入内核区的socket buffer上。
4. os 再把数据从buffer中copy到网卡的buffer上,这样完成一次发送。
整个过程共经历两次context switch,四次system call。同一份数据在内核buffer与用户buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是sendfile所解决的问题,经过sendfile优化后,整个i/o过程就变成了下面这个样子。
通过以上的介绍不难看出,kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。如果producer和consumer之间生产和消费进度上配合得当,完全可以实现数据交换零i/o。这也就是我为什么说kafka使用“硬盘”并没有带来过多性能损失的原因。下面是我在生产环境中采到的一些指标。
(20 brokers, 75 partitions per broker, 110k msg/s)
此时的集群只有写,没有读操作。10m/s左右的send的流量是partition之间进行replicate而产生的。从recv和writ的速率比较可以看出,写盘是使用asynchronous batch的方式,底层os可能还会进行磁盘写顺序优化。而在有read request进来的时候分为两种情况,第一种是内存中完成数据交换。
send流量从平均10m/s增加到了到平均60m/s,而磁盘read只有不超过50kb/s。pagecache降低磁盘i/o效果非常明显。
接下来是读一些收到了一段时间,已经从内存中被换出刷写到磁盘上的老数据。
其他指标还是老样子,而磁盘read已经飚高到40 mb/s。此时全部的数据都已经是走硬盘了(对硬盘的顺序读取os层会进行prefill pagecache的优化)。依然没有任何性能问题。
tips
1. kafka官方并不建议通过broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响。
2. 可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能。
a. 脏页率超过第一个指标会启动pdflush开始flush dirty pagecache。
b. 脏页率超过第二个指标会阻塞所有的写操作来进行flush。
c. 根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio。
partition
partition是kafka可以很好的横向扩展和提供高并发处理以及实现replication的基础。
扩展性方面。首先,kafka允许partition在集群内的broker之间任意移动,以此来均衡可能存在的数据倾斜问题。其次,partition支持自定义的分区算法,例如可以将同一个key的所有消息都路由到同一个partition上去。 同时leader也可以在in-sync的replica中迁移。由于针对某一个partition的所有读写请求都是只由leader来处理,所以kafka会尽量把leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中。
并发方面。任意partition在某一个时刻只能被一个consumer group内的一个consumer消费(反过来一个consumer则可以同时消费多个partition),kafka非常简洁的offset机制最小化了broker和consumer之间的交互,这使kafka并不会像同类其他消息队列一样,随着下游consumer数目的增加而成比例的降低性能。此外,如果多个consumer恰巧都是消费时间序上很相近的数据,可以达到很高的pagecache命中率,因而kafka可以非常高效的支持高并发读操作,实践中基本可以达到单机网卡上限。
不过,partition的数量并不是越多越好,partition的数量越多,平均到每一个broker上的数量也就越多。考虑到broker宕机(network failure, full gc)的情况下,需要由controller来为所有宕机的broker上的所有partition重新选举leader,假设每个partition的选举消耗10ms,如果broker上有500个partition,那么在进行选举的5s的时间里,对上述partition的读写操作都会触发leadernotavailableexception。
再进一步,如果挂掉的broker是整个集群的controller,那么首先要进行的是重新任命一个broker作为controller。新任命的controller要从zookeeper上获取所有partition的meta信息,获取每个信息大概3-5ms,那么如果有10000个partition这个时间就会达到30s-50s。而且不要忘记这只是重新启动一个controller花费的时间,在这基础上还要再加上前面说的选举leader的时间 -_-!!!!!!
此外,在broker端,对producer和consumer都使用了buffer机制。其中buffer的大小是统一配置的,数量则与partition个数相同。如果partition个数过多,会导致producer和consumer的buffer内存占用过大。
tips
1. partition的数量尽量提前预分配,虽然可以在后期动态增加partition,但是会冒着可能破坏message key和partition之间对应关系的风险。
2. replica的数量不要过多,如果条件允许尽量把replica集合内的partition分别调整到不同的rack。
3. 尽一切努力保证每次停broker时都可以clean shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。
producer
kafka的研发团队表示在0.8版本里用java重写了整个producer,据说性能有了很大提升。我还没有亲自对比试用过,这里就不做数据对比了。本文结尾的扩展阅读里提到了一套我认为比较好的对照组,有兴趣的同学可以尝试一下。
其实在producer端的优化大部分消息系统采取的方式都比较单一,无非也就化零为整、同步变异步这么几种。
kafka系统默认支持messageset,把多条message自动地打成一个group后发送出去,均摊后拉低了每次通信的rtt。而且在组织messageset的同时,还可以把数据重新排序,从爆发流式的随机写入优化成较为平稳的线性写入。
此外,还要着重介绍的一点是,producer支持end-to-end的压缩。数据在本地压缩后放到网络上传输,在broker一般不解压(除非指定要deep-iteration),直至消息被consume之后在客户端解压。
当然用户也可以选择自己在应用层上做压缩和解压的工作(毕竟kafka目前支持的压缩算法有限,只有gzip和snappy),不过这样做反而会意外的降低效率!!!! kafka的end-to-end压缩与messageset配合在一起工作效果最佳,上面的做法直接割裂了两者间联系。至于道理其实很简单,压缩算法中一条基本的原理“重复的数据量越多,压缩比越高”。无关于消息体的内容,无关于消息体的数量,大多数情况下输入数据量大一些会取得更好的压缩比。
不过kafka采用messageset也导致在可用性上一定程度的妥协。每次发送数据时,producer都是send()之后就认为已经发送出去了,但其实大多数情况下消息还在内存的messageset当中,尚未发送到网络,这时候如果producer挂掉,那就会出现丢数据的情况。
为了解决这个问题,kafka在0.8版本的设计借鉴了网络当中的ack机制。如果对性能要求较高,又能在一定程度上允许message的丢失,那就可以设置request.required.acks=0 来关闭ack,以全速发送。如果需要对发送的消息进行确认,就需要设置request.required.acks为1或-1,那么1和-1又有什么区别呢?这里又要提到前面聊的有关replica数量问题。如果配置为1,表示消息只需要被leader接收并确认即可,其他的replica可以进行异步拉取无需立即进行确认,在保证可靠性的同时又不会把效率拉得很低。如果设置为-1,表示消息要commit到该partition的isr集合中的所有replica后,才可以返回ack,消息的发送会更安全,而整个过程的延迟会随着replica的数量正比增长,这里就需要根据不同的需求做相应的优化。
tips
1. producer的线程不要配置过多,尤其是在mirror或者migration中使用的时候,会加剧目标集群partition消息乱序的情况(如果你的应用场景对消息顺序很敏感的话)。
2. 0.8版本的request.required.acks默认是0(同0.7)。
consumer
consumer端的设计大体上还算是比较常规的。
· 通过consumer group,可以支持生产者消费者和队列访问两种模式。
· consumer api分为high level和low level两种。前一种重度依赖zookeeper,所以性能差一些且不自由,但是超省心。第二种不依赖zookeeper服务,无论从自由度和性能上都有更好的表现,但是所有的异常(leader迁移、offset越界、broker宕机等)和offset的维护都需要自行处理。
· 大家可以关注下不日发布的0.9 release。开发人员又用java重写了一套consumer。把两套api合并在一起,同时去掉了对zookeeper的依赖。据说性能有大幅度提升哦~~
tips
强烈推荐使用low level api,虽然繁琐一些,但是目前只有这个api可以对error数据进行自定义处理,尤其是处理broker异常或由于unclean shutdown导致的corrupted data时,否则无法skip只能等着“坏消息”在broker上被rotate掉,在此期间该replica将会一直处于不可用状态。
扩展阅读
sendfile: https://www.ibm.com/developerworks/cn/java/j-zerocopy/
so what’s wrong with 1975 programming: https://www.varnish-cache.org/trac/wiki/architectnotes
benchmarking: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
redis有一系列的命令,特点是以nx结尾,nx是not exists的缩写,如setnx命令就应该理解为:set if not exists。这系列的命令非常有用,这里讲使用setnx来实现分布式锁。
利用setnx非常简单地实现分布式锁。例如:某客户端要获得一个名字foo的锁,客户端使用下面的命令进行获取:
setnx lock.foo
上面的锁定逻辑有一个问题:如果一个持有锁的客户端失败或崩溃了不能释放锁,该怎么解决?我们可以通过锁的键对应的时间戳来判断这种情况是否发生了,如果当前的时间已经大于lock.foo的值,说明该锁已失效,可以被重新使用。
发生这种情况时,可不能简单的通过del来删除锁,然后再setnx一次,当多个客户端检测到锁超时后都会尝试去释放它,这里就可能出现一个竞态条件,让我们模拟一下这个场景:
这样一来,c1,c2都拿到了锁!问题大了!
幸好这种问题是可以避免d,让我们来看看c3这个客户端是怎样做的:
注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做del操作,因为可能客户端因为某个耗时的操作而挂起,操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。
根据上面的代码,我写了一小段fake代码来描述使用分布式锁的全过程:
是的,要想这段逻辑可以重用,使用python的你马上就想到了decorator,而用java的你是不是也想到了那谁?aop annotation?行,怎样舒服怎样用吧,别重复代码就行。
背景
在 很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增id,楼层生成等等。大部分的凯发天生赢家一触即发官网的解决方案是基于db实现的,redis为单进程单线程模 式,采用队列模式将并发访问变成串行访问,且多客户端对redis的连接并不存在竞争关系。其次redis提供一些命令setnx,getset,可以方 便实现分布式锁机制。
redis命令介绍
使用redis实现分布式锁,有两个重要函数需要介绍
setnx命令(set if not exists)
语法:
setnx key value
功能:
当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 setnx 不做任何动作,并返回0。
getset命令
语法:
getset key value
功能:
将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。
get命令
语法:
get key
功能:
返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。
del命令
语法:
del key [key …]
功能:
删除给定的一个或多个 key ,不存在的 key 会被忽略。
兵贵精,不在多。分布式锁,我们就依靠这四个命令。但在具体实现,还有很多细节,需要仔细斟酌,因为在分布式并发多进程中,任何一点出现差错,都会导致死锁,hold住所有进程。
加锁实现
setnx 可以直接加锁操作,比如说对某个关键词foo加锁,客户端可以尝试
setnx foo.lock
如果返回1,表示客户端已经获取锁,可以往下操作,操作完成后,通过
del foo.lock
命令来释放锁。
如果返回0,说明foo已经被其他客户端上锁,如果锁是非堵塞的,可以选择返回调用。如果是堵塞调用调用,就需要进入以下个重试循环,直至成功获得锁或者重试超时。理想是美好的,现实是残酷的。仅仅使用setnx加锁带有竞争条件的,在某些特定的情况会造成死锁错误。
处理死锁
在 上面的处理方式中,如果获取锁的客户端端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时 效性检测。因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和redis中的时间戳进行对比,如果超过一定差值,认为锁已经时 效,防止锁无限期的锁下去,但是,在大并发情况,如果同时检测锁失效,并简单粗暴的删除死锁,再通过setnx上锁,可能会导致竞争条件的产生,即多个客 户端同时获取锁。
c1获取锁,并崩溃。c2和c3调用setnx上锁返回0后,获得foo.lock的时间戳,通过比对时间戳,发现锁超时。
c2 向foo.lock发送del命令。
c2 向foo.lock发送setnx获取锁。
c3 向foo.lock发送del命令,此时c3发送del时,其实del掉的是c2的锁。
c3 向foo.lock发送setnx获取锁。
此时c2和c3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。所以,del锁的操作,不能直接使用在锁超时的情况下,幸好我们有getset方法,假设我们现在有另外一个客户端c4,看看如何使用getset方式,避免这种情况产生。
c1获取锁,并崩溃。c2和c3调用setnx上锁返回0后,调用get命令获得foo.lock的时间戳t1,通过比对时间戳,发现锁超时。
c4 向foo.lock发送geset命令,
getset foo.lock
并得到foo.lock中老的时间戳t2
如果t1=t2,说明c4获得时间戳。
如果t1!=t2,说明c4之前有另外一个客户端c5通过调用getset方式获取了时间戳,c4未获得锁。只能sleep下,进入下次循环中。
现在唯一的问题是,c4设置foo.lock的新时间戳,是否会对锁产生影响。其实我们可以看到c4和c5执行的时间差值极小,并且写入foo.lock中的都是有效时间错,所以对锁并没有影响。
为 了让这个锁更加强壮,获取锁的客户端,应该在调用关键业务时,再次调用get方法获取t1,和写入的t0时间戳进行对比,以免锁因其他情况被执行del意 外解开而不知。以上步骤和情况,很容易从其他参考资料中看到。客户端处理和失败的情况非常复杂,不仅仅是崩溃这么简单,还可能是客户端因为某些操作被阻塞 了相当长时间,紧接着 del 命令被尝试执行(但这时锁却在另外的客户端手上)。也可能因为处理不当,导致死锁。还有可能因为sleep设置不合理,导致redis在大并发下被压垮。 最为常见的问题还有
get返回nil时应该走那种逻辑?
第一种走超时逻辑
c1客户端获取锁,并且处理完后,del掉锁,在del锁之前。c2通过setnx向foo.lock设置时间戳t0 发现有客户端获取锁,进入get操作。
c2 向foo.lock发送get命令,获取返回值t1(nil)。
c2 通过t0>t1 expire对比,进入getset流程。
c2 调用getset向foo.lock发送t0时间戳,返回foo.lock的原值t2
c2 如果t2=t1相等,获得锁,如果t2!=t1,未获得锁。
第二种情况走循环走setnx逻辑
c1客户端获取锁,并且处理完后,del掉锁,在del锁之前。c2通过setnx向foo.lock设置时间戳t0 发现有客户端获取锁,进入get操作。
c2 向foo.lock发送get命令,获取返回值t1(nil)。
c2 循环,进入下一次setnx逻辑
两 种逻辑貌似都是ok,但是从逻辑处理上来说,第一种情况存在问题。当get返回nil表示,锁是被删除的,而不是超时,应该走setnx逻辑加锁。走第一 种情况的问题是,正常的加锁逻辑应该走setnx,而现在当锁被解除后,走的是getst,如果判断条件不当,就会引起死锁,很悲催,我在做的时候就碰到 了,具体怎么碰到的看下面的问题
getset返回nil时应该怎么处理?
c1和c2客户端调用get接口,c1返回t1,此时c3网络情况更好,快速进入获取锁,并执行del删除锁,c2返回t2(nil),c1和c2都进入超时处理逻辑。
c1 向foo.lock发送getset命令,获取返回值t11(nil)。
c1 比对c1和c11发现两者不同,处理逻辑认为未获取锁。
c2 向foo.lock发送getset命令,获取返回值t22(c1写入的时间戳)。
c2 比对c2和c22发现两者不同,处理逻辑认为未获取锁。
此 时c1和c2都认为未获取锁,其实c1是已经获取锁了,但是他的处理逻辑没有考虑getset返回nil的情况,只是单纯的用get和getset值就行 对比,至于为什么会出现这种情况?一种是多客户端时,每个客户端连接redis的后,发出的命令并不是连续的,导致从单客户端看到的好像连续的命令,到 redis server后,这两条命令之间可能已经插入大量的其他客户端发出的命令,比如del,setnx等。第二种情况,多客户端之间时间不同步,或者不是严格 意义的同步。
时间戳的问题
我们看到foo.lock的value值为时间戳,所以要在多客户端情况下,保证锁有效,一定要同步各服务器的时间,如果各服务器间,时间有差异。时间不一致的客户端,在判断锁超时,就会出现偏差,从而产生竞争条件。
锁的超时与否,严格依赖时间戳,时间戳本身也是有精度限制,假如我们的时间精度为秒,从加锁到执行操作再到解锁,一般操作肯定都能在一秒内完成。这样的话,我们上面的case,就很容易出现。所以,最好把时间精度提升到毫秒级。这样的话,可以保证毫秒级别的锁是安全的。
分布式锁的问题
1:必要的超时机制:获取锁的客户端一旦崩溃,一定要有过期机制,否则其他客户端都降无法获取锁,造成死锁问题。
2:分布式锁,多客户端的时间戳不能保证严格意义的一致性,所以在某些特定因素下,有可能存在锁串的情况。要适度的机制,可以承受小概率的事件产生。
3:只对关键处理节点加锁,良好的习惯是,把相关的资源准备好,比如连接数据库后,调用加锁机制获取锁,直接进行操作,然后释放,尽量减少持有锁的时间。
4:在持有锁期间要不要check锁,如果需要严格依赖锁的状态,最好在关键步骤中做锁的check检查机制,但是根据我们的测试发现,在大并发时,每一次check锁操作,都要消耗掉几个毫秒,而我们的整个持锁处理逻辑才不到10毫秒,玩客没有选择做锁的检查。
5:sleep学问,为了减少对redis的压力,获取锁尝试时,循环之间一定要做sleep操作。但是sleep时间是多少是门学问。需要根据自己的redis的qps,加上持锁处理时间等进行合理计算。
6:至于为什么不使用redis的muti,expire,watch等机制,可以查一参考资料,找下原因。
锁测试数据
未使用sleep
第一种,锁重试时未做sleep。单次请求,加锁,执行,解锁时间
可以看到加锁和解锁时间都很快,当我们使用
ab -n1000 -c100 'http://sandbox6.wanke.etao.com/test/test_sequence.php?tbpm=t'
ab 并发100累计1000次请求,对这个方法进行压测时。
我们会发现,获取锁的时间变成,同时持有锁后,执行时间也变成,而delete锁的时间,将近10ms时间,为什么会这样?
1:持有锁后,我们的执行逻辑中包含了再次调用redis操作,在大并发情况下,redis执行明显变慢。
2:锁的删除时间变长,从之前的0.2ms,变成9.8ms,性能下降近50倍。
在这种情况下,我们压测的qps为49,最终发现qps和压测总量有关,当我们并发100总共100次请求时,qps得到110多。当我们使用sleep时
使用sleep时
单次执行请求时
我们看到,和不使用sleep机制时,性能相当。当时用相同的压测条件进行压缩时
获取锁的时间明显变长,而锁的释放时间明显变短,仅是不采用sleep机制的一半。当然执行时间变成就是因为,我们在执行过程中,重新创建数据库连接,导致时间变长的。同时我们可以对比下redis的命令执行压力情况
上 图中细高部分是为未采用sleep机制的时的压测图,矮胖部分为采用sleep机制的压测图,通上图看到压力减少50%左右,当然,sleep这种方式还 有个缺点qps下降明显,在我们的压测条件下,仅为35,并且有部分请求出现超时情况。不过综合各种情况后,我们还是决定采用sleep机制,主要是为了 防止在大并发情况下把redis压垮,很不行,我们之前碰到过,所以肯定会采用sleep机制。
参考资料
http://www.blogjava.net/caojianhua/archive/2013/01/28/394847.html
redis是一个很强大的数据结构存储的nosql数据库,很方便针对业务模型进行效率的优化。最近我的工作是负责对现有java服务器框架进行整理,并将网络层与逻辑层脱离,以便于逻辑层和网络层的横向扩展。 尽管我在逻辑层上使用了akka作为核心框架,尽可能lockfree,但是还是免不了需要跨jvm的锁。所以我需要实现一个分布式锁。
官方在 这一页给了一个实现。
但是使用官方推荐的getset实现的话,未竞争到锁的一方确实可以判断到自己未能竞争到锁,但却将持有锁一方的时间修改了,这样的直接后果就是,持有锁的一方无法解锁!!!
其实官方实现出现的问题,是因为使用redis独立的命令不能将get-check-set这个过程进行原子化,所以我决定引入redis-lua,将get-check-set这个过程使用lua脚本来实现。
加锁:
解锁:
具体的实现:
---lock
local now = tonumber(argv[1])
local timeout = tonumber(argv[2])
local to = now timeout
local locked = redis.call('setnx', keys[1], to)
if (locked == 1) then
return 0
end
local kt = redis.call('type', keys[1]);
if (kt['ok'] ~= 'string') then
return 2
end
local keyvalue = tonumber(redis.call('get', keys[1]))
if (now > keyvalue) then
redis.call('set', keys[1], to)
return 0
end
return 1
---unlock
local begin = tonumber(argv[1])
local timeout = tonumber(argv[2])
local kt = redis.call('type', keys[1]);
if (kt['ok'] == 'string') then
local keyvalue = tonumber(redis.call('get', keys[1]))
if ((keyvalue - begin) == timeout) then
redis.call('del', keys[1])
return 0
end
end
return 1
redis的分布式锁会有单点的问题。当然我们的业务量也没有达到挂掉专门做锁的redis单点的水平。
通 过本文,你可以对模块化开发和amd规范有一个较直观的认识,并详细地学习requirejs这个模块化开发工具的常见用法。本文采取循序渐进的方式,从 理论到实践,从requirejs官方api文档中,总结出在使用requirejs过程中最常用的一些用法,并对文档中不够清晰具体的内容,加以例证和 分析,希望本文的内容对你的能力提升有实质性的帮助。
相信每个前端开发人员在刚开始接触js编程时,都写过类似下面这样风格的代码:
这些代码的特点是:
当然这些代码本身在实现功能上并没有错误,但是从代码的可重用性,健壮性以及可维护性来说,这种编程方式是有问题的,尤其是在页面逻辑较为复杂的应用中,这些问题会暴露地特别明显:
所以当这些问题出现的时候,js大牛们就开始寻找去解决这些问题的究极办法,于是模块化开发就出现了。正如模块化这个概念的表面意思一样,它要求在 编写代码的时候,按层次,按功能,将独立的逻辑,封装成可重用的模块,对外提供直接明了的调用接口,内部实现细节完全私有,并且模块之间的内部实现在执行 期间互不干扰,最终的结果就是可以解决前面举例提到的问题。一个简单遵循模块化开发要求编写的例子:
//module.js var student = function (name) { return name && { getname: function () { return name; } }; }, course = function (name) { return name && { getname: function () { return name; } } }, controller = function () { var data = {}; return { add: function (stu, cour) { var stuname = stu && stu.getname(), courname = cour && cour.getname(), current, _filter = function (e) { return e === courname; }; if (!stuname || !courname) return; current = data[stuname] = data[stuname] || []; if (current.filter(_filter).length === 0) { current.push(courname); } }, list: function (stu) { var stuname = stu && stu.getname(), current = data[stuname]; current && console.log(current.join(';')); } } }; //main.js var stu = new student('lyzg'), c = new controller(); c.add(stu,new course('javascript')); c.add(stu,new course('html')); c.add(stu,new course('css')); c.list(stu);
以上代码定义了三个模块分别表示学生,课程和控制器,然后在main.js中调用了controller提供的add和list接口,为lyzg这个学生添加了三门课程,然后在控制台显示了出来。运行结果如下:
javascript;html;css
通过上例,可以看出模块化的代码结构和逻辑十分清晰,代码看起来十分优雅,另外由于逻辑都通过模块拆分,所以达到了解耦的目的,代码的功能也会比较 健壮。不过上例使用的这种模块化开发方式也并不是没有问题,这个问题就是它还是把模块引用如student这些直接添加到了全局空间下,虽然通过模块减少 了很多全局空间的变量和函数,但是模块引用本身还是要依赖全局空间,才能被调用,当模块较多,或者有引入第三方模块库时,仍然可能造成命名冲突的问题,所 以这种全局空间下的模块化开发的方式并不是最完美的方式。目前常见的模块化开发方式,全局空间方式是最基本的一种,另外常见的还有遵循amd规范的开发方 式,遵循cmd规范的开发方式,和ecmascript 6的开发方式。需要说明的是,cmd和es6跟本文的核心没有关系,所以不会在此介绍,后面的内容主要介绍amd以及实现了amd规范的 requirejs。
正如上文提到,实现模块化开发的方式,另外常见的一种就是遵循amd规范的实现方式,不过amd规范并不是具体的实现方式,而仅仅是模块化开发的一 种凯发天生赢家一触即发官网的解决方案,你可以把它理解成模块化开发的一些接口声明,如果你要实现一个遵循该规范的模块化开发工具,就必须实现它预先定义的api。比如它要求在加载 模块时,必须使用如下的api调用方式:
require([module], callback) 其中: [module]:是一个数组,里面的成员就是要加载的模块; callback:是模块加载完成之后的回调函数
所有遵循amd规范的模块化工具,都必须按照它的要求去实现,比如requirejs这个库,就是完全遵循amd规范实现的,所以在利用 requirejs加载或者调用模块时,如果你事先知道amd规范的话,你就知道该怎么用requirejs了。规范的好处在于,不同的实现却有相同的调 用方式,很容易切换不同的工具使用,至于具体用哪一个实现,这就跟各个工具的各自的优点跟项目的特点有关系,这些都是在项目开始选型的时候需要确定的。目 前requirejs不是唯一实现了amd规范的库,像dojo这种更全面的js库也都有amd的实现。
最后对amd全称做一个解释,译为:异步模块定义。异步强调的是,在加载模块以及模块所依赖的其它模块时,都采用异步加载的方式,避免模块加载阻塞了网页的渲染进度。相比传统的异步加载,amd工具的异步加载更加简便,而且还能实现按需加载,具体解释在下一部分说明。
html中的script标签在加载和执行过程中会阻塞网页的渲染,所以一般要求尽量将script标签放置在body元素的底部,以便加快页面显示的速度,还有一种方式就是通过异步加载的方式来加载js,这样可以避免js文件对html渲染的阻塞。
第1种异步加载的方式是直接利用脚本生成script标签的方式:
(function() { var s = document.createelement('script'); s.type = 'text/javascript'; s.async = true; s.src = 'http://yourdomain.com/script.js'; var x = document.getelementsbytagname('script')[0]; x.parentnode.insertbefore(s, x); })();
这段代码,放置在script标记内部,然后该script标记添加到body元素的底部即可。
第2种方式是借助script的属性:defer和async,defer这个属性在ie浏览器和早起的火狐浏览器中支持,async在支持 html5的浏览器上都支持,只要有这两个属性,script就会以异步的方式来加载,所以script在html中的位置就不重要了:
这种方式下,所有异步js在执行的时候还是按顺序执行的,不然就会存在依赖问题,比如如果上例中的main.js依赖foo.js和bar.js, 但是main.js先执行的话就会出错了。虽然从来理论上这种方式也算不错了,但是不够好,因为它用起来很繁琐,而且还有个问题就是页面需要添加多个 script标记以及没有办法完全做到按需加载。
js的按需加载分两个层次,第一个层次是只加载这个页面可能被用到的js,第二个层次是在只在用到某个js的时 候才去加载。传统地方式很容易做到第一个层次,但是不容易做到第二个层次,虽然我们可以通过合并和压缩工具,将某个页面所有的js都添加到一个文件中去, 最大程度减少资源请求量,但是这个js请求到客户端以后,其中有很多内容可能都用不上,要是有个工具能够做到在需要的时候才去加载相关js就完美解决问题 了,比如requirejs。
前文多次提及requirejs,本部分将对它的常用用法详细说明,它的官方地址是:,你可以到该地址去下载最新版requirejs文件。requirejs作为目前使用最广泛的amd工具,它的主要优点是:
使用方式很简单,只要一个script标记就可以在网页中加载requirejs:
由于这里用到了defer和async这两个异步加载的属性,所以require.js是异步加载的,你把这个script标记放置在任何地方都没有问题。
4.02 如何利用requirejs加载并执行当前网页的逻辑js
4.01解决的仅仅是requirejs的使用问题,但它仅仅是一个js库,是一个被当前页面的逻辑所利用的工具,真正实现网页功能逻辑的是我们要 利用requirejs编写的主js,这个主js(假设这些代码都放置在main.js文件中)又该如何利用rj来加载执行呢?方式如下:
对比4.01,你会发现script标记多了一个data-main,rj用这个配置当前页面的主js,你要把逻辑都写在这个main.js里面。 当rj自身加载执行后,就会再次异步加载main.js。这个main.js是当前网页所有逻辑的入口,理想情况下,整个网页只需要这一个script标 记,利用rj加载依赖的其它文件,如jquery等。
假设项目的目录结构为:
main.js是跟当前页面相关的主js,app文件夹存放本项目自定义的模块,lib存放第三方库。
html中按4.02的方式配置rj。main.js的代码如下:
require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) { //use foo bar app do sth });
在这段js中,我们利用rj提供的require方法,加载了三个模块,然后在这个三个模块都加载成功之后执行页面逻辑。require方法有2个 参数,第一个参数是数组类型的,实际使用时,数组的每个元素都是一个模块的module id,第二个参数是一个回调函数,这个函数在第一个参数定义的所有模块都加载成功后回调,形参的个数和顺序分别与第一个参数定义的模块对应,比如第一个模 块时lib/foo,那么这个回调函数的第一个参数就是foo这个模块的引用,在回调函数中我们使用这些形参来调用各个模块的方法,由于回调是在各模块加 载之后才调用的,所以这些模块引用肯定都是有效的。
从以上这个简短的代码,你应该已经知道该如何使用rj了。
在介绍rj如何去解析依赖的那些模块js的路径时,必须先弄清楚baseurl和module id这两个概念。
html中的base元素可以定义当前页面内部任何http请求的url前缀部分,rj的baseurl跟这个base元素起的作用是类似的,由于 rj总是动态地请求依赖的js文件,所以必然涉及到一个js文件的路径解析问题,rj默认采用一种baseurl moduleid的解析方式,这个解析方式后续会举例说明。这个baseurl非常重要,rj对它的处理遵循如下规则:
上述三种方式,优先级由低到高排列。
data-main的使用方式,你已经知道了,config该如何配置,如下所示:
require.config({ baseurl: 'scripts' });
这个配置必须放置在main.js的最前面。data-main与config配置同时存在的时候,以config为准,由于rj的其它配置也是在这个位置配置的,所以4.03中的main.js可以改成如下结构,以便将来的扩展:
require.config({ baseurl: 'scripts' }); require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) { // use foo bar app do sth });
关于module id,就是在require方法以及后续的define方法里,用在依赖数组这个参数里,用来标识一个模块的字符串。上面代码中的['lib/foo', 'app/bar', 'app/app']就是一个依赖数组,其中的每个元素都是一个module id。值得注意的是,module id并不一定是该module 相关js路径的一部分,有的module id很短,但可能路径很长,这跟rj的解析规则有关。下一节详细介绍。
rj默认按baseurl module id的规则,解析文件,并且它默认要加载的文件都是js,所以你的module id里面可以不包含.js的后缀,这就是为啥你看到的module id都是lib/foo, app/bar这种形式了。有三种module id,不适用这种规则:
假如main.js如下使用:
require.config({ baseurl: 'scripts' }); require(['/lib/foo', 'test.js', 'http://cdn.baidu.com/js/jquery'], function(foo, bar, app) { // use foo bar app do sth });
这三个module 都不会根据baseurl module id的规则来解析,而是直接用module id来解析,等效于下面的代码:
<script src="/lib/foo.js">script> <script src="test.js">script> <script src="http://cdn.baidu.com/js/jquery.js">script>
各种module id解析举例:
例1,项目结构如下:
main.js如下:
require.config({ baseurl: 'scripts' }); require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) { // use foo bar app do sth });
baseurl为:scripts目录
moduleid为:lib/foo, app/bar, app/app
根据baseurl moduleid,以及自动补后缀.js,最终这三个module的js文件路径为:
scripts/lib/foo.js scripts/app/bar.js scripts/app/app.js
例2,项目结构同例1:
main.js改为:
require.config({ baseurl: 'scripts/lib', paths: { app: '../app' } }); require(['foo', 'app/bar', 'app/app'], function(foo, bar, app) { // use foo bar app do sth });
这里出现了一个新的配置paths,它的作用是针对module id中特定的部分,进行转义,如以上代码中对app这个部分,转义为../app,这表示一个相对路径,相对位置是baseurl所指定的目录,由项目结 构可知,../app其实对应的是scirpt/app目录。正因为有这个转义的存在,所以以上代码中的app/bar才能被正确解析,否则还按 baseurl moduleid的规则,app/bar不是应该被解析成scripts/lib/app/bar.js吗,但实际并非如此,app/bar被解析成 scripts/app/bar.js,其中起关键作用的就是paths的配置。通过这个举例,可以看出module id并不一定是js文件路径中的一部分,paths的配置对于路径过程的js特别有效,因为可以简化它的module id。
另外第一个模块的id为foo,同时没有paths的转义,所以根据解析规则,它的文件路径时:scripts/lib/foo.js。
paths的配置中只有当模块位于baseurl所指定的文件夹的同层目录,或者更上层的目录时,才会用到../这种相对路径。
例3,项目结果同例1,main.js同例2:
这里要说明的问题稍微特殊,不以main.js为例,而以app.js为例,且app依赖bar,当然config还是需要在main.js中定义的,由于这个问题在定义模块的时候更加常见,所以用define来举例,假设app.js模块如下定义:
define(['./bar'], function(bar) { return { dosth: function() { bar.dosth(); } } });
上面的代码通过define定义了一个模块,这个define函数后面介绍如何定义模块的时候再来介绍,这里简单了解。这里这种用法的第一个参数跟 require函数一样,是一个依赖数组,第二个参数是一个回调,也是在所有依赖加载成功之后调用,这个回调的返回值会成为这个模块的引用被其它模块所使 用。
这里要说的问题还是跟解析规则相关的,如果完全遵守rj的解析规则,这里的依赖应该配置成app/bar才是正确的,但由于app.js与 bar.js位于同一个目录,所以完全可利用./这个同目录的相对标识符来解析js,这样的话只要app.js已经加载成功了,那么去同目录下找 bar.js就肯定能找到了。这种配置在定义模块的时候非常有意义,这样你的模块就不依赖于放置这些模块的文件夹名称了。
rj不管是require方法还是define方法的依赖模块都是异步加载的,所以下面的代码不一定能解析到正确的js文件:
//main.js
require.config({ paths: { foo: 'libs/foo-1.1.3' } });
//other.js
require( ['foo'], function( foo ) { //foo is undefined });
由于main.js是异步加载的,所以other.js会比它先加载,但是rj的配置存在于main.js里面,所以在加载other.js读不到rj的配置,在other.js执行的时候解析出来的foo的路径就会变成scripts/foo.js,而正确路径应该是scripts/libs/foo-1.1.3.js。
尽管rj的依赖是异步加载的,但是已加载的模块在多次依赖的时候,不会再重新加载:
define(['require', 'app/bar', 'app/app'], function(require) { var bar= require("app/bar"); var app= require("app/app"); //use bar and app do sth });
上面的代码,在callback定义的时候,只用了一个形参,这主要是为了减少形参的数量,避免整个回调的签名很长。依赖的模块在回调内部可以直接用require(moduleid)的参数得到,由于在回调执行前,依赖的模块已经加载,所以此处调用不会再重新加载。但是如果此处获取一个并不在依赖数组中出现的module id,require很有可能获取不到该模块引用,因为它可能需要重新加载,如果它没有在其它模块中被加载过的话。
rj建议,文件组织尽量扁平,不要多层嵌套,最理想的是跟项目相关的放在一个文件夹,第三方库放在一个文件夹,如下所示:
amd规定的模块定义规范为:
define(id?, dependencies?, factory); 其中: id: 模块标识,可以省略。 dependencies: 所依赖的模块,可以省略。 factory: 模块的实现,或者一个javascript对象
关于第一个参数,本文不会涉及,因为rj建议所有模块都不要使用第一个参数,如果使用第一个参数定义的模块成为命名模块,不适用第一个参数的模块成为匿名模块,命名模块如果更名,所有依赖它的模块都得修改!第二个参数是依赖数组,跟require一样,如果没有这个参数,那么定义的就是一个无依赖的模块;最后一个参数是回调或者是一个简单对象,在模块加载完毕后调用,当然没有第二个参数,最后一个参数也会调用。
本部分所举例都采用如下项目结构:
1. 定义简单对象模块:
app/bar.js
define({ bar:'i am bar.' });
利用main.js测试:
require.config({ baseurl: 'scripts/lib', paths: { app: '../app' } }); require(['app/bar'], function(bar) { console.log(bar);// {bar: 'i am bar.'} });
2. 定义无依赖的模块:
app/nodec.js:
define(function () { return { nodec: "yes, i don't need dependence." } });
利用main.js测试:
require.config({ baseurl: 'scripts/lib', paths: { app: '../app' } }); require(['app/nodec'], function(nodec) { console.log(nodec);// {nodec: yes, i don't need dependence.'} });
3. 定义依赖其它模块的模块:
app/dec.js:
define(['jquery'], function($){ //use $ do sth ... return { usejq: true } });
利用main.js测试:
require.config({ baseurl: 'scripts/lib', paths: { app: '../app' } }); require(['app/dec'], function(dec) { console.log(dec);//{usejq: true} });
4. 循环依赖:
当一个模块foo的依赖数组中存在bar,bar模块的依赖数组中存在foo,就会形成循环依赖,稍微修改下bar.js和foo.js如下。
app/bar.js:
define(['foo'],function(foo){ return { name: 'bar', hi: function(){ console.log('hi! ' foo.name); } } });
lib/foo.js:
define(['app/bar'],function(bar){ return { name: 'foo', hi: function(){ console.log('hi! ' bar.name); } } });
利用main.js测试:
require.config({ baseurl: 'scripts/lib', paths: { app: '../app' } }); require(['app/bar', 'foo'], function(bar, foo) { bar.hi(); foo.hi(); });
运行结果:
如果改变main.js中require部分的依赖顺序,结果:
循环依赖导致两个依赖的module之间,始终会有一个在获取另一个的时候,得到undefined。解决方法是,在定义module的时候,如果用到循环依赖的时候,在define内部通过require重新获取。main.js不变,bar.js改成:
define(['require', 'foo'], function(require, foo) { return { name: 'bar', hi: function() { foo = require('foo'); console.log('hi! ' foo.name); } } });
foo.js改成:
define(['require', 'app/bar'], function(require, bar) { return { name: 'foo', hi: function() { bar = require('app/bar'); console.log('hi! ' bar.name); } } });
利用上述代码,重新执行,结果是:
模块定义总结:不管模块是用回调函数定义还是简单对象定义,这个模块输出的是一个引用,所以这个引用必须是有效的,你的回调不能返回undefined,但是不局限于对象类型,还可以是数组,函数,甚至是基本类型,只不过如果返回对象,你能通过这个对象组织更多的接口。
再看看这个代码:
define(['require', 'app/bar'], function(require) { return { name: 'foo', hi: function() { var bar = require('app/bar'); console.log('hi! ' bar.name); } } });
依赖数组中的require这个moduleid对应的是一个内置模块,利用它加载模块,怎么用你已经看到了,比如在main.js中,在define中。另外一个内置模块是module,这个模块跟rj的另外一个配置有关,具体用法请在第5大部分去了解。
1. 生成相对于模块的url地址
define(["require"], function(require) { var cssurl = require.to; });
这个功能在你想要动态地加载一些文件的时候有用,注意要使用相对路径。
2. 控制台调试
require("module/name").callsomefunction()
假如你想在控制台中查看某个模块都有哪些方法可以调用,如果这个模块已经在页面加载的时候通过依赖被加载过后,那么就可以用以上代码在控制台中做各种测试了。
在rj的配置中,前面已经接触到了baseurl,paths,另外几个常用的配置是:
为那些没有使用define()来声明依赖关系、设置模块的"浏览器全局变量注入"型脚本做依赖和导出配置。
例1:利用exports将模块的全局变量引用与requirejs关联
项目结构如图:
main.js如下:
require.config({ baseurl: 'scripts/lib', paths: { app: '../app' }, shim: { underscore: { exports: '_' } } }); require(['underscore'], function(_) { // 现在可以通过_调用underscore的api了 });
如你所见,rj在shim中添加了一个对underscore这个模块的配置,并通过exports属性指定该模块暴露的全局变量,以便rj能够对这些模块统一管理。
例2:利用deps配置js模块的依赖
项目结构如图:
main.js如下:
require.config({ baseurl: 'scripts/lib', paths: { app: '../app' }, shim: { backbone: { deps: ['underscore', 'jquery'], exports: 'backbone' } } }); require(['backbone'], function(backbone) { //use backbone's api });
由于backbone这个组件依赖jquery和underscore,所以可以通过deps属性配置它的依赖,这样backbone将会在另外两个模块加载完毕之后才会加载。
例3:jquery等库插件配置方法
代码举例如下:
requirejs.config({ shim: { 'jquery.colorize': { deps: ['jquery'], exports: 'jquery.fn.colorize' }, 'jquery.scroll': { deps: ['jquery'], exports: 'jquery.fn.scroll' }, 'backbone.layoutmanager': { deps: ['backbone'] exports: 'backbone.layoutmanager' } } });
常常需要将配置信息传给一个模块。这些配置往往是application级别的信息,需要一个手段将它们向下传递给模块。在requirejs中,基于requirejs.config()的config配置项来实现。要获取这些信息的模块可以加载特殊的依赖“module”,并调用module.config()。
例1:在requirejs.config()中定义config,以供其它模块使用
requirejs.config({ config: { 'bar': { size: 'large' }, 'baz': { color: 'blue' } } });
如你所见,config属性中的bar这一节是在用于module id为bar这个模块的,baz这一节是用于module id为baz这个模块的。具体使用以bar.js举例:
define(['module'], function(module) { //will be the value 'large'var size = module.config().size; });
前面提到过,rj的内置模块除了require还有一个module,用法就在此处,通过它可以来加载config的内容。
如果设置为true,则当一个脚本不是通过define()定义且不具备可供检查的shim导出字串值时,就会抛出错误。这个属性可以强制要求所有rj依赖或加载的模块都要通过define或者shim被rj来管理,同时它还有一个好处就是用于错误检测。
requirejs获取资源时附加在url后面的额外的query参数。作为浏览器或服务器未正确配置时的“cache bust”手段很有用。使用cache bust配置的一个示例:
urlargs: "bust=" (new date()).gettime()
ie中捕获加载错误不完美:
所以为了支持在ie中捕获加载错误,需要配置enforcedefine为true,这不得不要求你所有的模块都用define定义,或者用shim配置rj对它的引用。
注意:如果你设置了enforcedefine: true,而且你使用data-main=""来加载你的主js模块,则该主js模块必须调用define()而不是require()来加载其所需的代码。主js模块仍然可调用require/requirejs来设置config值,但对于模块加载必须使用define()。比如原来的这段就会报错:
require.config({ enforcedefine: true, baseurl: 'scripts/lib', paths: { app: '../app' }, shim: { backbone: { deps: ['underscore', 'jquery'], exports: 'backbone' } } }); require(['backbone'], function(backbone) { console.log(backbone); });
把最后三行改成:
define(['backbone'], function(backbone) { console.log(backbone); });
才不会报错。
requirejs.config({ //to get timely, correct error triggers in ie, force a define/shim exports check. enforcedefine: true, paths: { jquery: [ 'http://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min', //if the cdn location fails, load from this location 'lib/jquery' ] } }); //later require(['jquery'], function ($) { });
上述代码先尝试加载cdn版本,如果出错,则退回到本地的lib/jquery.js。
注意: paths备错仅在模块id精确匹配时工作。这不同于常规的paths配置,常规配置可匹配模块id的任意前缀部分。备错主要用于非常的错误恢复,而不是常规的path查找解析,因为那在浏览器中是低效的。
为了捕获在局域的errback中未捕获的异常,你可以重载requirejs.onerror():
requirejs.onerror = function (err) { console.log(err.requiretype); if (err.requiretype === 'timeout') { console.log('modules: ' err.requiremodules); } throw err; };
(完)
spark是基于内存的分布式计算引擎,以处理的高效和稳定著称。然而在实际的应用开发过程中,开发者还是会遇到种种问题,其中一大类就是和性能相关。在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能。
分布式计算引擎在调优方面有四个主要关注方向,分别是cpu、内存、网络开销和i/o,其具体调优目标如下:
数据倾斜意味着某一个或某几个partition中的数据量特别的大,这意味着完成针对这几个partition的计算需要耗费相当长的时间。
如 果大量数据集中到某一个partition,那么这个partition在计算的时候就会成为瓶颈。图1是spark应用程序执行并发的示意图,在 spark中,同一个应用程序的不同stage是串行执行的,而同一stage中的不同task可以并发执行,task数目由partition数来决 定,如果某一个partition的数据量特别大,则相应的task完成时间会特别长,由此导致接下来的stage无法开始,整个job完成的时间就会非 常长。
要避免数据倾斜的出现,一种方法就是选择合适的key,或者是自己定义相关的partitioner。在spark中block使用 了bytebuffer来存储数据,而bytebuffer能够存储的最大数据量不超过2gb。如果某一个key有大量的数据,那么在调用cache或 persist函数时就会碰到spark-1476这个异常。
下面列出的这些api会导致shuffle操作,是数据倾斜可能发生的关键点所在
1. groupbykey
2. reducebykey
3. aggregatebykey
4. sortbykey
5. join
6. cogroup
7. cartesian
8. coalesce
9. repartition
10. repartitionandsortwithinpartitions
def rdd: rdd[t] } // todo view bounds are deprecated, should use context bounds // might need to change classmanifest for classtag in spark 1.0.0 case class demopairrdd[k <% ordered[k] : classmanifest, v: classmanifest]( rdd: rdd[(k, v)]) extends rddwrapper[(k, v)] { // here we use a single long to try to ensure the sort is balanced, // but for really large dataset, we may want to consider // using a tuple of many longs or even a guid def sortbykeygrouped(numpartitions: int): rdd[(k, v)] = rdd.map(kv => ((kv._1, random.nextlong()), kv._2)).sortbykey() .grouped(numpartitions).map(t => (t._1._1, t._2)) } case class demordd[t: classmanifest](rdd: rdd[t]) extends rddwrapper[t] { def grouped(size: int): rdd[t] = { // todo version where withindex is cached val withindex = rdd.mappartitions(_.zipwithindex) val startvalues = withindex.mappartitionswithindex((i, iter) => iterator((i, iter.toiterable.last))).toarray().tolist .sortby(_._1).map(_._2._2.tolong).scan(-1l)(_ _).map(_ 1l) withindex.mappartitionswithindex((i, iter) => iter.map { case (value, index) => (startvalues(i) index.tolong, value) }) .partitionby(new partitioner { def numpartitions: int = size def getpartition(key: any): int = (key.asinstanceof[long] * numpartitions.tolong / startvalues.last).toint }) .map(_._2) } }
定义隐式的转换
implicit def todemordd[t: classmanifest](rdd: rdd[t]): demordd[t] = new demordd[t](rdd) implicit def todemopairrdd[k <% ordered[k] : classmanifest, v: classmanifest]( rdd: rdd[(k, v)]): demopairrdd[k, v] = demopairrdd(rdd) implicit def tordd[t](rdd: rddwrapper[t]): rdd[t] = rdd.rdd }
在spark-shell中就可以使用了
import rddconversions._ yourrdd.grouped(5)
spark 的shuffle过程非常消耗资源,shuffle过程意味着在相应的计算节点,要先将计算结果存储到磁盘,后续的stage需要将上一个stage的结 果再次读入。数据的写入和读取意味着disk i/o操作,与内存操作相比,disk i/o操作是非常低效的。
使用iostat来查看disk i/o的使用情况,disk i/o操作频繁一般会伴随着cpu load很高。
如果数据和计算节点都在同一台机器上,那么可以避免网络开销,否则还要加上相应的网络开销。 使用iftop来查看网络带宽使用情况,看哪几个节点之间有大量的网络传输。
图2是spark节点间数据传输的示意图,spark task的计算函数是通过akka通道由driver发送到executor上,而shuffle的数据则是通过netty网络接口来实现。由于akka 通道中参数spark.akka.framesize决定了能够传输消息的最大值,所以应该避免在spark task中引入超大的局部变量。
为了提高spark应用程序的效率,尽可能的提升cpu的利用率。并发数应该是可用cpu物理核数的两倍。在这里,并发数过低,cpu得不到充分的利用,并发数过大,由于spark是每一个task都要分发到计算结点,所以任务启动的开销会上升。
并发数的修改,通过配置参数来改变spark.default.parallelism,如果是sql的话,可能通过修改spark.sql.shuffle.partitions来修改。
第1项 repartition vs. coalesce
repartition和coalesce都能实现数据分区的动态调整,但需要注意的是repartition会导致shuffle操作,而coalesce不会。
第2节 reducebykey vs. groupby
groupby操作应该尽可能的避免,第一是有可能造成大量的网络开销,第二是可能导致oom。以wordcount为例来演示reducebykey和groupby的差异
reducebykey sc.textfile(“readme.md”).map(l=>l.split(“,”)).map(w=>(w,1)).reducebykey(_ _)
shuffle过程如图2所示
groupbykey sc.textfile(“readme.md”).map(l=>l.split(“,”)).map(w=>(w,1)).groupbykey.map(r=>(r._1,r._2.sum))
建议: 尽可能使用reducebykey, aggregatebykey, foldbykey和combinebykey
假设有一rdd如下所示,求每个key的均值
val data = sc.parallelize( list((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )
方法一:reducebykey
data.map(r=>(r._1, (r.2,1))).reducebykey((a,b)=>(a._1 b._1, a._2 b._2)).map(r=>(r._1,(r._2._1/r._2._2)).foreach(println)
方法二:combinebykey
data.combinebykey(value=>(value,1), (x:(double, int), value:double)=> (x._1 value, x._2 1), (x:(double,int), y:(double, int))=>(x._1 y._1, x._2 y._2))
第3节 broadcasthashjoin vs. shufflehashjoin
在join过程中,经常会遇到大表和小表的join. 为了提高效率可以使用broadcasthashjoin, 预先将小表的内容广播到各个executor, 这样将避免针对小表的shuffle过程,从而极大的提高运行效率。
其实broadcasthashjoin核心就是利用了broadcast函数,如果理解清楚broadcast的优点,就能比较好的明白broadcasthashjoin的优势所在。
以下是一个简单使用broadcast的示例程序。
val lst = 1 to 100 tolist val examplerdd = sc.makerdd(1 to 20 toseq, 2) val broadcastlst = sc.broadcast(lst) examplerdd.filter(i=>broadcastlst.valuecontains(i)).collect.foreach(println)
第4节 map vs. mappartitions
有时需要将计算结果存储到外部数据库,势必会建立到外部数据库的连接。应该尽可能的让更多的元素共享同一个数据连接而不是每一个元素的处理时都去建立数据库连接。
在这种情况下,mappartitions和foreachpartitons将比map操作高效的多。
第5节 数据就地读取
移动计算的开销远远低于移动数据的开销。
spark中每个task都需要相应的输入数据,因此输入数据的位置对于task的性能变得很重要。按照数据获取的速度来区分,由快到慢分别是:
1.process_local
2.node_local
3.rack_local
spark在task执行的时候会尽优先考虑最快的数据获取方式,如果想尽可能的在更多的机器上启动task,那么可以通过调低spark.locality.wait的值来实现, 默认值是3s。
除 了hdfs,spark能够支持的数据源越来越多,如cassandra, hbase,mongodb等知名的nosql数据库,随着elasticsearch的日渐兴起,spark和elasticsearch组合起来提供 高速的查询凯发天生赢家一触即发官网的解决方案也成为一种有益的尝试。
上述提到的外部数据源面临的一个相同问题就是如何让spark快速读取其中的数据, 尽可能的将计算结点和数据结点部署在一起是达到该目标的基本方法,比如在部署hadoop集群的时候,可以将hdfs的datanode和spark worker共享一台机器。
以cassandra为例,如果spark的部署和cassandra的机器有部分重叠,那么在读取cassandra中数据的时候,通过调低spark.locality.wait就可以在没有部署cassandra的机器上启动spark task。
对于cassandra, 可以在部署cassandra的机器上部署spark worker,需要注意的是cassandra的compaction操作会极大的消耗cpu,因此在为spark worker配置cpu核数时,需要将这些因素综合在一起进行考虑。
这一部分的代码逻辑可以参考源码tasksetmanager::addpendingtask
private def addpendingtask(index: int, readding: boolean = false) { // utility method that adds `index` to a list only if readding=false or it's not already there def addto(list: arraybuffer[int]) { if (!readding || !list.contains(index)) { list = index } } for (loc <- tasks(index).preferredlocations) { loc match { case e: executorcachetasklocation => addto(pendingtasksforexecutor.getorelseupdate(e.executorid, new arraybuffer)) case e: hdfscachetasklocation => { val exe = sched.getexecutorsaliveonhost(loc.host) exe match { case some(set) => { for (e <- set) { addto(pendingtasksforexecutor.getorelseupdate(e, new arraybuffer)) } loginfo(s"pending task $index has a cached location at ${e.host} " ", where there are executors " set.mkstring(",")) } case none => logdebug(s"pending task $index has a cached location at ${e.host} " ", but there are no executors alive there.") } } case _ => unit } addto(pendingtasksforhost.getorelseupdate(loc.host, new arraybuffer)) for (rack <- sched.getrackforhost(loc.host)) { addto(pendingtasksforrack.getorelseupdate(rack, new arraybuffer)) } } if (tasks(index).preferredlocations == nil) { addto(pendingtaskswithnoprefs) } if (!readding) { allpendingtasks = index // no point scanning this whole list to find the old task there } }
如果准备让spark支持新的存储源,进而开发相应的rdd,与位置相关的部分就是自定义getpreferredlocations函数,以elasticsearch-hadoop中的esrdd为例,其代码实现如下。
override def getpreferredlocations(split: partition): seq[string] = { val essplit = split.asinstanceof[espartition] val ip = essplit.espartition.nodeip if (ip != null) seq(ip) else nil }
第6节 序列化
使用好的序列化算法能够提高运行速度,同时能够减少内存的使用。
spark在shuffle的时候要将数据先存储到磁盘中,存储的内容是经过序列化的。序列化的过程牵涉到两大基本考虑的因素,一是序列化的速度,二是序列化后内容所占用的大小。
kryoserializer与默认的javaserializer相比,在序列化速度和序列化结果的大小方面都具有极大的优势。所以建议在应用程序配置中使用kryoserializer.
spark.serializer org.apache.spark.serializer.kryoserializer
默认的cache没有对缓存的对象进行序列化,使用的storagelevel是memory_only,这意味着要占用比较大的内存。可以通过指定persist中的参数来对缓存内容进行序列化。
examplerdd.persist(memory_only_ser)
需要特别指出的是persist函数是等到job执行的时候才会将数据缓存起来,属于延迟执行; 而unpersist函数则是立即执行,缓存会被立即清除。
作者简介:许鹏, 《apache spark源码剖析》作者,关注于大数据实时搜索和实时流数据处理,对elasticsearch, storm及drools多有研究,现就职于携程。
play uses , which supports the inclusion of arbitrary files by adding them to the mappings:
mappings in universal = (basedirectory.value / "scripts" * "*" get) map (x => x -> ("scripts/" x.getname))
the syntax assumes play 2.2.xval jdk8 = new file("d:\\jdk\\jdk8\\jre1_8_0_40")
mappings in universal = (jdk8 ** "*" get) map (x => x -> ("jre8/" jdk8.relativize(x).getorelse(x.getname)))
一直听说word2vec在处理词与词的相似度的问题上效果十分好,最近自己也上手跑了跑google开源的代码()。
1、语料
首先准备数据:采用网上博客上推荐的全网新闻数据(sogouca),大小为2.1g。
1 wget ftp://ftp.labs.sogou.com/data/sogouca/sogouca.tar.gz --ftp-user=hebin_hit@foxmail.com --ftp-password=4fqlsydncrdxvndi -r
解压数据包:
1 gzip -d sogouca.tar.gz 2 tar -xvf sogouca.tar
再将生成的txt文件归并到sogouca.txt中,取出其中包含content的行并转码,得到语料corpus.txt,大小为2.7g。
1 cat *.txt > sogouca.txt 2 cat sogouca.txt | iconv -f gbk -t utf-8 -c | grep "" > corpus.txt
2、分词
用ansj对corpus.txt进行分词,得到分词结果resultbig.txt,大小为3.1g。
1 nohup ./word2vec -train resultbig.txt -output vectors.bin -cbow 0 -size 200 -window 5 -negative 0 -hs 1 -sample 1e-3 -threads 12 -binary 1 &
vectors.bin是word2vec处理resultbig.txt后生成的词的向量文件,在实验室的服务器上训练了1个半小时。
1 ./distance vectors.bin
./distance可以看成计算词与词之间的距离,把词看成向量空间上的一个点,distance看成向量空间上点与点的距离。
下面是一些例子:
4.2 潜在的语言学规律
4.3 聚类
将经过分词后的语料resultbig.txt中的词聚类并按照类别排序:
1 nohup ./word2vec -train resultbig.txt -output classes.txt -cbow 0 -size 200 -window 5 -negative 0 -hs 1 -sample 1e-3 -threads 12 -classes 500 & 2 sort classes.txt -k 2 -n > classes_sorted_sogouca.txt
例如:
4.4 短语分析
先利用经过分词的语料resultbig.txt中得出包含词和短语的文件sogouca_phrase.txt,再训练该文件中词与短语的向量表示。
1 ./word2phrase -train resultbig.txt -output sogouca_phrase.txt -threshold 500 -debug 2 2 ./word2vec -train sogouca_phrase.txt -output vectors_sogouca_phrase.bin -cbow 0 -size 300 -window 10 -negative 0 -hs 1 -sample 1e-3 -threads 12 -binary 1
下面是几个计算相似度的例子:
5、参考链接:
1. word2vec:tool for computing continuous distributed representations of words,
2. 用中文把玩google开源的deep-learning项目word2vec,
3. 利用word2vec对关键词进行聚类,
6、后续准备仔细阅读的文献:
[1] tomas mikolov, kai chen, greg corrado, and jeffrey dean. . in proceedings of workshop at iclr, 2013.
[2] tomas mikolov, ilya sutskever, kai chen, greg corrado, and jeffrey dean. . in proceedings of nips, 2013.
[3] tomas mikolov, wen-tau yih, and geoffrey zweig. . in proceedings of naacl hlt, 2013.
[4] collobert r, weston j, bottou l, et al. . the journal of machine learning research, 2011, 12: 2493-2537.