Refactor to use new status codes

This commit is contained in:
Charles Care
2019-12-04 21:45:24 +00:00
parent 4cd62a3942
commit b113f44a97
7 changed files with 283 additions and 54 deletions

View File

@@ -1,6 +1,28 @@
module Pipeline::Rpc
class FrontEndRequest
DEFAULT_RESPONSES = {
500 => "Internal platform Error",
501 => "Unrecognised_action",
502 => "Malformed request",
503 => "No worker available",
504 => "Request timed out while waiting for response from worker",
510 => "Worker error",
511 => "Container version is not available",
512 => "Failure in container setup",
513 => "Failuter in container invocation",
514 => "Output missing or malformed",
400 => "Bad input",
401 => "Forced exit. Container ran too long",
402 => "Forced exit. Container used too much IO",
403 => "Forced exit. Container was terminated early.",
200 => "OK"
}
def self.recv(socket)
msg = []
socket.recv_strings(msg)
@@ -14,32 +36,19 @@ module Pipeline::Rpc
@raw_msg = msg_strings[2]
@socket = socket
@start = current_timestamp
@params_to_check = []
end
def send_error(err, status_code=999)
msg = {
status: {
ok: false,
status_code: status_code
},
error: err,
failed_request: parsed_msg
}
send_reply(msg)
def send_error(msg, status_code, detail={})
status_code ||= 500
detail ||= {}
detail[:error] = msg
detail[:failed_request] = parsed_msg
send_reply(status_code, detail)
end
def send_result(result, status_code=0)
msg = {
status: {
ok: true,
status_code: status_code
},
result: result,
timing: {
start_time: @start.to_i
}
}
send_reply(msg)
def send_result(result)
send_reply(200, result)
end
def current_timestamp
@@ -51,32 +60,99 @@ module Pipeline::Rpc
5
end
def send_reply(msg)
@end = current_timestamp
@duration_milliseconds = @end - @start
msg[:timing] = {
start_time: @start.to_i,
end_time: @end.to_i,
duration_milliseconds: @duration_milliseconds.to_i
}
reply = [raw_address, "", msg.to_json]
@socket.send_strings(reply)
end
def handle
begin
@parsed_msg = JSON.parse(raw_msg)
rescue JSON::ParserError => e
req.send_error({ status: :parse_error })
puts e.message
detail = {
incoming: raw_msg
}
send_error("Could not parse message", 502, detail)
return
end
action = @parsed_msg["action"]
if action.nil?
req.send_error({ status: :no_action })
send_error("No action specified", 502)
else
yield(action)
begin
yield(action)
rescue => e
puts e.message
detail = {
message: e.message,
trace: e.backtrace
}
send_error("Unhandled error", 500, detail)
end
end
end
def ensure_param(param_name)
@params_to_check << param_name
end
def params_missing?
! missing_params.empty?
end
def missing_params
@missing_params ||= begin
missing = []
@params_to_check.each do |param|
missing << param unless parsed_msg.include?(param)
end
missing
end
end
def merge_context!(context_to_merge)
context.merge!(context_to_merge)
end
private
def send_reply(status_code, payload)
msg = assemble_response(status_code, payload)
reply = [raw_address, "", msg.to_json]
@socket.send_strings(reply)
end
def assemble_response(status_code, payload)
@end = current_timestamp
@duration_milliseconds = @end - @start
context[:timing] = {
start_time: @start.to_i,
end_time: @end.to_i,
duration_milliseconds: @duration_milliseconds.to_i
}
msg = {
status: status_for(status_code),
context: context
}
if status_code == 200
msg[:response] = payload
else
msg[:context][:error_detail] = payload
msg[:status][:error] = payload[:error] unless payload.nil?
end
msg
end
def context
@context ||= {}
end
def status_for(code)
textual_message = DEFAULT_RESPONSES[code]
if textual_message.nil?
group_code = (code.to_i / 100) * 100
textual_message = DEFAULT_RESPONSES[group_code]
end
{
status_code: code,
message: textual_message
}
end
end
end

View File

