bws-rs：Rust 编写的 S3 协议网关框架，支持灵活后端接入
bws-rs介绍
bws-rs 是一个用 Rust 编写的轻量级 S3 协议服务端网关框架，旨在帮助开发者快速构建兼容 AWS S3 协议的对象存储服务。该框架支持 S3 V4 签名校验，集成 Axum 作…
bws-rs：Rust 编写的 S3 协议网关框架，支持灵活后端接入
bws-rs介绍
bws-rs 是一个用 Rust 编写的轻量级 S3 协议服务端网关框架，旨在帮助开发者快速构建兼容 AWS S3 协议的对象存储服务。该框架支持 S3 V4 签名校验，集成 Axum 作为 Web 框架；所有协议校验逻辑通过实现对应的 trait 并注册为 axum::Extension 实现非侵入式扩展，具有良好的可维护性与可插拔性。
bws-rs 可作为前端网关，挂载在你已有的文件系统、对象存储系统甚至缓存引擎之前，为其提供标准化的 S3 协议兼容层，支持与 AWS CLI、MinIO Client 等主流 S3 SDK 的交互。
✅ 已支持的功能
S3 协议支持列表：
- PutObject：上传对象
- GetObject：获取对象
- HeadObject：获取对象元信息
- DeleteObject：删除对象
- CreateBucket：创建桶
- HeadBucket：桶存在性检查
- ListBucket：列举所有桶
- DeleteBucket：删除桶
- GetBucketLocation：获取桶区域
- MultipartUpload：分片上传
- Range Get：部分下载
- Get/Put Object ACL：访问控制列表
- Get/Put Object Metadata：对象元数据
- Put Object Tagging：对象标签

✅ MinIO SDK 兼容性验证
使用 MinIO Go SDK 进行功能验证，支持以下操作：MakeBucket、DeleteBucket、ListBucket、ListObject、PutObject、DeleteObject、BucketExists。

在项目中使用 bws-rs：
cargo add bws-rs
实现 bws_rs::service::s3 下对应的 trait，以支持对应的 S3 功能：
- HeadHandler：对应 S3 HeadObject、HeadBucket
- GetObjectHandler：对应 S3 GetObject
- PutObjectHandler：对应 S3 PutObject
- DeleteObjectHandler：对应 S3 DeleteObject
- ListObjectHandler：对应 S3 ListObject
- CreateBucketHandler：对应 S3 CreateBucket
- ListBucketHandler：对应 S3 ListBucket
- DeleteBucketHandler：对应 S3 DeleteBucket
- GetBucketLocationHandler：对应 S3 GetBucketLocation
- MultiUploadObjectHandler：对应 S3 MultipartUpload 系列操作
accesskey 仓库需要实现 bws_rs::authorization::AccesskeyStore，用于提供 accesskey 对应的 secretkey。
使用示范

