Extra lookups for available containers

This commit is contained in:
Charles Care
2019-11-15 15:49:05 +00:00
parent 5356207752
commit d04671469a
11 changed files with 237 additions and 99 deletions

View File

@@ -16,13 +16,13 @@ loader.setup
module Pipeline
def self.load_config(config_path)
config = YAML.load(File.read(config_path))
Aws.config.update({
credentials: Aws::Credentials.new(config["aws_access_key_id"], config["aws_secret_access_key"])
})
@config = config
end
# def self.load_config(config_path)
# config = YAML.load(File.read(config_path))
# Aws.config.update({
# credentials: Aws::Credentials.new(config["aws_access_key_id"], config["aws_secret_access_key"])
# })
# @config = config
# end
def self.config
@config

View File

@@ -43,14 +43,11 @@ module Pipeline::Cmd
context = ZMQ::Context.new
config = YAML.load(File.read(config_file))
Aws.config.update({
credentials: Aws::Credentials.new(config["aws_access_key_id"], config["aws_secret_access_key"])
})
# puts config
# exit
#
# Pipeline.load_config(config_file)
Pipeline::Rpc::Router.new(context, config)
end
end

View File

@@ -44,11 +44,25 @@ class Pipeline::ContainerRepo
end
def list_images
puts "credentials #{@credentials}"
puts "ECR #{ecr}"
puts create_login_token
ecr.list_images({
repository_name: image_name
})
end
def images_info
images = list_images()
info = {}
images.image_ids.each do |image|
info[image.image_digest] ||= []
info[image.image_digest] << image.image_tag
end
info
end
def git_shas
images = list_images()
tags = []
@@ -65,10 +79,9 @@ class Pipeline::ContainerRepo
def ecr
@ecr ||= begin
Aws::ECR::Client.new(
region: 'eu-west-1',
credentials: @credentials
)
( @credentials.nil? ) ?
Aws::ECR::Client.new(region: 'eu-west-1') :
Aws::ECR::Client.new(region: 'eu-west-1', credentials: @credentials)
end
end

View File

@@ -13,15 +13,48 @@ module Pipeline::Rpc
@raw_address = msg_strings[0]
@raw_msg = msg_strings[2]
@socket = socket
@start = current_timestamp
end
def send_error(err)
reply = [raw_address, "", err.to_json]
@socket.send_strings(reply)
def send_error(err, status_code=999)
msg = {
status: {
ok: false,
status_code: status_code
},
error: err,
failed_request: parsed_msg
}
send_reply(msg)
end
def send_result(result)
reply = [raw_address, "", result.to_json]
def send_result(result, status_code=0)
msg = {
status: {
ok: true,
status_code: status_code
},
result: result,
timing: {
start_time: @start.to_i
}
}
send_reply(msg)
end
def current_timestamp
(Time.now.to_f * 1000)
end
def send_reply(msg)
@end = current_timestamp
@duration_milliseconds = @end - @start
msg[:timing] = {
start_time: @start.to_i,
end_time: @end.to_i,
duration_milliseconds: @duration_milliseconds.to_i
}
reply = [raw_address, "", msg.to_json]
@socket.send_strings(reply)
end

View File