@@ -11,7 +11,7 @@ module Pipeline::Rpc
@in_flight[req.raw_address] = {timeout: timeout_at, req: req}
end
def forward_response(msg)
def forward_as_error(msg)
addr = msg.binary_return_address
entry = @in_flight[addr]
if entry.nil?
@@ -19,8 +19,29 @@ module Pipeline::Rpc
else
req = entry[:req]
resp = msg.parsed_msg
resp.delete("msg_type")
resp.delete("return_address")
req.send_result(msg.parsed_msg)
worker_error_code = resp.delete("worker_error_code") || 510
req.send_error("Error from worker", worker_error_code, resp)
unregister(addr)
end
end
def forward_as_response(msg)
addr = msg.binary_return_address
entry = @in_flight[addr]
if entry.nil?
puts "dropping response"
else
req = entry[:req]
resp = msg.parsed_msg
resp.delete("msg_type")
resp.delete("return_address")
container_response = resp.delete("result")
puts "keys: #{resp.keys}"
puts "Here: #{resp}"
req.merge_context!(resp)
req.send_result(container_response)
unregister(addr)
end
end
@@ -33,7 +54,7 @@ module Pipeline::Rpc
timed_out << entry[:req] if expiry < now
end
timed_out.each do |req|
req.send_error({status: :timeout})
req.send_error("Timed out request", 504)
puts "Timing out #{req}"
unregister(req.raw_address)
end

View File

@@ -81,9 +81,9 @@ module Pipeline::Rpc
def on_service_response(msg)
if msg.type == "response"
@in_flight_requests.forward_response(msg)
@in_flight_requests.forward_as_response(msg)
elsif msg.type == "error_response"
@in_flight_requests.forward_response(msg)
@in_flight_requests.forward_as_error(msg)
elsif msg.type == "heartbeat"
@in_flight_requests.flush_expired_requests
emit_current_spec
@@ -97,10 +97,25 @@ module Pipeline::Rpc
if action == "configure_worker"
respond_with_worker_config(req)
elsif action == "analyze_iteration"
# TODO check mandatory args
req.ensure_param("id")
req.ensure_param("track_slug")
req.ensure_param("exercise_slug")
req.ensure_param("s3_uri")
req.ensure_param("container_version")
handle_with_worker(:static_analyzers, req)
elsif action == "test_solution"
req.ensure_param("id")
req.ensure_param("track_slug")
req.ensure_param("exercise_slug")
req.ensure_param("s3_uri")
req.ensure_param("container_version")
handle_with_worker(:test_runners, req)
elsif action == "represent"
# TODO check mandatory args
req.ensure_param("id")
req.ensure_param("track_slug")
req.ensure_param("container_version")
handle_with_worker(:representers, req)
elsif action == "build_container"
handle_with_worker(:builders, req)
@@ -126,7 +141,7 @@ module Pipeline::Rpc
images = container_repo.images_info
req.send_result({ list_images: images })
else
req.send_error({ status: :unrecognised_action })
req.send_error("Action <#{action}> unrecognised", 501)
end
end
end
@@ -139,12 +154,20 @@ module Pipeline::Rpc
end
def handle_with_worker(worker_class, req)
if req.params_missing?
puts "MISSING"
error = {
missing_params: req.missing_params
}
req.send_error("Missing mandatory paraneters", 502, error)
return
end
channel = select_channel(worker_class)
if channel.nil?
req.send_error({ status: :worker_class_unknown })
else
select_backend_and_forward(req, channel)
req.send_error("worker_class <#{worker_class}> unrecognised", 502)
return
end
select_backend_and_forward(req, channel)
end
def select_channel(worker_class)
@@ -162,7 +185,7 @@ module Pipeline::Rpc
if backend && backend.worker_available?
forward(backend, req)
else
req.send_error({ status: :worker_unavailable })
req.send_error("No workers available for <#{track_slug}>", 503)
end
end

View File

@@ -45,7 +45,7 @@ module Pipeline::Rpc::Worker
msg = "Container #{track_slug}:#{container_version} isn't available"
log msg
@error = {
status_code: 404,
worker_error_code: 511,
error: msg
}
end
@@ -53,7 +53,7 @@ module Pipeline::Rpc::Worker
msg = "Failure accessing environment (during container check)"
log msg
@error = {
status_code: 500,
worker_error_code: 512,
error: msg,
detail: e
}
@@ -69,7 +69,7 @@ module Pipeline::Rpc::Worker
msg = "Failure setting up job"
log msg
@error = {
status_code: 500,
worker_error_code: 512,
error: msg,
detail: e
}
@@ -86,7 +86,7 @@ module Pipeline::Rpc::Worker
msg = "Failure preparing input"
log msg
@error = {
status_code: 500,
worker_error_code: 512,
error: msg,
detail: e
}
@@ -101,7 +101,7 @@ module Pipeline::Rpc::Worker
msg = "Error from container"
log msg
@error = {
status_code: 500,
worker_error_code: 513,
error: msg,
detail: e
}

