Kould commited on
Commit
db8cae3
·
1 Parent(s): d0db329

feat: impl websocket upload file for doc_info (#20)

Browse files
Cargo.toml CHANGED
@@ -13,9 +13,15 @@ actix-multipart = "0.4"
13
  actix-session = { version = "0.5" }
14
  actix-identity = { version = "0.4" }
15
  actix-web-httpauth = { version = "0.6" }
 
 
 
 
 
 
16
  thiserror = "1.0"
17
  postgres = "0.19.7"
18
- sea-orm = {version = "0.12.9", features = ["sqlx-postgres", "runtime-tokio-native-tls", "macros"]}
19
  serde = { version = "1", features = ["derive"] }
20
  serde_json = "1.0"
21
  tracing-subscriber = "0.3.18"
@@ -27,6 +33,7 @@ minio = "0.1.0"
27
  futures-util = "0.3.29"
28
  actix-multipart-extract = "0.1.5"
29
  regex = "1.10.2"
 
30
 
31
  [[bin]]
32
  name = "doc_gpt"
 
13
  actix-session = { version = "0.5" }
14
  actix-identity = { version = "0.4" }
15
  actix-web-httpauth = { version = "0.6" }
16
+ actix-ws = "0.2.5"
17
+ uuid = { version = "1.6.1", features = [
18
+ "v4",
19
+ "fast-rng",
20
+ "macro-diagnostics",
21
+ ] }
22
  thiserror = "1.0"
23
  postgres = "0.19.7"
24
+ sea-orm = { version = "0.12.9", features = ["sqlx-postgres", "runtime-tokio-native-tls", "macros"] }
25
  serde = { version = "1", features = ["derive"] }
26
  serde_json = "1.0"
27
  tracing-subscriber = "0.3.18"
 
33
  futures-util = "0.3.29"
34
  actix-multipart-extract = "0.1.5"
35
  regex = "1.10.2"
36
+ tokio = { version = "1.35.1", features = ["rt", "time", "macros"] }
37
 
38
  [[bin]]
39
  name = "doc_gpt"
migration/src/m20220101_000001_create_table.rs CHANGED
@@ -1,6 +1,7 @@
1
- use sea_orm_migration::{ prelude::*, sea_orm::Statement };
2
  use chrono::{ FixedOffset, Utc };
3
 
 
4
  fn now() -> chrono::DateTime<FixedOffset> {
5
  Utc::now().with_timezone(&FixedOffset::east_opt(3600 * 8).unwrap())
6
  }
 
1
+ use sea_orm_migration::prelude::*;
2
  use chrono::{ FixedOffset, Utc };
3
 
4
+ #[allow(dead_code)]
5
  fn now() -> chrono::DateTime<FixedOffset> {
6
  Utc::now().with_timezone(&FixedOffset::east_opt(3600 * 8).unwrap())
7
  }
src/api/doc_info.rs CHANGED
@@ -107,8 +107,12 @@ async fn upload(
107
  payload: Multipart<UploadForm>,
108
  data: web::Data<AppState>
109
  ) -> Result<HttpResponse, AppError> {
110
- let uid = payload.uid;
111
- let file_name = payload.file_field.name.as_str();
 
 
 
 
112
  async fn add_number_to_filename(
113
  file_name: &str,
114
  conn: &DbConn,
@@ -138,9 +142,9 @@ async fn upload(
138
  }
139
  new_file_name
140
  }
141
- let fnm = add_number_to_filename(file_name, &data.conn, uid, payload.did).await;
142
 
143
- let bucket_name = format!("{}-upload", payload.uid);
144
  let s3_client: &minio::s3::client::Client = &data.s3_client;
145
  let buckets_exists = s3_client
146
  .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap()).await
@@ -152,7 +156,7 @@ async fn upload(
152
  print!("Existing bucket: {}", bucket_name.clone());
153
  }
154
 
155
- let location = format!("/{}/{}", payload.did, fnm)
156
  .as_bytes()
157
  .to_vec()
158
  .iter()
@@ -164,8 +168,8 @@ async fn upload(
164
  &mut PutObjectArgs::new(
165
  &bucket_name,
166
  &location,
167
- &mut BufReader::new(payload.file_field.bytes.as_slice()),
168
- Some(payload.file_field.bytes.len()),
169
  None
170
  )?
171
  ).await?;
@@ -174,7 +178,7 @@ async fn upload(
174
  did: Default::default(),
175
  uid: uid,
176
  doc_name: fnm.clone(),
177
- size: payload.file_field.bytes.len() as i64,
178
  location,
179
  r#type: file_type(&fnm),
180
  thumbnail_base64: Default::default(),
@@ -183,9 +187,9 @@ async fn upload(
183
  is_deleted: Default::default(),
184
  }).await?;
185
 
186
- let _ = Mutation::place_doc(&data.conn, payload.did, doc.did.unwrap()).await?;
187
 
188
- Ok(HttpResponse::Ok().body("File uploaded successfully"))
189
  }
