More configurable params, fix some panics

This commit is contained in:
Lukas Bachschwell 2020-02-04 23:00:20 +01:00
parent 3fb3508be2
commit 7b41398937
Signed by: lbsadmin
GPG Key ID: CCC6AA87CC8DF425
3 changed files with 111 additions and 45 deletions

View File

@ -1,9 +1,15 @@
filemode = false filemode = false
rssi_threshold = 100 rssi_threshold = 80
enable_websocket = true
obs_websocket_url = "ws://localhost:4444/"
chorus_udp_url = "192.168.0.141:9000"
# chorus_udp_url = "127.0.0.1:9000" # local test with nc
obs_mask_filter_name = "mask"
video_sources = [ "RX1", "RX2", "RX3" ] video_sources = [ "RX1", "RX2", "RX3" ]
lap_sources = [ "RX1Lap", "RX2Lap", "RX3Lap" ] lap_sources = [ "RX1Lap", "RX2Lap", "RX3Lap" ]
laptime_sources = [ "RX1LapTime", "RX2LapTime", "RX3LapTime" ] laptime_sources = [ "RX1LapTime", "RX2LapTime", "RX3LapTime" ]
race_status_source = "RaceStatus" race_status_source = "RaceStatus"
race_time_source = "RaceTime" #race_time_source = "RaceTime"

View File

@ -1 +1,9 @@
# ChorusOBSsync # ChorusOBSsync
- [ ] Finish serde config parsing (more optionals, no filemode flag in clap)
- [x] No OBS Mode
- [x] Less errors!
- [x] OBS Error message decodinng
- [ ] Initial states for race and beginning
- [ ] Could add some disconection handeling of the chorus device in rssi request (if it fails 5 times churus has disconnnected and conenction should be reinitialized ?!)

View File

