Please disable Adblockers and enable JavaScript for domain CEWebS.cs.univie.ac.at! We have NO ADS, but they may interfere with some of our course material.
Name: server/engine/controller.rb
require ::File.dirname(__FILE__) + '/../../lib/remar'
require 'xml/smart'
require 'json'
require 'digest/md5'
# Controls a single engine instance: it connects a REMAR rule engine to the
# instance's persisted properties and to its notification subscriptions
# (events and votes), and delivers notifications to the subscribers.
class Controller
  attr_reader :callbacks
  attr_reader :state
  attr_reader :id
  attr_reader :properties
  attr_reader :notifications

  # Loads the instance stored below ../instances/<id>/ (relative to this
  # file) and registers the engine hooks.
  #
  # id   - instance identifier; used as directory name and exposed via #id.
  # opts - Hash; the keys read here are :properties_schema_inactive,
  #        :properties_schema_active, :properties_init and :topics.
  def initialize(id,opts)
    @id = id # previously never stored, so the #id reader always returned nil
    @directory = ::File.dirname(__FILE__) + "/../instances/#{id}/"
    @calls = {}
    @events = {}
    @votes = {}
    @votes_results = {}
    @communication = {}
    @callbacks = {}
    @instance = REMAR.new

    @properties = Riddl::Utils::Properties::Backend.new(
      {
        :inactive => opts[:properties_schema_inactive],
        :active   => opts[:properties_schema_active]
      },
      @directory + '/properties.xml',
      opts[:properties_init]
    )
    @notifications = Riddl::Utils::Notifications::Producer::Backend.new(
      opts[:topics],
      @directory + '/notifications/'
    )

    # every stored subscription lives in its own directory; the directory
    # name is the subscription key
    Dir[@directory + 'notifications/*/subscription.xml'].each do |sub|
      key = ::File::basename(::File::dirname(sub))
      self.unserialize_event!(:cre,key)
    end
    self.unserialize_context!
    self.unserialize_state!

    # calls known to @calls that are invoked with a Hash as first parameter
    # are announced and then POSTed to the configured URL
    @instance.default do |call,parameters|
      if @calls[call] && parameters[0].is_a?(Hash)
        parameters = parameters[0]
        notify('running/call',{:call => @calls[call], :parameters => parameters})
        client = Riddl::Client.new(@calls[call])
        client.post parameters.map{|k,v|Riddl::Parameter::Simple.new(k,v)}
      end
    end
    @instance.before_push do |evt_name,evt_data,context|
      notify('running/before_push',{:evt_name => evt_name, :evt_data => evt_data, :context => context})
    end
    @instance.after_push do |context|
      serialize_context!(context)
      notify('running/after_push',{:context => context})
    end
  end

  # Marks the instance as running (in memory only).
  def start # {{{
    @state = :running
  end # }}}

  # Marks the instance as stopped (in memory only).
  def stop # {{{
    @state = :stopped
  end # }}}

  # Forwards a test push to the engine; ignored unless the state is :running.
  def test_push(event,parameters) # {{{
    if @state == :running
      @instance.test_push event.to_sym, parameters
    end
  end # }}}

  # Forwards an event to the engine; ignored unless the state is :running.
  def push(event,parameters) # {{{
    if @state == :running
      @instance.push event.to_sym, parameters
    end
  end # }}}

  # Writes the values of +context+ (indexed by section name, then entry name,
  # both as symbols) as JSON into the existing context elements of the
  # properties document.
  def serialize_context!(context) # {{{
    @properties.modify do |doc|
      doc.find("/p:properties/p:context/*").each do |ss|
        ss.children.each do |e|
          e.text = JSON::generate(context[ss.qname.name.to_sym][e.qname.name.to_sym])
        end
      end
    end
  end # }}}

  # Activates the schema matching +state+ (:inactive for :stopped/:ready,
  # :active for :running) and stores +state+ in the properties document.
  def serialize_state!(state) # {{{
    @properties.activate_schema(:inactive) if state == :stopped || state == :ready
    @properties.activate_schema(:active)   if state == :running
    @properties.modify do |doc|
      doc.register_namespace 'p', 'http://riddl.org/ns/common-patterns/properties/1.0'
      doc.find("/p:properties/p:state").first.text = state
    end
  end # }}}

  # Reads the state from the properties document into @state (as a Symbol).
  def unserialize_state! # {{{
    @state = @properties.data.find("string(/p:properties/p:state)").to_sym
  end # }}}

  # Brings the in-memory subscription tables (@communication, @events,
  # @votes) in line with the stored subscription +key+.
  #
  # op  - :cre (created), :upd (changed) or :del (removed); any other value
  #       does nothing.
  # key - subscription key.
  def unserialize_event!(op,key) # {{{
    case op
    when :del
      @notifications.subscriptions[key].delete if @notifications.subscriptions.include?(key)

      @communication.delete(key)
      @events.each do |eve,keys|
        keys.delete_if{|k,v| key == k}
      end
      @votes.each do |eve,keys|
        keys.delete_if do |k,v|
          if key == k
            # let pending vote callbacks of this subscriber react to the removal
            @callbacks.each{|voteid,cb|cb.delete_if!(eve,k)}
            true
          end
        end
      end
    when :upd
      if @notifications.subscriptions.include?(key)
        url = @communication[key]
        # everything known before the update is stale until the stored
        # subscription mentions it again
        stale_events = @events.keys
        stale_votes  = @votes.keys
        @notifications.subscriptions[key].read do |doc|
          turl = doc.find('string(/n:subscription/@url)')
          # no stored url: keep the current endpoint
          url = turl == '' ? url : turl
          @communication[key] = url
          seen = register_topics(doc,key,url)
          stale_events -= seen[:events]
          stale_votes  -= seen[:votes]
        end
        # fixed: stale events were removed from @votes instead of @events
        stale_events.each { |e| @events[e].delete(key) }
        stale_votes.each do |e|
          @callbacks.each{|voteid,cb|cb.delete_if!(e,key)}
          @votes[e].delete(key)
        end
      end
    when :cre
      @notifications.subscriptions[key].read do |doc|
        turl = doc.find('string(/n:subscription/@url)')
        url = turl == '' ? nil : turl
        @communication[key] = url
        register_topics(doc,key,url)
      end
    end
  end # }}}

  # Reloads parts of the engine configuration from the properties document.
  #
  # what - 'context', 'events', 'values', 'calls' or 'rules' to reload one
  #        part; nil (default) reloads all of them.
  #
  # Returns the current state.
  def unserialize_context!(what=nil) # {{{
    if what == 'context' || what.nil?
      @instance.clear(:context)
      @properties.data.find("/p:properties/p:context/p:*").each do |e|
        temp = {}
        e.children.each do |c|
          temp[c.qname.to_s.to_sym] = begin
            JSON::parse(c.text)
          rescue StandardError
            nil # values that cannot be parsed are loaded as nil
          end
        end
        @instance.ruleset.context e.qname.to_s.to_sym, temp
      end
    end

    if what == 'events' || what.nil?
      @instance.clear(:events)
      @properties.data.find("/p:properties/p:events/p:*").each do |e|
        temp = []
        e.children.each do |c|
          temp << c.qname.to_s.to_sym
        end
        @instance.ruleset.event e.qname.to_s.to_sym, *temp
      end
    end

    if what == 'values' || what.nil?
      @instance.clear(:values)
      @properties.data.find("/p:properties/p:values/p:*").each do |e|
        @instance.ruleset.value e.qname.to_s.to_sym, e.find('string(p:value)'), e.find('string(p:key)').to_sym
      end
    end

    if what == 'calls' || what.nil?
      @instance.clear(:calls)
      @properties.data.find("/p:properties/p:calls/p:*").each do |e|
        @instance.ruleset.call e.qname.to_s.to_sym
        @calls[e.qname.to_s.to_sym] = e.text
      end
    end

    if what == 'rules' || what.nil?
      @instance.clear(:rules)
      @properties.data.find("/p:properties/p:rules/p:*").each do |e|
        conditions = []
        e.find('p:event|p:context').each do |t|
          cond  = t.attributes['condition']
          guard = cond ? "{#{cond}}" : ''
          conditions << "@instance.ruleset.#{t.qname}.#{t.attributes['name']}#{guard}"
        end
        # NOTE(review): rule names, conditions and actions from the properties
        # document are executed as Ruby code in the context of this object.
        # Confirm that only trusted parties can write these properties.
        eval <<-lave
          @instance.ruleset.rule #{conditions.join(', ')} do
            #{e.find('string(p:action)')}
          end
        lave
      end
    end

    state
  end # }}}

  # Sends the event +what+ ("topic/event") with +content+ to every subscriber
  # of that event. Each delivery runs in its own thread: String endpoints get
  # an HTTP POST, websocket endpoints get an <event/> document; subscribers
  # with any other endpoint (e.g. nil) are skipped.
  def notify(what,content={}) # {{{
    item = @events[what]
    if item
      item.each do |ke,ur|
        Thread.new(ke,ur) do |key,url|
          ev = build_notification(key,what,content,'event')
          if url.class == String
            client = Riddl::Client.new(url)
            client.post ev.map{|k,v|Riddl::Parameter::Simple.new(k,v)}
          elsif url.class == Riddl::Utils::Notifications::Producer::WS
            e = XML::Smart::string("<event/>")
            ev.each do |k,v|
              e.root.add(k,v)
            end
            url.send(e.to_s)
          end
        end
      end
    end
  end # }}}

  # Asks every subscriber of the vote +what+ ("topic/vote") and waits for the
  # answers.
  #
  # Returns true when there are no subscribers or no collected answer is
  # false, otherwise false.
  def call_vote(what,content={}) # {{{
    voteid = Digest::MD5.hexdigest(rand(Time.now.to_i).to_s)
    item = @votes[what]
    if item && item.length > 0
      continue = Wee::Continue.new
      @votes_results[voteid] = []

      # number of answers to wait for: HTTP subscribers and open websockets
      inum = 0
      item.each do |key,url|
        if url.class == String
          inum += 1
        elsif url.class == Riddl::Utils::Notifications::Producer::WS
          inum += 1 unless url.closed?
        end
      end
      # NOTE(review): if inum stays 0 (only nil endpoints or closed sockets)
      # nothing below ever calls continue.continue, so the wait presumably
      # never returns - confirm against Wee::Continue.

      item.each do |key,url|
        Thread.new(key,url,content.dup) do |k,u,c|
          # fixed: rand(Time.now) raises TypeError on current Ruby (Time has
          # no to_int); use the epoch seconds like voteid above
          callback = Digest::MD5.hexdigest(rand(Time.now.to_i).to_s)
          c['callback'] = callback
          vo = build_notification(k,what,c,'vote')
          if u.class == String
            client = Riddl::Client.new(u)
            params = vo.map{|ke,va|Riddl::Parameter::Simple.new(ke,va)}
            params << Riddl::Header.new("CPEE-CALLBACK",callback)
            status, result, headers = client.post params

            if headers["CPEE_CALLBACK"] && headers["CPEE_CALLBACK"] == 'true'
              # subscriber answers later through the callback
              @callbacks[callback] = Callback.new("vote #{vo.find{|a,b| a == 'notification'}[1]}", self, :vote_callback, what, k, :http, continue, voteid, callback, inum)
            else
              vote_callback(result,continue,voteid,callback, inum)
            end
          elsif u.class == Riddl::Utils::Notifications::Producer::WS
            @callbacks[callback] = Callback.new("vote #{vo.find{|a,b| a == 'notification'}[1]}", self, :vote_callback, what, k, :ws, continue, voteid, callback, inum)
            e = XML::Smart::string("<vote/>")
            vo.each do |ke,va|
              e.root.add(ke,va)
            end
            u.send(e.to_s)
          end
        end
      end
      continue.wait

      !@votes_results.delete(voteid).include?(false)
    else
      true
    end
  end # }}}

  # Records one answer for the vote +voteid+ and resumes the waiting
  # #call_vote once +num+ answers have been collected.
  #
  # result   - :DELETE counts as approval; otherwise the answer is true only
  #            if the first element's value is the string 'true'.
  # continue - object whose #continue resumes the waiting caller.
  # callback - id of the pending callback to drop from @callbacks.
  def vote_callback(result,continue,voteid,callback,num) # {{{
    @callbacks.delete(callback)
    if result == :DELETE
      @votes_results[voteid] << true
    else
      @votes_results[voteid] << (result && result[0] && result[0].value == 'true')
    end
    if (num == @votes_results[voteid].length)
      continue.continue
    end
  end # }}}

  # Uses +socket+ as endpoint for subscription +key+ wherever it is subscribed.
  def add_ws(key,socket) # {{{
    rebind_endpoint(key,socket)
  end # }}}

  # Clears the endpoint of subscription +key+; the subscriptions themselves
  # stay registered.
  def del_ws(key) # {{{
    rebind_endpoint(key,nil)
  end # }}}

