Move classes to correct source files

This commit is contained in:
Charles Care
2019-10-14 09:45:06 +01:00
parent a3115ee35a
commit c9deab23c7
15 changed files with 422 additions and 404 deletions

View File

@@ -0,0 +1,33 @@
module Pipeline::Rpc
class ChannelPoller
def initialize
@poller = ZMQ::Poller.new
@socket_wrappers = {}
end
def register(socket_wrapper)
socket = socket_wrapper.socket
@poller.register(socket, ZMQ::POLLIN)
@socket_wrappers[socket] = socket_wrapper
end
def listen_for_messages
loop do
poll_result = @poller.poll
break if poll_result == -1
readables = @poller.readables
continue if readables.empty?
readables.each do |readable|
socket_wrapper = @socket_wrappers[readable]
unless socket_wrapper.nil?
msg = socket_wrapper.recv
yield(msg)
end
end
end
end
end
end

View File

@@ -0,0 +1,44 @@
module Pipeline::Rpc
class FrontEndRequest
def self.recv(socket)
msg = []
socket.recv_strings(msg)
self.new(msg, socket)
end
attr_reader :raw_address, :raw_msg, :parsed_msg
def initialize(msg_strings, socket)
@raw_address = msg_strings[0]
@raw_msg = msg_strings[2]
@socket = socket
end
def send_error(err)
reply = [raw_address, "", err.to_json]
@socket.send_strings(reply)
end
def send_result(result)
reply = [raw_address, "", result.to_json]
@socket.send_strings(reply)
end
def handle
begin
@parsed_msg = JSON.parse(raw_msg)
rescue JSON::ParserError => e
req.send_error({ status: :parse_error })
return
end
action = @parsed_msg["action"]
if action.nil?
req.send_error({ status: :no_action })
else
yield(action)
end
end
end
end

View File

@@ -0,0 +1,18 @@
module Pipeline::Rpc
class FrontEndSocket
attr_reader :socket
def initialize(zmq_context, front_end_port)
@socket = zmq_context.socket(ZMQ::ROUTER)
@socket.bind("tcp://*:#{front_end_port}")
end
def recv
msg = []
@socket.recv_strings(msg)
FrontEndRequest.new(msg, @socket)
end
end
end

View File

@@ -0,0 +1,18 @@
module Pipeline::Rpc
class NotificationSocket
attr_reader :socket
def initialize(zmq_context, port)
@zmq_context = zmq_context
@port = port
@socket = zmq_context.socket(ZMQ::PUB)
@socket.bind("tcp://*:#{@port}")
end
def emit_configuration(configuration)
@socket.send_string(configuration.to_json)
end
end
end

View File

@@ -0,0 +1,44 @@
module Pipeline::Rpc
class RequestRegister
def initialize
@in_flight = {}
end
def register(req)
timeout_at = Time.now.to_i + 5
@in_flight[req.raw_address] = {timeout: timeout_at, req: req}
end
def forward_response(msg)
addr = msg.binary_return_address
entry = @in_flight[addr]
if entry.nil?
puts "dropping response"
else
req = entry[:req]
req.send_result(msg.parsed_msg)
unregister(addr)
end
end
def flush_expired_requests
timed_out = []
now = Time.now.to_i
@in_flight.each do |addr, entry|
expiry = entry[:timeout]
timed_out << entry[:req] if expiry < now
end
timed_out.each do |req|
req.send_error({status: :timeout})
puts "Timing out #{req}"
unregister(req.raw_address)
end
end
def unregister(addr)
@in_flight.delete(addr)
end
end
end

View File

@@ -0,0 +1,33 @@
module Pipeline::Rpc
class ResponseSocket
attr_reader :socket
def initialize(zmq_context, response_port)
@zmq_context = zmq_context
@response_port = response_port
@socket = zmq_context.socket(ZMQ::SUB)
@socket.setsockopt(ZMQ::SUBSCRIBE, "")
@socket.bind("tcp://*:#{@response_port}")
end
def recv
msg = ""
@socket.recv_string(msg)
ServiceResponse.new(msg, @socket)
end
def run_heartbeater
puts "STARTING heartbeat_socket"
heartbeat_socket = @zmq_context.socket(ZMQ::PUB)
heartbeat_socket.connect("tcp://127.0.0.1:#{@response_port}")
sleep 2
loop do
heartbeat_socket.send_string({ msg_type: "heartbeat" }.to_json)
puts "ping heartbeat"
sleep 10
end
end
end
end

View File

