2019-10-07 20:05:14 +01:00
|
|
|
module Pipeline::Rpc
|
2019-10-10 19:36:51 +01:00
|
|
|
|
2019-10-10 21:20:23 +01:00
|
|
|
class Router
  # Required request parameters for actions that operate on a submitted
  # iteration ("analyze_iteration", "test_solution"); passed to
  # req.ensure_param in this order.
  ITERATION_PARAMS = %w[id track_slug exercise_slug s3_uri container_version].freeze

  # Required request parameters for the "represent" action.
  REPRESENTER_PARAMS = %w[id track_slug container_version].freeze

  attr_reader :zmq_context, :poller, :response_socket, :notification_socket, :container_versions, :config

  # Builds the router: creates the front-end and response sockets and
  # registers them with a ChannelPoller, creates one WorkChannel per
  # configured work queue, loads the configured container versions and
  # finally opens the notification socket.
  #
  # @param zmq_context ZeroMQ context handed to every socket/channel created here
  # @param config router configuration; must respond to #each_worker, which
  #   yields (worker_class, worker_config). worker_config is iterated as
  #   topic => spec pairs; "shared_queue" maps directly to a port, other
  #   specs are read with ["queue"] (assumed Hash-like — TODO confirm).
  def initialize(zmq_context, config)
    @zmq_context = zmq_context
    @config = config

    @public_hostname = Socket.gethostname
    @response_port = 5556
    @notification_port = 5557
    @front_end_port = 5555

    @front_end = FrontEndSocket.new(zmq_context, @front_end_port)
    @response_socket = ResponseSocket.new(zmq_context, @response_port)

    @poller = ChannelPoller.new
    @poller.register(@front_end)
    @poller.register(@response_socket)

    @in_flight_requests = RequestRegister.new

    # worker_class (Symbol) => { topic => WorkChannel }. The "shared_queue"
    # entry is stored under the wildcard topic "*", used as a fallback when
    # no track-specific queue can take a request.
    @backend_channels = {}
    config.each_worker do |worker_class, worker_config|
      worker_class = worker_class.to_sym
      backend = @backend_channels[worker_class] = {}
      worker_config.each do |k, v|
        if k == "shared_queue"
          topic = "*"
          port = v
        else
          topic = k
          port = v["queue"]
        end
        bind_address = "tcp://#{@public_hostname}:#{port}"
        backend[topic] = WorkChannel.new(zmq_context, bind_address)
      end
    end

    @worker_presence = WorkerPresence.new

    load_container_versions!

    @notification_socket = NotificationSocket.new(zmq_context, @notification_port)
  end

  # Starts the response socket's heartbeater on a background thread, then
  # hands every message produced by the poller to the matching handler:
  # FrontEndRequest -> #on_frontend_request, ServiceResponse ->
  # #on_service_response. Messages of any other class are ignored.
  def run
    Thread.new do
      response_socket.run_heartbeater
    end

    poller.listen_for_messages do |msg|
      case msg
      when FrontEndRequest
        on_frontend_request(msg)
      when ServiceResponse
        on_service_response(msg)
      end
    end
  end

  # Records the current time; #emit_current_spec broadcasts it to workers as
  # :force_restart_at (seconds since epoch) with every subsequent spec.
  def force_worker_restart!
    @force_restart_at = Time.now
  end

  private

  # Rebuilds @container_versions from config:
  # worker_class (Symbol) => { topic => spec["worker_versions"] }.
  # The "shared_queue" entry carries no versions and is skipped.
  def load_container_versions!
    @container_versions = {}
    config.each_worker do |worker_class, worker_config|
      versions_by_topic = @container_versions[worker_class.to_sym] = {}
      worker_config.each do |topic, spec|
        next if topic == "shared_queue"

        versions_by_topic[topic] = spec["worker_versions"]
      end
    end
  end

  # Handles a message arriving on the response socket, dispatching on
  # msg.type:
  #   "response" / "error_response" -> forwarded to the waiting front-end request
  #   "heartbeat"                   -> expire stale requests, re-broadcast the spec
  #   "worker_heartbeat"            -> record the worker as seen
  # Anything else is logged to stdout.
  def on_service_response(msg)
    case msg.type
    when "response"
      @in_flight_requests.forward_as_response(msg)
    when "error_response"
      @in_flight_requests.forward_as_error(msg)
    when "heartbeat"
      @in_flight_requests.flush_expired_requests
      emit_current_spec
    when "worker_heartbeat"
      identity = msg.parsed_msg["identity"]
      queues = msg.parsed_msg["workqueue_addresses"]
      puts "worker heartbeat #{msg.parsed_msg}"
      @worker_presence.mark_seen!(identity, queues, msg.parsed_msg)
    else
      puts "Unrecognised message: #{msg.type} #{msg.parsed_msg}"
    end
  end

  # Dispatches a front-end request by the action name that req.handle yields.
  # Unknown actions are answered with a 501 error.
  def on_frontend_request(req)
    req.handle do |action|
      case action
      when "configure_worker"
        respond_with_worker_config(req)
      when "analyze_iteration"
        ensure_params(req, ITERATION_PARAMS)
        handle_with_worker(:static_analyzers, req)
      when "test_solution"
        ensure_params(req, ITERATION_PARAMS)
        handle_with_worker(:test_runners, req)
      when "represent"
        ensure_params(req, REPRESENTER_PARAMS)
        handle_with_worker(:representers, req)
      when "build_container"
        handle_with_worker(:builders, req)
      when "restart_workers", "restart_router"
        # NOTE(review): "restart_router" only schedules a worker restart,
        # exactly like "restart_workers" — nothing restarts the router itself.
        # Confirm this is intended.
        force_worker_restart!
        req.send_result({ message: "Request accepted" })
      when "current_config"
        req.send_result({ container_versions: container_versions })
      when "update_container_versions", "deploy_container_version"
        update_container_versions(req)
      when "list_available_containers"
        list_available_containers(req)
      when "describe_workers"
        req.send_result({ workers_info: @worker_presence.workers_info })
      when "current_worker_status"
        req.send_result({ workers_status: current_worker_status })
      when "deployment_check"
        deployment_check(req)
      else
        req.send_error("Action <#{action}> unrecognised", 501)
      end
    end
  end

  # Marks each of +names+ as a mandatory parameter of +req+. The missing ones
  # are reported later by #handle_with_worker through req.params_missing?.
  def ensure_params(req, names)
    names.each { |name| req.ensure_param(name) }
  end

  # Responds with the images available in the container repository for the
  # requested channel/track.
  def list_available_containers(req)
    channel = req.parsed_msg["channel"]
    track_slug = req.parsed_msg["track_slug"]
    # The credentials are deliberately no longer printed: they contain a live
    # secret key and session token.
    # NOTE(review): these credentials are never used — container_repo is
    # passed nil as its third argument. Confirm whether they were meant to be
    # passed there.
    _credentials = to_aws_credentials(temp_credentials)
    container_repo = Pipeline::Runtime::RuntimeEnvironment.container_repo(channel, track_slug, nil)
    req.send_result({ list_images: container_repo.images_info })
  end

  # Answers a "deployment_check" request with #version_check for the given
  # channel and track, or an error when the channel is not configured.
  def deployment_check(req)
    channel = req.parsed_msg["channel"]
    language_slug = req.parsed_msg["track_slug"]
    check = version_check(channel, language_slug)
    if check.nil?
      req.send_error("Channel <#{channel}> unrecognised", 502)
      return
    end
    req.send_result({ status: check })
  end

  # For every version known for +language_slug+ on +channel+, reports whether
  # at least as many workers have it deployed as there are online workers.
  # With zero online workers every version reports true.
  #
  # @param channel [String, Symbol, nil] worker class name, e.g. "test_runners"
  # @param language_slug [String] track/topic name
  # @return [Hash{String=>Boolean}, nil] version => fully deployed?, or nil
  #   when +channel+ is not a configured worker class
  def version_check(channel, language_slug)
    # current_worker_status is keyed by Symbol (see load_container_versions!),
    # while request channels arrive as Strings.
    status = current_worker_status[channel.to_s.to_sym]
    return nil if status.nil?

    worker_count = status[:online_workers].size
    status[:deployed_versions][language_slug].transform_values do |workers|
      workers.count >= worker_count
    end
  end

  # Builds a per-worker-class view of the online workers and of which
  # container versions they report as deployed.
  #
  # @return [Hash{Symbol=>Hash}] worker_class => {
  #   online_workers: [identity, ...],
  #   deployed_versions: { lang => { version => [identity, ...] } }
  # }. Every configured target version appears, even if no worker has it.
  def current_worker_status
    container_versions.each_with_object({}) do |(worker_class, target_versions), status|
      channel = select_channel(worker_class) || {}
      addresses = channel.values.map(&:public_address)
      workers = @worker_presence.list_for(addresses)

      deployed_versions = Hash.new do |langs, lang|
        langs[lang] = Hash.new { |versions, version| versions[version] = [] }
      end
      target_versions.each do |lang, versions|
        Array(versions).each { |version| deployed_versions[lang][version] = [] }
      end

      worker_ids = workers.map do |worker|
        identity = worker[:identity]
        # Tolerate heartbeats that carry no (or an empty) deployed_versions map.
        reported = (worker[:info] || {})["deployed_versions"] || {}
        reported.each do |lang, versions|
          Array(versions).each { |version| deployed_versions[lang][version] << identity }
        end
        identity
      end

      status[worker_class] = {
        online_workers: worker_ids,
        deployed_versions: deployed_versions
      }
    end
  end

  # Converts a raw credentials hash into Aws::Credentials. Keys may be Strings
  # (as in a JSON-decoded message) or Symbols (as produced by #temp_credentials
  # from the SDK response's to_h). The original version read String keys only,
  # so it yielded nil credentials for the Symbol-keyed case.
  def to_aws_credentials(raw_credentials)
    creds = raw_credentials.transform_keys(&:to_s)
    Aws::Credentials.new(creds["access_key_id"], creds["secret_access_key"], creds["session_token"])
  end

  # Validates +req+ and forwards it to a backend of +worker_class+. On failure
  # it answers with an error:
  #   502 for missing params, an unknown worker class or an unconfigured track,
  #   505 for a container version that is not deployed.
  def handle_with_worker(worker_class, req)
    if req.params_missing?
      puts "MISSING"
      error = {
        missing_params: req.missing_params
      }
      req.send_error("Missing mandatory parameters", 502, error)
      return
    end

    # Checked before the version lookup: an unknown worker_class used to crash
    # in container_versions[worker_class][...] for versioned requests.
    channel = select_channel(worker_class)
    if channel.nil?
      req.send_error("worker_class <#{worker_class}> unrecognised", 502)
      return
    end

    if req.versioned?
      track_slug = req.parsed_msg["track_slug"]
      container_version = req.parsed_msg["container_version"]
      puts "CHECK VERSION #{worker_class} #{track_slug} #{container_version}"
      configured_versions = container_versions.dig(worker_class, track_slug)
      if configured_versions.nil?
        req.send_error("No configuration for track_slug <#{track_slug}>", 502)
        return
      elsif !configured_versions.include?(container_version)
        req.send_error("Container <#{track_slug}>:<#{container_version}> is not deployed. Configured versions are: #{configured_versions}", 505)
        return
      end
    end

    select_backend_and_forward(req, channel)
  end

  # @return [Hash{String=>WorkChannel}, nil] topic => channel for worker_class
  def select_channel(worker_class)
    @backend_channels[worker_class]
  end

  # Forwards +req+ to the track-specific backend when it has a free worker,
  # otherwise to the shared "*" backend. If neither can take the request,
  # answers 503 with the number of workers currently seen on those queues.
  def select_backend_and_forward(req, channel)
    track_slug = req.parsed_msg["track_slug"]
    addresses = []
    [track_slug, "*"].each do |topic|
      backend = channel[topic]
      next unless backend

      addresses << backend.public_address
      if backend.worker_available?
        forward(backend, req)
        return
      end
    end

    info = {
      current_worker_count: @worker_presence.count_for(addresses)
    }
    req.send_error("No workers available for <#{track_slug}>", 503, info)
  end

  # Registers +req+ as in flight and sends it to +backend+ along with freshly
  # issued temporary credentials.
  def forward(backend, req)
    context = { credentials: temp_credentials }
    @in_flight_requests.register(req)
    backend.forward_to_backend(req, context)
  end

  # Broadcasts the current container versions (plus a pending forced-restart
  # timestamp, if any, and fresh temporary credentials) on the notification
  # socket.
  def emit_current_spec
    m = {
      action: "configure",
      specs: container_versions
    }
    m[:force_restart_at] = @force_restart_at.to_i if @force_restart_at
    set_temp_credentials(m)
    notification_socket.emit(m)
  end

  # Answers a "configure_worker" request with the specs, the work-queue
  # addresses for the requested topics (default ["*"]), the response and
  # notification addresses, and temporary credentials.
  def respond_with_worker_config(req)
    channel = req.parsed_msg["channel"]
    if channel.nil?
      req.send_error({ msg: "channel unknown" })
      return
    end
    channel = channel.to_sym

    # A channel that is not a configured worker class used to crash the router
    # on channel_entry.has_key?; answer with an error instead.
    channel_entry = @backend_channels[channel]
    if channel_entry.nil?
      req.send_error({ msg: "channel unknown" })
      return
    end

    analyzer_spec = {}
    analyzer_spec["specs"] = container_versions

    topics = req.parsed_msg["topics"] || ["*"]
    workqueue_addresses = topics
                          .select { |topic| channel_entry.key?(topic) }
                          .map { |topic| "tcp://#{@public_hostname}:#{channel_entry[topic].port}" }

    analyzer_spec[:channel] = {
      channel: channel,
      workqueue_addresses: workqueue_addresses,
      response_address: "tcp://#{@public_hostname}:#{@response_port}",
      notification_address: "tcp://#{@public_hostname}:#{@notification_port}"
    }
    analyzer_spec["credentials"] = temp_credentials
    req.send_result(analyzer_spec)
  end

  # Applies a container-version change to config, according to
  # req.parsed_msg["action"], then reloads the versions and responds with
  # them. Every error path sends exactly one response and stops.
  def update_container_versions(req)
    channel = req.parsed_msg["channel"]
    if channel.nil?
      req.send_error({ msg: "channel unknown" })
      return
    end

    track_slug = req.parsed_msg["track_slug"]
    # TODO error if args are bad
    case req.parsed_msg["action"]
    when "update_container_versions"
      config.update_container_versions!(channel, track_slug, req.parsed_msg["versions"])
    when "deploy_container_version"
      config.add_container_version!(channel, track_slug, req.parsed_msg["new_version"])
    when "unload_container_version"
      # Previously fell through and also sent a success result.
      req.send_error({ msg: "action not yet implemented" })
      return
    else
      req.send_error({ msg: "action unknown" })
      return
    end

    load_container_versions!
    req.send_result({ container_versions: container_versions })
  end

  # Stores fresh temporary credentials in msg["credentials"] and returns msg.
  def set_temp_credentials(msg)
    msg["credentials"] = temp_credentials
    msg
  end

  # Requests a 15-minute (900 s) STS session token in eu-west-1 and returns
  # the credentials part of the response as a Hash.
  # NOTE(review): this makes an STS call each time it runs, e.g. on every
  # forwarded request and every heartbeat. Consider caching until near expiry.
  def temp_credentials
    sts = Aws::STS::Client.new(region: "eu-west-1")
    session = sts.get_session_token(duration_seconds: 900)
    session.to_h[:credentials]
  end
end
|
|
|
|
|
end
|