Skip to content
@ruby-kafka-poc

ruby-kafka-poc

Kafka & rails

PoC

PoC that use Kafka with Confluent

  • customers_api: Handle customers and organizations
  • security_api: Handle user access
  • orders_api: Handle orders and invoices
  • notifications_api: Send emails

Confluent have a way to maintain updated data that comes from different sources. ktables (let's think that as a cache service) With this the order api doesn't need to know the organization name, or the customer email. Just sharing ID's is good enough.

sequenceDiagram
    customer_api->>+kafka: organization_created
    order_api->>+kafka: order_created
    order_api->>+kafka: invoice_created
    kafka-->>+notification_api: invoice_created
    notification_api-->>+ktables: get invoice related data
    ktables-->>notification_api: invoice & organization data
    notification_api-->>-postmark: send email
Loading

Produce events

The rails_kafka_integration is a gem that handle work as a producer

As producer, can be called in 2 ways. Directly with KafkaRailsIntegration::Producer.produce or with the model callbacks using the concern KafkaRailsIntegration::Concerns::Model::Eventeable that will send after commits the events. The gem will handle all the events during the callback, and the KafkaRailsIntegration::Middlewares::DeliverMessages will send all together right before the callback ends.

orders_api, customers_api and security_api use it.

Consume events

notifications_api consumes the events with karafka gem. In order to do it, need to be executed be rake kafka_consumers:start this process will be waiting for a new event and process it. There are many ways to configure karafka so you need to check the documentation.

Run

# $0>
git clone [email protected]:ruby-kafka-poc/customers_api.git
git clone [email protected]:ruby-kafka-poc/orders_api.git
git clone [email protected]:ruby-kafka-poc/security_api.git
git clone [email protected]:ruby-kafka-poc/notifications_api.git

echo "
---
  version: '3.5'
  
  networks:
    net:
      name: internal_net

  services:

    db:
      image: postgres:14.2-alpine
      restart: always
      networks:
        - net
      environment:
        - POSTGRES_USER=postgres
        - POSTGRES_PASSWORD=postgres
      ports:
        - 5432:5432
      volumes:
        - db:/var/lib/postgresql/data
  
  volumes:
    db:
      driver: local
" >> docker-compose.yml

docker-compose up -d

open https://confluent.cloud
# Create a free account (no credit card needed free for 1 month), create a cluster,
# create a global API key
# go to cluster, cluster overview, cluster settings and get the bootstrap server

# WARNING DO NOT COMMIT THIS THINGS ANYWHERE!!!
echo "
BOOTSTRAP_SERVERS=AAAAAAAAAA:9092
SECURITY_PROTOCOL=sasl_ssl
SASL_MECHANISM=PLAIN
SASL_USERNAME=BBBBBB
SASL_PASSWORD=CCCCCCC
" >> .private_env

# open 4 terminals

# $1>
cd customers_api
cp ../.private_env .
export $(cat ./.private_env | sed 's/#.*//g' | xargs )
bundle install
bundle update
bundle exec rake db:create db:migrate db:seed
bundle exec rails s -p 3001 -b '0.0.0.0'

# $2>
cd orders_api
cp ../.private_env .
export $(cat ./.private_env | sed 's/#.*//g' | xargs )
bundle install
bundle update
bundle exec rake db:create db:migrate db:seed
bundle exec rails s -p 3002 -b '0.0.0.0'

# $3>
cd notifications_api
cp ../.private_env .
export $(cat ./.private_env | sed 's/#.*//g' | xargs )
bundle install
bundle update
bundle exec rake db:create db:migrate db:seed
bundle exec rails s -p 3003 -b '0.0.0.0'

# $4>
cd security_api
cp ../.private_env .
export $(cat ./.private_env | sed 's/#.*//g' | xargs )
bundle install
bundle update
bundle exec rake db:create db:migrate db:seed
bundle exec rails s -p 3004 -b '0.0.0.0'

# lastly we need another terminal to observe kafka, and receive events
cd notifications_api
cp ../.private_env .
export $(cat ./.private_env | sed 's/#.*//g' | xargs )
bundle exec karafka server

IMAGE

Generate some content

curl --location -g --request POST 'http://localhost:3001/organizations' \
--header 'Content-Type: application/json' \
--data-raw '{ "organization": {"name": "SuperCompany" }}'

curl --location -g --request POST 'http://localhost:3001/organizations' \
--header 'Content-Type: application/json' \
--data-raw '{ "organization": {"name": "SomeCompany" }}'

curl --location -g --request POST 'http://localhost:3001/customers' \
--header 'Content-Type: application/json' \
--data-raw '{ "customer": {"name": "John", "last_name": "Do", "email": "[email protected]"}}'

curl --location -g --request POST 'http://localhost:3001/customers' \
--header 'Content-Type: application/json' \
--data-raw '{ "customer": {"name": "Matt", "last_name": "Berrueta", "email": "[email protected]"}}'

# UPDATE 1 ORG
curl --location -g --request PUT 'http://localhost:3001/organizations/13' \
--header 'Content-Type: application/json' \
--data-raw '{ "organization": {"name": "SuperCompany Tech" }}'

curl --location -g --request POST 'http://localhost:3002/orders' \
--header 'Content-Type: application/json' \
--data-raw '{
  "po": "po002",
  "organization_id": 13,
  "customer_id": 10,
  "date": "2022-05-07T07:50:03-00:00",
  "deliver_date": "2022-05-07T07:53:03-00:00",
  "state": "pending"
}'

