Thrift Client Side Caching to Speed Up Unit Tests
One of the largest headaches associated with network system architecture is abstracting away the network. External resources are always slower and more disjoint than working locally. While there are various caching techniques, few are suitable for use in a development environment.
Client-side unit tests usually have only two options: executing calls against a deployed server, and enduring long waits on every test iteration, or tediously mocking out every call.
An alternative approach is available within Finagle: a pre-populated query cache on the client side. My article Finagle Query Cache with Guava demonstrated using a Filter to intercept and short-circuit service calls. Instead of filling the cache at runtime, the known service calls can be loaded into a static map at compile time.
Capturing the necessary unit test request/response pairs is quite simple using a Finagle LoggingFilter, but the binary output is hard to handle. A cleaner, more developer-friendly approach is to translate the binary data into JSON, as broached in Developer Friendly Thrift Request Logging.
The advantage of re-encoding to TJSONProtocol is that mocked data can be contained directly within Scala files as opposed to external resources. Another advantage is the ability to log human-readable output directly to the console.
Using the ProtocolHelpers class previously constructed in Developer Friendly Thrift Request Logging, we can execute our TProtocol↔TJSONProtocol reserialization:
import com.twitter.finagle.{ Service, SimpleFilter }
import com.twitter.finagle.thrift.ThriftClientRequest
import com.twitter.util.Future
import org.apache.thrift.transport._
import org.apache.thrift.protocol._
import ProtocolHelpers.reserialize

/**
 * Returns response from supplied TJSONProtocol log file,
 * matching on serialization of request (excluding SeqId).
 */
class MockJSONDataFilter(
    finagleServiceObject: AnyRef,
    jsonLog: Seq[(String, String)],
    thriftEncoding: TProtocolFactory = new TBinaryProtocol.Factory)
  extends SimpleFilter[ThriftClientRequest, Array[Byte]] {

  val logEncoding = new TJSONProtocol.Factory

  // The ??? bodies are placeholders so this outline compiles;
  // each member is filled in further down.

  /** All request/response pairs */
  lazy val requestResponses: Map[String, Array[Byte]] = ???

  /** Change SeqId within any TProtocol */
  def changeSeqId(requestOrResponse: Array[Byte], protocolFactory: TProtocolFactory,
    seqId: Int = 0): (Int, Array[Byte]) = ???

  def apply(request: ThriftClientRequest, service: Service[ThriftClientRequest, Array[Byte]])
    : Future[Array[Byte]] = ???
}
The thriftEncoding is what is transmitted over the wire; it can be any TProtocol. The jsonLog is assumed to use TJSONProtocol, since it is the only full-featured, human-readable String TProtocol in Thrift.
The approach taken is to populate the requestResponses cache with the supplied jsonLog, and to throw an exception if any request is made that isn't in this cache (this tells developers running the unit tests that they need to update their data). Just like other caches, Thrift's SeqId must be zeroed out to correctly match requests. Previously we directly modified the SeqId Int32 within the Array[Byte]; this time let's try an alternative approach (just for kicks) and use TProtocol's built-in functions:
def changeSeqId(
  requestOrResponse: Array[Byte],
  protocolFactory: TProtocolFactory,
  seqId: Int = 0): (Int, Array[Byte]) = {

  val inputTransport = new TMemoryInputTransport(requestOrResponse)
  val inputProtocol = protocolFactory.getProtocol(inputTransport)

  //pull out the TMessage header
  val inputMessage = inputProtocol.readMessageBegin

  //find all data past the header
  val remainingBytes = inputTransport.getBytesRemainingInBuffer
  val l = requestOrResponse.length
  val remainingInputMessage = requestOrResponse.slice(l - remainingBytes, l)

  //construct a new Array[Byte] using a TMemoryBuffer
  val outputTransport = new TMemoryBuffer(requestOrResponse.length)
  val outputProtocol = protocolFactory.getProtocol(outputTransport)

  //replacement TMessage with our SeqId
  val message0 = new TMessage(inputMessage.name, inputMessage.`type`, seqId)
  outputProtocol.writeMessageBegin(message0)
  val requestOrResponse0 = outputTransport.getArray.slice(0, outputTransport.length)

  //json protocols expect the next struct to write the separating comma,
  // so we need to add it ourselves before the first struct. Try writing
  // a new empty struct and see if anything is added.
  outputProtocol.writeStructBegin(null)
  val jsonCommaFix = if (outputTransport.length > requestOrResponse0.length) {
    //this is a complete hack, we only want the last header byte
    // (the separator) that the reader consumed from the original message
    val headerEnd = requestOrResponse.length - remainingBytes
    requestOrResponse.slice(headerEnd - 1, headerEnd)
  } else Array[Byte]()

  //return the original SeqId along with the rewritten message
  (inputMessage.seqid, requestOrResponse0 ++ jsonCommaFix ++ remainingInputMessage)
}
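For comparison, the direct byte-level edit mentioned earlier can be sketched without any Thrift classes. This is a hedged illustration, not the code from the earlier article: it assumes TBinaryProtocol's strict message header (a 4-byte version-and-type word, a 4-byte name length, the name bytes, then the 4-byte SeqId), and SeqIdSketch, encodeHeader, and replaceSeqId are hypothetical names introduced only for this example.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object SeqIdSketch {
  // Version marker used by TBinaryProtocol in strict mode (assumed layout).
  private val Version1 = 0x80010000

  /** Builds only the message header; a real message carries the arguments after it. */
  def encodeHeader(name: String, messageType: Byte, seqId: Int): Array[Byte] = {
    val nameBytes = name.getBytes(UTF_8)
    val buf = ByteBuffer.allocate(4 + 4 + nameBytes.length + 4)
    buf.putInt(Version1 | messageType) // version and message type share one word
    buf.putInt(nameBytes.length)
    buf.put(nameBytes)
    buf.putInt(seqId)
    buf.array()
  }

  /** Returns the old SeqId and a copy of the message carrying newSeqId. */
  def replaceSeqId(message: Array[Byte], newSeqId: Int): (Int, Array[Byte]) = {
    val nameLength = ByteBuffer.wrap(message).getInt(4)
    val seqIdOffset = 8 + nameLength // skip version word, length word, and name
    val oldSeqId = ByteBuffer.wrap(message).getInt(seqIdOffset)
    val copy = message.clone()
    ByteBuffer.wrap(copy).putInt(seqIdOffset, newSeqId)
    (oldSeqId, copy)
  }
}
```

The offset arithmetic is tied to one encoding, which is the trade-off against the TProtocol-based version above: that one is slower and hackier, but it is not bound to a single wire format.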
The conditional logic we hacked into changeSeqId to handle JSON serialization makes it brittle. The function does, however, now handle both TBinaryProtocol and TJSONProtocol, which is kind of nice, so let's trudge forward by completing requestResponses.
lazy val requestResponses: Map[String, Array[Byte]] = jsonLog.map {
  case (request, response) => {
    val request0 = changeSeqId(request.getBytes("UTF-8"), logEncoding)._2
    val reserializedResponse = reserialize(finagleServiceObject,
      response.getBytes("UTF-8"), thriftEncoding, logEncoding)
    (new String(request0, "UTF-8"), reserializedResponse)
  }
}.toMap
This code reads in our TJSONProtocol log data and populates a request/response dictionary. The keys stay as JSON, since it might be helpful to developers to know what's in the map, but all responses are re-encoded to thriftEncoding (TBinaryProtocol by default), since that's what the client expects to be returned.
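To see why the SeqId has to be normalized before keys can match, here is a minimal, library-free sketch that works on TJSONProtocol strings directly. It is not the filter's implementation: the regex-based zeroSeqId and the MockLookupSketch object are hypothetical names introduced for illustration, and the sketch assumes the TJSONProtocol message header layout [version,"name",type,seqid,...].

```scala
object MockLookupSketch {
  // Assumed TJSONProtocol message header: [version,"name",type,seqid,
  private val Header = """^\[(\d+),"([^"]*)",(\d+),(-?\d+),""".r

  /** Rewrites the SeqId (4th header element) to 0; non-matching input is left untouched. */
  def zeroSeqId(json: String): String =
    Header.replaceFirstIn(json, """[$1,"$2",$3,0,""")

  /** Looks up a request by its SeqId-normalized JSON, failing loudly on a miss. */
  def lookup(cache: Map[String, String], request: String): Either[String, String] = {
    val key = zeroSeqId(request)
    cache.get(key).toRight(s"Request signature not found in mock data: $key")
  }
}
```

Two requests that differ only in SeqId normalize to the same key, so a request logged with SeqId 0 still matches when the live client sends it with SeqId 7, while a request with different arguments produces a miss.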
With the requestResponses "cache" pre-populated on initialization, all we need to do is map incoming requests to its entries. Each request gets its SeqId zeroed out, is re-encoded to JSON, and is then compared to what is in the requestResponses map.
def apply(request: ThriftClientRequest, service: Service[ThriftClientRequest, Array[Byte]])
  : Future[Array[Byte]] = {
  val (oldSeqId, request0) = changeSeqId(request.message, thriftEncoding)
  val requestKeyJson = new String(reserialize(finagleServiceObject, request0,
    logEncoding, thriftEncoding), "UTF-8")

  //try and match request; the wrapped service is never called
  requestResponses.get(requestKeyJson) match {
    case Some(response) =>
      //restore the caller's SeqId on the cached response
      Future.value(changeSeqId(response, thriftEncoding, oldSeqId)._2)
    case None =>
      Future.exception(
        new Exception(s"Request signature not found in mock data: $requestKeyJson"))
  }
}
That’s it. A sample usage, using a hard-coded log, would be:
import java.net.InetSocketAddress
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.thrift.ThriftClientFramedCodec

/**
 * 2 mocked requests:
 * getOrderIds(1) => (3)
 * getOrderIds(2) => (8,9)
 */
val log = Seq(
  ("""[1,"getOrderIds",1,0,{"1":{"i32":1}}]""","""[1,"getOrderIds",2,0,{"0":{"lst":["rec",1,{"1":{"i32":3}}]}}]"""),
  ("""[1,"getOrderIds",1,0,{"1":{"i32":2}}]""","""[1,"getOrderIds",2,0,{"0":{"lst":["rec",2,{"1":{"i32":8}},{"1":{"i32":9}}]}}]""")
)

val mockedData = new MockJSONDataFilter(TestApi, log)

val service = ClientBuilder()
  .codec(ThriftClientFramedCodec())
  .hosts(new InetSocketAddress("localhost", 10000))
  .hostConnectionLimit(1)
  .build()

//the filter answers from the log, so requests never reach the service
val client = new TestApi.FinagledClient(mockedData andThen service)

val fromCache1 = client.getOrderIds(1)
val fromCache2 = client.getOrderIds(2)