CSV Agent for Enhanced Data Interaction
Get plots and more with your csv agent
In the realm of data handling and analysis, CSV files are a staple. They are simple, universally used, and incredibly versatile for a wide array of data tasks. Numerous libraries and tools allow for querying and interacting with CSV data. However, a common limitation with many of these solutions is their output format, which is typically confined to text-only responses. While some tools support visual outputs such as plots, they often do so within the confines of a Jupyter notebook environment via standard output (stdout), which isn’t always ideal for application integration or dynamic web environments.
This limitation can significantly hinder the development of more interactive and user-friendly applications that require a seamless integration of data visualization and manipulation capabilities. The goal, therefore, is to find a more flexible way of querying CSV files that not only supports textual data but also integrates seamlessly with graphical data presentations outside of a notebook environment.
The Inspiration
The inspiration for a more robust solution came from Jeremy Howard’s approach in one of his lectures (link), where he discusses crafting a custom Python code interpreter aimed at enhancing user experience within Jupyter notebooks. While Jeremy’s solution greatly improves interaction within notebooks, it still falls short of providing a comprehensive experience outside this specific environment.
Usual Implementations
A typical workflow of such an agent is as follows:
The user’s query is inserted into a prompt template that also takes in the first few rows of the dataframe of interest (assuming the CSV has been loaded as a dataframe). The LLM is asked to look at the query and generate Python code which, when executed, provides the answer the user is looking for. When run, the Python tool can produce a multitude of outputs: a plot, a filtered dataframe, a list, a tuple, a dictionary, or a single value. Where possible, this result is inserted into another prompt template and the LLM is asked to summarise it and serve it to the user. In Jeremy’s implementation of the Python tool, the result of the last expression of the code is captured (code below).
import ast

def run(code):
    tree = ast.parse(code)
    last_node = tree.body[-1] if tree.body else None
    # If the last node is an expression, modify the AST to capture the result
    if isinstance(last_node, ast.Expr):
        tgts = [ast.Name(id='_result', ctx=ast.Store())]
        assign = ast.Assign(targets=tgts, value=last_node.value)
        tree.body[-1] = ast.fix_missing_locations(assign)
    ns = {}
    exec(compile(tree, filename='<ast>', mode='exec'), ns)
    return ns.get('_result', None)
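A quick check of the behaviour (repeating the function here so the snippet is self-contained): a trailing expression is captured and returned, while code ending in a statement yields `None`:

```python
import ast

def run(code):
    tree = ast.parse(code)
    last_node = tree.body[-1] if tree.body else None
    # If the last node is an expression, rewrite it as an assignment to _result
    if isinstance(last_node, ast.Expr):
        tgts = [ast.Name(id='_result', ctx=ast.Store())]
        assign = ast.Assign(targets=tgts, value=last_node.value)
        tree.body[-1] = ast.fix_missing_locations(assign)
    ns = {}
    exec(compile(tree, filename='<ast>', mode='exec'), ns)
    return ns.get('_result', None)

print(run("x = 2\nx + 3"))   # trailing expression -> 5
print(run("x = 2\ny = x"))   # ends in an assignment -> None
```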
This is indeed a good solution and works well in a notebook environment where plots, filtered dataframes, and print outputs can be rendered directly. If you need to serve the solution over an API, however, the tool must be modified. That is what this article aims at: it demonstrates the modification of this tool and integrates the rest of the workflow into a single Python class, which also gives you the conversation history as well as the code most recently generated by the LLM in response to the query. The idea is to address some of the shortcomings and simplify the process.
Towards a Better Solution
The proposed CSV agent goes beyond traditional text-only outputs. It is designed to interact with CSV files by not only querying data but also by presenting results in various forms, including visual plots and filtered dataframes. The key to this enhanced functionality is the ability to not just generate outputs, but to also serialize them in a format that can be easily integrated into different software applications or web interfaces.
Key Features
- Multi-Output Compatibility: The agent is capable of generating and handling multiple forms of output, from tables to complex graphical plots.
- Serialization of Visual Outputs: Unlike conventional tools that render plots directly to stdout, this agent serializes plot data into formats such as base64, allowing them to be easily embedded in web pages or transferred over networks.
- Flexible Integration: Designed with modularity in mind, the agent can be integrated into a variety of application backends and frontends, supporting both desktop and web-based environments.
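As a concrete illustration of the serialization idea, raw PNG bytes (such as those produced by `fig.savefig`) can be base64-encoded into a data URI and dropped straight into an `<img>` tag. A minimal sketch; the PNG bytes here are a stand-in, not a real image:

```python
import base64

def png_bytes_to_data_uri(png_bytes):
    """Encode raw PNG bytes as a data URI suitable for an <img src=...> tag."""
    b64 = base64.b64encode(png_bytes).decode('utf-8')
    return f"data:image/png;base64,{b64}"

# Stand-in bytes; in practice they come from fig.savefig(buffer, format='png')
uri = png_bytes_to_data_uri(b"\x89PNG\r\n\x1a\n")
html = f'<img src="{uri}" alt="generated plot"/>'
```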
How It Works
The CSV agent utilizes the instructor library to get the desired response format from an OpenAI model. Instructor also lets you specify how many times the request should be retried until the response adheres to the provided format, which gives you control over the number of API calls you make. Instructor is also used to obtain corrected code, based on errors seen during code execution, via a different pydantic response model.
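The response models `PythonCodeForDataFrame` and `CorrectedPythonCode` referenced later are not spelled out in this article. A minimal sketch of what they might look like as plain pydantic models: the field names match the attributes accessed later (`python_code`, `corrected_python_code`), while the docstrings and descriptions are assumptions.

```python
from pydantic import BaseModel, Field

class PythonCodeForDataFrame(BaseModel):
    """Structured response for the initial code-generation call."""
    python_code: str = Field(..., description="Python code that answers the user's query")

class CorrectedPythonCode(BaseModel):
    """Structured response for the code-correction call."""
    corrected_python_code: str = Field(..., description="Corrected code after fixing the reported error")
```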
Python’s ast module is used to parse and execute queries on CSV data. Much of the heavy lifting is done by the following code.
import ast
import base64
import io
import sys

import matplotlib.pyplot as plt

def run_python_code(code):
    try:
        # Parse the code string into an AST
        tree = ast.parse(code)
        last_node = tree.body[-1] if tree.body else None
        # Modify the AST so the value of the last statement lands in _result
        temp_name = "_result"
        if isinstance(last_node, ast.Expr):
            new_assign = ast.Assign(targets=[ast.Name(id=temp_name, ctx=ast.Store())],
                                    value=last_node.value)
            new_assign.lineno = last_node.lineno
            new_assign.col_offset = last_node.col_offset
            tree.body[-1] = new_assign
        elif isinstance(last_node, ast.Assign):
            # Retarget a trailing assignment (assumes a simple name target)
            last_node.targets[0].id = temp_name
        # Give every node a valid lineno and col_offset before compiling
        for node in ast.walk(tree):
            if 'lineno' in node._attributes:
                node.lineno = 1
            if 'col_offset' in node._attributes:
                node.col_offset = 0
        # Redirect stdout so printed output can be captured
        buffer_stdout = io.StringIO()
        sys.stdout = buffer_stdout
        # Prepare to capture plots
        plots_captured = []
        original_show = plt.show

        def capture_active_figures(plot_list):
            figures = [plt.figure(i) for i in plt.get_fignums()]
            for fig in figures:
                buffer_plot = io.BytesIO()
                fig.savefig(buffer_plot, format='png', bbox_inches='tight')
                buffer_plot.seek(0)
                plot_data = base64.b64encode(buffer_plot.read()).decode('utf-8')
                plot_list.append(plot_data)
                plt.close(fig)  # close the figure once it has been captured

        def custom_show(*args, **kwargs):
            capture_active_figures(plots_captured)
            original_show(*args, **kwargs)

        plt.show = custom_show
        # Execute the modified code
        local_state = {}
        exec(compile(tree, filename='<ast>', mode='exec'), globals(), local_state)
        # Capture any remaining figures not handled by plt.show
        capture_active_figures(plots_captured)
        # Restore plt.show and stdout
        plt.show = original_show
        sys.stdout = sys.__stdout__
        output = buffer_stdout.getvalue().strip()
        # Assemble the result dictionary
        result_dict = {'printed_output': output}
        if plots_captured:
            result_dict['plot_data'] = plots_captured
        # Include the result of the last expression, if any
        last_result = local_state.get(temp_name, None)
        if last_result is not None:
            result_dict['last_expression'] = last_result
        return result_dict
    except Exception as e:
        # Restore stdout so later prints are visible even after a failure
        sys.stdout = sys.__stdout__
        return {'error': f"Error: {type(e).__name__}: {str(e)}"}
Core Components of the Function
1. AST Manipulation:
- Parsing: The function starts by parsing the provided code string into an AST using ast.parse(). This converts the plain code into a tree structure we can manipulate.
- Modifying the Last Node: If the last node (piece of code) is an expression (something that evaluates to a value), we modify this AST to store its result in a temporary variable (_result). This is done by wrapping the expression in an assignment operation.
2. Redirection of Standard Output:
- Capturing Print Statements: All print outputs are redirected from the standard output to a buffer (StringIO object). This allows us to capture anything the code prints out for later use.
3. Plot Handling Using matplotlib:
- Custom plt.show(): The standard behavior of plt.show() is overridden (monkey-patched) to capture plots before they are displayed and potentially closed. This custom function captures the figures into a byte buffer, converts them to base64 (a text representation of binary data), and stores this for later use.
- Figure Capture: After the script execution, the function checks for any remaining open figures and captures them as well, ensuring no plots are missed even if they are not explicitly shown with plt.show().
4. Executing the Code:
- The AST, with modifications, is compiled back to code and executed within the function’s environment. Variables and results generated by this execution are captured into a local dictionary.
5. Collecting and Returning Results:
- Output Collection: Outputs captured include the redirected printed outputs, plots (if any), and the result of the last evaluated expression.
- Error Handling: Any errors during execution are caught and included in the results to help with debugging.
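To make the capture mechanics concrete, here is a stripped-down version of the function with the plotting parts removed, illustrating just the stdout redirection and last-expression capture:

```python
import ast
import io
import sys

def run_and_capture(code):
    """Stripped-down sketch: capture stdout and the last expression only."""
    tree = ast.parse(code)
    last_node = tree.body[-1] if tree.body else None
    # Rewrite a trailing expression as an assignment to _result
    if isinstance(last_node, ast.Expr):
        assign = ast.Assign(targets=[ast.Name(id='_result', ctx=ast.Store())],
                            value=last_node.value)
        tree.body[-1] = ast.fix_missing_locations(assign)
    buffer_stdout = io.StringIO()
    sys.stdout = buffer_stdout
    try:
        ns = {}
        exec(compile(tree, filename='<ast>', mode='exec'), ns)
    finally:
        sys.stdout = sys.__stdout__  # always restore stdout
    result = {'printed_output': buffer_stdout.getvalue().strip()}
    if ns.get('_result') is not None:
        result['last_expression'] = ns['_result']
    return result

out = run_and_capture("print('hello')\n2 + 5")
# -> {'printed_output': 'hello', 'last_expression': 7}
```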
This method is used inside our agent class, shown below.
import glob
import os

import instructor
import openai
import pandas as pd

# SYSTEM_PROMPT, DATAFRAME_PROMPT, DATAFRAME_PROMPT_PREFIX, USER_PROMPT_SUFFIX,
# USER_PROMPT_SUFFIX_DEBUGGING and the pydantic response models are defined elsewhere.

class PandasAgentOpenAI:
    def __init__(self, api_key=None, temperature=0.1, data_dir="data"):
        if api_key is None:
            raise ValueError("API Key is required.")
        self.client = instructor.patch(openai.OpenAI(api_key=api_key))
        self.data_dir = data_dir
        self.list_of_filenames = self.get_list_of_filenames()
        self.system_prompt = SYSTEM_PROMPT
        self.dataframe_prompt = DATAFRAME_PROMPT
        self.dataframe_prompt_prefix = DATAFRAME_PROMPT_PREFIX
        self.user_prompt_suffix = USER_PROMPT_SUFFIX
        self.user_prompt_suffix_debugging = USER_PROMPT_SUFFIX_DEBUGGING
        self.query_count = 0
        self.previous_conversation_history = ""
        self.max_retries_first_response = 2
        self.max_retries_code_correction = 3
        self.temperature = temperature

    def get_list_of_filenames(self):
        return glob.glob(os.path.join(self.data_dir, '*.csv'))

    def make_dataframe_prompt(self):
        dataframe_prompt = ""
        file_count = 0
        for idx, filename in enumerate(self.list_of_filenames, start=1):
            try:
                # Load each CSV into a global df1, df2, ... so generated code can use them
                globals()[f"df{idx}"] = pd.read_csv(filename)
                dataframe_prompt += self.dataframe_prompt.format(
                    i=idx, num=idx, filename=os.path.basename(filename),
                    df_head=str(globals()[f"df{idx}"].head(5).to_markdown())
                )
                file_count += 1
            except Exception as e:
                print(f"Error reading {filename}: {e}")
        dataframe_aggregated = self.dataframe_prompt_prefix.format(n_dataframes=file_count) + "\n\n" + dataframe_prompt
        return dataframe_aggregated

    def add_context(self, context):
        self.previous_conversation_history += "\n" + context

    def fetch_conversation_history(self):
        return self.previous_conversation_history

    def make_user_prompt(self):
        user_prompt_prefix = self.make_dataframe_prompt()
        return user_prompt_prefix

    def run_python_code(self, code):
        ### identical to the run_python_code function shown above
        pass

    def format_result(self, result_dict):
        formatted_dict = result_dict.copy()
        if 'last_expression' in result_dict:
            last_exp = result_dict.get('last_expression')
            if isinstance(last_exp, pd.DataFrame):
                formatted_dict["dataframe"] = last_exp
            elif isinstance(last_exp, list):
                formatted_dict["list"] = last_exp
            elif isinstance(last_exp, pd.Series):
                formatted_dict["pandas_series"] = last_exp.to_frame()
            elif isinstance(last_exp, dict):
                formatted_dict["dict"] = last_exp
            elif isinstance(last_exp, (int, float, str)):
                formatted_dict["result_value"] = last_exp
        return formatted_dict

    def get_answer_to_query(self, query):
        self.current_code = ""
        self.query_count += 1
        previous_conversation_history = self.fetch_conversation_history()
        user_prompt_prefix = self.make_user_prompt()
        user_prompt_suffix = self.user_prompt_suffix.format(
            previous_conversation_history=previous_conversation_history, query=query
        )
        user_prompt = user_prompt_prefix + "\n\n" + user_prompt_suffix
        model_response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            max_retries=self.max_retries_first_response,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=self.temperature,
            response_model=PythonCodeForDataFrame,
        )
        self.current_code = model_response.python_code
        print(model_response.python_code)
        result_dict = self.run_python_code(model_response.python_code)
        final_result = self.format_result(result_dict)
        context = f"Query no. {self.query_count}: {query}"
        context += "\n" + f"Code generated for the query: {model_response.python_code}"
        if "error" in final_result:
            context += "\n" + f"Error: {final_result.get('error', None)}"
        else:
            if "dict" in final_result:
                answer = str(final_result.get("dict"))
                context += "\n" + f"Answer to the query: {answer}"
            if "list" in final_result:
                answer = str(final_result.get("list"))
                context += "\n" + f"Answer to the query: {answer}"
            if "result_value" in final_result:
                answer = str(final_result.get("result_value"))
                context += "\n" + f"Answer to the query: {answer}"
            if "dataframe" in final_result:
                final_result_df_head = str(final_result.get("dataframe").head(5).to_markdown())
                context += "\n" + f"Answer to the query: The first few rows of the generated answer: \n{final_result_df_head}"
            if "pandas_series" in final_result:
                final_result_df_head = str(final_result.get("pandas_series").head(5).to_markdown())
                context += "\n" + f"Answer to the query: The first few rows of the generated answer: \n{final_result_df_head}"
            if "plot_data" in final_result:
                context += "\n" + "Answer to the query: A figure was plotted."
        self.add_context(context)
        return final_result

    def get_code_correction(self, query, error):
        user_prompt_prefix = self.make_user_prompt()
        # Build the debugging prompt from the code that failed, before it is replaced
        user_prompt_suffix = self.user_prompt_suffix_debugging.format(
            query=query, python_code=self.current_code, error=error
        )
        user_prompt = user_prompt_prefix + "\n\n" + user_prompt_suffix
        model_response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=self.temperature,
            response_model=CorrectedPythonCode,
        )
        self.current_code = model_response.corrected_python_code
        print(model_response.corrected_python_code)
        result_dict = self.run_python_code(model_response.corrected_python_code)
        final_result = self.format_result(result_dict)
        context = f"Query no. {self.query_count}: {query}"
        context += "\n" + f"Code re-generated for the query: {model_response.corrected_python_code}"
        if "error" in final_result:
            context += "\n" + f"Error: {final_result.get('error', None)}"
        else:
            if "dict" in final_result:
                answer = str(final_result.get("dict"))
                context += "\n" + f"Answer to the query: {answer}"
            if "list" in final_result:
                answer = str(final_result.get("list"))
                context += "\n" + f"Answer to the query: {answer}"
            if "result_value" in final_result:
                answer = str(final_result.get("result_value"))
                context += "\n" + f"Answer to the query: {answer}"
            if "dataframe" in final_result:
                final_result_df_head = str(final_result.get("dataframe").head(5).to_markdown())
                context += "\n" + f"Answer to the query: The first few rows of the generated answer: \n{final_result_df_head}"
            if "pandas_series" in final_result:
                final_result_df_head = str(final_result.get("pandas_series").head(5).to_markdown())
                context += "\n" + f"Answer to the query: The first few rows of the generated answer: \n{final_result_df_head}"
            if "plot_data" in final_result:
                context += "\n" + "Answer to the query: A figure was plotted."
        self.add_context(context)
        return final_result

    def run_agent(self, query):
        self.current_answer = self.get_answer_to_query(query)
        if "error" in self.current_answer:
            for retry_num in range(self.max_retries_code_correction):
                error = self.current_answer.get("error")
                self.current_answer = self.get_code_correction(query, error)
                if "error" not in self.current_answer:
                    break
        return self.current_answer
You can easily integrate it with your Streamlit app; the output will look as shown below. The code for the same can be found here.
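On the front-end side, the base64 strings returned under `plot_data` just need to be decoded back into image bytes before rendering. A minimal sketch; the Streamlit call (`st.image`) is shown as a comment so the snippet stays framework-free:

```python
import base64

def decode_plots(plot_data):
    """Decode the agent's base64-encoded plots back into raw PNG bytes."""
    return [base64.b64decode(b64_str) for b64_str in plot_data]

# In a Streamlit app you would then render each image, e.g.:
# for img_bytes in decode_plots(result['plot_data']):
#     st.image(img_bytes)
```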
Current Limitations and Areas for Improvement
While this implementation of the CSV agent improves the way we interact with CSV data, particularly in terms of output versatility and integration capabilities, there are several areas where it currently falls short. Understanding these limitations is crucial for continuous improvement and for setting the right expectations:
- Lack of Summarized Answers: As of now, the agent executes queries directly and returns raw outputs without textual summarization. Users may therefore receive results that are overly detailed or overly terse, which is not always ideal when quick insights or summaries are needed.
- Integration with Multi-Modal Models Pending: The agent’s ability to integrate with multi-modal models, which can enhance the interpretation and presentation of data, is still under development. This integration is crucial for enabling more sophisticated data analysis capabilities, such as combining textual and visual data insights.
- Limited Use of Prompt Engineering Techniques: Techniques like ReAct and Chain of Thoughts (CoT), which can significantly enhance the contextuality and relevance of responses, have not yet been implemented. Incorporating these methods via advanced libraries such as DSPy could potentially elevate the agent’s effectiveness.
- Focus on Direct Code Execution Outputs: The current design prioritizes outputs that are directly derived from code execution. This approach ensures precision and reduces ambiguity in responses but can sometimes restrict the breadth of insights provided. There’s a potential to expand this to include AI-driven insights, especially for users seeking more exploratory or generative responses. (Recommended reading: https://www.microsoft.com/en-us/research/uploads/prodnew/2024/03/LLM4Code_Inspiration-1.pdf)
- Support Limited to OpenAI Models: Currently, the agent integrates solely with models from OpenAI, though instructor makes it straightforward to patch in other providers.
Enhancing how we interact with CSV data through the CSV agent is an ongoing exploration. The current iteration of the agent introduces a more dynamic way to engage with data, breaking free from the limitations of traditional text-only outputs and notebook-confined visualizations. It’s about broadening our horizons and experimenting with alternative approaches that might better suit diverse needs and environments.