Spinning Apache Kafka® Microservices With Cadence Workflows

1. Sending a Message from Cadence to Kafka

Properties kafkaProps = new Properties();
try (FileReader fileReader = new FileReader("producer.properties")) {
    kafkaProps.load(fileReader);
} catch (IOException e) {
    e.printStackTrace();
}
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="kafkaUser" \
    password="kafkaPassword";
public interface ExampleActivities {
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 60)
    String sendMessageToKafka(String msg);
}

public static class ExampleActivitiesImpl implements ExampleActivities {
    public String sendMessageToKafka(String msg) {
        // The id is the workflow id
        String id = Activity.getWorkflowExecution().getWorkflowId();
        // Record contains: topic, key (empty), value
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "", msg);
        // Add the workflow id to the record header
        producerRecord.headers().add(new RecordHeader("Cadence_WFID", id.getBytes()));
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            producer.send(producerRecord);
            producer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "done";
    }
}

2. Introducing Cadence Signals

public interface ExampleWorkflow {
    @WorkflowMethod(executionStartToCloseTimeoutSeconds = 120, taskList = activityName)
    String startWorkflow(String name);

    @SignalMethod
    void signalKafkaReply(String name);
}
public static class ExampleWorkflowImpl implements ExampleWorkflow {
    String message = "";
    private ExampleActivities activities = null;

    public ExampleWorkflowImpl() {
        this.activities = Workflow.newActivityStub(ExampleActivities.class);
    }

    @Override
    public String startWorkflow(String msg) {
        String r1 = activities.sendMessageToKafka(msg);
        Workflow.await(() -> !message.isEmpty());
        System.out.println("workflow got signal = " + message);
        return message;
    }

    @Override
    public void signalKafkaReply(String msg) {
        message = msg;
    }
}
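The signal/await pattern above boils down to "block the workflow until a handler sets shared state". As a hypothetical plain-Java analogue (the `SignalAwait` class and its methods are illustrative only, not Cadence APIs; Cadence schedules workflow code deterministically rather than with raw threads), the same idea can be sketched with wait/notify:

```java
// Hypothetical plain-Java analogue of the Cadence signal/await pattern:
// one thread blocks until a "signal" handler sets the shared message.
public class SignalAwait {
    private String message = "";

    // Analogue of the @SignalMethod handler.
    public synchronized void signalKafkaReply(String msg) {
        message = msg;
        notifyAll();
    }

    // Analogue of Workflow.await(() -> !message.isEmpty()).
    public synchronized String awaitReply() {
        while (message.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return message;
    }

    public static void main(String[] args) {
        SignalAwait wf = new SignalAwait();
        new Thread(() -> wf.signalKafkaReply("hello from Kafka")).start();
        System.out.println(wf.awaitReply()); // hello from Kafka
    }
}
```

The key property this sketch shares with the real workflow is that the await condition is re-checked whenever a signal arrives, so a signal delivered before or after the await starts is handled the same way.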

3. Using Cadence Signals to Integrate With Kafka: First Approach

To integrate with Kafka using signals, the Kafka consumer must:

  1. realize that the record originated from Cadence, and
  2. send the response back to the correct workflow.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
    consumer.subscribe(Collections.singleton("topic1"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            String id = "";
            // If the record has a Cadence_WFID header, send the response to Cadence
            for (Header header : record.headers()) {
                if (header.key().equals("Cadence_WFID"))
                    id = new String(header.value());
            }
            if (!id.isEmpty()) {
                ExampleWorkflow workflowById =
                        workflowClient.newWorkflowStub(ExampleWorkflow.class, id);
                workflowById.signalKafkaReply("Kafka consumer received value = " + record.value());
            }
            // else don't send a signal - process the record as normal
        }
    }
}
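The routing decision in the loop above can be tested in isolation. Here is a minimal, self-contained sketch of just that decision (the `HeaderRouting` class, the `extractWorkflowId` helper, and the Map-based header representation are illustrative assumptions, not Kafka or Cadence APIs):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper isolating the consumer's routing decision:
// a record is treated as Cadence-originated only if the
// Cadence_WFID header is present; otherwise it is handled as normal.
public class HeaderRouting {
    static String extractWorkflowId(Map<String, byte[]> headers) {
        byte[] value = headers.get("Cadence_WFID");
        return (value == null) ? "" : new String(value);
    }

    public static void main(String[] args) {
        Map<String, byte[]> cadenceHeaders = new HashMap<>();
        cadenceHeaders.put("Cadence_WFID", "workflow-42".getBytes());
        // Record from Cadence: header yields the target workflow id
        System.out.println(extractWorkflowId(cadenceHeaders)); // workflow-42
        // Ordinary record: no header, so no signal is sent
        System.out.println(extractWorkflowId(new HashMap<>()).isEmpty()); // true
    }
}
```

Keeping this check cheap matters because it runs on every record the consumer polls, whether or not it came from Cadence.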

4. Cadence Activity Completions: Second Approach

public static class ExampleWorkflowImpl implements ExampleWorkflow {
    private ExampleActivities activities = null;

    public ExampleWorkflowImpl() {
        this.activities = Workflow.newActivityStub(ExampleActivities.class);
    }

    @Override
    public String startWorkflow2(String msg) {
        String r1 = activities.sendMessageToKafka2(msg);
        return r1;
    }
}
public String sendMessageToKafka2(String msg) {
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "", msg);
    // The task token is now used to correlate the reply
    byte[] taskToken = Activity.getTaskToken();
    producerRecord.headers().add(new RecordHeader("Cadence_TASKID", taskToken));
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
        producer.send(producerRecord);
        producer.flush();
    } catch (Exception e) {
        e.printStackTrace();
    }
    // Don't complete - wait for the completion call
    Activity.doNotCompleteOnReturn();
    // The return value is replaced by the completion value
    return "ignored";
}
ActivityCompletionClient completionClient = workflowClient.newActivityCompletionClient();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
    consumer.subscribe(Collections.singleton("topic1"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            byte[] task = null;
            // If the record has a Cadence_TASKID header, complete the Cadence activity task
            for (Header header : record.headers()) {
                if (header.key().equals("Cadence_TASKID"))
                    task = Arrays.copyOf(header.value(), header.value().length);
            }
            if (task != null)
                completionClient.complete(task, "Kafka consumer received value = " + record.value());
            // else process as normal
        }
    }
}
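Conceptually, the task token acts as a correlation key: the activity parks itself under the token, and whoever later calls `complete` with that token supplies the activity's result. A hypothetical in-memory analogue of what `ActivityCompletionClient` does (the `TokenCorrelation` class and its method names are illustrative only, not Cadence APIs) can be sketched with a map of futures:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory analogue of task-token correlation:
// the "activity" parks a future under its token, and the "consumer"
// completes it by token when the external reply arrives.
public class TokenCorrelation {
    private final ConcurrentMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    // Activity side: park a future instead of returning a result
    // (analogue of Activity.doNotCompleteOnReturn()).
    public CompletableFuture<String> park(String taskToken) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(taskToken, future);
        return future;
    }

    // Consumer side: complete the parked activity by token
    // (analogue of completionClient.complete(task, result)).
    public void complete(String taskToken, String result) {
        CompletableFuture<String> future = pending.remove(taskToken);
        if (future != null) future.complete(result);
    }

    public static void main(String[] args) {
        TokenCorrelation correlator = new TokenCorrelation();
        CompletableFuture<String> reply = correlator.park("token-1");
        correlator.complete("token-1", "Kafka consumer received value = hello");
        System.out.println(reply.join()); // Kafka consumer received value = hello
    }
}
```

The real Cadence version adds what this sketch lacks: the token survives process restarts, so the completion can come from a different process (the Kafka consumer) long after the activity worker returned.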

5. Kafka With Cadence: Conclusions


Managed platform for open source technologies including Apache Cassandra, Apache Kafka, Apache ZooKeeper, Redis, Elasticsearch and PostgreSQL

Instaclustr
