feat: support sync data task create api

This commit is contained in:
fan-tastic-z
2025-08-21 19:29:58 +08:00
parent 060ee52468
commit 0ce56914fb
41 changed files with 1864 additions and 83 deletions

729
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,25 +9,51 @@ path = "src/bin/vulnfeed/main.rs"
[dependencies]
anstyle = "1.0.11"
async-trait = "0.1.89"
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.45", features = ["derive"] }
const_format = "0.2.34"
ctrlc = "3.4.7"
dashmap = "6.1.0"
error-stack = "0.5.0"
fastimer = "0.9.0"
gix-discover = "0.41.0"
jiff = { version = "0.2.15", features = ["serde"] }
lazy_static = "1.5.0"
local-ip-address = "0.6.5"
log = "0.4.27"
logforth = { version = "0.26.2", features = ["colored", "layout-json", "append-fastrace", "append-rolling-file", "diagnostic-fastrace"] }
logforth = { version = "0.27.0", features = [
"colored",
"layout-json",
"append-fastrace",
"append-rolling-file",
"diagnostic-fastrace",
] }
mea = "0.4.0"
modql = { version = "0.4.1", features = ["with-sea-query"] }
nutype = { version = "0.6.2", features = ["serde"] }
pin-project = "1.1.10"
poem = "3.1.12"
reqwest = { version = "0.12.23", features = ["json"] }
sea-query = { version = "0.32.7", features = ["postgres-array"] }
sea-query-binder = { version = "0.7.0", features = [
"sqlx-postgres",
"with-time",
"postgres-array",
] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.142"
serde_json = "1.0.143"
shadow-rs = "1.2.1"
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio", "macros", "uuid", "migrate", "chrono"] }
thiserror = "2.0.14"
sqlx = { version = "0.8.6", features = [
"postgres",
"runtime-tokio",
"macros",
"uuid",
"migrate",
"chrono",
] }
thiserror = "2.0.16"
tokio = { version = "1.47.1", features = ["rt-multi-thread", "macros"] }
tokio-cron-scheduler = { version = "0.14.0", features = ["signal"] }
toml_edit = { version = "0.23.3", features = ["serde"] }
uuid = { version = "1.18.0", features = ["v4", "serde"] }

View File

@@ -1,38 +1,39 @@
-- Add migration script here
CREATE TABLE vuln_information (
id TEXT NOT NULL UNIQUE PRIMARY KEY,
id BIGSERIAL PRIMARY KEY,
key TEXT NOT NULL UNIQUE,
title TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
severity TEXT NOT NULL DEFAULT '',
cve TEXT NOT NULL DEFAULT '',
disclosure TEXT NOT NULL DEFAULT '',
solutions TEXT NOT NULL DEFAULT '',
reference_links TEXT [] DEFAULT '{}',
tags TEXT [] DEFAULT '{}',
github_search TEXT [] DEFAULT '{}',
reference_links TEXT [] NOT NULL DEFAULT '{}',
tags TEXT [] NOT NULL DEFAULT '{}',
github_search TEXT [] NOT NULL DEFAULT '{}',
reasons TEXT [] NOT NULL DEFAULT '{}',
source TEXT NOT NULL DEFAULT '',
pushed BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
pushed BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE sync_task (
id TEXT NOT NULL UNIQUE PRIMARY KEY,
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL DEFAULT '',
minute INTEGER NOT NULL DEFAULT 15, -- 默认每15分钟执行一次
status BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
interval_minutes INTEGER NOT NULL DEFAULT 15,
status BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE sync_task_record (
id TEXT NOT NULL UNIQUE PRIMARY KEY,
task_id TEXT NOT NULL,
started_at TIMESTAMP DEFAULT,
ended_at TIMESTAMP DEFAULT,
success BOOLEAN DEFAULT FALSE,
error_message TEXT DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
id BIGSERIAL PRIMARY KEY,
task_id TEXT NOT NULL DEFAULT '',
started_at TIMESTAMPTZ,
ended_at TIMESTAMPTZ,
success BOOLEAN NOT NULL DEFAULT FALSE,
error_message TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

5
rustfmt.toml Normal file
View File

@@ -0,0 +1,5 @@
comment_width = 120
format_code_in_doc_comments = true
group_imports = "StdExternalCrate"
imports_granularity = "Item"
wrap_comments = true

View File

@@ -1,8 +1,10 @@
use clap::Parser;
use vulnfeed::{cli, errors::Error, utils::{styled::styled, version::version}};
use error_stack::Result;
use vulnfeed::{
cli,
errors::Error,
utils::{styled::styled, version::version},
};
#[derive(Debug, clap::Parser)]
#[command(name = "vulnfeed", version, long_version = version(), styles=styled())]
@@ -24,8 +26,6 @@ enum SubCommand {
Server(cli::CommandStart),
}
fn main() -> Result<(), Error> {
let cmd = Command::parse();
cmd.run()

View File

@@ -1,9 +1,28 @@
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};
use crate::{
config::settings::{Config, LoadConfigResult, load_config},
domain::{
models::vuln_information::CreateVulnInformation, ports::VulnService, services::Service,
},
errors::Error,
input::http::http_server::{self, make_acceptor_and_advertise_addr},
output::{db::pg::Pg, plugins, scheduler::Scheduler, worker::worker::Worker},
utils::{
num_cpus,
runtime::{Runtime, make_runtime},
telemetry,
},
};
use clap::ValueHint;
use error_stack::{Result, ResultExt};
use crate::{config::settings::{load_config, Config, LoadConfigResult}, errors::Error, input::http::http_server::{self, make_acceptor_and_advertise_addr}, utils::{num_cpus, runtime::{make_runtime, Runtime}, telemetry}};
#[derive(Clone)]
pub struct Ctx<S: VulnService + Send + Sync + 'static> {
pub vuln_service: Arc<S>,
pub config: Arc<Config>,
pub sched: Arc<Scheduler>,
}
#[derive(Debug, clap::Parser)]
pub struct CommandStart {
@@ -11,7 +30,6 @@ pub struct CommandStart {
config_file: PathBuf,
}
impl CommandStart {
pub fn run(self) -> Result<(), Error> {
error_stack::Report::set_color_mode(error_stack::fmt::ColorMode::None);
@@ -38,7 +56,27 @@ async fn run_server(server_rt: &Runtime, config: Config) -> Result<(), Error> {
)
.await
.change_context_lazy(make_error)?;
let server = http_server::start_server(server_rt, shutdown_rx, acceptor, advertise_addr)
let db = Pg::new(&config).await.change_context_lazy(make_error)?;
let (sender, receiver) = mea::mpsc::unbounded::<CreateVulnInformation>();
plugins::init(sender).change_context_lazy(make_error)?;
let mut worker = Worker::new(receiver, db.clone());
server_rt.spawn(async move { worker.run().await });
let sched = Scheduler::try_new(db.clone())
.await
.change_context_lazy(make_error)?;
let sched = sched.init_from_db().await.change_context_lazy(make_error)?;
let vuln_service = Service::new(db);
let ctx = Ctx {
vuln_service: Arc::new(vuln_service),
config: Arc::new(config),
sched: Arc::new(sched),
};
let server = http_server::start_server(ctx, server_rt, shutdown_rx, acceptor, advertise_addr)
.await
.change_context_lazy(|| {
Error::Message("A fatal error has occurred in server process.".to_string())
@@ -52,7 +90,6 @@ async fn run_server(server_rt: &Runtime, config: Config) -> Result<(), Error> {
Ok(())
}
fn make_vulnfeed_runtime() -> Runtime {
let parallelism = num_cpus().get();
make_runtime("vulnfeed_runtime", "vulnfeed_thread", parallelism)

View File

@@ -15,7 +15,7 @@ pub struct LoadConfigResult {
pub fn load_config(config_file: PathBuf) -> Result<LoadConfigResult, Error> {
let content = std::fs::read_to_string(&config_file).change_context_lazy(|| {
Error::Message(format!(
"failed to read config file: {}",
"failed to read config file {}",
config_file.display()
))
})?;
@@ -251,7 +251,6 @@ fn default_listen_addr() -> String {
"0.0.0.0:9000".to_string()
}
fn default_database_host() -> String {
"localhost".to_string()
}

3
src/domain/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod models;
pub mod ports;
pub mod services;

2
src/domain/models/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod sync_data_task;
pub mod vuln_information;

View File

@@ -0,0 +1,67 @@
use chrono::{DateTime, Utc};
use modql::field::Fields;
use nutype::nutype;
use sea_query::Value;
use serde::Serialize;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, sqlx::FromRow)]
pub struct SyncDataTask {
pub id: i64,
pub name: String,
pub interval_minutes: i32,
pub status: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Fields)]
pub struct CreateSyncDataTaskRequest {
pub name: SyncDataTaskName,
pub interval_minutes: SyncDataTaskIntervalMinutes,
pub status: bool,
}
impl CreateSyncDataTaskRequest {
pub fn new(
name: SyncDataTaskName,
interval_minutes: SyncDataTaskIntervalMinutes,
status: bool,
) -> Self {
Self {
name,
interval_minutes,
status,
}
}
}
#[nutype(
sanitize(trim),
validate(not_empty, len_char_min = 4, len_char_max = 20),
derive(
Clone, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash, AsRef, Deref, Borrow, TryFrom,
Serialize
)
)]
pub struct SyncDataTaskName(String);
impl From<SyncDataTaskName> for Value {
fn from(name: SyncDataTaskName) -> Self {
Value::String(Some(Box::new(name.into_inner())))
}
}
#[nutype(
validate(greater_or_equal = 10, less_or_equal = 60),
derive(
Clone, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash, AsRef, Deref, Borrow, TryFrom,
Serialize
)
)]
pub struct SyncDataTaskIntervalMinutes(u32);
impl From<SyncDataTaskIntervalMinutes> for Value {
fn from(interval_minutes: SyncDataTaskIntervalMinutes) -> Self {
Value::Int(Some(interval_minutes.into_inner() as i32))
}
}

View File

@@ -0,0 +1,56 @@
use std::fmt;
use chrono::{DateTime, Utc};
use modql::field::Fields;
use serde::Serialize;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, sqlx::FromRow)]
pub struct VulnInformation {
pub id: i64,
pub key: String,
pub title: String,
pub description: String,
pub severity: String,
pub cve: String,
pub disclosure: String,
pub solutions: String,
pub reference_links: Vec<String>,
pub tags: Vec<String>,
pub github_search: Vec<String>,
pub source: String,
pub reasons: Vec<String>,
pub pushed: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Fields)]
pub struct CreateVulnInformation {
pub key: String,
pub title: String,
pub description: String,
pub severity: String,
pub cve: String,
pub disclosure: String,
pub solutions: String,
pub reference_links: Vec<String>,
pub tags: Vec<String>,
pub github_search: Vec<String>,
pub source: String,
pub reasons: Vec<String>,
pub pushed: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Severity {
Low,
Medium,
High,
Critical,
}
impl fmt::Display for Severity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

17
src/domain/ports.rs Normal file
View File

@@ -0,0 +1,17 @@
use crate::{domain::models::sync_data_task::CreateSyncDataTaskRequest, errors::Error};
use error_stack::Result;
use std::future::Future;
pub trait VulnService: Clone + Send + Sync + 'static {
fn create_sync_data_task(
&self,
req: CreateSyncDataTaskRequest,
) -> impl Future<Output = Result<i64, Error>> + Send;
}
pub trait VulnRepository: Clone + Send + Sync + 'static {
fn create_sync_data_task(
&self,
req: CreateSyncDataTaskRequest,
) -> impl Future<Output = Result<i64, Error>> + Send;
}

