diff --git a/lib/stretchy/common.rb b/lib/stretchy/common.rb index ce462d4..0ccd57d 100644 --- a/lib/stretchy/common.rb +++ b/lib/stretchy/common.rb @@ -20,6 +20,11 @@ def default_size(size = nil) @default_size end + def default_pipeline(pipeline = nil) + @default_pipeline = pipeline.to_s unless pipeline.nil? + @default_pipeline + end + private # Return a Relation instance to chain queries diff --git a/lib/stretchy/delegation/gateway_delegation.rb b/lib/stretchy/delegation/gateway_delegation.rb index 658f521..a206416 100644 --- a/lib/stretchy/delegation/gateway_delegation.rb +++ b/lib/stretchy/delegation/gateway_delegation.rb @@ -30,14 +30,20 @@ def index_name(name=nil, &block) end end + def index_settings(settings={}) + @index_settings ||= settings + @index_settings.merge!(default_pipeline: default_pipeline.to_s) if default_pipeline + @index_settings.with_indifferent_access + end + def reload_gateway_configuration! @gateway = nil end def gateway(&block) reload_gateway_configuration! if @gateway && @gateway.client != Stretchy.configuration.client - - @gateway ||= Stretchy::Repository.create(client: Stretchy.configuration.client, index_name: index_name, klass: base_class, mapping: base_class.attribute_mappings.merge(dynamic: true)) + + @gateway ||= Stretchy::Repository.create(client: Stretchy.configuration.client, index_name: index_name, klass: base_class, mapping: base_class.attribute_mappings.merge(dynamic: true), settings: index_settings) # block.arity < 1 ? @gateway.instance_eval(&block) : block.call(@gateway) if block_given? @gateway end diff --git a/lib/stretchy/pipeline.rb b/lib/stretchy/pipeline.rb new file mode 100644 index 0000000..35e8fee --- /dev/null +++ b/lib/stretchy/pipeline.rb @@ -0,0 +1,123 @@ +module Stretchy + class Pipeline + cattr_reader :client do + Stretchy.configuration.client.ingest + end + + class << self + attr_accessor :description, :pipeline_name, :processors + + def pipeline_name(name = nil) + return @pipeline_name if name.nil? && @pipeline_name + @pipeline_name = name || self.name.split('::').last.underscore + end + + def description(desc = nil) + @description = desc if desc + @description + end + + def processor(type, opts = {}) + @processors ||= [] + @processors << Stretchy::Pipelines::Processor.new(type, opts) + end + + def all + begin + client.get_pipeline + rescue not_found => e + return {} + end + end + + def find(id) + client.get_pipeline(id: id) + end + + def simulate(docs, verbose: true) + client.simulate(id: self.pipeline_name, body: {docs: docs}, verbose: verbose) + end + + # PUT _ingest/pipeline/ + def create! + client.put_pipeline(id: self.pipeline_name, body: self.to_hash) + end + + # DELETE _ingest/pipeline/ + def delete! + client.delete_pipeline(id: self.pipeline_name) + end + + def exists? + begin + self.find(self.pipeline_name).present? + rescue not_found => e + return false + end + end + + + def to_hash + { + description: self.description, + processors: self.processors.map(&:to_hash) + }.as_json + end + + protected + def not_found + @not_found ||= Object.const_get("#{client.class.name.split('::').first}::Transport::Transport::Errors::NotFound") + end + + end + + attr_accessor :description, :pipeline_name, :processors + + def initialize + @description = self.class.description + @pipeline_name = self.class.pipeline_name + @processors = self.class.processors + end + + # GET _ingest/pipeline/ + def find + self.class.find(self.pipeline_name) + end + + # Simulates the pipeline. + # + # Request body fields + # + # The following table lists the request body fields used to run a pipeline. + # + # Field Required Type Description + # docs Required Array The documents to be used to test the pipeline. + # pipeline Optional Object The pipeline to be simulated. If the pipeline identifier is not included, then the response simulates the latest pipeline created. + # The docs field can include subfields listed in the following table. + # + # Field Required Type Description + # source Required Object The document’s JSON body. + # id Optional String A unique document identifier. The identifier cannot be used elsewhere in the index. + # index Optional String The index where the document’s transformed data appears. + def simulate(docs, verbose: true) + self.class.simulate(docs, verbose: verbose) + end + + def exists? + self.class.exists? + end + + def to_hash + { + description: self.description, + processors: self.processors.map(&:to_hash) + }.as_json + end + + def client + @client ||= Stretchy.configuration.client.ingest + end + + + end +end diff --git a/lib/stretchy/pipelines/processor.rb b/lib/stretchy/pipelines/processor.rb new file mode 100644 index 0000000..c1f4989 --- /dev/null +++ b/lib/stretchy/pipelines/processor.rb @@ -0,0 +1,55 @@ +module Stretchy::Pipelines + # Creates a new processor for a pipeline + # + # Processor type Description + # append Adds one or more values to a field in a document. + # bytes Converts a human-readable byte value to its value in bytes. + # convert Changes the data type of a field in a document. + # copy Copies an entire object in an existing field to another field. + # csv Extracts CSVs and stores them as individual fields in a document. + # date Parses dates from fields and then uses the date or timestamp as the timestamp for a document. + # date_index_name Indexes documents into time-based indexes based on a date or timestamp field in a document. + # dissect Extracts structured fields from a text field using a defined pattern. + # dot_expander Expands a field with dots into an object field. + # drop Drops a document without indexing it or raising any errors. + # fail Raises an exception and stops the execution of a pipeline. + # foreach Allows for another processor to be applied to each element of an array or an object field in a document. + # geoip Adds information about the geographical location of an IP address. + # geojson-feature Indexes GeoJSON data into a geospatial field. + # grok Parses and structures unstructured data using pattern matching. + # gsub Replaces or deletes substrings within a string field of a document. + # html_strip Removes HTML tags from a text field and returns the plain text content. + # ip2geo Adds information about the geographical location of an IPv4 or IPv6 address. + # join Concatenates each element of an array into a single string using a separator character between each element. + # json Converts a JSON string into a structured JSON object. + # kv Automatically parses key-value pairs in a field. + # lowercase Converts text in a specific field to lowercase letters. + # pipeline Runs an inner pipeline. + # remove Removes fields from a document. + # script Runs an inline or stored script on incoming documents. + # set Sets the value of a field to a specified value. + # sort Sorts the elements of an array in ascending or descending order. + # sparse_encoding Generates a sparse vector/token and weights from text fields for neural sparse search using sparse retrieval. + # split Splits a field into an array using a separator character. + # text_embedding Generates vector embeddings from text fields for semantic search. + # text_image_embedding Generates combined vector embeddings from text and image fields for multimodal neural search. + # trim Removes leading and trailing white space from a string field. + # uppercase Converts text in a specific field to uppercase letters. + # urldecode Decodes a string from URL-encoded format. + # user_agent Extracts details from the user agent sent by a browser to its web requests. + # + class Processor + + attr_reader :type, :opts, :description + + def initialize(type, opts = {}) + @type = type + @description = opts[:description] + @opts = opts + end + + def to_hash + { type => @opts } + end + end +end \ No newline at end of file diff --git a/spec/features/pipeline_integration_spec.rb b/spec/features/pipeline_integration_spec.rb new file mode 100644 index 0000000..3b711e3 --- /dev/null +++ b/spec/features/pipeline_integration_spec.rb @@ -0,0 +1,124 @@ +require 'spec_helper' +require 'faker' + +describe 'Ingest Pipeline' do + let(:intake_pipeline) do + IntakeFormPipeline ||= Class.new(Stretchy::Pipeline) do + + description "Ingests intake forms and scrubs ssn of html" + + processor :csv, + field: :vitals, + target_fields: [:heart_rate, :systolic, :diastolic], + trim: true + + processor :script, + description: "Extracts first and last name from name field", + lang: "painless", + source: <<~PAINLESS + String name = ctx['name']; + int index = name.indexOf('. '); + if (index >= 0) { + name = name.substring(index + 2); + } + String[] parts = /\\s+/.split(name); + ctx['first_name'] = parts[0]; + if (parts.length > 1) { + ctx['last_name'] = parts[1]; + } + PAINLESS + + processor :html_strip, field: :ssn + processor :convert, field: :systolic, type: :integer + processor :convert, field: :diastolic, type: :integer + processor :convert, field: :heart_rate, type: :integer + + processor :remove, field: :name + processor :remove, field: :vitals + + end + end + + let(:intake_form) do + IntakeForm ||= Class.new(StretchyModel) do + attribute :first_name, :keyword + attribute :last_name, :keyword + attribute :ssn, :keyword + attribute :heart_rate, :integer + attribute :systolic, :integer + attribute :diastolic, :integer + attribute :age, :integer + + default_pipeline :intake_form_pipeline + end + end + + let(:initial_data) do + 10.times.map do + { + "id" => Faker::Alphanumeric.alphanumeric(number: 7), + "vitals" => [Faker::Number.between(from: 54, to: 120), Faker::Number.between(from: 60, to: 140), Faker::Number.between(from: 40, to: 100)].join(","), + "name" => Faker::Name.name, + "age" => Faker::Number.between(from: 19, to: 84), + "ssn" => "#{Faker::IDNumber.valid}" + } + end + end + + let(:bulk_records) do + initial_data.map do |data| + { index: { _index: 'intake_forms', _id: data["id"], data: data } } + end + end + + let(:source_records) do + initial_data.map do |data| + { _source: data } + end + end + + before do + intake_pipeline.create! + intake_form.create_index! + + IntakeForm.bulk(bulk_records) + IntakeForm.refresh_index! + end + + after do + intake_form.delete_index! + intake_pipeline.delete! + end + + it 'simulates the pipeline' do + response = intake_pipeline.simulate(source_records) + statuses = response["docs"].map {|d| d["processor_results"].map {|pr| pr["status"]}}.flatten + expect(statuses).to all(eq("success")) + end + + it 'processes data correctly' do + expect(intake_pipeline.exists?).to be_truthy + expect(intake_form.default_pipeline).to eq('intake_form_pipeline') + expect(IntakeForm.count).to eq(initial_data.size) + IntakeForm.all.each_with_index do |form, index| + name = initial_data[index]["name"].gsub(/^\w+\.\s/, '') + expect(form.first_name).to eq(name.split(' ')[0]) + expect(form.last_name).to eq(name.split(' ')[1]) + expect(form.ssn).not_to include('', '') + expect(form.ssn).to eq(initial_data[index]["ssn"].gsub(/<\/?[^>]*>/, "")) + expect(form.heart_rate).to eq(initial_data[index]["vitals"].split(',')[0].to_i) + expect(form.systolic).to eq(initial_data[index]["vitals"].split(',')[1].to_i) + expect(form.diastolic).to eq(initial_data[index]["vitals"].split(',')[2].to_i) + expect(form.age).to eq(initial_data[index]["age"]) + end + end + + it 'appears in the pipeline list' do + expect(intake_pipeline.all).to be_a(Hash) + expect(intake_pipeline.all).to have_key('intake_form_pipeline') + end + + it 'exists' do + expect(intake_pipeline.exists?).to be_truthy + end +end \ No newline at end of file diff --git a/spec/stretchy/attributes_spec.rb b/spec/stretchy/attributes_spec.rb index 290dca6..90372f7 100644 --- a/spec/stretchy/attributes_spec.rb +++ b/spec/stretchy/attributes_spec.rb @@ -40,7 +40,20 @@ context 'mappings' do + context 'pipeline' do + context 'with pipeline' do + it 'includes pipeline' do + model.default_pipeline :test_pipeline + expect(model.settings[:default_pipeline]).to eq('test_pipeline') + end + end + context 'without pipeline' do + it 'does not include pipeline' do + expect(model.settings).not_to have_key(:default_pipeline) + end + end + end context 'auto keyword fields for text fields' do it 'adds keyword to text fields' do diff --git a/spec/stretchy/pipeline_spec.rb b/spec/stretchy/pipeline_spec.rb new file mode 100644 index 0000000..f1cbe3a --- /dev/null +++ b/spec/stretchy/pipeline_spec.rb @@ -0,0 +1,130 @@ +require 'spec_helper' + +describe Stretchy::Pipeline do + + let!(:pipeline_class) do + TestPipeline ||= Class.new(Stretchy::Pipeline) do + description 'A test pipeline' + processor :sparse_encoding, model_id: 'q32Pw02BJ3squ3VZa', + field_map: { + body: :embedding + } + end + end + + + it 'should have an inferred pipeline name' do + expect(pipeline_class.pipeline_name).to eq('test_pipeline') + end + + it 'should have a description' do + expect(pipeline_class.description).to eq('A test pipeline') + end + + it 'should have a processor' do + expect(pipeline_class.processors.first).to be_a(Stretchy::Pipelines::Processor) + expect(pipeline_class.processors.first.type).to eq(:sparse_encoding) + expect(pipeline_class.processors.first.opts).to eq({model_id: 'q32Pw02BJ3squ3VZa', field_map: {body: :embedding}}) + end + + it 'should list all pipelines' do + expect(pipeline_class.all).to be_a(Hash) + end + + + let(:pipeline) { pipeline_class.new } + context 'instance' do + it 'should have a pipeline name' do + expect(pipeline.pipeline_name).to eq('test_pipeline') + end + + it 'should have a description' do + expect(pipeline.description).to eq('A test pipeline') + end + + it 'should have a processor' do + expect(pipeline.processors.first).to be_a(Stretchy::Pipelines::Processor) + expect(pipeline.processors.first.type).to eq(:sparse_encoding) + expect(pipeline.processors.first.opts).to eq({model_id: 'q32Pw02BJ3squ3VZa', field_map: {body: :embedding}}) + end + end + + context 'api client' do + it 'appears as a hash' do + expect(pipeline_class.to_hash).to eq({ + description: 'A test pipeline', + processors: [ + { + sparse_encoding: { + model_id: 'q32Pw02BJ3squ3VZa', + field_map: { + body: :embedding + } + } + } + ] + }.as_json) + end + + it 'should create a new pipeline' do + allow(pipeline.client).to receive(:put_pipeline).and_return({"acknowledged"=>true}) + expect(pipeline_class.create!).to eq({"acknowledged"=>true}) + end + + it 'should intercept NotFound in exists?' do + allow(pipeline_class).to receive(:find).and_raise(pipeline_class.send(:not_found)) + expect(pipeline_class.exists?).to be_falsey + end + + let(:simulation_response) do + { + "docs" => [ + { + "processor_results" => [ + { + "processor_type" => "uppercase", + "status" => "success", + "doc" => { + "_index" => "_index", + "_id" => "_id", + "_source" => { + "name" => "TEST" + }, + "_ingest" => { + "pipeline" => "uppercase-pipeline", + "timestamp" => "2024-03-16T20:36:04.388880257Z" + } + } + } + ] + } + ] + } + end + + it 'should simulate a pipeline' do + uppercase = Class.new(Stretchy::Pipeline) do + pipeline_name 'uppercase-pipeline' + description 'Uppercase' + processor :uppercase, field: :name + end + + allow(uppercase.client).to receive(:simulate).and_return(simulation_response) + allow(uppercase.client).to receive(:put_pipeline).and_return({"acknowledged"=>true}) + expect(uppercase.pipeline_name).to eq('uppercase-pipeline') + expect(uppercase.description).to eq('Uppercase') + expect(uppercase.create!).to eq({"acknowledged"=>true}) + expect(uppercase.simulate([{_source: {name: 'test'}}])).to eq(simulation_response) + end + + it 'should retrieve a pipeline' do + allow(pipeline.client).to receive(:get_pipeline).and_return({pipeline.pipeline_name => pipeline.to_hash}) + expect(pipeline.find).to eq({pipeline.pipeline_name => pipeline.to_hash}) + end + + it 'should delete a pipeline' do + allow(pipeline_class.client).to receive(:delete_pipeline).and_return({"acknowledged"=>true}) + expect(pipeline_class.delete!).to eq({"acknowledged"=>true}) + end + end +end \ No newline at end of file diff --git a/spec/stretchy/pipelines/processor_spec.rb b/spec/stretchy/pipelines/processor_spec.rb new file mode 100644 index 0000000..9726f6d --- /dev/null +++ b/spec/stretchy/pipelines/processor_spec.rb @@ -0,0 +1,30 @@ +require 'spec_helper' + +describe Stretchy::Pipelines::Processor do + describe '#initialize' do + it 'sets the type, opts, and description' do + type = 'set' + opts = { description: 'some description' } + processor = described_class.new(type, opts) + + expect(processor.type).to eq(type) + expect(processor.opts).to eq(opts) + expect(processor.description).to eq(opts[:description]) + end + end + + describe '#to_hash' do + it 'returns a hash representation of the processor' do + type = 'some_type' + opts = {model_id: 'q32Pw02BJ3squ3VZa', + field_map: { + body: :embedding + } + } + processor = described_class.new(type, opts) + + expected_hash = { type => opts } + expect(processor.to_hash).to eq(expected_hash) + end + end +end \ No newline at end of file diff --git a/spec/support/behaves_like_stretchy_model.rb b/spec/support/behaves_like_stretchy_model.rb index 98a0eae..8089f5f 100644 --- a/spec/support/behaves_like_stretchy_model.rb +++ b/spec/support/behaves_like_stretchy_model.rb @@ -25,6 +25,18 @@ expect(model_class.count).to be_a(Numeric) end + context 'pipelins' do + it 'responds to default_pipeline' do + expect(model_class).to respond_to(:default_pipeline) + end + + it 'can set a default_pipeline' do + model_class.default_pipeline :test_pipeline + expect(model_class.default_pipeline).to eq('test_pipeline') + end + + end + context 'defaults' do context 'attributes' do it 'includes an id' do diff --git a/stretchy-model.gemspec b/stretchy-model.gemspec index f0c0e4f..05e0a94 100644 --- a/stretchy-model.gemspec +++ b/stretchy-model.gemspec @@ -47,6 +47,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "opensearch-ruby", "~> 3.0" spec.add_development_dependency "octokit", "~> 4.20" spec.add_development_dependency "versionomy", "~> 0.5.0" + spec.add_development_dependency "faker", "~> 2.18" # For more information and examples about making a new gem, check out our # guide at: https://bundler.io/guides/creating_gem.html end