Skip to content

Commit

Permalink
parquet api
Browse files Browse the repository at this point in the history
  • Loading branch information
jderecho committed Jul 9, 2024
1 parent 79787bd commit 102cf0a
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 11 deletions.
6 changes: 5 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
source 'https://rubygems.org'

ruby '3.2.2'
ruby '3.2.4'

##-- base gems for rails --##
gem 'rack-cors', '2.0.0', require: 'rack/cors'
Expand Down Expand Up @@ -175,6 +175,10 @@ gem 'pgvector'
# Convert Website HTML to Markdown
gem 'reverse_markdown'

gem 'gobject-introspection'
gem 'red-arrow'
gem 'red-parquet'

### Gems required only in specific deployment environments ###
##############################################################

Expand Down
29 changes: 27 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ GEM
crack (0.4.5)
rexml
crass (1.0.6)
csv (3.3.0)
csv-safe (3.2.1)
cypress-on-rails (1.16.0)
rack
Expand Down Expand Up @@ -227,6 +228,7 @@ GEM
et-orbi (1.2.7)
tzinfo
execjs (2.8.1)
extpp (0.1.1)
facebook-messenger (2.0.1)
httparty (~> 0.13, >= 0.13.7)
rack (>= 1.4.5)
Expand Down Expand Up @@ -260,6 +262,7 @@ GEM
ffi-compiler (1.0.1)
ffi (>= 1.0.0)
rake
fiddle (1.1.2)
flag_shih_tzu (0.3.23)
foreman (0.87.2)
fugit (1.9.0)
Expand All @@ -274,11 +277,19 @@ GEM
googleauth (~> 1.0)
grpc (~> 1.36)
geocoder (1.8.1)
gio2 (4.2.2)
fiddle
gobject-introspection (= 4.2.2)
gli (2.21.1)
glib2 (4.2.2)
native-package-installer (>= 1.0.3)
pkg-config (>= 1.3.5)
globalid (1.2.1)
activesupport (>= 6.1)
gmail_xoauth (0.4.3)
oauth (>= 0.3.6)
gobject-introspection (4.2.2)
glib2 (= 4.2.2)
google-apis-core (0.11.0)
addressable (~> 2.5, >= 2.5.1)
googleauth (>= 0.16.2, < 2.a)
Expand Down Expand Up @@ -468,6 +479,7 @@ GEM
multi_json (1.15.0)
multi_xml (0.6.0)
multipart-post (2.3.0)
native-package-installer (1.1.9)
neighbor (0.2.3)
activerecord (>= 5.2)
net-http (0.4.1)
Expand Down Expand Up @@ -538,6 +550,7 @@ GEM
activerecord (>= 5.2)
activesupport (>= 5.2)
pgvector (0.1.1)
pkg-config (1.5.6)
procore-sift (1.0.0)
activerecord (>= 6.1)
pry (0.14.2)
Expand Down Expand Up @@ -602,6 +615,15 @@ GEM
rb-fsevent (0.11.2)
rb-inotify (0.10.1)
ffi (~> 1.0)
red-arrow (16.1.0)
bigdecimal (>= 3.1.0)
csv
extpp (>= 0.1.1)
gio2 (>= 3.5.0)
native-package-installer
pkg-config
red-parquet (16.1.0)
red-arrow (= 16.1.0)
redis (5.0.6)
redis-client (>= 0.9.0)
redis-client (0.22.1)
Expand Down Expand Up @@ -873,6 +895,7 @@ DEPENDENCIES
foreman
geocoder
gmail_xoauth
gobject-introspection
google-cloud-dialogflow-v2
google-cloud-storage
google-cloud-translate-v3
Expand Down Expand Up @@ -917,6 +940,8 @@ DEPENDENCIES
rack-mini-profiler (>= 3.2.0)
rack-timeout
rails (~> 7.0.8.1)
red-arrow
red-parquet
redis
redis-namespace
responders (>= 3.1.1)
Expand Down Expand Up @@ -960,7 +985,7 @@ DEPENDENCIES
working_hours

RUBY VERSION
ruby 3.2.2p185
ruby 3.2.4p170

BUNDLED WITH
2.4.6
2.5.14
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ class Api::V1::Accounts::CsatSurveyResponsesController < Api::V1::Accounts::Base

