feat: init project

This commit is contained in:
fan-tastic-z
2025-08-15 16:04:44 +08:00
parent a927033752
commit 313e013461
24 changed files with 4891 additions and 0 deletions

34
.gitignore vendored Normal file
View File

@@ -0,0 +1,34 @@
/target
# By Default, Ignore any .*, except .gitignore
.*
!.gitignore
!.gitattributes
!.rustfmt.toml
!.github
!.pre-commit-config.yaml
# -- Rust
# For .cargo/config.toml
!.cargo/
target/
# -- Safety net
dist/
__pycache__/
node_modules/
npm-debug.log
report.*.json
*.parquet
*.map
*.zip
*.gz
*.tar
*.tgz
# logs
logs/

3679
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

37
Cargo.toml Normal file
View File

@@ -0,0 +1,37 @@
[package]
name = "vulnfeed"
version = "0.1.0"
edition = "2024"
[[bin]]
name = "vulnfeed"
path = "src/bin/vulnfeed/main.rs"
[dependencies]
anstyle = "1.0.11"
clap = { version = "4.5.45", features = ["derive"] }
const_format = "0.2.34"
ctrlc = "3.4.7"
error-stack = "0.5.0"
fastimer = "0.9.0"
gix-discover = "0.41.0"
jiff = { version = "0.2.15", features = ["serde"] }
local-ip-address = "0.6.5"
log = "0.4.27"
logforth = { version = "0.26.2", features = ["colored", "layout-json", "append-fastrace", "append-rolling-file", "diagnostic-fastrace"] }
mea = "0.4.0"
pin-project = "1.1.10"
poem = "3.1.12"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.142"
shadow-rs = "1.2.1"
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio", "macros", "uuid", "migrate", "chrono"] }
thiserror = "2.0.14"
tokio = { version = "1.47.1", features = ["rt-multi-thread", "macros"] }
toml_edit = { version = "0.23.3", features = ["serde"] }
uuid = { version = "1.18.0", features = ["v4", "serde"] }
[build-dependencies]
build-data = "0.3.3"
gix-discover = "0.41.0"
shadow-rs = "1.2.1"

59
build.rs Normal file
View File

@@ -0,0 +1,59 @@
use std::{collections::BTreeSet, env, path::Path};
use build_data::{format_timestamp, get_source_time};
use shadow_rs::{CARGO_METADATA, CARGO_TREE, ShadowBuilder};
/// Tell Cargo to re-run this build script when the git HEAD commit changes,
/// so the embedded version/commit metadata stays fresh.
///
/// Walks upwards from the manifest directory to find the enclosing git
/// repository, following linked worktrees (e.g. when this repo is used as a
/// submodule) up to their parent checkout.
fn configure_rerun_if_head_commit_changed() {
    let mut current = Path::new(env!("CARGO_MANIFEST_DIR")).to_path_buf();
    // skip if no valid-looking git repository could be found
    while let Ok((dir, _)) = gix_discover::upwards(current.as_path()) {
        match dir {
            gix_discover::repository::Path::Repository(git_dir) => {
                unreachable!(
                    "build.rs should never be placed in a git bare repository: {}",
                    git_dir.display()
                );
            }
            gix_discover::repository::Path::WorkTree(work_dir) => {
                // Watch .git/HEAD in addition to the branch refs: committing
                // on the current branch touches refs/heads/<branch>, but
                // switching branches or detaching HEAD only rewrites
                // .git/HEAD — without this the build script would not re-run
                // and the baked-in commit info would go stale.
                let git_head = work_dir.join(".git/HEAD");
                println!("cargo::rerun-if-changed={}", git_head.display());
                let git_refs_heads = work_dir.join(".git/refs/heads");
                println!("cargo::rerun-if-changed={}", git_refs_heads.display());
                break;
            }
            gix_discover::repository::Path::LinkedWorkTree { work_dir, .. } => {
                // Linked worktree (e.g. submodule checkout): retry discovery
                // from the parent of the linked worktree's directory.
                current = work_dir
                    .parent()
                    .expect("submodule's work_dir must have parent")
                    .to_path_buf();
                continue;
            }
        };
    }
}
/// Build-script entry point: wires up rerun triggers and exports build
/// metadata (timestamps, git/rustc info) via `build_data` and `shadow_rs`.
fn main() -> shadow_rs::SdResult<()> {
    configure_rerun_if_head_commit_changed();
    println!(
        "cargo::rustc-env=SOURCE_TIMESTAMP={}",
        // Fall back to an empty string when the source time is unavailable
        // (e.g. building outside a git checkout).
        if let Ok(t) = get_source_time() {
            format_timestamp(t)?
        } else {
            "".to_string()
        }
    );
    build_data::set_BUILD_TIMESTAMP();
    // The "CARGO_WORKSPACE_DIR" is set manually (not by Rust itself) in Cargo config file, to
    // solve the problem where the "CARGO_MANIFEST_DIR" is not what we want when this repo is
    // made as a submodule in another repo.
    let src_path = env::var("CARGO_WORKSPACE_DIR").or_else(|_| env::var("CARGO_MANIFEST_DIR"))?;
    let out_path = env::var("OUT_DIR")?;
    let _ = ShadowBuilder::builder()
        .src_path(src_path)
        .out_path(out_path)
        // exclude these two large constants that we don't need
        .deny_const(BTreeSet::from([CARGO_METADATA, CARGO_TREE]))
        .build()?;
    Ok(())
}

18
dev/config.toml Normal file
View File

@@ -0,0 +1,18 @@
[server]
listen_addr = "0.0.0.0:9000"
advertise_addr = "127.0.0.1:9000"
[database]
host = "127.0.0.1"
port = 5432
username = "app"
password = "YSm*wF60c72CLJD!"
database_name = "vulnfeed"
[telemetry.logs.file]
filter = "INFO"
dir = "logs"
max_files = 64
[telemetry.logs.stderr]
filter = "INFO"

