Skip to content

Commit

Permalink
Merge pull request #85 from theablefew/esmarkowski/add-ingest-pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
esmarkowski authored Mar 17, 2024
2 parents 19fac5e + eeafa50 commit 4881686
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 2 deletions.
5 changes: 5 additions & 0 deletions lib/stretchy/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def default_size(size = nil)
@default_size
end

# Reads or sets the name of the default ingest pipeline.
#
# Called with no argument (or +nil+) it returns the stored name, which is
# +nil+ until one has been set. Called with a value it stores that value's
# string form and returns it.
#
# @param pipeline [#to_s, nil] pipeline name to store, or nil to only read
# @return [String, nil] the stored pipeline name
def default_pipeline(pipeline = nil)
  return @default_pipeline if pipeline.nil?

  @default_pipeline = pipeline.to_s
end

private

# Return a Relation instance to chain queries
Expand Down
10 changes: 8 additions & 2 deletions lib/stretchy/delegation/gateway_delegation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@ def index_name(name=nil, &block)
end
end

# Returns the index settings, including the default ingest pipeline when one
# is configured.
#
# The first call stores a copy of +settings+; later calls reuse that stored
# hash and ignore their argument. A copy is stored (rather than the
# argument itself) so that adding the pipeline key below never mutates a
# hash owned by the caller.
#
# @param settings [Hash] initial settings, only honoured on the first call
# @return [Hash] the result of +with_indifferent_access+ on the stored settings
def index_settings(settings={})
  @index_settings ||= settings.dup
  pipeline = default_pipeline
  @index_settings.merge!(default_pipeline: pipeline.to_s) if pipeline
  @index_settings.with_indifferent_access
end

# Clears the memoized gateway so that the next call to +gateway+ builds a
# fresh repository instead of reusing the cached one.
def reload_gateway_configuration!
@gateway = nil
end

# Returns the memoized repository used to talk to the index.
#
# The cached repository is discarded first when its client is no longer the
# one in +Stretchy.configuration+, so a client change is picked up on the
# next call. The repository is created with the model's index name, its
# attribute mappings (with +dynamic: true+ merged in) and +index_settings+.
#
# There is exactly one creation statement here: an earlier
# +@gateway ||= ...+ without +settings:+ would always win the memoization
# and the index settings would never be passed on.
#
# @param block [Proc] accepted for interface compatibility; currently unused
# @return [Object] whatever +Stretchy::Repository.create+ returns
def gateway(&block)
  reload_gateway_configuration! if @gateway && @gateway.client != Stretchy.configuration.client

  @gateway ||= Stretchy::Repository.create(
    client: Stretchy.configuration.client,
    index_name: index_name,
    klass: base_class,
    mapping: base_class.attribute_mappings.merge(dynamic: true),
    settings: index_settings
  )
  # block.arity < 1 ? @gateway.instance_eval(&block) : block.call(@gateway) if block_given?
  @gateway
