Files
analyzer-pipeline/bin/router
2019-09-30 09:26:08 +01:00

116 lines
3.0 KiB
Ruby

#!/usr/bin/env ruby
require "bundler/setup"
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
require "pipeline"
# One ZeroMQ context shared by every socket created in this script.
context = ZMQ::Context.new

# ROUTER socket bound on port 5566; the main loop reads requests from it
# and writes replies back to it.
front_end_socket = context.socket(ZMQ::ROUTER)
front_end_socket.bind('tcp://*:5566')

# DEALER socket towards the workers. It is not bound or connected here: the
# main loop connects it to the address carried by each status message.
# It is also stored in an instance variable so that check_active can
# disconnect workers from inside a method body.
# back_end_socket = context.socket(ZMQ::ROUTER)
@back_end_socket = back_end_socket = context.socket(ZMQ::DEALER)

# SUB socket bound on port 5555, subscribed with an empty prefix.
status_socket = context.socket(ZMQ::SUB)
status_socket.setsockopt(ZMQ::SUBSCRIBE, "")
status_socket.bind('tcp://*:5555')

# Watch all three sockets for incoming data.
poller = ZMQ::Poller.new
[back_end_socket, front_end_socket, status_socket].each do |socket|
  poller.register(socket, ZMQ::POLLIN)
end

# Worker registry keyed by the "identity" field of status messages.
@workers = {}
# Lists the worker records whose last status report is recent enough.
#
# Prints the cut-off timestamp, then for every record in @workers prints the
# record and its :last_seen value. A record is kept when its :last_seen
# (epoch seconds) is strictly greater than "now - 2000".
#
# @return [Array<Hash>] the matching records, in @workers iteration order
def active_workers
  threshold = Time.now.to_i - 2000
  puts "Cut off #{threshold}"
  @workers.each_value.select do |record|
    puts "worker: #{record}"
    seen_at = record[:last_seen]
    puts "last_seen #{seen_at}"
    seen_at > threshold
  end
end
# Refreshes the :active flag on every worker record and disconnects the
# back-end socket from the workers that are no longer active.
#
# A record is active when its :last_seen (epoch seconds) is strictly greater
# than "now - 15". Every record is printed between divider lines; each
# inactive record is then printed again and its status "address" is passed
# to @back_end_socket.disconnect.
#
# NOTE(review): records are never removed from @workers here, so an inactive
# worker is disconnected again on every call — confirm that is intended.
#
# @return [Array<Hash>] the records that were found inactive
def check_active
  divider = "------------------------------"
  puts divider
  threshold = Time.now.to_i - 15
  puts "Cut off #{threshold}"

  stale = @workers.each_value.reject do |record|
    seen_at = record[:last_seen]
    puts "last_seen #{seen_at}. #{seen_at} > #{threshold} .. #{seen_at > threshold}"
    record[:active] = seen_at > threshold
    puts divider
    puts record
    puts divider
    record[:active]
  end

  stale.each do |record|
    puts record
    endpoint = record[:status]["address"]
    puts "Unsub #{endpoint}"
    @back_end_socket.disconnect(endpoint)
  end
end
# Main event loop. Each pass blocks in the poller, prints a few counters,
# then handles every readable socket:
#
# * status_socket    - parse the JSON status message, record the worker in
#                      @workers under its "identity", connect the back-end
#                      socket to its "address", then refresh activity flags.
# * front_end_socket - refresh activity flags; with no active worker, answer
#                      the request immediately with a failed-status reply,
#                      otherwise forward the request to the back-end socket.
# * back_end_socket  - relay the received frames to the front-end socket.
#
# The loop ends when the poller returns -1.
loop do
  poll_result = poller.poll
  break if poll_result == -1

  puts "POLL #{poll_result}"
  readables = poller.readables
  puts "readables #{poller.readables.size}"
  puts "writables #{poller.writables.size}"
  puts "workers #{@workers.size}"

  # Ruby has no `continue` keyword: the original `continue if ...` raised
  # NameError whenever a poll returned with nothing readable. `next` skips to
  # the next iteration of the enclosing `loop` block.
  next if readables.empty?

  readables.each do |readable|
    case readable
    when status_socket
      puts "..."
      msg = ""
      status_socket.recv_string(msg)
      status_message = JSON.parse(msg)
      address = status_message["address"]
      identity = status_message["identity"]
      # Replaces any earlier record for this identity, so :active is unset
      # until check_active runs below.
      @workers[identity] = { last_seen: Time.now.to_i, status: status_message }
      back_end_socket.connect(address)
      puts "STATUS: #{msg}"
      check_active
    when front_end_socket
      check_active
      workers = active_workers
      if workers.empty?
        puts "no workers"
        msg = []
        front_end_socket.recv_strings(msg)
        # Reply using the first received frame followed by an empty frame;
        # presumably the first frame is the client's routing identity.
        reply = [msg.first, "", { status: :failed }.to_json]
        front_end_socket.send_strings(reply)
      else
        msg = []
        front_end_socket.recv_strings(msg)
        # NOTE(review): the second argument of send_strings looks like a
        # flags bitmask in ffi-rzmq rather than a timeout — confirm that
        # 1000 is intended.
        result = back_end_socket.send_strings(msg, 1000)
        puts result
      end
    when back_end_socket
      msg = []
      back_end_socket.recv_strings(msg)
      puts "HERER!!!! #{msg}"
      front_end_socket.send_strings(msg)
    end
  end
end
# ZMQ::Device.create(front_end_socket, back_end_socket)