35
src/domain/services.rs Normal file
View File

@@ -0,0 +1,35 @@
use crate::{
domain::{
models::sync_data_task::CreateSyncDataTaskRequest,
ports::{VulnRepository, VulnService},
},
errors::Error,
};
use error_stack::Result;
#[derive(Debug, Clone)]
pub struct Service<R>
where
R: VulnRepository,
{
repo: R,
}
impl<R> Service<R>
where
R: VulnRepository,
{
pub fn new(repo: R) -> Self {
Self { repo }
}
}
impl<R> VulnService for Service<R>
where
R: VulnRepository,
{
async fn create_sync_data_task(&self, req: CreateSyncDataTaskRequest) -> Result<i64, Error> {
let ret = self.repo.create_sync_data_task(req).await?;
Ok(ret)
}
}

View File

@@ -0,0 +1 @@
pub mod sync_data_task;

View File

@@ -0,0 +1,86 @@
use poem::{
handler,
http::StatusCode,
web::{Data, Json},
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::{
cli::Ctx,
domain::{
models::sync_data_task::{
CreateSyncDataTaskRequest, SyncDataTaskIntervalMinutes,
SyncDataTaskIntervalMinutesError, SyncDataTaskName, SyncDataTaskNameError,
},
ports::VulnService,
},
input::http::response::{ApiError, ApiSuccess},
};
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
pub struct CreateSyncDataTaskHttpRequestBody {
pub name: String,
pub interval_minutes: u32,
pub status: bool,
}
impl CreateSyncDataTaskHttpRequestBody {
pub fn try_into_domain(
self,
) -> Result<CreateSyncDataTaskRequest, ParseCreateSyncDataTaskRequestBodyError> {
let name = SyncDataTaskName::try_new(self.name)?;
let interval_minutes = SyncDataTaskIntervalMinutes::try_new(self.interval_minutes)?;
Ok(CreateSyncDataTaskRequest {
name,
interval_minutes,
status: self.status,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct CreateSyncDataTaskHttpResponseData {
pub id: i64,
}
#[derive(Debug, Clone, Error)]
pub enum ParseCreateSyncDataTaskRequestBodyError {
#[error(transparent)]
InvalidName(#[from] SyncDataTaskNameError),
#[error(transparent)]
InvalidIntervalMinutes(#[from] SyncDataTaskIntervalMinutesError),
}
impl From<ParseCreateSyncDataTaskRequestBodyError> for ApiError {
fn from(parse_error: ParseCreateSyncDataTaskRequestBodyError) -> Self {
let message = match parse_error {
ParseCreateSyncDataTaskRequestBodyError::InvalidName(e) => {
format!("Name is invalid: {}", e)
}
ParseCreateSyncDataTaskRequestBodyError::InvalidIntervalMinutes(e) => {
format!("Interval minutes is invalid: {}", e)
}
};
ApiError::UnprocessableEntity(message)
}
}
#[handler]
pub async fn create_sync_data_task<S: VulnService + Send + Sync + 'static>(
state: Data<&Ctx<S>>,
Json(body): Json<CreateSyncDataTaskHttpRequestBody>,
) -> Result<ApiSuccess<CreateSyncDataTaskHttpResponseData>, ApiError> {
let req = body.try_into_domain()?;
state
.vuln_service
.create_sync_data_task(req)
.await
.map_err(ApiError::from)
.map(|id| {
ApiSuccess::new(
StatusCode::CREATED,
CreateSyncDataTaskHttpResponseData { id },
)
})
}

View File

@@ -1,13 +1,21 @@
use std::{io, net::SocketAddr, time::Duration};
use mea::{shutdown::ShutdownRecv, waitgroup::WaitGroup};
use poem::{listener::{Acceptor, Listener, TcpAcceptor, TcpListener}, Route};
use poem::{
Endpoint, EndpointExt, Route,
listener::{Acceptor, Listener, TcpAcceptor, TcpListener},
post,
};
use crate::utils::runtime::{self, Runtime};
use crate::{
cli::Ctx,
domain::ports::VulnService,
input::http::handlers::sync_data_task,
utils::runtime::{self, Runtime},
};
pub(crate) type ServerFuture<T> = runtime::JoinHandle<Result<T, io::Error>>;
#[derive(Debug)]
pub struct ServerState {
advertise_addr: SocketAddr,
@@ -30,7 +38,6 @@ impl ServerState {
}
}
pub async fn make_acceptor_and_advertise_addr(
listen_addr: &str,
advertise_addr: Option<&str>,
@@ -66,19 +73,21 @@ pub async fn make_acceptor_and_advertise_addr(
Ok((acceptor, advertise_addr))
}
pub async fn start_server(
pub async fn start_server<S: VulnService + Send + Sync + 'static>(
ctx: Ctx<S>,
rt: &Runtime,
shutdown_rx: ShutdownRecv,
acceptor: TcpAcceptor,
advertise_addr: SocketAddr,
) -> Result<ServerState, io::Error>{
) -> Result<ServerState, io::Error> {
let wg = WaitGroup::new();
let shutdown_rx_server = shutdown_rx;
let server_fut = {
let wg_clone = wg.clone();
let shutdown_clone = shutdown_rx_server.clone();
let route = Route::new();
let route = Route::new()
.nest("/api", api_routes::<S>())
.data(ctx.clone());
let listen_addr = acceptor.local_addr()[0].clone();
let signal = async move {
log::info!("server has started on [{listen_addr}]");
@@ -100,3 +109,16 @@ pub async fn start_server(
shutdown_rx_server,
})
}
fn api_routes<S: VulnService + Send + Sync + 'static>() -> impl Endpoint {
Route::new().nest(
"/",
Route::new().nest(
"/sync_data_task",
Route::new().at(
"",
post(sync_data_task::create_sync_data_task::<S>::default()),
),
),
)
}

View File

@@ -1,3 +1,3 @@
pub mod response;
pub mod handlers;
pub mod http_server;
pub mod response;

View File

@@ -1,6 +1,7 @@
pub mod cli;
pub mod errors;
pub mod config;
pub mod domain;
pub mod errors;
pub mod input;
pub mod output;
pub mod utils;

168
src/output/db/base.rs Normal file
View File

@@ -0,0 +1,168 @@
use error_stack::{Result, ResultExt};
use modql::{SIden, field::HasSeaFields};
use sea_query::{
Alias, Asterisk, Expr, Iden, IntoIden, OnConflict, PostgresQueryBuilder, Query, TableRef,
};
use sea_query_binder::SqlxBinder;
use sqlx::{FromRow, Postgres, Transaction};
use crate::errors::Error;
pub trait Dao {
const TABLE: &'static str;
fn table_ref() -> TableRef {
TableRef::Table(SIden(Self::TABLE).into_iden())
}
}
#[derive(Iden)]
pub enum CommonIden {
Id,
}
pub async fn dao_create<D, E>(tx: &mut Transaction<'_, Postgres>, req: E) -> Result<i64, Error>
where
E: HasSeaFields,
D: Dao,
{
let fields = req.not_none_sea_fields();
let (columns, sea_values) = fields.for_sea_insert();
let mut query = Query::insert();
query
.into_table(D::table_ref())
.columns(columns)
.values(sea_values)
.change_context_lazy(|| Error::Message("failed to create record".to_string()))?
.returning(Query::returning().columns([CommonIden::Id]));
let (sql, values) = query.build_sqlx(PostgresQueryBuilder);
log::debug!("sql: {} values: {:?}", sql, values);
let sqlx_query = sqlx::query_as_with::<_, (i64,), _>(&sql, values);
let (id,) = sqlx_query
.fetch_one(tx.as_mut())
.await
.change_context_lazy(|| Error::Message("failed to create record".to_string()))?;
Ok(id)
}
pub async fn dao_first<D, T>(tx: &mut Transaction<'_, Postgres>) -> Result<Option<T>, Error>
where
D: Dao,
T: for<'r> FromRow<'r, sqlx::postgres::PgRow> + Unpin + Send,
{
let (sql, values) = Query::select()
.from(D::table_ref())
.column(Asterisk)
.build_sqlx(PostgresQueryBuilder);
log::debug!("sql: {} values: {:?}", sql, values);
let ret = sqlx::query_as_with::<_, T, _>(&sql, values)
.fetch_optional(tx.as_mut())
.await
.change_context_lazy(|| Error::Message("failed to fetch one record".to_string()))?;
Ok(ret)
}
pub async fn dao_upsert<D, E>(
tx: &mut Transaction<'_, Postgres>,
req: E,
conflict_column: &str,
update_columns: &[&str],
) -> Result<i64, Error>
where
E: HasSeaFields,
D: Dao,
{
let fields = req.not_none_sea_fields();
let (columns, sea_values) = fields.for_sea_insert();
let mut query = Query::insert();
query
.into_table(D::table_ref())
.columns(columns)
.values(sea_values)
.change_context_lazy(|| Error::Message("failed to upsert record".to_string()))?;
let on_conflict = if update_columns.is_empty() {
OnConflict::column(Alias::new(conflict_column))
.do_nothing()
.to_owned()
} else {
let mut on_conflict = OnConflict::column(Alias::new(conflict_column));
for &col in update_columns {
on_conflict = on_conflict.update_column(Alias::new(col)).to_owned();
}
on_conflict
};
query.on_conflict(on_conflict);
query.returning(Query::returning().columns([CommonIden::Id]));
let (sql, values) = query.build_sqlx(PostgresQueryBuilder);
log::debug!("sql: {} values: {:?}", sql, values);
let sqlx_query = sqlx::query_as_with::<_, (i64,), _>(&sql, values);
let (id,) = sqlx_query
.fetch_one(tx.as_mut())
.await
.change_context_lazy(|| Error::Message("failed to upsert record".to_string()))?;
Ok(id)
}
pub async fn dao_fetch_by_column<D, T>(
tx: &mut Transaction<'_, Postgres>,
column_name: &str,
value: &str,
) -> Result<Option<T>, Error>
where
D: Dao,
T: for<'r> FromRow<'r, sqlx::postgres::PgRow> + Unpin + Send,
{
let (sql, values) = Query::select()
.from(D::table_ref())
.column(Asterisk)
.and_where(Expr::col(Alias::new(column_name)).eq(value))
.build_sqlx(PostgresQueryBuilder);
log::debug!("sql: {} values: {:?}", sql, values);
let result = sqlx::query_as_with::<_, T, _>(&sql, values)
.fetch_optional(tx.as_mut())
.await
.change_context_lazy(|| Error::Message("failed to fetch record by column".to_string()))?;
Ok(result)
}
pub async fn dao_update<D, E>(
tx: &mut Transaction<'_, Postgres>,
id: i64,
req: E,
) -> Result<u64, Error>
where
E: HasSeaFields,
D: Dao,
{
let fields = req.not_none_sea_fields();
let (columns, sea_values) = fields.for_sea_insert();
let mut query = Query::update();
query
.table(D::table_ref())
.and_where(Expr::col(CommonIden::Id).eq(id));
// Add values to update
for (column, value) in columns.into_iter().zip(sea_values.into_iter()) {
query.value(column, value);
}
let (sql, values) = query.build_sqlx(PostgresQueryBuilder);
log::debug!("sql: {} values: {:?}", sql, values);
let sqlx_query = sqlx::query_with(&sql, values);
let result = sqlx_query
.execute(tx.as_mut())
.await
.change_context_lazy(|| Error::Message("failed to update record".to_string()))?;
Ok(result.rows_affected())
}

5
src/output/db/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
pub mod base;
pub mod pg;
pub mod repository_impl;
pub mod sync_data_task;
pub mod vuln_information;

28
src/output/db/pg.rs Normal file
View File

@@ -0,0 +1,28 @@
use error_stack::{Result, ResultExt};
use sqlx::{
Pool, Postgres,
postgres::{PgConnectOptions, PgPoolOptions},
};
use crate::{config::settings::Config, errors::Error};
#[derive(Debug, Clone)]
pub struct Pg {
pub pool: Pool<Postgres>,
}
impl Pg {
pub async fn new(config: &Config) -> Result<Self, Error> {
let opts = PgConnectOptions::new()
.host(&config.database.host)
.port(config.database.port)
.username(&config.database.username)
.password(&config.database.password)
.database(&config.database.database_name);
let pool = PgPoolOptions::new()
.connect_with(opts)
.await
.change_context_lazy(|| Error::Message("failed to connect to database".to_string()))?;
Ok(Self { pool })
}
}

View File

@@ -0,0 +1,21 @@
use error_stack::{Result, ResultExt};
use crate::{
domain::{models::sync_data_task::CreateSyncDataTaskRequest, ports::VulnRepository},
errors::Error,
output::db::{pg::Pg, sync_data_task::SyncDataTaskDao},
};
impl VulnRepository for Pg {
async fn create_sync_data_task(&self, req: CreateSyncDataTaskRequest) -> Result<i64, Error> {
let mut tx =
self.pool.begin().await.change_context_lazy(|| {
Error::Message("failed to begin transaction".to_string())
})?;
let sync_data_task_id = SyncDataTaskDao::create(&mut tx, req).await?;
tx.commit()
.await
.change_context_lazy(|| Error::Message("failed to commit transaction".to_string()))?;
Ok(sync_data_task_id)
}
}

View File

@@ -0,0 +1,34 @@
use error_stack::Result;
use sqlx::{Postgres, Transaction};
use crate::{
domain::models::sync_data_task::{CreateSyncDataTaskRequest, SyncDataTask},
errors::Error,
output::db::base::{Dao, dao_create, dao_first, dao_update},
};
pub struct SyncDataTaskDao;
impl Dao for SyncDataTaskDao {
const TABLE: &'static str = "sync_task";
}
impl SyncDataTaskDao {
pub async fn first(tx: &mut Transaction<'_, Postgres>) -> Result<Option<SyncDataTask>, Error> {
let task = dao_first::<Self, _>(tx).await?;
Ok(task)
}
pub async fn create(
tx: &mut Transaction<'_, Postgres>,
req: CreateSyncDataTaskRequest,
) -> Result<i64, Error> {
let task: Option<SyncDataTask> = dao_first::<Self, _>(tx).await?;
if let Some(t) = task {
dao_update::<Self, _>(tx, t.id, req).await?;
return Ok(t.id);
}
let ret = dao_create::<Self, _>(tx, req).await?;
Ok(ret)
}
}

View File

@@ -0,0 +1,86 @@
use error_stack::Result;
use sqlx::{Postgres, Transaction};
use crate::{
domain::models::vuln_information::{CreateVulnInformation, VulnInformation},
errors::Error,
output::db::base::{Dao, dao_create, dao_fetch_by_column, dao_update},
};
const REASON_NEW_CREATED: &str = "漏洞创建";
const REASON_TAG_UPDATED: &str = "标签更新";
const REASON_SEVERITY_UPDATE: &str = "等级更新";
pub struct VulnInformationDao;
impl Dao for VulnInformationDao {
const TABLE: &'static str = "vuln_information";
}
impl VulnInformationDao {
pub async fn create(
tx: &mut Transaction<'_, Postgres>,
req: CreateVulnInformation,
) -> Result<i64, Error> {
let id = dao_create::<Self, _>(tx, req).await?;
Ok(id)
}
pub async fn create_or_update(
tx: &mut Transaction<'_, Postgres>,
mut req: CreateVulnInformation,
) -> Result<(), Error> {
if let Some(mut vuln) =
dao_fetch_by_column::<Self, VulnInformation>(tx, "key", &req.key).await?
{
let mut as_new_vuln = false;
let severity = vuln.severity.to_string();
if severity != req.severity.to_string() {
log::info!(
"{} from {} change severity from {} to {}",
vuln.title.as_str(),
vuln.source.as_str(),
vuln.severity.as_str(),
req.severity.as_str()
);
let reason = format!(
"{}: {} => {}",
REASON_SEVERITY_UPDATE,
vuln.severity.as_str(),
req.severity
);
vuln.reasons.push(reason);
as_new_vuln = true
}
let new_tags = req
.tags
.iter()
.filter(|&x| !vuln.tags.contains(x))
.collect::<Vec<_>>();
if !new_tags.is_empty() {
log::info!(
"{} from {} add new tag {:?}",
vuln.title,
vuln.source,
new_tags
);
let reason = format!("{}: {:?} => {:?}", REASON_TAG_UPDATED, vuln.tags, req.tags);
vuln.reasons.push(reason);
as_new_vuln = true
}
if as_new_vuln {
req.pushed = false;
dao_update::<Self, _>(tx, vuln.id, req).await?;
return Ok(());
} else {
log::warn!("Vuln information already exists: {}", req.key);
return Ok(());
}
} else {
log::info!("New vulnerability created: {}", req.key);
req.reasons.push(REASON_NEW_CREATED.to_string());
dao_create::<Self, _>(tx, req).await?;
return Ok(());
}
}
}

View File

@@ -0,0 +1,4 @@
pub mod db;
pub mod plugins;
pub mod scheduler;
pub mod worker;

123
src/output/plugins/kev.rs Normal file
View File

@@ -0,0 +1,123 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use error_stack::{Result, ResultExt};
use mea::mpsc::UnboundedSender;
use serde::{Deserialize, Serialize};
use crate::{
domain::models::vuln_information::{CreateVulnInformation, Severity},
errors::Error,
output::plugins::{VulnPlugin, register_plugin},
utils::http_client::HttpClient,
};
const KEV_URL: &str =
"https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json";
const KEV_PAGE_SIZE: usize = 10;
#[derive(Debug, Clone)]
pub struct KevPlugin {
name: String,
display_name: String,
link: String,
http_client: HttpClient,
sender: UnboundedSender<CreateVulnInformation>,
}
impl KevPlugin {
pub fn try_new(sender: UnboundedSender<CreateVulnInformation>) -> Result<KevPlugin, Error> {
let http_client = HttpClient::try_new()?;
let kv = KevPlugin {
name: "KevPlugin".to_string(),
display_name: "Known Exploited Vulnerabilities Catalog".to_string(),
link: "https://www.cisa.gov/known-exploited-vulnerabilities-catalog".to_string(),
http_client,
sender,
};
register_plugin(kv.name.clone(), Box::new(kv.clone()));
Ok(kv)
}
}
#[async_trait]
impl VulnPlugin for KevPlugin {
fn get_name(&self) -> String {
self.display_name.to_string()
}
async fn update(&self, page_limit: i32) -> Result<(), Error> {
let kev_list_resp: KevResp = self
.http_client
.get_json(KEV_URL)
.await?
.json()
.await
.change_context_lazy(|| {
Error::Message(format!("Failed to parse KEV response from {}", KEV_URL))
})?;
let all_count = kev_list_resp.vulnerabilities.len();
let item_limit = if page_limit as usize * KEV_PAGE_SIZE > all_count {
all_count
} else {
page_limit as usize * KEV_PAGE_SIZE
};
let mut vulnerabilities = kev_list_resp.vulnerabilities;
vulnerabilities.sort_by(|a, b| b.date_added.cmp(&a.date_added));
for vuln in vulnerabilities.iter().take(item_limit) {
let mut reference_links = Vec::new();
if !vuln.notes.is_empty() {
reference_links.push(vuln.notes.to_string())
}
let create_vuln_information_req = CreateVulnInformation {
key: format!("{}_KEV", vuln.cve_id),
title: vuln.vulnerability_name.to_string(),
description: vuln.short_description.to_string(),
severity: Severity::Critical.to_string(),
cve: vuln.cve_id.to_string(),
disclosure: vuln.date_added.to_string(),
reference_links,
solutions: vuln.required_action.to_string(),
source: self.link.to_string(),
tags: vec![
vuln.vendor_project.to_string(),
vuln.product.to_string(),
"在野利用".to_string(),
],
github_search: vec![],
reasons: vec![],
pushed: false,
};
self.sender
.send(create_vuln_information_req)
.change_context_lazy(|| {
Error::Message(format!("Failed to send vuln information to channel"))
})?;
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all(deserialize = "camelCase"))]
pub struct KevResp {
pub title: String,
pub catalog_version: String,
pub date_released: DateTime<Utc>,
pub count: i32,
pub vulnerabilities: Vec<Vulnerability>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all(deserialize = "camelCase"))]
pub struct Vulnerability {
#[serde(alias = "cveID")]
pub cve_id: String,
pub vendor_project: String,
pub product: String,
pub vulnerability_name: String,
pub date_added: String,
pub short_description: String,
pub required_action: String,
pub due_date: String,
pub known_ransomware_campaign_use: String,
pub notes: String,
}

40
src/output/plugins/mod.rs Normal file
View File

@@ -0,0 +1,40 @@
pub mod kev;
use async_trait::async_trait;
use dashmap::DashMap;
use error_stack::Result;
use lazy_static::lazy_static;
use mea::mpsc::UnboundedSender;
use std::sync::Arc;
use crate::{
domain::models::vuln_information::CreateVulnInformation, errors::Error,
output::plugins::kev::KevPlugin,
};
lazy_static! {
static ref PLUGINS: Arc<DashMap<String, Box<dyn VulnPlugin>>> = Arc::new(DashMap::new());
}
pub fn init(sender: UnboundedSender<CreateVulnInformation>) -> Result<(), Error> {
KevPlugin::try_new(sender)?;
Ok(())
}
#[async_trait]
pub trait VulnPlugin: Send + Sync + 'static {
fn get_name(&self) -> String;
async fn update(&self, page_limit: i32) -> Result<(), Error>;
}
pub fn register_plugin(name: String, plugin: Box<dyn VulnPlugin>) {
PLUGINS.insert(name, plugin);
}
pub fn get_registry() -> Arc<DashMap<String, Box<dyn VulnPlugin>>> {
PLUGINS.clone()
}
pub fn list_plugin_names() -> Vec<String> {
PLUGINS.iter().map(|r| r.key().clone()).collect()
}

107
src/output/scheduler.rs Normal file
View File

@@ -0,0 +1,107 @@
use std::{sync::Arc, time::Instant};
use error_stack::{Result, ResultExt};
use tokio::task::JoinSet;
use tokio_cron_scheduler::JobScheduler;
use uuid::Uuid;
use crate::{
errors::Error,
output::{
db::{pg::Pg, sync_data_task::SyncDataTaskDao},
plugins::{get_registry, list_plugin_names},
},
};
pub struct Scheduler {
sched: JobScheduler,
pg: Arc<Pg>,
}
impl Scheduler {
pub async fn try_new(pg: Pg) -> Result<Self, Error> {
let sched = JobScheduler::new()
.await
.change_context_lazy(|| Error::Message("Failed to create scheduler".to_string()))?;
Ok(Scheduler {
sched,
pg: Arc::new(pg),
})
}
pub async fn init_from_db(self) -> Result<Self, Error> {
let mut tx =
self.pg.pool.begin().await.change_context_lazy(|| {
Error::Message("failed to begin transaction".to_string())
})?;
if let Some(task) = SyncDataTaskDao::first(&mut tx).await? {
log::info!(
"Found scheduled task: name={}, minute={}, status={}",
task.name,
task.interval_minutes,
task.status
);
if task.status {
let cron_syntax = format!("0 */{} * * * *", task.interval_minutes);
log::debug!("Creating job with cron syntax: {}", cron_syntax);
let job = tokio_cron_scheduler::Job::new_async(
cron_syntax.as_str(),
move |uuid, mut _l| {
Box::pin(async move {
execute_job(uuid).await;
})
},
)
.change_context_lazy(|| {
Error::Message(format!(
"Failed to create job with cron syntax: '{}'",
cron_syntax
))
})?;
self.sched.add(job).await.change_context_lazy(|| {
Error::Message("Failed to add job to scheduler".to_string())
})?;
log::info!(
"Successfully added scheduled task '{}' with cron '{}'",
task.name,
cron_syntax
);
} else {
log::info!("Scheduled task '{}' is disabled", task.name);
}
} else {
log::info!("No scheduled tasks found in database");
}
self.sched
.start()
.await
.change_context_lazy(|| Error::Message("Failed to start scheduler".to_string()))?;
tx.commit()
.await
.change_context_lazy(|| Error::Message("Failed to commit transaction".to_string()))?;
Ok(self)
}
}
async fn execute_job(_uuid: Uuid) {
log::info!("Executing scheduled job...");
let start = Instant::now();
let mut job_set = JoinSet::new();
let plugin_names = list_plugin_names();
for plugin_name in plugin_names {
job_set.spawn(async move {
let plugins = get_registry();
log::info!("Updating plugin: {}", plugin_name);
if let Some(plugin) = plugins.get::<str>(&plugin_name) {
if let Err(e) = plugin.update(1).await {
log::error!("Plugin update failed for {}: {}", plugin_name, e)
}
}
});
}
job_set.join_all().await;
log::info!("Plugin syn finished elapsed {:?}", start.elapsed());
}

1
src/output/worker/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod worker;

View File

@@ -0,0 +1,47 @@
use std::sync::Arc;
use error_stack::{Result, ResultExt};
use mea::mpsc::UnboundedReceiver;
use crate::{
domain::models::vuln_information::CreateVulnInformation,
errors::Error,
output::db::{pg::Pg, vuln_information::VulnInformationDao},
};
pub struct Worker {
pub receiver: UnboundedReceiver<CreateVulnInformation>,
pub pg: Arc<Pg>,
}
impl Worker {
pub fn new(receiver: UnboundedReceiver<CreateVulnInformation>, pg: Pg) -> Self {
Worker {
receiver,
pg: Arc::new(pg),
}
}
pub async fn run(&mut self) -> Result<(), Error> {
while let Some(req) = self.receiver.recv().await {
if let Err(e) = self.store(req).await {
log::error!("Failed to store vuln information: {:?}", e);
continue;
}
}
Ok(())
}
pub async fn store(&self, req: CreateVulnInformation) -> Result<(), Error> {
let mut tx =
self.pg.pool.begin().await.change_context_lazy(|| {
Error::Message("failed to begin transaction".to_string())
})?;
let _ = VulnInformationDao::create_or_update(&mut tx, req).await?;
tx.commit()
.await
.change_context_lazy(|| Error::Message("failed to commit transaction".to_string()))?;
Ok(())
}
}

59
src/utils/http_client.rs Normal file
View File

@@ -0,0 +1,59 @@
use error_stack::{Result, ResultExt};
use crate::errors::Error;
#[derive(Debug, Clone)]
pub struct HttpClient {
http_client: reqwest::Client,
}
impl HttpClient {
pub fn try_new() -> Result<Self, Error> {
let client = reqwest::Client::builder()
.redirect(reqwest::redirect::Policy::none())
.danger_accept_invalid_certs(true)
.build()
.change_context_lazy(|| Error::Message("Failed to create HTTP client".to_string()))?;
Ok(Self {
http_client: client,
})
}
pub async fn get_json(&self, url: &str) -> Result<reqwest::Response, Error> {
let content = self
.http_client
.get(url)
.send()
.await
.change_context_lazy(|| Error::Message("Failed to send HTTP request".to_string()))?;
Ok(content)
}
pub async fn get_html_content(&self, url: &str) -> Result<String, Error> {
let content = self
.http_client
.get(url)
.send()
.await
.change_context_lazy(|| Error::Message("Failed to send HTTP request".to_string()))?
.text()
.await
.change_context_lazy(|| {
Error::Message("Failed to parse response body as text".to_string())
})?;
Ok(content)
}
pub async fn post_json<Body>(&self, url: &str, body: &Body) -> Result<reqwest::Response, Error>
where
Body: serde::Serialize,
{
let content = self
.http_client
.post(url)
.json(body)
.send()
.await
.change_context_lazy(|| Error::Message("Failed to send HTTP request".to_string()))?;
Ok(content)
}
}

View File

@@ -1,7 +1,8 @@
pub mod http_client;
pub mod runtime;
pub mod version;
pub mod styled;
pub mod telemetry;
pub mod version;
#[track_caller]
pub fn num_cpus() -> std::num::NonZeroUsize {