ThreadLocal Variables and Scala Futures
Thread-local storage (TLS) allows static variables to be attached to the currently executing thread. The most common use of TLS is to make global context available throughout the entire call stack without passing it explicitly as method parameters. In a web application, this allows contextual request metadata, such as the URL, to be referenced anywhere within the code handling the request, which is extremely useful for logging or auditing purposes.
TLS fails when the execution path is handled by multiple threads. Wherever Futures parallelize code, execution is handed off to a different thread from the Executor's thread pool. This means that any use of Scala Futures or similar async execution techniques will often lose TLS data. Since the Future is at the heart of Reactive web frameworks such as Play! 2.0, alternative techniques or code to handle TLS propagation are required.
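The loss described above can be reproduced with a plain java.lang.ThreadLocal. The following is a minimal sketch, not code from the original project: the names LostThreadLocalDemo and requestUrl and the single-thread executor are assumptions made for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LostThreadLocalDemo {
  // Hypothetical per-request context; the name is made up for this sketch.
  val requestUrl: ThreadLocal[String] = new ThreadLocal[String] {
    override def initialValue(): String = "<unset>"
  }

  // Returns (value seen by the calling thread, value seen inside the Future).
  def run(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      requestUrl.set("/orders/42") // set on the calling thread only
      val seenByCaller = requestUrl.get()
      // The Future body runs on the pool thread, which has its own copy.
      val seenInFuture = Await.result(Future(requestUrl.get()), 5.seconds)
      (seenByCaller, seenInFuture)
    } finally {
      requestUrl.remove()
      pool.shutdown()
    }
  }
}
```

Calling LostThreadLocalDemo.run() should return ("/orders/42", "<unset>"): the pool thread never sees the value set by the caller.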
Thread-Local Storage Propagation
A simple mechanism to propagate TLS is a custom ExecutionContext. The ExecutionContext trait is an abstraction over an entity that manages the execution of blocks of program logic (work), and one must be implicitly available to create a Future:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val f = Future {
  /* this block executes in another thread */
}
The most common implementation of an ExecutionContext is backed by the ForkJoinPool, which is an advancement over a basic thread pool that implements an efficient work-stealing algorithm. It is the default for parallelized applications built around Play! and Akka.
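An ExecutionContext can also be built explicitly on top of a ForkJoinPool instead of importing the global one. This is a small sketch under assumptions: it uses java.util.concurrent.ForkJoinPool, and the object name ExplicitPoolDemo and the parallelism of 2 are arbitrary choices for illustration.

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExplicitPoolDemo {
  // Returns the name of the thread that ran the Future body.
  def run(): String = {
    val pool = new ForkJoinPool(2) // parallelism of 2 is an arbitrary choice
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      Await.result(Future(Thread.currentThread().getName), 5.seconds)
    } finally {
      pool.shutdown()
    }
  }
}
```

The returned name identifies a ForkJoinPool worker thread rather than the calling thread.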
Let’s look at a small program which prints out basic thread information:
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
def printThreadInfo(id: String) = println {
  id + " : " + Thread.currentThread.getName
}
implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global
printThreadInfo("main")
val fut1 = Future {
  printThreadInfo("fut1")
}
Await.result(fut1, 1.second)
//Output:
//> main : main
//> fut1 : ForkJoinPool-1-worker-13
The Future runs on a different thread from the main thread. Can we store different values in TLS?
DynamicVariable and ThreadLocal to store values
Scala 2 has a modified version of Java's ThreadLocal called DynamicVariable. DynamicVariable is unrelated to dynamic fields in scala.language.dynamics; it is named that way because it holds what looks like a static variable whose value depends dynamically on the executing thread.
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.DynamicVariable
//create a dynamic variable with a default value of 0
val dyn = new DynamicVariable[Int](0)
def printThreadInfo(id: String) = println {
  id + " : " + Thread.currentThread.getName + " = " + dyn.value
}
implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global
val fut1 = dyn.withValue(1) {
  Future {
    printThreadInfo("fut1")
  }
}
val fut2 = dyn.withValue(2) {
  Future {
    printThreadInfo("fut2")
  }
}
val fut3 = dyn.withValue(3) {
  Future {
    printThreadInfo("fut3")
  }
}
Await.result(fut1, 1.second)
Await.result(fut2, 1.second)
Await.result(fut3, 1.second)
//Output:
//> fut1 : ForkJoinPool-1-worker-13 = 1
//> fut2 : ForkJoinPool-1-worker-11 = 2
//> fut3 : ForkJoinPool-1-worker-9 = 3
//That works for newly created threads, but what happens when threads already in the pool are reused?
val fut4 = dyn.withValue(4) {
  Future {
    printThreadInfo("fut4")
  }
}
val fut5 = dyn.withValue(5) {
  Future {
    printThreadInfo("fut5")
  }
}
Await.result(fut4, 1.second)
Await.result(fut5, 1.second)
//Output:
//> fut4 : ForkJoinPool-1-worker-11 = 2
//> fut5 : ForkJoinPool-1-worker-11 = 2
This code exposes a problem. DynamicVariable correctly passes TLS on to new threads, because it is backed by an InheritableThreadLocal and a new thread inherits the value held by the thread that created it. If the thread already exists and is being reused from the pool, however, the TLS won't be copied, and the thread keeps the value it held during its previous use.
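The stale value can be reproduced deterministically by forcing thread reuse. The sketch below assumes a single-thread executor in place of the ForkJoinPool so that the second Future has no choice but to reuse the first thread; StaleValueDemo is a name made up for this example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.DynamicVariable

object StaleValueDemo {
  val dyn = new DynamicVariable[Int](0)

  // Returns the values seen by two Futures that share a single pool thread.
  def run(): (Int, Int) = {
    // One lazily created worker thread, so the second Future must reuse it.
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // The worker thread is created during this submission and inherits 1.
      val first = dyn.withValue(1) { Await.result(Future(dyn.value), 5.seconds) }
      // The same thread is reused; nothing copies 2 into it.
      val second = dyn.withValue(2) { Await.result(Future(dyn.value), 5.seconds) }
      (first, second)
    } finally {
      pool.shutdown()
    }
  }
}
```

StaleValueDemo.run() should return (1, 1): the second Future still sees the value inherited when the thread was created.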
Modified ExecutionContext to propagate values
The ExecutionContext handles all thread scheduling. Can we add logic to the ExecutionContext to copy our TLS into the threads before they execute? The trait is very simple, and execute can be modified to include these changes:
/**
 * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
 */
trait ExecutionContext {
  /** Runs a block of code on this execution context.
   */
  def execute(runnable: Runnable): Unit
  /** Reports that an asynchronous computation failed.
   */
  def reportFailure(t: Throwable): Unit
  /** Prepares for the execution of a task. Returns the prepared
   *  execution context. A valid implementation of `prepare` is one
   *  that simply returns `this`.
   */
  def prepare(): ExecutionContext = this
}
We are using the ForkJoinPool implementation, so it is the base class to inherit from. If we pass a DynamicVariable as a constructor parameter, we won't have to worry about closures.
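//scala.concurrent.forkjoin is deprecated in Scala 2.12 and removed in 2.13,
//so the JDK's ForkJoinPool is used here instead
import java.util.concurrent.ForkJoinPool
import scala.util.DynamicVariable
class ForkJoinPoolWithDynamicVariable[T](dynamicVariable: DynamicVariable[T])
  extends ForkJoinPool {
  override def execute(task: Runnable): Unit = {
    //need to inject dynamicVariable.value into task
    super.execute(task)
  }
}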
The execute method runs in the main thread, but the task runs in the Future's thread pool. We somehow need to inject the dynamicVariable's value into the Runnable. Let's make another Runnable which closes over dynamicVariable's value, and then runs task.
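override def execute(task: Runnable): Unit = {
  //read the value on the thread that submits the task
  val copyValue = dynamicVariable.value
  super.execute(new Runnable {
    override def run(): Unit = {
      //assign the captured value on the pool thread, then run the task
      dynamicVariable.value = copyValue
      task.run()
    }
  })
}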
Basically, copyValue reads the dynamicVariable in the main thread; then, when run is executed in a thread-pool thread, it assigns that value to the dynamicVariable. As T is generic, it can be any Scala class, including a Map, so one DynamicVariable is flexible enough for most scenarios. All we are left to do is to use our new ExecutorService, so replace:
implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global
With the new class:
val dyn = new DynamicVariable[T](/* default for T */)
implicit val executionContext = scala.concurrent.ExecutionContext.fromExecutorService(
  new ForkJoinPoolWithDynamicVariable(dyn)
)
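To check that the value now follows each Future, the pieces above can be combined into one self-contained sketch. The pool class is restated under a different name (PropagatingPool, chosen for this example) so the block compiles on its own. It assumes java.util.concurrent.ForkJoinPool and is not the author's full spec.

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.DynamicVariable

// Restates the execute override from above so this sketch is self-contained.
class PropagatingPool[T](dynamicVariable: DynamicVariable[T]) extends ForkJoinPool {
  override def execute(task: Runnable): Unit = {
    val copyValue = dynamicVariable.value // read on the submitting thread
    super.execute(new Runnable {
      override def run(): Unit = {
        dynamicVariable.value = copyValue // assigned on the pool thread
        task.run()
      }
    })
  }
}

object PropagationDemo {
  val dyn = new DynamicVariable[Int](0)

  // Submits five Futures one after another, each under a different value,
  // and returns what each Future observed.
  def run(): Seq[Int] = {
    val pool = new PropagatingPool(dyn)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      (1 to 5).map { i =>
        dyn.withValue(i) { Await.result(Future(dyn.value), 5.seconds) }
      }
    } finally {
      pool.shutdown()
    }
  }
}
```

PropagationDemo.run() should return the values 1 through 5 in order, whether or not pool threads are reused between submissions.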
Garbage Collection
A small note about garbage collection: as long as a thread exists, it maintains references to its ThreadLocal variables. If the thread pool keeps its threads alive rather than replacing them, and a thread goes back into the pool without releasing its TLS, those objects will not be freed. Normally this isn't an issue; however, for larger objects it might be wise to explicitly release them after use, or to use a WeakReference if behaviour allows.
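One way to release the value explicitly is to reset the variable once the task has finished. The following is a sketch under assumptions, not part of the original code: the class name ReleasingPool, the empty constructor parameter (a caller-supplied "nothing stored" value), and the parallelism of 1 (used only to make the demo deterministic) are all hypothetical.

```scala
import java.util.concurrent.{Callable, ForkJoinPool}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.DynamicVariable

// Hypothetical variant of the pool: resets the variable after each task.
class ReleasingPool[T](dynamicVariable: DynamicVariable[T], empty: T)
  extends ForkJoinPool(1) { // parallelism 1 only to keep the demo deterministic
  override def execute(task: Runnable): Unit = {
    val copyValue = dynamicVariable.value
    super.execute(new Runnable {
      override def run(): Unit = {
        dynamicVariable.value = copyValue
        try {
          task.run()
        } finally {
          dynamicVariable.value = empty // drop the reference once the task is done
        }
      }
    })
  }
}

object ReleaseDemo {
  val dyn = new DynamicVariable[String]("")

  // Returns (value seen by the task, value left on the pool thread afterwards).
  def run(): (String, String) = {
    val pool = new ReleasingPool(dyn, "")
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val seen = dyn.withValue("large payload") {
        Await.result(Future(dyn.value), 5.seconds)
      }
      // submit(Callable) does not go through the overridden execute, so it
      // reads the pool thread's leftover value without overwriting it first.
      val leftover = pool.submit(new Callable[String] {
        override def call(): String = dyn.value
      }).get()
      (seen, leftover)
    } finally {
      pool.shutdown()
    }
  }
}
```

ReleaseDemo.run() should return ("large payload", ""): the task sees the propagated value, and the pool thread no longer references it afterwards.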
Article Feedback
ⓘ This blog originally supported comments
Yann on June 26, 2014 at 5:45 am said: In the same spirit, I wrote about passing the slf4j MDC context with Future (the MDC is based on thread local variables) http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework
steven on June 26, 2014 at 12:22 pm said: Very nice Yann! Setting up better logging for a web application pays for itself many times over. :) Your code is eerily similar to mine – it makes me wonder if this is missing functionality in Play…. When I was working with Finagle, Twitter’s Futures had ThreadLocal functionality built-in (without the need for a custom ExecutionContext). Anything saved using
Chris Wewerka on September 10, 2014 at 8:08 am said: Very nice post. @steven: as we work with finagle do you know about some doc or blog about how Local’s work there?
Mayumi on June 26, 2014 at 4:38 pm said: A great post :) Thanks!
Full Sources: ForkJoinPoolWithDynamicVariableSpec.scala