KStream and KTables

Here is where things start to become pretty

We send events from our application, but eventually you will need data from other application (or unrelated event) K-Streams allow to stream events from different sources, joining or manipulating the payload. K-Tables, allow to create a view of an object that comes from 1 or multiple events and allow to get the LAST value of a field from the last event (this is super cool).

sequenceDiagram
    customer_api->>+kafka: organization_created
    customer_api->>+kafka: customer_created name A
    order_api->>+kafka: order_created
    kafka-->>+notification_api: invoice_created
    notification_api-->>+ktables: send order name A
    customer_api->>+kafka: customer_edited name B
    order_api->>+kafka: order_created
    kafka-->>+notification_api: invoice_created
    notification_api-->>+ktables: send order name B
Loading

So notification_api never received a notification that the customer changed his name, even do, the email is with correct value.

-- Organization
CREATE STREAM organizations_stream ( id INTEGER, name VARCHAR)
  WITH (kafka_topic='organizations', value_format='json', partitions=1);

CREATE TABLE organizations_table AS
SELECT id ,
       LATEST_BY_OFFSET(name) AS name
FROM organizations_stream
GROUP BY id
    EMIT CHANGES;

-- Customer
CREATE STREAM customers_stream ( id INTEGER, name VARCHAR , last_name VARCHAR , email VARCHAR, action VARCHAR)
WITH (kafka_topic='customers', value_format='json', partitions=1);

CREATE TABLE customers_table AS
SELECT id ,
       LATEST_BY_OFFSET(name) AS name,
       LATEST_BY_OFFSET(last_name) AS last_name,
       LATEST_BY_OFFSET(email) AS email,
       LATEST_BY_OFFSET(action) AS action
  FROM customers_stream
  GROUP BY id
  EMIT CHANGES;

CREATE TABLE customer_created AS
SELECT id ,
       name,
       last_name,
       email,
    action
    FROM customers_table
    WHERE action = 'created'
    EMIT CHANGES;

-- Order
CREATE STREAM orders_stream ( id INTEGER, po VARCHAR , organization_id INTEGER , customer_id INTEGER)
WITH (kafka_topic='orders', value_format='json', partitions=1);

DROP STREAM orders_created;
CREATE STREAM orders_created AS
SELECT  os.id ,
        os.po,
        os.organization_id,
        ot.name as organization_name,
        os.customer_id,
        ct.name as customer_name,
        ct.last_name AS customer_last_name,
        ct.email AS email
FROM orders_stream os
         INNER JOIN organizations_table ot ON os.organization_id = ot.id
         INNER JOIN customers_table ct ON os.customer_id = ct.id
    EMIT CHANGES;

Pinned Loading

  1. .github .github Public

Repositories

Showing 7 of 7 repositories
  • .github Public
    ruby-kafka-poc/.github’s past year of commit activity
    0 0 0 0 Updated Oct 7, 2022
  • security_api Public
    ruby-kafka-poc/security_api’s past year of commit activity
    Ruby 0 0 1 0 Updated Oct 7, 2022
  • ruby-kafka-poc/notifications_api’s past year of commit activity
    Ruby 0 0 0 0 Updated Oct 7, 2022
  • orders_api Public
    ruby-kafka-poc/orders_api’s past year of commit activity
    Ruby 0 0 0 0 Updated Oct 7, 2022
  • customers_api Public
    ruby-kafka-poc/customers_api’s past year of commit activity
    Ruby 0 0 0 0 Updated Oct 7, 2022
  • rails_kafka_integration Public

    Integrate Kafka producer and consumers with rails

    ruby-kafka-poc/rails_kafka_integration’s past year of commit activity
    Ruby 0 MIT 0 0 0 Updated Oct 7, 2022
  • ui_api Public

    front face

    ruby-kafka-poc/ui_api’s past year of commit activity
    Ruby 0 MIT 0 0 0 Updated Jun 20, 2022

People

This organization has no public members. You must be a member to see who’s a part of this organization.

Top languages

Loading…

Most used topics

Loading…