@@ -1,248 +1,5 @@
module Pipeline::Rpc
class WorkChannel
def initialize(zmq_context, queue_address)
@socket = zmq_context.socket(ZMQ::PUSH)
@socket.setsockopt(ZMQ::SNDHWM, 1)
@socket.bind(queue_address)
@poller = ZMQ::Poller.new
@poller.register(@socket, ZMQ::POLLOUT)
end
def worker_available?
poll_result = @poller.poll(500)
poll_result != -1 && @poller.writables.size > 0
end
def forward_to_backend(req, context=nil)
m = req.parsed_msg.clone
m[:context] = context unless context.nil?
upstream_msg = [req.raw_address, "", m.to_json]
@socket.send_strings(upstream_msg, ZMQ::DONTWAIT)
end
end
class RequestRegister
def initialize
@in_flight = {}
end
def register(req)
timeout_at = Time.now.to_i + 5
@in_flight[req.raw_address] = {timeout: timeout_at, req: req}
end
def forward_response(msg)
addr = msg.binary_return_address
entry = @in_flight[addr]
if entry.nil?
puts "dropping response"
else
req = entry[:req]
req.send_result(msg.parsed_msg)
unregister(addr)
end
end
def flush_expired_requests
timed_out = []
now = Time.now.to_i
@in_flight.each do |addr, entry|
expiry = entry[:timeout]
timed_out << entry[:req] if expiry < now
end
timed_out.each do |req|
req.send_error({status: :timeout})
puts "Timing out #{req}"
unregister(req.raw_address)
end
end
def unregister(addr)
@in_flight.delete(addr)
end
end
class ServiceResponse
def self.recv(socket)
msg = ""
socket.recv_string(msg)
self.new(msg, socket)
end
attr_reader :parsed_msg
def initialize(raw_msg, socket)
@raw_msg = raw_msg
@socket = socket
@parsed_msg = JSON.parse(raw_msg)
end
def type
@parsed_msg["msg_type"]
end
def return_address
@parsed_msg["return_address"]
end
def binary_return_address
return_address.pack("c*")
end
def raw_msg
@raw_msg
end
end
class FrontEndRequest
def self.recv(socket)
msg = []
socket.recv_strings(msg)
self.new(msg, socket)
end
attr_reader :raw_address, :raw_msg, :parsed_msg
def initialize(msg_strings, socket)
@raw_address = msg_strings[0]
@raw_msg = msg_strings[2]
@socket = socket
end
def send_error(err)
reply = [raw_address, "", err.to_json]
@socket.send_strings(reply)
end
def send_result(result)
reply = [raw_address, "", result.to_json]
@socket.send_strings(reply)
end
def handle
begin
@parsed_msg = JSON.parse(raw_msg)
rescue JSON::ParserError => e
req.send_error({ status: :parse_error })
return
end
action = @parsed_msg["action"]
if action.nil?
req.send_error({ status: :no_action })
else
yield(action)
end
end
end
class FrontEndSocket
attr_reader :socket
def initialize(zmq_context, front_end_port)
@socket = zmq_context.socket(ZMQ::ROUTER)
@socket.bind("tcp://*:#{front_end_port}")
end
def recv
msg = []
@socket.recv_strings(msg)
FrontEndRequest.new(msg, @socket)
end
end
class ResponseSocket
attr_reader :socket
def initialize(zmq_context, response_port)
@zmq_context = zmq_context
@response_port = response_port
@socket = zmq_context.socket(ZMQ::SUB)
@socket.setsockopt(ZMQ::SUBSCRIBE, "")
@socket.bind("tcp://*:#{@response_port}")
end
def recv
msg = ""
@socket.recv_string(msg)
ServiceResponse.new(msg, @socket)
end
def run_heartbeater
puts "STARTING heartbeat_socket"
heartbeat_socket = @zmq_context.socket(ZMQ::PUB)
heartbeat_socket.connect("tcp://127.0.0.1:#{@response_port}")
sleep 2
loop do
heartbeat_socket.send_string({ msg_type: "heartbeat" }.to_json)
puts "ping heartbeat"
sleep 10
end
end
end
class NotificationSocket
attr_reader :socket
def initialize(zmq_context, port)
@zmq_context = zmq_context
@port = port
@socket = zmq_context.socket(ZMQ::PUB)
@socket.bind("tcp://*:#{@port}")
end
def emit_configuration(configuration)
@socket.send_string(configuration.to_json)
end
end
class ChannelPoller
def initialize
@poller = ZMQ::Poller.new
@socket_wrappers = {}
end
def register(socket_wrapper)
socket = socket_wrapper.socket
@poller.register(socket, ZMQ::POLLIN)
@socket_wrappers[socket] = socket_wrapper
end
def listen_for_messages
loop do
poll_result = @poller.poll
break if poll_result == -1
readables = @poller.readables
continue if readables.empty?
readables.each do |readable|
socket_wrapper = @socket_wrappers[readable]
unless socket_wrapper.nil?
msg = socket_wrapper.recv
yield(msg)
end
end
end
end
end
class Router
attr_reader :zmq_context, :poller, :response_socket, :notification_socket