View File

@@ -0,0 +1,38 @@
-- Add migration script here

-- Normalized vulnerability records aggregated from external feeds.
CREATE TABLE vuln_information (
    id TEXT NOT NULL UNIQUE PRIMARY KEY,
    title TEXT NOT NULL DEFAULT '',
    description TEXT NOT NULL DEFAULT '',
    severity TEXT NOT NULL DEFAULT '',
    cve TEXT NOT NULL DEFAULT '',
    disclosure TEXT NOT NULL DEFAULT '',
    solutions TEXT NOT NULL DEFAULT '',
    reference_links TEXT [] DEFAULT '{}',
    tags TEXT [] DEFAULT '{}',
    github_search TEXT [] DEFAULT '{}',
    source TEXT NOT NULL DEFAULT '',
    -- Whether this record has already been pushed/notified downstream.
    pushed BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Periodic sync jobs; `minute` is the run interval (every 15 minutes by default).
CREATE TABLE sync_task (
    id TEXT NOT NULL UNIQUE PRIMARY KEY,
    name TEXT NOT NULL DEFAULT '',
    minute INTEGER NOT NULL DEFAULT 15,
    status BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- One row per execution of a sync task.
CREATE TABLE sync_task_record (
    id TEXT NOT NULL UNIQUE PRIMARY KEY,
    task_id TEXT NOT NULL,
    -- Nullable: filled in as the run starts/ends. The original bare
    -- `DEFAULT` with no value was a SQL syntax error.
    started_at TIMESTAMP,
    ended_at TIMESTAMP,
    success BOOLEAN DEFAULT FALSE,
    error_message TEXT DEFAULT '',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

64
scripts/init_db.sh Executable file
View File

@@ -0,0 +1,64 @@
#!/usr/bin/env bash
# Spin up a dockerized Postgres (unless SKIP_DOCKER is set), create the
# application user and database, and run the sqlx migrations against it.
set -x
set -eo pipefail

# Check if a custom parameter has been set, otherwise use default values
DB_PORT="${DB_PORT:=5432}"
SUPERUSER="${SUPERUSER:=postgres}"
SUPERUSER_PWD="${SUPERUSER_PWD:=YSm*wF60c72CLJD!}"
APP_USER="${APP_USER:=app}"
APP_USER_PWD="${APP_USER_PWD:=YSm*wF60c72CLJD!}"
APP_DB_NAME="${APP_DB_NAME:=vulnfeed}"

# Allow to skip Docker if a dockerized Postgres database is already running
if [[ -z "${SKIP_DOCKER}" ]]
then
  # if a postgres container is running, print instructions to kill it and exit
  RUNNING_POSTGRES_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.ID}}')
  if [[ -n $RUNNING_POSTGRES_CONTAINER ]]; then
    echo >&2 "there is a postgres container already running, kill it with"
    echo >&2 "    docker kill ${RUNNING_POSTGRES_CONTAINER}"
    exit 1
  fi
  CONTAINER_NAME="postgres_$(date '+%s')"
  # Launch postgres using Docker. All expansions are quoted: the default
  # passwords contain '*' and '!', which would otherwise be subject to
  # pathname expansion / word splitting.
  docker run \
    --env POSTGRES_USER="${SUPERUSER}" \
    --env POSTGRES_PASSWORD="${SUPERUSER_PWD}" \
    --health-cmd="pg_isready -U ${SUPERUSER} || exit 1" \
    --health-interval=1s \
    --health-timeout=5s \
    --health-retries=5 \
    --publish "${DB_PORT}":5432 \
    --detach \
    --name "${CONTAINER_NAME}" \
    postgres -N 1000
  # ^ Increased maximum number of connections for testing purposes
  until [ \
    "$(docker inspect -f "{{.State.Health.Status}}" "${CONTAINER_NAME}")" == \
    "healthy" \
  ]; do
    >&2 echo "Postgres is still unavailable - sleeping"
    sleep 1
  done
  # Create the application user. Use `-i` (not `-it`): allocating a TTY
  # fails when the script runs non-interactively, e.g. in CI.
  CREATE_QUERY="CREATE USER ${APP_USER} WITH PASSWORD '${APP_USER_PWD}';"
  docker exec -i "${CONTAINER_NAME}" psql -U "${SUPERUSER}" -c "${CREATE_QUERY}"
  # Grant create db privileges to the app user
  GRANT_QUERY="ALTER USER ${APP_USER} CREATEDB;"
  docker exec -i "${CONTAINER_NAME}" psql -U "${SUPERUSER}" -c "${GRANT_QUERY}"
fi

>&2 echo "Postgres is up and running on port ${DB_PORT} - running migrations now!"
# Create the application database
DATABASE_URL="postgres://${APP_USER}:${APP_USER_PWD}@localhost:${DB_PORT}/${APP_DB_NAME}"
export DATABASE_URL
sqlx database create
sqlx migrate run
>&2 echo "Postgres ready to go!"

32
src/bin/vulnfeed/main.rs Normal file
View File

@@ -0,0 +1,32 @@
use clap::Parser;
use vulnfeed::{cli, errors::Error, utils::{styled::styled, version::version}};
use error_stack::Result;
// Top-level CLI definition for the `vulnfeed` binary. NOTE: clap surfaces
// `///` doc comments in --help output, so commentary here stays in plain
// `//` comments to avoid changing the generated help text.
#[derive(Debug, clap::Parser)]
#[command(name = "vulnfeed", version, long_version = version(), styles=styled())]
struct Command {
    #[clap(subcommand)]
    cmd: SubCommand,
}

impl Command {
    /// Dispatch to the selected subcommand.
    pub fn run(self) -> Result<(), Error> {
        match self.cmd {
            SubCommand::Server(cmd) => cmd.run(),
        }
    }
}

// Available subcommands; currently only `server` (see `cli::CommandStart`).
#[derive(Debug, clap::Subcommand)]
enum SubCommand {
    Server(cli::CommandStart),
}
/// Binary entry point: parse CLI arguments and run the chosen subcommand.
fn main() -> Result<(), Error> {
    Command::parse().run()
}

63
src/cli.rs Normal file
View File

@@ -0,0 +1,63 @@
use std::path::PathBuf;
use clap::ValueHint;
use error_stack::{Result, ResultExt};
use crate::{config::settings::{load_config, Config, LoadConfigResult}, errors::Error, input::http::http_server::{self, make_acceptor_and_advertise_addr}, utils::{num_cpus, runtime::{make_runtime, Runtime}, telemetry}};
// Arguments for the `server` subcommand. Plain `//` comments on purpose:
// clap would surface `///` doc comments in --help output.
#[derive(Debug, clap::Parser)]
pub struct CommandStart {
    #[clap(short, long, help = "Path to config file", value_hint = ValueHint::FilePath)]
    config_file: PathBuf,
}

impl CommandStart {
    /// Load config, initialize telemetry, then start the HTTP server and
    /// block until it shuts down.
    pub fn run(self) -> Result<(), Error> {
        // Reports end up in logs; disable ANSI color codes there.
        error_stack::Report::set_color_mode(error_stack::fmt::ColorMode::None);
        let LoadConfigResult { config, warnings } = load_config(self.config_file)?;
        // Telemetry gets its own single-threaded runtime so logging works
        // independently of the server runtime. The returned guards must stay
        // alive for the process lifetime (dropping flushes the appenders),
        // and the runtime itself is pushed into the guards for that reason.
        let telemetry_runtime = make_telemetry_runtime();
        let mut drop_guards =
            telemetry::init(&telemetry_runtime, "vulnfeed", config.telemetry.clone());
        drop_guards.push(Box::new(telemetry_runtime));
        // Config-load warnings are only emitted once logging is initialized.
        for warning in warnings {
            log::warn!("{warning}");
        }
        log::info!("server is starting with config: {config:#?}");
        let server_runtime = make_vulnfeed_runtime();
        server_runtime.block_on(run_server(&server_runtime, config))
    }
}
/// Bind the listener, start the HTTP server, install the Ctrl-C handler, and
/// wait until the server has fully shut down.
async fn run_server(server_rt: &Runtime, config: Config) -> Result<(), Error> {
    let make_error = || Error::Message("failed to start server".to_string());
    // Ctrl-C fires shutdown_tx; the server watches the paired shutdown_rx.
    let (shutdown_tx, shutdown_rx) = mea::shutdown::new_pair();
    let (acceptor, advertise_addr) = make_acceptor_and_advertise_addr(
        &config.server.listen_addr,
        config.server.advertise_addr.as_deref(),
    )
    .await
    .change_context_lazy(make_error)?;
    let server = http_server::start_server(server_rt, shutdown_rx, acceptor, advertise_addr)
        .await
        .change_context_lazy(|| {
            Error::Message("A fatal error has occurred in server process.".to_string())
        })?;
    // Handler installed only after the server started, so an early Ctrl-C
    // during startup terminates the process with default behavior.
    ctrlc::set_handler(move || shutdown_tx.shutdown()).change_context_lazy(|| {
        Error::Message("failed to setup ctrl-c signal handle".to_string())
    })?;
    server.await_shutdown().await;
    Ok(())
}
/// Runtime for the main server: one worker thread per available CPU core.
fn make_vulnfeed_runtime() -> Runtime {
    make_runtime("vulnfeed_runtime", "vulnfeed_thread", num_cpus().get())
}

/// Runtime for telemetry: logging is lightweight, one worker thread suffices.
fn make_telemetry_runtime() -> Runtime {
    make_runtime("telemetry_runtime", "telemetry_thread", 1)
}

1
src/config/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod settings;

273
src/config/settings.rs Normal file
View File

@@ -0,0 +1,273 @@
use std::{path::PathBuf, str::FromStr};
use error_stack::{Result, ResultExt, bail};
use serde::de::IntoDeserializer;
use serde::{Deserialize, Serialize};
use toml_edit::DocumentMut;
use crate::errors::Error;
/// Result of [`load_config`]: the parsed config plus non-fatal warnings to be
/// logged once a logger is available.
pub struct LoadConfigResult {
    pub config: Config,
    pub warnings: Vec<String>,
}
/// Read `config_file`, splice `VULNFEED_*` environment-variable overrides into
/// the TOML document, and deserialize it into a [`Config`].
///
/// Only variables listed in [`known_option_entries`] are accepted; any other
/// `VULNFEED_`-prefixed variable is an error. Returns the config together
/// with warnings (e.g. overrides that had to create missing TOML sections)
/// for the caller to log later.
pub fn load_config(config_file: PathBuf) -> Result<LoadConfigResult, Error> {
    let content = std::fs::read_to_string(&config_file).change_context_lazy(|| {
        Error::Message(format!(
            "failed to read config file: {}",
            config_file.display()
        ))
    })?;
    // toml_edit keeps the document editable so overrides can be written in
    // before deserialization.
    let mut config = DocumentMut::from_str(&content)
        .change_context_lazy(|| Error::Message("failed to parse config file".to_string()))?;
    let env = std::env::vars()
        .filter(|(key, _)| key.starts_with("VULNFEED_"))
        .collect::<std::collections::HashMap<String, String>>();
    // Write `value` at the dotted `path` inside the TOML doc, creating missing
    // parent tables on the way; each created parent is reported as a warning.
    fn set_toml_path(
        doc: &mut DocumentMut,
        key: &str,
        path: &'static str,
        value: toml_edit::Item,
    ) -> Vec<String> {
        let mut current = doc.as_item_mut();
        let mut warnings = vec![];
        let parts = path.split('.').collect::<Vec<_>>();
        let len = parts.len();
        assert!(len > 0, "path must not be empty");
        for part in parts.iter().take(len - 1) {
            if current.get(part).is_none() {
                warnings.push(format!(
                    "[key={key}] config path '{path}' has missing parent '{part}'; created",
                ));
            }
            // Indexing into the item implicitly creates the table if absent.
            current = &mut current[part];
        }
        current[parts[len - 1]] = value;
        warnings
    }
    let known_option_entries = known_option_entries();
    let mut warnings = vec![];
    for (k, v) in env {
        let Some(ent) = known_option_entries.iter().find(|e| e.env_name == k) else {
            bail!(Error::Message(format!(
                "failed to parse unknown environment variable {k} with value {v}"
            )))
        };
        // Parse the raw env string according to the entry's declared type.
        let (path, item) = match ent.ent_type {
            "string" => {
                let path = ent.ent_path;
                let value = toml_edit::value(v);
                (path, value)
            }
            "integer" => {
                let path = ent.ent_path;
                let value = v.parse::<i64>().change_context_lazy(|| {
                    Error::Message(format!("failed to parse integer value {v} of key {k}"))
                })?;
                let value = toml_edit::value(value);
                (path, value)
            }
            "boolean" => {
                let path = ent.ent_path;
                let value = v.parse::<bool>().change_context_lazy(|| {
                    Error::Message(format!("failed to parse boolean value {v} of key {k}"))
                })?;
                let value = toml_edit::value(value);
                (path, value)
            }
            ty => {
                bail!(Error::Message(format!(
                    "failed to parse environment variable {k} with value {v} and resolved type {ty}"
                )))
            }
        };
        let new_warnings = set_toml_path(&mut config, &k, path, item);
        warnings.extend(new_warnings);
    }
    let config = Config::deserialize(config.into_deserializer())
        .change_context_lazy(|| Error::Message("failed to deserialize config".to_string()))?;
    Ok(LoadConfigResult { config, warnings })
}
/// Root application configuration (TOML file plus `VULNFEED_*` env overrides).
/// `deny_unknown_fields` makes typos in the config file a hard error.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
    pub server: Server,
    pub database: Database,
    pub telemetry: TelemetryConfig,
}

