Votes Internally!
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

89 lines
2.3 KiB

import socket
import cmd
import threading
import time
from packet import Packet, CMD
class KAThread(threading.Thread):
    """Background thread that periodically sends KEEPALIVE packets.

    Must be constructed with ``args=(tbot,)``. ``tbot`` has to expose
    ``addr`` (a ``(host, port)`` tuple, or ``None`` while disconnected) and
    ``socket`` (the datagram socket used for sending).
    """

    def run(self):
        """Send a keepalive to ``tbot.addr`` about every 0.1 s, forever.

        The loop never returns; the thread is expected to be a daemon so it
        does not keep the process alive.
        """
        # ``_args`` is where threading.Thread stores the ``args=`` tuple.
        tbot = self._args[0]
        pkt = Packet()
        pkt.write_ubyte(CMD.KEEPALIVE)
        while True:
            # Sleep on every iteration, connected or not; sleeping only
            # while connected would turn the disconnected state into a
            # busy loop that pins a CPU core.
            time.sleep(0.1)
            # Snapshot the address once: the shell thread may reset it to
            # None between the check and the send.
            addr = tbot.addr
            if not addr:
                continue
            try:
                tbot.socket.sendto(bytes(pkt), addr)
            except Exception:
                # Keepalives are best-effort: a failed send (unresolvable
                # host, network down, ...) must not kill this thread, and
                # reporting it ten times a second would flood the prompt.
                pass
class TBot(cmd.Cmd):
    """Interactive shell that sends MOTION and SOUND packets over UDP.

    Commands: ``connect``, ``fw``, ``lt``, ``snd``; end-of-file quits.

    The one-line string at the top of each ``do_*`` method is what ``cmd``
    prints for ``help <command>``, so those strings are user-facing text
    and are intentionally kept short.
    """

    # Created once at class-definition time and shared by every instance.
    # Inside the class body this name shadows the ``socket`` module.
    socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Remote ``(host, port)`` tuple, or None while not connected.
    addr = None

    # Port used when ``connect`` is given a host without a port.
    DEFAULT_PORT = 13676
    # Inclusive range of sound ids accepted by ``snd`` (see its help text).
    SOUND_RANGE = (0, 6)

    @staticmethod
    def _parse_arg(rest, convert, what):
        # Convert a command argument (an empty argument means zero); on bad
        # input print a message and return None instead of raising, because
        # cmd.Cmd does not catch exceptions and would abort the whole shell.
        try:
            return convert(rest or '0')
        except ValueError:
            print('Invalid ' + what + ':', rest)
            return None

    def _send(self, pkt):
        # Send one packet to the current remote, reporting socket errors
        # (e.g. an unresolvable host) instead of letting them end the shell.
        addr = self.addr
        if not addr:
            return
        try:
            self.socket.sendto(bytes(pkt), addr)
        except OSError as err:
            print('Send failed:', err)

    def _send_motion(self, linear, angular):
        # Build and send a MOTION packet: first double is the forward
        # velocity (m/s), second is the turn velocity (rad/s).
        pkt = Packet()
        pkt.write_ubyte(CMD.MOTION)
        pkt.write_double(linear)
        pkt.write_double(angular)
        self._send(pkt)

    def do_connect(self, rest):
        'connect [<host>[:<port>]]'
        # With no argument the remote is cleared (disconnect). An invalid
        # port leaves the current remote unchanged.
        if rest:
            host, _, port = rest.partition(':')
            try:
                port_number = int(port) if port else self.DEFAULT_PORT
            except ValueError:
                print('Invalid port:', port)
                return
            # Reject ports sendto() would refuse later on every command.
            if not 0 <= port_number <= 65535:
                print('Invalid port:', port)
                return
            self.addr = (host, port_number)
        else:
            self.addr = None
        print('Remote set to', self.addr)

    def do_fw(self, rest):
        'fw <velocity:m/s>'
        # Drive forward at the given velocity; no argument means 0 (stop).
        if not self.addr:
            print('Not connected')
            return
        velocity = self._parse_arg(rest, float, 'velocity')
        if velocity is not None:
            self._send_motion(velocity, 0.0)

    def do_lt(self, rest):
        'lt <velocity:rad/s>'
        # Turn at the given angular velocity; no argument means 0 (stop).
        if not self.addr:
            print('Not connected')
            return
        velocity = self._parse_arg(rest, float, 'velocity')
        if velocity is not None:
            self._send_motion(0.0, velocity)

    def do_snd(self, rest):
        'snd <sound, [0,6]>'
        # Request the sound with the given id; no argument means sound 0.
        if not self.addr:
            print('Not connected')
            return
        sound = self._parse_arg(rest, int, 'sound')
        if sound is None:
            return
        lowest, highest = self.SOUND_RANGE
        if not lowest <= sound <= highest:
            print('Invalid sound:', rest)
            return
        pkt = Packet()
        pkt.write_ubyte(CMD.SOUND)
        pkt.write_ubyte(sound)
        self._send(pkt)

    def do_EOF(self, rest):
        # Deliberately no docstring: it would be shown as help text.
        # On end-of-file (Ctrl-D) send a zero-velocity MOTION packet so the
        # remote is told to stop, then terminate the program.
        print('Goodbye.')
        if self.addr:
            self._send_motion(0.0, 0.0)
        # Same effect as the interactive-only ``exit()`` helper, without
        # depending on the ``site`` module having installed it.
        raise SystemExit
if __name__ == '__main__':
    # Script entry point: start the keepalive sender, then run the shell.
    tbot = TBot()
    # Daemon thread, so the endless keepalive loop does not keep the process
    # alive once the shell exits. ``daemon=True`` replaces the deprecated
    # ``Thread.setDaemon()`` call.
    tthr = KAThread(args=(tbot,), daemon=True)
    tthr.start()
    tbot.cmdloop()