Finagle是Tweiiter’s 的RPC 系统。这个博客介绍了它的动机和它的核心设计理念,finagle README包含了详细的文档。Finagle 致力于让构建健壮的客户端和服务器更加容易。课程包括
我们将要讨论一些不是标准Scala 的代码。如果你想要使用REPL 学习,你可能会奇怪怎样获得一个像Twitter 的Finagle 那样的Scala 的REPL ,以及获得它们相关的东西。
你一定想要看看Finagle 源代码
如果你的finagle 路径下有Finagle 源码,你可以在控制台里这样
$ cd finagle
$ ./sbt "project finagle-http" console
...build output...
scala>
Finagle 使用com.twitter.util.Future 来编码延迟的操作。Future 处理一个不在可见的值。Finagle 使用Futures 作为它的异步API 的返回值。同步API 在返回前等待结果;异步API 不会这样做。比如,HTTP 每隔半秒请求一些网络上的可能没有返回值的服务。你不希望你的程序每隔半秒就执行一次代码。“Slow” API 可以马上返回一个Future 同时在它的值被解析的时候添加。
val myFuture = MySlowService(request) // returns right away
...do other things...
val serviceResult = myFuture.get() // blocks until service "fills in" myFuture
实际上,你不想写发送请求同时在一些语句后面调用myFuture.get 的代码。Future 有一个方法当值可见的时候来注册回调。
如果你使用其它的异步API,可能在你看到”callback“这个单词时会畏惧。你可能会把它和非法的代码流程联系起来,它们在那里调用函数隐藏。但是Futures 可以利用Scala 的头等函数优势,来展现一个更可读的代码流程。你可一在你调用的地方简单的定一个处理函数。
比如,写代码调度request 请求,接着处理响应,你可以把代码放在一起:
val future = dispatch(req) // returns immediately, but future is "empty"
future onSuccess { reply => // when the future gets "filled", use its value
println(reply)
}
你可以在REPL 里面练习一下Futures。在真实的代码里学习使用它们可是一个糟糕的想法,不过好像可也一学习一下API。当你使用REPL时,Promise 是一个方便的类,它是抽象的Future 类的一个具体的子类。你可以使用它创建还没有值的Funture。
scala> import com.twitter.util.{Future,Promise}
import com.twitter.util.{Future, Promise}
scala> val f6 = Future.value(6) // create already-resolved future
f6: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@c63a8af
scala> f6.get()
res0: Int = 6
scala> val fex = Future.exception(new Exception) // create resolved sad future
fex: com.twitter.util.Future[Nothing] = com.twitter.util.ConstFuture@38ddab20
scala> fex.get()
java.lang.Exception
... stack trace ...
scala> val pr7 = new Promise[Int] // create unresolved future
pr7: com.twitter.util.Promise[Int] = Promise@1994943491(...)
scala> pr7.get()
...console hangs, waiting for future to resolve...
Ctrl-C
Execution interrupted by signal.
scala> pr7.setValue(7)
scala> pr7.get()
res1: Int = 7
scala>
当你在真实代码里面使用Futures 时,你可能不会调用get;做为代替,你使用callback 函数。get 只是在REPL 修修补补时很可爱。
Futures 组合了合量API 的相似(比如,map,flatMap)。一个集合量组合,你重新调用,这让你表达事情就像”我有一个integers 的List以及一个平方函数:我的integers 的平方函数的List 的映射。“这也太整齐了;你可以把组合函数和其它的函数放在一起来有效的定义一个新函数。一个Future 组合让你像这样表达事情”我有一个假想的Future integer 和一个平方函数:一个我假想的integer 的Furure 平方的映射。“
如果你定义了一个异步API,一个请求值进来你的API 返回一个被Future 包围的响应。所以,这些把输入和函数转化成Funtures 的组合真是太特么的有用了:它们帮助你定义对你的异步API 来讲的其它异步APIs。
最重要的Future 组合是flatMap:
def Future[A].flatMap[B](f: A => Future[B]): Future[B]
flatMap 给两个futures 排序。也就是说,它接受一个Future 以及一个异步函数并且返回另一个Future。 method 的签名说明一个问题:给定一个成功的future 值,函数f 提供下一个Future。flatMap 自动调用f if/when 输入Future 成功的完成。这个操作的结果是另一个Future,它仅仅当所有的futures 都完成的时候才算完成。如果有失败的Future,那么给出的Future 也会是失败的。这个implicit 交叉的错误允许我们仅仅在这些语义显著的地方处理错误。flatMap 是组合这些语义的标准的名字。
如果你有一个Future 并且你想给它的值应用一个异步API。比如,假如你有一个Future[User] 以及需要一个Future[Boolean] 来说明这个封闭的用户是否被禁止了。这里有一个isBanned API 来决定这个用户是否被禁止了,但是它是异步的。你可以使用flatMap:
scala> import com.twitter.util.{Future,Promise}
import com.twitter.util.{Future, Promise}
scala> class User(n: String) { val name = n }
defined class User
scala> def isBanned(u: User) = { Future.value(false) }
isBanned: (u: User)com.twitter.util.Future[Boolean]
scala> val pru = new Promise[User]
pru: com.twitter.util.Promise[User] = Promise@897588993(...)
scala> val futBan = pru flatMap isBanned // apply isBanned to future
futBan: com.twitter.util.Future[Boolean] = Promise@1733189548(...)
scala> futBan.get()
...REPL hangs, futBan not resolved yet...
Ctrl-C
Execution interrupted by signal.
scala> pru.setValue(new User("prudence"))
scala> futBan.get()
res45: Boolean = false
scala>
同样的,为了给Future 应用一个异步函数,使用map。比如,假设你有一个Future[RawCredentials] 和一个Future[Credentials]。你有一个异步normalize 函数从RawCredentials 到Credentials。你可以使用map:
cala> class RawCredentials(u: String, pw: String) {
| val username = u
| val password = pw
| }
defined class RawCredentials
scala> class Credentials(u: String, pw: String) {
| val username = u
| val password = pw
| }
defined class Credentials
scala> def normalize(raw: RawCredentials) = {
| new Credentials(raw.username.toLowerCase(), raw.password)
| }
normalize: (raw: RawCredentials)Credentials
scala> val praw = new Promise[RawCredentials]
praw: com.twitter.util.Promise[RawCredentials] = Promise@1341283926(...)
scala> val fcred = praw map normalize // apply normalize to future
fcred: com.twitter.util.Future[Credentials] = Promise@1309582018(...)
scala> fcred.get()
...REPL hangs, fcred doesn't have a value yet...
Ctrl-C
Execution interrupted by signal.
scala> praw.setValue(new RawCredentials("Florence", "nightingale"))
scala> fcred.get().username
res48: String = florence
scala>
Scala 有一个调用flatMap 的简写:for。假如你想通过异步API 认证登录请求同时通过其它API 检查看看用户是不是被禁止了。通过for-comperhensions,我们可以这样写:
scala> def authenticate(req: LoginRequest) = {
| // TODO: we should check the password
| Future.value(new User(req.username))
| }
authenticate: (req: LoginRequest)com.twitter.util.Future[User]
scala> val f = for {
| u <- authenticate(request)
| b <- isBanned(u)
| } yield (u, b)
f: com.twitter.util.Future[(User, Boolean)] = Promise@35785606(...)
scala>
创造了一个 future f: Future[(User, Boolean)],含有用户object 和一个Boolean 说明用户的速率是否被限制。注意 sequential 组合 是在这里需要rateLimit 接受一个输出认证的参数。
你可能会一次在多于一个服务上抓取数据。比如,如果你写的web 服务上面包含显示的内容和广告,这可能会在一个服务上抓取内容在另一个服务上抓取广告。但是你怎样告诉你的代码等待每一个的回复呢?如果你自己写的话可能你会觉得很奇怪,作为替代,你可以使用concurrent 组合。
Future 提供一些concurrent 组件,它们把一个序列的Future 转换成为一个在有点不同的方式的Future。这样很好,因为这允许你(潜意识的)把一些Funtures 打包成为一个Future。
object Future {
…
def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
def join(fs: Seq[Future[_]]): Future[Unit]
def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])]
}
collect 接受一组相同类型的Futures,产生一组这个类型的值的Future。这个future 在所有相关的futures 完成时或者任何一个产生错误时完成。返回一组按照传入的顺序的一组请求。
scala> val f2 = Future.value(2)
f2: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@13ecdec0
scala> val f3 = Future.value(3)
f3: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@263bb672
scala> val f23 = Future.collect(Seq(f2, f3))
f23: com.twitter.util.Future[Seq[Int]] = Promise@635209178(...)
scala> val f5 = f23 map (_.sum)
f5: com.twitter.util.Future[Int] = Promise@1954478838(...)
scala> f5.get()
res9: Int = 5
join 接受一组可能是混合类型的Futures ,产生一个Future[Unit],当所有相关的futures完成时(或者其中有失败的)。这对于说明一组异构操作的完成是很有用的。这对于内容或广告例子可能会是一个很好的解决方案。
scala> val ready = Future.join(Seq(f2, f3))
ready: com.twitter.util.Future[Unit] = Promise@699347471(...)
scala> ready.get() // doesn't ret value, but I know my futures are done
scala>
select 返回一个Future,当第一个给定的Future 完成时它完成。它返回Future 和一个包含剩余为完成Futures 的序列。(它没有做任何事来取消剩余的Futures。如果你想要等待更多的响应,等呗。如过你想取消剩余的响应,随你便。)
scala> val pr7 = new Promise[Int] // unresolved future
pr7: com.twitter.util.Promise[Int] = Promise@1608532943(...)
scala> val sel = Future.select(Seq(f2, pr7)) // select from 2 futs, one resolved
sel: com.twitter.util.Future[...] = Promise@1003382737(...)
scala> val(complete, stragglers) = sel.get()
complete: com.twitter.util.Try[Int] = Return(2)
stragglers: Seq[...] = List(...)
scala> complete.get()
res110: Int = 2
scala> stragglers(0).get() // our list of not-yet-finished futures has one item
...get() hangs the REPL because this straggling future is not finished...
Ctrl-C
Execution interrupted by signal.
scala> pr7.setValue(7)
scala> stragglers(0).get()
res113: Int = 7
scala>
这些组合表达了网络服务的操作类型。这个假想代码执行率限制(为了获得一个本地缓存限制率)同时调度了一个代表用户终端的请求。
// Find out if user is rate-limited. This can be slow; we have to ask
// the remote server that keeps track of who is rate-limited.
def isRateLimited(u: User): Future[Boolean] = {
...
}
// Notice how you can swap this implementation out now with something that might
// implement a different, more restrictive policy.
// Check the cache to find out if user is rate-limited. This cache
// implementation is just a Map, and can return a value right way. But we
// return a Future anyhow in case we need to use a slower implementation later.
def isLimitedByCache(u: User): Future[Boolean] = Future.value(limitCache(u))
// Update the cache
def setIsLimitedInCache(user: User, v: Boolean) { limitCache(user) = v }
// Get a timeline of tweets... unless the user is rate-limited (then throw
// an exception instead)
def getTimeline(cred: Credentials): Future[Timeline] =
isLimitedByCache(cred.user) flatMap {
case true => Future.exception(new Exception("rate limited"))
case false =>
// First we get auth'd user then we get timeline.
// Sequential composition of asynchronous APIs: use flatMap
val timeline = auth(cred) flatMap(getTimeline)
val limited = isRateLimited(cred.user) onSuccess(
setIsLimitedInCache(cred.user, _))
// 'join' concurrently combines differently-typed futures
// 'flatMap' sequentially combines, specifies what to do next
timeline join limited flatMap {
case (_, true) => Future.exception(new Exception("rate limited"))
case (timeline, _) => Future.value(timeline)
}
}
}
这个假想的例子组合了sequential 和concurrent 组件。注意,除了转换一个限制率回复了一个异常这里没有明确的错误处理。如果任何future 在这里失败了,它自动传播返回的Future。
你已经看到了怎样使用含有Futures 的组合,但是可能会想要更多的例子。假设你有一个简单的Internet 的模型。它有一个HTML 页面和图片。页面可以链接到图片或者其它页面。你可以抓取一个页面或者一个图片,但是API 是异步的。这个假的API 调用这些“fetchable”的资源。
import com.twitter.util.{Try,Future,Promise}
// a fetchable thing
trait Resource {
def imageLinks(): Seq[String]
def links(): Seq[String]
}
// HTML pages can link to Imgs and to other HTML pages.
class HTMLPage(val i: Seq[String], val l: Seq[String]) extends Resource {
def imageLinks() = i
def links = l
}
// IMGs don't actually link to anything else
class Img() extends Resource {
def imageLinks() = Seq()
def links() = Seq()
}
// profile.html links to gallery.html and has an image link to portrait.jpg
val profile = new HTMLPage(Seq("portrait.jpg"), Seq("gallery.html"))
val portrait = new Img
// gallery.html links to profile.html and two images
val gallery = new HTMLPage(Seq("kitten.jpg", "puppy.jpg"), Seq("profile.html"))
val kitten = new Img
val puppy = new Img
val internet = Map(
"profile.html" -> profile,
"gallery.html" -> gallery,
"portrait.jpg" -> portrait,
"kitten.jpg" -> kitten,
"puppy.jpg" -> puppy
)
// fetch(url) attempts to fetch a resource from our fake internet.
// Its returned Future might contain a Resource or an exception
def fetch(url: String) = { new Promise(Try(internet(url))) }
假设你想抓取页面的第一个图片,给定这个页面的URL。假设你正在制作一个用户可以把他们感兴趣的链接发送过来的页面。为了帮助其它人决定这个链接是否值得跟随,你想要显示一个这个链接页面的第一个图片的缩略图。
如果你不了解组合器,你可能仍然在写一个thumbnail-getting 的函数:
def getThumbnail(url: String): Future[Resource]={
val returnVal = new Promise[Resource]
fetch(url) onSuccess { page => // callback for successful page fetch
fetch(page.imageLinks()(0)) onSuccess { p => // callback for successful img fetch
returnVal.setValue(p)
} onFailure { exc => // callback for failed img fetch
returnVal.setException(exc)
}
} onFailure { exc => // callback for failed page fetch
returnVal.setException(exc)
}
returnVal
}
这个版本的函数工作良好。大多数的函数由未封装的Futures 组成,接着就把内容放到了另一个Future里面。
我们想先获得一个页面,然后在或者这个页面的图片。如果你想要先得到A 后的到B,这通常意味着sequential 组合。直到我们的B 是异步的,我们想要flatMap:
def getThumbnail(url: String): Future[Resource] =
fetch(url) flatMap { page => fetch(page.imageLinks()(0)) }
获取到了页面的第一个图片了很好,但是也许我们会把所有的图片都获取出来然后让用户选择他们最喜欢的。我们可以写一个for 循环来一个接一个的抓取它们,但是这可能会花费很长时间。我们想要并行的获取它们。如果你想要让事情并行的发生,通常意味着concurrent 组件。所以我们使用Future.collect 来抓取所有的图片:
def getThumbnails(url:String): Future[Seq[Resource]] =
fetch(url) flatMap { page =>
Future.collect(
page.imageLinks map { u => fetch(u) }
)
}
如果这对你来说很重要,非常好。你可能会担心这一行,page.imageLinks map{ u => fetch(u) }:它使用了map 之后的事情是map 返回了Future。假设下一个事情不返回一个Future呢?但是注意在map 不是Furure 之前的事情;它是一个collection.collection 返回一个集合量的映射函数;我们使用Future.collect 来把Futures 的集合量收集进集合量里。
和页面抓取图片不同,我们可能抓取链接到的其它图片。如果我们递归做这件事,我们就做了一个简单的web 蜘蛛。
// Return
def crawl(url: String): Future[Seq[Resource]] =
fetch(url) flatMap { page =>
Future.collect(
page.links map { u => crawl(u) }
) map { pps => pps.flatten }
}
crawl("profile.html")
...hangs REPL, infinite loop...
Ctrl-C
Execution interrupted by signal.
scala>
// She's gone rogue, captain! Have to take her out!
// Calling Thread.stop on runaway Thread[Thread-93,5,main] with offending code:
// scala> crawl("profile.html")
实际上,这个web 蜘蛛没有什么用:我们没有告诉它什么时候停止爬行;它很高兴的重复爬行只是抓取更早一刻的资源。
一个Finagle 服务器代表一个处理RPC 的服务,接受请求然后返回一个响应。一个服务是一个函数Req => Funture[Req] 对于一些请求和响应类型。
abstract class Service[-Req,+Req] extends (Req => Future[Req])
我们定义了客户端和服务器。
一个Finagle 客户端从网络导入了一个服务。从概念上讲,一个Finagle 客户端有两个部分:
相似的,一个Finagle 服务向网络导出一个服务。一个服务器有两个部分:
这个分隔了从数据流怎样通过网络的配置服务逻辑。
我们也谈论了Fiagle 的过滤器。一个站点过滤服务,修改流过它的数据。过滤器很好的和服务组合。比如,如果你有一个比率限制过滤器以及一个tweet-serving 服务,你可以把它们放在一起做为比率限制过滤和tweet-serving 服务。
一个Finagle 客户端导入一个服务。它有一些关于怎样通过网络发送数据的配置。一个简单的HTTP 客户端可能看起来这样。
import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpRequest, HttpResponse, HttpVersion, HttpMethod}
import com.twitter.finagle.Service
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http
// Don't worry, we discuss this magic "ClientBuilder" later
val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
.codec(Http())
.hosts("twitter.com:80") // If >1 host, client does simple load-balancing
.hostConnectionLimit(1)
.build()
val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")
val f = client(req) // Client, send the request
// Handle the response:
f onSuccess { res =>
println("got response", res)
} onFailure { exc =>
println("failed :-(", exc)
}
在服务和关于怎样监听从网络上来的请求方面的服务器定义。一个简单的服务器看起来可能向下面这样:
import com.twitter.finagle.Service
import com.twitter.finagle.http.Http
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion, HttpResponseStatus, HttpRequest, HttpResponse}
import java.net.{SocketAddress, InetSocketAddress}
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.builder.ServerBuilder
// Define our service: OK response for root, 404 for other paths
val rootService = new Service[HttpRequest, HttpResponse] {
def apply(request: HttpRequest) = {
val r = request.getUri match {
case "/" => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
case _ => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND)
}
Future.value(r)
}
}
// Serve our service on a port
val address: SocketAddress = new InetSocketAddress(10000)
val server: Server = ServerBuilder()
.codec(Http())
.bindTo(address)
.name("HttpServer")
.build(rootService)
即使我们在这些例子里没有用到,强制性名字对剖析和调试还是很有用的。
Filter 是一个转换服务。它们可以提供服务生成函数。比如,你可能有一些应该支持限制率的服务;你可以写一个限制率过滤器并把它应用到你所有的服务。Filter 对分解不同阶段的服务也是不错的。
一个简单的代理可能看起来如下:
class MyService(client: Service[..]) extends Service[HttpRequest, HttpResponse]
{
def apply(request: HttpRequest) = {
client(rewriteReq(request)) map { res =>
rewriteRes(res)
}
}
}
rewriteReq 和 rewriteRes 可以提供协议转换,比如。
abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])
这样的视图看起来能更好:
((ReqIn, Service[ReqOut, RepIn])
=> Future[RepOut])
(* Service *)
[ReqIn -> (ReqOut -> RepIn) -> RepOut]
这里是一个提供服务超时机制的过滤器。
class TimeoutFilter[Req, Rep](
timeout: Duration,
exception: RequestTimeoutException,
timer: Timer)
extends Filter[Req, Rep, Req, Rep]
{
def this(timeout: Duration, timer: Timer) =
this(timeout, new IndividualRequestTimeoutException(timeout), timer)
def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
val res = service(request)
res.within(timer, timeout) rescue {
case _: java.util.concurrent.TimeoutException =>
res.cancel()
Trace.record(TimeoutFilter.TimeoutAnnotation)
Future.exception(exception)
}
}
}
这个例子展示了你可能提供认证(通过一个认证服务)为了把Service[AuthHttpReq,HttpRep] 转换成为Service[HttpReq,HttpRep]。
class RequireAuthentication(authService: AuthService)
extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] {
def apply(
req: HttpReq,
service: Service[AuthHttpReq, HttpRep]
) = {
authService.auth(req) flatMap {
case AuthResult(AuthResultCode.OK, Some(passport), _) =>
service(AuthHttpReq(req, passport))
case ar: AuthResult =>
Future.exception(
new RequestUnauthenticated(ar.resultCode))
}
}
}
用这种方式使用过滤器有很多好处。它帮助你保持在一个地方的“认证逻辑”。拥有一个独立的类型授权的请求,使得它更容易保证程序安全性的原因。
过滤器可以和andThen 组合在一起。提供一个Service 作为一个参数,andThen 创造一个(过滤器)服务(提供次类型的例子)
val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep]
val serviceRequiringAuth: Service[AuthHttpReq, HttpRep]
val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] =
authFilter andThen timeoutFilter
val authenticatedTimedOutService: Service[HttpReq, HttpRep] =
authenticateAndTimedOut andThen serviceRequiringAuth
Builders 把所有的东西都放在一起。给定一组参数,ClientBuilder 会产生一个服务实例,一个ServerBuilder 接受一个Service 实例然后在分派传入到它的请求。为了确定Service 的类型,我们必须提供一个Codec。Codec 提供相关的协议实现(比如,HTTP,thrift,memcached)。Buildeers 通常都有很多参数,需要的却很少。
这是一个ClientBuilder 注解的例子(类型提供illustration):
val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
.codec(Http)
.hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003")
.hostConnectionLimit(1)
.tcpConnectTimeout(1.second)
.retries(2)
.reportTo(new OstrichStatsReceiver)
.build()
这构建了一个通过三个给定的hosts 负载均衡的客户端,构建时每个host 不止有一个连接,仅当两个失败后才放弃。统计被报告在ostrich。需要下面的构建选项,(它们代表静态强制执行):hosts 或者cluster,codec 以及hostConnectionLimit。
类似的,你可以使用一个ServerBuilder 来让你的服务监听输入的请求:
val service = new MyService(...) // construct instance of your Finagle service
var filter = new MyFilter(...) // and maybe some filters
var filteredServce = filter andThen service
val server = ServerBuilder()
.bindTo(new InetSocketAddress(port))
.codec(ThriftServerFramedCodec())
.name("my filtered service")
// .hostConnectionMaxLifeTime(5.minutes)
// .readTimeout(2.minutes)
.build(filteredService)
这将开始服务,端对端的,一个通过分派请求来节约服务开销的服务。如果我们取消掉hostConnectionMaxLifeTime 行的注释,每一个链接会允许在保持联通。如果我们取消掉 readTimeout 行的注释,我们需要每隔2分钟发送一个请求。需要的ServerBuiler 选项是:name,bindTo 和 codec。
Finagle 自动调整线程来保持服务运行的平滑。不管怎样,如果你服务block,它可以block 所有的Finagle 线程。
如果你的代码调用了blocking 操作(apply 或者 get),使用了Future 池包裹了blocking code。这就会在它自己的线程池里面运行blocking 操作。
如果你的代码使用了sequential 组件,不要担心它会在这些Futures 上面blocking。
注意,这里有其它的”Future” 类。不要通过scala.actor.Future 或java.util.concurrent.Future 来迷惑com.twitter.util.Future!
如果你学习了类型系统或者/以及category 理论,你会很高兴的发现flatMap 就想要与monadic 绑定。