Routing to backend workers via a shared response channel

This commit is contained in:
Charles Care
2019-10-07 09:25:07 +01:00
parent 832d0cbb7b
commit 2dcd14072f
7 changed files with 198 additions and 26 deletions

View File

@@ -31,6 +31,7 @@ class PipelineClient
response = ""
recv_result = socket.recv_string(response)
puts recv_result
puts response
raise("RCV timeout") if recv_result < 0
parsed = JSON.parse(response)
return parsed
@@ -49,7 +50,7 @@ class PipelineClient
end
def analyze(track_slug, exercise_slug, solution_slug, iteration_folder)
send_msg("analyze_#{track_slug}|#{exercise_slug}|#{solution_slug}|#{iteration_folder}", 1000)
send_msg("analyze_#{track_slug}|#{exercise_slug}|#{solution_slug}|#{iteration_folder}", 10000)
end
end

View File

@@ -7,19 +7,19 @@ pipeline = PipelineClient.new
# return
lang = ARGV[0] || "ruby"
pipeline.build_test_runner(lang)
# pipeline.build_test_runner(lang)
# pipeline.release_latest(lang)
# exit
# r = pipeline.analyze(lang, "two-fer", "soln-42", "s3://exercism-dev/iterations/fff07700-e1c3-402d-8937-823aeefb159f")
# puts r
# if r["logs"]
# r["logs"].each do |log_line|
# puts "+ #{log_line["cmd"]}"
# puts log_line["stdout"]
# puts log_line["stderr"]
# end
# end
#
# puts r["result"]
r = pipeline.analyze(lang, "two-fer", "soln-42", "s3://exercism-dev/iterations/fff07700-e1c3-402d-8937-823aeefb159f")
puts r
if r["logs"]
r["logs"].each do |log_line|
puts "+ #{log_line["cmd"]}"
puts log_line["stdout"]
puts log_line["stderr"]
end
end
puts r["result"]

View File

@@ -75,6 +75,8 @@ loop do
continue if readables.empty?
in_flight = {}
readables.each do |readable|
case readable
when status_socket
@@ -87,19 +89,24 @@ loop do
@workers[identity] = { last_seen: Time.now.to_i, status: status_message }
back_end_socket.connect(address)
puts "STATUS: #{msg}"
puts "in_flight: #{in_flight}"
check_active
when front_end_socket
check_active
workers = active_workers
puts "WORKERS #{workers}"
if workers.empty?
puts "no workers"
msg = []
front_end_socket.recv_strings(msg)
puts "Address #{msg.first} | #{msg}"
reply = [msg.first, "", { status: :failed }.to_json]
front_end_socket.send_strings(reply)
else
msg = []
front_end_socket.recv_strings(msg)
puts "Address #{msg.first} | #{msg}"
in_flight[msg.first] = msg
result = back_end_socket.send_strings(msg, 1000)
puts result
end

145
bin/router2 Normal file
View File

@@ -0,0 +1,145 @@
#!/usr/bin/env ruby
require "bundler/setup"
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
require "pipeline"
context = ZMQ::Context.new
front_end_socket = context.socket(ZMQ::ROUTER)
front_end_socket.bind('tcp://*:5566')
back_end_socket = context.socket(ZMQ::PUSH)
@back_end_socket = back_end_socket
resp_socket = context.socket(ZMQ::SUB)
resp_socket.setsockopt(ZMQ::SUBSCRIBE, "")
resp_socket.bind('tcp://*:5556')
status_socket = context.socket(ZMQ::SUB)
status_socket.setsockopt(ZMQ::SUBSCRIBE, "")
status_socket.bind('tcp://*:5555')
poller = ZMQ::Poller.new
# poller.register(back_end_socket, ZMQ::POLLIN)
poller.register(front_end_socket, ZMQ::POLLIN)
poller.register(status_socket, ZMQ::POLLIN)
poller.register(resp_socket, ZMQ::POLLIN)
workers_poller = ZMQ::Poller.new
workers_poller.register(back_end_socket, ZMQ::POLLOUT)
@workers = {}
def active_workers
active = []
@workers.each do |k, worker|
active << worker if worker[:active]
end
active
end
def check_active
puts "------------------------------"
inactive = []
cut_off = Time.now.to_i - 15
puts "Cut off #{cut_off}"
@workers.each do |k, worker|
last_seen = worker[:last_seen]
puts "last_seen #{last_seen}. #{last_seen} > #{cut_off} .. #{last_seen > cut_off}"
worker[:active] = last_seen > cut_off
unless worker[:active]
inactive << worker
end
puts "------------------------------"
puts worker
puts "------------------------------"
end
inactive.each do |inactive_worker|
puts inactive_worker
address = inactive_worker[:status]["address"]
puts "Unsub #{address}"
@back_end_socket.disconnect(address)
end
end
loop do
poll_result = poller.poll
break if poll_result == -1
puts "POLL #{poll_result}"
readables = poller.readables
puts "readables #{poller.readables.size}"
puts "writables #{poller.writables.size}"
puts "workers #{@workers.size}"
continue if readables.empty?
in_flight = {}
readables.each do |readable|
case readable
when resp_socket
puts "resp_socket"
msg = ""
resp_socket.recv_string(msg)
puts msg
puts "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
when status_socket
puts "..."
msg = ""
status_socket.recv_string(msg)
puts "STATUS MSG: #{msg} "
status_message = JSON.parse(msg)
type = status_message["msg_type"]
puts "STATUS MSG TYPE: #{status_message["msg_type"]} "
if type == "status"
address = status_message["address"]
identity = status_message["identity"]
@workers[identity] = { last_seen: Time.now.to_i, status: status_message }
back_end_socket.connect(address)
puts "in_flight: #{in_flight}"
check_active
else
puts "OTHER"
return_address = status_message["return_address"]
puts return_address
puts return_address.pack("c*")
reply = [return_address.pack("c*"), "", msg]
front_end_socket.send_strings(reply)
end
when front_end_socket
check_active
workers = active_workers
puts "WORKERS #{workers}"
if workers.empty?
puts "no workers"
msg = []
front_end_socket.recv_strings(msg)
puts "Address #{msg.first} | #{msg}"
reply = [msg.first, "", { status: :failed }.to_json]
front_end_socket.send_strings(reply)
else
poll_result = workers_poller.poll
puts "WORKERS POLL #{poll_result}"
msg = []
front_end_socket.recv_strings(msg)
puts "Address #{msg.first} | #{msg}"
in_flight[msg.first] = msg
puts "send1"
result = back_end_socket.send_strings(msg, ZMQ::DONTWAIT)
puts "send2"
puts result
end
when back_end_socket
msg = []
back_end_socket.recv_strings(msg)
puts "HERER!!!! #{msg}"
front_end_socket.send_strings(msg)
end
end
end
# ZMQ::Device.create(front_end_socket, back_end_socket)

11
bin/worker Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env ruby
require "bundler/setup"
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
require "pipeline"
Pipeline.load_config(File.expand_path('../../config/pipeline.yml', __FILE__))
# Pipeline.release("ruby")
Pipeline.daemon
# Pipeline.build_analyzer "ruby"
# Pipeline.build_test_runner "ruby"