/// HTTP server addresses.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Server {
    /// Address to bind, e.g. "0.0.0.0:9000".
    #[serde(default = "default_listen_addr")]
    pub listen_addr: String,
    /// Address advertised to peers; derived from the bind address when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub advertise_addr: Option<String>,
}

/// PostgreSQL connection settings.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Database {
    #[serde(default = "default_database_host")]
    pub host: String,
    #[serde(default = "default_database_port")]
    pub port: u16,
    #[serde(default = "default_database_username")]
    pub username: String,
    #[serde(default = "default_database_password")]
    pub password: String,
    #[serde(default = "default_database_name")]
    pub database_name: String,
}

/// Telemetry settings; currently logging only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TelemetryConfig {
    #[serde(default = "LogsConfig::disabled")]
    pub logs: LogsConfig,
}

/// Log appender configuration; each appender is optional.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct LogsConfig {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub file: Option<FileAppenderConfig>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stderr: Option<StderrAppenderConfig>,
}

impl LogsConfig {
    /// Default when the `[telemetry.logs]` section is absent: no appenders.
    pub fn disabled() -> Self {
        Self {
            file: None,
            stderr: None,
        }
    }
}

/// Rolling-file appender: filter directive, target directory, and the maximum
/// number of rotated files to keep.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct FileAppenderConfig {
    pub filter: String,
    pub dir: String,
    pub max_files: usize,
}

