diff --git a/src/bin/h2test.rs b/src/bin/h2test.rs index 66f6c086..60141414 100644 --- a/src/bin/h2test.rs +++ b/src/bin/h2test.rs @@ -1,47 +1,9 @@ use failure::*; use futures::*; -use serde_json::{json, Value}; +use serde_json::json; use proxmox_backup::client::*; -fn get(mut h2: h2::client::SendRequest, path: &str) -> impl Future { - - let request = http::Request::builder() - .method("GET") - .uri(format!("https://localhost/{}", path)) - .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") - .body(()).unwrap(); - - println!("SEND GET {} REQUEST", path); - let (response, _stream) = h2.send_request(request, true).unwrap(); - - response - .map_err(Error::from) - .and_then(|response| { - let (head, mut body) = response.into_parts(); - - println!("Received response: {:?}", head); - - // The `release_capacity` handle allows the caller to manage - // flow control. - // - // Whenever data is received, the caller is responsible for - // releasing capacity back to the server once it has freed - // the data from memory. - let mut release_capacity = body.release_capacity().clone(); - - body - .concat2() - .map_err(Error::from) - .and_then(move |data| { - println!("RX: {:?}", data); - - // fixme: - Ok(Value::Null) - }) - }).map_err(Error::from) -} - fn run() -> Result<(), Error> { let host = "localhost"; @@ -51,13 +13,13 @@ fn run() -> Result<(), Error> { let mut client = HttpClient::new(host, username)?; let param = json!({"backup-type": "host", "backup-id": "test" }); - let h2client = client.h2upgrade("/api2/json/admin/datastore/store2/backup", Some(param)); + let upgrade = client.h2upgrade("/api2/json/admin/datastore/store2/backup", Some(param)); - let res = h2client.and_then(|mut h2| { + let res = upgrade.and_then(|send_request| { println!("start http2"); - - let result1 = get(h2.clone(), "test1"); - let result2 = get(h2.clone(), "test2"); + let h2 = H2Client::new(send_request); + let result1 = h2.get("test1", None); + let result2 = h2.get("test2", None); result1.join(result2) }).wait()?; diff --git a/src/client/http_client.rs b/src/client/http_client.rs index cc9ee742..cefebe58 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -6,7 +6,7 @@ use hyper::client::Client; use xdg::BaseDirectories; use chrono::Utc; -use http::Request; +use http::{Request, Response}; use http::header::HeaderValue; use futures::Future; @@ -385,3 +385,109 @@ impl HttpClient { Ok(request) } } + +pub struct H2Client { + h2: h2::client::SendRequest, +} + +impl H2Client { + + pub fn new(h2: h2::client::SendRequest) -> Self { + Self { h2 } + } + + pub fn get(&self, path: &str, param: Option) -> impl Future { + let req = Self::request_builder("localhost", "GET", path, param).unwrap(); + self.request(req) + } + + pub fn post(&self, path: &str, param: Option) -> impl Future { + let req = Self::request_builder("localhost", "POST", path, param).unwrap(); + self.request(req) + } + + fn request( + &self, + request: Request<()>, + ) -> impl Future { + + self.h2.clone().ready().map_err(Error::from). + and_then(move |mut send_request| { + // fixme: what about stream/upload? + let (response, _stream) = send_request.send_request(request, true).unwrap(); + response + .map_err(Error::from) + .and_then(Self::h2api_response) + }) + } + + fn h2api_response(response: Response) -> impl Future { + + let status = response.status(); + + let (_head, mut body) = response.into_parts(); + + // The `release_capacity` handle allows the caller to manage + // flow control. + // + // Whenever data is received, the caller is responsible for + // releasing capacity back to the server once it has freed + // the data from memory. + let mut release_capacity = body.release_capacity().clone(); + + body + .map(move |chunk| { + println!("RX: {} bytes", chunk.len()); + // Let the server send more data. + let _ = release_capacity.release_capacity(chunk.len()); + chunk + }) + .concat2() + .map_err(Error::from) + .and_then(move |data| { + println!("RX: {:?}", data); + let text = String::from_utf8(data.to_vec()).unwrap(); + if status.is_success() { + if text.len() > 0 { + let mut value: Value = serde_json::from_str(&text)?; + if let Some(map) = value.as_object_mut() { + if let Some(data) = map.remove("data") { + return Ok(data); + } + } + bail!("got result without data property"); + } else { + Ok(Value::Null) + } + } else { + bail!("HTTP Error {}: {}", status, text); + } + }) + } + + pub fn request_builder(server: &str, method: &str, path: &str, data: Option) -> Result, Error> { + let path = path.trim_matches('/'); + let url: Uri = format!("https://{}:8007/{}", server, path).parse()?; + + if let Some(data) = data { + let query = tools::json_object_to_query(data)?; + let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?; + let request = Request::builder() + .method(method) + .uri(url) + .header("User-Agent", "proxmox-backup-client/1.0") + .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") + .body(())?; + return Ok(request); + } + + let request = Request::builder() + .method(method) + .uri(url) + .header("User-Agent", "proxmox-backup-client/1.0") + .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded") + .body(())?; + + Ok(request) + } +}