end
Expand Down
123 changes: 123 additions & 0 deletions lib/stretchy/pipeline.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
module Stretchy
  # Declarative description of an ingest pipeline.
  #
  # Subclasses configure themselves with the class-level +description+,
  # +pipeline_name+ and +processor+ macros. The class methods then create,
  # look up, simulate and delete the pipeline through the ingest namespace of
  # the client found in +Stretchy.configuration+.
  class Pipeline
    class << self
      # The readers for these three are defined explicitly below because
      # they double as DSL setters or need a default value.
      attr_writer :description, :pipeline_name, :processors

      # Ingest namespace of the currently configured client.
      #
      # Resolved on every call rather than captured once at class-definition
      # time, so a client configured (or replaced) after this class has been
      # loaded is still picked up.
      def client
        Stretchy.configuration.client.ingest
      end

      # Reads or sets the pipeline name.
      #
      # Without an argument the stored name is returned; when nothing has
      # been stored yet the name is derived from the class name (last
      # namespace segment, underscored) and stored.
      #
      # @param name [String, Symbol, nil] name to store, or nil to read
      # @return [String, Symbol] the pipeline name
      def pipeline_name(name = nil)
        return @pipeline_name if name.nil? && @pipeline_name

        @pipeline_name = name || self.name.split('::').last.underscore
      end

      # Reads or sets the pipeline description.
      #
      # @param desc [String, nil] description to store, or nil to read
      # @return [String, nil] the stored description
      def description(desc = nil)
        @description = desc if desc
        @description
      end

      # Processors registered so far. Defaults to an empty list so that a
      # pipeline declared without processors can still be serialized.
      #
      # @return [Array<Stretchy::Pipelines::Processor>]
      def processors
        @processors ||= []
      end

      # Appends a processor to the pipeline.
      #
      # @param type [Symbol, String] processor type, e.g. +:csv+
      # @param opts [Hash] options for that processor
      # @return [Array<Stretchy::Pipelines::Processor>] the updated list
      def processor(type, opts = {})
        processors << Stretchy::Pipelines::Processor.new(type, opts)
      end

      # Fetches every pipeline known to the cluster.
      #
      # @return [Hash] the client response, or an empty hash when the client
      #   raises its NotFound error
      def all
        client.get_pipeline
      rescue not_found
        {}
      end

      # GET _ingest/pipeline/<pipeline-name>
      #
      # @param id [String, Symbol] pipeline identifier
      def find(id)
        client.get_pipeline(id: id)
      end

      # Runs +docs+ through this pipeline without indexing them.
      #
      # @param docs [Array<Hash>] documents to simulate
      # @param verbose [Boolean] forwarded to the client
      def simulate(docs, verbose: true)
        client.simulate(id: pipeline_name, body: { docs: docs }, verbose: verbose)
      end

      # PUT _ingest/pipeline/<pipeline-name>
      def create!
        client.put_pipeline(id: pipeline_name, body: to_hash)
      end

      # DELETE _ingest/pipeline/<pipeline-name>
      def delete!
        client.delete_pipeline(id: pipeline_name)
      end

      # Whether a pipeline with this name can be fetched.
      #
      # @return [Boolean] false when the client raises its NotFound error
      def exists?
        find(pipeline_name).present?
      rescue not_found
        false
      end

      # Request body used to create the pipeline.
      #
      # @return [Hash] description and serialized processors, via +as_json+
      def to_hash
        {
          description: description,
          processors: processors.map(&:to_hash)
        }.as_json
      end

      protected

      # NotFound error class of the client library in use. The top-level
      # namespace is taken from the ingest client's class name, so the
      # lookup follows whichever client library is configured.
      def not_found
        @not_found ||= Object.const_get("#{client.class.name.split('::').first}::Transport::Transport::Errors::NotFound")
      end
    end

    attr_accessor :description, :pipeline_name, :processors

    # Copies the class-level configuration onto the instance.
    def initialize
      @description = self.class.description
      @pipeline_name = self.class.pipeline_name
      @processors = self.class.processors
    end

    # GET _ingest/pipeline/<pipeline-name>
    def find
      self.class.find(pipeline_name)
    end

    # Simulates the pipeline.
    #
    # Request body fields
    #
    # The following table lists the request body fields used to run a pipeline.
    #
    # Field     Required  Type    Description
    # docs      Required  Array   The documents to be used to test the pipeline.
    # pipeline  Optional  Object  The pipeline to be simulated. If the pipeline identifier is not included, then the response simulates the latest pipeline created.
    #
    # The docs field can include subfields listed in the following table.
    #
    # Field   Required  Type    Description
    # source  Required  Object  The document's JSON body.
    # id      Optional  String  A unique document identifier. The identifier cannot be used elsewhere in the index.
    # index   Optional  String  The index where the document's transformed data appears.
    def simulate(docs, verbose: true)
      self.class.simulate(docs, verbose: verbose)
    end

    # Delegates to the class-level check.
    def exists?
      self.class.exists?
    end

    # Request body built from this instance's description and processors.
    # A nil processor list is treated as empty.
    def to_hash
      {
        description: description,
        processors: Array(processors).map(&:to_hash)
      }.as_json
    end

    # Ingest namespace of the configured client, memoized per instance.
    def client
      @client ||= Stretchy.configuration.client.ingest
    end
  end