View File

@@ -46,7 +46,15 @@ module Pipeline::Rpc::Worker
@setup.send_string(request.to_json)
msg = ""
@setup.recv_string(msg)
@bootstrap = JSON.parse(msg)["result"]
parsed_msg = JSON.parse(msg)
puts parsed_msg
status_code = parsed_msg["status"]["status_code"]
if status_code != 200
puts "Error when configuring"
puts "Recieved: #{msg}"
raise "Got status #{status_code} when trying to configure"
end
@bootstrap = parsed_msg["response"]
puts "Bootstrap with #{JSON.pretty_generate(@bootstrap)}"
@setup.close
end

View File

@@ -6,7 +6,8 @@ module Pipeline::Runtime
end
def result
File.read("#{iteration_folder}/results.json")
raw_results = File.read("#{iteration_folder}/results.json")
JSON.parse(raw_results)
end
def working_directory

100
scripts/client/scratch.rb Normal file
View File

@@ -0,0 +1,100 @@
require 'ffi-rzmq'
require 'pp'
require 'json'
@context = ZMQ::Context.new(1)
@socket = @context.socket(ZMQ::REQ)
@socket.setsockopt(ZMQ::LINGER, 0)
@socket.connect("tcp://localhost:5555")
def exchange(msg)
@socket.setsockopt(ZMQ::RCVTIMEO, 10000)
@socket.send_string(msg)
return_code = @socket.recv_string(response = "")
# LOCAL error condition 1, no network exchange
puts "network timeout" if return_code == -1
# LOCAL error 2, malformed response
response = JSON.parse(response)
puts "----------------------"
pp response["status"]
puts "----------------------"
puts "----------------------"
pp response["response"]
puts "----------------------"
puts "----------------------"
pp response["context"]
puts "----------------------"
response
end
result = exchange("foo") # - 502
raise "error" unless result["status"]["status_code"] == 502
result = exchange("{}") # - 502
raise "error" unless result["status"]["status_code"] == 502
result = exchange("{ \"action\": \"beep\"}") # - 501
raise "error" unless result["status"]["status_code"] == 501
result = exchange("{ \"action\": \"test_solution\"}") # - 502 (no track slug)
raise "error" unless result["status"]["status_code"] == 502
result = exchange("{ \"action\": \"represent\", \"track_slug\": \"parsnip\"}") # - 502 (no container_version slug)
raise "error" unless result["status"]["status_code"] == 502
result = exchange("{ \"action\": \"represent\", \"track_slug\": \"parsnip\", \"container_version\": \"1234\", \"id\": \"_myid\"}") # - 503 (no worker()
raise "error" unless result["status"]["status_code"] == 503
## 504
msg = {
action: "test_solution",
track_slug: "parsnip",
container_version: "1234",
id: "_myid",
exercise_slug: "xx",
s3_uri: "xx"
}
result = exchange(msg.to_json)
raise "error" unless result["status"]["status_code"] == 511
msg = {
action: "test_solution",
track_slug: "ruby",
container_version: "git-b6ea39ccb2dd04e0b047b25c691b17d6e6b44cfb",
id: "_myid",
exercise_slug: "xx",
s3_uri: "xx"
}
result = exchange(msg.to_json)
raise "error" unless result["status"]["status_code"] == 512
#
# pp result
# exit 1
msg = {
action: "test_solution",
track_slug: "ruby",
container_version: "git-b6ea39ccb2dd04e0b047b25c691b17d6e6b44cfb",
id: "_myid",
exercise_slug: "two-fer",
s3_uri: "s3://exercism-submissions/production/submissions/96"
}
result = exchange(msg.to_json)
raise "error" unless result["status"]["status_code"] == 200
pp result
puts "done"