You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

105 lines
4.6 KiB
Python

# Imports
import socket

# Only Allow Imports
# This module is a library: running it directly as a script does nothing and
# terminates immediately. SystemExit is raised directly instead of calling
# exit(), because exit() is injected by the site module for interactive use
# and is not guaranteed to exist (for example under `python -S`).
if __name__ == "__main__":
    raise SystemExit
# Client
class Client:
    """TCP client that exchanges length-prefixed messages over one socket.

    Wire format: each message is preceded by a fixed-width header of
    ``HEADER`` bytes containing the payload length as decimal digits,
    right-padded with spaces. The payload bytes follow immediately.

    Attributes:
        HEADER: Width of the length header in bytes (100).
        SOCKET: The connected ``socket.socket`` used for all traffic.
    """

    # Init
    def __init__(self, ip_address: str, port: int) -> None:
        """Create a TCP socket and connect it to ``(ip_address, port)``.

        Raises:
            OSError: If the connection attempt fails. The socket is closed
                before the error propagates so it is not leaked.
        """
        self.HEADER = 100
        self.SOCKET = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.SOCKET.connect((ip_address, port))
        except OSError:
            self.SOCKET.close()
            raise

    def __enter__(self):
        """Return this client so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the socket when leaving a ``with`` block."""
        self.close()

    # Close
    def close(self) -> None:
        """Close the underlying socket."""
        self.SOCKET.close()

    # Send String
    def send_string(self, message: str) -> None:
        """Encode ``message`` as UTF-8 and send it as one framed message."""
        self.send_byte(message.encode("utf-8"))

    # Send Byte
    def send_byte(self, message) -> None:
        """Send a bytes-like ``message`` preceded by its length header."""
        header = str(len(message)).encode("utf-8").ljust(self.HEADER)
        # sendall() keeps writing until everything is transmitted; plain
        # send() may stop after a partial write and corrupt the framing.
        self.SOCKET.sendall(header)
        self.SOCKET.sendall(message)

    # Receive String
    def receive_string(self) -> str:
        """Receive one framed message and decode it as UTF-8.

        Returns:
            The decoded text, or ``""`` if the peer closed the connection
            before sending a header.

        Raises:
            ConnectionError: If the connection closes part-way through a
                header or payload.
            ValueError: If the header is not a valid integer.
        """
        return self.receive_byte().decode("utf-8")

    # Receive Byte
    def receive_byte(self) -> bytearray:
        """Receive one framed message and return its raw payload.

        Returns:
            The payload as a ``bytearray``; empty if the peer closed the
            connection before sending a header.

        Raises:
            ConnectionError: If the connection closes part-way through a
                header or payload.
            ValueError: If the header is not a valid integer.
        """
        header = self._receive_exactly(self.HEADER)
        if not header:
            # Clean shutdown between messages: nothing to deliver.
            return bytearray()
        if len(header) < self.HEADER:
            raise ConnectionError(
                "connection closed while reading message header"
            )
        expected = int(header.decode("utf-8"))
        payload = self._receive_exactly(expected)
        if len(payload) < expected:
            raise ConnectionError(
                "connection closed while reading message payload"
            )
        return payload

    def _receive_exactly(self, size: int) -> bytearray:
        """Read up to ``size`` bytes, stopping early only if the peer closes."""
        buffer = bytearray()
        while len(buffer) < size:
            chunk = self.SOCKET.recv(size - len(buffer))
            if not chunk:
                # recv() returning b"" means the peer closed the connection;
                # without this check the loop would never terminate.
                break
            buffer += chunk
        return buffer
# Server
class Server:
    """TCP server that exchanges length-prefixed messages with its clients.

    Wire format: each message is preceded by a fixed-width header of
    ``HEADER`` bytes containing the payload length as decimal digits,
    right-padded with spaces. The payload bytes follow immediately.

    Attributes:
        HEADER: Width of the length header in bytes (100).
        SOCKET: The bound, listening ``socket.socket``.
    """

    # Init
    def __init__(self, ip_address: str, port: int) -> None:
        """Create a TCP socket, bind it to ``(ip_address, port)`` and listen.

        Raises:
            OSError: If binding or listening fails. The socket is closed
                before the error propagates so it is not leaked.
        """
        self.HEADER = 100
        self.SOCKET = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.SOCKET.bind((ip_address, port))
            self.SOCKET.listen()
        except OSError:
            self.SOCKET.close()
            raise

    def __enter__(self):
        """Return this server so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the listening socket when leaving a ``with`` block."""
        self.close()

    # Close
    def close(self) -> None:
        """Close the listening socket (accepted connections are untouched)."""
        self.SOCKET.close()

    # Connection
    def receive_connection(self) -> tuple:
        """Block until a client connects.

        Returns:
            The ``(connection, address)`` pair produced by ``accept()``.
        """
        return self.SOCKET.accept()

    # Send String
    def send_string(self, connection, message: str) -> None:
        """Encode ``message`` as UTF-8 and send it over ``connection``."""
        self.send_byte(connection, message.encode("utf-8"))

    # Send Byte
    def send_byte(self, connection, message) -> None:
        """Send a bytes-like ``message`` over ``connection`` with its header."""
        header = str(len(message)).encode("utf-8").ljust(self.HEADER)
        # sendall() keeps writing until everything is transmitted; plain
        # send() may stop after a partial write and corrupt the framing.
        connection.sendall(header)
        connection.sendall(message)

    # Receive String
    def receive_string(self, connection) -> str:
        """Receive one framed message from ``connection`` as UTF-8 text.

        Returns:
            The decoded text, or ``""`` if the peer closed the connection
            before sending a header.

        Raises:
            ConnectionError: If the connection closes part-way through a
                header or payload.
            ValueError: If the header is not a valid integer.
        """
        return self.receive_byte(connection).decode("utf-8")

    # Receive Byte
    def receive_byte(self, connection) -> bytearray:
        """Receive one framed message from ``connection`` as raw bytes.

        Returns:
            The payload as a ``bytearray``; empty if the peer closed the
            connection before sending a header.

        Raises:
            ConnectionError: If the connection closes part-way through a
                header or payload.
            ValueError: If the header is not a valid integer.
        """
        header = self._receive_exactly(connection, self.HEADER)
        if not header:
            # Clean shutdown between messages: nothing to deliver.
            return bytearray()
        if len(header) < self.HEADER:
            raise ConnectionError(
                "connection closed while reading message header"
            )
        expected = int(header.decode("utf-8"))
        payload = self._receive_exactly(connection, expected)
        if len(payload) < expected:
            raise ConnectionError(
                "connection closed while reading message payload"
            )
        return payload

    @staticmethod
    def _receive_exactly(connection, size: int) -> bytearray:
        """Read up to ``size`` bytes, stopping early only if the peer closes."""
        buffer = bytearray()
        while len(buffer) < size:
            chunk = connection.recv(size - len(buffer))
            if not chunk:
                # recv() returning b"" means the peer closed the connection;
                # without this check the loop would never terminate.
                break
            buffer += chunk
        return buffer