sort_on :created_at, type: :datetime

def index; end
def index
if params[:export_as_parquet]
file_name = "csat_surveys_#{Time.now.to_i}.parquet"
Digitaltolk::StoreSurveyResponsesParquetJob.perform_later(@csat_survey_responses.pluck(:id), file_name)

render json: { file_url: Digitaltolk::SurveyResponsesParquetService.new([], file_name).perform }.to_json and return
end
end

def metrics
@total_count = @csat_survey_responses.count
Expand Down
12 changes: 10 additions & 2 deletions app/controllers/api/v1/accounts/messages_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ class Api::V1::Accounts::MessagesController < Api::V1::Accounts::BaseController

before_action :set_messages, only: [:index]
before_action :set_current_page, only: [:index]
before_action :set_current_page_messages, only: [:index]

def index; end
def index
if params[:export_as_parquet]
file_name = "messages_#{Time.now.to_i}.parquet"
Digitaltolk::StoreMessagesParquetJob.perform_later(@messages.pluck(:id), file_name)

render json: { file_url: Digitaltolk::MessagesParquetService.new([], file_name).perform }.to_json and return
end
end

private

Expand All @@ -18,7 +26,7 @@ def set_messages
.order(created_at: :desc)
end

def current_page_messages
def set_current_page_messages
@messages = @messages.page(@current_page).per(RESULTS_PER_PAGE) if params[:page].present?
end

Expand Down
9 changes: 9 additions & 0 deletions app/jobs/digitaltolk/store_messages_parquet_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class Digitaltolk::StoreMessagesParquetJob < ApplicationJob
queue_as :default

def perform(message_ids, file_name)
@messages = Message.where(id: message_ids)

Digitaltolk::MessagesParquetService.new(@messages, file_name).perform
end
end
11 changes: 11 additions & 0 deletions app/jobs/digitaltolk/store_survey_responses_parquet_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class Digitaltolk::StoreSurveyResponsesParquetJob < ApplicationJob
queue_as :default

def perform(survey_response_ids, file_name)
@survey_responses = SurveyResponse.where(id: survey_response_ids)

if @survey_responses.present?
Digitaltolk::SurveyResponsesParquetService.new(@survey_responses, file_name).perform
end
end
end
55 changes: 51 additions & 4 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# pre-build stage
FROM ruby:3.2.2-alpine3.18 AS pre-builder
FROM ruby:3.2.4-alpine3.20 AS pre-builder

# ARG default to production settings
# For development docker-compose file overrides ARGS
Expand Down Expand Up @@ -28,6 +28,27 @@ RUN apk update && apk add --no-cache \
nodejs-current \
yarn \
git \
glib \
gobject-introspection \
gobject-introspection-dev \
libarrow \
libarrow_acero \
libparquet \
&& apk add --no-cache --virtual=.build-dependencies\
apache-arrow-dev \
glib-dev \
meson \
pkgconf \
samurai \
&& wget "https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-16.1.0/apache-arrow-16.1.0.tar.gz" \
&& tar xf apache-arrow-16.1.0.tar.gz && \
meson setup \
apache-arrow-16.1.0/c_glib.build \
apache-arrow-16.1.0/c_glib \
--prefix=/usr \
--buildtype=minsize \
&& meson install -C apache-arrow-16.1.0/c_glib.build \
&& rm -rf apache-arrow-16.1.0 \
&& mkdir -p /var/app \
&& gem install bundler

Expand All @@ -44,8 +65,8 @@ RUN bundle config set --local force_ruby_platform true

# Do not install development or test gems in production
RUN if [ "$RAILS_ENV" = "production" ]; then \
bundle config set without 'development test'; bundle install -j 4 -r 3; \
else bundle install -j 4 -r 3; \
bundle config set without 'development test'; MAKEFLAGS=-j4 bundle install -j 4 -r 3; \
else MAKEFLAGS=-j4 bundle install -j 4 -r 3; \
fi

