forked from mlartz/rflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtemp.rb
295 lines (220 loc) · 7.26 KB
/
temp.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
module RFlow
def self.run(config)
# Take in the config file
# Set a module-level config
# Set module-level attributes (logger)
# Create manager
# Start manager with parsed config elements
end
class Manager
def initialize(config)
end
# Find each component
# Instantiate (process management)
end
end
class SchemaRegistry
# maps data type names to schemas based on schema type
find_by_data_type_name
end
class MessageDataRegistry
def find(data_type_name)
# returns a data type class if registered, nil otherwise
end
end
class Message::Data
# contains the schema + data information
# subclasses can add extra functionality, otherwise will just have
# acces to standard messagedata stuffs (i.e. standard avro data types)
# delegates a lot to standard Avro types
# how does this get access to the registry at the class level?
class << self
attr_accessor :class_registry
attr_accessor :schema_registry
end
# Pointer to encapsulating message
attr_accessor :message
def initialize(data_type_name, serialized_data=nil, schema_name=nil, schema_type=nil, schema=nil, message=nil)
# schema_name ||= 'org.rflow.Messages.GenericStringMap'
# schema_type ||= 'avro'
# schema ||= 'default avro schema'
merge_options
# TODO: think about schema resolution and conflicts between passed
# data and schema registry
# Lookup schema based on data type name
registered_schema_name, registered_schema, registered_schema_type = self.class.schema_registry.find(data_type_name)
if registered_schema.nil? && schema
# If you were given a schema and didn't get one from the
# registry register the schema?
self.class.schema_registry.register(data_type_name, schema_name, schema_type, schema)
else
end
end
def self.create(data_type_name, data=nil, schema_name=nil, schema_type=nil, schema=nil)
# look for object in registry by data_type_name
# if object found, call new on that object
# otherwise, call new on the default object
message_class = self.class.data_class_registry.find(data_type_name)
if message_class.nil?
MessageData.new(data_type_name, data, schema_name, schema_type, schema)
else
message_class.create(data_type_name, data, schema_name, schema_type, schema)
end
end
end
module HTTPResponse
end
Message.new.extend(HTTPResponse)
class HTTPRequest < RFlow::Message::Data
# used to add methods, defaults, and more to data object, if required
# Put this in the registry
AVRO_SCHEMA_NAME = 'org.rflow.http_request'
DATA_TYPE_NAME = "HTTPRequest"
# All subclasses must have the same initialize signature. They need
# to figure out what to do when they get the extra parameters that
# might conflict with expectations. Subclasses are usually meant to
# enable extra functionality on a given data type, so as long as it
# operates properly, it might not care (duck typing)
def initialize(data_type_name, data, schema_name, schema_type, schema)
super(DATA_TYPE_NAME, data, AVRO_SCHEMA_NAME)
# do nice stuff with data here
end
def self.create(data_type_name, data, schema_name, schema_type, schema)
# figure out if you are being called with incompatible arguments,
# i.e. schema stuff
end
end
class Message
# contains all definitions about what to do for a message
# has a default Avro schema for a data type
class << self
attr_accessor :data_class_registry
end
# Should load all the data stuff, perhaps to top level method on object
attr_accessor :data_type_name, :provenance, :origination_context, :data_type_schema, :data
def initialize(data_type_name, provenance=nil, origination_context=nil, data_type_schema=nil, data=nil)
if data
# Potentially register this data_type_name to the schema
else
# Lookup MessageData type in the MessageDataRegistry
# if found and a class, create a specific MessageData object
# extend it with the module
# else, create generic MessageData object which will use
# the schema registry, under the hood
# if found and a module, extend object with found module
message_data_class = self.class.data_class_registry.find(data_type_name)
if message_data_class && message_data_class.class.is_a? Class
message_data = message_data_class.new
else
message_data = Message::Data.new
message_data.extend message_data_class if message_data_class.is_a? Module
end
end
end
end
class Port
def read_message
parts = read_all_parts
parts.assemble
data_type_name = read_message_part
provenance = read_message_part
origination_context = read_message_part
data_type_schema = read_message_part
data = read_message_part
message = Message.new(data_type_name, provenance, origination_context, data_type_schema, data)
message
end
end
class PortCollection
end
class Logger
end
class Component
def self.input_port(port_def)
@@input_ports ||= PortCollection.new
if port_def.is_a? Array
port_name = port_def.first.to_sym
port_incidence = :array
else
port_name = port_def
port_incidence = :single
end
@@input_ports[port_name] = InputPort.new port_name, port_incidence
end
def self.output_port
# same as input port with different stuffs
end
STATES = [:initialized, :started, :configured, :running, :stopping, :stopped]
attr_accessor :state
attr_accessor :input_ports
attr_accessor :output_ports
attr_accessor :uuid
attr_accessor :name
CONFIG_DEFAULTS = {
:logger,
:working_directory_path,
}
def initialize(config, run_directory)
# configure component
config = {
}
# TODO: where is the management bus listener configured/started
end
def run
input_ports.ready do |port|
message = port.read_message
process_input(port, message)
# read from the port and think about things
out.send('stuff')
another_out.send('more stuff')
end
# listen to
end
def process_message(input_port, message)
end
def receive_message(port)
port.receive
end
def send_message(port, message)
port.send(message)
end
end
class HTTPServer < RFlow::Component
input_port :responses
output_port :requests
input_types "HTTP::Response"
output_types "HTTP::Request"
end
class PassThrough < RFlow::Component
input_port [:in]
input_port :another_in
output_port :out
output_port :another_out
output_types
def initialize(config, run_directory)
# This will initialize the ports
super
# Do stuff to initialize component. Don't assume singleton
end
def process_message(input_port, data)
out.send(message)
another_out.send(message)
end
def process_data(input_port
end
class Transform < RFlow::Component
end
# Plugins:
# MessageData subclass: rflow-data-http_request
# lib/rflow-data-http_request.rb
require 'rflow'
require 'lib/data_name'
RFlow.available_data_types << data_name_object
# Component: rflow-component-http_server
# lib/rflow-component-http_server
require 'rflow'
require 'lib/component_name'
RFlow.available_components << component_class
# lib/component_name.rb ->
# data_type_name => schema + registration: just register in the application
# Server -> (HttpRequest -> Translate -> HTTPResponse) -> Server