end
55 changes: 55 additions & 0 deletions lib/stretchy/pipelines/processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Namespaces are opened one level at a time so this file can be loaded on its
# own; the compact `module Stretchy::Pipelines` form raises NameError when
# `Stretchy` has not been defined yet.
module Stretchy
  module Pipelines
    # Creates a new processor for a pipeline
    #
    # Processor type        Description
    # append                Adds one or more values to a field in a document.
    # bytes                 Converts a human-readable byte value to its value in bytes.
    # convert               Changes the data type of a field in a document.
    # copy                  Copies an entire object in an existing field to another field.
    # csv                   Extracts CSVs and stores them as individual fields in a document.
    # date                  Parses dates from fields and then uses the date or timestamp as the timestamp for a document.
    # date_index_name       Indexes documents into time-based indexes based on a date or timestamp field in a document.
    # dissect               Extracts structured fields from a text field using a defined pattern.
    # dot_expander          Expands a field with dots into an object field.
    # drop                  Drops a document without indexing it or raising any errors.
    # fail                  Raises an exception and stops the execution of a pipeline.
    # foreach               Allows for another processor to be applied to each element of an array or an object field in a document.
    # geoip                 Adds information about the geographical location of an IP address.
    # geojson-feature       Indexes GeoJSON data into a geospatial field.
    # grok                  Parses and structures unstructured data using pattern matching.
    # gsub                  Replaces or deletes substrings within a string field of a document.
    # html_strip            Removes HTML tags from a text field and returns the plain text content.
    # ip2geo                Adds information about the geographical location of an IPv4 or IPv6 address.
    # join                  Concatenates each element of an array into a single string using a separator character between each element.
    # json                  Converts a JSON string into a structured JSON object.
    # kv                    Automatically parses key-value pairs in a field.
    # lowercase             Converts text in a specific field to lowercase letters.
    # pipeline              Runs an inner pipeline.
    # remove                Removes fields from a document.
    # script                Runs an inline or stored script on incoming documents.
    # set                   Sets the value of a field to a specified value.
    # sort                  Sorts the elements of an array in ascending or descending order.
    # sparse_encoding       Generates a sparse vector/token and weights from text fields for neural sparse search using sparse retrieval.
    # split                 Splits a field into an array using a separator character.
    # text_embedding        Generates vector embeddings from text fields for semantic search.
    # text_image_embedding  Generates combined vector embeddings from text and image fields for multimodal neural search.
    # trim                  Removes leading and trailing white space from a string field.
    # uppercase             Converts text in a specific field to uppercase letters.
    # urldecode             Decodes a string from URL-encoded format.
    # user_agent            Extracts details from the user agent sent by a browser to its web requests.
    #
    class Processor
      attr_reader :type, :opts, :description

      # @param type [Symbol, String] processor type, used as the key of the
      #   serialized hash
      # @param opts [Hash] processor options; the value under +:description+
      #   is also exposed through #description
      def initialize(type, opts = {})
        @type = type
        @description = opts[:description]
        @opts = opts
      end

      # Serializes the processor as a single-entry hash keyed by its type.
      # The options are emitted unchanged, so a +:description+ entry stays
      # part of the processor body.
      #
      # @return [Hash] +{ type => opts }+
      def to_hash
        { type => @opts }
      end
    end
  end
end
124 changes: 124 additions & 0 deletions spec/features/pipeline_integration_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
require 'spec_helper'
require 'faker'

# End-to-end spec for ingest pipelines: creates a pipeline and an index whose
# model names that pipeline as its default, bulk-indexes generated records,
# and then checks the stored documents against the input.
describe 'Ingest Pipeline' do
# Pipeline class under test. The constant is assigned with ||= so the class
# is only built the first time this helper runs.
let(:intake_pipeline) do
IntakeFormPipeline ||= Class.new(Stretchy::Pipeline) do

description "Ingests intake forms and scrubs ssn of html"

processor :csv,
field: :vitals,
target_fields: [:heart_rate, :systolic, :diastolic],
trim: true

processor :script,
description: "Extracts first and last name from name field",
lang: "painless",
source: <<~PAINLESS
String name = ctx['name'];
int index = name.indexOf('. ');
if (index >= 0) {
name = name.substring(index + 2);
}
String[] parts = /\\s+/.split(name);
ctx['first_name'] = parts[0];
if (parts.length > 1) {
ctx['last_name'] = parts[1];
}
PAINLESS

processor :html_strip, field: :ssn
processor :convert, field: :systolic, type: :integer
processor :convert, field: :diastolic, type: :integer
processor :convert, field: :heart_rate, type: :integer

processor :remove, field: :name
processor :remove, field: :vitals

end
end

