Contents
- Sequence Diagram
- RequestQueue
- RequestQueue.add()
- CacheDispatcher 와 NetworkDispatcher
- CacheDispatcher
- NetworkDispatcher
- ResponseDelivery
- Reference
Sequence Diagram
diagram 은 detail 한 동작에 대한 내용은 없으니 자세한 사항은 code 를 확인하자.
RequestQueue
RequestQueue 를 new 하면
- ExecutorDelivery
를 만들고, start() 하게 되면,
- 1개의 cacheDispatcher + 여러개의 networkDispatcher
를 만든다.
// Activity.kt mQueue = MyVolleyRequestQueue.getInstance(this.getApplicationContext())!!.getRequestQueue() mButton!!.setOnClickListener(object : View.OnClickListener { public override fun onClick(v: View) { mQueue!!.add(jsonRequest) } }) // MyVolleyRequestQueue.kt public class MyVolleyRequestQueue private constructor(context: Context) { private var mRequestQueue: RequestQueue? = null private var mCtx: Context init { mCtx = context mRequestQueue = getRequestQueue() } public fun getRequestQueue(): RequestQueue? { if (mRequestQueue == null) { val cache = DiskBasedCache(mCtx.getCacheDir(), 10 * 1024 * 1024) val network = BasicNetwork(HurlStack()) mRequestQueue = RequestQueue(cache, network) // Don't forget to start the volley request queue mRequestQueue!!.start() } return mRequestQueue } ... }
// RequestQueue.java public RequestQueue(Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery) { mCache = cache; mNetwork = network; mDispatchers = new NetworkDispatcher[threadPoolSize]; mDelivery = delivery; } public RequestQueue(Cache cache, Network network, int threadPoolSize) { this(cache, network, threadPoolSize, new ExecutorDelivery(new Handler(Looper.getMainLooper()))); } // ExecutorDelivery.java public class ExecutorDelivery implements ResponseDelivery { /** Used for posting responses, typically to the main thread. */ private final Executor mResponsePoster; /** * Creates a new response delivery interface. * @param handler {@link Handler} to post responses on */ public ExecutorDelivery(final Handler handler) { // Make an Executor that just wraps the handler. mResponsePoster = new Executor() { @Override public void execute(Runnable command) { handler.post(command); } }; } ... }
// RequestQueue.java /** * Starts the dispatchers in this queue. */ public void start() { stop(); // Make sure any currently running dispatchers are stopped. // Create the cache dispatcher and start it. mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery); mCacheDispatcher.start(); // Create network dispatchers (and corresponding threads) up to the pool size. for (int i = 0; i < mDispatchers.length; i++) { NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery); mDispatchers[i] = networkDispatcher; networkDispatcher.start(); } }
public class RequestQueue {
...
/** The cache triage queue. */
private final PriorityBlockingQueue<Request<?>> mCacheQueue =
new PriorityBlockingQueue<Request<?>>();
/** The queue of requests that are actually going out to the network. */
private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
new PriorityBlockingQueue<Request<?>>();
...
}
ReqeustQueue.add()
request queue 는 내가 원하는 request 를 RequestQueue 에 add 하는 것으로 일이 처리되기 시작한다. 나름의 시작지점이라고 할 수 있다. 이 add() 의 동작은 아래와 같다.- 먼저 request 를 사용하기 전에 mCurrentRequests 라는 Set에 넣어놓고, 현재 처리하고 있는 녀석이라는 것을 알 수 있게 해놓는다.
- request 가 caching 을 하지 않아도 된다면, network dispatcher 에게 처리해 달라고 하기 위해 바로 network queue 로 보낸다.
- caching 이 필요한 경우라면 일단 이미 들어왔던 요청중에 cache dispatcher 나 network dispatcher 로 request 를 보낸 상황인지를 check 하기 위해 mWaitingRequests 라는 Map을 가지고 있다. 여기에 들어가 있으면 이미 들어왔던 요청이라는 것으로 생각한다.
- 그래서 처음에는 mWaitingRequests 에 없을 테니, 이 request를 cache dispatcher 에게 처리해 달라고 하기 위해 mCacheQueue 로 보낸다. 그리고 이 request 를 mWaitingRequests 에 넣어놓는다.
- request 가 이미 처리되고 있는 상황이라면, stagedRequests 로 처리하게 된다. 이부분은 아직 의미파악이 안돼서 일단 넘어가자.
// ReuqestQueue.java /** * Adds a Request to the dispatch queue. * @param request The request to service * @return The passed-in request */ public <T> Request<T> add(Request<T> request) { // Tag the request as belonging to this queue and add it to the set of current requests. request.setRequestQueue(this); synchronized (mCurrentRequests) { mCurrentRequests.add(request); } // Process requests in the order they are added. request.setSequence(getSequenceNumber()); request.addMarker("add-to-queue"); // If the request is uncacheable, skip the cache queue and go straight to the network. if (!request.shouldCache()) { mNetworkQueue.add(request); return request; } // Insert request into stage if there's already a request with the same cache key in flight. synchronized (mWaitingRequests) { String cacheKey = request.getCacheKey(); if (mWaitingRequests.containsKey(cacheKey)) { // There is already a request in flight. Queue up. Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey); if (stagedRequests == null) { stagedRequests = new LinkedList<Request<?>>(); } stagedRequests.add(request); mWaitingRequests.put(cacheKey, stagedRequests); if (VolleyLog.DEBUG) { VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey); } } else { // Insert 'null' queue for this cacheKey, indicating there is now a request in // flight. mWaitingRequests.put(cacheKey, null); mCacheQueue.add(request); } return request; } }
CacheDispatcher 와 NetworkDispatcher
위에서도 보듯이 RequestQueue.start() 를 통해 CacheDispatcher, NetworkDispatcher 를 생성하게 된다. 그리고 이 만들어진 dispatcher 들은 RequestQueue.add() 를 통해 보내진 request 를 처리하는데에 사용된다.
이 2개의 Dispatcher 의 동작을 대략적으로 정리하면 아래와 같다.
- CacheDispatcher : 캐쉬 queue에서 queue 에서 요청을 가져와서 처리한다. 이 요청에 대해 cache 에서 찾아보고 있으면 찾은 녀석을 mDelivery 를 통해 보내고, 없으면 network request 를 하도록 network queue 로 보낸다.
- NetworkDispatcher : 네트워크 queue 에서 요청을 가져와서 처리한다. 이 요청에 대해 network request 를 보내고 response 를 받는다. 이 response 를 cache 에 넣은 후에 mDelivery 를 통해 전단한다.
아래 코드에서 보듯이 Thread 로 되어 있으며, 이들은 각자 무슨 일을 하고 있다. 이제 무슨 일을 하는지 좀 더 자세히 알아보도록 하자.
// CacheDispatcher.java public class CacheDispatcher extends Thread { /** The queue of requests coming in for triage. */ private final BlockingQueue<Request<?>> mCacheQueue; /** The queue of requests going out to the network. */ private final BlockingQueue<Request<?>> mNetworkQueue; /** The cache to read from. */ private final Cache mCache; /** For posting responses. */ private final ResponseDelivery mDelivery; /** Used for telling us to die. */ private volatile boolean mQuit = false; /** * Creates a new cache triage dispatcher thread. You must call {@link #start()} * in order to begin processing. * * @param cacheQueue Queue of incoming requests for triage * @param networkQueue Queue to post requests that require network to * @param cache Cache interface to use for resolution * @param delivery Delivery interface to use for posting responses */ public CacheDispatcher( BlockingQueue<Request<?>> cacheQueue, BlockingQueue<Request<?>> networkQueue, Cache cache, ResponseDelivery delivery) { mCacheQueue = cacheQueue; mNetworkQueue = networkQueue; mCache = cache; mDelivery = delivery; } ... }
// NetworkDispatcher.java public class NetworkDispatcher extends Thread { /** The queue of requests to service. */ private final BlockingQueue<Request<?>> mQueue; /** The network interface for processing requests. */ private final Network mNetwork; /** The cache to write to. */ private final Cache mCache; /** For posting responses and errors. */ private final ResponseDelivery mDelivery; /** Used for telling us to die. */ private volatile boolean mQuit = false; /** * Creates a new network dispatcher thread. You must call {@link #start()} * in order to begin processing. * * @param queue Queue of incoming requests for triage * @param network Network interface to use for performing requests * @param cache Cache interface to use for writing responses to cache * @param delivery Delivery interface to use for posting responses */ public NetworkDispatcher(BlockingQueue<Request<?>> queue, Network network, Cache cache, ResponseDelivery delivery) { mQueue = queue; mNetwork = network; mCache = cache; mDelivery = delivery; } ... }
CacheDispatcher
CacheDispatcher 의 하는 일을 알아보기위해 run() 을 살펴보자. Dispatcher 는 InterruptedException 을 받기 전까지는 계속 살아있는 상태로 유지된다. 그래서 while(true) 로 무한 loop 을 돌고 있다.
이 loop 를 돌기 전에 mCache.initialize() 로 자신이 사용할 cache 를 초기화 시켜 놓는다.
그후에 아래와 같은 대략적인 작업들을 하게 된다.
- cache queue 에서 request 를 하나 가져온다.(mCacheQueue.take())
- 그 request 가 cancel 된 것인지 확인하고 cancel 됐으면, request.finish() 를 호출하고 1 번 작업을 다시 진행한다.
- cancel 이 되지 않은 request 에서 cache key 를 가져오고, 이 key 로 mCache 에서 해당하는 entry 가 있는지 찾아본다.
- key 에 해당하는 entry 가 없다면, 이 request 를 network queue 로 보낸다.
- key 에 해당하는 entry 가 있다면, 이 녀석이 expired 된 녀석인지를 확인하고, expired 됐으면 이 request 를 network queue 에 보낸다.
- 이제 mCache 에서 찾은 entry 가 사용할 수 있는 녀석이 라는 것을 확인했다.(cache-hit) 이제 이녀석을 사용하면 되는데, 이 entry 를 그냥사용하진 않고, 이 entry 를 이용해서 response 를 만들어 사용한다.
- 이렇게 response 를 만든 이후에 한가지 확인을 더하는데 그것이 softTTL 이라는 녀석이다. 이 softTTL 이 만료되면, 이 entry 를 사용하긴 하는데, 나중을 위해서 update 를 해놓는다.( entry.refreshNeeded() )[ref. 2]
개인적인 생각에 이것은 미리 가져와서 최대한 cache 에서 data 를 가져오기 위한 정책인 듯 하다. - 사용할 수 있는 entry 는 mDelivery.postResponse(request, response); 를 이용해서 delivery 한다.
public class CacheDispatcher extends Thread { ... @Override public void run() { if (DEBUG) VolleyLog.v("start new dispatcher"); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); // Make a blocking call to initialize the cache. mCache.initialize(); while (true) { try { // Get a request from the cache triage queue, blocking until // at least one is available. final Request<?> request = mCacheQueue.take(); request.addMarker("cache-queue-take"); // If the request has been canceled, don't bother dispatching it. if (request.isCanceled()) { request.finish("cache-discard-canceled"); continue; } // Attempt to retrieve this item from cache. Cache.Entry entry = mCache.get(request.getCacheKey()); if (entry == null) { request.addMarker("cache-miss"); // Cache miss; send off to the network dispatcher. mNetworkQueue.put(request); continue; } // If it is completely expired, just send it to the network. if (entry.isExpired()) { request.addMarker("cache-hit-expired"); request.setCacheEntry(entry); mNetworkQueue.put(request); continue; } // We have a cache hit; parse its data for delivery back to the request. request.addMarker("cache-hit"); Response<?> response = request.parseNetworkResponse( new NetworkResponse(entry.data, entry.responseHeaders)); request.addMarker("cache-hit-parsed"); if (!entry.refreshNeeded()) { // Completely unexpired cache hit. Just deliver the response. mDelivery.postResponse(request, response); } else { // Soft-expired cache hit. We can deliver the cached response, // but we need to also send the request to the network for // refreshing. request.addMarker("cache-hit-refresh-needed"); request.setCacheEntry(entry); // Mark the response as intermediate. response.intermediate = true; // Post the intermediate response back to the user and have // the delivery then forward the request along to the network. mDelivery.postResponse(request, response, new Runnable() { @Override public void run() { try { mNetworkQueue.put(request); } catch (InterruptedException e) { // Not much we can do about this. } } }); } } catch (InterruptedException e) { // We may have been interrupted because it was time to quit. if (mQuit) { return; } continue; } } } }
NetworkDispatcher
이녀석도 계속 loop 을 돌면서 일을 처리한다. 그런데 InterruptException 에 의한 quit 은 queue 에서 request 를 받아올 때에만 정상적으로 종료되고, 나머지 경우에는 다른 error 와 같이 처리되고, mDelivery 를 통해서 전달된다.
그 후 작업은 아래와 같다.
- request 가 cancel 됐는지를 살펴본다.
- request 가 cancel 됐으면 request.finish() 를 실행한다.
- mNetwork.performRequest(request) 를 통해 network 통신을 해서 response 를 가져온다.
- network 로 request 를 보내고 돌아온 응답이 304(not modified) 인 경우이고, response 가 delivered 됐다면, 굳이 현재 response 를 건내줄 필요가 없다.(이 녀석은 soft ttl 과 같이 보면 될 듯 하다.) 그래서 response 를 끝낸다.(response.finish())
- request 가 cache 되어야 하는 녀석으로 되어 있고, response 의 cacheEntry 가 null 이 아닌 경우에 mCache 에 response.cacheEntry 를 넣어놓는다.
- request 가 delivered 됐다고 표시하고(request.markDelivered())
- response 를 보낸다. mDelivery.postResponse(request, response)
public class NetworkDispatcher extends Thread { ... @Override public void run() { Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); Request<?> request; while (true) { long startTimeMs = SystemClock.elapsedRealtime(); // release previous request object to avoid leaking request object when mQueue is drained. request = null; try { // Take a request from the queue. request = mQueue.take(); } catch (InterruptedException e) { // We may have been interrupted because it was time to quit. if (mQuit) { return; } continue; } try { request.addMarker("network-queue-take"); // If the request was cancelled already, do not perform the // network request. if (request.isCanceled()) { request.finish("network-discard-cancelled"); continue; } addTrafficStatsTag(request); // Perform the network request. NetworkResponse networkResponse = mNetwork.performRequest(request); request.addMarker("network-http-complete"); // If the server returned 304 AND we delivered a response already, // we're done -- don't deliver a second identical response. if (networkResponse.notModified && request.hasHadResponseDelivered()) { request.finish("not-modified"); continue; } // Parse the response here on the worker thread. Response<?> response = request.parseNetworkResponse(networkResponse); request.addMarker("network-parse-complete"); // Write to cache if applicable. // TODO: Only update cache metadata instead of entire record for 304s. if (request.shouldCache() && response.cacheEntry != null) { mCache.put(request.getCacheKey(), response.cacheEntry); request.addMarker("network-cache-written"); } // Post the response back. request.markDelivered(); mDelivery.postResponse(request, response); } catch (VolleyError volleyError) { volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs); parseAndDeliverNetworkError(request, volleyError); } catch (Exception e) { VolleyLog.e(e, "Unhandled exception %s", e.toString()); VolleyError volleyError = new VolleyError(e); volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs); mDelivery.postError(request, volleyError); } } } private void parseAndDeliverNetworkError(Request<?> request, VolleyError error) { error = request.parseNetworkError(error); mDelivery.postError(request, error); } }
ResponseDelivery
ResponseDelivery#postResponse 에서 하는 일은 많지 않다.일단 request 가 deliver 됐다고 표시 해 준다.(request.markDelivered())
그리고 UI thread 에서 할 일이 몇 개 있는 데 그 일들을 UI thread 로 전달 해 준다.
UI thread 로 넘어와서 하는 일은 아래와 같다.
- request 가 cancel 됐는지를 살피고 cancel 됐으면 finish 해준다.
- response 가 success 라면 response의 deliverResponse 그렇지 않다면 Request.deliverError
- 를 호출한다. 이 deliverResponse / deliveryError 부분은 이 volley library 를 사용할 때 user 가 직접 작성하는 부분이 된다.
- mResponse.intermediate 은 CacheDispatcher 부분에서 확인할 수 있는데, 별 뜻은 없고, Runnable 이 있으니, 그녀석을 처리하고 나서 request 를 finish 해야 한다는 뜻으로 지정해 놓은 녀석이다.
public class ExecutorDelivery implements ResponseDelivery { /** Used for posting responses, typically to the main thread. */ private final Executor mResponsePoster; /** * Creates a new response delivery interface. * @param handler {@link Handler} to post responses on */ public ExecutorDelivery(final Handler handler) { // Make an Executor that just wraps the handler. mResponsePoster = new Executor() { @Override public void execute(Runnable command) { handler.post(command); } }; } ... @Override public void postResponse(Request<?> request, Response<?> response, Runnable runnable) { request.markDelivered(); request.addMarker("post-response"); mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, runnable)); } ... /** * A Runnable used for delivering network responses to a listener on the * main thread. */ @SuppressWarnings("rawtypes") private class ResponseDeliveryRunnable implements Runnable { private final Request mRequest; private final Response mResponse; private final Runnable mRunnable; public ResponseDeliveryRunnable(Request request, Response response, Runnable runnable) { mRequest = request; mResponse = response; mRunnable = runnable; } @SuppressWarnings("unchecked") @Override public void run() { // If this request has canceled, finish it and don't deliver. if (mRequest.isCanceled()) { mRequest.finish("canceled-at-delivery"); return; } // Deliver a normal response or error, depending. if (mResponse.isSuccess()) { mRequest.deliverResponse(mResponse.result); } else { mRequest.deliverError(mResponse.error); } // If this is an intermediate response, add a marker, otherwise we're done // and the request can be finished. if (mResponse.intermediate) { mRequest.addMarker("intermediate-response"); } else { mRequest.finish("done"); } // If we have been provided a post-delivery runnable, run it. if (mRunnable != null) { mRunnable.run(); } } } }
댓글 없음:
댓글 쓰기