Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ingest Pipelines #85

Merged
merged 2 commits into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/stretchy/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def default_size(size = nil)
@default_size
end

# Gets or sets the default ingest pipeline for this model.
#
# Called with an argument, stores its string form and returns it;
# called with no argument, returns whatever is currently stored.
#
# @param pipeline [String, Symbol, nil] pipeline name to assign (optional)
# @return [String, nil] the configured default pipeline name
def default_pipeline(pipeline = nil)
  return @default_pipeline if pipeline.nil?

  @default_pipeline = pipeline.to_s
end

private

# Return a Relation instance to chain queries
Expand Down
10 changes: 8 additions & 2 deletions lib/stretchy/delegation/gateway_delegation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@ def index_name(name=nil, &block)
end
end

# Returns the index settings hash, memoizing the value passed on the
# first call. When a default_pipeline is configured it is injected into
# the settings before they are returned.
#
# NOTE(review): `settings` supplied on later calls is silently ignored
# once memoized — confirm this is the intended contract.
#
# @param settings [Hash] initial settings (only honored on first call)
# @return [ActiveSupport::HashWithIndifferentAccess] the effective settings
def index_settings(settings = {})
  @index_settings ||= settings
  @index_settings[:default_pipeline] = default_pipeline.to_s if default_pipeline
  @index_settings.with_indifferent_access
end

# Clears the memoized gateway so the next #gateway call rebuilds it
# against the currently configured client.
def reload_gateway_configuration!
  @gateway = nil
end

# Returns the memoized Stretchy::Repository gateway, rebuilding it when
# the globally configured client has changed since it was created.
#
# FIX(review): the text contained two `@gateway ||=` assignments (the
# pre-change line and the settings-aware replacement). With `||=` the
# first assignment wins, so `settings: index_settings` was never passed
# to the repository. Only the settings-aware build is kept.
#
# @return [Stretchy::Repository] the repository gateway for this model
def gateway(&block)
  reload_gateway_configuration! if @gateway && @gateway.client != Stretchy.configuration.client
  @gateway ||= Stretchy::Repository.create(
    client: Stretchy.configuration.client,
    index_name: index_name,
    klass: base_class,
    mapping: base_class.attribute_mappings.merge(dynamic: true),
    settings: index_settings
  )
  # block.arity < 1 ? @gateway.instance_eval(&block) : block.call(@gateway) if block_given?
  @gateway
end
Expand Down
123 changes: 123 additions & 0 deletions lib/stretchy/pipeline.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
module Stretchy
  # Class-level DSL for declaring OpenSearch/Elasticsearch ingest pipelines.
  #
  # Subclasses declare a +description+ and an ordered list of +processor+
  # entries, then manage the server-side pipeline through the configured
  # client's ingest namespace:
  #
  #   class MyPipeline < Stretchy::Pipeline
  #     description "Lowercases titles"
  #     processor :lowercase, field: :title
  #   end
  #   MyPipeline.create!
  class Pipeline
    # Ingest-namespace client shared by all pipeline classes.
    # NOTE(review): resolved once when this class is loaded — a later
    # Stretchy reconfiguration is not picked up here (instances use the
    # live configuration via #client below). Confirm this is intended.
    cattr_reader :client do
      Stretchy.configuration.client.ingest
    end

    class << self
      # Plain writers kept for compatibility; the readers below are
      # redefined with DSL-style (argument-optional) behavior.
      attr_accessor :description, :pipeline_name, :processors

      # Gets/sets the pipeline name. Defaults to the demodulized,
      # underscored class name (MyPipeline => "my_pipeline").
      def pipeline_name(name = nil)
        return @pipeline_name if name.nil? && @pipeline_name
        # `self.name` (the class name) — bare `name` is the nil local here.
        @pipeline_name = name || self.name.split('::').last.underscore
      end

      # Gets/sets the human-readable pipeline description.
      def description(desc = nil)
        @description = desc if desc
        @description
      end

      # Declares a processor of the given type with its options.
      def processor(type, opts = {})
        @processors ||= []
        @processors << Stretchy::Pipelines::Processor.new(type, opts)
      end

      # GET _ingest/pipeline — every pipeline on the cluster, or {} when
      # none exist (the API 404s rather than returning an empty body).
      def all
        client.get_pipeline
      rescue not_found
        {}
      end

      # GET _ingest/pipeline/<id>
      def find(id)
        client.get_pipeline(id: id)
      end

      # POST _ingest/pipeline/<pipeline-name>/_simulate — runs the
      # pipeline against the supplied docs without indexing them.
      def simulate(docs, verbose: true)
        client.simulate(id: pipeline_name, body: { docs: docs }, verbose: verbose)
      end

      # PUT _ingest/pipeline/<pipeline-name>
      def create!
        client.put_pipeline(id: pipeline_name, body: to_hash)
      end

      # DELETE _ingest/pipeline/<pipeline-name>
      def delete!
        client.delete_pipeline(id: pipeline_name)
      end

      # True when the pipeline is present on the cluster.
      def exists?
        find(pipeline_name).present?
      rescue not_found
        false
      end

      # Serializable pipeline definition.
      # FIX(review): Array(processors) guards a subclass that declared no
      # processors — previously this raised NoMethodError on nil.
      def to_hash
        {
          description: description,
          processors: Array(processors).map(&:to_hash)
        }.as_json
      end

      protected

      # Resolves the transport-specific NotFound error class for whichever
      # client gem (elasticsearch / opensearch-ruby) is actually in use.
      def not_found
        @not_found ||= Object.const_get("#{client.class.name.split('::').first}::Transport::Transport::Errors::NotFound")
      end
    end

    attr_accessor :description, :pipeline_name, :processors

    # Instances mirror the class-level definition for convenience.
    def initialize
      @description = self.class.description
      @pipeline_name = self.class.pipeline_name
      @processors = self.class.processors
    end

    # GET _ingest/pipeline/<pipeline-name>
    def find
      self.class.find(pipeline_name)
    end

    # Simulates the pipeline against the given documents.
    #
    # Request body fields:
    #   docs     (required, Array)  documents used to test the pipeline
    #   pipeline (optional, Object) pipeline to simulate; when omitted the
    #            latest created pipeline is used
    # Each doc may include:
    #   _source (required, Object) the document's JSON body
    #   _id     (optional, String) unique id, unused elsewhere in the index
    #   _index  (optional, String) index where the transformed data appears
    def simulate(docs, verbose: true)
      self.class.simulate(docs, verbose: verbose)
    end

    # True when the pipeline is present on the cluster.
    def exists?
      self.class.exists?
    end

    # Serializable pipeline definition (same shape and nil-processor
    # guard as the class-level hash).
    def to_hash
      {
        description: description,
        processors: Array(processors).map(&:to_hash)
      }.as_json
    end

    # Instance-level ingest client, resolved lazily from the current
    # configuration (unlike the class-level cattr_reader).
    def client
      @client ||= Stretchy.configuration.client.ingest
    end
  end
