diff --git a/Cargo.lock b/Cargo.lock index e81dbcb..9e9326a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,12 +109,14 @@ dependencies = [ "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tungstenite 0.10.0 (git+https://github.com/snapview/tokio-tungstenite)", + "toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "tungstenite 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -268,6 +270,11 @@ name = "futures-task" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-timer" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures-util" version = "0.3.1" @@ -772,7 +779,7 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -783,14 +790,15 @@ dependencies = [ "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-macros 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "tokio-macros" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -803,10 +811,18 @@ dependencies = [ "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "tungstenite 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "toml" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tungstenite" version = "0.9.2" @@ -968,6 +984,7 @@ dependencies = [ "checksum futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764" "checksum futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "171be33efae63c2d59e6dbba34186fe0d6394fb378069a76dfd80fdcffd43c16" "checksum futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0bae52d6b29cf440e298856fec3965ee6fa71b06aa7495178615953fd669e5f9" +"checksum futures-timer 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca1dcc4e4637676fcc27629d9805567be878595c3ffab0372ff1eae3b07bffff" "checksum futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" @@ -1029,9 +1046,10 @@ dependencies = [ "checksum termcolor 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" "checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" -"checksum tokio 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c1fc73332507b971a5010664991a441b5ee0de92017f5a0e8b00fd684573045b" -"checksum tokio-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "50a61f268a3db2acee8dcab514efc813dc6dbe8a00e86076f935f94304b59a7a" +"checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" +"checksum tokio-macros 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f4b1e7ed7d5d4c2af3d999904b0eebe76544897cdbfb2b9684bed2174ab20f7c" "checksum tokio-tungstenite 0.10.0 (git+https://github.com/snapview/tokio-tungstenite)" = "" +"checksum toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" "checksum tungstenite 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8a0c2bd5aeb7dcd2bb32e472c8872759308495e5eccc942e929a513cd8d36110" "checksum typenum 1.11.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6d2783fe2d6b8c1101136184eb41be8b1ad379e4657050b8aaff0c79ee7575f9" "checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" diff --git a/Cargo.toml b/Cargo.toml index c654dbf..4bd1d06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,13 @@ edition = "2018" [dependencies] +toml = "0.5" +futures-timer = "3.0.0" clap = "2.33.0" url = "2.1.1" serde_json = "1.0" rand = "0.7.3" -tokio = { version = "0.2", default-features = false, features = ["io-std", "macros"] } +tokio = { version = "0.2.11", default-features = false, features = ["io-std", "macros", "udp"] } futures = "0.3" pin-project = "0.4" tungstenite = "0.9.2" diff --git a/src/main.rs b/src/main.rs index 020cadd..410f130 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,99 +1,164 @@ -extern crate clap; - use clap::{App, Arg}; -use futures::{future, pin_mut, StreamExt}; +use futures::StreamExt; +use futures_timer::Delay; use rand::random; use serde_json::json; use std::i64; -use std::io; use std::io::Write; -use std::net; -use std::net::SocketAddr; -use std::time::Instant; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use std::time::{Duration, Instant}; +use tokio::io::AsyncWriteExt; +use tokio::net::udp::SendHalf; use tokio::net::UdpSocket; use tokio_tungstenite::connect_async; use tungstenite::protocol::Message; use url::Url; -async fn listen(senddata: futures::channel::mpsc::UnboundedSender) { +struct Conf { + filemode: bool, +} + +async fn rssi_timer(udpchanneltx: futures::channel::mpsc::UnboundedSender) { + loop { + Delay::new(Duration::from_secs(3)).await; + udpchanneltx.unbounded_send("S0r\n".to_string()).unwrap(); + } +} + +async fn programm_to_udp( + mut udpchannelrx: futures::channel::mpsc::UnboundedReceiver, + mut udptx: SendHalf, +) { + loop { + let send_data = udpchannelrx.next().await.unwrap(); + let msg = send_data.into_bytes(); + udptx.send(&msg).await.unwrap(); + } +} + +async fn udp_comm(appconf: Conf, senddata: futures::channel::mpsc::UnboundedSender) { + let mut drone_active = false; + // Setup the UDP Socket let mut now = Instant::now(); let mut udpsocket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); - udpsocket.connect("127.0.0.1:9000").await.unwrap(); + udpsocket + .connect("127.0.0.1:9000") // 192.168.0.141:9000" + .await + .expect("could not connect to udp "); - let msg = String::from("ok").into_bytes(); + let msg = String::from("ok\n").into_bytes(); udpsocket.send(&msg).await.unwrap(); + let (mut udprx, udptx) = udpsocket.split(); + + let (udpchanneltx, udpchannelrx) = futures::channel::mpsc::unbounded(); + tokio::spawn(programm_to_udp(udpchannelrx, udptx)); + tokio::spawn(rssi_timer(udpchanneltx.clone())); + loop { - // Handle Sending part - //senddata - /* - let msg = String::from("ok").into_bytes(); - udpsocket - .send_to(&msg, "192.168.0.141:9000") - .expect("cannot send");*/ - // ------------- let mut buf: [u8; 20] = [0; 20]; - let mut result: Vec = Vec::new(); - let len = udpsocket.recv(&mut buf).await.unwrap(); - result = Vec::from(&buf[0..len]); + let len = udprx.recv(&mut buf).await.unwrap(); + let result = Vec::from(&buf[0..len]); let display_result = result.clone(); let result_str = String::from_utf8(display_result).unwrap(); println!("received message: {:?}", result_str); if result_str.contains("S0R1") { - //senddata.unbounded_send("RaceStart".to_string()).unwrap(); - let source_id = "LAPTIME"; let request = json!({"request-type":"SetTextFreetype2Properties", "source":source_id,"message-id": random::().to_string(), "text": now.elapsed().as_millis().to_string() }); now = Instant::now(); senddata .unbounded_send(Message::Text(request.to_string())) .unwrap(); - - write_file("Race active".to_string(), "racestate.txt"); - write_file("0".to_string(), "rx1.txt"); - write_file("0".to_string(), "rx2.txt"); - write_file("0".to_string(), "rx3.txt"); + if appconf.filemode { + write_file("Race active".to_string(), "racestate.txt"); + write_file("0".to_string(), "rx1.txt"); + write_file("0".to_string(), "rx2.txt"); + write_file("0".to_string(), "rx3.txt"); + } } if result_str.contains("S0R0") { - write_file("Race inactive".to_string(), "racestate.txt"); + if appconf.filemode { + write_file("Race inactive".to_string(), "racestate.txt"); + } } + if result_str.contains("S0r") { + let source_id = "CAM"; + //S0r004A\nS1r0044\nS2r0 + let rssi = i64::from_str_radix(&result_str[11..15], 16).unwrap_or(-1); + println!("Got RSSI: {}", rssi); + if rssi < 100 { + println!("drone not connected"); + if drone_active { + // Send filter on + let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":source_id,"message-id": random::().to_string(), "filterName":"mask" , "filterEnabled": true }); + now = Instant::now(); + senddata + .unbounded_send(Message::Text(request.to_string())) + .unwrap(); + drone_active = false; + } + } else { + println!("Drone connected!##########################"); + if !drone_active { + // Send filter off + let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":source_id,"message-id": random::().to_string(), "filterName":"mask" , "filterEnabled": false }); + now = Instant::now(); + senddata + .unbounded_send(Message::Text(request.to_string())) + .unwrap(); + drone_active = true; + } + } + } if result_str.contains("S0L") { // zb sS1L0000000DAF let lap_time = i64::from_str_radix(&result_str[5..13], 16).unwrap_or(-1); if lap_time != -1 { let lap_seconds = (lap_time as f64) / (1000 as f64); - write_file(lap_seconds.to_string(), "rx1_laptime.txt"); + if appconf.filemode { + write_file(lap_seconds.to_string(), "rx1_laptime.txt"); + } } - let intval = &result_str[3..5].parse::().unwrap_or(-1); - if *intval != -1 { - write_file((intval + 1).to_string(), "rx1.txt"); + if let Ok(intval) = &result_str[3..5].parse::() { + if appconf.filemode { + write_file((intval + 1).to_string(), "rx1.txt"); + } + let request = json!({"request-type":"SetTextFreetype2Properties", "source":"RX1","message-id": random::().to_string(), "text": now.elapsed().as_millis().to_string() }); + senddata + .unbounded_send(Message::Text(request.to_string())) + .unwrap(); } } if result_str.contains("S1L") { - let lap_time = i64::from_str_radix(&result_str[5..13], 16).unwrap_or(-1); - if lap_time != -1 { + if let Ok(lap_time) = i64::from_str_radix(&result_str[5..13], 16) { let lap_seconds = (lap_time as f64) / (1000 as f64); - write_file(lap_seconds.to_string(), "rx2_laptime.txt"); + if appconf.filemode { + write_file(lap_seconds.to_string(), "rx2_laptime.txt"); + } } - let intval = &result_str[3..5].parse::().unwrap_or(-1); - if *intval != -1 { - write_file((intval + 1).to_string(), "rx2.txt"); + + if let Ok(intval) = &result_str[3..5].parse::() { + if appconf.filemode { + write_file((intval + 1).to_string(), "rx2.txt"); + } } } if result_str.contains("S2L") { - let lap_time = i64::from_str_radix(&result_str[5..13], 16).unwrap_or(-1); - if lap_time != -1 { + if let Ok(lap_time) = i64::from_str_radix(&result_str[5..13], 16) { let lap_seconds = (lap_time as f64) / (1000 as f64); - write_file(lap_seconds.to_string(), "rx3_laptime.txt"); + if appconf.filemode { + write_file(lap_seconds.to_string(), "rx3_laptime.txt"); + } } - let intval = &result_str[3..5].parse::().unwrap_or(-1); - if *intval != -1 { - write_file((intval + 1).to_string(), "rx3.txt"); + + if let Ok(intval) = &result_str[3..5].parse::() { + if appconf.filemode { + write_file((intval + 1).to_string(), "rx3.txt"); + } } } } @@ -102,7 +167,6 @@ async fn listen(senddata: futures::channel::mpsc::UnboundedSender) { fn write_file(text: String, filename: &str) { let mut file = std::fs::File::create(filename).expect("create failed"); file.write_all(text.as_bytes()).expect("write failed"); - //println!("data written to file"); } #[tokio::main] @@ -110,7 +174,7 @@ async fn main() { let matches = App::new("chorusOBSsync") .version("1.0") .author("Lukas B. ") - .about("Get data from the CHorus32 Laptimer Project and use it to control OBS") + .about("Get data from the Chorus32 Laptimer Project and use it to control OBS") .arg( Arg::with_name("config") .short("c") @@ -131,12 +195,23 @@ async fn main() { .multiple(true) .help("Sets the level of verbosity"), ) + .arg( + Arg::with_name("filemode") + .short("F") + .multiple(false) + .help("Enables File Mode"), + ) .get_matches(); // Gets a value for config if supplied by user, or defaults to "default.conf" let config = matches.value_of("config").unwrap_or("default.conf"); println!("Value for config: {}", config); + let appconf = Conf { + filemode: matches.is_present("filemode"), + other_setting: false, + }; + // Calling .unwrap() is safe here because "INPUT" is required (if "INPUT" wasn't // required we could have used an 'if let' to conditionally get the value) if let Some(input) = matches.value_of("INPUT") { @@ -152,28 +227,29 @@ async fn main() { 3 | _ => println!("Don't be crazy"), } - //////////---------------- - write_file("Race inactive".to_string(), "racestate.txt"); - write_file("0".to_string(), "rx1.txt"); - write_file("0".to_string(), "rx2.txt"); - write_file("0".to_string(), "rx3.txt"); + if appconf.filemode { + write_file("Race inactive".to_string(), "racestate.txt"); + write_file("0".to_string(), "rx1.txt"); + write_file("0".to_string(), "rx2.txt"); + write_file("0".to_string(), "rx3.txt"); - write_file("-.-".to_string(), "rx1_laptime.txt"); - write_file("-.-".to_string(), "rx2_laptime.txt"); - write_file("-.-".to_string(), "rx3_laptime.txt"); + write_file("-.-".to_string(), "rx1_laptime.txt"); + write_file("-.-".to_string(), "rx2_laptime.txt"); + write_file("-.-".to_string(), "rx3_laptime.txt"); + } // Setup websocket for OBS let (ws_stream, _) = connect_async(Url::parse("ws://localhost:4444/").unwrap()) .await .expect("Could not connect to OBS"); - println!("WebSocket handshake has been successfully completed"); - let (write, read) = ws_stream.split(); + println!("Connected to OBS"); + + let (ws_write, ws_read) = ws_stream.split(); let (obstx, obsrx) = futures::channel::mpsc::unbounded(); - //let (udpsockettx, udpsocketrx) = futures::channel::mpsc::unbounded(); let ws_to_stdout = { - read.for_each(|message| { + ws_read.for_each(|message| { async { let data = message.unwrap().into_data(); println!("Messg"); @@ -181,26 +257,11 @@ async fn main() { } }) }; + tokio::spawn(ws_to_stdout); - let programm_to_ws = obsrx.map(Ok).forward(write); + let programm_to_ws = obsrx.map(Ok).forward(ws_write); + tokio::spawn(programm_to_ws); - pin_mut!(programm_to_ws, ws_to_stdout); - - tokio::spawn(listen(obstx)); - - println!("Will wait now"); - future::select(programm_to_ws, ws_to_stdout).await; - - /* - loop { - if now.elapsed().as_secs() >= 5 { - let request = json!({"request-type":"SetTextFreetype2Properties", "source":source_id,"message-id": random::().to_string(), "text": now.elapsed().as_millis().to_string() }); - obstx - .unbounded_send(Message::Text(request.to_string())) - .unwrap(); - println!("{}", now.elapsed().as_secs()); - now = Instant::now(); - } - } - */ + println!("Programm initialized!"); + udp_comm(appconf, obstx).await; }