# Model whose default pipeline is the one defined above. Also assigned with
# ||= so the constant is only defined once.
let(:intake_form) do
IntakeForm ||= Class.new(StretchyModel) do
attribute :first_name, :keyword
attribute :last_name, :keyword
attribute :ssn, :keyword
attribute :heart_rate, :integer
attribute :systolic, :integer
attribute :diastolic, :integer
attribute :age, :integer

default_pipeline :intake_form_pipeline
end
end

# Ten generated raw records: "vitals" is three comma-joined numbers and
# "ssn" is wrapped in <b> tags, matching what the csv and html_strip
# processors above operate on.
let(:initial_data) do
10.times.map do
{
"id" => Faker::Alphanumeric.alphanumeric(number: 7),
"vitals" => [Faker::Number.between(from: 54, to: 120), Faker::Number.between(from: 60, to: 140), Faker::Number.between(from: 40, to: 100)].join(","),
"name" => Faker::Name.name,
"age" => Faker::Number.between(from: 19, to: 84),
"ssn" => "<b>#{Faker::IDNumber.valid}</b>"
}
end
end

# The raw records wrapped as bulk index actions against 'intake_forms'.
let(:bulk_records) do
initial_data.map do |data|
{ index: { _index: 'intake_forms', _id: data["id"], data: data } }
end
end

# The raw records wrapped under _source, the shape passed to simulate below.
let(:source_records) do
initial_data.map do |data|
{ _source: data }
end
end

# Creates the pipeline, then the index, then bulk-indexes the records and
# refreshes. The IntakeForm constant used here only exists because
# intake_form was evaluated on the line before.
before do
intake_pipeline.create!
intake_form.create_index!

IntakeForm.bulk(bulk_records)
IntakeForm.refresh_index!
end

# Removes the index and the pipeline again after each example.
after do
intake_form.delete_index!
intake_pipeline.delete!
end

# Every processor result of every simulated document must report "success".
it 'simulates the pipeline' do
response = intake_pipeline.simulate(source_records)
statuses = response["docs"].map {|d| d["processor_results"].map {|pr| pr["status"]}}.flatten
expect(statuses).to all(eq("success"))
end

# NOTE(review): stored documents are matched to initial_data by position,
# which assumes IntakeForm.all returns them in the order they were indexed -
# confirm, or look each record up by id instead.
it 'processes data correctly' do
expect(intake_pipeline.exists?).to be_truthy
expect(intake_form.default_pipeline).to eq('intake_form_pipeline')
expect(IntakeForm.count).to eq(initial_data.size)
IntakeForm.all.each_with_index do |form, index|
name = initial_data[index]["name"].gsub(/^\w+\.\s/, '')
expect(form.first_name).to eq(name.split(' ')[0])
expect(form.last_name).to eq(name.split(' ')[1])
expect(form.ssn).not_to include('<b>', '</b>')
expect(form.ssn).to eq(initial_data[index]["ssn"].gsub(/<\/?[^>]*>/, ""))
expect(form.heart_rate).to eq(initial_data[index]["vitals"].split(',')[0].to_i)
expect(form.systolic).to eq(initial_data[index]["vitals"].split(',')[1].to_i)
expect(form.diastolic).to eq(initial_data[index]["vitals"].split(',')[2].to_i)
expect(form.age).to eq(initial_data[index]["age"])
end
end

# The listing of all pipelines must contain the one created in `before`.
it 'appears in the pipeline list' do
expect(intake_pipeline.all).to be_a(Hash)
expect(intake_pipeline.all).to have_key('intake_form_pipeline')
end

it 'exists' do
expect(intake_pipeline.exists?).to be_truthy
end
end
13 changes: 13 additions & 0 deletions spec/stretchy/attributes_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,20 @@


context 'mappings' do
# Checks that a model's default pipeline shows up in its settings.
# NOTE(review): `model` and `model.settings` are defined outside this view;
# presumably `settings` exposes the index settings - confirm.
context 'pipeline' do
context 'with pipeline' do
it 'includes pipeline' do
model.default_pipeline :test_pipeline
expect(model.settings[:default_pipeline]).to eq('test_pipeline')
end
end

# NOTE(review): relies on `model` not carrying over the pipeline set in the
# example above - confirm that `model` is rebuilt for each example.
context 'without pipeline' do
it 'does not include pipeline' do
expect(model.settings).not_to have_key(:default_pipeline)
end
end
end

context 'auto keyword fields for text fields' do
it 'adds keyword to text fields' do
Expand Down
Loading

0 comments on commit 4881686

Please sign in to comment.