```rust
use std::sync::Arc;
use tokio::io::AsyncReadExt;

#[derive(Default)]
struct Target {}

use crate::service::s3::*;

impl CreateBucketHandler for Target {
    fn handle<'a>(
        &'a self,
        _opt: &'a CreateBucketOption,
        _bucket: &'a str,
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>> {
        Box::pin(async move {
            log::info!("create bucket {_bucket}");
            Ok(())
        })
    }
}

impl ListBucketHandler for Target {
    fn handle<'a>(
        &'a self,
        _opt: &'a ListBucketsOption,
    ) -> std::pin::Pin<
        Box<dyn 'a + Send + std::future::Future<Output = Result<Vec<Bucket>, String>>>,
    > {
        Box::pin(async move {
            let datetime = chrono::Utc::now().to_rfc3339();
            Ok(vec![Bucket {
                name: "test1".to_string(),
                creation_date: datetime,
                bucket_region: "us-east-1".to_string(),
            }])
        })
    }
}

impl HeadHandler for Target {
    fn lookup<'a>(
        &self,
        _bucket: &str,
        _object: &str,
    ) -> std::pin::Pin<
        Box<
            dyn 'a
                + Send
                + Sync
                + std::future::Future<Output = Result<Option<HeadObjectResult>, Error>>,
        >,
    > {
        Box::pin(async move {
            let mut ret: HeadObjectResult = Default::default();
            ret.checksum_sha256 = Some(
                "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824".to_string(),
            );
            ret.content_length = Some(5);
            ret.etag = Some("5d41402abc4b2a76b9719d911017c592".to_string());
            ret.last_modified = Some(
                chrono::Utc::now()
                    .format("%a, %d %b %Y %H:%M:%S GMT")
                    .to_string(),
            );
            Ok(Some(ret))
        })
    }
}

impl PutObjectHandler for Target {
    fn handle<'a>(
        &'a self,
        opt: PutObjectOption,
        bucket: &'a str,
        object: &'a str,
        body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>> {
        Box::pin(async move {
            log::info!("put bucket {bucket} object {object}");
            let mut buff = vec![];
            match body.read_to_end(&mut buff).await {
                Ok(size) => {
                    log::info!("get {}", unsafe {
                        std::str::from_utf8_unchecked(&buff[..size])
                    });
                }
                Err(err) => {
                    log::error!("read error {err}");
                }
            }
            Ok(())
        })
    }
}

impl DeleteBucketHandler for Target {
    fn handle<'a>(
        &'a self,
        _opt: &'a DeleteBucketOption,
        _bucket: &'a str,
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>> {
        Box::pin(async move {
            log::info!("delete bucket {_bucket}");
            Ok(())
        })
    }
}

impl DeleteObjectHandler for Target {
    fn handle<'a>(
        &'a self,
        _opt: &'a DeleteObjectOption,
        _object: &'a str,
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>> {
        Box::pin(async move {
            log::info!("delete object {_object}");
            Ok(())
        })
    }
}

impl crate::authorization::AccesskeyStore for Target {
    fn get<'a>(
        &'a self,
        _accesskey: &'a str,
    ) -> std::pin::Pin<
        Box<
            dyn 'a
                + Send
                + Sync
                + std::future::Future<Output = Result<Option<String>, String>>,
        >,
    > {
        Box::pin(async move { Ok(Some(format!("{_accesskey}12345"))) })
    }
}

impl crate::service::s3::GetObjectHandler for Target {
    fn handle<'a>(
        &'a self,
        bucket: &str,
        object: &str,
        opt: crate::service::s3::GetObjectOption,
        mut out: tokio::sync::Mutex<
            std::pin::Pin<
                std::boxed::Box<(dyn crate::utils::io::PollWrite + Send + Unpin + 'a)>,
            >,
        >,
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>> {
        Box::pin(async move {
            let mut l = out.lock().await;
            let _ = l.poll_write(b"hello").await.map_err(|err| {
                log::error!("write error {err}");
            });
            Ok(())
        })
    }
}

impl crate::service::s3::GetBucketLocationHandler for Target {}

impl MultiUploadObjectHandler for Target {
    fn handle_create_session<'a>(
        &'a self,
        bucket: &'a str,
        key: &'a str,
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>> {
        Box::pin(async move { Ok("ffffff".to_string()) })
    }

    fn handle_upload_part<'a>(
        &'a self,
        bucket: &'a str,
        key: &'a str,
        upload_id: &'a str,
        part_number: u32,
        body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>> {
        Box::pin(async move {
            let mut buff = Vec::new();
            let size = body
                .read_to_end(&mut buff)
                .await
                .map_err(|err| log::error!("read body error {err}"))?;
            println!(
                "upload part upload_id={upload_id} part_number={part_number} bucket={bucket} key={key}\n{}",
                unsafe { std::str::from_boxed_utf8_unchecked((&buff[..size]).into()) }
            );
            Ok("5d41402abc4b2a76b9719d911017c592".to_string())
        })
    }

    fn handle_complete<'a>(
        &'a self,
        bucket: &'a str,
        key: &'a str,
        upload_id: &'a str,
        //(etag,part number)
        data: &'a [(&'a str, u32)],
        opts: MultiUploadObjectCompleteOption,
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>> {
        Box::pin(async move { Ok("69a329523ce1ec88bf63061863d9cb14".to_string()) })
    }

    fn handle_abort<'a>(
        &'a self,
        bucket: &'a str,
        key: &'a str,
        upload_id: &'a str,
    ) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), ()>>>> {
        todo!()
    }
}

#[tokio::test]
async fn test_server() -> Result<(), Box<dyn std::error::Error>> {
    let _ = tokio::fs::create_dir_all(".sys_bws").await;
    env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .init();
    let target = Arc::new(Target::default());
    let r = axum::Router::new()
        .layer(axum::middleware::from_fn(super::handle_fn))
        .layer(axum::middleware::from_fn(
            super::handle_authorization_middleware,
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn PutObjectHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn HeadHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn ListBucketHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn CreateBucketHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn DeleteBucketHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn DeleteObjectHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn crate::authorization::AccesskeyStore + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn GetObjectHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn GetBucketLocationHandler + Send + Sync>
        ))
        .layer(axum::Extension(
            target.clone() as Arc<dyn MultiUploadObjectHandler + Send + Sync>
        ));
    let l = tokio::net::TcpListener::bind("0.0.0.0:9900").await?;
    axum::serve(l, r).await?;
    Ok(())
}
```