private

  # Registers +key+ with +url+ for every event and vote listed in the
  # subscription document +doc+.
  #
  # Returns a Hash with the registered names: { :events => [...], :votes => [...] }.
  def register_topics(doc,key,url) # {{{
    seen = { :events => [], :votes => [] }
    doc.find('/n:subscription/n:topic').each do |t|
      t.find('n:event').each do |e|
        name = "#{t.attributes['id']}/#{e}"
        @events[name] ||= {}
        @events[name][key] = url
        seen[:events] << name
      end
      t.find('n:vote').each do |e|
        name = "#{t.attributes['id']}/#{e}"
        @votes[name] ||= {}
        @votes[name][key] = url
        seen[:votes] << name
      end
    end
    seen
  end # }}}

  # Sets +endpoint+ for +key+ in @communication and in every event/vote
  # entry that already contains +key+.
  def rebind_endpoint(key,endpoint) # {{{
    @communication[key] = endpoint
    [@events, @votes].each do |table|
      table.each do |name,subscribers|
        subscribers[key] = endpoint if subscribers.has_key?(key)
      end
    end
  end # }}}

  # Builds the notification payload as an Array of [name, value] pairs:
  # key, topic (directory part of +what+), +type+ (base name of +what+),
  # notification (+content+ as JSON), uid (random) and fp (MD5 over all
  # preceding names and values).
  def build_notification(key,what,content,type) # {{{
    res = []
    res << ['key'         , key]
    res << ['topic'       , ::File::dirname(what)]
    res << [type          , ::File::basename(what)]
    res << ['notification', JSON::generate(content)]
    res << ['uid'         , Digest::MD5.hexdigest(Kernel::rand().to_s)]
    res << ['fp'          , Digest::MD5.hexdigest(res.join(''))]
    # TODO add secret to fp
  end # }}}
end