Skip to content

Commit

Permalink
Merge pull request #72 from joshstevens19/feat/chatbots+streams
Browse files Browse the repository at this point in the history
Feat/chatbots+streams
  • Loading branch information
joshstevens19 authored Aug 6, 2024
2 parents 57e2bfc + f8751a6 commit f594f2b
Show file tree
Hide file tree
Showing 62 changed files with 7,372 additions and 75 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ graphql/node_modules
rindexer_rust_playground/generated_csv/*/*.csv
.env
!.env.example
node_modules
2 changes: 2 additions & 0 deletions cli/src/commands/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub async fn handle_add_contract_command(
dependency_events: None,
reorg_safe_distance: None,
generate_csv: None,
streams: None,
chat: None,
});

write_manifest(&manifest, &rindexer_yaml_path).map_err(|e| {
Expand Down
13 changes: 13 additions & 0 deletions cli/src/commands/new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ fn write_docker_compose(path: &Path) -> Result<(), WriteFileError> {
write_file(&path.join("docker-compose.yml"), generate_docker_file())
}

fn write_gitignore(path: &Path) -> Result<(), WriteFileError> {
write_file(
&path.join(".gitignore"),
r#".rindexer
generated_csv/**/*.txt
"#,
)
}

pub fn handle_new_command(
project_path: PathBuf,
project_type: ProjectType,
Expand Down Expand Up @@ -151,6 +160,8 @@ pub fn handle_new_command(
dependency_events: None,
reorg_safe_distance: None,
generate_csv: None,
streams: None,
chat: None,
}],
phantom: None,
global: None,
Expand Down Expand Up @@ -211,6 +222,8 @@ POSTGRES_PASSWORD=rindexer"#;
generate_rindexer_rust_project(&project_path);
}

write_gitignore(&project_path)?;

print_success_message(&success_message);

Ok(())
Expand Down
12 changes: 11 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,25 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "time"]
chrono = "0.4.38"
log = "0.4.20"
colored = "2.0"
hex = "0.4.3"
uuid = { version = "1.10.0", features = ["v4"] }
# do not change version as have to match ethers at the moment
reqwest = { version = "0.11.27", features = ["json"] }
thread_local = "1.1"
native-tls = "0.2"
postgres-native-tls = "0.5"
aws-config = "1.5.0"
aws-sdk-sns = "1.37.0"
lapin = "2.5.0"
deadpool = { version = "0.12", features = ["rt_tokio_1"] }
deadpool-lapin = "0.12"
rdkafka = { version = "0.36", features = ["tokio"] }
teloxide = "0.12"
serenity = { version = "0.12", features = ["client", "framework"] }

# build
jemallocator = { version = "0.5.0", optional = true }
jemalloc-ctl = { version = "0.5.0", optional = true }
hex = "0.4.3"

[profile.release]
lto = "fat"
Expand Down
315 changes: 315 additions & 0 deletions core/src/chat/clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
use std::sync::Arc;

use ethers::types::U64;
use futures::future::join_all;
use serde_json::Value;
use serenity::all::ChannelId;
use teloxide::types::ChatId;
use thiserror::Error;
use tokio::{
task,
task::{JoinError, JoinHandle},
};

use crate::{
chat::{
discord::{DiscordBot, DiscordError},
slack::{SlackBot, SlackError},
telegram::{TelegramBot, TelegramError},
template::Template,
},
event::{filter_event_data_by_conditions, EventMessage},
manifest::chat::{
ChatConfig, DiscordConfig, DiscordEvent, SlackConfig, SlackEvent, TelegramConfig,
TelegramEvent,
},
};

type SendMessage = Vec<JoinHandle<Result<(), ChatError>>>;