@@ -9,7 +9,6 @@ module Pipeline::Rpc
@notification_port = 5557
@front_end_port = 5555
@zmq_context = zmq_context
@front_end = FrontEndSocket.new(zmq_context, @front_end_port)
@@ -22,27 +21,8 @@ module Pipeline::Rpc
@in_flight_requests = RequestRegister.new
@backend_channels = {}
# @work_channel_ports = {
# static_analyzers: {
# "*" => 5560
# },
# test_runners: {
# "*" => 5561,
# "ruby" => 33001,
# "csharp" => 33002
# },
# representers: {
# "*" => 5562
# }
# }
@work_channel_ports = {}
@container_versions = {}
config["workers"].each do |worker_class, worker_config|
worker_class = worker_class.to_sym
c = @work_channel_ports[worker_class] = {}
cv = @container_versions[worker_class] = {}
backend = @backend_channels[worker_class] = {}
worker_config.each do |k,v|
if k == "shared_queue"
@@ -50,9 +30,7 @@ module Pipeline::Rpc
port = v
else
topic = k
lang_spec = v
port = v["queue"]
cv[k] = v["worker_versions"]
end
bind_address = "tcp://*:#{port}"
work_channel = WorkChannel.new(zmq_context, bind_address)
@@ -60,28 +38,26 @@ module Pipeline::Rpc
end
end
# @work_channel_ports.each do |type, entry|
# backend = @backend_channels[type] = {}
# entry.each do |topic, port|
# bind_address = "tcp://*:#{port}"
# work_channel = WorkChannel.new(zmq_context, bind_address)
# backend[topic] = work_channel
# end
# end
@container_versions = {}
config["workers"].each do |worker_class, worker_config|
worker_class = worker_class.to_sym
cv = @container_versions[worker_class] = {}
worker_config.each do |k,v|
if k != "shared_queue"
lang_spec = v
cv[k] = lang_spec["worker_versions"]
end
end
end
@notification_socket = NotificationSocket.new(zmq_context, @notification_port)
end
def force_worker_restart!
@force_restart_at = Time.now
end
def run
Thread.new do
response_socket.run_heartbeater
end
poller.listen_for_messages do |msg|
case msg
when FrontEndRequest
@@ -92,6 +68,10 @@ module Pipeline::Rpc
end
end
def force_worker_restart!
@force_restart_at = Time.now
end
private
def on_service_response(msg)
@@ -107,7 +87,6 @@ module Pipeline::Rpc
end
end
def on_frontend_request(req)
req.handle do |action|
if action == "configure_worker"
@@ -118,6 +97,23 @@ module Pipeline::Rpc
handle_with_worker(:test_runners, req)
elsif action == "represent"
handle_with_worker(:representers, req)
elsif action == "restart_workers"
force_worker_restart!
req.send_result({ message: "Request accepted" })
elsif action == "restart_router"
force_worker_restart!
req.send_result({ message: "Request accepted" })
elsif action == "current_config"
req.send_result({ container_versions: container_versions })
elsif action == "list_available_containers"
channel = req.parsed_msg["channel"]
track_slug = req.parsed_msg["track_slug"]
c = temp_credentials
puts "C #{c}"
credentials = to_aws_credentials(c)
container_repo = Pipeline::Runtime::RuntimeEnvironment.container_repo(channel, track_slug, nil)
images = container_repo.images_info
req.send_result({ list_images: images })
else
req.send_error({ status: :unrecognised_action })
end
@@ -126,6 +122,13 @@ module Pipeline::Rpc
private
def to_aws_credentials(raw_credentials)
key = raw_credentials["access_key_id"]
secret = raw_credentials["secret_access_key"]
session = raw_credentials["session_token"]
Aws::Credentials.new(key, secret, session)
end
def handle_with_worker(worker_class, req)
channel = @backend_channels[worker_class]
if channel.nil?
@@ -156,32 +159,6 @@ module Pipeline::Rpc
backend.forward_to_backend(req, context)
end
# def container_versions
# @container_versions
# # {
# # static_analyzers: {
# # "ruby" => [
# # "a1f5549b6391443f7a05a038fed8dfebacd3db84",
# # "398007701db580a09f198e806e680f4cdb04b3b4",
# # "dc1c6c4897e63ebeb60ed53ec7423a3f6c33449d"
# # ]
# # },
# # representers: {
# # "ruby" => [
# # "7dad3dd8b43c89d0ac03b5f67700c6aad52d8cf9"
# # ]
# # },
# # test_runners: {
# # "ruby" => [
# # "b6ea39ccb2dd04e0b047b25c691b17d6e6b44cfb"
# # ],
# # "csharp" => [
# # "sha-122a036658c815c2024c604046692adc4c23d5c1"
# # ]
# # }
# # }
# end
def emit_current_spec
m = {
action: "configure",

View File

@@ -8,7 +8,7 @@ module Pipeline::Rpc::Worker
@topic_scopes = topic_scopes
end
def invoke
def invoke
spec = request["specs"][@channel]
puts "Configuing #{@channel} with #{spec}"
credentials = parse_credentials(request)

View File

@@ -46,7 +46,7 @@ module Pipeline::Rpc::Worker
@setup.send_string(request.to_json)
msg = ""
@setup.recv_string(msg)
@bootstrap = JSON.parse(msg)
@bootstrap = JSON.parse(msg)["result"]
puts "Bootstrap with #{JSON.pretty_generate(@bootstrap)}"
@setup.close
end

View File

@@ -25,7 +25,6 @@ module Pipeline::Rpc::Worker
raise DaemonRestartException
end
a = Pipeline::Rpc::Worker::ConfigureAction.new(channel, request, topic_scopes)
a.request = request
a

View File

@@ -1,6 +1,20 @@
module Pipeline::Runtime
class RuntimeEnvironment
def self.container_repo(channel, language_slug, credentials)
container_slug = case channel
when "static_analyzers"
"#{language_slug}-analyzer"
when "test_runners"
"#{language_slug}-test-runner"
when "representers"
"#{language_slug}-representer"
else
raise "Unknown channel: #{channel}"
end
Pipeline::ContainerRepo.instance_for(container_slug, credentials)
end
attr_reader :env_base
def initialize(env_base)
@@ -18,19 +32,7 @@ module Pipeline::Runtime
end
def release(channel, language_slug, version, credentials)
container_slug = case channel
when "static_analyzers"
"#{language_slug}-analyzer"
when "test_runners"
"#{language_slug}-test-runner"
when "representers"
"#{language_slug}-representer"
# when "static_analyzers"
# container_slug = "#{language_slug}-analyzer"
else
raise "Unknown channel: #{channel}"
end
container_repo = Pipeline::ContainerRepo.instance_for(container_slug, credentials)
container_repo = RuntimeEnvironment.container_repo(channel, language_slug, credentials)
release_container(language_slug, version, container_repo)
end

90
lib/pipeline_client.rb Normal file
View File

@@ -0,0 +1,90 @@
require 'json'
require 'ffi-rzmq'
require 'json'
require 'yaml'
require 'securerandom'
class PipelineClient
TIMEOUT_SECS = 20
# ADDRESS = "tcp://analysis-router.exercism.io:5555"
ADDRESS = "tcp://localhost:5555"
def self.run_tests(*args)
instance = new
instance.run_tests(*args)
ensure
instance.close_socket
end
def initialize(address: ADDRESS)
@address = address
@socket = open_socket
end
def restart_workers!
send_recv({
action: :restart_workers
})
end
def run_tests(track_slug, exercise_slug, test_run_id, s3_uri)
params = {
action: :test_solution,
id: test_run_id,
track_slug: track_slug,
exercise_slug: exercise_slug,
s3_uri: s3_uri,
container_version: "b6ea39ccb2dd04e0b047b25c691b17d6e6b44cfb",
# container_version: "sha-122a036658c815c2024c604046692adc4c23d5c1",
}
send_recv(params)
end
private
attr_reader :address, :socket
def send_recv(payload)
# Get a response. Raises if fails
resp = send_msg(payload.to_json, TIMEOUT_SECS)
# Parse the response and return the results hash
parsed = JSON.parse(resp)
puts parsed
raise "failed request" unless parsed["status"]["ok"]
parsed
end
def open_socket
ZMQ::Context.new(1).socket(ZMQ::REQ).tap do |socket|
socket.setsockopt(ZMQ::LINGER, 0)
socket.connect(address)
end
end
def close_socket
socket.close
end
def send_msg(msg, timeout)
timeout_ms = timeout * 1000
socket.setsockopt(ZMQ::RCVTIMEO, timeout_ms)
socket.send_string(msg)
# Get the response back from the runner
recv_result = socket.recv_string(response = "")
# Guard against errors
raise TestRunnerTimeoutError if recv_result < 0
case recv_result
when 20
raise TestRunnerTimeoutError
when 31
raise TestRunnerWorkerUnavailableError
end
# Return the response
response
end
end

View File

@@ -0,0 +1,27 @@
gem "minitest"
require "minitest/autorun"
require "minitest/pride"
require "minitest/mock"
require "mocha/setup"
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
require "pipeline_client"
module Pipeline
class IntegrationTest < Minitest::Test
attr_reader :client
def setup
@client = PipelineClient.new
end
def test_restart_workers
resp = client.restart_workers!
assert_equals true, clietn
end
end
end