end
55 changes: 55 additions & 0 deletions lib/stretchy/pipelines/processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module Stretchy::Pipelines
  # A single processor entry within an ingest pipeline definition.
  #
  # The +type+ names the ingest processor and +opts+ carries its
  # configuration verbatim. Common processor types (see the OpenSearch
  # ingest documentation for details):
  #
  #   append, bytes, convert, copy, csv, date, date_index_name, dissect,
  #   dot_expander, drop, fail, foreach, geoip, geojson-feature, grok,
  #   gsub, html_strip, ip2geo, join, json, kv, lowercase, pipeline,
  #   remove, script, set, sort, sparse_encoding, split, text_embedding,
  #   text_image_embedding, trim, uppercase, urldecode, user_agent
  #
  class Processor
    attr_reader :type, :opts, :description

    # @param type [Symbol, String] the ingest processor type (e.g. :csv)
    # @param opts [Hash] processor configuration, passed through untouched;
    #   an optional :description entry is also surfaced via #description
    def initialize(type, opts = {})
      @type = type
      @opts = opts
      @description = opts[:description]
    end

    # Serialized form used inside the pipeline definition body:
    # the processor type mapped to its options hash.
    def to_hash
      { @type => @opts }
    end
  end
end
124 changes: 124 additions & 0 deletions spec/features/pipeline_integration_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
require 'spec_helper'
require 'faker'

