[go: nahoru, domu]

Add a GraphProcessor to CameraPipe

The GraphProcessor is responsible for keeping track of state across
1 or more Camera2 CameraCaptureSessions and for dispatching updates to
the camera on background thread(s). All of the interactions with the
camera go through a RequestProcessor interface that wrap a Camera2
CaptureSession.

Test: Existing unit tests continue to pass.
Test: Added coroutine-based tests for the GraphProcessor
Test: Added Fake objects to test interactions

Change-Id: I4e0765de97b0e854b8304c1869c1c7ef12a94edd
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraGraph.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraGraph.kt
index 56860a9..91dc85f 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraGraph.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraGraph.kt
@@ -16,13 +16,27 @@
 
 package androidx.camera.camera2.pipe
 
+import android.view.Surface
 import java.io.Closeable
 
 /**
  * A CameraGraph represents the combined configuration and state of a camera.
  */
-interface CameraGraph {
+interface CameraGraph : Closeable {
+
+    /**
+     * This will cause the CameraGraph to start opening the camera and configuring the Camera2
+     * CaptureSession. While the CameraGraph is started it will attempt to keep the camera alive,
+     * active, and in a configured running state.
+     */
     fun start()
+
+    /**
+     * This will cause the CameraGraph to stop executing requests and close the current Camera2
+     * CaptureSession (if one is active). The current repeating request is preserved, and any
+     * call to submit a request to a session will be enqueued. To prevent requests from being
+     * enqueued, close the CameraGraph.
+     */
     fun stop()
 
     /**
@@ -36,11 +50,33 @@
      */
     fun acquireSessionOrNull(): Session?
 
+    /**
+     * This configures the camera graph to use a specific Surface for the given stream.
+     *
+     * Changing a surface may cause the camera to stall and/or reconfigure.
+     */
+    fun setSurface(stream: StreamId, surface: Surface?)
+
+    /**
+     * This defines the configuration, flags, and pre-defined behavior of a CameraGraph instance.
+     */
     data class Config(
         val camera: CameraId,
         val streams: List<StreamConfig>,
-        val defaultTemplate: Int,
-        val metadataTransform: MetadataTransform = MetadataTransform()
+        val defaultTemplate: RequestTemplate,
+        val listeners: List<Request.Listener> = listOf(),
+        val metadataTransform: MetadataTransform = MetadataTransform(),
+        val flags: Flags = Flags()
+    )
+
+    /**
+     * Flags define boolean values that are used to adjust the behavior and interactions with
+     * camera2. These flags should default to the ideal behavior and should be overridden on
+     * specific devices to be faster or to work around bad behavior.
+     */
+    data class Flags(
+        val configureBlankSessionOnStop: Boolean = false,
+        val abortCapturesOnStop: Boolean = false
     )
 
     /**
@@ -52,5 +88,11 @@
         fun submit(request: Request)
         fun submit(requests: List<Request>)
         fun setRepeating(request: Request)
+
+        /**
+         * Abort in-flight requests. This will abort *all* requests in the current
+         * CameraCaptureSession as well as any requests that are currently enqueued.
+         */
+        fun abort()
     }
 }
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphComponent.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphComponent.kt
index 034ce16..6612e5e 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphComponent.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphComponent.kt
@@ -16,16 +16,27 @@
 
 package androidx.camera.camera2.pipe.impl
 
+import android.os.Process
 import androidx.camera.camera2.pipe.CameraGraph
+import androidx.camera.camera2.pipe.Request
 import dagger.Binds
-import dagger.Subcomponent
 import dagger.Module
 import dagger.Provides
+import dagger.Subcomponent
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineName
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asCoroutineDispatcher
+import java.util.concurrent.Executors
+import javax.inject.Qualifier
 import javax.inject.Scope
 
 @Scope
 annotation class CameraGraphScope
 
+@Qualifier
+annotation class ForCameraGraph
+
 @CameraGraphScope
 @Subcomponent(modules = [CameraGraphModule::class])
 interface CameraGraphComponent {
@@ -38,7 +49,11 @@
     }
 }
 
