Extracted runtime command with improved options

This commit is contained in:
Charles Care
2019-11-12 12:31:00 +00:00
parent 6e0343adb7
commit c47908c2d4
11 changed files with 205 additions and 49 deletions

View File

@@ -11,6 +11,7 @@ gem "simplecov", require: false, group: :test
gem "ffi-rzmq"
gem "zeitwerk"
gem "docopt"
group :development, :test do
# gem "bundler"

View File

@@ -33,6 +33,7 @@ GEM
aws-eventstream (~> 1.0, >= 1.0.2)
concurrent-ruby (1.1.5)
docile (1.3.2)
docopt (0.6.1)
ffi (1.11.1)
ffi-rzmq (2.0.7)
ffi-rzmq-core (>= 1.0.7)
@@ -69,6 +70,7 @@ DEPENDENCIES
activesupport
aws-sdk-ecr
aws-sdk-s3
docopt
ffi-rzmq
mandate
minitest

View File

@@ -3,9 +3,4 @@ require "bundler/setup"
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
require "pipeline"
Pipeline.load_config(File.expand_path('../../config/pipeline.yml', __FILE__))
context = ZMQ::Context.new
router = Pipeline::Rpc::Router.new(context)
router.run
Pipeline::Cmd::RouterDaemon.(ARGV)

View File

@@ -4,9 +4,4 @@ $LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
require "pipeline"
worker_identity = ARGV[0]
channel_address = ARGV[1]
env_base = ARGV[2]
daemon = Pipeline::Rpc::Worker::Daemon.new(worker_identity, channel_address, env_base)
daemon.listen
Pipeline::Cmd::WorkerDaemon.(ARGV)

View File

@@ -0,0 +1,50 @@
require "docopt"
module Pipeline::Cmd
class RouterDaemon
SPEC = <<-DOCOPT
Exercism router.
Usage:
#{__FILE__} <configuration_file> [--force-worker-restart]
#{__FILE__} -h | --help
#{__FILE__} --version
DOCOPT
include Mandate
initialize_with :argv
def call
puts "*** Exercism Router ***"
router.force_worker_restart! if restart_workers?
router.run
end
def options
@options ||= begin
Docopt::docopt(SPEC, argv: argv)
rescue Docopt::Exit => e
puts e.message
exit 1
end
end
def restart_workers?
options["--force-worker-restart"] == true
end
def router
@router ||= begin
config_file = options["<configuration_file>"]
context = ZMQ::Context.new
Pipeline.load_config(config_file)
Pipeline::Rpc::Router.new(context)
end
end
end
end

View File

@@ -0,0 +1,65 @@
require "docopt"
module Pipeline::Cmd
class WorkerDaemon
SPEC = <<-DOCOPT
Exercism worker.
Usage:
#{__FILE__} (listen|configure) <identity> <channel_address> <work_folder> [--dryrun]
#{__FILE__} -h | --help
#{__FILE__} --version
DOCOPT
include Mandate
initialize_with :argv
def call
puts "*** Exercism Worker ***"
begin
daemon.bootstrap
exit 0 if dryrun?
daemon.configure
exit 0 if configure?
daemon.listen
rescue Pipeline::Rpc::Worker::DaemonRestartException
puts "Restarting Daemon"
retry
end
end
def options
@options ||= begin
Docopt::docopt(SPEC, argv: argv)
rescue Docopt::Exit => e
puts e.message
exit 1
end
end
def dryrun?
options["--dryrun"] == true
end
def configure?
options["configure"] == true
end
def daemon
@daemon ||= begin
worker_identity = options["<identity>"]
channel_address = options["<channel_address>"]
env_base = options["<work_folder>"]
Pipeline::Rpc::Worker::Daemon.new(worker_identity, channel_address, env_base)
end
end
end
end

View File

@@ -10,8 +10,10 @@ module Pipeline::Rpc
@socket.bind("tcp://*:#{@port}")
end
def emit_configuration(configuration)
@socket.send_string(configuration.to_json)
def emit(msg)
raw = msg.to_json
puts "SENDING #{raw}"
@socket.send_string(raw)
end
end

View File

