# Apache Kafka for Real-time Event Streaming
Event-driven architecture has become the backbone of modern distributed systems. At Agoda, we process millions of booking events daily, and at CP Axtra, we handle real-time inventory updates across thousands of retail locations. Here's how Apache Kafka enabled us to build resilient, scalable event streaming platforms.

## The Business Case

### Agoda: Booking Platform

- 50M+ bookings per month
- Real-time inventory updates
- Multi-region deployment
- 99.99% availability requirement

### CP Axtra: Retail Operations

- 10,000+ stores across Thailand
- Real-time inventory synchronization
- POS system integration
- Supply chain optimization

## Architecture Overview

```
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│  Producer   │───▶│    Kafka     │───▶│  Consumer   │
│  Services   │     │   Cluster    │     │  Services   │
└─────────────┘     └──────────────┘     └─────────────┘
                           │
                    ┌──────────────┐
                    │    Schema    │
                    │   Registry   │
                    └──────────────┘
```

## Implementation Details

### Topic Design Strategy

```yaml
# Booking Events
booking-events:
  partitions: 12
  replication-factor: 3
  retention: 30 days

# Inventory Events
inventory-events:
  partitions: 24
  replication-factor: 3
  retention: 7 days

# Payment Events
payment-events:
  partitions: 6
  replication-factor: 3
  retention: 90 days
```

### Producer Configuration

```java
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Durability and throughput settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return new DefaultKafkaProducerFactory<>(props);
    }
}
```
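One way to see what `batch.size` and `linger.ms` buy: with a 16 KB batch ceiling, the number of records that can share a single request depends on record size. A back-of-the-envelope sketch — the ~200-byte average serialized record size is an assumed figure for illustration, not a measured number from our workload:

```java
public class BatchEstimate {
    /** Upper bound on records per producer batch, given batch.size
     *  (bytes) and an assumed average serialized record size (bytes). */
    static int recordsPerBatch(int batchSizeBytes, int avgRecordBytes) {
        return batchSizeBytes / avgRecordBytes;
    }

    public static void main(String[] args) {
        // batch.size = 16384 from the producer config above,
        // assuming ~200-byte serialized records
        System.out.println(recordsPerBatch(16384, 200)); // prints 81
    }
}
```

With records batching up like this, a 5 ms linger is usually enough to fill batches under load while adding negligible latency when traffic is light.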
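Because the producer keys each record and the default partitioner hashes the key to pick a partition, per-key ordering follows automatically. Here is a minimal, self-contained sketch of that idea; note that the real Kafka client hashes the serialized key bytes with murmur2, so `String.hashCode` (and the `PartitionSketch` class name) are illustrative stand-ins only:

```java
public class PartitionSketch {
    /** Illustrative stand-in for Kafka's default partitioner:
     *  hash the record key, mask to non-negative, and map it
     *  onto the partition count. (The real client uses murmur2
     *  over the serialized key bytes.) */
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 12; // matches the booking-events topic above
        // The same key always lands on the same partition,
        // which is what gives per-customer ordering.
        int p1 = partitionFor("cust-42:TH", partitions);
        int p2 = partitionFor("cust-42:TH", partitions);
        System.out.println(p1 == p2);                   // true
        System.out.println(p1 >= 0 && p1 < partitions); // true
    }
}
```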
### Consumer Implementation

```java
@KafkaListener(topics = "booking-events", groupId = "booking-processor")
public void handleBookingEvent(
        @Payload BookingEvent event,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
    try {
        log.info("Processing booking event: {} from partition: {}, offset: {}",
                event.getBookingId(), partition, offset);

        // Process the event
        bookingService.processBookingEvent(event);

        // Update metrics
        meterRegistry.counter("booking.events.processed",
                "type", event.getEventType()).increment();
    } catch (Exception e) {
        log.error("Failed to process booking event: {}", event.getBookingId(), e);
        // Send to dead letter queue
        deadLetterService.send(event, e.getMessage());
    }
}
```

### Schema Evolution with Avro

```json
{
  "type": "record",
  "name": "BookingEvent",
  "namespace": "com.agoda.events",
  "fields": [
    {"name": "bookingId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "hotelId", "type": "string"},
    {"name": "checkIn", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "checkOut", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "totalAmount", "type": "double"},
    {"name": "currency", "type": "string"},
    {"name": "status", "type": {"type": "enum", "name": "BookingStatus",
      "symbols": ["PENDING", "CONFIRMED", "CANCELLED"]}},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}
```

Note that `logicalType` must be nested inside the type definition; Avro silently ignores it at the field level.

## Monitoring and Observability

### Key Metrics

```java
@Component
public class KafkaMetrics {

    private final MeterRegistry meterRegistry;

    public KafkaMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @EventListener
    public void handleProducerMetrics(ProducerMetricEvent event) {
        Gauge.builder("kafka.producer.batch.size.avg", event::getBatchSizeAvg)
                .register(meterRegistry);
        Gauge.builder("kafka.producer.record.send.rate", event::getRecordSendRate)
                .register(meterRegistry);
    }

    @EventListener
    public void handleConsumerMetrics(ConsumerMetricEvent event) {
        Gauge.builder("kafka.consumer.lag.max", event::getLagMax)
                .register(meterRegistry);
        Gauge.builder("kafka.consumer.records.consumed.rate", event::getRecordsConsumedRate)
                .register(meterRegistry);
    }
}
```

### Alerting Rules

```yaml
# High Consumer Lag
- alert: KafkaHighConsumerLag
  expr: kafka_consumer_lag_max > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High consumer lag detected"

# Producer Error Rate
- alert: KafkaHighProducerErrorRate
  expr: rate(kafka_producer_record_error_total[5m]) > 0.01
  for: 2m
  labels:
    severity: critical
```

## Performance Results

### Before Kafka Implementation

- Message processing: 5,000 msg/sec
- Latency: 500ms average
- Downtime: 2-3 hours/month
- Data loss: occasional during failures

### After Kafka Implementation

- Message processing: 100,000 msg/sec
- Latency: 50ms average
- Downtime: 0 hours/month
- Data loss: zero

## Best Practices Learned

### 1. Partition Strategy

```java
// Good: a high-cardinality key distributes load evenly
String partitionKey = customerId + ":" + region;

// Bad: a constant key creates hot partitions
String partitionKey = "all-events";
```

### 2. Error Handling

```java
@RetryableTopic(
    attempts = "3",
    backoff = @Backoff(delay = 1000, multiplier = 2.0),
    dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "booking-events")
public void processBooking(BookingEvent event) {
    // Processed with automatic retries and a dead-letter topic on exhaustion
}
```

### 3. Exactly-Once Semantics

```java
@Transactional
@KafkaListener(topics = "payment-events")
public void processPayment(PaymentEvent event) {
    // Database update and Kafka produce in the same transaction
    paymentRepository.updateStatus(event.getPaymentId(), COMPLETED);
    kafkaTemplate.send("payment-completed", event);
}
```

For the send to actually participate in the transaction, the producer factory must be configured with a `transactional.id`; without a transactional producer, `@Transactional` covers only the database update.

## What's Next?

In the next post, I'll share our payment system optimization journey and how we achieved a 30% performance improvement through strategic caching and database optimization.
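As a closing note on the retry policy shown above (`attempts = "3"`, 1000 ms initial delay, 2.0 multiplier): three attempts means the original delivery plus two retries, so there are two waits between attempts. A quick sketch of that schedule, assuming standard exponential-backoff semantics (the `BackoffSchedule` class is illustrative, not part of Spring Kafka):

```java
import java.util.Arrays;

public class BackoffSchedule {
    /** Waits between attempts under exponential backoff:
     *  initialMs * multiplier^i for i = 0 .. attempts-2
     *  (one fewer wait than there are attempts). */
    static long[] retryDelays(long initialMs, double multiplier, int attempts) {
        long[] delays = new long[Math.max(0, attempts - 1)];
        double d = initialMs;
        for (int i = 0; i < delays.length; i++) {
            delays[i] = (long) d;
            d *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // attempts = 3, delay = 1000 ms, multiplier = 2.0
        // -> wait 1000 ms before retry 1, 2000 ms before retry 2,
        //    then the record goes to the dead-letter topic
        System.out.println(Arrays.toString(retryDelays(1000, 2.0, 3)));
        // prints [1000, 2000]
    }
}
```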