View File

@@ -0,0 +1,35 @@
module Pipeline::Rpc
class ServiceResponse
def self.recv(socket)
msg = ""
socket.recv_string(msg)
self.new(msg, socket)
end
attr_reader :parsed_msg
def initialize(raw_msg, socket)
@raw_msg = raw_msg
@socket = socket
@parsed_msg = JSON.parse(raw_msg)
end
def type
@parsed_msg["msg_type"]
end
def return_address
@parsed_msg["return_address"]
end
def binary_return_address
return_address.pack("c*")
end
def raw_msg
@raw_msg
end
end
end

View File

@@ -0,0 +1,26 @@
module Pipeline::Rpc
class WorkChannel
def initialize(zmq_context, queue_address)
@socket = zmq_context.socket(ZMQ::PUSH)
@socket.setsockopt(ZMQ::SNDHWM, 1)
@socket.bind(queue_address)
@poller = ZMQ::Poller.new
@poller.register(@socket, ZMQ::POLLOUT)
end
def worker_available?
poll_result = @poller.poll(500)
poll_result != -1 && @poller.writables.size > 0
end
def forward_to_backend(req, context=nil)
m = req.parsed_msg.clone
m[:context] = context unless context.nil?
upstream_msg = [req.raw_address, "", m.to_json]
@socket.send_strings(upstream_msg, ZMQ::DONTWAIT)
end
end
end

View File

@@ -1,164 +1,3 @@
class Pipeline::Rpc::WorkerAction
attr_accessor :environment, :request
def invoke
end
def parse_credentials(request)
raw_credentials = request["credentials"]
key = raw_credentials["access_key_id"]
secret = raw_credentials["secret_access_key"]
session = raw_credentials["session_token"]
Aws::Credentials.new(key, secret, session)
end
end
class Pipeline::Rpc::AnalyzeAction < Pipeline::Rpc::WorkerAction
attr_reader :reader, :return_address
def initialize(request, return_address)
@request = request
@return_address = return_address
end
def invoke
s3 = Aws::S3::Client.new(
credentials: parse_credentials(request["context"]),
region: "eu-west-1")
language_slug = request["track_slug"]
exercise_slug = request["exercise_slug"]
solution_slug = request["solution_slug"]
location = request["iteration_folder"]
container_version = request["container_version"]
unless environment.released?(language_slug, container_version)
return {
error: "Container #{language_slug}:#{container_version} isn't available"
}
end
analysis_run = environment.new_invocation(language_slug, container_version, exercise_slug, solution_slug)
analysis_run.prepare_iteration do |iteration_folder|
location_uri = URI(location)
bucket = location_uri.host
path = location_uri.path[1..]
params = {
bucket: bucket,
prefix: "#{path}/",
}
resp = s3.list_objects(params)
resp.contents.each do |item|
key = item[:key]
filename = File.basename(key)
s3.get_object({
bucket: bucket,
key: key,
response_target: "#{iteration_folder}/#{filename}"
})
end
end
begin
result = analysis_run.analyze!
result["return_address"] = return_address
result['msg_type'] = 'response'
result
rescue => e
puts e
ensure
puts "DONE"
end
end
end
class Pipeline::Rpc::ConfigureAction < Pipeline::Rpc::WorkerAction
def invoke
spec = request["specs"]["analyzer_spec"]
credentials = parse_credentials(request)
raise "No spec received" if spec.nil?
spec.each do |language_slug, versions|
puts "Preparing #{language_slug} #{versions}"
versions.each do |version|
if environment.released?(language_slug, version)
puts "Already installed #{language_slug}:#{version}"
else
puts "Installed #{language_slug}"
environment.release_analyzer(language_slug, version, credentials)
end
end
end
end
end
class NotificationSocketWrapper
attr_reader :socket
def initialize(socket)
@socket = socket
end
def recv
msg = ""
@socket.recv_string(msg)
puts "1 Received request. Data: #{msg.inspect}"
request = JSON.parse(msg)
action = request["action"]
puts "HERE #{action}"
if action == "configure"
a = Pipeline::Rpc::ConfigureAction.new
a.request = request
a
elsif action == "analyze_iteration" || action == "test_solution"
a = Pipeline::Rpc::AnalyzeAction.new(request, return_address)
a
else
puts "HERE ELSE: #{request}"
end
end
end
class SocketWrapper
attr_reader :socket
def initialize(socket)
@socket = socket
end
def recv
msg = []
@socket.recv_strings(msg)
puts "1 Received request. Data: #{msg.inspect}"
return_address = msg[0].unpack('c*')
puts "return_address: #{return_address}"
raw_request = msg[2]
request = JSON.parse(raw_request)
action = request["action"]
if action == "configure"
Pipeline::Rpc::ConfigureAction.new
elsif action == "analyze_iteration" || action == "test_solution"
a = Pipeline::Rpc::AnalyzeAction.new(request, return_address)
a.request = request
a
else
puts "HERE ELSE: #{request}"
end
end
end
class Pipeline::Rpc::Worker
attr_reader :identity, :context, :incoming, :outgoing, :environment