@ -12,7 +12,7 @@ use std::io::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt; //use tokio::io::AsyncWriteExt;
use tokio::net::udp::SendHalf; use tokio::net::udp::SendHalf;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
@ -22,9 +22,14 @@ use url::Url;
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct Conf { struct Conf {
filemode: bool, filemode: bool,
enable_websocket: bool,
rssi_threshold: i32, rssi_threshold: i32,
race_status_source: String, race_status_source: String,
race_time_source: String, race_time_source: Option<String>,
obs_websocket_url: Option<String>,
chorus_udp_url: Option<String>,
obs_mask_filter_name: Option<String>,
video_sources: Vec<String>, video_sources: Vec<String>,
lap_sources: Vec<String>, lap_sources: Vec<String>,
@ -33,8 +38,12 @@ struct Conf {
async fn rssi_timer(udpchanneltx: futures::channel::mpsc::UnboundedSender<String>) { async fn rssi_timer(udpchanneltx: futures::channel::mpsc::UnboundedSender<String>) {
loop { loop {
Delay::new(Duration::from_secs(1)).await; Delay::new(Duration::from_millis(500)).await;
udpchanneltx.unbounded_send("S0r\n".to_string()).unwrap(); udpchanneltx
.unbounded_send("S0r\n".to_string())
.unwrap_or_else(|err| {
eprintln!("Could not request RSSI from Chorus: {}", err);
});
} }
} }
@ -65,7 +74,10 @@ async fn programm_to_udp(
loop { loop {
let send_data = udpchannelrx.next().await.unwrap(); let send_data = udpchannelrx.next().await.unwrap();
let msg = send_data.into_bytes(); let msg = send_data.into_bytes();
udptx.send(&msg).await.unwrap(); udptx.send(&msg).await.unwrap_or_else(|err| {
eprintln!("Could not send to Chorus: {}", err);
return 0;
});
} }
} }
@ -76,23 +88,34 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen
// Setup the UDP Socket // Setup the UDP Socket
let mut udpsocket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); let mut udpsocket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
udpsocket udpsocket
.connect("127.0.0.1:9000") //192.168.0.141 .connect(
&appconf
.chorus_udp_url
.as_ref()
.unwrap_or(&"192.168.0.141:9000".to_string()),
)
.await .await
.expect("could not connect to udp "); .expect("could not connect to udp ");
let msg = String::from("ok\n").into_bytes(); let msg = String::from("ok\n").into_bytes();
udpsocket.send(&msg).await.unwrap(); udpsocket.send(&msg).await.unwrap_or_else(|err| {
eprintln!("Could not send to Chorus: {}", err);
return 0;
});
let (mut udprx, udptx) = udpsocket.split(); let (mut udprx, udptx) = udpsocket.split();
let (udpchanneltx, udpchannelrx) = futures::channel::mpsc::unbounded(); let (udpchanneltx, udpchannelrx) = futures::channel::mpsc::unbounded();
tokio::spawn(programm_to_udp(udpchannelrx, udptx)); tokio::spawn(programm_to_udp(udpchannelrx, udptx));
tokio::spawn(rssi_timer(udpchanneltx.clone())); tokio::spawn(rssi_timer(udpchanneltx.clone()));
tokio::spawn(race_timer(
senddata.clone(), if let Some(race_time_source) = &appconf.race_time_source {
appconf.race_time_source.clone(), tokio::spawn(race_timer(
race_timer_state_clone, senddata.clone(),
)); race_time_source.clone(),
race_timer_state_clone,
));
}
loop { loop {
let mut buf: [u8; 500] = [0; 500]; let mut buf: [u8; 500] = [0; 500];
@ -114,7 +137,7 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen
set_obs_text( set_obs_text(
&senddata, &senddata,
&appconf.race_status_source, &appconf.race_status_source,
&"Race inactive".to_string(), &"Race active".to_string(),
); );
for i in 0..2 { for i in 0..2 {
set_obs_text(&senddata, &appconf.lap_sources[i], &"0".to_string()); set_obs_text(&senddata, &appconf.lap_sources[i], &"0".to_string());
@ -144,7 +167,10 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen
// Drone is disconnected // Drone is disconnected
if drone_active[index] { if drone_active[index] {
// Send filter on // Send filter on
let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::<f64>().to_string(), "filterName":"mask" , "filterEnabled": true }); let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::<f64>().to_string(), "filterName":&appconf
.obs_mask_filter_name
.as_ref()
.unwrap_or(&"mask".to_string()) , "filterEnabled": true });
senddata senddata
.unbounded_send(Message::Text(request.to_string())) .unbounded_send(Message::Text(request.to_string()))
.unwrap(); .unwrap();
@ -154,7 +180,10 @@ async fn udp_comm(appconf: &Conf, senddata: futures::channel::mpsc::UnboundedSen
// Drone is connected! // Drone is connected!
if !drone_active[index] { if !drone_active[index] {
// Send filter off // Send filter off
let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::<f64>().to_string(), "filterName":"mask" , "filterEnabled": false }); let request = json!({"request-type":"SetSourceFilterVisibility", "sourceName":appconf.video_sources[index],"message-id": random::<f64>().to_string(), "filterName":&appconf
.obs_mask_filter_name
.as_ref()
.unwrap_or(&"mask".to_string()) , "filterEnabled": false });
senddata senddata
.unbounded_send(Message::Text(request.to_string())) .unbounded_send(Message::Text(request.to_string()))
.unwrap(); .unwrap();
@ -257,12 +286,6 @@ async fn main() {
.help("Sets a custom config file") .help("Sets a custom config file")
.takes_value(true), .takes_value(true),
) )
.arg(
Arg::with_name("INPUT")
.help("Sets the input file to use")
.required(false)
.index(1),
)
.arg( .arg(
Arg::with_name("v") Arg::with_name("v")
.short("v") .short("v")
@ -278,33 +301,22 @@ async fn main() {
.get_matches(); .get_matches();
// Gets a value for config if supplied by user, or defaults to "default.conf" // Gets a value for config if supplied by user, or defaults to "default.conf"
let config = matches.value_of("config").unwrap_or("default.conf"); let configfile = matches.value_of("config").unwrap_or("Config.toml");
println!("Value for config: {}", config); println!("Using config file: {}", configfile);
let mut file = File::open("Config.toml").expect("Unable to open the file"); let mut file = File::open(&configfile).expect("Unable to open config file");
let mut contents = String::new(); let mut contents = String::new();
file file
.read_to_string(&mut contents) .read_to_string(&mut contents)
.expect("Unable to read the file"); .expect("Unable to read the file");
let appconf: Conf = toml::from_str(contents.as_str()).unwrap(); let appconf: Conf = toml::from_str(contents.as_str()).unwrap();
/*
let appconf = Conf {
filemode: matches.is_present("filemode"),
rssi_threshold: 100,
};
*/
// Calling .unwrap() is safe here because "INPUT" is required (if "INPUT" wasn't
// required we could have used an 'if let' to conditionally get the value)
if let Some(input) = matches.value_of("INPUT") {
println!("Using input file: {}", input);
}
// Vary the output based on how many times the user used the "verbose" flag // Vary the output based on how many times the user used the "verbose" flag
// (i.e. 'myprog -v -v -v' or 'myprog -vvv' vs 'myprog -v' // (i.e. 'myprog -v -v -v' or 'myprog -vvv' vs 'myprog -v'
match matches.occurrences_of("v") { match matches.occurrences_of("v") {
0 => println!("No verbose info"), 0 => println!("No verbose info"),
1 => println!("Some verbose info"), 1 => println!("Verbose UDP"),
2 => println!("Tons of verbose info"), 2 => println!("Verbose UDP + Websocket"),
3 | _ => println!("Don't be crazy"), 3 | _ => println!("Don't be crazy"),
} }
@ -319,10 +331,27 @@ async fn main() {
write_file("-.-".to_string(), "rx3_laptime.txt"); write_file("-.-".to_string(), "rx3_laptime.txt");
} }
if !appconf.enable_websocket {
println!("Programm initialized! (Without websocket connection)");
let (obstx, obsrx) = futures::channel::mpsc::unbounded(); // creating without a place to send the packets
tokio::spawn(obschan_to_nowhere(obsrx));
udp_comm(&appconf, obstx).await;
std::process::exit(0);
}
// Setup websocket for OBS // Setup websocket for OBS
let (ws_stream, _) = connect_async(Url::parse("ws://localhost:4444/").unwrap()) let (ws_stream, _) = connect_async(
.await Url::parse(
.expect("Could not connect to OBS"); &appconf
.obs_websocket_url
.as_ref()
.unwrap_or(&"ws://localhost:4444/".to_string()),
)
.expect("Invalid Websocket URL format"),
)
.await
.expect("Could not connect to OBS");
println!("Connected to OBS"); println!("Connected to OBS");
@ -333,8 +362,25 @@ async fn main() {
ws_read.for_each(|message| { ws_read.for_each(|message| {
async { async {
let data = message.unwrap().into_data(); let data = message.unwrap().into_data();
println!("Messg"); // TODO: Parse errors here and filter the rest
tokio::io::stdout().write_all(&data).await.unwrap();
let data_string = match std::str::from_utf8(&data) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
if let Ok(untyped_json_value) = serde_json::from_str(data_string) {
let json_value: serde_json::Value = untyped_json_value;
if json_value["status"].as_str().is_some()
&& json_value["status"].as_str().unwrap() == "error"
{
eprintln!("OBBS Error: {}", json_value["error"]);
}
// TODO: Check appconfig verbosity
//tokio::io::stdout().write_all(&data).await.unwrap();
} else {
eprintln!("Could not parse json value from obs");
}
} }
}) })
}; };
@ -347,6 +393,12 @@ async fn main() {
udp_comm(&appconf, obstx).await; udp_comm(&appconf, obstx).await;
} }
async fn obschan_to_nowhere(mut obsrx: futures::channel::mpsc::UnboundedReceiver<Message>) {
loop {
obsrx.next().await.unwrap();
}
}
fn set_obs_text( fn set_obs_text(
wschannel: &futures::channel::mpsc::UnboundedSender<Message>, wschannel: &futures::channel::mpsc::UnboundedSender<Message>,
source: &String, source: &String,