/// Stderr appender: filter directive only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct StderrAppenderConfig {
    pub filter: String,
}
impl Default for Config {
    /// Built-in defaults: listen on 0.0.0.0:9000, local Postgres with the
    /// stock postgres/postgres credentials, and INFO-level logging to both a
    /// rolling file (dir "logs", 64 files max) and stderr.
    fn default() -> Self {
        Self {
            server: Server {
                listen_addr: default_listen_addr(),
                advertise_addr: None,
            },
            database: Database {
                host: default_database_host(),
                port: default_database_port(),
                username: default_database_username(),
                password: default_database_password(),
                database_name: default_database_name(),
            },
            telemetry: TelemetryConfig {
                logs: LogsConfig {
                    file: Some(FileAppenderConfig {
                        filter: "INFO".to_string(),
                        dir: "logs".to_string(),
                        max_files: 64,
                    }),
                    stderr: Some(StderrAppenderConfig {
                        filter: "INFO".to_string(),
                    }),
                },
            },
        }
    }
}
/// Describes one supported environment-variable override: the variable name,
/// the dotted TOML path it writes to, and the primitive type it parses as
/// ("string", "integer", or "boolean").
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub struct OptionEntry {
    pub env_name: &'static str,
    pub ent_path: &'static str,
    pub ent_type: &'static str,
}
/// Environment-variable overrides recognized by `load_config`.
///
/// INVARIANT: every `ent_path` here must point at a field that [`Config`]
/// actually declares — `Config` is deserialized with `deny_unknown_fields`,
/// so an entry targeting a nonexistent section makes `load_config` fail
/// whenever that variable is set. The `VULNFEED_CONFIG_AUTH_JWT_*` entries
/// were removed for exactly that reason: there is no `auth` section in
/// `Config`, so setting either of them broke config loading at startup.
/// Re-add them together with an `auth` config section.
pub const fn known_option_entries() -> &'static [OptionEntry] {
    &[
        OptionEntry {
            env_name: "VULNFEED_CONFIG_SERVER_LISTEN_ADDR",
            ent_path: "server.listen_addr",
            ent_type: "string",
        },
        OptionEntry {
            env_name: "VULNFEED_CONFIG_DATABASE_HOST",
            ent_path: "database.host",
            ent_type: "string",
        },
        OptionEntry {
            env_name: "VULNFEED_CONFIG_DATABASE_PORT",
            ent_path: "database.port",
            ent_type: "integer",
        },
        OptionEntry {
            env_name: "VULNFEED_CONFIG_DATABASE_NAME",
            ent_path: "database.database_name",
            ent_type: "string",
        },
        OptionEntry {
            env_name: "VULNFEED_CONFIG_DATABASE_USERNAME",
            ent_path: "database.username",
            ent_type: "string",
        },
        OptionEntry {
            env_name: "VULNFEED_CONFIG_DATABASE_PASSWORD",
            ent_path: "database.password",
            ent_type: "string",
        },
    ]
}
/// Default bind address for the HTTP server.
fn default_listen_addr() -> String {
    String::from("0.0.0.0:9000")
}