View File

@@ -0,0 +1,62 @@
module Pipeline::Rpc::Worker
class AnalyzeAction < WorkerAction
attr_reader :reader, :return_address
def initialize(request, return_address)
@request = request
@return_address = return_address
end
def invoke
s3 = Aws::S3::Client.new(
credentials: parse_credentials(request["context"]),
region: "eu-west-1")
language_slug = request["track_slug"]
exercise_slug = request["exercise_slug"]
solution_slug = request["solution_slug"]
location = request["iteration_folder"]
container_version = request["container_version"]
unless environment.released?(language_slug, container_version)
return {
error: "Container #{language_slug}:#{container_version} isn't available"
}
end
analysis_run = environment.new_invocation(language_slug, container_version, exercise_slug, solution_slug)
analysis_run.prepare_iteration do |iteration_folder|
location_uri = URI(location)
bucket = location_uri.host
path = location_uri.path[1..]
params = {
bucket: bucket,
prefix: "#{path}/",
}
resp = s3.list_objects(params)
resp.contents.each do |item|
key = item[:key]
filename = File.basename(key)
s3.get_object({
bucket: bucket,
key: key,
response_target: "#{iteration_folder}/#{filename}"
})
end
end
begin
result = analysis_run.analyze!
result["return_address"] = return_address
result['msg_type'] = 'response'
result
rescue => e
puts e
ensure
puts "DONE"
end
end
end
end

View File

@@ -0,0 +1,23 @@
module Pipeline::Rpc::Worker
class ConfigureAction < WorkerAction
def invoke
spec = request["specs"]["analyzer_spec"]
credentials = parse_credentials(request)
raise "No spec received" if spec.nil?
spec.each do |language_slug, versions|
puts "Preparing #{language_slug} #{versions}"
versions.each do |version|
if environment.released?(language_slug, version)
puts "Already installed #{language_slug}:#{version}"
else
puts "Installed #{language_slug}"
environment.release_analyzer(language_slug, version, credentials)
end
end
end
end
end
end

View File

@@ -0,0 +1,33 @@
module Pipeline::Rpc::Worker
class NotificationSocketWrapper
attr_reader :socket
def initialize(socket)
@socket = socket
end
def recv
msg = ""
@socket.recv_string(msg)
puts "1 Received request. Data: #{msg.inspect}"
request = JSON.parse(msg)
action = request["action"]
puts "HERE #{action}"
if action == "configure"
a = Pipeline::Rpc::ConfigureAction.new
a.request = request
a
elsif action == "analyze_iteration" || action == "test_solution"
a = Pipeline::Rpc::AnalyzeAction.new(request, return_address)
a
else
puts "HERE ELSE: #{request}"
end
end
end
end

View File

@@ -0,0 +1,33 @@
module Pipeline::Rpc::Worker
class WorkSocketWrapper
attr_reader :socket
def initialize(socket)
@socket = socket
end
def recv
msg = []
@socket.recv_strings(msg)
puts "1 Received request. Data: #{msg.inspect}"
return_address = msg[0].unpack('c*')
puts "return_address: #{return_address}"
raw_request = msg[2]
request = JSON.parse(raw_request)
action = request["action"]
if action == "configure"
Pipeline::Rpc::ConfigureAction.new
elsif action == "analyze_iteration" || action == "test_solution"
a = Pipeline::Rpc::AnalyzeAction.new(request, return_address)
a.request = request
a
else
puts "HERE ELSE: #{request}"
end
end
end
end

View File

@@ -0,0 +1,20 @@
module Pipeline::Rpc::Worker
class WorkerAction
attr_accessor :environment, :request
def invoke
end
def parse_credentials(request)
raw_credentials = request["credentials"]
key = raw_credentials["access_key_id"]
secret = raw_credentials["secret_access_key"]
session = raw_credentials["session_token"]
Aws::Credentials.new(key, secret, session)
end
end
end