From 7b41398937dc1b0193e29264673dd9796423d613 Mon Sep 17 00:00:00 2001 From: Lukas Bachschwell Date: Tue, 4 Feb 2020 23:00:20 +0100 Subject: [PATCH] More configurable params, fix some panics --- Config.toml | 10 +++- Readme.md | 10 +++- src/main.rs | 136 ++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 111 insertions(+), 45 deletions(-) diff --git a/Config.toml b/Config.toml index f6a3308..98aa2d0 100644 --- a/Config.toml +++ b/Config.toml @@ -1,9 +1,15 @@ filemode = false -rssi_threshold = 100 +rssi_threshold = 80 +enable_websocket = true +obs_websocket_url = "ws://localhost:4444/" +chorus_udp_url = "192.168.0.141:9000" +# chorus_udp_url = "127.0.0.1:9000" # local test with nc + +obs_mask_filter_name = "mask" video_sources = [ "RX1", "RX2", "RX3" ] lap_sources = [ "RX1Lap", "RX2Lap", "RX3Lap" ] laptime_sources = [ "RX1LapTime", "RX2LapTime", "RX3LapTime" ] race_status_source = "RaceStatus" -race_time_source = "RaceTime" +#race_time_source = "RaceTime" diff --git a/Readme.md b/Readme.md index 770160e..e66d4cd 100644 --- a/Readme.md +++ b/Readme.md @@ -1 +1,9 @@ -# ChorusOBSsync \ No newline at end of file +# ChorusOBSsync + + +- [ ] Finish serde config parsing (more optionals, no filemode flag in clap) +- [x] No OBS Mode +- [x] Less errors! +- [x] OBS Error message decodinng +- [ ] Initial states for race and beginning +- [ ] Could add some disconection handeling of the chorus device in rssi request (if it fails 5 times churus has disconnnected and conenction should be reinitialized ?!) \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 9cf3756..1637404 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::io::prelude::*; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::io::AsyncWriteExt; +//use tokio::io::AsyncWriteExt; use tokio::net::udp::SendHalf; use tokio::net::UdpSocket; use tokio_tungstenite::connect_async; @@ -22,9 +22,14 @@ use url::Url; #[derive(Debug, Deserialize)] struct Conf { filemode: bool, + enable_websocket: bool, rssi_threshold: i32, race_status_source: String, - race_time_source: String, + race_time_source: Option, + + obs_websocket_url: Option, + chorus_udp_url: Option, + obs_mask_filter_name: Option, video_sources: Vec, lap_sources: Vec, @@ -33,8 +38,12 @@ struct Conf { async fn rssi_timer(udpchanneltx: futures::channel::mpsc::UnboundedSender) { loop { - Delay::new(Duration::from_secs(1)).await; - udpchanneltx.unbounded_send("S0r\n".to_string()).unwrap(); + Delay::new(Duration::from_millis(500)).await; + udpchanneltx + .unbounded_send("S0r\n".to_string()) + .unwrap_or_else(|err| { + eprintln!("Could not request RSSI from Chorus: {}", err); + }); } } @@ -65,7 +74,10 @@ async fn programm_to_udp( loop { let send_data = udpchannelrx.next().await.unwrap(); let msg = send_data.into_bytes(); - udptx.send(&msg).await.unwrap(); + udptx.send(&msg).await.unwrap_or_else(|err| { + eprintln!("Could not send to Chorus: {}", err); + return 0; + }); } } @@ -76,23 +88,34 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen // Setup the UDP Socket let mut udpsocket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); udpsocket - .connect("127.0.0.1:9000") //192.168.0.141 + .connect( + &appconf + .chorus_udp_url + .as_ref() + .unwrap_or(&"192.168.0.141:9000".to_string()), + ) .await .expect("could not connect to udp "); let msg = String::from("ok\n").into_bytes(); - udpsocket.send(&msg).await.unwrap(); + udpsocket.send(&msg).await.unwrap_or_else(|err| { + eprintln!("Could not send to Chorus: {}", err); + return 0; + }); let (mut udprx, udptx) = udpsocket.split(); let (udpchanneltx, udpchannelrx) = futures::channel::mpsc::unbounded(); tokio::spawn(programm_to_udp(udpchannelrx, udptx)); tokio::spawn(rssi_timer(udpchanneltx.clone())); - tokio::spawn(race_timer( - senddata.clone(), - appconf.race_time_source.clone(), - race_timer_state_clone, - )); + + if let Some(race_time_source) = &appconf.race_time_source { + tokio::spawn(race_timer( + senddata.clone(), + race_time_source.clone(), + race_timer_state_clone, + )); + } loop { let mut buf: [u8; 500] = [0; 500]; @@ -114,7 +137,7 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen set_obs_text( &senddata, &appconf.race_status_source, - &"Race inactive".to_string(), + &"Race active".to_string(), ); for i in 0..2 { set_obs_text(&senddata, &appconf.lap_sources[i], &"0".to_string()); @@ -144,7 +167,10 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen // Drone is disconnected if drone_active[index] { // Send filter on - let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::().to_string(), "filterName":"mask" , "filterEnabled": true }); + let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::().to_string(), "filterName":&appconf + .obs_mask_filter_name + .as_ref() + .unwrap_or(&"mask".to_string()) , "filterEnabled": true }); senddata .unbounded_send(Message::Text(request.to_string())) .unwrap(); @@ -154,7 +180,10 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen // Drone is connected! if !drone_active[index] { // Send filter off - let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::().to_string(), "filterName":"mask" , "filterEnabled": false }); + let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::().to_string(), "filterName":&appconf + .obs_mask_filter_name + .as_ref() + .unwrap_or(&"mask".to_string()) , "filterEnabled": false }); senddata .unbounded_send(Message::Text(request.to_string())) .unwrap(); @@ -257,12 +286,6 @@ async fn main() { .help("Sets a custom config file") .takes_value(true), ) - .arg( - Arg::with_name("INPUT") - .help("Sets the input file to use") - .required(false) - .index(1), - ) .arg( Arg::with_name("v") .short("v") @@ -278,33 +301,22 @@ async fn main() { .get_matches(); // Gets a value for config if supplied by user, or defaults to "default.conf" - let config = matches.value_of("config").unwrap_or("default.conf"); - println!("Value for config: {}", config); + let configfile = matches.value_of("config").unwrap_or("Config.toml"); + println!("Using config file: {}", configfile); - let mut file = File::open("Config.toml").expect("Unable to open the file"); + let mut file = File::open(&configfile).expect("Unable to open config file"); let mut contents = String::new(); file .read_to_string(&mut contents) .expect("Unable to read the file"); let appconf: Conf = toml::from_str(contents.as_str()).unwrap(); - /* - let appconf = Conf { - filemode: matches.is_present("filemode"), - rssi_threshold: 100, - }; - */ - // Calling .unwrap() is safe here because "INPUT" is required (if "INPUT" wasn't - // required we could have used an 'if let' to conditionally get the value) - if let Some(input) = matches.value_of("INPUT") { - println!("Using input file: {}", input); - } // Vary the output based on how many times the user used the "verbose" flag // (i.e. 'myprog -v -v -v' or 'myprog -vvv' vs 'myprog -v' match matches.occurrences_of("v") { 0 => println!("No verbose info"), - 1 => println!("Some verbose info"), - 2 => println!("Tons of verbose info"), + 1 => println!("Verbose UDP"), + 2 => println!("Verbose UDP + Websocket"), 3 | _ => println!("Don't be crazy"), } @@ -319,10 +331,27 @@ async fn main() { write_file("-.-".to_string(), "rx3_laptime.txt"); } + if !appconf.enable_websocket { + println!("Programm initialized! (Without websocket connection)"); + let (obstx, obsrx) = futures::channel::mpsc::unbounded(); // creating without a place to send the packets + tokio::spawn(obschan_to_nowhere(obsrx)); + + udp_comm(&appconf, obstx).await; + std::process::exit(0); + } + // Setup websocket for OBS - let (ws_stream, _) = connect_async(Url::parse("ws://localhost:4444/").unwrap()) - .await - .expect("Could not connect to OBS"); + let (ws_stream, _) = connect_async( + Url::parse( + &appconf + .obs_websocket_url + .as_ref() + .unwrap_or(&"ws://localhost:4444/".to_string()), + ) + .expect("Invalid Websocket URL format"), + ) + .await + .expect("Could not connect to OBS"); println!("Connected to OBS"); @@ -333,8 +362,25 @@ async fn main() { ws_read.for_each(|message| { async { let data = message.unwrap().into_data(); - println!("Messg"); - tokio::io::stdout().write_all(&data).await.unwrap(); + // TODO: Parse errors here and filter the rest + + let data_string = match std::str::from_utf8(&data) { + Ok(v) => v, + Err(e) => panic!("Invalid UTF-8 sequence: {}", e), + }; + + if let Ok(untyped_json_value) = serde_json::from_str(data_string) { + let json_value: serde_json::Value = untyped_json_value; + if json_value["status"].as_str().is_some() + && json_value["status"].as_str().unwrap() == "error" + { + eprintln!("OBBS Error: {}", json_value["error"]); + } + // TODO: Check appconfig verbosity + //tokio::io::stdout().write_all(&data).await.unwrap(); + } else { + eprintln!("Could not parse json value from obs"); + } } }) }; @@ -347,6 +393,12 @@ async fn main() { udp_comm(&appconf, obstx).await; } +async fn obschan_to_nowhere(mut obsrx: futures::channel::mpsc::UnboundedReceiver) { + loop { + obsrx.next().await.unwrap(); + } +} + fn set_obs_text( wschannel: &futures::channel::mpsc::UnboundedSender, source: &String,