Wire in configuration messages
This commit is contained in:
@@ -5,7 +5,8 @@ $LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
|
||||
require "pipeline"
|
||||
|
||||
worker_identity = ARGV[0]
|
||||
env_base = ARGV[1]
|
||||
channel_address = ARGV[1]
|
||||
env_base = ARGV[2]
|
||||
|
||||
server = Pipeline::Rpc::Worker.new(worker_identity, env_base)
|
||||
server = Pipeline::Rpc::Worker.new(worker_identity, channel_address, env_base)
|
||||
server.listen
|
||||
|
||||
@@ -194,6 +194,23 @@ module Pipeline::Rpc
|
||||
|
||||
end
|
||||
|
||||
class NotificationSocket
|
||||
|
||||
attr_reader :socket
|
||||
|
||||
def initialize(zmq_context, port)
|
||||
@zmq_context = zmq_context
|
||||
@port = port
|
||||
@socket = zmq_context.socket(ZMQ::PUB)
|
||||
@socket.bind("tcp://*:#{@port}")
|
||||
end
|
||||
|
||||
def emit_configuration(configuration)
|
||||
@socket.send_string(configuration.to_json)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class ChannelPoller
|
||||
|
||||
def initialize
|
||||
@@ -227,7 +244,7 @@ module Pipeline::Rpc
|
||||
end
|
||||
|
||||
class Router
|
||||
attr_reader :zmq_context, :poller, :response_socket
|
||||
attr_reader :zmq_context, :poller, :response_socket, :notification_socket
|
||||
|
||||
def initialize(zmq_context)
|
||||
@zmq_context = zmq_context
|
||||
@@ -259,6 +276,9 @@ module Pipeline::Rpc
|
||||
@backend_channels[type] = work_channel
|
||||
end
|
||||
|
||||
@notification_port = 5556
|
||||
@notification_socket = NotificationSocket.new(zmq_context, @notification_port)
|
||||
|
||||
end
|
||||
|
||||
def run
|
||||
@@ -332,20 +352,40 @@ module Pipeline::Rpc
|
||||
def emit_current_spec
|
||||
analyzer_spec = analyzer_versions
|
||||
m = {
|
||||
action: "analyzer_spec",
|
||||
spec: analyzer_spec[:analyzer_spec]
|
||||
action: "configure",
|
||||
specs: analyzer_spec
|
||||
}
|
||||
set_temp_credentials(m)
|
||||
message = ["_", "", m.to_json]
|
||||
# message = ["_", "", m.to_json]
|
||||
puts "TODO"
|
||||
puts message
|
||||
puts m
|
||||
notification_socket.emit_configuration(m)
|
||||
|
||||
# @backend_channels.each do |channel_name,v|
|
||||
# m = {
|
||||
# action: "configure",
|
||||
# channel: channel_name,
|
||||
# spec: analyzer_spec[:analyzer_spec]
|
||||
# }
|
||||
# puts v
|
||||
# puts m
|
||||
# end
|
||||
end
|
||||
|
||||
def respond_with_worker_config(req)
|
||||
analyzer_spec = analyzer_versions
|
||||
analyzer_spec[:channels] = {
|
||||
workqueue_address: "tcp://#{@public_hostname}:#{@work_channel_ports[:static_analyzers]}",
|
||||
response_address: "tcp://#{@public_hostname}:#{@response_port}"
|
||||
channel = req.parsed_msg["channel"]
|
||||
if channel.nil?
|
||||
req.send_error({ msg: "channel unknown" })
|
||||
return
|
||||
end
|
||||
channel = channel.to_sym
|
||||
analyzer_spec = {}
|
||||
analyzer_spec["specs"] = analyzer_versions
|
||||
analyzer_spec[:channel] = {
|
||||
channel: channel,
|
||||
workqueue_address: "tcp://#{@public_hostname}:#{@work_channel_ports[channel]}",
|
||||
response_address: "tcp://#{@public_hostname}:#{@response_port}",
|
||||
notification_address: "tcp://#{@public_hostname}:#{@notification_port}"
|
||||
}
|
||||
analyzer_spec["credentials"] = temp_credentials
|
||||
req.send_result(analyzer_spec)
|
||||
|
||||
@@ -1,98 +1,8 @@
|
||||
class Pipeline::Rpc::Worker
|
||||
class Pipeline::Rpc::WorkerAction
|
||||
|
||||
attr_reader :identity, :context, :incoming, :outgoing, :environment
|
||||
attr_accessor :environment, :request
|
||||
|
||||
def initialize(identity, env_base)
|
||||
@identity = identity
|
||||
@context = ZMQ::Context.new(1)
|
||||
@incoming = context.socket(ZMQ::PULL)
|
||||
@environment = Pipeline::Runtime::RuntimeEnvironment.new(env_base)
|
||||
end
|
||||
|
||||
def setup
|
||||
@setup = context.socket(ZMQ::REQ)
|
||||
@setup.setsockopt(ZMQ::LINGER, 0)
|
||||
@setup.connect("tcp://localhost:5566")
|
||||
request = {
|
||||
action: "configure_worker",
|
||||
role: "static_analyzer"
|
||||
}
|
||||
@setup.send_string(request.to_json)
|
||||
msg = ""
|
||||
@setup.recv_string(msg)
|
||||
msg = JSON.parse(msg)
|
||||
analyzer_spec = msg["analyzer_spec"]
|
||||
raise "No spec received" if analyzer_spec.nil?
|
||||
|
||||
puts msg["channels"]
|
||||
response_address = msg["channels"]["response_address"]
|
||||
request_address = msg["channels"]["workqueue_address"]
|
||||
|
||||
@outgoing = context.socket(ZMQ::PUB)
|
||||
@outgoing.connect(response_address)
|
||||
|
||||
credentials = parse_credentials(msg)
|
||||
@setup.close
|
||||
|
||||
environment.prepare
|
||||
|
||||
configure_containers(analyzer_spec, credentials)
|
||||
# analyzer_spec.each do |language_slug, versions|
|
||||
# puts "Preparing #{language_slug} #{versions}"
|
||||
# versions.each do |version|
|
||||
# if environment.released?(language_slug, version)
|
||||
# puts "Already installed #{language_slug}:#{version}"
|
||||
# else
|
||||
# puts "Installed #{language_slug}"
|
||||
# environment.release_analyzer(language_slug, version, credentials)
|
||||
# end
|
||||
# end
|
||||
# end
|
||||
|
||||
|
||||
incoming.connect(request_address)
|
||||
end
|
||||
|
||||
def configure_containers(spec, credentials)
|
||||
spec.each do |language_slug, versions|
|
||||
puts "Preparing #{language_slug} #{versions}"
|
||||
versions.each do |version|
|
||||
if environment.released?(language_slug, version)
|
||||
puts "Already installed #{language_slug}:#{version}"
|
||||
else
|
||||
puts "Installed #{language_slug}"
|
||||
environment.release_analyzer(language_slug, version, credentials)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def listen
|
||||
setup
|
||||
|
||||
loop do
|
||||
msg = []
|
||||
incoming.recv_strings(msg)
|
||||
puts "Received request. Data: #{msg.inspect}"
|
||||
return_address = msg[0].unpack('c*')
|
||||
raw_request = msg[2]
|
||||
request = JSON.parse(raw_request)
|
||||
action = request["action"]
|
||||
if action == "analyze_iteration"
|
||||
result = analyze(request)
|
||||
result["return_address"] = return_address
|
||||
result['msg_type'] = 'response'
|
||||
outgoing.send_string(result.to_json)
|
||||
elsif action == "analyzer_spec"
|
||||
puts request
|
||||
puts "!!!!!"
|
||||
analyzer_spec = request["spec"]
|
||||
credentials = parse_credentials(request)
|
||||
configure_containers(analyzer_spec, credentials)
|
||||
else
|
||||
puts "HERE ELSE: #{request}"
|
||||
end
|
||||
end
|
||||
def invoke
|
||||
end
|
||||
|
||||
def parse_credentials(request)
|
||||
@@ -103,7 +13,18 @@ class Pipeline::Rpc::Worker
|
||||
Aws::Credentials.new(key, secret, session)
|
||||
end
|
||||
|
||||
def analyze(request)
|
||||
end
|
||||
|
||||
class Pipeline::Rpc::AnalyzeAction < Pipeline::Rpc::WorkerAction
|
||||
|
||||
attr_reader :reader, :return_address
|
||||
|
||||
def initialize(request, return_address)
|
||||
@request = request
|
||||
@return_address = return_address
|
||||
end
|
||||
|
||||
def invoke
|
||||
s3 = Aws::S3::Client.new(
|
||||
credentials: parse_credentials(request["context"]),
|
||||
region: "eu-west-1")
|
||||
@@ -141,7 +62,10 @@ class Pipeline::Rpc::Worker
|
||||
end
|
||||
end
|
||||
begin
|
||||
analysis_run.analyze!
|
||||
result = analysis_run.analyze!
|
||||
result["return_address"] = return_address
|
||||
result['msg_type'] = 'response'
|
||||
result
|
||||
rescue => e
|
||||
puts e
|
||||
ensure
|
||||
@@ -150,3 +74,222 @@ class Pipeline::Rpc::Worker
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class Pipeline::Rpc::ConfigureAction < Pipeline::Rpc::WorkerAction
|
||||
|
||||
def invoke
|
||||
spec = request["specs"]["analyzer_spec"]
|
||||
credentials = parse_credentials(request)
|
||||
raise "No spec received" if spec.nil?
|
||||
spec.each do |language_slug, versions|
|
||||
puts "Preparing #{language_slug} #{versions}"
|
||||
versions.each do |version|
|
||||
if environment.released?(language_slug, version)
|
||||
puts "Already installed #{language_slug}:#{version}"
|
||||
else
|
||||
puts "Installed #{language_slug}"
|
||||
environment.release_analyzer(language_slug, version, credentials)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class NotificationSocketWrapper
|
||||
|
||||
attr_reader :socket
|
||||
|
||||
def initialize(socket)
|
||||
@socket = socket
|
||||
end
|
||||
|
||||
def recv
|
||||
msg = ""
|
||||
@socket.recv_string(msg)
|
||||
|
||||
puts "1 Received request. Data: #{msg.inspect}"
|
||||
request = JSON.parse(msg)
|
||||
action = request["action"]
|
||||
|
||||
puts "HERE #{action}"
|
||||
if action == "configure"
|
||||
a = Pipeline::Rpc::ConfigureAction.new
|
||||
a.request = request
|
||||
a
|
||||
elsif action == "analyze_iteration" || action == "test_solution"
|
||||
a = Pipeline::Rpc::AnalyzeAction.new(request, return_address)
|
||||
a
|
||||
else
|
||||
puts "HERE ELSE: #{request}"
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class SocketWrapper
|
||||
|
||||
attr_reader :socket
|
||||
|
||||
def initialize(socket)
|
||||
@socket = socket
|
||||
end
|
||||
|
||||
def recv
|
||||
msg = []
|
||||
@socket.recv_strings(msg)
|
||||
|
||||
puts "1 Received request. Data: #{msg.inspect}"
|
||||
return_address = msg[0].unpack('c*')
|
||||
puts "return_address: #{return_address}"
|
||||
raw_request = msg[2]
|
||||
request = JSON.parse(raw_request)
|
||||
action = request["action"]
|
||||
|
||||
if action == "configure"
|
||||
Pipeline::Rpc::ConfigureAction.new
|
||||
elsif action == "analyze_iteration" || action == "test_solution"
|
||||
a = Pipeline::Rpc::AnalyzeAction.new(request, return_address)
|
||||
a.request = request
|
||||
a
|
||||
else
|
||||
puts "HERE ELSE: #{request}"
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
class Pipeline::Rpc::Worker
|
||||
|
||||
attr_reader :identity, :context, :incoming, :outgoing, :environment
|
||||
|
||||
def initialize(identity, channel_address, env_base)
|
||||
@identity = identity
|
||||
channel_address = URI(channel_address)
|
||||
@control_queue = "#{channel_address.scheme}://#{channel_address.host}:#{channel_address.port}"
|
||||
@channel = channel_address.path[1..]
|
||||
@context = ZMQ::Context.new(1)
|
||||
@incoming = context.socket(ZMQ::PULL)
|
||||
@notifications = context.socket(ZMQ::SUB)
|
||||
@notifications.setsockopt(ZMQ::SUBSCRIBE, "")
|
||||
@environment = Pipeline::Runtime::RuntimeEnvironment.new(env_base)
|
||||
end
|
||||
|
||||
def setup
|
||||
@setup = context.socket(ZMQ::REQ)
|
||||
@setup.setsockopt(ZMQ::LINGER, 0)
|
||||
puts @control_queue
|
||||
@setup.connect(@control_queue)
|
||||
request = {
|
||||
action: "configure_worker",
|
||||
channel: @channel
|
||||
}
|
||||
@setup.send_string(request.to_json)
|
||||
msg = ""
|
||||
@setup.recv_string(msg)
|
||||
msg = JSON.parse(msg)
|
||||
puts "Bootstrap with #{msg}"
|
||||
@setup.close
|
||||
|
||||
environment.prepare
|
||||
|
||||
action = Pipeline::Rpc::ConfigureAction.new
|
||||
action.environment = environment
|
||||
action.request = msg
|
||||
action.invoke
|
||||
|
||||
response_address = msg["channel"]["response_address"]
|
||||
request_address = msg["channel"]["workqueue_address"]
|
||||
notification_address = msg["channel"]["notification_address"]
|
||||
@outgoing = context.socket(ZMQ::PUB)
|
||||
@outgoing.connect(response_address)
|
||||
incoming.connect(request_address)
|
||||
@notifications.connect(notification_address)
|
||||
|
||||
end
|
||||
|
||||
def listen
|
||||
setup
|
||||
|
||||
@incoming_wrapper = SocketWrapper.new(incoming)
|
||||
@noificationincoming_wrapper = NotificationSocketWrapper.new(@notifications)
|
||||
|
||||
@poller = Pipeline::Rpc::ChannelPoller.new
|
||||
@poller.register(@incoming_wrapper)
|
||||
@poller.register(@noificationincoming_wrapper)
|
||||
|
||||
loop do
|
||||
msg = []
|
||||
|
||||
@poller.listen_for_messages do |action_task|
|
||||
# puts "1 Received request. Data: #{msg.inspect}"
|
||||
# return_address = msg[0].unpack('c*')
|
||||
# puts "return_address: #{return_address}"
|
||||
# raw_request = msg[2]
|
||||
# request = JSON.parse(raw_request)
|
||||
# action = request["action"]
|
||||
#
|
||||
# if action == "configure"
|
||||
# action_task = Pipeline::Rpc::ConfigureAction.new
|
||||
# elsif action == "analyze_iteration" || action == "test_solution"
|
||||
# action_task = Pipeline::Rpc::AnalyzeAction.new
|
||||
# else
|
||||
# puts "HERE ELSE: #{request}"
|
||||
# end
|
||||
|
||||
unless action_task.nil?
|
||||
action_task.environment = environment
|
||||
result = action_task.invoke
|
||||
if result && result["return_address"]
|
||||
outgoing.send_string(result.to_json)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# incoming.recv_strings(msg)
|
||||
# puts "2 Received request. Data: #{msg.inspect}"
|
||||
# return_address = msg[0].unpack('c*')
|
||||
# puts "return_address: #{return_address}"
|
||||
# raw_request = msg[2]
|
||||
# request = JSON.parse(raw_request)
|
||||
# action = request["action"]
|
||||
#
|
||||
# if action == "configure"
|
||||
# action_task = Pipeline::Rpc::ConfigureAction.new
|
||||
# elsif action == "analyze_iteration" || action == "test_solution"
|
||||
# action_task = Pipeline::Rpc::AnalyzeAction.new
|
||||
# else
|
||||
# puts "HERE ELSE: #{request}"
|
||||
# end
|
||||
|
||||
# continue if action_task.nil?
|
||||
#
|
||||
# action_task.environment = environment
|
||||
# action_task.request = request
|
||||
# result = action_task.invoke
|
||||
#
|
||||
# if result && return_address
|
||||
# result["return_address"] = return_address
|
||||
# result['msg_type'] = 'response'
|
||||
# outgoing.send_string(result.to_json)
|
||||
# end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
def parse_credentials(request)
|
||||
raw_credentials = request["credentials"]
|
||||
key = raw_credentials["access_key_id"]
|
||||
secret = raw_credentials["secret_access_key"]
|
||||
session = raw_credentials["session_token"]
|
||||
Aws::Credentials.new(key, secret, session)
|
||||
end
|
||||
|
||||
def analyze(request)
|
||||
action = Pipeline::Rpc::AnalyzeAction.new
|
||||
action.environment = environment
|
||||
action.request = request
|
||||
action.invoke
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user