/// Default Postgres host.
fn default_database_host() -> String {
    String::from("localhost")
}

/// Default Postgres port.
fn default_database_port() -> u16 {
    5432
}

/// Default Postgres username.
fn default_database_username() -> String {
    String::from("postgres")
}

/// Default Postgres password.
fn default_database_password() -> String {
    String::from("postgres")
}

/// Default application database name.
fn default_database_name() -> String {
    String::from("vulnfeed")
}

10
src/errors.rs Normal file
View File

@@ -0,0 +1,10 @@
use thiserror::Error;
/// Application-wide error type, attached to `error_stack::Report`s.
#[derive(Debug, Error)]
pub enum Error {
    /// Generic internal failure with a human-readable message.
    #[error("{0}")]
    Message(String),
    /// Caller-supplied bad input; the HTTP layer maps this to 400 Bad Request.
    #[error("{0}")]
    BadRequest(String),
}

View File

View File

@@ -0,0 +1,102 @@
use std::{io, net::SocketAddr, time::Duration};
use mea::{shutdown::ShutdownRecv, waitgroup::WaitGroup};
use poem::{listener::{Acceptor, Listener, TcpAcceptor, TcpListener}, Route};
use crate::utils::runtime::{self, Runtime};
/// Future driving the HTTP server; resolves when the server task exits.
pub(crate) type ServerFuture<T> = runtime::JoinHandle<Result<T, io::Error>>;

/// Handle to a running HTTP server.
#[derive(Debug)]
pub struct ServerState {
    advertise_addr: SocketAddr,
    server_fut: ServerFuture<()>,
    shutdown_rx_server: ShutdownRecv,
}

impl ServerState {
    /// Address advertised to peers (may differ from the bind address).
    pub fn advertise_addr(&self) -> SocketAddr {
        self.advertise_addr
    }

    /// Wait for the shutdown signal, then wait for the server task to finish.
    pub async fn await_shutdown(self) {
        // Blocks until the paired shutdown sender has been triggered.
        self.shutdown_rx_server.is_shutdown().await;
        log::info!("http server is shutting down");
        match self.server_fut.await {
            Ok(_) => log::info!("http server stopped"),
            Err(err) => log::error!(err:?;"http server failed."),
        }
    }
}
/// Bind a TCP acceptor on `listen_addr` and compute the address to advertise.
///
/// If `advertise_addr` is not given, the actual bound address is used —
/// except when bound to an unspecified IP (0.0.0.0 / ::), in which case the
/// machine's local IP plus the bound port is advertised instead.
pub async fn make_acceptor_and_advertise_addr(
    listen_addr: &str,
    advertise_addr: Option<&str>,
) -> Result<(TcpAcceptor, SocketAddr), io::Error> {
    log::info!("listening on {}", listen_addr);
    let acceptor = TcpListener::bind(&listen_addr).into_acceptor().await?;
    // Resolve the concrete bound address (relevant when port 0 was requested).
    let listen_addr = acceptor.local_addr()[0]
        .as_socket_addr()
        .cloned()
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::AddrNotAvailable,
                "failed to get local listen addr",
            )
        })?;
    let advertise_addr = match advertise_addr {
        None => {
            if listen_addr.ip().is_unspecified() {
                let ip = local_ip_address::local_ip().map_err(io::Error::other)?;
                let port = listen_addr.port();
                SocketAddr::new(ip, port)
            } else {
                listen_addr
            }
        }
        Some(advertise_addr) => advertise_addr
            .parse::<SocketAddr>()
            .map_err(io::Error::other)?,
    };
    Ok((acceptor, advertise_addr))
}
/// Spawn the poem HTTP server onto `rt` and wait until it has started.
///
/// The returned [`ServerState`] owns the server task and the shutdown
/// receiver; once the paired shutdown sender fires, poem drains gracefully
/// with a 10-second timeout.
pub async fn start_server(
    rt: &Runtime,
    shutdown_rx: ShutdownRecv,
    acceptor: TcpAcceptor,
    advertise_addr: SocketAddr,
) -> Result<ServerState, io::Error> {
    // WaitGroup blocks this function until the spawned server task is live.
    let wg = WaitGroup::new();
    let shutdown_rx_server = shutdown_rx;
    let server_fut = {
        let wg_clone = wg.clone();
        let shutdown_clone = shutdown_rx_server.clone();
        // NOTE(review): no routes are registered yet — every request 404s
        // until handlers are mounted here.
        let route = Route::new();
        let listen_addr = acceptor.local_addr()[0].clone();
        // `signal` completes when shutdown is requested, which is poem's cue
        // to begin graceful shutdown.
        let signal = async move {
            log::info!("server has started on [{listen_addr}]");
            // Dropping the WaitGroup clone releases the `wg.await` below.
            drop(wg_clone);
            shutdown_clone.is_shutdown().await;
            log::info!("server is closing");
        };
        rt.spawn(async move {
            poem::Server::new_with_acceptor(acceptor)
                .run_with_graceful_shutdown(route, signal, Some(Duration::from_secs(10)))
                .await
        })
    };
    wg.await;
    Ok(ServerState {
        advertise_addr,
        server_fut,
        shutdown_rx_server,
    })
}

