整个 Sample 提供了以下功能:
基于TCP长连接的聊天室体验。
数据通信成功率、耗时、流量的展示。
网络状况检测结果展示。
一、本地运行 Server 端 具体如何运行 Server 端,参照官方wiki:Mars Sample 使用说明
二、修改 Android Sample 下面说下Android 端该如何修改源码连接到本地服务器
1.全局搜索marsopen.cn
,修改替换为localhost
2.在保证 app/build.gradle
下useLocalMarsWrapper = true
的情况下,在 wrapper module 下修改 com.tencent.mars.sample.wrapper.service. MarsServiceStub.java
的 dns 解析的地址为本地主机的 IP 地址 (wiki 上没写清楚)1 2 3 4 5 @Override public String[] onNewDns(String host) { return new String []{"192.168.24.193" }; }
三、运行后聊天效果
四、整体概述
由项目结构可知 sample 分为两部分,app 和 wrapper(当然也可以用依赖的方式来使用),数据格式使用的是google开源的 protobuf ,它具有高效、数据量小的特性(目前版本为 proto3,Android 开发推荐使用 protobuf-lite 版本,使用方法 )
Mars Android 接入指南 · Tencent/mars Wiki · GitHub 有这样一条介绍:
目前 gradle 接入支持两种方式:mars-core 和 mars-wrapper。只是做个 sample 的话建议可以使用 mars-wrapper, 但是如果在实际 App 中使用 mars,建议使用 mars-core 或本地编译。 怎么理解这句话?两种接入有什么不同? 从 sample 中 mars-wrapper(下面简称为 wrapper) 的源码可以看出 wrapper 只是对 mars-core 进行了再次封装,wrapper 库本质还是依赖 mars-core 库的 ,所以微信团队不建议实际App 中接入 wrapper ( 它只是微信官方提供对 mars-core 使用的一种方式,不过具有很好的参考价值)
另外,有 wrapper 的 manifest 可知,IM 服务模块是运行在独立的进程的 ,所以同时会涉及到 AIDL 多进程通信方面技术。
1 2 3 4 5 6 7 8 9 10 <application> <service android:name=".service.MarsServiceNative" android:process=":marsservice" /> <!--注册一个网络切换的广播接收器(源码上实现的)--> <receiver android:name="com.tencent.mars.BaseEvent$ConnectionReceiver" android:process=":marsservice" /> </application>
五、详细分析 1. 先从 ConversationActivity 获取初始的会话列表的请求开始 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 private NanoMarsTaskWrapper<Main.ConversationListRequest, Main.ConversationListResponse> taskGetConvList = null ; private void updateConversationTopics () { if (taskGetConvList != null ) { MarsServiceProxy.cancel(taskGetConvList); } textView.setVisibility(View.INVISIBLE); progressBar.setVisibility(View.VISIBLE); swipeRefreshLayout.setRefreshing(true ); taskGetConvList = new NanoMarsTaskWrapper <Main.ConversationListRequest, Main.ConversationListResponse>( new Main .ConversationListRequest(), new Main .ConversationListResponse() ) { private List<Conversation> dataList = new LinkedList <>(); @Override public void onPreEncode (Main.ConversationListRequest req) { req.type = conversationFilterType; req.accessToken = "" ; } @Override public void onPostDecode (Main.ConversationListResponse response) { } @Override public void onTaskEnd (int errType, int errCode) { runOnUiThread(new Runnable () { @Override public void run () { if (response != null ) { for (Main.Conversation conv : response.list) { dataList.add(new Conversation (conv.name, conv.topic, conv.notice)); } } if (!dataList.isEmpty()) { progressBar.setVisibility(View.INVISIBLE); conversationListAdapter.list.clear(); conversationListAdapter.list.addAll(dataList); conversationListAdapter.notifyDataSetChanged(); swipeRefreshLayout.setRefreshing(false ); } else { Log.i(TAG, "getconvlist: empty response list" ); progressBar.setVisibility(View.INVISIBLE); textView.setVisibility(View.VISIBLE); } } }); } }; MarsServiceProxy.send(taskGetConvList.setHttpRequest(CONVERSATION_HOST, "/mars/getconvlist" )); }
执行步骤:
创建一个 NanoMarsTaskWrapper 对象,里面主要包含 onPreEncode,onPostDecode 和onTaskEnd 等方法,分别是编码传输前,接收数据解码后和任务结束后的回调;
设置 NanoMarsTaskWrapper 的 http 地址
通过 MarsServiceProxy.send 方法执行发送请求;
初步了解执行步骤后,再详细了解 MarServiceProxy 和 NanoMarTaskWrapper 的实现,它们为什么会有这样的功能。
2. NanoMarTaskWrapper 顾名思义,NanoMarTaskWrapper 是一个任务的包装器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public abstract class NanoMarsTaskWrapper <T extends MessageNano , R extends MessageNano > extends AbstractTaskWrapper { private static final String TAG = "Mars.Sample.NanoMarsTaskWrapper" ; protected T request; protected R response; public NanoMarsTaskWrapper (T req, R resp) { super (); this .request = req; this .response = resp; } @Override public byte [] req2buf() { try { onPreEncode(request); final byte [] flatArray = new byte [request.getSerializedSize()]; final CodedOutputByteBufferNano output = CodedOutputByteBufferNano.newInstance(flatArray); request.writeTo(output); Log.d(TAG, "encoded request to buffer, [%s]" , MemoryDump.dumpHex(flatArray)); return flatArray; } catch (Exception e) { e.printStackTrace(); } return new byte [0 ]; } @Override public int buf2resp (byte [] buf) { try { Log.d(TAG, "decode response buffer, [%s]" , MemoryDump.dumpHex(buf)); response = MessageNano.mergeFrom(response, buf); onPostDecode(response); return StnLogic.RESP_FAIL_HANDLE_NORMAL; } catch (Exception e) { Log.e(TAG, "%s" , e); } return StnLogic.RESP_FAIL_HANDLE_TASK_END; } public abstract void onPreEncode (T request) ; public abstract void onPostDecode (R response) ; }
1)继承自AbstractTaskWrapper
2)创建时会同时创建继承自 MessageNano(protobuf 的消息数据类) 的 request 和 response 数据模型;
3)处理编解码的两个方法,req2buf 和 buf2resp,也就是字节流数组和 protobuf 对象的转换,涉及到 MessageNano.writeTo
和MessageNano.mergeFrom
的使用,这方面的具体实现不需要我们去了解,Google 的 protobuf 已经帮我们生成代码完成好了
再看看 AbstractTaskWrapper 是怎样实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public abstract class AbstractTaskWrapper extends MarsTaskWrapper .Stub { private Bundle properties = new Bundle (); public AbstractTaskWrapper () { final TaskProperty taskProperty = this .getClass().getAnnotation(TaskProperty.class); if (taskProperty != null ) { setHttpRequest(taskProperty.host(), taskProperty.path()); setShortChannelSupport(taskProperty.shortChannelSupport()); setLongChannelSupport(taskProperty.longChannelSupport()); setCmdID(taskProperty.cmdID()); } } @Override public Bundle getProperties () { return properties; } @Override public abstract void onTaskEnd (int errType, int errCode) ; public AbstractTaskWrapper setHttpRequest (String host, String path) { properties.putString(MarsTaskProperty.OPTIONS_HOST, ("" .equals(host) ? null : host)); properties.putString(MarsTaskProperty.OPTIONS_CGI_PATH, path); return this ; } public AbstractTaskWrapper setShortChannelSupport (boolean support) { properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, support); return this ; } public AbstractTaskWrapper setLongChannelSupport (boolean support) { properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, support); return this ; } public AbstractTaskWrapper setCmdID (int cmdID) { properties.putInt(MarsTaskProperty.OPTIONS_CMD_ID, cmdID); return this ; } @Override public String toString () { return "AbsMarsTask: " + BundleFormat.toString(properties); } }
抽象的 AbstractTaskWrapper 继承自 MarTaskWrapper.Stub (MarsTaskWrapper.aidl 生成的代码),它主要通过注解类 TaskProperty 设置了任务的配置信息 properties(如主机名、url路径、是否支持长短链接和指令 id),
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Inherited public @interface TaskProperty { String host () default "" ; String path () ; boolean shortChannelSupport () default true ; boolean longChannelSupport () default false ; int cmdID () default -1 ; }
再回到刚开始的发送消息的 MarsServiceProxy 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class MarsServiceProxy implements ServiceConnection { private MarsService service = null ; public static MarsServiceProxy inst; private LinkedBlockingQueue<MarsTaskWrapper> queue = new LinkedBlockingQueue <>(); private MarsServiceProxy () { worker = new Worker (); worker.start(); } public static void init (Context context, Looper looper, String packageName) { if (inst != null ) { return ; } gContext = context.getApplicationContext(); gPackageName = (packageName == null ? context.getPackageName() : packageName); gClassName = SERVICE_DEFAULT_CLASSNAME; inst = new MarsServiceProxy (); } @Override public void onServiceConnected (ComponentName componentName, IBinder binder) { Log.d(TAG, "remote mars service connected" ); try { service = MarsService.Stub.asInterface(binder); service.registerPushMessageFilter(filter); service.setAccountInfo(accountInfo.uin, accountInfo.userName); } catch (Exception e) { service = null ; } } private static class Worker extends Thread { @Override public void run () { while (true ) { inst.continueProcessTaskWrappers(); try { Thread.sleep(50 ); } catch (InterruptedException e) { } } } } }
MarsServiceProxy 只是一个单例的 ServiceConnection,不过它应该具有代理某种 Service的功能(ServiceConnection会和 Service绑定),由定义的变量和onServiceConnected中的调用可知,它关联了 MarsService,但是 MarsService 只是一个 AIDL ,不是真正的服务,这个稍后再说 MarsServiceProxy 是在SampleApplication 的 onCreate调用的时候初始化的
1 2 3 MarsServiceProxy.init(this , getMainLooper(), null );
当调用send 方法发送任务时,
1 2 3 public static void send (MarsTaskWrapper marsTaskWrapper) { inst.queue.offer(marsTaskWrapper); }
queue 是一个 LinkedBlockingQueue线程安全的队列,缓存了所有的MarsTaskWrapper 任务。 那将任务放进队列后,什么时候执行呢?由上面代码可以看到 MarsServiceProxy 创建时会启动一个 Worker 线程,线程会每隔50ms 执行调用inst.continueProcessTaskWrappers();
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private void continueProcessTaskWrappers () { try { if (service == null ) { Log.d(TAG, "try to bind remote mars service, packageName: %s, className: %s" , gPackageName, gClassName); Intent i = new Intent ().setClassName(gPackageName, gClassName); gContext.startService(i); if (!gContext.bindService(i, inst, Service.BIND_AUTO_CREATE)) { Log.e(TAG, "remote mars service bind failed" ); } return ; } MarsTaskWrapper taskWrapper = queue.take(); if (taskWrapper == null ) { return ; } try { Log.d(TAG, "sending task = %s" , taskWrapper); final String cgiPath = taskWrapper.getProperties().getString(MarsTaskProperty.OPTIONS_CGI_PATH); final Integer globalCmdID = GLOBAL_CMD_ID_MAP.get(cgiPath); if (globalCmdID != null ) { taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_CMD_ID, globalCmdID); Log.i(TAG, "overwrite cmdID with global cmdID Map: %s -> %d" , cgiPath, globalCmdID); } final int taskID = service.send(taskWrapper, taskWrapper.getProperties()); taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_TASK_ID, taskID); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { } }
这个方法里面会从任务队列取出一个任务,然后最终会通过 MarsService.send 方法发送消息任务,并保存任务 id
另外这个方法开始还有启动服务的判断,由gClassName = SERVICE_DEFAULT_CLASSNAME = "com.tencent.mars.sample.wrapper.service.MarsServiceNative"
知道,启动的服务是MarsServiceNative,那 MarsServiceNative 是怎样和 MarsServiceProxy 关联起来的呢?
再看 MarsServiceProxy 的 onServiceConnected 方法,MarsService 的初始化是通过MarsService.Stub.asInterface(binder)
关联了 MarsServiceProxy 的 IBinder
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void onServiceConnected (ComponentName componentName, IBinder binder) { Log.d(TAG, "remote mars service connected" ); try { service = MarsService.Stub.asInterface(binder); service.registerPushMessageFilter(filter); service.setAccountInfo(accountInfo.uin, accountInfo.userName); } catch (Exception e) { service = null ; } }
那哪个类实现了 MarsService 的方法呢?查看 MarsServiceNative 源码就能明白了,它同时实现 MarsService,间接和 MarsServiceProxy 形成强关联
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class MarsServiceNative extends Service implements MarsService { private static final String TAG = "Mars.Sample.MarsServiceNative" ; private MarsServiceStub stub; private static MarsServiceProfileFactory gFactory = new MarsServiceProfileFactory () { @Override public MarsServiceProfile createMarsServiceProfile () { return new DebugMarsServiceProfile (); } }; @Override public IBinder asBinder () { return stub; } @Override public void onCreate () { super .onCreate(); final MarsServiceProfile profile = gFactory.createMarsServiceProfile(); stub = new MarsServiceStub (this , profile); AppLogic.setCallBack(stub); StnLogic.setCallBack(stub); SdtLogic.setCallBack(stub); Mars.init(getApplicationContext(), new Handler (Looper.getMainLooper())); StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts()); StnLogic.setShortlinkSvrAddr(profile.shortLinkPort()); StnLogic.setClientVersion(profile.productID()); Mars.onCreate(true ); StnLogic.makesureLongLinkConnected(); Log.d(TAG, "mars service native created" ); } @Override public IBinder onBind (Intent intent) { return stub; } }
MarsServiceNative 是使用 mars-core 的关键类,同时它也是一个 Service 类,运行在独立进程中,主要负责了以下功能:
创建配置信息类 MarsServiceProfile,并在 StnLogic 设置相关信息;
实例化一个 MarsServiceStub;
设置了 AppLogic, StnLogic, SdtLogic 的回调;
初始化 Mars;
确定 StnLogic 的长连接StnLogic.makesureLongLinkConnected();
既然 MarsServiceNative 设置了 AppLogic, StnLogic, SdtLogic 的回调,那再看看 MarsServiceStub 是如何实现它们接口的
先看发送消息的 send 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class MarsServiceStub extends MarsService .Stub implements StnLogic .ICallBack, SdtLogic.ICallBack, AppLogic.ICallBack { private static final String TAG = "Mars.Sample.MarsServiceStub" ; private final MarsServiceProfile profile; private AppLogic.AccountInfo accountInfo = new AppLogic .AccountInfo(); private ConcurrentLinkedQueue<MarsPushMessageFilter> filters = new ConcurrentLinkedQueue <>(); private int clientVersion = 200 ; public MarsServiceStub (Context context, MarsServiceProfile profile) { this .context = context; this .profile = profile; } private static final int FIXED_HEADER_SKIP = 4 + 2 + 2 + 4 + 4 ; private static Map<Integer, MarsTaskWrapper> TASK_ID_TO_WRAPPER = new ConcurrentHashMap <>(); @Override public int send (final MarsTaskWrapper taskWrapper, Bundle taskProperties) throws RemoteException { final StnLogic.Task _task = new StnLogic .Task(StnLogic.Task.EShort, 0 , "" , null ); final String host = taskProperties.getString(MarsTaskProperty.OPTIONS_HOST); final String cgiPath = taskProperties.getString(MarsTaskProperty.OPTIONS_CGI_PATH); _task.shortLinkHostList = new ArrayList <>(); _task.shortLinkHostList.add(host); _task.cgi = cgiPath; final boolean shortSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, true ); final boolean longSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, false ); if (shortSupport && longSupport) { _task.channelSelect = StnLogic.Task.EBoth; } else if (shortSupport) { _task.channelSelect = StnLogic.Task.EShort; } else if (longSupport) { _task.channelSelect = StnLogic.Task.ELong; } else { Log.e(TAG, "invalid channel strategy" ); throw new RemoteException ("Invalid Channel Strategy" ); } int cmdID = taskProperties.getInt(MarsTaskProperty.OPTIONS_CMD_ID, -1 ); if (cmdID != -1 ) { _task.cmdID = cmdID; } TASK_ID_TO_WRAPPER.put(_task.taskID, taskWrapper); Log.i(TAG, "now start task with id %d" , _task.taskID); StnLogic.startTask(_task); if (StnLogic.hasTask(_task.taskID)) { Log.i(TAG, "stn task started with id %d" , _task.taskID); } else { Log.e(TAG, "stn task start failed with id %d" , _task.taskID); } return _task.taskID; } }
1)创建一个 StnLogic.Task,并通过 Bundle 传过来的数据(主机名、路径、长短连接和 cmdId),设置 task;
2)保存 taskID 和 MarsTaskWrapper 的映射关系;
3)调用 StnLogic.startTask(_task) 启动任务执行,最后返回 taskID; 具体的逻辑实现还是在 mars-core 和底层的 C++ 源码中,这个以后研究到底层源码再说。 在 mars-core 大概看下 StnLogic.ICallBack 接口有哪些方法:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 public interface ICallBack { boolean makesureAuthed () ; String[] onNewDns(final String host); void onPush (final int cmdid, final byte [] data) ; boolean req2Buf (final int taskID, Object userContext, ByteArrayOutputStream reqBuffer, int [] errCode, int channelSelect) ; int buf2Resp (final int taskID, Object userContext, final byte [] respBuffer, int [] errCode, int channelSelect) ; int onTaskEnd (final int taskID, Object userContext, final int errType, final int errCode) ; void trafficData (final int send, final int recv) ; void reportConnectInfo (int status, int longlinkstatus) ; int getLongLinkIdentifyCheckBuffer (ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int [] reqRespCmdID) ; boolean onLongLinkIdentifyResp (final byte [] buffer, final byte [] hashCodeBuffer) ; void requestDoSync () ; String[] requestNetCheckShortLinkHosts(); boolean isLogoned () ; void reportTaskProfile (String taskString) ; }
总结一下: 1.NanoMarsTaskWrapper:涉及编码前、解码后和任务结束后的回调,还有字节流数组和 protobuf 对象的转换等; 2.MarsServiceProxy:是一个 ServiceConnection,涉及消息的发送和取消等,作为一个 API 的功能,本质是 MarsServiceNative 代理服务类; 3.MarsServiceNative 是 wrapper 的核心类,里面涉及 Mars 的初始化,设置了 AppLogic, StnLogic 和 SdtLogic 的回调等; 4.而 MarsServiceStub 实现了AppLogic, StnLogic 和 SdtLogic 的回调接口,维护了和 mars-core 的调用等;
其余分析,可以到我 fork 的 mars 的地址GitHub - navyifanr/mars: (添加注释分析) 下载。
参考资料:GitHub - Tencent/mars: Mars is a cross-platform network component developed by WeChat. 微信开源mars源码分析1—上层samples分析 - ameise_w - SegmentFault