# Integration spec: full ingest-pipeline round trip against a live cluster —
# create the pipeline, create an index that uses it as default_pipeline,
# bulk-index raw docs, then verify the transformed documents.
describe 'Ingest Pipeline' do
  # Pipeline under test. `||=` keeps the constant from being reassigned
  # when this let is evaluated across multiple examples.
  let(:intake_pipeline) do
    IntakeFormPipeline ||= Class.new(Stretchy::Pipeline) do

      description "Ingests intake forms and scrubs ssn of html"

      # Split the comma-separated "vitals" string into three fields.
      processor :csv,
        field: :vitals,
        target_fields: [:heart_rate, :systolic, :diastolic],
        trim: true

      # Drop any leading title ("Mr. ", "Dr. ", ...) then split the
      # remainder into first_name / last_name.
      processor :script,
        description: "Extracts first and last name from name field",
        lang: "painless",
        source: <<~PAINLESS
          String name = ctx['name'];
          int index = name.indexOf('. ');
          if (index >= 0) {
            name = name.substring(index + 2);
          }
          String[] parts = /\\s+/.split(name);
          ctx['first_name'] = parts[0];
          if (parts.length > 1) {
            ctx['last_name'] = parts[1];
          }
        PAINLESS

      # Scrub markup from the SSN, then coerce the CSV-derived strings.
      processor :html_strip, field: :ssn
      processor :convert, field: :systolic, type: :integer
      processor :convert, field: :diastolic, type: :integer
      processor :convert, field: :heart_rate, type: :integer

      # Raw source fields are no longer needed after extraction.
      processor :remove, field: :name
      processor :remove, field: :vitals

    end
  end

  # Model whose index routes writes through the pipeline above.
  let(:intake_form) do
    IntakeForm ||= Class.new(StretchyModel) do
      attribute :first_name, :keyword
      attribute :last_name, :keyword
      attribute :ssn, :keyword
      attribute :heart_rate, :integer
      attribute :systolic, :integer
      attribute :diastolic, :integer
      attribute :age, :integer

      # Must match IntakeFormPipeline's derived pipeline_name.
      default_pipeline :intake_form_pipeline
    end
  end

  # Raw (pre-pipeline) documents: vitals packed as CSV, name with a
  # title prefix, ssn wrapped in HTML that the pipeline must strip.
  let(:initial_data) do
    10.times.map do
      {
        "id" => Faker::Alphanumeric.alphanumeric(number: 7),
        "vitals" => [Faker::Number.between(from: 54, to: 120), Faker::Number.between(from: 60, to: 140), Faker::Number.between(from: 40, to: 100)].join(","),
        "name" => Faker::Name.name,
        "age" => Faker::Number.between(from: 19, to: 84),
        "ssn" => "<b>#{Faker::IDNumber.valid}</b>"
      }
    end
  end

  let(:bulk_records) do
    initial_data.map do |data|
      { index: { _index: 'intake_forms', _id: data["id"], data: data } }
    end
  end

  # Shape expected by the _simulate API: one { _source: ... } per doc.
  let(:source_records) do
    initial_data.map do |data|
      { _source: data }
    end
  end

  before do
    # Pipeline must exist before the index that names it as default.
    intake_pipeline.create!
    intake_form.create_index!

    IntakeForm.bulk(bulk_records)
    IntakeForm.refresh_index!
  end

  after do
    intake_form.delete_index!
    intake_pipeline.delete!
  end

  it 'simulates the pipeline' do
    response = intake_pipeline.simulate(source_records)
    # Every processor run on every doc should report success.
    statuses = response["docs"].map {|d| d["processor_results"].map {|pr| pr["status"]}}.flatten
    expect(statuses).to all(eq("success"))
  end

  it 'processes data correctly' do
    expect(intake_pipeline.exists?).to be_truthy
    expect(intake_form.default_pipeline).to eq('intake_form_pipeline')
    expect(IntakeForm.count).to eq(initial_data.size)
    # NOTE(review): assumes IntakeForm.all returns docs in the same order
    # as initial_data was indexed — verify search ordering guarantees.
    IntakeForm.all.each_with_index do |form, index|
      # Mirror the painless script: strip a "Xx. " title prefix.
      name = initial_data[index]["name"].gsub(/^\w+\.\s/, '')
      expect(form.first_name).to eq(name.split(' ')[0])
      expect(form.last_name).to eq(name.split(' ')[1])
      expect(form.ssn).not_to include('<b>', '</b>')
      expect(form.ssn).to eq(initial_data[index]["ssn"].gsub(/<\/?[^>]*>/, ""))
      expect(form.heart_rate).to eq(initial_data[index]["vitals"].split(',')[0].to_i)
      expect(form.systolic).to eq(initial_data[index]["vitals"].split(',')[1].to_i)
      expect(form.diastolic).to eq(initial_data[index]["vitals"].split(',')[2].to_i)
      expect(form.age).to eq(initial_data[index]["age"])
    end
  end

  it 'appears in the pipeline list' do
    expect(intake_pipeline.all).to be_a(Hash)
    expect(intake_pipeline.all).to have_key('intake_form_pipeline')
  end

  it 'exists' do
    expect(intake_pipeline.exists?).to be_truthy
  end
end
13 changes: 13 additions & 0 deletions spec/stretchy/attributes_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,20 @@


context 'mappings' do
# default_pipeline should round-trip into the model's index settings
# only when one has been configured.
context 'pipeline' do
  context 'with pipeline' do
    it 'includes pipeline' do
      # NOTE(review): mutates the shared model's default_pipeline —
      # confirm this is reset between examples to avoid order coupling.
      model.default_pipeline :test_pipeline
      expect(model.settings[:default_pipeline]).to eq('test_pipeline')
    end
  end

  context 'without pipeline' do
    it 'does not include pipeline' do
      expect(model.settings).not_to have_key(:default_pipeline)
    end
  end
end

context 'auto keyword fields for text fields' do
it 'adds keyword to text fields' do
Expand Down
Loading
Loading