环境: Java21
本方法未使用任何第三方库,未处理ping-pong消息
package com.binance.quant.api.ws.wsTest;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Random;
public class BinanceWebSocketClient {
private SSLSocket socket;
private OutputStream out;
private InputStream in;
public void connect(String host, int port) throws Exception {
SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
socket = (SSLSocket) factory.createSocket(host, port);
out = socket.getOutputStream();
in = socket.getInputStream();
performHandshake(host);
}
private void performHandshake(String host) throws Exception {
// Generate WebSocket key
String key = generateWebSocketKey();
String request = String.format(
"GET /ws/btcusdt@bookTicker HTTP/1.1\r\n" +
"Host: %s\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Key: %s\r\n" +
"Sec-WebSocket-Version: 13\r\n" +
"\r\n", host, key);
// Send handshake request
PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8), true);
writer.print(request);
writer.flush();
// Read handshake response
byte[] buffer = new byte[1024];
int bytesRead;
StringBuilder response = new StringBuilder();
while ((bytesRead = in.read(buffer)) != -1) {
response.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
if (response.toString().contains("\r\n\r\n")) { // End of headers
break;
}
}
System.out.println("Response: " + response.toString());
}
private String generateWebSocketKey() {
// Generate a WebSocket key for the handshake request
byte[] randomBytes = new byte[16];
new Random().nextBytes(randomBytes);
return Base64.getEncoder().encodeToString(randomBytes);
}
public void sendTextFrame(String message) throws Exception {
byte[] payload = message.getBytes(StandardCharsets.UTF_8);
int length = payload.length;
// Frame header
out.write(0x81); // FIN=1, Opcode=1 (text frame)
if (length <= 125) {
out.write(length);
} else if (length <= 65535) {
out.write(126);
out.write((length >> 8) & 0xFF);
out.write(length & 0xFF);
} else {
out.write(127);
out.write((length >> 56) & 0xFF);
out.write((length >> 48) & 0xFF);
out.write((length >> 40) & 0xFF);
out.write((length >> 32) & 0xFF);
out.write((length >> 24) & 0xFF);
out.write((length >> 16) & 0xFF);
out.write((length >> 8) & 0xFF);
out.write(length & 0xFF);
}
out.write(payload);
out.flush();
}
public String receiveTextFrame() throws Exception {
int opcode = in.read() & 0x0F;
int length = in.read() & 0x7F;
if (length == 126) {
length = (in.read() << 8) | in.read();
} else if (length == 127) {
length = (int) ((in.read() << 56) | (in.read() << 48) | (in.read() << 40) | (in.read() << 32) |
(in.read() << 24) | (in.read() << 16) | (in.read() << 8) | in.read());
}
byte[] payload = new byte[length];
int bytesRead = 0;
while (bytesRead < length) {
int result = in.read(payload, bytesRead, length - bytesRead);
if (result == -1) {
throw new Exception("Connection closed");
}
bytesRead += result;
}
return new String(payload, StandardCharsets.UTF_8);
}
public void close() throws Exception {
socket.close();
}
public static void main(String[] args) {
String host = "stream.binance.com"; // WebSocket server host
int port = 443; // Port for wss
try {
BinanceWebSocketClient client = new BinanceWebSocketClient();
client.connect(host, port);
// Send a ping message to start the connection
client.sendTextFrame("{\"method\": \"SUBSCRIBE\", \"params\": [\"btcusdt@bookTicker\"], \"id\": 1}");
// Wait and receive messages
while (true) {
String response = client.receiveTextFrame();
System.out.println(System.currentTimeMillis() + " " + response);;
}
// client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
发表评论