3
src/input/http/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod response;
pub mod handlers;
pub mod http_server;

129
src/input/http/response.rs Normal file
View File

@@ -0,0 +1,129 @@
use error_stack::Report;
use poem::{IntoResponse, Response, error::ResponseError, http::StatusCode, web::Json};
use serde::Serialize;
use std::fmt;
use crate::errors::Error;
/// Successful API response: an HTTP status plus the standard JSON envelope.
#[derive(Debug, Clone)]
pub struct ApiSuccess<T: Serialize + PartialEq + Send + Sync>(StatusCode, Json<ApiResponseBody<T>>);

// Manual impl: compares status and inner body (`Json<T>` itself does not
// provide the comparison we need here).
impl<T> PartialEq for ApiSuccess<T>
where
    T: Serialize + PartialEq + Send + Sync,
{
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0 && self.1.0 == other.1.0
    }
}

impl<T: Serialize + PartialEq + Send + Sync> IntoResponse for ApiSuccess<T> {
    fn into_response(self) -> Response {
        (self.0, self.1).into_response()
    }
}

impl<T: Serialize + PartialEq + Send + Sync> ApiSuccess<T> {
    /// Wrap `data` in the standard response envelope with the given status.
    pub fn new(status: StatusCode, data: T) -> Self {
        ApiSuccess(status, Json(ApiResponseBody::new(status, data)))
    }
}
/// JSON envelope shared by all API responses:
/// `{"status_code": <u16>, "data": <payload>}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ApiResponseBody<T: Serialize + PartialEq + Send + Sync> {
    status_code: u16,
    data: T,
}

impl<T: Serialize + PartialEq + Send + Sync> ApiResponseBody<T> {
    /// Envelope for a success payload.
    pub fn new(status_code: StatusCode, data: T) -> Self {
        Self {
            status_code: status_code.as_u16(),
            data,
        }
    }
}

impl ApiResponseBody<ApiErrorData> {
    /// Envelope for an error payload carrying only a message.
    pub fn new_error(status_code: StatusCode, message: String) -> Self {
        Self {
            status_code: status_code.as_u16(),
            data: ApiErrorData { message },
        }
    }
}

/// Error payload: a single user-facing message.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ApiErrorData {
    pub message: String,
}
/// API-level error categories; each carries a detail message and maps to an
/// HTTP status code in its `ResponseError` implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApiError {
    InternalServerError(String),
    UnprocessableEntity(String),
    Unauthorized(String),
    BadRequest(String),
}

impl From<Report<Error>> for ApiError {
    fn from(e: Report<Error>) -> Self {
        // NOTE(review): logs at `error` level for every conversion, including
        // client-caused BadRequest — consider downgrading those.
        log::error!("ApiError: {:#?}", e);
        match e.downcast_ref::<Error>() {
            Some(Error::BadRequest(msg)) => Self::BadRequest(msg.to_string()),
            // Everything else is treated as an internal error.
            Some(e) => Self::InternalServerError(e.to_string()),
            None => Self::InternalServerError(e.to_string()),
        }
    }
}
impl ResponseError for ApiError {
    /// HTTP status code for each error category.
    fn status(&self) -> StatusCode {
        match self {
            Self::InternalServerError(_) => StatusCode::INTERNAL_SERVER_ERROR,
            Self::UnprocessableEntity(_) => StatusCode::UNPROCESSABLE_ENTITY,
            Self::Unauthorized(_) => StatusCode::UNAUTHORIZED,
            Self::BadRequest(_) => StatusCode::BAD_REQUEST,
        }
    }

    /// Build the error response body.
    ///
    /// 500/401 deliberately hide the internal detail from the client and log
    /// instead; 422/400 echo the message back to the caller.
    fn as_response(&self) -> Response {
        let (status, message, should_log) = match self {
            Self::InternalServerError(_) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error".to_string(),
                true,
            ),
            Self::Unauthorized(_) => (StatusCode::UNAUTHORIZED, "Unauthorized".to_string(), true),
            Self::UnprocessableEntity(msg) => {
                (StatusCode::UNPROCESSABLE_ENTITY, msg.to_string(), false)
            }
            Self::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg.to_string(), false),
        };
        if should_log {
            // NOTE(review): this logs the *generic* message, not the detail
            // carried in the variant; the detail is only logged by the
            // `From<Report<Error>>` conversion above.
            log::error!(
                "{}: {}",
                status.canonical_reason().unwrap_or("Error"),
                message
            );
        }
        let mut resp = Json(ApiResponseBody::new_error(status, message)).into_response();
        resp.set_status(status);
        resp
    }
}
impl fmt::Display for ApiError {
    /// Human-readable form including the detail message (unlike the HTTP
    /// response body, which hides internal details for 500/401).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (label, msg) = match self {
            Self::InternalServerError(msg) => ("Internal server error", msg),
            Self::UnprocessableEntity(msg) => ("Unprocessable entity", msg),
            Self::Unauthorized(msg) => ("Unauthorized", msg),
            Self::BadRequest(msg) => ("Bad request", msg),
        };
        write!(f, "{}: {}", label, msg)
    }
}

impl std::error::Error for ApiError {}

1
src/input/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod http;

6
src/lib.rs Normal file
View File

@@ -0,0 +1,6 @@
pub mod cli;
pub mod errors;
pub mod config;
pub mod input;
pub mod output;
pub mod utils;

0
src/output/mod.rs Normal file
View File

15
src/utils/mod.rs Normal file
View File