COPY package.json yarn.lock ./
Expand Down Expand Up @@ -73,7 +94,7 @@ RUN rm -rf /gems/ruby/3.2.0/cache/*.gem \
&& rm .gitignore

# final build stage
FROM ruby:3.2.2-alpine3.18
FROM ruby:3.2.4-alpine3.20


ARG BUNDLE_WITHOUT="development:test"
Expand All @@ -93,6 +114,9 @@ ARG RAILS_ENV=production
ENV RAILS_ENV ${RAILS_ENV}
ENV BUNDLE_PATH="/gems"

ENV GI_TYPELIB_PATH=/usr/lib/girepository-1.0
ENV LD_LIBRARY_PATH=/usr/lib/

RUN apk update && apk add --no-cache \
build-base \
openssl \
Expand All @@ -101,12 +125,35 @@ RUN apk update && apk add --no-cache \
imagemagick \
git \
vips \
glib \
gobject-introspection \
gobject-introspection-dev \
libarrow \
libarrow_acero \
libparquet \
&& apk add --no-cache --virtual=.build-dependencies\
apache-arrow-dev \
glib-dev \
meson \
pkgconf \
samurai \
&& wget "https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-16.1.0/apache-arrow-16.1.0.tar.gz" \
&& tar xf apache-arrow-16.1.0.tar.gz && \
meson setup \
apache-arrow-16.1.0/c_glib.build \
apache-arrow-16.1.0/c_glib \
--prefix=/usr \
--buildtype=minsize \
&& meson install -C apache-arrow-16.1.0/c_glib.build \
&& rm -rf apache-arrow-16.1.0 \
&& gem install bundler

RUN if [ "$RAILS_ENV" != "production" ]; then \
apk add --no-cache nodejs-current yarn; \
fi

RUN apk del .build-dependencies

COPY --from=pre-builder /gems/ /gems/
COPY --from=pre-builder /app /app

Expand Down
102 changes: 102 additions & 0 deletions lib/digitaltolk/messages_parquet_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
require 'parquet'

class Digitaltolk::MessagesParquetService
attr_accessor :messages, :file_name

def initialize(messages, file_name)
@messages = messages
@file_name = file_name
end

# @return [Hash]
def perform
process_upload
end

private

def process_upload
export_parquet
end

def file_path
@file_path ||= Rails.root.join('tmp', file_name)
end

def arrow_fields
[
Arrow::Field.new('id', :int32),
Arrow::Field.new('content', :string),
Arrow::Field.new('inbox_id', :int32),
Arrow::Field.new('conversation_id', :int32),
Arrow::Field.new('message_type', :string),
Arrow::Field.new('content_type', :string),
Arrow::Field.new('status', :string),
Arrow::Field.new('created_at', :int32),
Arrow::Field.new('private', :boolean),
Arrow::Field.new('source_id', :int32)
]
end

def arrow_schema
Arrow::Schema.new(arrow_fields)
end

def initialize_columns
@columns = {}

arrow_fields.each do |f|
@columns[f.name] = []
end
end

def load_columns_data
messages.each do |message|
@columns['id'] << message.id
@columns['content'] << message.content
@columns['inbox_id'] << message.inbox_id
@columns['conversation_id'] << message.conversation.display_id
@columns['message_type'] << message.message_type_before_type_cast
@columns['content_type'] << message.content_type
@columns['status'] << message.status
@columns['created_at'] << message.created_at.to_i
@columns['private'] << message.private
@columns['source_id'] << message.source_id
end
end

def map_columns_array
[
Arrow::Int32Array.new(@columns['id']),
Arrow::StringArray.new(@columns['content']),
Arrow::Int32Array.new(@columns['inbox_id']),
Arrow::Int32Array.new(@columns['conversation_id']),
Arrow::StringArray.new(@columns['message_type']),
Arrow::StringArray.new(@columns['content_type']),
Arrow::StringArray.new(@columns['status']),
Arrow::Int32Array.new(@columns['created_at']),
Arrow::BooleanArray.new(@columns['private']),
Arrow::Int32Array.new(@columns['source_id'])
]
end

def arrow_columns
initialize_columns
load_columns_data
map_columns_array
end

def export_parquet
record_batch = Arrow::RecordBatch.new(arrow_schema, @messages.count, arrow_columns)
record_batch.to_table.save(file_path)

url = Digitaltolk::UploadToS3.new(file_path).perform

# Delete the file after upload
File.delete(file_path) rescue nil
url
end

def export_empty_file
end
end
Loading

0 comments on commit 102cf0a

Please sign in to comment.