@@ -29,7 +29,7 @@ module Pipeline::Rpc
test_runners: {
"*" => 5561,
"ruby" => 33001,
"csharp" => 33002,
"csharp" => 33002
},
representers: {
"*" => 5562
@@ -48,11 +48,16 @@ module Pipeline::Rpc
@notification_socket = NotificationSocket.new(zmq_context, @notification_port)
end
def force_worker_restart!
@force_restart_at = Time.now
end
def run
Thread.new do
response_socket.run_heartbeater
end
poller.listen_for_messages do |msg|
case msg
when FrontEndRequest
@@ -157,8 +162,9 @@ module Pipeline::Rpc
action: "configure",
specs: container_versions
}
m[:force_restart_at] = @force_restart_at.to_i if @force_restart_at
set_temp_credentials(m)
notification_socket.emit_configuration(m)
notification_socket.emit(m)
end
def respond_with_worker_config(req)
@@ -180,14 +186,10 @@ module Pipeline::Rpc
channel_entry = @backend_channels[channel]
puts channel_entry.keys
topics.each do |topic|
unless channel_entry.has_key?(topic)
puts "RESET #{topic}"
topic = "*"
end
next unless channel_entry.has_key?(topic)
port = channel_entry[topic].port
workqueue_addresses << "tcp://#{@public_hostname}:#{port}"
end
workqueue_addresses.uniq!
analyzer_spec[:channel] = {
channel: channel,

View File

@@ -2,27 +2,44 @@ module Pipeline::Rpc::Worker
class ConfigureAction < WorkerAction
def initialize(channel, request)
def initialize(channel, request, topic_scopes)
@channel = channel
@request = request
@topic_scopes = topic_scopes
end
def invoke
def invoke
spec = request["specs"][@channel]
puts "Configuing #{@channel} with #{spec}"
credentials = parse_credentials(request)
raise "No spec received" if spec.nil?
spec.each do |language_slug, versions|
puts "Preparing #{language_slug} #{versions}"
versions.each do |version|
if environment.released?(language_slug, version)
puts "Already installed #{language_slug}:#{version}"
else
puts "Installed #{language_slug}"
environment.release(@channel, language_slug, version, credentials)
if should_install?(language_slug)
puts "Preparing #{language_slug} #{versions}"
versions.each do |version|
configure(language_slug, version, credentials)
end
else
puts "Skipping configuration of #{language_slug}"
end
end
end
private
def should_install?(language_slug)
@topic_scopes.include?("*") || @topic_scopes.include?(language_slug)
end
def configure(language_slug, version, credentials)
if environment.released?(language_slug, version)
puts "Already installed #{language_slug}:#{version}"
else
puts "Installed #{language_slug}"
environment.release(@channel, language_slug, version, credentials)
end
end
end
end

View File

@@ -1,5 +1,8 @@
module Pipeline::Rpc::Worker
class DaemonRestartException < StandardError
end
class Daemon
attr_reader :identity, :context, :incoming, :outgoing, :environment
@@ -18,6 +21,7 @@ module Pipeline::Rpc::Worker
query = CGI::parse(channel_address.query)
@topics = query["topic"] if query["topic"]
end
@topics = ["*"] if @topics.nil? || @topics.empty?
@context = ZMQ::Context.new(1)
@incoming = context.socket(ZMQ::PULL)
@@ -26,7 +30,14 @@ module Pipeline::Rpc::Worker
@environment = Pipeline::Runtime::RuntimeEnvironment.new(env_base)
end
def setup
def bootstrap_and_listen
bootstrap
configure
connect
poll_messages
end
def bootstrap
@setup = context.socket(ZMQ::REQ)
@setup.setsockopt(ZMQ::LINGER, 0)
puts @control_queue
@@ -39,41 +50,48 @@ module Pipeline::Rpc::Worker
@setup.send_string(request.to_json)
msg = ""
@setup.recv_string(msg)
msg = JSON.parse(msg)
puts "Bootstrap with #{msg}"
@bootstrap = JSON.parse(msg)
puts "Bootstrap with #{JSON.pretty_generate(@bootstrap)}"
@setup.close
end
def configure
environment.prepare
action = Pipeline::Rpc::Worker::ConfigureAction.new(@channel, msg)
action = Pipeline::Rpc::Worker::ConfigureAction.new(@channel, @bootstrap, @topics)
action.environment = environment
action.invoke
end
response_address = msg["channel"]["response_address"]
workqueue_addresses = msg["channel"]["workqueue_addresses"]
notification_address = msg["channel"]["notification_address"]
def listen
connect
poll_messages
end
private
def connect
channel_defn = @bootstrap["channel"]
response_address = channel_defn["response_address"]
workqueue_addresses =channel_defn["workqueue_addresses"]
notification_address = channel_defn["notification_address"]
@outgoing = context.socket(ZMQ::PUB)
@outgoing.connect(response_address)
workqueue_addresses.each do |workqueue_address|
incoming.connect(workqueue_address)
end
@notifications.connect(notification_address)
end
def listen
setup
@incoming_wrapper = Pipeline::Rpc::Worker::WorkSocketWrapper.new(incoming)
@noificationincoming_wrapper = Pipeline::Rpc::Worker::NotificationSocketWrapper.new(@notifications, @channel)
@noificationincoming_wrapper = Pipeline::Rpc::Worker::NotificationSocketWrapper.new(@notifications, @channel, @topics)
@poller = Pipeline::Rpc::ChannelPoller.new
@poller.register(@incoming_wrapper)
@poller.register(@noificationincoming_wrapper)
end
def poll_messages
loop do
msg = []
@poller.listen_for_messages do |action_task|
unless action_task.nil?
action_task.environment = environment
@@ -84,7 +102,6 @@ module Pipeline::Rpc::Worker
end
end
end
end
end

View File

@@ -1,11 +1,13 @@
module Pipeline::Rpc::Worker
class NotificationSocketWrapper
attr_reader :socket, :channel
attr_reader :socket, :channel, :topic_scopes
def initialize(socket, channel)
def initialize(socket, channel, topic_scopes)
@socket = socket
@channel = channel
@topic_scopes = topic_scopes
@start_at = Time.now.to_i
end
def recv
@@ -17,11 +19,19 @@ module Pipeline::Rpc::Worker
action = request["action"]
if action == "configure"
a = Pipeline::Rpc::Worker::ConfigureAction.new(channel, request)
force_restart_at = request["force_restart_at"]
if force_restart_at && force_restart_at > @start_at
raise DaemonRestartException
end
a = Pipeline::Rpc::Worker::ConfigureAction.new(channel, request, topic_scopes)
a.request = request
a
else
puts "HERE ELSE: #{request}"
exit 1
end
end