@@ -0,0 +1,15 @@
pub mod runtime;
pub mod version;
pub mod styled;
pub mod telemetry;
/// Number of worker threads to use, based on the host's available parallelism.
///
/// Falls back to 1 (and logs a warning) when the parallelism cannot be
/// queried.
#[track_caller]
pub fn num_cpus() -> std::num::NonZeroUsize {
    std::thread::available_parallelism().unwrap_or_else(|err| {
        log::warn!("failed to fetch the available parallelism (fallback to 1): {err:?}");
        std::num::NonZeroUsize::new(1).unwrap()
    })
}

161
src/utils/runtime.rs Normal file
View File

@@ -0,0 +1,161 @@
use std::future::Future;
use std::panic::resume_unwind;
use std::sync::Arc;
use std::task::ready;
use std::time::Duration;
use std::time::Instant;
/// Build a named multi-threaded Tokio runtime with `worker_threads` workers.
///
/// Panics if the underlying Tokio runtime cannot be created.
pub fn make_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
    log::debug!(
        "creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}"
    );
    Builder::new(runtime_name, thread_name)
        .worker_threads(worker_threads)
        .build()
        .expect("failed to create runtime")
}
/// A named wrapper around a shared [`tokio::runtime::Runtime`].
#[derive(Debug, Clone)]
pub struct Runtime {
    name: String,
    // Arc: clones share the same underlying runtime.
    runtime: Arc<tokio::runtime::Runtime>,
}

impl Runtime {
    /// Name given to this runtime at construction.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Spawn a future onto this runtime; see [`JoinHandle`] for how panics
    /// inside the task are surfaced.
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        JoinHandle::new(self.runtime.spawn(future))
    }

    /// Run a blocking closure on the runtime's blocking thread pool.
    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        JoinHandle::new(self.runtime.spawn_blocking(func))
    }

    /// Block the current thread until `future` completes.
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
        self.runtime.block_on(future)
    }
}

// Allow fastimer-based timers/schedulers to spawn onto this runtime.
impl fastimer::Spawn for Runtime {
    fn spawn<F: Future<Output = ()> + Send + 'static>(&self, future: F) {
        Runtime::spawn(self, future);
    }
}
/// Tokio-backed delay factory for fastimer.
#[derive(Debug)]
struct Timer;

/// Returns a [`fastimer::MakeDelay`] implementation driven by Tokio's timer.
pub fn timer() -> impl fastimer::MakeDelay {
    Timer
}

impl fastimer::MakeDelay for Timer {
    type Delay = tokio::time::Sleep;

    // NOTE: `delay_util` (sic) is the method name declared by the fastimer
    // trait; it sleeps until the given deadline.
    fn delay_util(&self, at: Instant) -> Self::Delay {
        tokio::time::sleep_until(tokio::time::Instant::from_std(at))
    }

    // Sleep for the given duration.
    fn delay(&self, duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}
/// Join handle that resolves directly to the task's output.
///
/// Unlike `tokio::task::JoinHandle`, awaiting this yields `R` rather than
/// `Result<R, JoinError>`: a panic inside the task is resumed on the awaiting
/// thread instead of being surfaced as an error value.
#[pin_project::pin_project]
#[derive(Debug)]
pub struct JoinHandle<R> {
    #[pin]
    inner: tokio::task::JoinHandle<R>,
}

impl<R> JoinHandle<R> {
    fn new(inner: tokio::task::JoinHandle<R>) -> Self {
        Self { inner }
    }
}

impl<R> Future for JoinHandle<R> {
    type Output = R;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.project();
        let val = ready!(this.inner.poll(cx));
        match val {
            Ok(val) => std::task::Poll::Ready(val),
            Err(err) => {
                if err.is_panic() {
                    // Propagate the task's panic into the awaiting context.
                    resume_unwind(err.into_panic())
                } else {
                    // A JoinError is either a panic or a cancellation; this
                    // wrapper exposes no abort API, so cancellation is
                    // treated as impossible here.
                    unreachable!()
                }
            }
        }
    }
}
/// Thin builder over [`tokio::runtime::Builder`] that also carries the
/// runtime/thread names used by [`Runtime`].
pub struct Builder {
    runtime_name: String,
    thread_name: String,
    builder: tokio::runtime::Builder,
}

impl Builder {
    /// Start building a multi-threaded runtime with the given names.
    pub fn new(runtime_name: impl Into<String>, thread_name: impl Into<String>) -> Self {
        Self {
            runtime_name: runtime_name.into(),
            thread_name: thread_name.into(),
            builder: tokio::runtime::Builder::new_multi_thread(),
        }
    }

    /// Number of worker threads for the runtime.
    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
        self.builder.worker_threads(val);
        self
    }

    /// Cap for the blocking thread pool.
    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
        self.builder.max_blocking_threads(val);
        self
    }

    /// Keep-alive for idle blocking threads.
    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
        self.builder.thread_keep_alive(duration);
        self
    }

    /// Override the runtime name.
    pub fn runtime_name(&mut self, val: impl Into<String>) -> &mut Self {
        self.runtime_name = val.into();
        self
    }

    /// Override the worker-thread name.
    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
        self.thread_name = val.into();
        self
    }

    /// Build the runtime with all Tokio drivers (I/O, time) enabled.
    pub fn build(&mut self) -> std::io::Result<Runtime> {
        let name = self.runtime_name.clone();
        let runtime = self
            .builder
            .enable_all()
            .thread_name(self.thread_name.clone())
            .build()
            .map(Arc::new)?;
        Ok(Runtime { name, runtime })
    }
}

39
src/utils/styled.rs Normal file
View File