-@Module(includes = [CameraGraphBindings::class])
+@Module(
+    includes = [
+        CameraGraphBindings::class,
+        CameraGraphProviders::class]
+)
 class CameraGraphModule(private val config: CameraGraph.Config) {
     @Provides
     fun provideCameraGraphConfig(): CameraGraph.Config = config
@@ -48,4 +63,52 @@
 abstract class CameraGraphBindings {
     @Binds
     abstract fun bindCameraGraph(cameraGraph: CameraGraphImpl): CameraGraph
+
+    @Binds
+    abstract fun bindGraphProcessor(graphProcessor: GraphProcessorImpl): GraphProcessor
+}
+
+@Module
+object CameraGraphProviders {
+    @CameraGraphScope
+    @Provides
+    @ForCameraGraph
+    fun provideCameraGraphCoroutineScope(
+        @ForCameraGraph dispatcher: CoroutineDispatcher
+    ): CoroutineScope {
+        return CoroutineScope(dispatcher.plus(CoroutineName("CXCP-Graph")))
+    }
+
+    @CameraGraphScope
+    @Provides
+    @ForCameraGraph
+    fun provideCameraGraphCoroutineDispatcher(): CoroutineDispatcher {
+        // TODO: Figure out how to make sure the dispatcher gets shut down.
+        return Executors.newFixedThreadPool(1) {
+            object : Thread(it) {
+                init {
+                    name = "CXCP-Graph"
+                }
+
+                override fun run() {
+                    Process.setThreadPriority(
+                        Process.THREAD_PRIORITY_DISPLAY + Process.THREAD_PRIORITY_LESS_FAVORABLE
+                    )
+                    super.run()
+                }
+            }
+        }.asCoroutineDispatcher()
+    }
+
+    @CameraGraphScope
+    @Provides
+    @ForCameraGraph
+    fun provideRequestListeners(
+        graphConfig: CameraGraph.Config
+    ): java.util.ArrayList<Request.Listener> {
+        // TODO: Dagger doesn't appear to like standard kotlin lists. Replace this with a standard
+        //   Kotlin list interfaces when dagger compiles with them.
+        // TODO: Add internal listeners before adding external global listeners.
+        return java.util.ArrayList(graphConfig.listeners)
+    }
 }
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphImpl.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphImpl.kt
index 7563990..561b312 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphImpl.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphImpl.kt
@@ -17,33 +17,45 @@
 package androidx.camera.camera2.pipe.impl
 
 import android.content.Context
+import android.view.Surface
 import androidx.camera.camera2.pipe.CameraGraph
+import androidx.camera.camera2.pipe.StreamId
 import javax.inject.Inject
 
 @CameraGraphScope
 class CameraGraphImpl @Inject constructor(
     private val context: Context,
-    private val config: CameraGraph.Config
+    private val config: CameraGraph.Config,
+    private val graphProcessor: GraphProcessor,
+    private val streamMap: StreamMap
 ) : CameraGraph {
     // Only one session can be active at a time.
     private val sessionLock = TokenLockImpl(1)
 
     override fun start() {
-        Log.debug { "Starting Camera Graph!" }
-        TODO("Not Implemented")
+        graphProcessor.start()
     }
 
     override fun stop() {
-        TODO("Not Implemented")
+        graphProcessor.stop()
     }
 
     override suspend fun acquireSession(): CameraGraph.Session {
         val token = sessionLock.acquire(1)
-        return CameraGraphSessionImpl(token)
+        return CameraGraphSessionImpl(token, graphProcessor)
     }
 
     override fun acquireSessionOrNull(): CameraGraph.Session? {
         val token = sessionLock.acquireOrNull(1) ?: return null
-        return CameraGraphSessionImpl(token)
+        return CameraGraphSessionImpl(token, graphProcessor)
+    }
+
+    override fun setSurface(stream: StreamId, surface: Surface?) {
+        streamMap[stream] = surface
+    }
+
+    override fun close() {
+        sessionLock.close()
+        graphProcessor.close()
     }
 }
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphSessionImpl.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphSessionImpl.kt
index c0b1058..9876f82 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphSessionImpl.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/CameraGraphSessionImpl.kt
@@ -19,17 +19,24 @@
 import androidx.camera.camera2.pipe.CameraGraph
 import androidx.camera.camera2.pipe.Request
 
-class CameraGraphSessionImpl(private val token: TokenLock.Token) : CameraGraph.Session {
+class CameraGraphSessionImpl(
+    private val token: TokenLock.Token,
+    private val graphProcessor: GraphProcessor
+) : CameraGraph.Session {
     override fun submit(request: Request) {
-        TODO("not implemented")
+        graphProcessor.submit(request)
     }
 
     override fun submit(requests: List<Request>) {
-        TODO("not implemented")
+        graphProcessor.submit(requests)
     }
 
     override fun setRepeating(request: Request) {
-        TODO("not implemented")
+        graphProcessor.setRepeating(request)
+    }
+
+    override fun abort() {
+        graphProcessor.abort()
     }
 
     override fun close() {
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/GraphProcessorImpl.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/GraphProcessorImpl.kt
new file mode 100644
index 0000000..8304760
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/GraphProcessorImpl.kt
@@ -0,0 +1,410 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.impl
+
+import android.hardware.camera2.CaptureRequest
+import androidx.annotation.GuardedBy
+import androidx.camera.camera2.pipe.Request
+import androidx.camera.camera2.pipe.impl.Log.warn
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import javax.inject.Inject
+
+/**
+ * The [GraphProcessor] is responsible for queuing and submitting requests to a single
+ * [RequestProcessor] instance, and for maintaining state across one or more [RequestProcessor]
+ * instances.
+ */
+interface GraphProcessor {
+    var requestProcessor: RequestProcessor?
+
+    /**
+     * This method puts the [GraphProcessor] into a started state. Starting the [GraphProcessor]
+     * will cause it to attempt to submit all requests to the current [RequestProcessor] instance,
+     * and any subsequent requests will be immediately submitted to the current [RequestProcessor].
+     */
+    fun start()
+
+    /**
+     * This method puts the [GraphProcessor] into a stopped state and clears the current
+     * [RequestProcessor] instance. While the graph processor is stopped, all requests are
+     * buffered.
+     */
+    fun stop()
+
+    fun setRepeating(request: Request)
+    fun submit(request: Request)
+    fun submit(requests: List<Request>)
+
+    /**
+     * Abort all submitted requests that have not yet been submitted to the [RequestProcessor] as
+     * well as aborting requests on the [RequestProcessor] itself.
+     */
+    fun abort()
+
+    /**
+     * Closing the [GraphProcessor] will abort all queued requests. Any requests submitted after the
+     * [GraphProcessor] is closed will be immediately aborted.
+     */
+    fun close()
+}
+
+/**
+ * The graph processor handles *cross-session* state, such as the most recent repeating request.
+ */
+@CameraGraphScope
+class GraphProcessorImpl @Inject constructor(
+    @ForCameraGraph private val graphScope: CoroutineScope,
+    @ForCameraGraph private val graphDispatcher: CoroutineDispatcher,
+    @ForCameraGraph private val graphListeners: java.util.ArrayList<Request.Listener>
+) : GraphProcessor {
+    private val lock = Any()
+
+    @GuardedBy("lock")
+    private val requestQueue: MutableList<List<Request>> = ArrayList()
+
+    @GuardedBy("lock")
+    private var currentRepeatingRequest: Request? = null
+
+    @GuardedBy("lock")
+    private var nextRepeatingRequest: Request? = null
+
+    @GuardedBy("lock")
+    private var _requestProcessor: RequestProcessor? = null
+
+    @GuardedBy("lock")
+    private var submitting = false
+
+    @GuardedBy("lock")
+    private var dirty = false
+
+    @GuardedBy("lock")
+    private var closed = false
+
+    @GuardedBy("lock")
+    private var active = false
+
+    override var requestProcessor: RequestProcessor?
+        get() = synchronized(lock) {
+            _requestProcessor
+        }
+        set(value) {
+            val processorToClose: RequestProcessor?
+            val processorToDisconnect: RequestProcessor?
+            synchronized(lock) {
+                processorToDisconnect = _requestProcessor
+                if (closed) {
+                    processorToClose = value
+                } else {
+                    processorToClose = null
+                    _requestProcessor = value
+                }
+            }
+
+            if (value === processorToDisconnect) {
+                warn { "RequestProcessor was set more than once." }
+                return
+            }
+
+            // Setting the request processor to null will disconnect the old processor.
+            if (processorToDisconnect != null) {
+                synchronized(processorToDisconnect) {
+                    processorToDisconnect.disconnect()
+                }
+            }
+
+            if (processorToClose != null) {
+                synchronized(processorToClose) {
+                    processorToClose.stop()
+                }
+                return
+            }
+
+            if (value != null) {
+                graphScope.launch {
+                    trySetRepeating()
+                    submitLoop()
+                }
+            }
+        }
+
+    override fun start() {
+        synchronized(lock) {
+            active = true
+        }
+
+        Log.debug { "Starting GraphProcessor" }
+        // TODO: Start the camera and configure the capture session.
+    }
+
+    /**
+     * This method puts the [GraphProcessorImpl] into a stopped state. While the graph processor is
+     * in this state, all requests are buffered in the RequestQueue.
+     */
+    override fun stop() {
+        val processor = synchronized(lock) {
+            active = false
+            _requestProcessor.also { _requestProcessor = null }
+        }
+
+        Log.debug { "Stopping GraphProcessor" }
+
+        if (processor == null) {
+            return
+        }
+
+        // There are about ~3 main ways a Camera2 CameraCaptureSession can be shut down and closed,
+        // and the behavior will be different depending on the circumstances.
+        //
+        // A session can be replaced by another session by simply calling createCaptureSession on
+        // the CameraDevice. Internally this will reconfigure the camera capture session, and there
+        // are optimizations present in the CameraFramework and Camera HAL that can optimize how
+        // fast the new session is created and started. The most obvious example of this is
+        // replacing a surface with a new one after recording a video, which can effectively cause
+        // the new session to be created and replaced without dropping a frame.
+        //
+        // Second, a session can be _stopped_ by calling stopRepeating and/or abortCaptures. This
+        // keeps the session alive but may abort pending requests. In some cases it's faster to
+        // switch sessions if these methods are invoked before creating a new session on the
+        // device because requests that are in-flight can be explicitly aborted.
+        //
+        // Finally, a session may be closed as a result of the underlying CameraDevice being closed
+        // or disconnected. This can happen if a higher priority process steals the camera, or
+        // during switches from one camera to another.
+
+        graphScope.launch {
+            processor.stop()
+        }
+    }
+
+    override fun setRepeating(request: Request) {
+        synchronized(lock) {
+            if (closed) return
+            nextRepeatingRequest = request
+        }
+
+        graphScope.launch {
+            trySetRepeating()
+        }
+    }
+
+    override fun submit(request: Request) {
+        submit(listOf(request))
+    }
+
+    override fun submit(requests: List<Request>) {
+        synchronized(lock) {
+            if (closed) {
+                graphScope.launch {
+                    abortBurst(requests)
+                }
+                return
+            }
+            requestQueue.add(requests)
+        }
+
+        graphScope.launch {
+            submitLoop()
+        }
+    }
+
+    /**
+     * Submit a request to the camera using only the current repeating request.
+     */
+    suspend fun submit(parameters: Map<CaptureRequest.Key<*>, Any>): Boolean =
+        withContext(graphDispatcher) {
+            val processor: RequestProcessor?
+            val request: Request?
+
+            synchronized(lock) {
+                if (closed) return@withContext false
+                processor = _requestProcessor
+                request = currentRepeatingRequest
+            }
+
+            return@withContext when {
+                processor == null || request == null -> false
+                else -> processor.submit(
+                    request,
+                    parameters,
+                    requireSurfacesForAllStreams = false
+                )
+            }
+        }
+
+    override fun abort() {
+        val processor: RequestProcessor?
+        val requests: List<List<Request>>
+
+        synchronized(lock) {
+            processor = _requestProcessor
+            requests = requestQueue.toList()
+            requestQueue.clear()
+        }
+
+        graphScope.launch {
+            // Start with requests that have already been submitted
+            if (processor != null) {
+                synchronized(processor) {
+                    processor.abort()
+                }
+            }
+
+            // Then abort requests that have not been submitted
+            for (burst in requests) {
+                abortBurst(burst)
+            }
+        }
+    }
+
+    override fun close() {
+        synchronized(lock) {
+            if (closed) {
+                return
+            }
+            closed = true
+        }
+
+        abort()
+        stop()
+    }
+
+    private fun read3AState(): Map<CaptureRequest.Key<*>, Any> {
+        // TODO: Build extras from 3A state
+        return mapOf()
+    }
+
+    private fun abortBurst(requests: List<Request>) {
+        for (request in requests) {
+            abortRequest(request)
+        }
+    }
+
+    private fun abortRequest(request: Request) {
+        for (listenerIdx in graphListeners.indices) {
+            graphListeners[listenerIdx].onAborted(request)
+        }
+
+        for (listenerIdx in request.listeners.indices) {
+            request.listeners[listenerIdx].onAborted(request)
+        }
+    }
+
+    private fun trySetRepeating() {
+        val processor: RequestProcessor?
+        val request: Request?
+
+        synchronized(lock) {
+            if (closed || !active) return
+
+            processor = _requestProcessor
+            request = nextRepeatingRequest ?: currentRepeatingRequest
+        }
+
+        if (processor != null && request != null) {
+            val extras: Map<CaptureRequest.Key<*>, Any> = read3AState()
+
+            synchronized(processor) {
+                if (processor.setRepeating(request, extras, requireSurfacesForAllStreams = true)) {
+
+                    // ONLY update the current repeating request if the update succeeds
+                    synchronized(lock) {
+                        currentRepeatingRequest = request
+
+                        // There is a race condition where the nextRepeating request might be changed
+                        // while trying to update the current repeating request. If this happens, do no
+                        // overwrite the pending request.
+                        if (nextRepeatingRequest == request) {
+                            nextRepeatingRequest = null
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private fun submitLoop() {
+        var burst: List<Request>
+        var processor: RequestProcessor
+
+        synchronized(lock) {
+            if (closed || !active) return
+
+            if (submitting) {
+                dirty = true
+                return
+            }
+
+            val nullableProcessor = _requestProcessor
+            val nullableBurst = requestQueue.firstOrNull()
+            if (nullableProcessor == null || nullableBurst == null) {
+                return
+            }
+
+            processor = nullableProcessor
+            burst = nullableBurst
+
+            submitting = true
+        }
+
+        while (true) {
+            var submitted = false
+            try {
+                val extras: Map<CaptureRequest.Key<*>, Any> = read3AState()
+                submitted = synchronized(processor) {
+                    if (burst.size == 1) {
+                        processor.submit(burst[0], extras, true)
+                    } else {
+                        processor.submit(burst, extras, true)
+                    }
+                }
+            } finally {
+                synchronized(lock) {
+                    if (submitted) {
+                        check(requestQueue.removeAt(0) === burst)
+
+                        val nullableBurst = requestQueue.firstOrNull()
+                        if (nullableBurst == null) {
+                            dirty = false
+                            submitting = false
+                            return
+                        }
+
+                        burst = nullableBurst
+                    } else if (!dirty) {
+                        // If we did not submit, and we are also not dirty, then exit the loop
+                        submitting = false
+                        return
+                    } else {
+                        dirty = false
+
+                        // One possible situation is that the _requestProcessor was replaced or
+                        // set to null. If this happens, try to update the requestProcessor we
+                        // are currently using. If the current request processor is null, then
+                        // we cannot submit anyways.
+                        val nullableProcessor = _requestProcessor
+                        if (nullableProcessor != null) {
+                            processor = nullableProcessor
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/RequestProcessor.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/RequestProcessor.kt
new file mode 100644
index 0000000..117a81d1
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/RequestProcessor.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.impl
+
+import android.hardware.camera2.CaptureRequest
+import androidx.camera.camera2.pipe.Request
+
+/**
+ * An instance of a RequestProcessor exists for the duration of a CameraCaptureSession and must be
+ * created for each new CameraCaptureSession. It is responsible for low level interactions with the
+ * CameraCaptureSession and for shimming the interfaces and callbacks to make them easier to work
+ * with. Unlike the CameraCaptureSessionProxy interface the RequestProcessor has more liberty to
+ * change the standard Camera2 API contract to make it easier to work with.
+ *
+ * There are some important design considerations:
+ * - Instances class is not thread safe, although the companion object has some counters that are
+ *   global and *are* thread safe.
+ * - Special care is taken to reduce the number objects and wrappers that are created, and to reduce
+ *   the number of loops and overhead in wrapper objects.
+ * - Callbacks are expected to be invoked at very high frequency.
+ * - One RequestProcessor instance per CameraCaptureSession
+ */
+interface RequestProcessor {
+
+    /**
+     * Submit a single [Request] with an optional set of extra parameters.
+     *
+     * @param request the request to submit to the camera.
+     * @param extraRequestParameters extra parameters to apply to the request.
+     * @param requireSurfacesForAllStreams if this flag is defined then this method will only submit
+     *   the request if all streamIds can be mapped to valid surfaces. At least one surface is
+     *   always required. This is useful if (for example) someone needs to quickly submit a
+     *   request with a specific trigger or mode key but does not care about modifying the list of
+     *   current surfaces.
+     * @return false if this request failed to be submitted. If this method returns false, none of
+     *   the callbacks on the Request(s) will be invoked.
+     */
+    fun submit(
+        request: Request,
+        extraRequestParameters: Map<CaptureRequest.Key<*>, Any>,
+        requireSurfacesForAllStreams: Boolean
+    ): Boolean
+
+    /**
+     * Submit a list of [Request]s with an optional set of extra parameters.
+     *
+     * @param requests the requests to submit to the camera.
+     * @param extraRequestParameters extra parameters to apply to the request.
+     * @param requireSurfacesForAllStreams if this flag is defined then this method will only submit
+     *   the request if all streamIds can be mapped to valid surfaces. At least one surface is
+     *   always required. This is useful if (for example) someone needs to quickly submit a
+     *   request with a specific trigger or mode key but does not care about modifying the list of
+     *   current surfaces.
+     * @return false if this request failed to be submitted. If this method returns false, none of
+     *   the callbacks on the Request(s) will be invoked.
+     */
+    fun submit(
+        requests: List<Request>,
+        extraRequestParameters: Map<CaptureRequest.Key<*>, Any>,
+        requireSurfacesForAllStreams: Boolean
+    ): Boolean
+
+    /**
+     * Set the repeating [Request] with an optional set of extra parameters.
+     *
+     * The current repeating request may not be executed at all, or it may be executed multiple
+     * times. The repeating request is used as the base request for all 3A interactions which may
+     * cause the request to be used to generate multiple [CaptureRequest]s to the camera.
+     *
+     * @param request the requests to set as the repeating request.
+     * @param extraRequestParameters extra parameters to apply to the request.
+     * @param requireSurfacesForAllStreams if this flag is defined then this method will only submit
+     *   the request if all streamIds can be mapped to valid surfaces. At least one surface is
+     *   always required. This is useful if (for example) someone needs to quickly submit a
+     *   request with a specific trigger or mode key but does not care about modifying the list of
+     *   current surfaces.
+     * @return false if this request failed to be submitted. If this method returns false, none of
+     *   the callbacks on the Request(s) will be invoked.
+     */
+    fun setRepeating(
+        request: Request,
+        extraRequestParameters: Map<CaptureRequest.Key<*>, Any>,
+        requireSurfacesForAllStreams: Boolean
+    ): Boolean
+
+    /**
+     * Abort requests that have been submitted but not completed.
+     */
+    fun abort()
+
+    /**
+     * Puts the RequestProcessor into a closed state where it will reject all incoming requests, but
+     * does not actively stop repeating requests or abort pending captures.
+     */
+    fun disconnect()
+
+    /**
+     * Puts the RequestProcessor into a closed state where it will reject all incoming requests and
+     * then actively stops the current repeating request.
+     */
+    fun stop()
+}
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/StreamMap.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/StreamMap.kt
new file mode 100644
index 0000000..388167c
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/impl/StreamMap.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.impl
+
+import android.view.Surface
+import androidx.camera.camera2.pipe.StreamId
+import javax.inject.Inject
+
+/**
+ * This object keeps track of which surfaces have been configured for each stream. In addition,
+ * it will keep track of which surfaces have changed or replaced so that the CaptureSession can be
+ * reconfigured if the configured surfaces change.
+ */
+@CameraGraphScope
+class StreamMap @Inject constructor() {
+    private val activeSurfaceMap: MutableMap<StreamId, Surface> = mutableMapOf()
+
+    operator fun set(stream: StreamId, surface: Surface?) {
+        if (surface == null) {
+            // TODO: Tell the graph processor that it should resubmit the repeating request or
+            //  reconfigure the camera2 captureSession
+            activeSurfaceMap.remove(stream)
+        } else {
+            activeSurfaceMap[stream] = surface
+        }
+    }
+
+    operator fun get(stream: StreamId): Surface? = activeSurfaceMap[stream]
+}
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/CameraPipeTest.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/CameraPipeTest.kt
index b70cb75..cdb7d61 100644
--- a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/CameraPipeTest.kt
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/CameraPipeTest.kt
@@ -48,7 +48,7 @@
             CameraGraph.Config(
                 camera = CameraId("0"),
                 streams = listOf(),
-                defaultTemplate = 0
+                defaultTemplate = RequestTemplate(0)
             )
         )
         assertThat(cameraGraph).isNotNull()
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraGraphImplTest.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraGraphImplTest.kt
index dccb316..29f0f9c 100644
--- a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraGraphImplTest.kt
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraGraphImplTest.kt
@@ -20,7 +20,10 @@
 import android.os.Build
 import androidx.camera.camera2.pipe.CameraGraph
 import androidx.camera.camera2.pipe.CameraId
+import androidx.camera.camera2.pipe.Request
+import androidx.camera.camera2.pipe.RequestTemplate
 import androidx.camera.camera2.pipe.testing.CameraPipeRobolectricTestRunner
+import androidx.camera.camera2.pipe.testing.FakeGraphProcessor
 import androidx.test.core.app.ApplicationProvider
 import androidx.test.filters.SmallTest
 import com.google.common.truth.Truth.assertThat
@@ -36,6 +39,7 @@
 @DoNotInstrument
 @Config(minSdk = Build.VERSION_CODES.LOLLIPOP)
 class CameraGraphImplTest {
+    private val graphProcessor = FakeGraphProcessor()
     private lateinit var impl: CameraGraphImpl
 
     @Before
@@ -43,10 +47,10 @@
         val config = CameraGraph.Config(
             camera = CameraId("0"),
             streams = listOf(),
-            defaultTemplate = 0
+            defaultTemplate = RequestTemplate(0)
         )
         val context = ApplicationProvider.getApplicationContext() as Context
-        impl = CameraGraphImpl(context, config)
+        impl = CameraGraphImpl(context, config, graphProcessor, StreamMap())
     }
 
     @Test
@@ -55,7 +59,7 @@
     }
 
     @Test
-    fun testAcquireSession() = runBlocking<Unit> {
+    fun testAcquireSession() = runBlocking {
         val session = impl.acquireSession()
         assertThat(session).isNotNull()
     }
@@ -67,7 +71,7 @@
     }
 
     @Test
-    fun testAcquireSessionOrNullAfterAcquireSession() = runBlocking<Unit> {
+    fun testAcquireSessionOrNullAfterAcquireSession() = runBlocking {
         val session = impl.acquireSession()
         assertThat(session).isNotNull()
 
@@ -81,4 +85,60 @@
         val session2 = impl.acquireSessionOrNull()
         assertThat(session2).isNotNull()
     }
+
+    @Test
+    fun sessionSubmitsRequestsToGraphProcessor() {
+        val session = checkNotNull(impl.acquireSessionOrNull())
+        val request = Request(listOf())
+        session.submit(request)
+
+        assertThat(graphProcessor.requestQueue).contains(listOf(request))
+    }
+
+    @Test
+    fun sessionSetsRepeatingRequestOnGraphProcessor() {
+        val session = checkNotNull(impl.acquireSessionOrNull())
+        val request = Request(listOf())
+        session.setRepeating(request)
+
+        assertThat(graphProcessor.repeatingRequest).isSameInstanceAs(request)
+    }
+
+    @Test
+    fun sessionAbortsRequestOnGraphProcessor() {
+        val session = checkNotNull(impl.acquireSessionOrNull())
+        val request = Request(listOf())
+        session.submit(request)
+        session.abort()
+
+        assertThat(graphProcessor.requestQueue).isEmpty()
+    }
+
+    @Test
+    fun closingSessionDoesNotCloseGraphProcessor() {
+        val session = impl.acquireSessionOrNull()
+        checkNotNull(session).close()
+
+        assertThat(graphProcessor.closed).isFalse()
+    }
+
+    @Test
+    fun closingCameraGraphClosesGraphProcessor() {
+        impl.close()
+        assertThat(graphProcessor.closed).isTrue()
+    }
+
+    @Test
+    fun stoppingCameraGraphStopsGraphProcessor() {
+        assertThat(graphProcessor.active).isFalse()
+        impl.start()
+        assertThat(graphProcessor.active).isTrue()
+        impl.stop()
+        assertThat(graphProcessor.active).isFalse()
+        impl.start()
+        assertThat(graphProcessor.active).isTrue()
+        impl.close()
+        assertThat(graphProcessor.closed).isTrue()
+        assertThat(graphProcessor.active).isFalse()
+    }
 }
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraPipeComponentTest.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraPipeComponentTest.kt
index 0b955fe..b8928fc 100644
--- a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraPipeComponentTest.kt
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/CameraPipeComponentTest.kt
@@ -21,6 +21,7 @@
 import androidx.camera.camera2.pipe.CameraGraph
 import androidx.camera.camera2.pipe.CameraId
 import androidx.camera.camera2.pipe.CameraPipe
+import androidx.camera.camera2.pipe.RequestTemplate
 import androidx.camera.camera2.pipe.testing.CameraPipeRobolectricTestRunner
 import androidx.test.core.app.ApplicationProvider
 import androidx.test.filters.SmallTest
@@ -57,7 +58,7 @@
         val config = CameraGraph.Config(
             camera = cameraId,
             streams = listOf(),
-            defaultTemplate = 0
+            defaultTemplate = RequestTemplate(0)
         )
         val module = CameraGraphModule(config)
         val builder = component.cameraGraphComponentBuilder()
@@ -79,7 +80,7 @@
                     CameraGraph.Config(
                         camera = CameraId("0"),
                         streams = listOf(),
-                        defaultTemplate = 0
+                        defaultTemplate = RequestTemplate(0)
                     )
                 )
             )
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/GraphProcessorTest.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/GraphProcessorTest.kt
new file mode 100644
index 0000000..91d9b4e
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/impl/GraphProcessorTest.kt
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.impl
+
+import android.os.Build
+import androidx.camera.camera2.pipe.Request
+import androidx.camera.camera2.pipe.StreamId
+import androidx.camera.camera2.pipe.testing.CameraPipeRobolectricTestRunner
+import androidx.camera.camera2.pipe.testing.Event
+import androidx.camera.camera2.pipe.testing.FakeRequestListener
+import androidx.camera.camera2.pipe.testing.FakeRequestProcessor
+import androidx.test.filters.SmallTest
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.robolectric.annotation.Config
+
+@SmallTest
+@RunWith(CameraPipeRobolectricTestRunner::class)
+@Config(minSdk = Build.VERSION_CODES.LOLLIPOP)
+class GraphProcessorTest {
+    private val globalListener = FakeRequestListener()
+
+    private val fakeProcessor1 = FakeRequestProcessor()
+    private val fakeProcessor2 = FakeRequestProcessor()
+
+    private val requestListener1 = FakeRequestListener()
+    private val request1 = Request(listOf(StreamId(0)), listeners = listOf(requestListener1))
+
+    private val requestListener2 = FakeRequestListener()
+    private val request2 = Request(listOf(StreamId(0)), listeners = listOf(requestListener2))
+
+    @Test
+    fun graphProcessorSubmitsRequests() {
+        // The graph processor uses 'launch' within the coroutine scope to invoke updates on the
+        // requestProcessor instance. runBlocking forces all jobs to complete before testing the
+        // state of results.
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.requestProcessor = fakeProcessor1
+            graphProcessor.submit(request1)
+        }
+
+        // Make sure the requests get submitted to the request processor
+        assertThat(fakeProcessor1.requestQueue).hasSize(1)
+        assertThat(fakeProcessor1.requestQueue.first().burst).hasSize(1)
+        assertThat(fakeProcessor1.requestQueue.first().burst.first()).isSameInstanceAs(request1)
+    }
+
+    @Test
+    fun graphProcessorSubmitsRequestsToMostRecentProcessor() {
+        // The graph processor uses 'launch' within the coroutine scope to invoke updates on the
+        // requestProcessor instance. runBlocking forces all jobs to complete before testing the
+        // state of results.
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.requestProcessor = fakeProcessor1
+            graphProcessor.requestProcessor = fakeProcessor2
+            graphProcessor.submit(request1)
+        }
+
+        // requestProcessor1 does not receive requests
+        assertThat(fakeProcessor1.requestQueue).hasSize(0)
+
+        // requestProcessor2 receives requests
+        assertThat(fakeProcessor2.requestQueue).hasSize(1)
+        assertThat(fakeProcessor2.requestQueue.first().burst).hasSize(1)
+        assertThat(fakeProcessor2.requestQueue.first().burst.first()).isSameInstanceAs(request1)
+    }
+
+    @Test
+    fun graphProcessorSubmitsQueuedRequests() {
+        // The graph processor uses 'launch' within the coroutine scope to invoke updates on the
+        // requestProcessor instance. runBlocking forces all jobs to complete before testing the
+        // state of results.
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.submit(request1)
+            graphProcessor.submit(request2)
+
+            // Request1 and 2 should be queued and will be submitted even when the request
+            // processor is set after the requests are submitted.
+            graphProcessor.requestProcessor = fakeProcessor1
+        }
+
+        // Make sure the requests get submitted to the request processor
+        assertThat(fakeProcessor1.requestQueue).hasSize(2)
+        assertThat(fakeProcessor1.requestQueue[0].burst[0]).isSameInstanceAs(request1)
+        assertThat(fakeProcessor1.requestQueue[1].burst[0]).isSameInstanceAs(request2)
+    }
+
+    @Test
+    fun graphProcessorSubmitsBurstsOfRequestsTogetherWithExtras() {
+        // The graph processor uses 'launch' within the coroutine scope to invoke updates on the
+        // requestProcessor instance. runBlocking forces all jobs to complete before testing the
+        // state of results.
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.submit(listOf(request1, request2))
+            graphProcessor.requestProcessor = fakeProcessor1
+        }
+
+        assertThat(fakeProcessor1.requestQueue).hasSize(1)
+        assertThat(fakeProcessor1.requestQueue[0].burst[0]).isSameInstanceAs(request1)
+        assertThat(fakeProcessor1.requestQueue[0].burst[1]).isSameInstanceAs(request2)
+    }
+
+    @Test
+    fun graphProcessorDoesNotForgetRejectedRequests() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            fakeProcessor1.rejectRequests = true
+            graphProcessor.requestProcessor = fakeProcessor1
+
+            graphProcessor.submit(request1)
+            assertThat(fakeProcessor1.nextEvent().rejected).isTrue()
+
+            graphProcessor.submit(request2)
+            assertThat(fakeProcessor1.nextEvent().rejected).isTrue()
+
+            graphProcessor.requestProcessor = fakeProcessor2
+            assertThat(fakeProcessor2.nextEvent().request!!.burst[0]).isSameInstanceAs(request1)
+            assertThat(fakeProcessor2.nextEvent().request!!.burst[0]).isSameInstanceAs(request2)
+        }
+
+        assertThat(fakeProcessor1.requestQueue).hasSize(0)
+        assertThat(fakeProcessor2.requestQueue).hasSize(2)
+        assertThat(fakeProcessor2.requestQueue[0].burst[0]).isSameInstanceAs(request1)
+        assertThat(fakeProcessor2.requestQueue[1].burst[0]).isSameInstanceAs(request2)
+    }
+
+    @Test
+    fun graphProcessorContinuesSubmittingRequestsWhenFirstRequestIsRejected() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            // Note: setting the requestProcessor, and calling submit() can both trigger a call
+            // to submit a request.
+            fakeProcessor1.rejectRequests = true
+            graphProcessor.requestProcessor = fakeProcessor1
+            graphProcessor.submit(request1)
+
+            // Check to make sure that submit is called at least once, and that request1 is rejected
+            // from the request processor.
+            val event1 = fakeProcessor1.nextEvent()
+            assertThat(event1.request!!.burst).contains(request1)
+            assertThat(event1.rejected).isTrue()
+
+            // Stop rejecting requests
+            fakeProcessor1.rejectRequests = false
+            assertThat(fakeProcessor1.rejectRequests).isFalse()
+            assertThat(fakeProcessor1.disconnectInvoked).isFalse()
+            assertThat(fakeProcessor1.stopInvoked).isFalse()
+
+            graphProcessor.submit(request2)
+
+            // Cycle events until we get a submitted event with request1
+            val event2 = awaitEvent(fakeProcessor1, request1) { it.submit }
+            assertThat(event2.rejected).isFalse()
+
+            // Assert that immediately after we get a successfully submitted request, the
+            //  next request is also submitted.
+            val event3 = fakeProcessor1.nextEvent()
+            assertThat(event3.request!!.burst).contains(request2)
+            assertThat(event3.submit).isTrue()
+            assertThat(event3.rejected).isFalse()
+        }
+    }
+
+    @Test
+    fun graphProcessorSetsRepeatingRequest() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.requestProcessor = fakeProcessor1
+            graphProcessor.setRepeating(request1)
+            graphProcessor.setRepeating(request2)
+        }
+
+        assertThat(fakeProcessor1.repeatingRequest?.burst).contains(request2)
+    }
+
+    @Test
+    fun graphProcessorTracksRepeatingRequest() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.requestProcessor = fakeProcessor1
+            graphProcessor.setRepeating(request1)
+            awaitEvent(fakeProcessor1, request1) { it.setRepeating }
+
+            graphProcessor.requestProcessor = fakeProcessor2
+            awaitEvent(fakeProcessor2, request1) { it.setRepeating }
+        }
+
+        assertThat(fakeProcessor1.repeatingRequest?.burst).contains(request1)
+        assertThat(fakeProcessor2.repeatingRequest?.burst).contains(request1)
+    }
+
+    @Test
+    fun graphProcessorTracksRejectedRepeatingRequests() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            fakeProcessor1.rejectRequests = true
+            graphProcessor.requestProcessor = fakeProcessor1
+            graphProcessor.setRepeating(request1)
+
+            graphProcessor.requestProcessor = fakeProcessor2
+            awaitEvent(fakeProcessor2, request1) { it.setRepeating }
+        }
+
+        assertThat(fakeProcessor2.repeatingRequest?.burst).contains(request1)
+    }
+
+    @Test
+    fun graphProcessorSubmitsRepeatingRequestAndQueuedRequests() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.setRepeating(request1)
+            graphProcessor.submit(request2)
+
+            graphProcessor.requestProcessor = fakeProcessor1
+        }
+
+        assertThat(fakeProcessor1.repeatingRequest?.burst).contains(request1)
+        assertThat(fakeProcessor1.requestQueue[0].burst[0]).isSameInstanceAs(request2)
+    }
+
+    @Test
+    fun graphProcessorAbortsQueuedRequests() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+
+            graphProcessor.setRepeating(request1)
+            graphProcessor.submit(request2)
+
+            // Abort queued and in-flight requests.
+            graphProcessor.abort()
+            graphProcessor.requestProcessor = fakeProcessor1
+        }
+
+        assertThat(requestListener1.lastAbortedRequest).isNull()
+        assertThat(requestListener2.lastAbortedRequest).isSameInstanceAs(request2)
+        assertThat(globalListener.lastAbortedRequest).isSameInstanceAs(request2)
+
+        assertThat(fakeProcessor1.repeatingRequest?.burst).contains(request1)
+        assertThat(fakeProcessor1.requestQueue).isEmpty()
+    }
+
+    @Test
+    fun closingGraphProcessorAbortsSubsequentRequests() {
+        runBlocking(Dispatchers.Default) {
+            val graphProcessor = GraphProcessorImpl(
+                this,
+                Dispatchers.Default,
+                arrayListOf(globalListener)
+            )
+            graphProcessor.start()
+            graphProcessor.close()
+
+            // Abort queued and in-flight requests.
+            graphProcessor.requestProcessor = fakeProcessor1
+            graphProcessor.setRepeating(request1)
+            graphProcessor.submit(request2)
+        }
+
+        // The repeating request is not aborted
+        assertThat(requestListener1.lastAbortedRequest).isNull()
+        assertThat(requestListener2.lastAbortedRequest).isSameInstanceAs(request2)
+
+        assertThat(fakeProcessor1.stopInvoked).isTrue()
+        assertThat(fakeProcessor1.repeatingRequest).isNull()
+        assertThat(fakeProcessor1.requestQueue).isEmpty()
+    }
+
+    private suspend fun awaitEvent(
+        requestProcessor: FakeRequestProcessor,
+        request: Request,
+        filter: (event: Event) -> Boolean
+    ): Event {
+
+        var event: Event
+        var loopCount = 0
+        while (loopCount < 10) {
+            loopCount++
+            event = requestProcessor.nextEvent()
+            val contains = event.request?.burst?.contains(request) ?: false
+            if (filter(event) && contains) {
+                return event
+            }
+        }
+
+        throw IllegalStateException("Failed to observe a submit event containing $request")
+    }
+}
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeGraphProcessor.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeGraphProcessor.kt
new file mode 100644
index 0000000..dc648c0
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeGraphProcessor.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.testing
+
+import androidx.camera.camera2.pipe.Request
+import androidx.camera.camera2.pipe.impl.GraphProcessor
+import androidx.camera.camera2.pipe.impl.RequestProcessor
+
+/**
+ * Fake implementation of a [GraphProcessor] for tests.
+ */
+class FakeGraphProcessor : GraphProcessor {
+    var active = false
+        private set
+    var closed = false
+        private set
+    var repeatingRequest: Request? = null
+        private set
+    val requestQueue: List<List<Request>>
+        get() = _requestQueue
+
+    private val _requestQueue = mutableListOf<List<Request>>()
+
+    override var requestProcessor: RequestProcessor? = null
+
+    override fun start() {
+        if (!closed) {
+            active = true
+        }
+    }
+
+    override fun stop() {
+        active = false
+    }
+
+    override fun setRepeating(request: Request) {
+        repeatingRequest = request
+    }
+
+    override fun submit(request: Request) {
+        submit(listOf(request))
+    }
+
+    override fun submit(requests: List<Request>) {
+        _requestQueue.add(requests)
+    }
+
+    override fun abort() {
+        _requestQueue.clear()
+    }
+
+    override fun close() {
+        if (closed) {
+            return
+        }
+        closed = true
+        active = false
+        _requestQueue.clear()
+    }
+}
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeRequestListener.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeRequestListener.kt
new file mode 100644
index 0000000..fc8d037
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeRequestListener.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.testing
+
+import androidx.camera.camera2.pipe.CameraTimestamp
+import androidx.camera.camera2.pipe.FrameInfo
+import androidx.camera.camera2.pipe.FrameNumber
+import androidx.camera.camera2.pipe.Request
+import androidx.camera.camera2.pipe.RequestMetadata
+
+/**
+ * Fake implementation of a [Request.Listener] for tests.
+ */
+class FakeRequestListener : Request.Listener {
+    var lastStartedRequestMetadata: RequestMetadata? = null
+    var lastStartedFrameNumber: FrameNumber? = null
+    var lastStartedTimestamp: CameraTimestamp? = null
+
+    var lastCompleteRequestMetadata: RequestMetadata? = null
+    var lastCompleteFrameNumber: FrameNumber? = null
+    var lastCompleteResult: FrameInfo? = null
+
+    var lastAbortedRequest: Request? = null
+
+    override fun onStarted(
+        requestMetadata: RequestMetadata,
+        frameNumber: FrameNumber,
+        timestamp: CameraTimestamp
+    ) {
+        lastStartedRequestMetadata = requestMetadata
+        lastStartedFrameNumber = frameNumber
+        lastStartedTimestamp = timestamp
+    }
+
+    override fun onComplete(
+        requestMetadata: RequestMetadata,
+        frameNumber: FrameNumber,
+        result: FrameInfo
+    ) {
+        lastCompleteRequestMetadata = requestMetadata
+        lastCompleteFrameNumber = frameNumber
+        lastCompleteResult = result
+    }
+
+    override fun onAborted(
+        request: Request
+    ) {
+        lastAbortedRequest = request
+    }
+}
\ No newline at end of file
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeRequestProcessor.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeRequestProcessor.kt
new file mode 100644
index 0000000..08ab06c
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/testing/FakeRequestProcessor.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.testing
+
+import android.hardware.camera2.CaptureRequest
+import androidx.camera.camera2.pipe.Request
+import androidx.camera.camera2.pipe.impl.RequestProcessor
+import androidx.camera.camera2.pipe.impl.TokenLock
+import androidx.camera.camera2.pipe.impl.TokenLockImpl
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.withTimeout
+
+/**
+ * Fake implementation of a [RequestProcessor] for tests.
+ */
+class FakeRequestProcessor : RequestProcessor {
+    private val eventChannel = Channel<Event>(Channel.UNLIMITED)
+
+    val requestQueue: MutableList<FakeRequest> = mutableListOf()
+    var repeatingRequest: FakeRequest? = null
+
+    var rejectRequests = false
+    var abortInvoked = false
+        private set
+    var disconnectInvoked = false
+        private set
+    var stopInvoked = false
+        private set
+
+    private val tokenLock = TokenLockImpl(1)
+    private var token: TokenLock.Token? = null
+
+    data class FakeRequest(
+        val burst: List<Request>,
+        val extraRequestParameters: Map<CaptureRequest.Key<*>, Any> = emptyMap(),
+        val requireStreams: Boolean = false
+    )
+
+    override fun submit(
+        request: Request,
+        extraRequestParameters: Map<CaptureRequest.Key<*>, Any>,
+        requireSurfacesForAllStreams: Boolean
+    ): Boolean {
+        val fakeRequest =
+            FakeRequest(listOf(request), extraRequestParameters, requireSurfacesForAllStreams)
+
+        if (rejectRequests || disconnectInvoked || stopInvoked) {
+            eventChannel.offer(Event(request = fakeRequest, rejected = true))
+            return false
+        }
+
+        requestQueue.add(fakeRequest)
+        eventChannel.offer(Event(request = fakeRequest, submit = true))
+
+        return true
+    }
+
+    override fun submit(
+        requests: List<Request>,
+        extraRequestParameters: Map<CaptureRequest.Key<*>, Any>,
+        requireSurfacesForAllStreams: Boolean
+    ): Boolean {
+        val fakeRequest =
+            FakeRequest(requests, extraRequestParameters, requireSurfacesForAllStreams)
+        if (rejectRequests || disconnectInvoked || stopInvoked) {
+            eventChannel.offer(Event(request = fakeRequest, rejected = true))
+            return false
+        }
+
+        requestQueue.add(fakeRequest)
+        eventChannel.offer(Event(request = fakeRequest, submit = true))
+
+        return true
+    }
+
+    override fun setRepeating(
+        request: Request,
+        extraRequestParameters: Map<CaptureRequest.Key<*>, Any>,
+        requireSurfacesForAllStreams: Boolean
+    ): Boolean {
+        val fakeRequest =
+            FakeRequest(listOf(request), extraRequestParameters, requireSurfacesForAllStreams)
+        if (rejectRequests || disconnectInvoked || stopInvoked) {
+            eventChannel.offer(Event(request = fakeRequest, rejected = true))
+            return false
+        }
+
+        repeatingRequest = fakeRequest
+        eventChannel.offer(Event(request = fakeRequest, setRepeating = true))
+        return true
+    }
+
+    override fun abort() {
+        abortInvoked = true
+        eventChannel.offer(Event(abort = true))
+    }
+
+    override fun disconnect() {
+        disconnectInvoked = true
+        eventChannel.offer(Event(disconnect = true))
+    }
+
+    override fun stop() {
+        stopInvoked = true
+        eventChannel.offer(Event(stop = true))
+    }
+
+    /**
+     * Get the next event from queue with an option to specify a timeout for tests.
+     */
+    suspend fun nextEvent(timeMillis: Long = 25): Event = withTimeout(timeMillis) {
+        eventChannel.receive()
+    }
+}
+
+data class Event(
+    val request: FakeRequestProcessor.FakeRequest? = null,
+    val rejected: Boolean = false,
+    val abort: Boolean = false,
+    val disconnect: Boolean = false,
+    val stop: Boolean = false,
+    val submit: Boolean = false,
+    val setRepeating: Boolean = false
+)
\ No newline at end of file