使用Java 11 WebSocket API的Websocket客户端

对于一个小型加密项目,我想使用Bitfinex WebSocket API实时获取市场数据。

从Java SE 11开始,JDK包含一个客户端WebSocket API。Javadoc包含一些代码示例,但是您不能立即使用这些示例。在网上搜索“ java websocket client”将主要显示有关旧JSR 352 websocket的示例和指南。设置一切以完成工作并不像预期的那样简单,因此我编写了这个小教程。
只要适合,我都会在项目中使用vert.x,并且也有一个websocket客户端API。但是可悲的是vert.x websocket客户端有一些缺点,它不适用于重定向尽管我们不使用vert.x websocket客户端,但我们将vert.x用作小型应用程序的基础。
够多了,我们开始编码。要构建并连接到Websocket服务器,构建器需要一个侦听器,该侦听器将侦听传入的数据包:
class BitfinexListener(val vertx: Vertx) : WebSocket.Listener {
    override fun onOpen(webSocket: WebSocket?) {
        super.onOpen(webSocket)
        LOGGER.info(“websocket opened”)
        this.vertx.periodicStream(60000).toObservable()
          .subscribe { i ->
            val pingTxt = JsonObject().put(“event”, “ping”)
                           .put(“cid”, Random(2020).nextInt())
                           .encode()
            webSocket?.sendText(pingTxt, true)?
             .thenRun {-> LOGGER.info(“sent ping {}”, pingTxt)}
          }
    }
    var parts: MutableListSequence?> = 
            MutableList(0) { index: Int -> “” }
    var accumulatedMessage: CompletableFuture<*> =
            CompletableFuture<Any>()
    override fun onText(webSocket: WebSocket,
                         message: CharSequence?,
                         last: Boolean): CompletionStage<*>? {
        parts.add(message)
        webSocket.request(1)
        if (last) {
            val completeMessage = parts.joinToString(separator = “”) 
                   { charSequence -> charSequence ?: “” }
            parts.clear()
            accumulatedMessage.complete(null)
            val cf: CompletionStage<*> = accumulatedMessage
            accumulatedMessage = CompletableFuture<Any>()
            onMessage(completeMessage)
            return cf
        }
        return accumulatedMessage
    }
    fun onMessage(message: String) {
        val bitfinexMessage = Json.decodeValue(message)
        // … see repo at github for full code
    }
}
我们重写onOpen方法以建立对bitfinex的定期ping。重要的一件事是,当您覆盖onOpen时,必须调用super.onOpen,否则客户端不会向服务器发送任何数据。
我花了很多时间才发现这个错误。由于侦听器是一个接口,所以我不习惯于调用接口的超级方法。但在这种情况下这很重要。java8中引入的接口中默认方法的概念对我来说还没有成为第二天性。
onText消息将收集所有传输的文本数据,直到文本完成为止(通常是一次调用inText的情况)。文本完成后,将通过onMessage方法中的vert.x事件总线发送文本。
现在,我们可以使用此侦听器设置一个表示与bitfinex的连接的顶点:
class BitfinexConnection : AbstractVerticle() {
    var webSocket: WebSocket? = null
    override fun start() {
        LOGGER.info(“deploying BitfinexConnection”)
        val subs = vertx.sharedData()
                .getLocalMap<Int,String>(“bitfinex.subscriptions”)
        val listener = BitfinexListener(this.vertx, subs)
        val client = HttpClient.newHttpClient()
        val uri = URI.create(“wss://api-pub.bitfinex.com/ws/2”)
        this.webSocket = client.newWebSocketBuilder()
                     .buildAsync(uri), listener).join()
        vertx.eventBus()
             .consumerBITFINEX_EB_ADDRESS)
             .handler { jsonMsg ->
          if ( webSocket==null || webSocket?.isOutputClosed()!!) {
              jsonMsg.reply(JsonObject()
                            .put(“message”, “websocket closed”)
                            .put(“statusCode”,503))
                [email protected]
          }
          val bitfinexMessage = jsonMsg.body().encode()
          this.webSocket?.sendText(bitfinexMessage,true)?
           .thenRun{ LOGGER.debug(“delivered {} “, bitfinexMessage)}
        }
    }
}
该类非常简单明了,在verticle的start方法中,将初始化并启动与公共bitfinex api的websocket连接。应通过vert.x事件总线将消息直接发送到websocket,因此我们在地址BITFINEX_EB_ADDRESS上启动使用者。
在最后一步,我们将所有内容放在一起,并尝试订阅tBTCUSD代码:
fun main() {
  val vertx = Vertx.vertx()
  val symbol = “tBTCUSD”
  vertx.rxDeployVerticle(BitfinexConnection::class.java.name)
    .subscribe(
        { id ->
            LOGGER.info(“deployed bitfinex connection {}”, id)
            val address = “ticker.” + symbol
            vertx.eventBus()
               .consumer<JsonArray>(address)
               .handler { jsonMsg ->
                  LOGGER.info(“received {} {}”, address,
                        jsonMsg.body().encodePrettily())
               }
            val subscribeMessage = JsonObject()
                  .bfxSubscribeTickerMessage(symbol)           
            vertx.eventBus()
                .send(BITFINEX_EB_ADDRESS, subscribeMessage)
       },
       { t: Throwable? -> 
         LOGGER.error(“deployment failed”, t) }
   )
}
main方法启动Vertx并部署bitfinex Websocket Verticle。部署完成后,我们将带有订阅有效负载的消息发送到websocket verticle,并在特殊的eventbus地址上启动使用者。
您可以在github上找到所有代码,对其进行克隆,以确保已安装Java 11或更高版本,并使用以下命令运行它:
./gradlew run
我希望本教程将为您快速启动项目中的websocket客户端代码。