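keytool is a Java tool for managing keys and certificates. It stores keys and certificates in a file called a keystore. A keystore holds two kinds of entries: key entries, each containing either a secret key or a private key paired with its public key (asymmetric encryption), and trusted certificate entries, which contain only a public key.
By default, the -list command prints the MD5 fingerprint of a certificate (in older JDK releases). With the -v option the certificate is printed in human-readable form; with the -rfc option it is printed in the printable encoding format.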
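In Genymotion, the download that follows "Add new device" is very slow and fails easily.
1. Set an HTTP proxy: under Settings -> Network, enter your own HTTP proxy and port.
If the download fails or is too slow, press Win+R to open the Run dialog, enter %appdata%, go up one level (Alt+↑), find the Genymobile folder under Local, and open the genymotion.log file inside. Look for a line like:
[genymotion] [debug] downloading file
Take the virtual image address from that line (http://file........ova) and download it directly with Xunlei (Thunder) Express, or use Xunlei's offline download feature; either way the download finishes quickly.
2. Copy the downloaded file into C:\Users\<your user directory>\AppData\Local\Genymobile\Genymotion\ova, overwriting the corresponding image there, which has a random-number name. That is the image Genymotion started downloading when you clicked download.
Click Start to launch the emulator and begin using it.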
This procedure has been verified to work with Sogou Pinyin for Linux and Sublime Text 3 already installed.
cd ~
gcc -shared -o libsublime-imfix.so sublime_imfix.c `pkg-config --libs --cflags gtk+-2.0` -fPIC
sudo apt-get install build-essential
sudo apt-get install libgtk2.0-dev
sudo mv libsublime-imfix.so /opt/sublime_text/
Edit the Sublime Text .desktop file. Note: the name of the .desktop file differs between versions, so adjust the path to match the version you installed:
sudo vim /usr/share/applications/sublime_text.desktop
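In the previous post we introduced two ways to simulate HTTP requests: HttpURLConnection and HttpClient. So far both have worked well and cover the GET/POST requests we need. For a crawler, being able to send HTTP requests, fetch page data, and parse page content is about 80% of the work. The remaining 20%, however, costs us another 80% of the time :-)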
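In this post we cover the most important part of that remaining 20%: how to use an HTTP proxy in Java. Proxies are an important part of crawling. If you crawl someone's pages at scale you will inevitably affect their site, and if you push too hard you will get blocked. To avoid being blocked, we can either spread our program across a large number of machines, which is a luxury when money and resources are limited, or use proxies: collect a batch of proxies from the web, free or paid, or buy a batch of cheap VPSes and build our own proxy servers. If time allows, I will write a separate post about building your own proxy server. Once you have a pool of proxy servers, you can apply the techniques described in this post.
Let's start with the simplest case. There are many free proxies online; searching Baidu for "免费代理" (free proxy) or "HTTP 代理" (HTTP proxy) turns up plenty. (The security of these free proxies has been discussed in many articles, and some articles investigate it specifically, so I won't go into it here. If the information your crawler collects raises no particular privacy concerns, you can ignore the issue. If your crawler does things like simulated login, then for security reasons I recommend not using public free proxies and setting up your own proxy server instead.)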
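For HttpURLConnection, the openConnection() method (called on the URL object) accepts a Proxy argument, as follows:

```java
// The proxy host is elided in the source; supply your own proxy address.
Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("", 9876));
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection(proxy);
```

That's all there is to it!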
Moreover, note that the first argument of the Proxy constructor is the enum value Proxy.Type.HTTP. Changing it to Proxy.Type.SOCKS makes the connection go through a SOCKS proxy instead.
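As a small illustration of the two proxy types, the sketch below builds one HTTP and one SOCKS `java.net.Proxy`. The class name `ProxyTypes`, the helper `proxyOf`, and the host `proxy.example.com` are placeholders introduced here, not part of the original post.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;

public class ProxyTypes {
    // Build a java.net.Proxy of the given type. Host and port are placeholders.
    static Proxy proxyOf(Proxy.Type type, String host, int port) {
        // createUnresolved avoids a DNS lookup at construction time
        return new Proxy(type, InetSocketAddress.createUnresolved(host, port));
    }

    public static void main(String[] args) {
        Proxy http = proxyOf(Proxy.Type.HTTP, "proxy.example.com", 9876);
        Proxy socks = proxyOf(Proxy.Type.SOCKS, "proxy.example.com", 1080);
        // Either object can be passed to URL.openConnection(Proxy).
        System.out.println(http.type() + " " + socks.type()); // HTTP SOCKS
    }
}
```

Only the enum value differs between the two cases; the code that opens the connection stays the same.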
Because HttpClient is very flexible, there are many different ways to connect through a proxy with it. The simplest is probably this:

```java
// The proxy host is elided in the source; supply your own proxy address.
HttpHost proxy = new HttpHost("", 9876, "http");
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpClient.execute(proxy, request);
```
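This is almost identical to the code in the previous post that sent a request with HttpClient, except that httpClient.execute() takes one extra argument: the first argument is an HttpHost.
One thing to note: although new HttpHost(), like new Proxy() above, lets you specify a protocol type, HttpClient unfortunately does not support the SOCKS protocol by default. If we use the following code:

```java
HttpHost proxy = new HttpHost("", 1080, "socks");
```

the following exception is thrown:
org.apache.http.conn.UnsupportedSchemeException: socks protocol is not supported
If you want HttpClient to support SOCKS proxies, this can be done through the ConnectionSocketFactory class that HttpClient provides.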
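This approach is simple, since it only takes one extra argument, but look at the comment in the HttpClient source:

```java
/*
 * @param target    the target host for the request.
 *                  Implementations may accept null
 *                  if they can still determine a route, for example
 *                  to a default target or by inspecting the request.
 * @param request   the request to execute
 */
```

As you can see, the first parameter, target, is not a proxy; its actual role is "the target host for the request". That explanation is a bit vague: what does "the target host for the request" mean? Does a proxy count as one? Intuitively, the target host should be the site that the request URL points to. If a proxy does not count, why does setting target to the proxy work at all? I don't know either; it needs a closer look at the HttpClient source code.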
Besides the approach above (my own, and not recommended), the HttpClient website also provides several examples of using a proxy, which I record here as the recommended approaches.
The first approach uses RequestConfig:

```java
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpGet request = new HttpGet(url);

// Attach the proxy to this single request.
request.setConfig(
    RequestConfig.custom()
        .setProxy(new HttpHost("", 8888, "http"))
        .build()
);

CloseableHttpResponse response = httpClient.execute(request);
```

The second approach uses DefaultProxyRoutePlanner:

```java
HttpHost proxy = new HttpHost("", 9876, "http");
// Route every request of this client through the proxy.
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
CloseableHttpClient httpClient = HttpClients.custom()
    .setRoutePlanner(routePlanner)
    .build();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpClient.execute(request);
```
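When debugging an HTTP crawler we often need to switch proxies for testing, and sometimes using the system proxy configuration directly is the simplest way. Back when I worked on .NET projects, programs used the proxy configured in the Internet network settings by default. Unfortunately, the system proxy configuration I mean here is that of the JVM, not of the operating system; I have not yet found a simple way to make a Java program use the Windows proxy settings directly.
Even so, the system proxy is easy to use. There are generally two ways to set the JVM's proxy configuration: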
Java's System class is not just there to give us System.out.println() for printing; it also has many other useful static methods and properties. Among them, System.setProperty() can be used as follows to set an HTTP proxy, an HTTPS proxy, and a SOCKS proxy respectively:

```java
// HTTP proxy: only proxies HTTP requests
System.setProperty("http.proxyHost", "");
System.setProperty("http.proxyPort", "9876");

// HTTPS proxy: only proxies HTTPS requests
System.setProperty("https.proxyHost", "");
System.setProperty("https.proxyPort", "9876");

// SOCKS proxy: supports both HTTP and HTTPS requests
// Note: if a SOCKS proxy is set, do not set an HTTP/HTTPS proxy
System.setProperty("socksProxyHost", "");
System.setProperty("socksProxyPort", "1080");
```

Note that socksProxyHost and socksProxyPort have no dot in the middle. The following properties can also be used:

```java
// Proxies both HTTP and HTTPS requests
System.setProperty("proxyHost", "");
System.setProperty("proxyPort", "9876");
```
Besides calling System.setProperty(), you can pass the same settings as JVM command-line arguments. If you use Eclipse, set them as the VM arguments of the run configuration:
-DproxyHost= -DproxyPort=9876
For HttpURLConnection the program needs no changes; it uses the system proxy by default. HttpClient, however, does not use the system proxy by default. To make it do so, use SystemDefaultRoutePlanner together with ProxySelector:

```java
// Let HttpClient pick its proxy from the JVM-wide default ProxySelector.
SystemDefaultRoutePlanner routePlanner = new SystemDefaultRoutePlanner(ProxySelector.getDefault());
CloseableHttpClient httpClient = HttpClients.custom()
    .setRoutePlanner(routePlanner)
    .build();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpClient.execute(request);
```
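To check which proxy the JVM would choose for a given URL, one option is to ask the default `ProxySelector` directly. The following is a minimal sketch using only the JDK; the class name `SystemProxyCheck`, the helper `proxiesFor`, and the host names are placeholders introduced for illustration.

```java
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.util.List;

public class SystemProxyCheck {
    // Returns the proxies the JVM-wide default selector would use for this URL.
    static List<Proxy> proxiesFor(String url) {
        return ProxySelector.getDefault().select(URI.create(url));
    }

    public static void main(String[] args) {
        // Placeholder proxy address; replace with a real proxy.
        System.setProperty("http.proxyHost", "proxy.example.com");
        System.setProperty("http.proxyPort", "9876");

        for (Proxy p : proxiesFor("http://example.org/")) {
            System.out.println(p.type() + " " + p.address());
        }
    }
}
```

This is handy while debugging, because it shows whether the system properties were picked up before any request is sent.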
<move todir="${project.build.directory}/${project.artifactid}-${version}/com/duxiu/demo/app">
<fileset dir="${project.build.directory}/classes/com/duxiu/demo/app">
<include name="*.class" />
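The end result is a change in how the project runs. Previously everything centered on Tomcat, which started and stopped the project; now our launcher is at the center and is responsible for starting and stopping it. This is like today's popular client/server programs, which have a separate startup script that pre-initializes the environment, updates the program, and performs other tasks, and only then launches the project itself.
This post mainly explains how to start and stop embedded Tomcat from code. Most articles online are based on Tomcat 5.x; here we use the latest Tomcat 7, which ships a separate org.apache.tomcat.embed package for standalone embedded development. The corresponding Maven dependencies are: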
<dependency> <groupId>org.apache.tomcat.embed
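```java
// Working directory
String catalina_home = "d:/";
Tomcat tomcat = new Tomcat();
tomcat.setHostname("localhost");
tomcat.setPort(startPort);
// Set the working directory; it matters little, but Tomcat needs a place to write some files
tomcat.setBaseDir(catalina_home);
```

The code above uses the Tomcat class as the launcher. Before Tomcat 7 a class called Embedded was used for this; as of Tomcat 7 that class is deprecated in favor of the new Tomcat class. Next we set the host name and the port, and then a working directory. The working directory can be any directory; Tomcat needs it to record things such as work files, logs (if logging is configured), and temporary files.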
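```java
// Directory that holds the application
tomcat.getHost().setAppBase("e:/");
// Add AprLifecycleListener
StandardServer server = (StandardServer) tomcat.getServer();
AprLifecycleListener listener = new AprLifecycleListener();
server.addLifecycleListener(listener);
// Register the shutdown port
tomcat.getServer().setPort(shutdownPort);
```

The code above first sets the appBase, i.e. the place where the project code lives; in a typical Tomcat setup this directory is webapps. It then adds a listener, which is mainly responsible for initializing things such as native library support and IPv6-related configuration (it can be omitted). Finally it registers a shutdown port: sending the right message to this port shuts Tomcat down (covered later).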
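```java
// Load the context
StandardContext standardContext = new StandardContext();
standardContext.setPath("/aa");   // context path
standardContext.setDocBase("aa"); // directory of the application files
standardContext.addLifecycleListener(new Tomcat.DefaultWebXmlListener());
// Marks the context as configured
standardContext.addLifecycleListener(new Tomcat.FixContextListener());
standardContext.setSessionCookieName("t-session");
tomcat.getHost().addChild(standardContext);
```

We use a separate Context to add the web application to the host. Tomcat itself provides tomcat.addWebapp() for adding an application, but because we need to set a custom session cookie name here, we add the application in a way similar to how tomcat.addWebapp() is implemented.
Two listeners in the code above deserve attention. The first, DefaultWebXmlListener, has Tomcat load default configuration such as JspServlet and the long list of MIME type mappings; with it we do not have to write all of that ourselves, since every project needs it. This configuration is the same as conf/web.xml in the Tomcat directory, only expressed in code. The second, FixContextListener, marks the context as configured once the application has been deployed (otherwise Tomcat reports an error at startup saying the context has not been configured yet).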
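```java
// Start Tomcat and block until the shutdown command arrives
tomcat.start();
tomcat.getServer().await();
```

```java
// Stop Tomcat by sending the shutdown command string to the shutdown port.
// `shutdown` is the field holding that command string; `shutdownPort` is the port registered above.
private static void shutdown() throws Exception {
    Socket socket = new Socket("localhost", shutdownPort);
    OutputStream stream = socket.getOutputStream();
    for (int i = 0; i < shutdown.length(); i++)
        stream.write(shutdown.charAt(i));
    stream.flush();
    stream.close();
    socket.close();
}
```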
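Looking at the code as a whole, running it amounts to building a basic server.xml (conf/server.xml in the Tomcat directory) in code: first configure the listening port and the shutdown port, then configure the host and add a Context, and finally start the server and wait. Compare this program with the settings in server.xml and the code above becomes easy to follow.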
The documentation is available with the command man krb5.conf.
[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = EXAMPLE.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 EXAMPLE.COM = {
  kdc = example.com
  admin_server = example.com
 }

[domain_realm]
 .example.com = EXAMPLE.COM
 example.com = EXAMPLE.COM

kdc: the name of the host running a KDC for that realm.
admin_server: identifies the host where the administration server is running. Typically this is the master Kerberos server.
[domain_realm]: provides a translation from a hostname to the Kerberos realm name for the service provided by that host.
Kerberos is a network authentication system based on the principle of a trusted third party, the other two parties being the user and the service the user wishes to authenticate to. Not all services and applications can use Kerberos, but for those that can, it brings the network environment one step closer to being Single Sign On (SSO).
this section covers installation and configuration of a kerberos server, and some example client configurations.
if you are new to kerberos there are a few terms that are good to understand before setting up a kerberos server. most of the terms will relate to things you may be familiar with in other environments:
principal: any users, computers, and services provided by servers need to be defined as kerberos principals.
instances: are used for service principals and special administrative principals.
realms: the unique realm of control provided by the kerberos installation. usually the dns domain converted to uppercase (example.com).
key distribution center: (kdc) consist of three parts, a database of all principals, the authentication server, and the ticket granting server. for each realm there must be at least one kdc.
ticket granting ticket: issued by the authentication server (as), the ticket granting ticket (tgt) is encrypted in the user's password which is known only to the user and the kdc.
ticket granting server: (tgs) issues service tickets to clients upon request.
tickets: confirm the identity of the two principals. one principal being a user and the other a service requested by the user. tickets establish an encryption key used for secure communication during the authenticated session.
keytab files: are files extracted from the kdc principal database and contain the encryption key for a service or host.
to put the pieces together, a realm has at least one kdc, preferably two for redundancy, which contains a database of principals. when a user principal logs into a workstation, configured for kerberos authentication, the kdc issues a ticket granting ticket (tgt). if the user supplied credentials match, the user is authenticated and can then request tickets for kerberized services from the ticket granting server (tgs). the service tickets allow the user to authenticate to the service without entering another username and password.
Before installing the Kerberos server, a properly configured DNS server is needed for your domain. Since the Kerberos realm by convention matches the domain name, this section uses the EXAMPLE.COM domain.
Also, Kerberos is a time-sensitive protocol. If the local system time of a client machine and that of the server differ by more than five minutes (by default), the workstation will not be able to authenticate. To correct the problem, all hosts should have their time synchronized using the Network Time Protocol (NTP).
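The five-minute rule can be pictured with a small sketch. This only illustrates the comparison described above; it is not how any Kerberos library implements the check. The class name `ClockSkew` and the method `withinSkew` are made up for this example.

```java
import java.time.Duration;
import java.time.Instant;

public class ClockSkew {
    // Default tolerance mentioned in the text: five minutes.
    static final Duration MAX_SKEW = Duration.ofMinutes(5);

    // True if the two clocks differ by no more than maxSkew, in either direction.
    static boolean withinSkew(Instant client, Instant server, Duration maxSkew) {
        Duration diff = Duration.between(client, server).abs();
        return diff.compareTo(maxSkew) <= 0;
    }

    public static void main(String[] args) {
        Instant server = Instant.parse("2024-01-01T12:00:00Z");
        Instant okClient = server.minus(Duration.ofMinutes(4));
        Instant badClient = server.plus(Duration.ofMinutes(6));
        System.out.println(withinSkew(okClient, server, MAX_SKEW));  // true
        System.out.println(withinSkew(badClient, server, MAX_SKEW)); // false
    }
}
```

A client whose clock drifts past the tolerance in either direction would be rejected, which is why NTP synchronization is recommended on all hosts.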
the first step in installing a kerberos realm is to install the krb5-kdc and krb5-admin-server packages. from a terminal enter:
sudo apt-get install krb5-kdc krb5-admin-server
you will be asked at the end of the install to supply a name for the kerberos and admin servers, which may or may not be the same server, for the realm.
Next, create the new realm with the krb5_newrealm utility:
sudo krb5_newrealm
the questions asked during installation are used to configure the /etc/krb5.conf
file. if you need to adjust the key distribution center (kdc) settings simply edit the file and restart the krb5-kdc daemon.
Now that the KDC is running, an admin user is needed. It is recommended to use a different username from your everyday username. Using the kadmin.local utility in a terminal prompt enter:
sudo kadmin.local
Authenticating as principal root/admin@EXAMPLE.COM with password.
kadmin.local: addprinc steve/admin
WARNING: no policy specified for steve/admin@EXAMPLE.COM; defaulting to no policy
Enter password for principal "steve/admin@EXAMPLE.COM":
Re-enter password for principal "steve/admin@EXAMPLE.COM":
Principal "steve/admin@EXAMPLE.COM" created.
kadmin.local:
in the above example steve is the principal, /admin is an instance, and @example.com signifies the realm. the "every day" principal would be steve@example.com, and should have only normal user rights.
Replace EXAMPLE.COM and steve with your realm and admin username.
Next, the new admin user needs to have the appropriate Access Control List (ACL) permissions. The permissions are configured in the /etc/krb5kdc/kadm5.acl file:
steve/admin@EXAMPLE.COM *
this entry grants steve/admin the ability to perform any operation on all principals in the realm.
Now restart the krb5-admin-server for the new ACL to take effect:
sudo /etc/init.d/krb5-admin-server restart
the new user principal can be tested using the kinit utility:
kinit steve/admin
steve/admin@EXAMPLE.COM's Password:
after entering the password, use the klist utility to view information about the ticket granting ticket (tgt):
klist
Credentials cache: FILE:/tmp/krb5cc_1000
        Principal: steve/admin@EXAMPLE.COM

  Issued           Expires          Principal
Jul 13 17:53:34  Jul 14 03:53:34  krbtgt/EXAMPLE.COM@EXAMPLE.COM
You may need to add an entry to /etc/hosts for the KDC. For example, a line of the form:
<KDC IP address> kdc01.example.com kdc01
where <KDC IP address> is the IP address of your KDC.
In order for clients to determine the KDC for the realm, some DNS SRV records are needed. Add the following to /etc/named/db.example.com:
_kerberos._udp.EXAMPLE.COM.     IN SRV 1  0 88  kdc01.example.com.
_kerberos._tcp.EXAMPLE.COM.     IN SRV 1  0 88  kdc01.example.com.
_kerberos._udp.EXAMPLE.COM.     IN SRV 10 0 88  kdc02.example.com.
_kerberos._tcp.EXAMPLE.COM.     IN SRV 10 0 88  kdc02.example.com.
_kerberos-adm._tcp.EXAMPLE.COM. IN SRV 1  0 749 kdc01.example.com.
_kpasswd._udp.EXAMPLE.COM.      IN SRV 1  0 464 kdc01.example.com.
Replace EXAMPLE.COM, kdc01, and kdc02 with your domain name, primary KDC, and secondary KDC.
your new kerberos realm is now ready to authenticate clients.
once you have one key distribution center (kdc) on your network, it is good practice to have a secondary kdc in case the primary becomes unavailable.
first, install the packages, and when asked for the kerberos and admin server names enter the name of the primary kdc:
sudo apt-get install krb5-kdc krb5-admin-server
once you have the packages installed, create the secondary kdc's host principal. from a terminal prompt, enter:
kadmin -q "addprinc -randkey host/kdc02.example.com"
After issuing any kadmin command you will be prompted for your username/admin@EXAMPLE.COM principal password.
extract the keytab file:
kadmin -q "ktadd -k keytab.kdc02 host/kdc02.example.com"
there should now be a keytab.kdc02
in the current directory, move the file to /etc/krb5.keytab
sudo mv keytab.kdc02 /etc/krb5.keytab
If the path to the keytab.kdc02 file is different, adjust accordingly.
also, you can list the principals in a keytab file, which can be useful when troubleshooting, using the klist utility:
sudo klist -k /etc/krb5.keytab
next, there needs to be a kpropd.acl
file on each kdc that lists all kdcs for the realm. for example, on both primary and secondary kdc, create /etc/krb5kdc/kpropd.acl
host/kdc01.example.com@example.com host/kdc02.example.com@example.com
create an empty database on the secondary kdc:
sudo kdb5_util -s create
now start the kpropd daemon, which listens for connections from the kprop utility. kprop is used to transfer dump files:
sudo kpropd -S
from a terminal on the primary kdc, create a dump file of the principal database:
sudo kdb5_util dump /var/lib/krb5kdc/dump
Extract the primary KDC's keytab file and copy it to /etc/krb5.keytab:
kadmin -q "ktadd -k keytab.kdc01 host/kdc01.example.com"
sudo mv keytab.kdc01 /etc/krb5.keytab
Make sure there is a host principal for kdc01.example.com before extracting the keytab.
using the kprop utility push the database to the secondary kdc:
sudo kprop -r example.com -f /var/lib/krb5kdc/dump kdc02.example.com
There should be a SUCCEEDED message if the propagation worked. If there is an error message, check the system logs on the secondary KDC for more information.
you may also want to create a cron job to periodically update the database on the secondary kdc. for example, the following will push the database every hour:
# m h  dom mon dow   command
0 * * * * /usr/sbin/kdb5_util dump /var/lib/krb5kdc/dump && /usr/sbin/kprop -r EXAMPLE.COM -f /var/lib/krb5kdc/dump kdc02.example.com
back on the secondary kdc, create a stash file to hold the kerberos master key:
sudo kdb5_util stash
finally, start the krb5-kdc daemon on the secondary kdc:
sudo /etc/init.d/krb5-kdc start
the secondary kdc should now be able to issue tickets for the realm. you can test this by stopping the krb5-kdc daemon on the primary kdc, then use kinit to request a ticket. if all goes well you should receive a ticket from the secondary kdc.
this section covers configuring a linux system as a kerberos client. this will allow access to any kerberized services once a user has successfully logged into the system.
in order to authenticate to a kerberos realm, the krb5-user and libpam-krb5 packages are needed, along with a few others that are not strictly necessary but make life easier. to install the packages enter the following in a terminal prompt:
sudo apt-get install krb5-user libpam-krb5 libpam-ccreds auth-client-config
the auth-client-config package allows simple configuration of pam for authentication from multiple sources, and the libpam-ccreds will cache authentication credentials allowing you to login in case the key distribution center (kdc) is unavailable. this package is also useful for laptops that may authenticate using kerberos while on the corporate network, but will need to be accessed off the network as well.
to configure the client in a terminal enter:
sudo dpkg-reconfigure krb5-config
you will then be prompted to enter the name of the kerberos realm. also, if you don't have dns configured with kerberos srv records, the menu will prompt you for the hostname of the key distribution center (kdc) and realm administration server.
the dpkg-reconfigure adds entries to the /etc/krb5.conf
file for your realm. you should have entries similar to the following:
[libdefaults]
        default_realm = EXAMPLE.COM
...
[realms]
        EXAMPLE.COM = {
                kdc = <KDC host or IP address>
                admin_server = <admin server host or IP address>
        }
you can test the configuration by requesting a ticket using the kinit utility. for example:
kinit steve@EXAMPLE.COM
Password for steve@EXAMPLE.COM:
when a ticket has been granted, the details can be viewed using klist:
klist
Ticket cache: FILE:/tmp/krb5cc_1000
Default principal: steve@EXAMPLE.COM

Valid starting     Expires            Service principal
07/24/08 05:18:56  07/24/08 15:18:56  krbtgt/EXAMPLE.COM@EXAMPLE.COM
        renew until 07/25/08 05:18:57

Kerberos 4 ticket cache: /tmp/tkt1000
klist: You have no tickets cached
next, use the auth-client-config to configure the libpam-krb5 module to request a ticket during login:
sudo auth-client-config -a -p kerberos_example
You should now receive a ticket upon successful login authentication.
Also, feel free to stop by the #ubuntu-server IRC channel if you have Kerberos questions.
MySQL's AUTO_INCREMENT feature can be used to generate unique, reliable IDs.
Table definition; the key points are AUTO_INCREMENT and the UNIQUE KEY:

```sql
CREATE TABLE `tickets64` (
  `id` bigint(20) unsigned NOT NULL auto_increment,
  `stub` char(1) NOT NULL default '',
  PRIMARY KEY (`id`),
  UNIQUE KEY `stub` (`stub`)
) ENGINE=MyISAM
```

When an ID is needed, use REPLACE INTO to obtain the value. Combined with the table's UNIQUE KEY, this means a single row is enough to serve the ID generator:

```sql
REPLACE INTO tickets64 (stub) VALUES ('a');
SELECT LAST_INSERT_ID();
```

The MySQL documentation describes this pattern as multi-user safe:
"It is multi-user safe because multiple clients can issue the UPDATE statement and get their own sequence value with the SELECT statement (or mysql_insert_id()), without affecting or being affected by other clients that generate their own sequence values."
Note that if the client is written in PHP, mysql_insert_id() cannot be used to fetch the ID. For the reason, see "Problems with mysql_insert_id() on BIGINT auto-increment columns": http://kaifage.com/notes/99/mysql-insert-id-issue-with-bigint-ai-field.html
Flickr uses this approach: http://code.flickr.net/2010/02/08/ticket-servers-distributed-unique-primary-keys-on-the-cheap/
For public static, the corresponding integer is binary 1001, which is 9, as shown below:
…… | native | transient | volatile | synchronized | final | static | protected | private | public |
| 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 |
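Assuming the table refers to the bit flags in `java.lang.reflect.Modifier`, the value can be checked with a few lines of Java. The class name `ModifierBits` and the helper `publicStatic` are placeholders introduced for this sketch.

```java
import java.lang.reflect.Modifier;

public class ModifierBits {
    // Combine the PUBLIC (bit 0) and STATIC (bit 3) flags.
    static int publicStatic() {
        return Modifier.PUBLIC | Modifier.STATIC;
    }

    public static void main(String[] args) {
        int mods = publicStatic();
        System.out.println(mods);                         // 9
        System.out.println(Integer.toBinaryString(mods)); // 1001
        System.out.println(Modifier.toString(mods));      // public static
    }
}
```

Reading the table from right to left gives the bit positions: public is bit 0 and static is bit 3, so the two together are 1 + 8 = 9.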
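MySQL 5.1 has reached beta, and articles introducing it have been appearing on the official site. With partitioning, MySQL can store very large volumes of data. Today I came across a more advanced article on the MySQL site. Being able to partition by date is a very practical feature for data with strong time sensitivity. Below are some excerpts from that article.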
error 1064 (42000): values value must be of same type as partition function near '),
partition p1 values less than ('2010-01-01'))' at line 3
CEILING() and FLOOR() (a partitioned table can use these two functions only if the partitioning key they are applied to is of an INT type), for example:
mysql> CREATE TABLE t (c FLOAT) PARTITION BY LIST( FLOOR(c) )(
    -> PARTITION p0 VALUES IN (1,3,5),
    -> PARTITION p1 VALUES IN (2,4,6)
    -> );
ERROR 1491 (HY000): The PARTITION function returns the wrong type
mysql> CREATE TABLE t (c INT) PARTITION BY LIST( FLOOR(c) )(
    -> PARTITION p0 VALUES IN (1,3,5),
    -> PARTITION p1 VALUES IN (2,4,6)
    -> );
Query OK, 0 rows affected (0.01 sec)
I need to create a server farm that can handle 5 million connections and 5 million topics (one per client), and process 300k messages/sec. To see what various message brokers are capable of, I am currently using two RHEL EC2 instances (r3.4xlarge) so that plenty of resources are available. So you do not need to look it up: each has 16 vCPUs and 122 GB RAM. I am nowhere near those limits in usage. I am unable to get past the 600k connections limit. Since there doesn't seem to be any OS limitation (plenty of RAM, CPU, etc.) on either the client or the server, what is limiting me? I have edited /etc/security/limits.conf as follows:
i have edited /etc/sysctl.conf as follows:
for apollo: export apollo_ulimit=20000000 for activemq:
I created 20 additional private addresses for eth0 on the client, then assigned them: ip addr add dev eth0. I am fully aware of the 65k port limit, which is why I did the above.
Answer: While doing this I realized that I had a misspelling in my client settings in the /etc/sysctl.conf file for net.ipv4.ip_local_port_range. I am now able to connect 956,591 MQTT clients to my Apollo server in 188 sec. More info: to isolate whether this is an OS connection limitation or a broker limitation, I decided to write a simple client/server. The server:
the client:
With 21 IPs, I would expect (65535 - 1024) * 21 = 1,354,731 to be the boundary. In reality I am able to achieve 1,231,734.
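The expected boundary is just the number of usable source ports per address multiplied by the number of source addresses. A minimal sketch of that arithmetic follows; the class name `PortBudget` and the method `maxConnections` are made up for illustration, and the port range follows the figures used in the answer.

```java
public class PortBudget {
    // Upper bound on outgoing connections to one destination:
    // (highPort - lowPort) usable source ports for each source IP.
    static long maxConnections(int sourceIps, int lowPort, int highPort) {
        return (long) sourceIps * (highPort - lowPort);
    }

    public static void main(String[] args) {
        // 21 source IPs, local port range 1024..65535 as in the answer above
        System.out.println(maxConnections(21, 1024, 65535)); // 1354731
    }
}
```

The measured figure of 1,231,734 is below this bound, which is consistent with the port range no longer being the limiting factor.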
so the socket/kernel/io stuff is worked out. i am still unable to achieve this using any broker. again just after my client/server test this is the kernel settings. client:
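Background of the Spark History Server
Taking standalone mode as an example: while a Spark application is running, Spark provides a web UI that lists the application's runtime information. That web UI shuts down when the application finishes (whether it succeeds or fails), so once a Spark application has finished, its history can no longer be viewed.
The Spark History Server was created to address this. When configured, event log information is recorded while the application runs, and after the application ends the web UI can be re-rendered from those logs to show the application's runtime information.
When Spark runs on YARN or Mesos, the Spark History Server can likewise reconstruct the runtime information of a finished application (provided the application's event logs were recorded).
Configuring and using the Spark History Server
Start the Spark History Server with the default configuration: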
cd $SPARK_HOME/sbin
start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /home/spark/software/source/compile/deploy_spark/sbin/../logs/spark-spark-org.apache.spark.deploy.history.HistoryServer-1-hadoop000.out
failed to launch org.apache.spark.deploy.history.HistoryServer:
        at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:44)
        ... 6 more
start-history-server.sh hdfs://hadoop000:8020/directory
History Server configuration parameters
1) spark.history.updateInterval
Location of the Kerberos keytab file for the HistoryServer
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop000:8020/directory
spark.eventLog.compress true
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=7777 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://hadoop000:8020/directory"
spark.history.ui.port=7777 changes the port of the web UI to 7777
spark.history.fs.logDirectory=hdfs://hadoop000:8020/directory once this property is configured, the path no longer has to be given explicitly when running start-history-server.sh
spark.history.retainedApplications=3 sets the number of application histories to retain; beyond this number, older application information is dropped
Web UI: http://hadoop000:7777
Questions that came up while using the Spark History Server:
spark.history.fs.logDirectory: the Spark History Server page shows only the information under this path;
Question 2: spark.history.retainedApplications=3 does not seem to take effect?
The History Server will list all applications. It will just retain a max number of them in memory. That option does not control how many applications are shown, it controls how much memory the HS will need.
spark-sql --master yarn
14/12/29 15:23:17 info client: requesting a new application from cluster with 1 nodemanagers
14/12/29 15:23:17 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container)
14/12/29 15:23:17 info client: will allocate am container, with 896 mb memory including 384 mb overhead
14/12/29 15:23:17 info client: setting up container launch context for our am
14/12/29 15:23:17 info client: preparing resources for our am container
14/12/29 15:23:17 info client: uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0093/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
14/12/29 15:23:18 info client: setting up the launch environment for our am container
14/12/29 15:24:03 info client: requesting a new application from cluster with 1 nodemanagers
14/12/29 15:24:03 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container)
14/12/29 15:24:03 info client: will allocate am container, with 896 mb memory including 384 mb overhead
14/12/29 15:24:03 info client: setting up container launch context for our am
14/12/29 15:24:03 info client: preparing resources for our am container
14/12/29 15:24:03 info client: uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0094/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
14/12/29 15:24:05 info client: setting up the launch environment for our am container
hadoop fs -ls hdfs://hadoop000:8020/user/spark/.sparkstaging/
drwx------ - spark supergroup 0 2014-12-29 15:23 hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0093
drwx------ - spark supergroup 0 2014-12-29 15:24 hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0094
spark.yarn.jar hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
Start spark-sql --master yarn again and observe the log:
14/12/29 15:39:02 info client: requesting a new application from cluster with 1 nodemanagers
14/12/29 15:39:02 info client: verifying our application has not requested more than the maximum memory capability of the cluster (8192 mb per container)
14/12/29 15:39:02 info client: will allocate am container, with 896 mb memory including 384 mb overhead
14/12/29 15:39:02 info client: setting up container launch context for our am
14/12/29 15:39:02 info client: preparing resources for our am container
14/12/29 15:39:02 info client: source and destination file systems are the same. not copying hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-snapshot-hadoop2.3.0-cdh5.0.0.jar
14/12/29 15:39:02 info client: setting up the launch environment for our am container
hadoop fs -ls hdfs://hadoop000:8020/user/spark/.sparkstaging/application_1416381870014_0097
application application_xxxxxxxxx_yyyy failed 2 times due to am container for application_xxxxxxxxx_yyyy
exited with exitcode: -1000 due to: java.io.filenotfoundexception: file /tmp/hadoop-spark/nm-local-dir/filecache does not exist
I wrote a blog post about how LinkedIn uses Kafka as a central publish-subscribe log for integrating data between applications, stream processing, and Hadoop data ingestion.
to actually make this work, though, this "universal log" has to be a cheap abstraction. if you want to use a system as a central data hub it has to be fast, predictable, and easy to scale so you can dump all your data onto it. my experience has been that systems that are fragile or expensive inevitably develop a wall of protective process to prevent people from using them; a system that scales easily often ends up as a key architectural building block just because using it is the easiest way to get things built.
I've always liked the benchmarks of Cassandra that show it doing a million writes per second on three hundred machines. I'm not sure why, but doing a million of anything per second is fun.
in any case, one of the nice things about a kafka log is that, as we'll see, it is cheap. a million writes per second isn't a particularly big thing. this is because a log is a much simpler thing than a database or key-value store. indeed our production clusters take tens of millions of reads and writes per second all day long and they do so on pretty modest hardware.
but let's do some benchmarking and take a look.
To help understand the benchmark, let me give a quick review of what Kafka is and a few details about how it works. Kafka is a distributed messaging system originally built at LinkedIn.
the general setup is quite simple. producers send records to the cluster which holds on to these records and hands them out to consumers:
the key abstraction in kafka is the topic. producers publish their records to a topic, and consumers subscribe to one or more topics. a kafka topic is just a sharded write-ahead log. producers append records to these logs and consumers subscribe to changes. each record is a key/value pair. the key is used for assigning the record to a log partition (unless the publisher specifies the partition directly).
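The idea of using the key to pick a partition can be sketched in a few lines. This is only an illustration: it uses Java's `String.hashCode()` as a stand-in hash, which is an assumption of this sketch and not the hash function Kafka's own partitioner uses. The class name `KeyPartitioner` and the method `partitionFor` are likewise made up here.

```java
public class KeyPartitioner {
    // Map a record key to one of numPartitions log partitions.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Records with the same key always land in the same partition.
        System.out.println(partitionFor("a", 2)); // 1
        System.out.println(partitionFor("b", 2)); // 0
        System.out.println(partitionFor("a", 2)); // 1
    }
}
```

Because the mapping is deterministic, all records for one key end up in the same partition and therefore keep their relative order.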
here is a simple example of a single producer and consumer reading and writing from a two-partition topic.
This picture shows a producer process appending to the logs for the two partitions, and a consumer reading from the same logs. Each record in the log has an associated entry number that we call the offset. This offset is used by the consumer to describe its position in each of the logs.
these partitions are spread across a cluster of machines, allowing a topic to hold more data than can fit on any one machine.
note that unlike most messaging systems the log is always persistent. messages are immediately written to the filesystem when they are received. messages are not deleted when they are read but retained with some configurable sla (say a few days or a week). this allows usage in situations where the consumer of data may need to reload data. it also makes it possible to support space-efficient publish-subscribe as there is a single shared log no matter how many consumers; in traditional messaging systems there is usually a queue per consumer, so adding a consumer doubles your data size. this makes kafka a good fit for things outside the bounds of normal messaging systems such as acting as a pipeline for offline data systems such as hadoop. these offline systems may load only at intervals as part of a periodic etl cycle, or may go down for several hours for maintenance, during which time kafka is able to buffer even tbs of unconsumed data if needed.
Kafka also replicates its logs over multiple servers for fault-tolerance. One important architectural aspect of our replication design, in contrast to other messaging systems, is that replication is not an exotic bolt-on that requires complex configuration, only to be used in very specialized cases. Instead, replication is assumed to be the default: we treat un-replicated data as a special case where the replication factor happens to be one.
When producers publish a message, they get back an acknowledgement containing the record's offset. The first record published to a partition is given the offset 0, the second record 1, and so on in an ever-increasing sequence. Consumers consume data from a position specified by an offset, and they save their position in a log by committing periodically: the offset is saved so that, if that consumer instance crashes, another instance can resume from its position.
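The topic, partition, offset, and commit ideas above can be sketched with a toy in-memory model. This is only an illustration: the `Topic` class is hypothetical, and CRC32 is an assumed stand-in for whatever hash the real client uses to map keys to partitions.

```python
import zlib


class Topic:
    """Toy in-memory model of a topic: one append-only list per partition."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value, partition=None):
        # The key picks the partition unless the publisher names one directly.
        if partition is None:
            partition = zlib.crc32(key) % len(self.partitions)
        log = self.partitions[partition]
        log.append((key, value))
        return partition, len(log) - 1  # the acknowledgement carries the offset

    def read(self, partition, offset, max_records=10):
        # Reading never deletes anything; it only slices the log.
        return self.partitions[partition][offset:offset + max_records]


topic = Topic(num_partitions=2)
p, first = topic.append(b"user-1", b"login")
_, second = topic.append(b"user-1", b"click")

committed = {p: 0}                   # the consumer's saved position per partition
batch = topic.read(p, committed[p])
committed[p] += len(batch)           # "commit": remember where to resume
```

Because a read is just a slice, a second consumer (or a restarted one) can re-read from any offset without affecting anyone else.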
Okay, hopefully that all made sense (if not, you can read a more complete introduction to Kafka).
This test is against trunk, as I made some improvements to the performance tests for this benchmark. But nothing too substantial has changed since the last full release, so you should see similar results with that release. I am also using our newly rewritten producer, which offers much improved throughput over the previous producer client.
I've followed the basic template of an existing, very nice benchmark, but I covered scenarios and options that were more relevant to Kafka.
one quick philosophical note on this benchmark. for benchmarks that are going to be publicly reported, i like to follow a style i call "lazy benchmarking". when you work on a system, you generally have the know-how to tune it to perfection for any particular use case. this leads to a kind of benchmarketing where you heavily tune your configuration to your benchmark or worse have a different tuning for each scenario you test. i think the real test of a system is not how it performs when perfectly tuned, but rather how it performs "off the shelf". this is particularly true for systems that run in a multi-tenant setup with dozens or hundreds of use cases where tuning for each use case would be not only impractical but impossible. as a result, i have pretty much stuck with default settings, both for the server and the clients. i will point out areas where i suspect the result could be improved with a little tuning, but i have tried to resist the temptation to do any fiddling myself to improve the results.
I have posted the configuration and commands, so it should be possible to replicate results on your own gear if you are interested.
For these tests, I had six machines, each with the following specs:
the kafka cluster is set up on three of the machines. the six drives are directly mounted with no raid (jbod style). the remaining three machines i use for zookeeper and for generating load.
a three machine cluster isn't very big, but since we will only be testing up to a replication factor of three, it is all we need. as should be obvious, we can always add more partitions and spread data onto more machines to scale our cluster horizontally.
this hardware is actually not linkedin's normal kafka hardware. our kafka machines are more closely tuned to running kafka, but are less in the spirit of "off-the-shelf" i was aiming for with these tests. instead, i borrowed these from one of our hadoop clusters, which runs on probably the cheapest gear of any of our persistent systems. hadoop usage patterns are pretty similar to kafka's, so this is a reasonable thing to do.
okay, without further ado, the results!
these tests will stress the throughput of the producer. no consumers are run during these tests, so all messages are persisted but not read (we'll test cases with both producer and consumer in a bit). since we have recently rewritten our producer, i am testing this new code.
821,557 records/sec (78.3 MB/sec)
for this first test i create a topic with six partitions and no replication. then i produce 50 million small (100 byte) records as quickly as possible from a single thread.
the reason for focusing on small records in these tests is that it is the harder case for a messaging system (generally). it is easy to get good throughput in mb/sec if the messages are large, but much harder to get good throughput when the messages are small, as the overhead of processing each message dominates.
Throughout this benchmark, when I report MB/sec, I am reporting just the value size of the record times the requests per second; none of the other overhead of the request is included. So the actual network usage is higher than what is reported. For example, with a 100 byte message we would also transmit about 22 bytes of overhead per message (for an optional key, size delimiting, a message CRC, the record offset, and an attributes flag), as well as some overhead for the request (including the topic, partition, required acknowledgements, etc). This makes it a little harder to see where we hit the limits of the NIC, but this seems a little more reasonable than including our own overhead bytes in throughput numbers. So, in the above result, we are likely saturating the 1 gigabit NIC on the client machine.
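The arithmetic behind these figures can be checked with a short sketch. It assumes that "MB" in the reported numbers means 2**20 bytes, which is an inference from the figures themselves rather than something the text states; the function names are hypothetical.

```python
MIB = 1024 * 1024  # assumption: "MB" in the reported figures means 2**20 bytes


def value_throughput_mb(records_per_sec, value_bytes=100):
    """Throughput counting only the record values, as the text reports it."""
    return records_per_sec * value_bytes / MIB


def wire_gbit_per_sec(records_per_sec, value_bytes=100, per_message_overhead=22):
    """Rough lower bound on network usage: values plus per-message overhead.

    Request-level overhead (topic, partition, acks, ...) is not included.
    """
    return records_per_sec * (value_bytes + per_message_overhead) * 8 / 1e9


reported = value_throughput_mb(821_557)   # about 78.3
on_wire = wire_gbit_per_sec(821_557)      # about 0.80 Gbit/s
```

With the per-message overhead alone, the first test already needs about 0.8 Gbit/s, which fits the remark that the 1 gigabit NIC is close to saturated once request overhead is added.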
one immediate observation is that the raw numbers here are much higher than people expect, especially for a persistent storage system. if you are used to random-access data systems, like a database or key-value store, you will generally expect maximum throughput around 5,000 to 50,000 queries-per-second, as this is close to the speed that a good rpc layer can do remote requests. we exceed this due to two key design principles:
If you are interested in the details, you can read a little more about this in our documentation.
786,980 records/sec (75.1 MB/sec)
this test is exactly the same as the previous one except that now each partition has three replicas (so the total data written to network or disk is three times higher). each server is doing both writes from the producer for the partitions for which it is a master, as well as fetching and writing data for the partitions for which it is a follower.
replication in this test is asynchronous. that is, the server acknowledges the write as soon as it has written it to its local log without waiting for the other replicas to also acknowledge it. this means, if the master were to crash, it would likely lose the last few messages that had been written but not yet replicated. this makes the message acknowledgement latency a little better at the cost of some risk in the case of server failure.
The key takeaway I would like people to have from this is that replication can be fast. The total cluster write capacity is, of course, 3x less with 3x replication (since each write is done three times), but the throughput is still quite good per client. High performance replication comes in large part from the efficiency of our consumer (the replicas are really nothing more than a specialized consumer), which I will discuss in the consumer section.
421,823 records/sec (40.2 MB/sec)
this test is the same as above except that now the master for a partition waits for acknowledgement from the full set of in-sync replicas before acknowledging back to the producer. in this mode, we guarantee that messages will not be lost as long as one in-sync replica remains.
synchronous replication in kafka is not fundamentally very different from asynchronous replication. the leader for a partition always tracks the progress of the follower replicas to monitor their liveness, and we never give out messages to consumers until they are fully acknowledged by replicas. with synchronous replication we just wait to respond to the producer request until the followers have replicated it.
This additional latency does seem to affect our throughput. Since the code path on the server is very similar, we could probably ameliorate this impact by tuning the batching to be a bit more aggressive and allowing the client to buffer more outstanding requests. However, in the spirit of avoiding special case tuning, I have avoided this.
2,024,032 records/sec (193.0 MB/sec)
our single producer process is clearly not stressing our three node cluster. to add a little more load, i'll now repeat the previous async replication test, but now use three producer load generators running on three different machines (running more processes on the same machine won't help as we are saturating the nic). then we can look at the aggregate throughput across these three producers to get a better feel for the cluster's aggregate capacity.
One of the hidden dangers of many messaging systems is that they work well only as long as the data they retain fits in memory. Their throughput falls by an order of magnitude (or more) when data backs up and isn't consumed (and hence needs to be stored on disk). This means things may be running fine as long as your consumers keep up and the queue is empty, but as soon as they lag, the whole messaging layer backs up with unconsumed data. The backup causes data to go to disk, which in turn causes performance to drop to a rate at which the messaging system can no longer keep up with incoming data and either backs up or falls over. This is pretty terrible, as in many cases the whole purpose of the queue was to handle such a case gracefully.
Since Kafka always persists messages, the performance is O(1) with respect to unconsumed data volume.
to test this experimentally, let's run our throughput test over an extended period of time and graph the results as the stored dataset grows:
this graph actually does show some variance in performance, but no impact due to data size: we perform just as well after writing a tb of data, as we do for the first few hundred mbs.
The variance seems to be due to Linux's I/O management facilities that batch data and then flush it periodically. This is something we have tuned for a little better on our production Kafka setup. Some notes on tuning I/O are available.
okay now let's turn our attention to consumer throughput.
Note that the replication factor will not affect the outcome of this test, as the consumer only reads from one replica regardless of the replication factor. Likewise, the acknowledgement level of the producer also doesn't matter, as the consumer only ever reads fully acknowledged messages (even if the producer doesn't wait for full acknowledgement). This is to ensure that any message the consumer sees will always be present after a leadership handoff (if the current leader fails).
940,521 records/sec (89.7 MB/sec)
for the first test, we will consume 50 million messages in a single thread from our 6 partition 3x replicated topic.
Kafka's consumer is very efficient. It works by fetching chunks of log directly from the filesystem. It uses the sendfile API to transfer this directly through the operating system without the overhead of copying this data through the application. This test actually starts at the beginning of the log, so it is doing real read I/O. In a production setting, though, the consumer reads almost exclusively out of the OS pagecache, since it is reading data that was just written by some producer (so it is still cached). In fact, if you run I/O stat on a production server you actually see that there are no physical reads at all even though a great deal of data is being consumed.
Making consumers cheap is important for what we want Kafka to do. For one thing, the replicas are themselves consumers, so making the consumer cheap makes replication cheap. In addition, this makes handing out data an inexpensive operation, and hence not something we need to tightly control for scalability reasons.
2,615,968 records/sec (249.5 MB/sec)
let's repeat the same test, but run three parallel consumer processes, each on a different machine, and all consuming the same topic.
as expected, we see near linear scaling (not surprising because consumption in our model is so simple).
795,064 records/sec (75.8 MB/sec)
the above tests covered just the producer and the consumer running in isolation. now let's do the natural thing and run them together. actually, we have technically already been doing this, since our replication works by having the servers themselves act as consumers.
all the same, let's run the test. for this test we'll run one producer and one consumer on a six partition 3x replicated topic that begins empty. the producer is again using async replication. the throughput reported is the consumer throughput (which is, obviously, an upper bound on the producer throughput).
as we would expect, the results we get are basically the same as we saw in the producer only case—the consumer is fairly cheap.
i have mostly shown performance on small 100 byte messages. smaller messages are the harder problem for a messaging system as they magnify the overhead of the bookkeeping the system does. we can show this by just graphing throughput in both records/second and mb/second as we vary the record size.
So, as we would expect, this graph shows that the raw count of records we can send per second decreases as the records get bigger. But if we look at MB/second, we see that the total byte throughput of real user data increases as messages get bigger.
We can see that with the 10 byte messages we are actually CPU bound by just acquiring the lock and enqueuing the message for sending: we are not able to actually max out the network. However, starting with 100 bytes, we are actually seeing network saturation (though the MB/sec continues to increase as our fixed-size bookkeeping bytes become an increasingly small percentage of the total bytes sent).
2 ms (median), 3 ms (99th percentile), 14 ms (99.9th percentile)
We have talked a lot about throughput, but what is the latency of message delivery? That is, how long does it take a message we send to be delivered to the consumer? For this test, we will create a producer and a consumer and repeatedly time how long it takes for a message sent by the producer to the Kafka cluster to be received by our consumer.
Note that Kafka only gives out messages to consumers when they are acknowledged by the full in-sync set of replicas. So this test will give the same results regardless of whether we use sync or async replication, as that setting only affects the acknowledgement to the producer.
If you want to try out these benchmarks on your own machines, you can. As I said, I mostly just used our pre-packaged performance testing tools that ship with Kafka and mostly stuck with the default configs both for the server and for the clients. The configuration and commands have been posted in more detail.
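1. Topic: a logical concept used to categorize messages. A topic can be spread across multiple brokers.
2. Partition: the basis of horizontal scaling and of all parallelism in Kafka. Every topic is split into at least one partition.
3. Offset: the sequence number of a message within a partition. Numbering does not span partitions.
4. Consumer: fetches (consumes) messages from brokers.
5. Producer: sends (produces) messages to brokers.
6. Replication: Kafka keeps redundant copies of messages at partition granularity. Each partition can be configured with at least one replica (with exactly one replica, that replica is the partition itself).
7. Leader: the replica set of each partition elects a single leader, and all read and write requests are handled by the leader. The other replicas pull updates from the leader to their local logs, much like the familiar binlog replication in MySQL.
8. Broker: brokers accept requests from producers and consumers and persist messages to local disk. Each cluster elects one broker as controller, which handles leader election for partitions, coordinates partition migration, and so on.
9. ISR (in-sync replicas): the subset of replicas that are currently alive and able to "catch up" with the leader. Because reads and writes hit the leader first, replicas that pull from the leader generally lag it somewhat (measured both in time and in number of messages). Exceeding the threshold in either dimension gets a replica removed from the ISR. Each partition has its own independent ISR.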
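First, I say "well-behaved" because Kafka only does sequential I/O on disk. Given the particular read and write pattern of a messaging system, this is not a problem. On disk I/O performance, here is a set of test figures quoted from the Kafka documentation (RAID-5, 7200 rpm):
Sequential I/O: 600 MB/s
Random I/O: 100 KB/s
So by restricting itself to sequential I/O, Kafka sidesteps the performance impact that slow disk access could otherwise have.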
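· If the cache is managed inside the heap, the JVM's GC threads scan the heap frequently, adding unnecessary overhead. If the heap is too large, a single full GC becomes a serious threat to availability.
· Every object in the JVM carries object overhead (not to be underestimated), which lowers effective memory utilization.
· Every in-process cache has an identical copy in the OS pagecache. Keeping the cache only in the pagecache therefore at least doubles the usable cache space.
· If Kafka restarts, all in-process caches are lost, while the OS-managed pagecache remains usable.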
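1. The OS reads the data from disk into the pagecache in kernel space.
2. The user process copies the data from kernel space into user space.
3. The user process then writes the data to the socket, and the data flows into the socket buffer in kernel space.
4. The OS copies the data from that buffer to the NIC buffer, which completes one send.
The whole process involves two system calls (read and write) and four context switches, and the same data is copied back and forth between kernel buffers and user buffers, which is inefficient. Steps 2 and 3 are unnecessary: the copy can be done entirely within kernel space. This is exactly the problem sendfile solves. With sendfile, the data moves from the pagecache to the socket inside the kernel and never enters user space.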
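As a rough illustration of the sendfile path described above, here is a minimal sketch. The temporary file and the socket pair are stand-ins (assumptions) for a log segment and a broker-to-consumer connection; this is not how Kafka itself is written.

```python
import os
import socket
import tempfile

payload = os.urandom(4096)  # stand-in for a chunk of log data

with tempfile.TemporaryFile() as log_file:
    log_file.write(payload)
    log_file.flush()
    log_file.seek(0)

    # Stand-in for a broker-to-consumer connection.
    sender, receiver = socket.socketpair()
    with sender, receiver:
        # socket.sendfile() uses os.sendfile() where the platform supports it,
        # so the bytes go from the page cache to the socket inside the kernel.
        # Elsewhere it falls back to an ordinary read/send loop.
        sent = sender.sendfile(log_file)

        received = b""
        while len(received) < sent:
            received += receiver.recv(65536)
```

The application never touches the file contents on the sending side; it only hands the kernel a file object and a socket.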
(20 brokers, 75 partitions per broker, 110k msg/s)
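At this point the cluster is handling only writes, no reads. The roughly 10 MB/s of send traffic is generated by replication between partitions. Comparing the recv and writ rates shows that writes to disk use asynchronous batching, and the underlying OS may also reorder disk writes. When read requests arrive there are two cases. In the first, the data exchange is completed in memory.
The other metrics are unchanged, but disk reads have shot up to 40 MB/s. At this point all data is being served from disk (for sequential disk reads the OS prefills the pagecache). There is still no performance problem.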
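1. The Kafka documentation does not recommend forcing writes to disk through the broker settings log.flush.interval.messages and log.flush.interval.ms. Its position is that durability should be ensured by replicas, and that forcing flushes to disk hurts overall performance.
2. Performance can be tuned through /proc/sys/vm/dirty_background_ratio and /proc/sys/vm/dirty_ratio.
a. When the dirty-page ratio exceeds the first value, pdflush starts flushing the dirty pagecache.
b. When the dirty-page ratio exceeds the second value, all writes are blocked while flushing takes place.
c. Depending on the workload, lower dirty_background_ratio and raise dirty_ratio as appropriate.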
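Scalability. First, Kafka allows partitions to be moved freely between brokers in the cluster to even out possible data skew. Second, partitioning supports custom partitioning algorithms; for example, all messages with the same key can be routed to the same partition. Leaders can also migrate among in-sync replicas. Because all reads and writes for a given partition are handled by its leader alone, Kafka tries to spread leaders evenly across the nodes of the cluster to avoid concentrating network traffic.
Concurrency. At any moment, a partition can be consumed by only one consumer within a consumer group (conversely, one consumer can consume several partitions at once). Kafka's very simple offset mechanism minimizes the interaction between broker and consumer, so unlike comparable message queues, Kafka's performance does not degrade in proportion to the number of downstream consumers. In addition, if multiple consumers happen to be reading data that is close together in time, the pagecache hit rate is very high, so Kafka supports highly concurrent reads very efficiently. In practice it can basically reach the limit of a single machine's NIC.
However, more partitions is not always better: the more partitions there are, the more each broker holds on average. When a broker goes down (network failure, full GC), the controller has to re-elect leaders for all partitions on all failed brokers. Assuming each election takes 10 ms, a broker with 500 partitions needs 5 s of elections, during which reads and writes to those partitions trigger LeaderNotAvailableException.
Going further, if the failed broker is the cluster's controller, the first step is to appoint a new broker as controller. The new controller must fetch the metadata of every partition from ZooKeeper at roughly 3-5 ms each, so with 10,000 partitions this takes 30-50 s. And do not forget that this is only the time needed to bring up a new controller; the leader election time described above comes on top of it.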
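The two estimates above are simple multiplications. A small sketch (hypothetical function names, using only the per-partition costs assumed in the text) makes it easy to plug in your own partition counts:

```python
def leader_election_seconds(partitions_on_broker, ms_per_election=10):
    """Time to re-elect leaders for every partition on one failed broker."""
    return partitions_on_broker * ms_per_election / 1000


def controller_failover_seconds(total_partitions, ms_low=3, ms_high=5):
    """Range of time for a new controller to fetch all partition metadata."""
    return (total_partitions * ms_low / 1000,
            total_partitions * ms_high / 1000)


election = leader_election_seconds(500)          # 5.0 seconds
failover = controller_failover_seconds(10_000)   # (30.0, 50.0) seconds
```

In the worst case described in the text, the two add up: controller failover first, then leader election for the partitions of the failed broker.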
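1. Pre-allocate the number of partitions as far as possible. Partitions can be added dynamically later, but doing so risks breaking the mapping between message keys and partitions.
2. Do not use too many replicas, and where possible place the replicas of each partition on different racks.
3. Make every effort to ensure a clean shutdown each time a broker is stopped. Otherwise the problem is not only a long recovery time: data corruption or other very strange problems may appear.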
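Of course, users can also choose to compress and decompress at the application layer (after all, Kafka currently supports only a limited set of compression algorithms, just gzip and snappy), but doing so unexpectedly reduces efficiency. Kafka's end-to-end compression works best together with the MessageSet, and the approach above severs the link between the two. The reason is simple and follows from a basic principle of compression algorithms: the more repeated data there is, the higher the compression ratio. Regardless of the content or the number of message bodies, a larger input gives a better compression ratio in most cases.
To solve this problem, the Kafka 0.8 design borrowed the ACK mechanism from networking. If you need high performance and can tolerate some message loss, set request.required.acks=0 to turn off ACKs and send at full speed. If sent messages need to be acknowledged, set request.required.acks to 1 or -1. What is the difference between 1 and -1? This brings us back to the number of replicas discussed earlier. With 1, a message only needs to be received and acknowledged by the leader; the other replicas pull it asynchronously and need not acknowledge it immediately, which preserves reliability without dragging efficiency down much. With -1, the ACK is returned only after the message has been committed to all replicas in the partition's ISR. Sending is safer, but the latency of the whole process grows in proportion to the number of replicas, so this has to be tuned to the requirements at hand.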
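The effect of compressing a batch rather than individual messages is easy to see with a small sketch. It uses zlib (DEFLATE, the algorithm underlying gzip) as an assumed stand-in for Kafka's codecs, and made-up JSON messages as sample data.

```python
import json
import zlib

# 200 small, similar messages, as a producer batch might contain.
messages = [
    json.dumps({"user_id": i, "event": "page_view", "url": "/products/list"}).encode()
    for i in range(200)
]

# Application-level compression: each message compressed on its own.
one_by_one = sum(len(zlib.compress(m)) for m in messages)

# Batch-level compression: the whole set compressed together.
as_one_batch = len(zlib.compress(b"\n".join(messages)))
```

Compressed one at a time, each message pays the codec's fixed overhead and has almost no repetition to exploit; compressed together, the shared field names and values are encoded once.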
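1. Do not configure too many producer threads, especially when used for mirroring or migration, because this makes message reordering within partitions of the target cluster worse (if your use case is sensitive to message order).
2. In 0.8, request.required.acks defaults to 0 (same as 0.7).
· With consumer groups, both the publish-subscribe model and the queue model are supported.
· The consumer API comes in two flavors, high level and low level. The first depends heavily on ZooKeeper, so it performs somewhat worse and is less flexible, but it is very little trouble to use. The second does not depend on ZooKeeper and does better in both flexibility and performance, but all exceptions (leader migration, offset out of range, broker failure, and so on) and offset maintenance must be handled by you.
· Keep an eye on the upcoming 0.9 release. The developers have rewritten the consumer in Java, merging the two APIs into one and removing the dependency on ZooKeeper. Performance is said to be much improved.
The low level API is strongly recommended. It is more tedious, but at present it is the only API that lets you handle error data yourself, especially when dealing with broker failures or corrupted data caused by an unclean shutdown. Otherwise you cannot skip the "bad message" and can only wait for it to be rotated out on the broker, during which time that replica stays unavailable.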
sendfile: https://www.ibm.com/developerworks/cn/java/j-zerocopy/
so what’s wrong with 1975 programming: https://www.varnish-cache.org/trac/wiki/architectnotes
benchmarking: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
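Redis has a family of commands ending in NX, short for "not exists". SETNX, for example, should be read as SET if Not eXists. These commands are very useful. Here we use SETNX to implement a distributed lock.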
setnx lock.foo
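Yes, to make this logic reusable, Python users will immediately think of a decorator, and Java users will think of, what was it, AOP annotations? Fine, use whatever is comfortable, as long as you do not repeat code.
Many internet products have scenarios that need locking, such as flash sales, globally incrementing IDs, and floor (post number) generation. Most solutions are based on a database. Redis runs as a single process with a single thread and uses a queue to turn concurrent access into serial access, and connections from multiple clients do not compete with each other. Redis also provides commands such as SETNX and GETSET that make a distributed lock easy to implement.
SETNX command (SET if Not eXists)
setnx key value
If and only if key does not exist, set key to value and return 1. If key already exists, SETNX does nothing and returns 0.
getset key value
Set key to value and return the old value of key. Returns an error when key exists but is not a string, and nil when key does not exist.
get key
Return the string value associated with key, or the special value nil if key does not exist.
del key [key …]
Delete one or more keys. Keys that do not exist are ignored.
SETNX can take a lock directly. For example, to lock the keyword foo, a client can try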
setnx foo.lock
del foo.lock
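With the approach above, if the client holding the lock runs too long, has its process killed, or crashes for some other reason and never releases the lock, the result is a deadlock. The lock therefore needs an expiry check. When taking the lock, we store the current timestamp as its value and compare the current time with the timestamp in Redis; if the difference exceeds a threshold, the lock is considered expired, which keeps it from being held forever. However, under high concurrency, if several clients detect the expired lock at the same time, simply delete it, and then take it with SETNX, a race condition can arise in which several clients acquire the lock at once.
C2 sends DEL to foo.lock.
C2 sends SETNX to foo.lock and acquires the lock.
C3 sends DEL to foo.lock. When C3 sends this DEL, what it actually deletes is C2's lock.
C3 sends SETNX to foo.lock and acquires the lock.
C4 sends a GETSET command to foo.lock,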
getset foo.lock
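To make the lock more robust, the client that holds it should call GET again before running its critical business logic, fetch t1, and compare it with the t0 timestamp it wrote, in case the lock has been released by an unexpected DEL without its knowledge. These steps and cases are easy to find in other references. Client handling and failure modes are very complex. It is not just crashes: a client may be blocked by some operation for a long time and then attempt the DEL command (when the lock is already in another client's hands). Improper handling can also cause deadlock, and an unreasonable sleep setting can cause Redis to be overwhelmed under high concurrency. The most common problems also include the following.
C1 acquires the lock, finishes processing, and is about to DEL the lock. Before that DEL, C2 uses SETNX to set timestamp t0 on foo.lock, finds that another client holds the lock, and moves on to the GET step.
C2 sends GET to foo.lock and receives t1 (nil).
C2 compares t0 > t1 expire and enters the GETSET flow.
C2 calls GETSET to write timestamp t0 to foo.lock, which returns the previous value t2.
C2 acquires the lock if t2 equals t1, and does not acquire it if t2 != t1.
C1 acquires the lock, finishes processing, and is about to DEL the lock. Before that DEL, C2 uses SETNX to set timestamp t0 on foo.lock, finds that another client holds the lock, and moves on to the GET step.
C2 sends GET to foo.lock and receives t1 (nil).
C2 loops and enters the next round of SETNX logic.
Both flows look fine, but logically the first one has a problem. When GET returns nil, the lock was deleted rather than timed out, so the client should go back to the SETNX path to take the lock. The trouble with the first flow is that normal locking should go through SETNX, whereas here, after the lock has been released, it goes through GETSET. If the comparison is not written carefully, this causes a deadlock. Sadly, I ran into exactly this; how it happened is described in the problem below.
C1 sends GETSET to foo.lock and receives t11 (nil).
C1 compares the value it got from GET with t11, finds they differ, and its logic concludes that it did not acquire the lock.
C2 sends GETSET to foo.lock and receives t22 (the timestamp written by C1).
C2 compares the value it got from GET with t22, finds they differ, and its logic concludes that it did not acquire the lock.
At this point both C1 and C2 believe they did not acquire the lock. In fact C1 has acquired it, but its logic does not handle GETSET returning nil and simply compares the GET and GETSET values. Why does this happen? One cause is that with multiple clients, the commands each client sends after connecting to Redis are not contiguous: two commands that look consecutive from a single client's point of view may, by the time they reach the Redis server, have many commands from other clients (DEL, SETNX, and so on) inserted between them. The second cause is that clocks on different clients are not synchronized, or not strictly synchronized.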
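The nil handling discussed above can be sketched as follows. `FakeRedis` is a hypothetical in-memory stand-in that implements only the four commands described earlier, so the example runs without a server, and `try_lock` is one possible reading of the flow rather than the author's code. Timestamps are passed in explicitly to keep the behaviour deterministic.

```python
class FakeRedis:
    """In-memory stand-in exposing only the four commands used here."""

    def __init__(self):
        self.data = {}

    def setnx(self, key, value):
        if key in self.data:
            return 0
        self.data[key] = value
        return 1

    def get(self, key):
        return self.data.get(key)

    def getset(self, key, value):
        old = self.data.get(key)
        self.data[key] = value
        return old

    def delete(self, key):
        return 1 if self.data.pop(key, None) is not None else 0


def try_lock(r, key, now, timeout):
    """Return True if the caller holds the lock after this call."""
    if r.setnx(key, now) == 1:
        return True                      # the lock was free
    held_since = r.get(key)
    if held_since is None:
        # Deleted between SETNX and GET: go back to the SETNX path.
        return r.setnx(key, now) == 1
    if now - held_since <= timeout:
        return False                     # still validly held
    previous = r.getset(key, now)        # looks expired: race to replace it
    # nil means the lock vanished and our value is the only one: we hold it.
    # Otherwise we win only if nobody replaced the expired value before us.
    return previous is None or previous == held_since


r = FakeRedis()
first = try_lock(r, "foo.lock", now=100, timeout=10)     # free: acquired
second = try_lock(r, "foo.lock", now=105, timeout=10)    # held: refused
takeover = try_lock(r, "foo.lock", now=200, timeout=10)  # expired: taken over
```

A client that loses the GETSET race has still overwritten the winner's timestamp with its own, slightly later one. That small drift is a known limitation of this scheme.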
ab -n1000 -c100 'http://sandbox6.wanke.etao.com/test/test_sequence.php?tbpm=t'
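With ab, this method was load tested with 100 concurrent connections and 1,000 requests in total.
In the chart above, the tall, narrow part is the load test without the sleep mechanism and the short, wide part is the test with it. The chart shows the load dropping by about 50%. The sleep approach has a drawback, of course: QPS drops noticeably, to only 35 under our test conditions, and some requests time out. After weighing everything, though, we decided to use the sleep mechanism, mainly to keep Redis from being overwhelmed under high concurrency. Unfortunately we have run into that before, so we will definitely use sleep.
Redis is a powerful NoSQL database for storing data structures, which makes it easy to optimize for a given business model. My recent work has been to tidy up an existing Java server framework and separate the network layer from the logic layer so that both can scale horizontally. Although I use Akka as the core framework in the logic layer and keep things lock-free as far as possible, a lock that spans JVMs is still unavoidable, so I need to implement a distributed lock.
The official documentation gives an implementation on this page.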
-- Lock script: returns 0 when the lock is acquired, 1 when it is held
-- by someone else, and 2 when the key exists but is not a string.
local now = tonumber(ARGV[1])
local timeout = tonumber(ARGV[2])
local to = now + timeout
local locked = redis.call('SETNX', KEYS[1], to)
if (locked == 1) then
    return 0
end
local kt = redis.call('TYPE', KEYS[1])
if (kt['ok'] ~= 'string') then
    return 2
end
local keyValue = tonumber(redis.call('GET', KEYS[1]))
if (now > keyValue) then
    redis.call('SET', KEYS[1], to)
    return 0
end
return 1

-- Unlock script: returns 0 when the lock was deleted, 1 otherwise.
local begin = tonumber(ARGV[1])
local timeout = tonumber(ARGV[2])
local kt = redis.call('TYPE', KEYS[1])
if (kt['ok'] == 'string') then
    local keyValue = tonumber(redis.call('GET', KEYS[1]))
    if ((keyValue - begin) == timeout) then
        redis.call('DEL', KEYS[1])
        return 0
    end
end
return 1
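This article gives you an intuitive understanding of modular development and the AMD specification, and walks in detail through the common usage of RequireJS, a modular development tool. It proceeds step by step from theory to practice, summarizes the most commonly used features from the official RequireJS API documentation, and adds examples and analysis where the documentation is not clear or specific enough. I hope it is of real help in improving your skills.
So when these problems appeared, JS experts started looking for a definitive way to solve them, and modular development emerged. As the name suggests, it requires that code be organized by layer and by function, with independent logic encapsulated into reusable modules that expose clear interfaces, keep their implementation details entirely private, and do not interfere with each other's internals at run time. The end result is that the problems in the earlier examples are solved. Here is a simple example written according to these requirements: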
//module.js
var Student = function (name) {
        return name && {
            getName: function () { return name; }
        };
    },
    Course = function (name) {
        return name && {
            getName: function () { return name; }
        };
    },
    Controller = function () {
        var data = {};
        return {
            add: function (stu, cour) {
                var stuName = stu && stu.getName(),
                    courName = cour && cour.getName(),
                    current,
                    _filter = function (e) { return e === courName; };
                if (!stuName || !courName) return;
                current = data[stuName] = data[stuName] || [];
                if (current.filter(_filter).length === 0) {
                    current.push(courName);
                }
            },
            list: function (stu) {
                var stuName = stu && stu.getName(),
                    current = data[stuName];
                current && console.log(current.join(';'));
            }
        };
    };

//main.js
var stu = new Student('lyzg'),
    c = new Controller();
c.add(stu, new Course('javascript'));
c.add(stu, new Course('html'));
c.add(stu, new Course('css'));
c.list(stu);
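The example shows that modular code has a clear structure and logic and reads elegantly, and because the logic is split into modules, the code is decoupled and more robust. This style of modular development is not without problems, though: module references such as Student are still added directly to the global namespace. Modules reduce the number of global variables and functions, but the module references themselves still depend on the global namespace in order to be called, so with many modules or third-party module libraries, naming conflicts are still possible. Modular development in the global namespace is therefore not ideal. Of the common approaches, the global-namespace style is the most basic; others follow the AMD specification, the CMD specification, or ECMAScript 6. CMD and ES6 are unrelated to the core of this article and are not covered here. The rest of the article covers AMD and RequireJS, which implements it.
As mentioned above, another common way to do modular development is to follow the AMD specification. AMD is not a concrete implementation, only a solution design for modular development. You can think of it as a set of interface declarations: if you want to build a modular development tool that follows the specification, you must implement the API it defines. For example, it requires modules to be loaded with the following API call:
require([module], callback)
where:
[module]: an array whose members are the modules to load;
callback: the callback function invoked once the modules have loaded
Every modular tool that follows AMD must implement it as specified. RequireJS, for example, fully follows the AMD specification, so if you already know AMD, you know how to load and call modules with RequireJS. The benefit of a specification is that different implementations share the same calling convention, which makes it easy to switch tools. Which implementation to use depends on the strengths of each tool and the nature of the project, and should be settled during technology selection at the start of the project. RequireJS is not the only library that implements AMD; more comprehensive JS libraries such as Dojo also have AMD implementations.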
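(function() {
    var s = document.createElement('script');
    s.type = 'text/javascript';
    s.async = true;
    s.src = 'http://yourdomain.com/script.js';
    var x = document.getElementsByTagName('script')[0];
    x.parentNode.insertBefore(s, x);
})();
The second way relies on the script attributes defer and async. defer is supported in IE and early versions of Firefox, and async is supported in all browsers that support HTML5. With either attribute, the script loads asynchronously, so its position in the HTML no longer matters:
With this approach, the asynchronous scripts still have to execute in order, otherwise dependencies break: if main.js in the example above depends on foo.js and bar.js but main.js executes first, it fails. In theory this approach is decent, but it is not good enough. It is tedious to use, the page needs several script tags, and fully on-demand loading is not possible.
On-demand loading of JS has two levels. The first is loading only the JS that this page might use; the second is loading a given script only at the moment it is used. The traditional approach easily achieves the first level but not the second. We can use merge and minification tools to put all of a page's JS into one file and minimize the number of requests, but once that file reaches the client, much of its content may never be used. A tool that loads each script only when it is needed would solve the problem completely, and RequireJS is such a tool.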
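4.02 How to use RequireJS to load and execute the page's logic JS
4.01 only covered how to load RequireJS itself, but RequireJS is just a JS library, a tool used by the page's logic. What actually implements the page's functionality is the main JS that we write with RequireJS. How is this main JS (assume the code lives in main.js) loaded and executed with RJ? Like this:
Compared with 4.01, the script tag has an extra data-main attribute, which RJ uses to configure the page's main JS. Put your logic in this main.js. Once RJ itself has loaded and executed, it loads main.js asynchronously. main.js is the entry point for all of the page's logic. Ideally the whole page needs only this one script tag, with RJ loading every other dependency, such as jQuery.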
require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) { //use foo bar app do sth });
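In this JS we use the require method provided by RJ to load three modules, and run the page logic once all three have loaded. require takes two arguments. The first is an array in which each element is a module ID. The second is a callback invoked after every module named in the first argument has loaded; its formal parameters correspond in number and order to those modules. For example, the first module is lib/foo, so the callback's first parameter is a reference to the foo module. Inside the callback we use these parameters to call each module's methods. Because the callback runs only after the modules have loaded, the references are guaranteed to be valid.
Before explaining how RJ resolves the paths of the modules it depends on, the two concepts baseUrl and module ID must be clear.
The base element in HTML defines the URL prefix for any HTTP request made within the page, and RJ's baseUrl plays a similar role. RJ always requests its dependent JS files dynamically, so it has to resolve JS file paths. By default RJ resolves paths as baseUrl + moduleId; examples follow later. baseUrl is very important, and RJ handles it according to the following rules:
require.config({ baseUrl: 'scripts' });
require.config({ baseUrl: 'scripts' });
require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) {
    // use foo bar app do sth
});
A module ID is the string that identifies a module in the dependency array passed to require and, later, to define. In the code above, ['lib/foo', 'app/bar', 'app/app'] is a dependency array and each element is a module ID. Note that a module ID is not necessarily part of the path of the module's JS file: some module IDs are short while the path is long. This follows from RJ's resolution rules, covered in detail in the next section.
By default RJ resolves files by the rule baseUrl + module ID, and it assumes the files it loads are JS, so a module ID does not need the .js suffix. That is why module IDs look like lib/foo and app/bar. Three kinds of module ID do not follow this rule: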
require.config({ baseUrl: 'scripts' });
require(['/lib/foo', 'test.js', 'http://cdn.baidu.com/js/jquery'], function(foo, bar, app) {
    // use foo bar app do sth
});
None of these three modules is resolved by the baseUrl + module ID rule. The module ID is used directly, which is equivalent to the following:
<script src="/lib/foo.js"></script>
<script src="test.js"></script>
<script src="http://cdn.baidu.com/js/jquery.js"></script>
Examples of module ID resolution:
require.config({ baseUrl: 'scripts' });
require(['lib/foo', 'app/bar', 'app/app'], function(foo, bar, app) {
    // use foo bar app do sth
});
The module IDs are lib/foo, app/bar, and app/app.
By baseUrl + moduleId, with the .js suffix added automatically, the JS file paths of these three modules are:
scripts/lib/foo.js
scripts/app/bar.js
scripts/app/app.js
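require.config({
    baseUrl: 'scripts/lib',
    paths: { app: '../app' }
});
require(['foo', 'app/bar', 'app/app'], function(foo, bar, app) {
    // use foo bar app do sth
});
A new setting, paths, appears here. It remaps a specific part of a module ID; in the code above, the app part is remapped to ../app. This is a relative path, relative to the directory given by baseUrl, and from the project structure ../app corresponds to the scripts/app directory. This remapping is why app/bar resolves correctly. By the plain baseUrl + moduleId rule, app/bar would be resolved to scripts/lib/app/bar.js, but in fact it is resolved to scripts/app/bar.js, and the paths setting is what makes the difference. The example also shows that a module ID is not necessarily part of the JS file path. paths is especially useful for JS files with long paths, because it shortens their module IDs.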
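The resolution rule described so far (baseUrl + module ID, with paths remapping a leading part of the ID) can be modelled in a few lines. This is a simplified sketch of the rule as the text explains it, not RequireJS's actual implementation; `resolve` is a hypothetical helper, and IDs that look like plain URLs are simply returned unchanged here.

```python
import posixpath
import re

# IDs that look like plain URLs: leading "/", a protocol, or a ".js" suffix.
PLAIN_URL = re.compile(r"^/|:|\.js$")


def resolve(module_id, base_url, paths=None):
    if PLAIN_URL.search(module_id):
        return module_id  # not a module ID: left untouched by this sketch
    parts = module_id.split("/")
    # Replace the longest leading run of segments that has a `paths` entry.
    for n in range(len(parts), 0, -1):
        prefix = "/".join(parts[:n])
        if paths and prefix in paths:
            parts = [paths[prefix]] + parts[n:]
            break
    # Join onto baseUrl, collapse "..", and add the .js suffix.
    return posixpath.normpath(posixpath.join(base_url, *parts)) + ".js"


plain = resolve("lib/foo", "scripts")
remapped = resolve("app/bar", "scripts/lib", {"app": "../app"})
```

Here `plain` is scripts/lib/foo.js and `remapped` is scripts/app/bar.js, matching the two examples above.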
define(['./bar'], function(bar) { return { dosth: function() { bar.dosth(); } } });
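The code above defines a module with define. The define function is covered later, in the section on defining modules; a quick look is enough here. In this usage the first argument is a dependency array, just as with require, and the second is a callback, also invoked after all dependencies have loaded. The callback's return value becomes the module reference that other modules use.
The point here again concerns the resolution rules. Strictly following RJ's rules, the dependency should be written as app/bar. But because app.js and bar.js sit in the same directory, the same-directory relative identifier ./ can be used to resolve the file: once app.js has loaded, bar.js can certainly be found in the same directory. This style is very useful when defining modules, because your modules then no longer depend on the name of the folder that contains them.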
require.config({ paths: { foo: 'libs/foo-1.1.3' } });
require( ['foo'], function( foo ) { //foo is undefined });
define(['require', 'app/bar', 'app/app'], function(require) { var bar= require("app/bar"); var app= require("app/app"); //use bar and app do sth });
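The callback above is defined with only one formal parameter, mainly to keep the parameter list short and the callback signature from growing long. Inside the callback, each dependency can be obtained directly with require(moduleId). Because the dependencies were loaded before the callback ran, these calls do not load anything again. However, if you ask for a module ID that is not in the dependency array, require may well fail to return a module reference, because the module may still need to be loaded if no other module has loaded it yet.
define(id?, dependencies?, factory);
where:
id: the module identifier; optional.
dependencies: the modules it depends on; optional.
factory: the module's implementation, or a JavaScript object
1. Defining a simple object module: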
define({ bar:'i am bar.' });
require.config({ baseUrl: 'scripts/lib', paths: { app: '../app' } });
require(['app/bar'], function(bar) {
    console.log(bar); // {bar: 'i am bar.'}
});
2. Defining a module with no dependencies:
define(function () {
    return { nodec: "yes, i don't need dependence." };
});
require.config({ baseUrl: 'scripts/lib', paths: { app: '../app' } });
require(['app/nodec'], function(nodec) {
    console.log(nodec); // {nodec: "yes, i don't need dependence."}
});
3. Defining a module that depends on other modules:
define(['jquery'], function($) {
    //use $ do sth ...
    return { usejq: true };
});
require.config({ baseUrl: 'scripts/lib', paths: { app: '../app' } });
require(['app/dec'], function(dec) {
    console.log(dec); // {usejq: true}
});
4. Circular dependencies:
define(['foo'], function(foo) {
    return { name: 'bar', hi: function() { console.log('hi! ' + foo.name); } };
});
define(['app/bar'], function(bar) {
    return { name: 'foo', hi: function() { console.log('hi! ' + bar.name); } };
});
require.config({ baseUrl: 'scripts/lib', paths: { app: '../app' } });
require(['app/bar', 'foo'], function(bar, foo) {
    bar.hi();
    foo.hi();
});
define(['require', 'foo'], function(require, foo) {
    return {
        name: 'bar',
        hi: function() { foo = require('foo'); console.log('hi! ' + foo.name); }
    };
});
define(['require', 'app/bar'], function(require, bar) {
    return {
        name: 'foo',
        hi: function() { bar = require('app/bar'); console.log('hi! ' + bar.name); }
    };
});
define(['require', 'app/bar'], function(require) {
    return {
        name: 'foo',
        hi: function() { var bar = require('app/bar'); console.log('hi! ' + bar.name); }
    };
});
1. Generating a URL relative to the module
define(["require"], function(require) { var cssurl = require.to; });
2. Debugging in the console
require.config({
    baseUrl: 'scripts/lib',
    paths: { app: '../app' },
    shim: { underscore: { exports: '_' } }
});
require(['underscore'], function(_) {
    // the underscore API is now available through _
});
require.config({
    baseUrl: 'scripts/lib',
    paths: { app: '../app' },
    shim: { backbone: { deps: ['underscore', 'jquery'], exports: 'Backbone' } }
});
require(['backbone'], function(Backbone) {
    //use Backbone's api
});
requirejs.config({
    shim: {
        'jquery.colorize': { deps: ['jquery'], exports: 'jQuery.fn.colorize' },
        'jquery.scroll': { deps: ['jquery'], exports: 'jQuery.fn.scroll' },
        'backbone.layoutmanager': { deps: ['backbone'], exports: 'Backbone.LayoutManager' }
    }
});
requirejs.config({
    config: {
        'bar': { size: 'large' },
        'baz': { color: 'blue' }
    }
});
As you can see, the bar section of the config property applies to the module whose ID is bar, and the baz section to the module whose ID is baz. Taking bar.js as an example:
define(['module'], function(module) { //will be the value 'large'var size = module.config().size; });
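Extra query string arguments that RequireJS appends to the URLs it uses to fetch resources. This is useful as a "cache bust" when the browser or the server is not configured correctly. An example cache-bust configuration:
urlArgs: "bust=" + (new Date()).getTime()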
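The effect of such a setting on a resource URL can be sketched as follows. This is a minimal illustration in plain JavaScript; withCacheBust is a hypothetical helper written for this example, not part of RequireJS, and it only mimics the idea of appending the extra argument with "?" or "&" depending on whether the URL already has a query string.

```javascript
// Hypothetical helper: append a cache-bust argument to a resource URL.
function withCacheBust(url, bust) {
  // Use '?' for the first query argument and '&' for any later one.
  const separator = url.indexOf('?') === -1 ? '?' : '&';
  return url + separator + 'bust=' + bust;
}

// A timestamp changes on every page load, so the browser cache is bypassed.
console.log(withCacheBust('scripts/app/main.js', new Date().getTime()));
```

A timestamp defeats caching completely, so it suits development; for a deployed site a fixed version string is usually the better value, since it changes only when the code does.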
Note: if you set enforceDefine: true and load your main JS module with data-main="", that main module must call define() rather than require() to load the code it needs. The main module can still call require/requirejs to set config values, but it must use define() for module loading. For example, the earlier snippet now raises an error:
require.config({
  enforceDefine: true,
  baseUrl: 'scripts/lib',
  paths: { app: '../app' },
  shim: {
    backbone: {
      deps: ['underscore', 'jquery'],
      exports: 'Backbone'
    }
  }
});
require(['backbone'], function(Backbone) {
  console.log(Backbone);
});
define(['backbone'], function(backbone) { console.log(backbone); });
requirejs.config({
  // To get timely, correct error triggers in IE, force a define/shim exports check.
  enforceDefine: true,
  paths: {
    jquery: [
      'http://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min',
      // If the CDN location fails, load from this location
      'lib/jquery'
    ]
  }
});
// Later
require(['jquery'], function ($) {
});
Note: paths fallbacks only work for exact module ID matches. This differs from normal paths config, which can apply to any prefix segment of a module ID. Fallbacks are meant for unusual error recovery rather than generic path lookup, because that would be inefficient in the browser.
requirejs.onError = function (err) {
  console.log(err.requireType);
  if (err.requireType === 'timeout') {
    console.log('modules: ' + err.requireModules);
  }
  throw err;
};
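The fallback idea itself (try each location in order and only give up when all of them fail) can be sketched in a few lines. This is a simplified, synchronous illustration in plain JavaScript; loadWithFallback and fakeLoad are hypothetical names made up for this example and do not reflect how RequireJS implements paths fallbacks internally.

```javascript
// Hypothetical helper: try every candidate location in order and return the
// first successful result; rethrow the last error if all candidates fail.
function loadWithFallback(candidates, load) {
  let lastError = new Error('no candidate locations given');
  for (const location of candidates) {
    try {
      return load(location);
    } catch (err) {
      lastError = err; // remember the failure and move on to the next one
    }
  }
  throw lastError;
}

// Fake loader for the demo: pretend that every CDN location is unreachable.
function fakeLoad(location) {
  if (location.indexOf('cdn') !== -1) {
    throw new Error('cannot reach ' + location);
  }
  return 'loaded from ' + location;
}

console.log(loadWithFallback(['cdn/jquery.min', 'lib/jquery'], fakeLoad));
```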
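If a large amount of data is concentrated in a single partition, that partition becomes the bottleneck of the computation. Figure 1 shows how a Spark application executes concurrently: within one application, different stages run serially, while different tasks within the same stage can run concurrently. The number of tasks is determined by the number of partitions. If one partition holds far more data than the others, its task takes much longer to finish, the next stage cannot start, and the whole job takes a very long time.
One way to avoid data skew is to choose a suitable key, or to define a custom partitioner. In Spark, a block stores its data in a ByteBuffer, and a ByteBuffer can hold at most 2 GB. If a single key carries a very large amount of data, calling cache or persist runs into the exception described in SPARK-1476.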
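One common way to "choose a suitable key" is to salt a hot key so that its records spread over several partitions. The sketch below shows the effect in plain JavaScript rather than in Spark; hashPartition, partitionSizes, and saltKeys are hypothetical helpers written for this illustration, and the hash function is a simple stand-in, not Spark's HashPartitioner.

```javascript
// Stand-in for a hash partitioner: map a key to one of numPartitions buckets.
function hashPartition(key, numPartitions) {
  let h = 0;
  for (const ch of String(key)) {
    h = (h * 31 + ch.charCodeAt(0)) >>> 0; // keep h an unsigned 32-bit value
  }
  return h % numPartitions;
}

// Count how many records land in each partition.
function partitionSizes(keys, numPartitions) {
  const sizes = new Array(numPartitions).fill(0);
  for (const key of keys) {
    sizes[hashPartition(key, numPartitions)] += 1;
  }
  return sizes;
}

// Append a rotating salt so that one hot key becomes numSalts distinct keys.
function saltKeys(keys, numSalts) {
  return keys.map((key, i) => key + '#' + (i % numSalts));
}

const hotKeys = new Array(1000).fill('hot');
console.log(partitionSizes(hotKeys, 4));              // every record in one partition
console.log(partitionSizes(saltKeys(hotKeys, 8), 4)); // records spread across partitions
```

The price of salting is a second aggregation step: partial results for the salted keys have to be merged again once the salt is stripped.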
1. groupByKey
2. reduceByKey
3. aggregateByKey
4. sortByKey
5. join
6. cogroup
7. cartesian
8. coalesce
9. repartition
10. repartitionAndSortWithinPartitions
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.util.Random

object RDDConversions {

  trait RDDWrapper[T] {
    def rdd: RDD[T]
  }

  // TODO View bounds are deprecated, should use context bounds
  // Might need to change ClassManifest for ClassTag in Spark 1.0.0
  case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
    // Here we use a single Long to try to ensure the sort is balanced,
    // but for really large datasets, we may want to consider
    // using a tuple of many Longs or even a GUID
    def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
      rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
        .grouped(numPartitions).map(t => (t._1._1, t._2))
  }

  case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
    def grouped(size: Int): RDD[T] = {
      // TODO Version where withIndex is cached
      val withIndex = rdd.mapPartitions(_.zipWithIndex)

      // Global start index of each partition (running total of partition sizes)
      val startValues =
        withIndex.mapPartitionsWithIndex((i, iter) =>
          Iterator((i, iter.toIterable.last))).toArray().toList
          .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

      withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
        case (value, index) => (startValues(i) + index.toLong, value)
      })
        .partitionBy(new Partitioner {
          def numPartitions: Int = size
          def getPartition(key: Any): Int =
            (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
        })
        .map(_._2)
    }
  }

  implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] = new DemoRDD[T](rdd)
  implicit def toDemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}
import RDDConversions._
yourRdd.grouped(5)
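Shuffle is very resource-intensive in Spark. A shuffle means that each compute node first writes its results to disk, and the next stage then reads the previous stage's results back in. Writing and reading this data means disk I/O, which is very slow compared with in-memory operations.
Use iostat to check disk I/O usage; heavy disk I/O is usually accompanied by a high CPU load.
If the data and the compute node are on the same machine, network overhead is avoided; otherwise the corresponding network cost is added on top. Use iftop to check network bandwidth usage and see which nodes exchange large amounts of data.
Figure 2 shows how data is transferred between Spark nodes. The function computed by a Spark task is sent from the driver to the executors over an Akka channel, while shuffle data is transferred through the Netty network interface. Because the parameter spark.akka.frameSize limits the maximum size of a message on the Akka channel, avoid referencing very large local variables in a Spark task.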
Section 1: repartition vs. coalesce
Section 2: reduceByKey vs. groupByKey
reduceByKey
sc.textFile("readme.md").flatMap(l => l.split(",")).map(w => (w, 1)).reduceByKey(_ + _)
groupByKey
sc.textFile("readme.md").flatMap(l => l.split(",")).map(w => (w, 1)).groupByKey.map(r => (r._1, r._2.sum))
Recommendation: prefer reduceByKey, aggregateByKey, foldByKey, and combineByKey wherever possible.
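val data = sc.parallelize(List((0, 2.0), (0, 4.0), (1, 0.0), (1, 10.0), (1, 20.0)))
// Average per key with reduceByKey: carry (sum, count) pairs and divide at the end
data.map(r => (r._1, (r._2, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .map(r => (r._1, r._2._1 / r._2._2))
  .foreach(println)
// The same (sum, count) accumulation with combineByKey
data.combineByKey(
  value => (value, 1),
  (x: (Double, Int), value: Double) => (x._1 + value, x._2 + 1),
  (x: (Double, Int), y: (Double, Int)) => (x._1 + y._1, x._2 + y._2))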
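The three functions passed to combineByKey can be followed step by step outside Spark. The sketch below, in plain JavaScript, is only an illustration under the assumption that records arrive already split into partitions; combinePartition, mergeCombiners, and averageByKey are hypothetical helper names, and nothing here calls a Spark API. Each partition is first reduced locally to one (sum, count) pair per key, and only those small pairs are merged across partitions, which is why the combining operators move less data than groupByKey.

```javascript
// Reduce one partition to a Map of key -> [sum, count].
function combinePartition(pairs) {
  const acc = new Map();
  for (const [key, value] of pairs) {
    const current = acc.get(key);
    if (current === undefined) {
      acc.set(key, [value, 1]);                           // createCombiner
    } else {
      acc.set(key, [current[0] + value, current[1] + 1]); // mergeValue
    }
  }
  return acc;
}

// Merge the per-partition Maps into one Map of key -> [sum, count].
function mergeCombiners(maps) {
  const merged = new Map();
  for (const partial of maps) {
    for (const [key, [sum, count]] of partial) {
      const current = merged.get(key) || [0, 0];
      merged.set(key, [current[0] + sum, current[1] + count]);
    }
  }
  return merged;
}

// Per-key average over all partitions.
function averageByKey(partitions) {
  const averages = new Map();
  for (const [key, [sum, count]] of mergeCombiners(partitions.map(combinePartition))) {
    averages.set(key, sum / count);
  }
  return averages;
}

// The same five records as above, split across two pretend partitions.
const averages = averageByKey([
  [[0, 2.0], [0, 4.0], [1, 0.0]],
  [[1, 10.0], [1, 20.0]]
]);
console.log(averages.get(0), averages.get(1));
```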
Section 3: BroadcastHashJoin vs. ShuffleHashJoin
Joins between a large table and a small table are common. To make them more efficient, use BroadcastHashJoin: broadcast the contents of the small table to every executor in advance, which avoids shuffling the small table and greatly improves performance.
val lst = (1 to 100).toList
val exampleRDD = sc.makeRDD((1 to 20).toSeq, 2)
val broadcastLst = sc.broadcast(lst)
exampleRDD.filter(i => broadcastLst.value.contains(i)).collect.foreach(println)
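The idea behind a broadcast join can be sketched in plain JavaScript: every partition of the large table probes a local hash table built from the small table, so no record of either table has to move between partitions. broadcastHashJoin is a hypothetical helper written for this illustration and is not a Spark API.

```javascript
// largePartitions: array of partitions, each an array of [key, value] pairs.
// smallTable: array of [key, value] pairs, small enough to copy everywhere.
function broadcastHashJoin(largePartitions, smallTable) {
  const lookup = new Map(smallTable); // stands in for the broadcast copy
  return largePartitions.map(partition =>
    partition
      .filter(([key]) => lookup.has(key))                   // inner join: drop misses
      .map(([key, value]) => [key, value, lookup.get(key)]) // attach the small-table value
  );
}

const joined = broadcastHashJoin(
  [[[1, 'a'], [2, 'b']], [[3, 'c'], [1, 'd']]],
  [[1, 'x'], [3, 'y']]
);
console.log(JSON.stringify(joined));
```

The approach only pays off while the small table comfortably fits in the memory of each executor; beyond that, a shuffle-based join is the safer choice.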
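Section 4: map vs. mapPartitions
Section 5: Data locality
When running a task, Spark prefers the fastest way to obtain the data. If you want tasks to start on as many machines as possible, lower the value of spark.locality.wait; its default is 3s.
Besides HDFS, Spark supports a growing number of data sources, such as the well-known NoSQL databases Cassandra, HBase, and MongoDB. As Elasticsearch gains popularity, combining Spark with Elasticsearch to provide a fast query solution has also become a worthwhile experiment.
All of these external data sources raise the same question: how can Spark read their data quickly? The basic approach is to co-locate compute nodes and data nodes as much as possible. For example, when deploying a Hadoop cluster, the HDFS DataNode and the Spark worker can share a machine.
Take Cassandra as an example: if the Spark deployment only partly overlaps with the Cassandra machines, lowering spark.locality.wait allows Spark tasks that read Cassandra data to start on machines where Cassandra is not deployed.
With Cassandra, you can deploy Spark workers on the machines that run Cassandra. Note that Cassandra's compaction consumes a great deal of CPU, so take it into account when configuring the number of CPU cores for the Spark worker.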
private def addPendingTask(index: Int, readding: Boolean = false) {
  // Utility method that adds `index` to a list only if readding=false or it's not already there
  def addTo(list: ArrayBuffer[Int]) {
    if (!readding || !list.contains(index)) {
      list += index
    }
  }

  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
      case e: HDFSCacheTaskLocation => {
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) => {
            for (e <- set) {
              addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          }
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      }
      case _ => Unit
    }
    addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
    for (rack <- sched.getRackForHost(loc.host)) {
      addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    addTo(pendingTasksWithNoPrefs)
  }

  if (!readding) {
    allPendingTasks += index // No point scanning this whole list to find the old task there
  }
}
override def getPreferredLocations(split: Partition): Seq[String] = {
  val esSplit = split.asInstanceOf[EsPartition]
  val ip = esSplit.esPartition.nodeIp
  if (ip != null) Seq(ip) else Nil
}
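Section 6: Serialization
spark.serializer org.apache.spark.serializer.KryoSerializer
Note in particular that persist is lazy: the data is cached only when a job actually runs. unpersist, in contrast, takes effect immediately and the cache is cleared right away.
About the author: Xu Peng (许鹏), author of 《Apache Spark源码剖析》, focuses on real-time search over big data and real-time stream processing, has worked extensively with Elasticsearch, Storm, and Drools, and currently works at Ctrip.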
Play uses sbt-native-packager, which supports the inclusion of arbitrary files by adding them to the mappings:
mappings in Universal ++= (baseDirectory.value / "scripts" * "*" get) map (x => x -> ("scripts/" + x.getName))
The syntax assumes Play 2.2.x.
val jdk8 = new File("d:\\jdk\\jdk8\\jre1_8_0_40")
mappings in Universal ++= (jdk8 ** "*" get) map (x => x -> ("jre8/" + jdk8.relativize(x).getOrElse(x.getName)))
wget ftp://ftp.labs.sogou.com/data/sogouca/sogouca.tar.gz --ftp-user=hebin_hit@foxmail.com --ftp-password=4fqlsydncrdxvndi -r
gzip -d sogouca.tar.gz
tar -xvf sogouca.tar
cat *.txt > sogouca.txt
cat sogouca.txt | iconv -f gbk -t utf-8 -c | grep "" > corpus.txt
nohup ./word2vec -train resultbig.txt -output vectors.bin -cbow 0 -size 200 -window 5 -negative 0 -hs 1 -sample 1e-3 -threads 12 -binary 1 &
./distance vectors.bin
4.2 Latent linguistic regularities
4.3 Clustering
nohup ./word2vec -train resultbig.txt -output classes.txt -cbow 0 -size 200 -window 5 -negative 0 -hs 1 -sample 1e-3 -threads 12 -classes 500 &
sort classes.txt -k 2 -n > classes_sorted_sogouca.txt
4.4 Phrase analysis
./word2phrase -train resultbig.txt -output sogouca_phrase.txt -threshold 500 -debug 2
./word2vec -train sogouca_phrase.txt -output vectors_sogouca_phrase.bin -cbow 0 -size 300 -window 10 -negative 0 -hs 1 -sample 1e-3 -threads 12 -binary 1
1. word2vec: Tool for computing continuous distributed representations of words,
