Thread-Local storage (TLS) allows static variables to be attached to the currently executing thread. The most common use of TLS is to pass global context through the call-stack without method parameters. In a web-application, this will allow data (such as the current request’s URL) to be globally available throughout the codebase – extremely useful for logging or auditing purposes.
Where TLS can fail is when the execution path moves between threads. Anywhere Futures parallelize code, execution is handed off to an arbitrary thread from a thread-pool for async execution, where all TLS is lost. Futures are at the heart of new reactive web frameworks such as Play!, requiring everyone to rethink how TLS is done.
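To make the failure concrete, here is a minimal sketch using a plain Java ThreadLocal and a single-thread executor. The names TlsLostDemo and requestUrl and the URL value are made up for illustration; they are not from any framework.

```scala
import java.util.concurrent.{Callable, Executors}

object TlsLostDemo {
  // Hypothetical request-scoped value; threads that never set it see "none".
  val requestUrl: ThreadLocal[String] = new ThreadLocal[String] {
    override def initialValue(): String = "none"
  }

  /** Sets the value on the calling thread, then reads it on both the caller and a pool thread. */
  def readOnCallerAndWorker(): (String, String) = {
    requestUrl.set("/orders/42")
    val pool = Executors.newSingleThreadExecutor()
    try {
      // The worker thread has its own, untouched copy of the ThreadLocal.
      val seenByWorker = pool.submit(new Callable[String] {
        override def call(): String = requestUrl.get()
      }).get()
      (requestUrl.get(), seenByWorker)
    } finally {
      pool.shutdown()
      requestUrl.remove()
    }
  }
}
```

The caller still sees its own value, while the worker only sees the default, which is exactly what happens to request context once a Future hops threads.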
But the solution is rather simple: it lies within the ExecutionContext trait. The ExecutionContext is an abstraction over an entity that can execute program logic, and it is an implicit requirement to create a Future:
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val f = Future { /* this block executes in another thread */ }
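Because the ExecutionContext is only an implicit parameter, it can also be passed by hand. The sketch below assumes a single-thread executor with a made-up thread name ("demo-worker") so it is easy to see which context actually ran the block; ExplicitContextDemo is a hypothetical name.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExplicitContextDemo {
  /** Runs a Future on a hand-built context and returns the name of the thread that executed it. */
  def threadNameOfFuture(): String = {
    val executor = Executors.newSingleThreadExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = new Thread(r, "demo-worker")
    })
    val ec = ExecutionContext.fromExecutorService(executor)
    // The second parameter list of Future.apply is the (normally implicit) ExecutionContext.
    try Await.result(Future { Thread.currentThread.getName }(ec), 5.seconds)
    finally ec.shutdown()
  }
}
```

Whatever context is supplied here decides which thread runs the block, which is why it is the natural place to hook in TLS handling.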
The most common implementation is the ForkJoinPool, which improves on a basic thread-pool by implementing an efficient work-stealing algorithm. It is the default for parallelized applications built around Play! and Akka.
Let’s look at a small program which prints out basic thread information:
    import scala.concurrent.{ Await, Future }
    import scala.concurrent.duration._

    def printThreadInfo(id: String) = println {
      id + " : " + Thread.currentThread.getName
    }

    implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global

    printThreadInfo("main")
    val fut1 = Future { printThreadInfo("fut1") }
    Await.result(fut1, 1.second)

    //Output:
    //> main : main
    //> fut1 : ForkJoinPool-1-worker-13
So the Future definitely runs on a different thread from the main. Can we store different values in TLS?
As this is Scala, let’s use the DynamicVariable class instead of Java’s ThreadLocal directly. They are so named because they are static variables whose value dynamically depends on which thread is executing.
DynamicVariable has nothing to do with dynamic fields in scala.language.dynamics.
    import scala.concurrent.{ Await, Future }
    import scala.concurrent.duration._
    import scala.util.DynamicVariable

    //create a dynamic variable
    val dyn = new DynamicVariable[Int](0)

    def printThreadInfo(id: String) = println {
      id + " : " + Thread.currentThread.getName + " = " + dyn.value
    }

    implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global

    val fut1 = dyn.withValue(1) { Future { printThreadInfo("fut1") } }
    val fut2 = dyn.withValue(2) { Future { printThreadInfo("fut2") } }
    val fut3 = dyn.withValue(3) { Future { printThreadInfo("fut3") } }

    Await.result(fut1, 1.second)
    Await.result(fut2, 1.second)
    Await.result(fut3, 1.second)

    //Output:
    //> fut1 : ForkJoinPool-1-worker-13 = 1
    //> fut2 : ForkJoinPool-1-worker-11 = 2
    //> fut3 : ForkJoinPool-1-worker-9 = 3

    //But wait, threads work when created, what happens if we reuse threads already in the pool?
    val fut4 = dyn.withValue(4) { Future { printThreadInfo("fut4") } }
    val fut5 = dyn.withValue(5) { Future { printThreadInfo("fut5") } }

    Await.result(fut4, 1.second)
    Await.result(fut5, 1.second)

    //Output:
    //> fut4 : ForkJoinPool-1-worker-11 = 2
    //> fut5 : ForkJoinPool-1-worker-11 = 2
So we run into a problem: DynamicVariable correctly passes TLS on to new threads (it is backed by an InheritableThreadLocal, which copies the parent thread’s value when a thread is created), but if a thread has already been created and is reused from the pool, the TLS won’t be copied; the thread keeps whatever value it picked up earlier.
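The stale value can be reproduced deterministically with a pool of exactly one thread. This is a sketch under that assumption (StaleValueDemo is a hypothetical name): a single-thread executor creates its worker lazily on the first submission, so the worker inherits the value in effect at that moment and never sees a later one.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.util.DynamicVariable

object StaleValueDemo {
  val dyn = new DynamicVariable[Int](0)

  /** Returns the values seen by the first and second task on the same worker thread. */
  def run(): (Int, Int) = {
    val pool = Executors.newSingleThreadExecutor()
    val readDyn = new Callable[Int] { override def call(): Int = dyn.value }
    try {
      // The worker thread is created during this first submit, so it inherits 1.
      val first = dyn.withValue(1) { pool.submit(readDyn).get() }
      // The same worker is reused; nothing copies the new value 2 into it.
      val second = dyn.withValue(2) { pool.submit(readDyn).get() }
      (first, second)
    } finally pool.shutdown()
  }
}
```

Both tasks report 1: the first because of inheritance at thread creation, the second because the reused thread was never updated.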
The ExecutionContext handles all of the thread scheduling. Can we tell it to copy our TLS into the threads before they execute?
The trait is very simple, and execute is an obvious first choice to modify:
    /**
     * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
     */
    trait ExecutionContext {

      /** Runs a block of code on this execution context. */
      def execute(runnable: Runnable): Unit

      /** Reports that an asynchronous computation failed. */
      def reportFailure(t: Throwable): Unit

      /** Prepares for the execution of a task. Returns the prepared
       *  execution context. A valid implementation of `prepare` is one
       *  that simply returns `this`.
       */
      def prepare(): ExecutionContext = this
    }
We are using the ForkJoinPool implementation, so let’s create a subclass to modify. If we pass a DynamicVariable as a constructor parameter, we won’t have to worry about another closure.
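    // scala.concurrent.forkjoin is deprecated since Scala 2.12;
    // there, import java.util.concurrent.ForkJoinPool instead
    import scala.concurrent.forkjoin._
    import scala.util.DynamicVariable

    class ForkJoinPoolWithDynamicVariable[T](dynamicVariable: DynamicVariable[T]) extends ForkJoinPool {
      override def execute(task: Runnable): Unit = {
        //need to inject dynamicVariable.value into task
        super.execute(task)
      }
    }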
So execute runs in the main thread, but the task is run in the Future’s thread-pool. We somehow need to inject the dynamicVariable inside the Runnable. Let’s make another Runnable which closes over dynamicVariable’s value, and then runs task.
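    override def execute(task: Runnable): Unit = {
      // read the value on the calling thread
      val copyValue = dynamicVariable.value
      super.execute(new Runnable {
        override def run(): Unit = {
          // assign it on the pool thread before running the original task
          dynamicVariable.value = copyValue
          task.run()
        }
      })
    }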
Basically, copyValue reads the dynamicVariable in the main thread; then, when run is executed in a thread-pool thread, it assigns the proper value to the dynamicVariable. As T is generic, it can be any Scala class, including a Map, so one DynamicVariable is flexible enough for most scenarios. All we are left to do is to use our new ExecutorService, so replace:
implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global
With the new class:
    val dyn = new DynamicVariable[T](/* default for T */)
    implicit val executionContext = scala.concurrent.ExecutionContext.fromExecutorService(
      new ForkJoinPoolWithDynamicVariable(dyn)
    )
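The same capture-then-restore idea can also be applied one level up, by wrapping any existing ExecutionContext rather than subclassing the pool. The following is only a sketch of that variation, not the approach taken above: PropagatingExecutionContext and PropagationDemo are hypothetical names, and the wrapper uses withValue so the pool thread gets its previous value back once the task finishes.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.DynamicVariable

// Hypothetical wrapper: not part of the standard library.
class PropagatingExecutionContext[T](dyn: DynamicVariable[T], underlying: ExecutionContext)
    extends ExecutionContext {

  override def execute(task: Runnable): Unit = {
    val captured = dyn.value // read on the submitting thread
    underlying.execute(new Runnable {
      // set on the pool thread for the duration of the task, then restored
      override def run(): Unit = dyn.withValue(captured) { task.run() }
    })
  }

  override def reportFailure(t: Throwable): Unit = underlying.reportFailure(t)
}

object PropagationDemo {
  val dyn = new DynamicVariable[Int](0)

  implicit val ec: ExecutionContext =
    new PropagatingExecutionContext(dyn, ExecutionContext.global)

  /** Returns the value of dyn as observed inside a Future created under withValue(v). */
  def valueSeenInFuture(v: Int): Int =
    Await.result(dyn.withValue(v) { Future { dyn.value } }, 5.seconds)
}
```

With this wrapper in implicit scope, calling PropagationDemo.valueSeenInFuture repeatedly with different arguments returns the argument each time, whether or not the pool thread is being reused.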
A small note about garbage collection. As long as a thread exists, it will maintain references to its ThreadLocal variables. If the thread-pool keeps its threads alive and a thread goes back into the pool without releasing its TLS, then those objects will not be freed. Normally this isn’t an issue; however, for larger objects it might be wise to explicitly release them after use, or use a WeakReference if behaviour allows.
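Explicit release can be as simple as a try/finally around the work. The sketch below uses a plain ThreadLocal, since it offers remove(); ReleaseAfterUse, context and withContext are hypothetical names, and "big-object" stands in for a large value.

```scala
import java.util.concurrent.{Callable, Executors}

object ReleaseAfterUse {
  // No initial value, so get() returns null on a thread that holds nothing.
  val context = new ThreadLocal[String]

  /** Hypothetical helper: sets the value for the duration of body, then drops the reference. */
  def withContext[A](value: String)(body: => A): A = {
    context.set(value)
    try body finally context.remove()
  }

  /** Returns what the first task saw, and whether the reused thread was clean afterwards. */
  def run(): (String, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val during = pool.submit(new Callable[String] {
        override def call(): String = withContext("big-object") { context.get() }
      }).get()
      val clearedAfter = pool.submit(new Callable[Boolean] {
        override def call(): Boolean = context.get() == null
      }).get()
      (during, clearedAfter)
    } finally pool.shutdown()
  }
}
```

The second task runs on the same pooled thread and finds nothing left behind, so the earlier value is no longer kept reachable by that thread.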
Full Source: Fork-Join-Pool-with-Dynamic-Variable.scala
In the same spirit, I wrote about passing the slf4j MDC context with Future (the MDC is based on thread local variables)
http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework/
Very nice Yann!
Setting up better logging for a web application pays for itself many times over. :) Your code is eerily similar to mine – it makes me wonder if this is missing functionality in Play….
When I was working with Finagle, Twitter’s Futures had ThreadLocal functionality built-in (without the need for a custom ExecutionContext). Anything saved using https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Local.scala would be automatically copied by the Future into its code block using a closure. But trying to sell ThreadLocal hacks to the functional programming crowd is tough, so it’s no wonder Typesafe didn’t include it in theirs.
Very nice post. @steven: as we work with Finagle, do you know of any doc or blog about how Locals work there?
A great post :) Thanks!