Ricardo Miranda
23 Dec 2020
•
2 min read
When talking with people new to functional programming currying is the hardest concept to explain. Usually, newcomers are very skeptical and keep asking "why is currying relevant?".
In this article, I will show an example of currying a callback. A callback is a function passed as an argument to another function (a higher-order function).
I created a Kafka consumer that runs in its own thread. To process messages, the Kafka consumer receives a callback with a single argument: a Kafka message to be processed. This way my Kafka consumer abstracts how a message should be processed, postponing this decision to runtime.
Sometimes this callback function requires complex message processing, for example, sending a reply message to a Kafka topic. Let's refer to this complex processing requirement as messaging the processing environment
. The environment
description, name of the reply topic, and everything else is known at run time, reading a configuration file.
The Kafka consumer problem requires heavy usage of Currying. I find this solution aesthetically appealing and I would like to know your opinion.
First I define a Kafka message with headers and a payload (Kafka's value):
/** Abstract message to be sent using a messaging system, for instance, Pub/Sub or Kafka
* attributes are message metadata, payload is the message content
*/
case class Message(attributes: Map[String, String], payload: String)
Here is the Kafka consumer code:
import java.time.Duration
import java.util.{Arrays, Properties}
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConverters._
object KafkaSubscriber extends StrictLogging {
/** Creates Properties for the Kafka consumer
*
* @param bootstrap_servers Zookeepers servers
* @param group_id Group ID to which the consumer belongs to
* @param key_deserializer_class Key deserialization
* @param value_deserializer_class Payload deserialization
* @return Properties for the Kafka consumer
*/
def kafkaConsumerSettings(bootstrap_servers: String,
group_id: String,
key_deserializer_class: String,
value_deserializer_class: String
): Properties = {
val properties: Properties = new Properties()
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)
properties
}
/** Subscribe to a topic consuming it with a callback.
*
* @param consumerSettings Properties for the consumer
* @param topic Topic to read from
* @param callback Function that performs an action over the message
*/
def subscribe(consumerSettings: Properties, topic: String)(callback: Message => Unit): Unit = {
logger.info(s"Starting consumer on Kafka topic: ${topic}.")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerSettings)
consumer.subscribe(Arrays.asList(topic))
try {
while (true) {
val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))
records.asScala.foreach(rec => {
val attributes: Map[String, String] =
rec
.headers()
.toArray
.map(x => Tuple2(x.key, new String(x.value)))
.toMap
val payload: String = rec.value()
callback(Message(attributes = attributes, payload = payload))
})
}
} finally {
consumer.close()
}
}
}
Let us look at the signature of a function to be sent as a callback to the above subscribe
function. This callback sends an environment for the Kafka consumer:
/** Callback function to be passed to the Kafka consumer.
*
* @param environment This environment in real-world use case would be a Kafka producer
* or any other complex processing to do in the message consumption.
* @param message The Message to be processed.
*/
def processMessage(environment: String) (message: Message): Unit = {
...
}
One possible use of the processMessage
message is:
subscribe(consumerSettings = kafkaConsumerSettings,
topic = "example_topic")
(callback: processMessage(environment = "Hello World"))
I hope this article has shown the power of currying with higher-order functions. This technique provides the programmer with a clean and easy way to postpone decisions to run time.
Ground Floor, Verse Building, 18 Brunswick Place, London, N1 6DZ
108 E 16th Street, New York, NY 10003
Join over 111,000 others and get access to exclusive content, job opportunities and more!