190
 
191
  #[derive(Deserialize, Debug)]
 
107
  payload: Multipart<UploadForm>,
108
  data: web::Data<AppState>
109
  ) -> Result<HttpResponse, AppError> {
110
+
111
+
112
+ Ok(HttpResponse::Ok().body("File uploaded successfully"))
113
+ }
114
+
115
+ pub(crate) async fn _upload_file(uid: i64, did: i64, file_name: &str, bytes: &[u8], data: &web::Data<AppState>) -> Result<(), AppError> {
116
  async fn add_number_to_filename(
117
  file_name: &str,
118
  conn: &DbConn,
 
142
  }
143
  new_file_name
144
  }
145
+ let fnm = add_number_to_filename(file_name, &data.conn, uid, did).await;
146
 
147
+ let bucket_name = format!("{}-upload", uid);
148
  let s3_client: &minio::s3::client::Client = &data.s3_client;
149
  let buckets_exists = s3_client
150
  .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap()).await
 
156
  print!("Existing bucket: {}", bucket_name.clone());
157
  }
158
 
159
+ let location = format!("/{}/{}", did, fnm)
160
  .as_bytes()
161
  .to_vec()
162
  .iter()
 
168
  &mut PutObjectArgs::new(
169
  &bucket_name,
170
  &location,
171
+ &mut BufReader::new(bytes),
172
+ Some(bytes.len()),
173
  None
174
  )?
175
  ).await?;
 
178
  did: Default::default(),
179
  uid: uid,
180
  doc_name: fnm.clone(),
181
+ size: bytes.len() as i64,
182
  location,
183
  r#type: file_type(&fnm),
184
  thumbnail_base64: Default::default(),
 
187
  is_deleted: Default::default(),
188
  }).await?;
189
 
190
+ let _ = Mutation::place_doc(&data.conn, did, doc.did.unwrap()).await?;
191
 
192
+ Ok(())
193
  }
194
 
195
  #[derive(Deserialize, Debug)]
src/main.rs CHANGED
@@ -2,6 +2,7 @@ mod api;
2
  mod entity;
3
  mod service;
4
  mod errors;
 
5
 
6
  use std::env;
7
  use actix_files::Files;
@@ -19,6 +20,7 @@ use minio::s3::http::BaseUrl;
19
  use sea_orm::{ Database, DatabaseConnection };
20
  use migration::{ Migrator, MigratorTrait };
21
  use crate::errors::{ AppError, UserError };
 
22
 
23
  #[derive(Debug, Clone)]
