dotfiles/vscode/.vscode/extensions/ms-python.python-2024.8.0/python_files/python_server.py
Errol Sancaktar ff17c17e23 vscode
2024-06-14 09:31:58 -06:00

179 lines
5.1 KiB
Python

from typing import Dict, List, Optional, Union
import sys
import json
import contextlib
import io
import traceback
import uuid
import ast
STDIN = sys.stdin
STDOUT = sys.stdout
STDERR = sys.stderr
USER_GLOBALS = {}
def send_message(msg: str):
length_msg = len(msg)
STDOUT.buffer.write(f"Content-Length: {length_msg}\r\n\r\n{msg}".encode(encoding="utf-8"))
STDOUT.buffer.flush()
def print_log(msg: str):
send_message(json.dumps({"jsonrpc": "2.0", "method": "log", "params": msg}))
def send_response(response: str, response_id: int):
send_message(json.dumps({"jsonrpc": "2.0", "id": response_id, "result": response}))
def send_request(params: Optional[Union[List, Dict]] = None):
request_id = uuid.uuid4().hex
if params is None:
send_message(json.dumps({"jsonrpc": "2.0", "id": request_id, "method": "input"}))
else:
send_message(
json.dumps({"jsonrpc": "2.0", "id": request_id, "method": "input", "params": params})
)
return request_id
original_input = input
def custom_input(prompt=""):
try:
send_request({"prompt": prompt})
headers = get_headers()
content_length = int(headers.get("Content-Length", 0))
if content_length:
message_text = STDIN.read(content_length)
message_json = json.loads(message_text)
our_user_input = message_json["result"]["userInput"]
return our_user_input
except Exception:
print_log(traceback.format_exc())
# Set input to our custom input
USER_GLOBALS["input"] = custom_input
input = custom_input
def handle_response(request_id):
while not STDIN.closed:
try:
headers = get_headers()
content_length = int(headers.get("Content-Length", 0))
if content_length:
message_text = STDIN.read(content_length)
message_json = json.loads(message_text)
our_user_input = message_json["result"]["userInput"]
if message_json["id"] == request_id:
send_response(our_user_input, message_json["id"])
elif message_json["method"] == "exit":
sys.exit(0)
except Exception:
print_log(traceback.format_exc())
def exec_function(user_input):
try:
compile(user_input, "<stdin>", "eval")
except SyntaxError:
return exec
return eval
def check_valid_command(request):
try:
user_input = request["params"]
ast.parse(user_input[0])
send_response("True", request["id"])
except SyntaxError:
send_response("False", request["id"])
def execute(request, user_globals):
str_output = CustomIO("<stdout>", encoding="utf-8")
str_error = CustomIO("<stderr>", encoding="utf-8")
with redirect_io("stdout", str_output):
with redirect_io("stderr", str_error):
str_input = CustomIO("<stdin>", encoding="utf-8", newline="\n")
with redirect_io("stdin", str_input):
exec_user_input(request["params"], user_globals)
send_response(str_output.get_value(), request["id"])
def exec_user_input(user_input, user_globals):
user_input = user_input[0] if isinstance(user_input, list) else user_input
try:
callable = exec_function(user_input)
retval = callable(user_input, user_globals)
if retval is not None:
print(retval)
except KeyboardInterrupt:
print(traceback.format_exc())
except Exception:
print(traceback.format_exc())
class CustomIO(io.TextIOWrapper):
"""Custom stream object to replace stdio."""
def __init__(self, name, encoding="utf-8", newline=None):
self._buffer = io.BytesIO()
self._custom_name = name
super().__init__(self._buffer, encoding=encoding, newline=newline)
def close(self):
"""Provide this close method which is used by some tools."""
# This is intentionally empty.
def get_value(self) -> str:
"""Returns value from the buffer as string."""
self.seek(0)
return self.read()
@contextlib.contextmanager
def redirect_io(stream: str, new_stream):
"""Redirect stdio streams to a custom stream."""
old_stream = getattr(sys, stream)
setattr(sys, stream, new_stream)
yield
setattr(sys, stream, old_stream)
def get_headers():
headers = {}
while line := STDIN.readline().strip():
name, value = line.split(":", 1)
headers[name] = value.strip()
return headers
if __name__ == "__main__":
while not STDIN.closed:
try:
headers = get_headers()
content_length = int(headers.get("Content-Length", 0))
if content_length:
request_text = STDIN.read(content_length)
request_json = json.loads(request_text)
if request_json["method"] == "execute":
execute(request_json, USER_GLOBALS)
if request_json["method"] == "check_valid_command":
check_valid_command(request_json)
elif request_json["method"] == "exit":
sys.exit(0)
except Exception:
print_log(traceback.format_exc())