#[derive(Error, Debug)]
pub enum ChatError {
#[error("Telegram error: {0}")]
Telegram(#[from] TelegramError),

#[error("Discord error: {0}")]
Discord(#[from] DiscordError),

#[error("Slack error: {0}")]
Slack(#[from] SlackError),

#[error("Task failed: {0}")]
JoinError(JoinError),
}

#[derive(Debug, Clone)]
struct TelegramInstance {
config: TelegramConfig,
client: Arc<TelegramBot>,
}

#[derive(Debug)]
struct DiscordInstance {
config: DiscordConfig,
client: Arc<DiscordBot>,
}

#[derive(Debug)]
struct SlackInstance {
config: SlackConfig,
client: Arc<SlackBot>,
}

pub struct ChatClients {
telegram: Option<Vec<TelegramInstance>>,
discord: Option<Vec<DiscordInstance>>,
slack: Option<Vec<SlackInstance>>,
}

impl ChatClients {
pub async fn new(chat_config: ChatConfig) -> Self {
let telegram = chat_config.telegram.map(|config| {
config
.into_iter()
.map(|config| {
let client = Arc::new(TelegramBot::new(&config.bot_token));
TelegramInstance { config, client }
})
.collect()
});

let discord = chat_config.discord.map(|config| {
config
.into_iter()
.map(|config| {
let client = Arc::new(DiscordBot::new(&config.bot_token));
DiscordInstance { config, client }
})
.collect()
});

let slack = chat_config.slack.map(|config| {
config
.into_iter()
.map(|config| {
let client = Arc::new(SlackBot::new(config.bot_token.clone()));
SlackInstance { config, client }
})
.collect()
});

Self { telegram, discord, slack }
}

fn find_accepted_block_range(&self, from_block: &U64, to_block: &U64) -> U64 {
if from_block > to_block {
panic!("Invalid range: from_block must be less than or equal to to_block");
}

match from_block.overflowing_add(to_block - from_block) {
(result, false) => result,
(_, true) => U64::max_value(),
}
}

pub fn is_in_block_range_to_send(&self, from_block: &U64, to_block: &U64) -> bool {
// only 10 blocks at a time else rate limits will kick in
U64::from(10) <= self.find_accepted_block_range(from_block, to_block)
}

fn has_any_chat(&self) -> bool {
self.telegram.is_some() || self.discord.is_some() || self.slack.is_some()
}

fn telegram_send_message_tasks(
&self,
instance: &TelegramInstance,
event_for: &TelegramEvent,
events_data: &[Value],
) -> SendMessage {
let tasks: Vec<_> = events_data
.iter()
.filter(|event_data| {
if let Some(conditions) = &event_for.conditions {
filter_event_data_by_conditions(event_data, conditions)
} else {
true
}
})
.map(|event_data| {
let client = Arc::clone(&instance.client);
let chat_id = ChatId(instance.config.chat_id);
let message = Template::new(event_for.template_inline.clone())
.parse_template_inline(event_data);
task::spawn(async move {
client.send_message(chat_id, &message).await?;
Ok(())
})
})
.collect();
tasks
}

fn discord_send_message_tasks(
&self,
instance: &DiscordInstance,
event_for: &DiscordEvent,
events_data: &[Value],
) -> SendMessage {
let tasks: Vec<_> = events_data
.iter()
.filter(|event_data| {
if let Some(conditions) = &event_for.conditions {
filter_event_data_by_conditions(event_data, conditions)
} else {
true
}
})
.map(|event_data| {
let client = Arc::clone(&instance.client);
let channel_id = ChannelId::new(instance.config.channel_id);
let message = Template::new(event_for.template_inline.clone())
.parse_template_inline(event_data);
task::spawn(async move {
client.send_message(channel_id, &message).await?;
Ok(())
})
})
.collect();
tasks
}

fn slack_send_message_tasks(
&self,
instance: &SlackInstance,
event_for: &SlackEvent,
events_data: &[Value],
) -> SendMessage {
let tasks: Vec<_> = events_data
.iter()
.filter(|event_data| {
if let Some(conditions) = &event_for.conditions {
filter_event_data_by_conditions(event_data, conditions)
} else {
true
}
})
.map(|event_data| {
let client = Arc::clone(&instance.client);
let channel = instance.config.channel.clone();
let message = Template::new(event_for.template_inline.clone())
.parse_template_inline(event_data);
task::spawn(async move {
client.send_message(&channel, &message).await?;
Ok(())
})
})
.collect();
tasks
}

pub async fn send_message(
&self,
event_message: &EventMessage,
index_event_in_order: bool,
from_block: &U64,
to_block: &U64,
) -> Result<usize, ChatError> {
if !self.has_any_chat() || !self.is_in_block_range_to_send(from_block, to_block) {
return Ok(0);
}

// will always have something even if the event has no parameters due to the tx_information
if let Value::Array(data_array) = &event_message.event_data {
let mut messages: Vec<SendMessage> = Vec::new();

if let Some(telegram) = &self.telegram {
for instance in telegram {
if instance.config.networks.contains(&event_message.network) {
let telegram_event = instance
.config
.messages
.iter()
.find(|e| e.event_name == event_message.event_name);

if let Some(telegram_event) = telegram_event {
let message = self.telegram_send_message_tasks(
instance,
telegram_event,
data_array,
);
messages.push(message);
}
}
}
}

if let Some(discord) = &self.discord {
for instance in discord {
if instance.config.networks.contains(&event_message.network) {
let discord_event = instance
.config
.messages
.iter()
.find(|e| e.event_name == event_message.event_name);

if let Some(discord_event) = discord_event {
let message = self.discord_send_message_tasks(
instance,
discord_event,
data_array,
);
messages.push(message);
}
}
}
}

if let Some(slack) = &self.slack {
for instance in slack {
if instance.config.networks.contains(&event_message.network) {
let slack_event = instance
.config
.messages
.iter()
.find(|e| e.event_name == event_message.event_name);

if let Some(slack_event) = slack_event {
let message =
self.slack_send_message_tasks(instance, slack_event, data_array);
messages.push(message);
}
}
}
}

let mut messages_sent = 0;

if index_event_in_order {
for message in messages {
for publish in message {
match publish.await {
Ok(Ok(_)) => messages_sent += 1,
Ok(Err(e)) => return Err(e),
Err(e) => return Err(ChatError::JoinError(e)),
}
}
}
} else {
let tasks: Vec<_> = messages.into_iter().flatten().collect();
let results = join_all(tasks).await;
for result in results {
match result {
Ok(Ok(_)) => messages_sent += 1,
Ok(Err(e)) => return Err(e),
Err(e) => return Err(ChatError::JoinError(e)),
}
}
}

Ok(messages_sent)
} else {
unreachable!("Event data should be an array");
}
}
}
Loading

0 comments on commit f594f2b

Please sign in to comment.