golang 客户端
```go
package tests

import (
	"context"
	"io"
	"os"
	"testing"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func TestCreateBucket(t *testing.T) {
	creds, err := minio.New("127.0.0.1:9900", &minio.Options{
		Secure: false, Creds: credentials.NewStaticV4("root", "root12345", ""),
		Region: "us-east-1",
	})
	if err != nil {
		t.Fatal(err)
	}
	_, err = creds.BucketExists(context.Background(), "test")
	if err != nil {
		t.Fatal(err)
	}
	err = creds.MakeBucket(context.Background(), "itest", minio.MakeBucketOptions{})
	if err != nil {
		t.Fatal(err)
	}
	bkts, err := creds.ListBuckets(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	t.Log(bkts)
	err = creds.RemoveBucket(context.Background(), "test")
	if err != nil {
		t.Fatal(err)
	}
	err = creds.RemoveObject(context.Background(), "test", "test", minio.RemoveObjectOptions{})
	if err != nil {
		t.Fatal(err)
	}
	err = os.WriteFile("test.txt", []byte("hello"), 0o644)
	if err != nil {
		t.Fatal(err)
	}
	fd, err := os.OpenFile("test.txt", os.O_RDONLY, 0)
	if err != nil {
		t.Fatal(err)
	}
	defer fd.Close()
	_, err = creds.PutObject(context.Background(), "test", "hello/world", fd, 5, minio.PutObjectOptions{})
	if err != nil {
		t.Fatal(err)
	}
	resp, err := creds.GetObject(context.Background(), "test", "test", minio.GetObjectOptions{})
	if err != nil {
		t.Fatal(err)
	}
	content, err := io.ReadAll(resp)
	if err != nil {
		t.Fatal(err)
	}
	if string(content) != "hello" {
		t.Fatal("expect hello got [" + string(content) + "]")
	}
}
```
s3 multipart 验证
```go
package tests

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"net/http"
	"os"
	"testing"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

func TestS3Sdk(t *testing.T) {
	var (
		host      = "127.0.0.1"
		port      = 9900
		accesskey = "root"
		secretkey = "root12345"
		region    = "us-east-1"
	)
	customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
		if service == s3.ServiceID {
			return aws.Endpoint{
				URL:           fmt.Sprintf("http://%s:%d", host, port),
				SigningRegion: "us-east-1",
			}, nil
		}
		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
	})
	// 加载 AWS 配置，指定自定义端点解析器
	cfg, err := config.LoadDefaultConfig(context.TODO(),
		config.WithEndpointResolverWithOptions(customResolver),
		config.WithHTTPClient(&http.Client{
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			},
		}),
		config.WithRegion(region),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accesskey, secretkey, "")),
	)
	if err != nil {
		log.Fatalf("无法加载 AWS 配置: %v", err)
	}
	// 创建 S3 客户端
	cli := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.UsePathStyle = true
	})
	var (
		bucket = "itest"
		key    = "test.txt"
	)
	fd, err := os.OpenFile("./test.txt", os.O_RDONLY, 0)
	if err != nil {
		t.Fatal(err)
	}
	defer fd.Close()
	out, err := cli.CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		t.Fatal(err)
	}
	var upNo int32 = 1
	resp, err := cli.UploadPart(context.Background(), &s3.UploadPartInput{
		Bucket: &bucket, Key: &key, PartNumber: &upNo, UploadId: out.UploadId, Body: fd,
	})
	if err != nil {
		t.Fatal(err)
	}
	_, err = cli.CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{
		Bucket: &bucket, Key: &key, UploadId: out.UploadId, MultipartUpload: &types.CompletedMultipartUpload{
			Parts: []types.CompletedPart{
				{
					ETag: resp.ETag, PartNumber: &upNo,
				},
			},
		},
	})
	if err != nil {
		t.Fatal(err)
	}
}
```