24
  struct AppState {
@@ -138,4 +140,6 @@ fn init(cfg: &mut web::ServiceConfig) {
138
  cfg.service(api::user_info::login);
139
  cfg.service(api::user_info::register);
140
  cfg.service(api::user_info::setting);
 
 
141
  }
 
2
  mod entity;
3
  mod service;
4
  mod errors;
5
+ mod web_socket;
6
 
7
  use std::env;
8
  use actix_files::Files;
 
20
  use sea_orm::{ Database, DatabaseConnection };
21
  use migration::{ Migrator, MigratorTrait };
22
  use crate::errors::{ AppError, UserError };
23
+ use crate::web_socket::doc_info::upload_file_ws;
24
 
25
  #[derive(Debug, Clone)]
26
  struct AppState {
 
140
  cfg.service(api::user_info::login);
141
  cfg.service(api::user_info::register);
142
  cfg.service(api::user_info::setting);
143
+
144
+ cfg.service(web::resource("/ws-upload-doc").route(web::get().to(upload_file_ws)));
145
  }
src/web_socket/doc_info.rs ADDED
@@ -0,0 +1,97 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ use std::io::{Cursor, Write};
2
+ use std::time::{Duration, Instant};
3
+ use actix_rt::time::interval;
4
+ use actix_web::{HttpRequest, HttpResponse, rt, web};
5
+ use actix_web::web::Buf;
6
+ use actix_ws::Message;
7
+ use futures_util::{future, StreamExt};
8
+ use futures_util::future::Either;
9
+ use uuid::Uuid;
10
+ use crate::api::doc_info::_upload_file;
11
+ use crate::AppState;
12
+ use crate::errors::AppError;
13
+
14
+ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
15
+
16
+ /// How long before lack of client response causes a timeout.
17
+ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
18
+
19
+ pub async fn upload_file_ws(req: HttpRequest, stream: web::Payload, data: web::Data<AppState>) -> Result<HttpResponse, AppError> {
20
+ let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
21
+
22
+ // spawn websocket handler (and don't await it) so that the response is returned immediately
23
+ rt::spawn(upload_file_handler(data, session, msg_stream));
24
+
25
+ Ok(res)
26
+ }
27
+
28
+ async fn upload_file_handler(
29
+ data: web::Data<AppState>,
30
+ mut session: actix_ws::Session,
31
+ mut msg_stream: actix_ws::MessageStream,
32
+ ) {
33
+ let mut bytes = Cursor::new(vec![]);
34
+ let mut last_heartbeat = Instant::now();
35
+ let mut interval = interval(HEARTBEAT_INTERVAL);
36
+
37
+ let reason = loop {
38
+ let tick = interval.tick();
39
+ tokio::pin!(tick);
40
+
41
+ match future::select(msg_stream.next(), tick).await {
42
+ // received message from WebSocket client
43
+ Either::Left((Some(Ok(msg)), _)) => {
44
+ match msg {
45
+ Message::Text(text) => {
46
+ session.text(text).await.unwrap();
47
+ }
48
+
49
+ Message::Binary(bin) => {
50
+ let mut pos = 0; // notice the name of the file that will be written
51
+ while pos < bin.len() {
52
+ let bytes_written = bytes.write(&bin[pos..]).unwrap();
53
+ pos += bytes_written
54
+ };
55
+ session.binary(bin).await.unwrap();
56
+ }
57
+
58
+ Message::Close(reason) => {
59
+ break reason;
60
+ }
61
+
62
+ Message::Ping(bytes) => {
63
+ last_heartbeat = Instant::now();
64
+ let _ = session.pong(&bytes).await;
65
+ }
66
+
67
+ Message::Pong(_) => {
68
+ last_heartbeat = Instant::now();
69
+ }
70
+
71
+ Message::Continuation(_) | Message::Nop => {}
72
+ };
73
+ }
74
+ Either::Left((Some(Err(_)), _)) => {
75
+ break None;
76
+ }
77
+ Either::Left((None, _)) => break None,
78
+ Either::Right((_inst, _)) => {
79
+ if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
80
+ break None;
81
+ }
82
+
83
+ let _ = session.ping(b"").await;
84
+ }
85
+ }
86
+ };
87
+ let _ = session.close(reason).await;
88
+
89
+ if !bytes.has_remaining() {
90
+ return;
91
+ }
92
+
93
+ let uid = bytes.get_i64();
94
+ let did = bytes.get_i64();
95
+
96
+ _upload_file(uid, did, &Uuid::new_v4().to_string(), &bytes.into_inner(), &data).await.unwrap();
97
+ }
src/web_socket/mod.rs ADDED
@@ -0,0 +1 @@
 
 
1
+ pub mod doc_info;