@@ -0,0 +1,39 @@
pub fn styled() -> clap::builder::Styles {
clap::builder::Styles::styled()
.usage(
anstyle::Style::new()
.bold()
.underline()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::BrightGreen))),
)
.header(
anstyle::Style::new()
.bold()
.underline()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::BrightGreen))),
)
.literal(
anstyle::Style::new()
.bold()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::BrightCyan))),
)
.invalid(
anstyle::Style::new()
.bold()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::Red))),
)
.error(
anstyle::Style::new()
.bold()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::Red))),
)
.valid(
anstyle::Style::new()
.bold()
.underline()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::Green))),
)
.placeholder(
anstyle::Style::new().fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::Cyan))),
)
}

81
src/utils/telemetry.rs Normal file
View File

@@ -0,0 +1,81 @@
use logforth::{
append::{
self,
rolling_file::{RollingFileBuilder, Rotation},
},
diagnostic::{FastraceDiagnostic, StaticDiagnostic},
filter::{EnvFilter, env_filter::EnvFilterBuilder},
layout,
};
use crate::config::settings::TelemetryConfig;
use super::runtime::Runtime;
/// Initialize telemetry (currently logging only) for `service_name`.
///
/// Returns guard objects that must be kept alive for the lifetime of the
/// process; dropping them shuts down the appenders.
pub fn init(
    rt: &Runtime,
    service_name: &'static str,
    config: TelemetryConfig,
) -> Vec<Box<dyn Send + Sync + 'static>> {
    let mut drop_guards = vec![];
    drop_guards.extend(init_logs(rt, service_name, &config));
    drop_guards
}
/// Set up logforth dispatchers for the configured appenders (rolling file
/// and/or stderr); returns their drop guards.
fn init_logs(
    rt: &Runtime,
    service_name: &'static str,
    config: &TelemetryConfig,
) -> Vec<Box<dyn Send + Sync + 'static>> {
    // The runtime is currently unused; kept in the signature for future
    // appenders that need async flushing.
    let _ = rt;
    let static_diagnostic = StaticDiagnostic::default();
    let mut drop_guards: Vec<Box<dyn Send + Sync + 'static>> = Vec::new();
    let mut builder = logforth::builder();
    if let Some(file) = &config.logs.file {
        // JSON-formatted log files under `file.dir`, rotated hourly and
        // pruned to at most `file.max_files`.
        let (rolling, guard) = RollingFileBuilder::new(&file.dir)
            .layout(layout::JsonLayout::default())
            .rotation(Rotation::Hourly)
            .filename_prefix(service_name)
            .filename_suffix("log")
            .max_log_files(file.max_files)
            .build()
            .expect("failed to initialize rolling file appender");
        drop_guards.push(guard);
        builder = builder.dispatch(|b| {
            b.filter(make_rust_log_filter(&file.filter))
                .diagnostic(FastraceDiagnostic::default())
                .diagnostic(static_diagnostic.clone())
                .append(rolling)
        });
    }
    if let Some(stderr) = &config.logs.stderr {
        builder = builder.dispatch(|b| {
            // The stderr appender honors RUST_LOG when set.
            b.filter(make_rust_log_filter_with_default_env(&stderr.filter))
                .diagnostic(FastraceDiagnostic::default())
                .diagnostic(static_diagnostic.clone())
                .append(append::Stderr::default())
        });
    }
    // Ignore the error if a global logger is already installed.
    let _ = builder.try_apply();
    drop_guards
}
/// Build an [`EnvFilter`] from a directive string, panicking if the string is
/// not a valid filter.
fn make_rust_log_filter(filter: &str) -> EnvFilter {
    match EnvFilterBuilder::new().try_parse(filter) {
        Ok(builder) => EnvFilter::new(builder),
        Err(_) => panic!("failed to parse filter: {filter}"),
    }
}

/// Like [`make_rust_log_filter`], but an explicitly-set `RUST_LOG`
/// environment variable takes precedence over the configured filter.
fn make_rust_log_filter_with_default_env(filter: &str) -> EnvFilter {
    match std::env::var("RUST_LOG") {
        Ok(env_filter) => make_rust_log_filter(&env_filter),
        Err(_) => make_rust_log_filter(filter),
    }
}

46
src/utils/version.rs Normal file
View File

@@ -0,0 +1,46 @@
use serde::{Deserialize, Serialize};
shadow_rs::shadow!(build);
/// Build-time metadata captured by `shadow_rs` and the build script.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BuildInfo {
    pub branch: &'static str,
    pub commit: &'static str,
    pub commit_short: &'static str,
    /// Whether the git working tree was clean at build time.
    pub clean: bool,
    /// SOURCE_TIMESTAMP from build.rs; may be empty outside a git checkout.
    pub source_time: &'static str,
    pub build_time: &'static str,
    pub rustc: &'static str,
    pub target: &'static str,
    pub version: &'static str,
}

/// Assemble a [`BuildInfo`] from the shadow-generated constants and the env
/// vars exported by build.rs.
pub const fn build_info() -> BuildInfo {
    BuildInfo {
        branch: build::BRANCH,
        commit: build::COMMIT_HASH,
        commit_short: build::SHORT_COMMIT,
        clean: build::GIT_CLEAN,
        source_time: env!("SOURCE_TIMESTAMP"),
        build_time: env!("BUILD_TIMESTAMP"),
        rustc: build::RUST_VERSION,
        target: build::BUILD_TARGET,
        version: build::PKG_VERSION,
    }
}

/// Multi-line version string (used as clap's `long_version`), rendered at
/// compile time via `const_format`.
pub const fn version() -> &'static str {
    const BUILD_INFO: BuildInfo = build_info();
    const_format::formatcp!(
        "\nversion: {}\nbranch: {}\ncommit: {}\nclean: {}\nsource_time: {}\nbuild_time: {}\nrustc: {}\ntarget: {}",
        BUILD_INFO.version,
        BUILD_INFO.branch,
        BUILD_INFO.commit,
        BUILD_INFO.clean,
        BUILD_INFO.source_time,
        BUILD_INFO.build_time,
        BUILD_INFO.rustc,
        BUILD_INFO.target,
    )
}