feat: use web socket to connect with debug server

This commit is contained in:
hyb1996 2018-09-14 14:41:03 +08:00
parent bb7cb6bd1a
commit fc0f5c479e
5 changed files with 145 additions and 270 deletions

View File

@ -18,14 +18,16 @@ import org.autojs.autojs.BuildConfig;
import org.autojs.autojs.tool.EmptyObservers;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import okhttp3.OkHttpClient;
import okhttp3.Request;
/**
* Created by Stardust on 2017/5/11.
@ -71,7 +73,7 @@ public class DevPluginService {
private final DevPluginResponseHandler mResponseHandler = new DevPluginResponseHandler();
private final Handler mHandshakeTimeoutHandler = new Handler(Looper.getMainLooper());
private volatile JsonSocket mSocket;
private volatile JsonWebSocket mSocket;
public static DevPluginService getInstance() {
return sInstance;
@ -105,7 +107,7 @@ public class DevPluginService {
}
@AnyThread
public Observable<JsonSocket> connectToServer(String host) {
public Observable<JsonWebSocket> connectToServer(String host) {
int port = PORT;
String ip = host;
int i = host.lastIndexOf(':');
@ -114,23 +116,32 @@ public class DevPluginService {
ip = host.substring(0, i);
}
mConnectionState.onNext(new State(State.CONNECTING));
return socket(ip, port)
.observeOn(AndroidSchedulers.mainThread())
.doOnError(this::onSocketError);
}
@AnyThread
private Observable<JsonSocket> socket(String ip, int port) {
return Observable.fromCallable(() -> {
JsonSocket socket = new JsonSocket(new Socket(ip, port));
socket.data()
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete(() -> mConnectionState.onNext(new State(State.DISCONNECTED)))
.subscribe(data -> onSocketData(socket, data), this::onSocketError);
sayHelloToServer(socket);
return socket;
})
.subscribeOn(Schedulers.io());
private Observable<JsonWebSocket> socket(String ip, int port) {
OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
.build();
String url = ip + ":" + port;
if (!url.startsWith("ws://") && !url.startsWith("wss://")) {
url = "ws://" + url;
}
return Observable.just(new JsonWebSocket(client, new Request.Builder()
.url(url)
.build()))
.doOnNext(socket -> {
mSocket = socket;
socket.data()
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete(() -> mConnectionState.onNext(new State(State.DISCONNECTED)))
.subscribe(data -> onSocketData(socket, data), this::onSocketError);
sayHelloToServer(socket);
});
}
@MainThread
@ -144,7 +155,7 @@ public class DevPluginService {
}
@MainThread
private void onSocketData(JsonSocket jsonSocket, JsonElement element) {
private void onSocketData(JsonWebSocket jsonWebSocket, JsonElement element) {
if (!element.isJsonObject()) {
Log.w(LOG_TAG, "onSocketData: not json object: " + element);
return;
@ -152,14 +163,14 @@ public class DevPluginService {
JsonObject obj = element.getAsJsonObject();
JsonElement type = obj.get("type");
if (type != null && type.isJsonPrimitive() && type.getAsString().equals(TYPE_HELLO)) {
onServerHello(jsonSocket, obj);
onServerHello(jsonWebSocket, obj);
return;
}
mResponseHandler.handle(obj);
}
@WorkerThread
private void sayHelloToServer(JsonSocket socket) throws IOException {
private void sayHelloToServer(JsonWebSocket socket) throws IOException {
writeMap(socket, TYPE_HELLO, new MapEntries<String, Object>()
.entry("device_name", Build.BRAND + " " + Build.MODEL)
.entry("client_version", CLIENT_VERSION)
@ -174,36 +185,36 @@ public class DevPluginService {
}
@MainThread
private void onHandshakeTimeout(JsonSocket socket) {
private void onHandshakeTimeout(JsonWebSocket socket) {
Log.i(LOG_TAG, "onHandshakeTimeout");
mConnectionState.onNext(new State(State.DISCONNECTED, new SocketTimeoutException("handshake timeout")));
socket.close();
}
@MainThread
private void onServerHello(JsonSocket jsonSocket, JsonObject message) {
private void onServerHello(JsonWebSocket jsonWebSocket, JsonObject message) {
Log.i(LOG_TAG, "onServerHello: " + message);
mSocket = jsonSocket;
mSocket = jsonWebSocket;
mConnectionState.onNext(new State(State.CONNECTED));
}
@WorkerThread
private static int write(JsonSocket socket, String type, JsonObject data) throws IOException {
@AnyThread
private static boolean write(JsonWebSocket socket, String type, JsonObject data) {
JsonObject json = new JsonObject();
json.addProperty("type", type);
json.add("data", data);
return socket.write(json);
}
@WorkerThread
private static int writePair(JsonSocket socket, String type, Pair<String, String> pair) throws IOException {
@AnyThread
private static boolean writePair(JsonWebSocket socket, String type, Pair<String, String> pair) {
JsonObject data = new JsonObject();
data.addProperty(pair.first, pair.second);
return write(socket, type, data);
}
@WorkerThread
private static int writeMap(JsonSocket socket, String type, Map<String, ?> map) throws IOException {
@AnyThread
private static boolean writeMap(JsonWebSocket socket, String type, Map<String, ?> map) {
JsonObject data = new JsonObject();
for (Map.Entry<String, ?> entry : map.entrySet()) {
Object value = entry.getValue();
@ -230,9 +241,6 @@ public class DevPluginService {
public void log(String log) {
if (!isConnected())
return;
Observable.fromCallable(() ->
writePair(mSocket, "log", new Pair<>("log", log)))
.subscribeOn(Schedulers.io())
.subscribe(EmptyObservers.consumer(), Throwable::printStackTrace);
writePair(mSocket, "log", new Pair<>("log", log));
}
}

View File

@ -1,239 +0,0 @@
package org.autojs.autojs.pluginclient;
import android.util.Log;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.StringReader;
import java.net.Socket;
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
public class JsonSocket {
private static final char DELIMITER = '#';
private static final String DELIMITER_STRING = "#";
private static final String LOG_TAG = "JsonSocket";
private final Socket mSocket;
private final JsonParser mJsonParser = new JsonParser();
private OutputStream mOutputStream;
private final PublishSubject<JsonElement> mJsonElementPublishSubject = PublishSubject.create();
private volatile boolean mClosed = false;
public JsonSocket(Socket socket) throws IOException {
mSocket = socket;
mOutputStream = socket.getOutputStream();
new Thread(new SocketReader(socket)).start();
}
public Observable<JsonElement> data() {
return mJsonElementPublishSubject;
}
public int write(JsonElement element) throws IOException {
String json = element.toString();
byte[] bytes = json.getBytes();
String length = json.length() + DELIMITER_STRING;
mOutputStream.write(length.getBytes());
mOutputStream.write(bytes);
Log.d(LOG_TAG, "write: length = " + bytes.length + ", json = " + element);
return bytes.length;
}
public void close() {
mJsonElementPublishSubject.onComplete();
mClosed = true;
try {
mSocket.close();
} catch (IOException ignored) {
}
}
private void close(Exception e) {
if (mClosed) {
return;
}
mJsonElementPublishSubject.onError(e);
mClosed = true;
try {
mSocket.close();
} catch (IOException ignored) {
}
}
private void dispatchJson(String json) {
try {
JsonReader reader = new JsonReader(new StringReader(json));
reader.setLenient(true);
JsonElement element = mJsonParser.parse(reader);
mJsonElementPublishSubject.onNext(element);
} catch (JsonParseException e) {
e.printStackTrace();
}
}
public boolean isClosed() {
return mClosed;
}
private static class ByteQueue {
byte[] data;
int offset = 0;
int size = 0;
public ByteQueue(int initialCapacity) {
data = new byte[initialCapacity];
}
int read(InputStream stream) throws IOException {
if (size >= data.length) {
resize();
}
int end = offset + size;
int n;
if (end >= data.length) {
n = stream.read(data, 0, offset);
} else {
n = stream.read(data, end, data.length - end);
}
size += n;
return n;
}
void pop(int len) {
if (len > size) {
throw new IllegalArgumentException("pop " + len + " but current length is " + size);
}
offset += len;
if (offset >= data.length) {
offset -= data.length;
}
size -= len;
}
String popAsString(int len) {
if (len > size) {
throw new IllegalArgumentException("popAsString " + len + " but current length is " + size);
}
int end = offset + len;
String str;
if (end < data.length) {
str = new String(data, offset, len);
} else {
byte[] bytes = new byte[len];
int firstPartLength = data.length - offset;
int secondPartLength = len - firstPartLength;
System.arraycopy(data, offset, bytes, 0, firstPartLength);
System.arraycopy(data, 0, bytes, firstPartLength, secondPartLength);
str = new String(bytes);
}
pop(len);
return str;
}
private void resize() {
byte[] newData = new byte[data.length * 2];
int end = offset + size;
if (end < data.length) {
System.arraycopy(data, offset, newData, 0, size);
} else {
int firstPartLength = data.length - offset;
int secondPartLength = offset + size - data.length;
System.arraycopy(data, offset, newData, 0, firstPartLength);
System.arraycopy(data, 0, newData, firstPartLength, secondPartLength);
}
offset = 0;
data = newData;
}
}
private class SocketReader implements Runnable {
private final Socket mSocket;
private final InputStream mInputStream;
private int mJsonDataLength = -1;
private ByteQueue mByteQueue = new ByteQueue(4096);
private SocketReader(Socket socket) throws IOException {
mSocket = socket;
mInputStream = mSocket.getInputStream();
}
@Override
public void run() {
try {
readLoop();
close();
} catch (Exception e) {
e.printStackTrace();
close(e);
}
}
private void readLoop() throws Exception {
int n;
while ((n = mByteQueue.read(mInputStream)) > 0) {
onChunk(mByteQueue, n);
}
}
private void onChunk(ByteQueue byteQueue, int chunkSize) {
if (mJsonDataLength <= 0) {
tryReadingJsonDataLength(byteQueue, chunkSize);
}
if (mJsonDataLength <= 0) {
return;
}
if (byteQueue.size < mJsonDataLength) {
return;
}
String json = byteQueue.popAsString(mJsonDataLength);
Log.d(LOG_TAG, "json = " + json);
mJsonDataLength = -1;
dispatchJson(json);
}
private void tryReadingJsonDataLength(ByteQueue byteQueue, int chunkSize) {
int end = byteQueue.offset + byteQueue.size;
int start = end - chunkSize;
for (int i = start; i < end; i++) {
if (byteQueue.data[i] == DELIMITER) {
String jsonDataLength = new String(byteQueue.data, byteQueue.offset, i - byteQueue.offset);
Log.d(LOG_TAG, "json data length = " + jsonDataLength);
byteQueue.pop(i - byteQueue.offset + 1);
receiveJsonDataLength(jsonDataLength);
break;
}
}
}
private void receiveJsonDataLength(String jsonDataLength) {
try {
mJsonDataLength = Integer.parseInt(jsonDataLength);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,106 @@
package org.autojs.autojs.pluginclient;
import android.util.Log;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.net.Socket;
import javax.annotation.Nullable;
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
public class JsonWebSocket extends WebSocketListener {
private static final String LOG_TAG = "JsonWebSocket";
private final WebSocket mWebSocket;
private final JsonParser mJsonParser = new JsonParser();
private final PublishSubject<JsonElement> mJsonElementPublishSubject = PublishSubject.create();
private volatile boolean mClosed = false;
public JsonWebSocket(OkHttpClient client, Request request) {
mWebSocket = client.newWebSocket(request, this);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
Log.d(LOG_TAG, "onMessage: " + text);
dispatchJson(text);
}
public Observable<JsonElement> data() {
return mJsonElementPublishSubject;
}
public boolean write(JsonElement element) {
String json = element.toString();
Log.d(LOG_TAG, "write: length = " + json.length() + ", json = " + element);
return mWebSocket.send(json);
}
public void close() {
mJsonElementPublishSubject.onComplete();
mClosed = true;
mWebSocket.close(1000, "close");
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
Log.d(LOG_TAG, "onFailure: code = " + code + ", reason = " + reason);
close();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, @Nullable Response response) {
Log.d(LOG_TAG, "onFailure: response = " + response, t);
close(t);
}
@Override
public void onOpen(WebSocket webSocket, Response response) {
Log.d(LOG_TAG, "onOpen: response = " + response);
}
private void close(Throwable e) {
if (mClosed) {
return;
}
mJsonElementPublishSubject.onError(e);
mClosed = true;
mWebSocket.close(1011, "remote exception: " + e.getMessage());
}
private void dispatchJson(String json) {
try {
JsonReader reader = new JsonReader(new StringReader(json));
reader.setLenient(true);
JsonElement element = mJsonParser.parse(reader);
mJsonElementPublishSubject.onNext(element);
} catch (JsonParseException e) {
e.printStackTrace();
}
}
public boolean isClosed() {
return mClosed;
}
}

View File

@ -1 +1 @@
[{"outputType":{"type":"APK"},"apkInfo":{"type":"MAIN","splits":[],"versionCode":412},"path":"commonRelease-4.0.2 Alpha7.apk","properties":{"packageId":"org.autojs.autojs","split":"","minSdkVersion":"17"}}]
[{"outputType":{"type":"APK"},"apkInfo":{"type":"MAIN","splits":[],"versionCode":413},"path":"commonRelease-4.0.2 Alpha8.apk","properties":{"packageId":"org.autojs.autojs","split":"","minSdkVersion":"17"}}]