ChatGPT on Blender
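A voice assistant for Blender built on ChatGPT: the script listens to the microphone with PyAudio, transcribes speech with OpenAI Whisper, sends the transcript together with the current scene's object names and locations to gpt-3.5-turbo, runs any returned Python block on Blender's main thread via bpy.app.timers, and reads the conversational part of the reply aloud with Azure Speech. It is meant to be run from Blender 3.3's scripting workspace; the site-packages path, API-key files, and tmp directory near the top are machine-specific and need to be adjusted.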
import sys

packages_path = (
    "C:\\Users\\81809\\AppData\\Roaming\\Python\\Python310\\Scripts"
    + "\\..\\site-packages"
)
sys.path.insert(0, packages_path)

import openai
import threading
import re
import bpy
import functools
import azure.cognitiveservices.speech as speechsdk
import pyaudio
from collections import deque
import numpy as np
import time
import wave
azure_api_key = open("E:/Projects/GPT_on_blender/azure_key.txt", "r").read().strip("\n")
openai.api_key = (
    open("E:/Projects/GPT_on_blender/openai_key.txt", "r").read().strip("\n")
)
tmp_dir = "E:/Projects/GPT_on_blender/tmp"

# Azure Speech configuration: the subscription key is read from the file above
# and the region is hard-coded.
speech_config = speechsdk.SpeechConfig(subscription=azure_api_key, region="japaneast")
audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=True)

# Default voice; the SSML built in tts() below selects en-US-SaraNeural instead.
speech_config.speech_synthesis_voice_name = "en-GB-SoniaNeural"

speech_synthesizer = speechsdk.SpeechSynthesizer(
    speech_config=speech_config, audio_config=audio_config
)
message_history = [
    {
        "role": "system",
        "content": (
            "You are my partner for executing what I have in mind in Blender. Follow my instructions exactly and in a friendly manner. \n"
            "- Do not write Python code until you are told to actually do something. \n"
            "- When writing scripts, be sure to write them in a single block (``` ```). \n"
            "- The code you write will be executed automatically, so do not explain how to execute it. \n"
            "- Don't output filler words like 'oh', 'lol', 'ah'. \n"
            "- Don't explain your script. \n"
            "- The Blender version is 3.3."
        ),
    },
]
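# The history above grows as the loop in thread_func() runs; illustrative shape only:
#   {"role": "user", "content": "<scene object info> + <transcribed request>"}
#   {"role": "assistant", "content": "<conversation and/or a ```python ...``` block>"}
# Errors raised while executing generated code are appended as "user" messages too
# (see execute_code below), so the model can correct itself on the next turn.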
def transcribe(filepath):
    audio_file = open(filepath, "rb")
    transcript = openai.Audio.transcribe("whisper-1", audio_file, language="en")
    return transcript["text"]


def call_gpt(message_history):
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo", messages=message_history
    )
    reply_content = completion.choices[0].message.content
    return reply_content
def tts(txt):
    global speech_synthesizer
    ssml_string = f"""
    <speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis"
        xmlns:mstts="https://www.w3.org/2001/mstts" xml:lang="en-US">
        <voice name="en-US-SaraNeural">
            <prosody pitch="15%">
                <mstts:express-as style="friendly" styledegree="1">
                    {txt}
                </mstts:express-as>
            </prosody>
        </voice>
    </speak>
    """
    speech_synthesizer.speak_ssml_async(ssml_string).get()
def thread_func():
    global message_history

    def record_audio(output_filename, stream, prepend_audio, silence_duration):
        print("Recording...")
        frames = list(prepend_audio)
        silence_chunks = 0
        silence_threshold = int(rate / chunk_size * silence_duration)
        # Keep reading until `silence_duration` seconds of low volume have passed.
        while silence_chunks < silence_threshold:
            data = stream.read(chunk_size)
            audio_data = np.frombuffer(data, dtype=np.int16)
            volume = np.linalg.norm(audio_data) / np.sqrt(len(audio_data))
            if volume < volume_threshold:
                silence_chunks += 1
            else:
                silence_chunks = 0
            frames.append(data)
        with wave.open(output_filename, "wb") as wav_file:
            wav_file.setnchannels(channels)
            wav_file.setsampwidth(p.get_sample_size(format))
            wav_file.setframerate(rate)
            wav_file.writeframes(b"".join(frames))

    chunk_size = 1024
    format = pyaudio.paInt16
    channels = 1
    rate = 44100
    volume_threshold = 1500
    buffer_duration = 0.5  # Duration (in seconds) of audio to prepend
    silence_duration = 1  # Duration (in seconds) of silence to stop recording

    p = pyaudio.PyAudio()
    stream = p.open(
        format=format,
        channels=channels,
        rate=rate,
        input=True,
        frames_per_buffer=chunk_size,
    )

    is_recording = False
    # Rolling buffer of the last `buffer_duration` seconds, prepended to each
    # recording so the start of an utterance is not cut off.
    buffer_size = int(rate / chunk_size * buffer_duration)
    audio_buffer = deque(maxlen=buffer_size)

    print("you can speak...")
    try:
        while True:
            data = stream.read(chunk_size, exception_on_overflow=False)
            audio_data = np.frombuffer(data, dtype=np.int16)
            volume = np.linalg.norm(audio_data) / np.sqrt(len(audio_data))
            audio_buffer.append(data)

            if volume > volume_threshold and not is_recording:
                is_recording = True
                output_filename = f"{tmp_dir}/output_{int(time.time())}.wav"
                record_audio(output_filename, stream, audio_buffer, silence_duration)

                user_input = transcribe(output_filename)
                print("{USER INPUT}")
                print(user_input)

                user_input = user_input.strip().lower()
                if user_input == "quit" or user_input == "quit.":
                    print("break")
                    break

                # Echo the transcription back over TTS while the GPT call runs.
                tts_user_input_th = threading.Thread(
                    target=tts, args=(f"You said '{user_input}', right?",)
                )
                tts_user_input_th.start()

                # Describe the current scene so the model knows what already exists.
                obj_info = (
                    "The current objects' information is as follows: \n"
                    "'object name' : 'object location' \n"
                )
                for obj in bpy.context.scene.objects:
                    obj_info += f"{obj.name} : {obj.location} \n"
                obj_info += "\n"

                prompt = obj_info + user_input
                print("{PROMPT}")
                print(prompt)

                message_history.append(
                    {"role": "user", "content": prompt},
                )
                rep = call_gpt(message_history)
                message_history.append({"role": "assistant", "content": rep})

                tts_user_input_th.join()

                print("{ASSISTANT}")
                print(rep)

                code, conversation = separate_code(rep)
                print("{CONVERSATION}")
                print(conversation)
                print("{CODE}")
                print(code)

                # bpy is not thread-safe, so hand the generated script to a timer
                # that runs on Blender's main thread.
                if code is not None:
                    bpy.app.timers.register(functools.partial(execute_code, code))

                tts(conversation)

                is_recording = False
                print("you can speak...")
    except KeyboardInterrupt:
        print("Stopped monitoring audio")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
def execute_code(code):
    global message_history
    try:
        exec(code)
    except Exception as e:
        error_message = str(e)
        print("{ERROR}")
        print(error_message)
        # Feed the error back into the conversation so the model can fix it.
        message_history.append(
            {"role": "user", "content": error_message},
        )
    return None  # returning None unregisters this bpy.app.timers callback
def separate_code(chatgpt_response):
    # Regular expression for a fenced (optionally ```python) code block
    code_pattern = r"(?s)```(?:python)?(.*?)```"

    # Split the reply into the first code block and the remaining conversation
    code_blocks = re.findall(code_pattern, chatgpt_response, re.DOTALL)
    conversation = re.sub(code_pattern, "", chatgpt_response, flags=re.DOTALL).strip()

    if code_blocks:
        code_block = code_blocks[0].strip()
    else:
        code_block = None

    return code_block, conversation
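# Illustrative example: for a reply such as
#   "Sure! ```python\nimport bpy\nbpy.ops.mesh.primitive_cube_add()\n``` Done."
# separate_code() returns the script between the fences as `code_block` and the
# surrounding text ("Sure!  Done.") as `conversation`.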
# Run the listen/transcribe/generate loop on a background thread so Blender's UI stays responsive.
thread = threading.Thread(target=thread_func)
thread.start()