352 lines
10 KiB
Python
352 lines
10 KiB
Python
|
#
|
||
|
# Project OBS Twitch Chat Spam Script
|
||
|
# @author David Madison
|
||
|
# @link github.com/dmadison/OBS-ChatSpam
|
||
|
# @license GPLv3 - Copyright (c) 2018 David Madison
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
|
#
|
||
|
|
||
|
import obspython as obs
|
||
|
import socket
|
||
|
import time
|
||
|
|
||
|
|
||
|
class TwitchIRC:
|
||
|
def __init__(self, chan="", nick="", passw="", host="irc.twitch.tv", port=6667):
|
||
|
self.channel = chan
|
||
|
self.nickname = nick
|
||
|
self.password = passw
|
||
|
self.host = host
|
||
|
self.port = port
|
||
|
|
||
|
self.rate_num_msgs = 19 # Number of messages allowed...
|
||
|
self.rate_timeframe = 30 # ...in timeframe of x seconds
|
||
|
self.__message_timestamps = []
|
||
|
|
||
|
self.__connected = False
|
||
|
self.__last_message = None # Last connection timestamp
|
||
|
self.timeout = 10.0 # Time before open connection is closed, in seconds
|
||
|
|
||
|
self.__sock = socket.socket()
|
||
|
|
||
|
def connect(self, suppress_warnings=True):
|
||
|
connection_result = self.__connect()
|
||
|
|
||
|
if connection_result is not True:
|
||
|
self.__connected = False
|
||
|
if suppress_warnings:
|
||
|
print("Connection Error:", connection_result)
|
||
|
return False
|
||
|
else:
|
||
|
raise UserWarning(connection_result)
|
||
|
|
||
|
self.__connected = True
|
||
|
return True
|
||
|
|
||
|
def __connect(self):
|
||
|
if self.__connected:
|
||
|
return True # Already connected, nothing to see here
|
||
|
|
||
|
self.__sock = socket.socket()
|
||
|
self.__sock.settimeout(1) # One second to connect
|
||
|
|
||
|
try:
|
||
|
self.__sock.connect((self.host, self.port))
|
||
|
except socket.gaierror:
|
||
|
return "Cannot find server"
|
||
|
except (TimeoutError, socket.timeout):
|
||
|
return "No response from server (connection timed out)"
|
||
|
|
||
|
if self.password is not "":
|
||
|
self.__sock.send("PASS {}\r\n".format(self.password).encode("utf-8"))
|
||
|
self.__sock.send("NICK {}\r\n".format(self.nickname).encode("utf-8"))
|
||
|
self.__sock.send("JOIN #{}\r\n".format(self.channel).encode("utf-8"))
|
||
|
|
||
|
auth_response = self.read()
|
||
|
if "Welcome, GLHF!" not in auth_response:
|
||
|
return "Bad Authentication! Check your Oauth key"
|
||
|
|
||
|
try:
|
||
|
self.read() # Wait for "JOIN" response
|
||
|
except socket.timeout:
|
||
|
return "Channel not found!"
|
||
|
|
||
|
return True
|
||
|
|
||
|
def disconnect(self):
|
||
|
if self.__connected:
|
||
|
self.__sock.shutdown(socket.SHUT_RDWR)
|
||
|
self.__sock.close()
|
||
|
self.__connected = False
|
||
|
|
||
|
def connection_timeout(self):
|
||
|
if self.__connected and time.time() >= self.__last_message + self.timeout:
|
||
|
self.disconnect()
|
||
|
|
||
|
def test_authentication(self):
|
||
|
if self.connect(False):
|
||
|
self.disconnect()
|
||
|
print("Authentication successful!")
|
||
|
|
||
|
def chat(self, msg, suppress_warnings=True):
|
||
|
if not self.check_rates() or not self.connect(suppress_warnings):
|
||
|
return
|
||
|
|
||
|
# Store timestamps for rate limit and connection timeout
|
||
|
message_time = time.time()
|
||
|
self.__message_timestamps.append(message_time + self.rate_timeframe)
|
||
|
self.__last_message = message_time
|
||
|
|
||
|
self.__chat_direct(msg)
|
||
|
print("Sent \'" + msg + "\'", "as", self.nickname, "in #" + self.channel)
|
||
|
|
||
|
def __chat_direct(self, msg):
|
||
|
self.__sock.send("PRIVMSG #{} :{}\r\n".format(self.channel, msg).encode("utf-8"))
|
||
|
|
||
|
def read(self):
|
||
|
response = self.__read_socket()
|
||
|
while self.__ping(response):
|
||
|
response = self.__read_socket()
|
||
|
return response.rstrip()
|
||
|
|
||
|
def __read_socket(self):
|
||
|
return self.__sock.recv(1024).decode("utf-8")
|
||
|
|
||
|
def __ping(self, msg):
|
||
|
if msg[:4] == "PING":
|
||
|
self.__pong(msg[4:])
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
def __pong(self, host):
|
||
|
self.__sock.send(("PONG" + host).encode("utf-8"))
|
||
|
|
||
|
def check_rates(self):
|
||
|
index = 0
|
||
|
|
||
|
# Remove timestamps that have passed
|
||
|
for index, timestamp in enumerate(self.__message_timestamps):
|
||
|
if time.time() <= timestamp:
|
||
|
break
|
||
|
self.__message_timestamps = self.__message_timestamps[index:]
|
||
|
|
||
|
# If at max rate, throw an error
|
||
|
if len(self.__message_timestamps) >= self.rate_num_msgs:
|
||
|
next_clear = int(self.__message_timestamps[0] - time.time())
|
||
|
msg_plural = "s"
|
||
|
|
||
|
if next_clear <= 1:
|
||
|
next_clear = 1 # Avoiding "wait 0 more seconds" messages
|
||
|
msg_plural = ""
|
||
|
|
||
|
print("Error: Rate limit reached. Please wait " + str(next_clear) + " more second" + msg_plural)
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
twitch = TwitchIRC()
|
||
|
|
||
|
class ChatMessage:
|
||
|
messages = []
|
||
|
max_description_length = 32
|
||
|
|
||
|
def __init__(self, msg, position, obs_settings, irc=twitch):
|
||
|
self.text = msg
|
||
|
self.irc = irc
|
||
|
|
||
|
self.obs_data = obs_settings
|
||
|
|
||
|
self.position = position
|
||
|
self.hotkey_id = obs.OBS_INVALID_HOTKEY_ID
|
||
|
self.hotkey_saved_key = None
|
||
|
|
||
|
self.load_hotkey()
|
||
|
self.register_hotkey()
|
||
|
self.save_hotkey()
|
||
|
|
||
|
def __del__(self):
|
||
|
self.cleanup()
|
||
|
|
||
|
def cleanup(self):
|
||
|
self.deregister_hotkey()
|
||
|
self.release_memory()
|
||
|
|
||
|
def release_memory(self):
|
||
|
obs.obs_data_array_release(self.hotkey_saved_key)
|
||
|
|
||
|
def new_text(self, msg):
|
||
|
self.text = msg
|
||
|
self.deregister_hotkey()
|
||
|
self.register_hotkey()
|
||
|
|
||
|
def new_position(self, pos):
|
||
|
self.deregister_hotkey()
|
||
|
self.unsave_hotkey()
|
||
|
self.position = pos
|
||
|
self.register_hotkey()
|
||
|
|
||
|
def register_hotkey(self):
|
||
|
if len(self.text) > ChatMessage.max_description_length:
|
||
|
key_description = self.text[:ChatMessage.max_description_length - 3] + "..."
|
||
|
else:
|
||
|
key_description = self.text
|
||
|
key_description = "Chat \'" + key_description + "\'"
|
||
|
|
||
|
self.callback = lambda pressed: self.key_passthrough(pressed) # Small hack to get around the callback signature reqs.
|
||
|
self.hotkey_id = obs.obs_hotkey_register_frontend("chat_hotkey", key_description, self.callback)
|
||
|
obs.obs_hotkey_load(self.hotkey_id, self.hotkey_saved_key)
|
||
|
|
||
|
def deregister_hotkey(self):
|
||
|
obs.obs_hotkey_unregister(self.callback)
|
||
|
|
||
|
def load_hotkey(self):
|
||
|
self.hotkey_saved_key = obs.obs_data_get_array(self.obs_data, "chat_hotkey_" + str(self.position))
|
||
|
|
||
|
def save_hotkey(self):
|
||
|
self.hotkey_saved_key = obs.obs_hotkey_save(self.hotkey_id)
|
||
|
obs.obs_data_set_array(self.obs_data, "chat_hotkey_" + str(self.position), self.hotkey_saved_key)
|
||
|
|
||
|
def unsave_hotkey(self):
|
||
|
obs.obs_data_erase(self.obs_data, "chat_hotkey_" + str(self.position))
|
||
|
|
||
|
def key_passthrough(self, pressed):
|
||
|
if pressed:
|
||
|
self.send()
|
||
|
|
||
|
def send(self, suppress_warnings=True):
|
||
|
self.irc.chat(self.text, suppress_warnings)
|
||
|
|
||
|
@staticmethod
|
||
|
def check_messages(new_msgs, settings):
|
||
|
# Check if list hasn't changed
|
||
|
if len(new_msgs) == len(ChatMessage.messages):
|
||
|
num_diff = 0
|
||
|
diff_index = None
|
||
|
|
||
|
for index, msg in enumerate(ChatMessage.messages):
|
||
|
if new_msgs[index] != msg.text:
|
||
|
num_diff += 1
|
||
|
diff_index = index
|
||
|
if num_diff > 1:
|
||
|
break
|
||
|
else:
|
||
|
if num_diff != 0:
|
||
|
ChatMessage.messages[diff_index].new_text(new_msgs[diff_index]) # single entry modified
|
||
|
return # Lists identical
|
||
|
|
||
|
# Check if objects already exist, otherwise create them
|
||
|
new_list = []
|
||
|
for pos, msg in enumerate(new_msgs):
|
||
|
for msg_obj in ChatMessage.messages:
|
||
|
if msg == msg_obj.text:
|
||
|
new_list.append(msg_obj)
|
||
|
break
|
||
|
else:
|
||
|
new_list.append(ChatMessage(msg, pos, settings))
|
||
|
|
||
|
# Clean up old objects
|
||
|
for msg in ChatMessage.messages:
|
||
|
for msg_new in new_msgs:
|
||
|
if msg.text == msg_new:
|
||
|
break
|
||
|
else:
|
||
|
msg.cleanup()
|
||
|
msg.unsave_hotkey()
|
||
|
|
||
|
# Assign to master array and reindex
|
||
|
ChatMessage.messages = new_list
|
||
|
ChatMessage.__reindex_messages()
|
||
|
|
||
|
@staticmethod
|
||
|
def __reindex_messages():
|
||
|
for index, msg in enumerate(ChatMessage.messages):
|
||
|
msg.new_position(index)
|
||
|
|
||
|
for msg in ChatMessage.messages: # Separate loop as to avoid memory overwrites
|
||
|
msg.save_hotkey()
|
||
|
|
||
|
|
||
|
# ------------------------------------------------------------
|
||
|
|
||
|
# OBS Script Functions
|
||
|
|
||
|
def check_connection():
|
||
|
twitch.connection_timeout()
|
||
|
|
||
|
def test_authentication(prop, props):
|
||
|
twitch.test_authentication()
|
||
|
|
||
|
def test_message(prop, props):
|
||
|
ChatMessage.messages[0].send(False)
|
||
|
|
||
|
def script_description():
|
||
|
return "<b>Twitch Chat Spam</b>" + \
|
||
|
"<hr>" + \
|
||
|
"Python script for sending messages to Twitch chat using OBS hotkeys." + \
|
||
|
"<br/><br/>" + \
|
||
|
"Made by David Madison, © 2018" + \
|
||
|
"<br/><br/>" + \
|
||
|
"github.com/dmadison/OBS-ChatSpam" + \
|
||
|
"<br/>" + \
|
||
|
"partsnotincluded.com"
|
||
|
|
||
|
def script_update(settings):
|
||
|
twitch.channel = obs.obs_data_get_string(settings, "channel").lower()
|
||
|
twitch.nickname = obs.obs_data_get_string(settings, "user").lower()
|
||
|
|
||
|
new_oauth = obs.obs_data_get_string(settings, "oauth").lower()
|
||
|
if new_oauth != twitch.password:
|
||
|
twitch.disconnect() # Disconnect old oauth connection, if it exists
|
||
|
twitch.password = new_oauth
|
||
|
|
||
|
obs_messages = obs.obs_data_get_array(settings, "messages")
|
||
|
num_messages = obs.obs_data_array_count(obs_messages)
|
||
|
|
||
|
messages = []
|
||
|
for i in range(num_messages): # Convert C array to Python list
|
||
|
message_object = obs.obs_data_array_item(obs_messages, i)
|
||
|
messages.append(obs.obs_data_get_string(message_object, "value"))
|
||
|
|
||
|
ChatMessage.check_messages(messages, settings)
|
||
|
obs.obs_data_array_release(obs_messages)
|
||
|
|
||
|
#print("Settings JSON", obs.obs_data_get_json(settings))
|
||
|
|
||
|
def script_properties():
|
||
|
props = obs.obs_properties_create()
|
||
|
|
||
|
obs.obs_properties_add_text(props, "channel", "Channel", obs.OBS_TEXT_DEFAULT)
|
||
|
obs.obs_properties_add_text(props, "user", "User", obs.OBS_TEXT_DEFAULT)
|
||
|
obs.obs_properties_add_text(props, "oauth", "Oauth", obs.OBS_TEXT_PASSWORD)
|
||
|
|
||
|
obs.obs_properties_add_editable_list(props, "messages", "Messages", obs.OBS_EDITABLE_LIST_TYPE_STRINGS, "", "")
|
||
|
obs.obs_properties_add_button(props, "test_auth", "Test Authentication", test_authentication)
|
||
|
obs.obs_properties_add_button(props, "test_message", "Test Message #1", test_message)
|
||
|
|
||
|
return props
|
||
|
|
||
|
def script_save(settings):
|
||
|
for message in ChatMessage.messages:
|
||
|
message.save_hotkey()
|
||
|
|
||
|
def script_load(settings):
|
||
|
obs.timer_add(check_connection, 1000) # Check for timeout every second
|
||
|
|
||
|
def script_unload():
|
||
|
obs.timer_remove(check_connection)
|
||
|
|
||
|
for message in ChatMessage.messages:
|
||
|
message.cleanup()
|
||
|
cargo
|