Open-sourced generic, dynamic POC, RESTful alerting API

This commit is contained in:
2024-02-16 17:37:37 -07:00
parent b2ed41c549
commit 0d1d154c7a
70 changed files with 3272 additions and 0 deletions
+29
View File
@@ -0,0 +1,29 @@
import org.springframework.boot.gradle.tasks.bundling.BootJar
dependencies {
"implementation"(project(":persistence"))
"implementation"(project(":amqp"))
implementation("org.slf4j:slf4j-api")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.springframework.amqp:spring-amqp")
implementation("org.springframework.amqp:spring-rabbit")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-quartz")
implementation("org.springframework:spring-jdbc")
implementation("org.projectlombok:lombok")
implementation("org.apache.commons:commons-lang3")
implementation("org.apache.httpcomponents:fluent-hc")
implementation("com.google.code.gson:gson")
annotationProcessor("org.projectlombok:lombok")
}
tasks.getByName<BootJar>("bootJar") {
enabled = true
mainClass.set("com.poc.alerting.batch.BatchWorkerKt")
}
springBoot {
mainClass.set("com.poc.alerting.batch.BatchWorkerKt")
}
@@ -0,0 +1,52 @@
package com.poc.alerting.batch;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Component;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class AccountConsumerManager {
private volatile boolean shuttingDown = false;
@Getter private final Map<String, SimpleMessageListenerContainer> consumers = new ConcurrentHashMap<>();
private final Object lifecycleMonitor = new Object();
void registerAndStart(final String queueName, final SimpleMessageListenerContainer newListenerContainer) {
synchronized (this.lifecycleMonitor) {
if (shuttingDown) {
LOG.warn("Shutdown process is underway. Not registering consumer for queue {}", queueName);
return;
}
final SimpleMessageListenerContainer oldListenerContainer = consumers.get(queueName);
if (oldListenerContainer != null) {
oldListenerContainer.stop();
}
newListenerContainer.start();
consumers.put(queueName, newListenerContainer);
LOG.info("Registered a new consumer on queue {}", queueName);
}
}
public void stopConsumers() {
synchronized (this.lifecycleMonitor) {
shuttingDown = true;
LOG.info("Shutting down consumers on queues {}", consumers.keySet());
consumers.entrySet().parallelStream().forEach(entry -> {
LOG.info("Shutting down consumer on queue {}", entry.getKey());
try {
entry.getValue().stop();
} catch (final Exception e) {
LOG.error("Encountered error while stopping consumer on queue " + entry.getKey(), e);
}
});
LOG.info("Finished shutting down all consumers");
}
}
}
@@ -0,0 +1,67 @@
package com.poc.alerting.batch;
import java.util.Base64;
import java.util.stream.StreamSupport;
import org.apache.http.client.fluent.Request;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import lombok.AllArgsConstructor;
@Component
@AllArgsConstructor
public class ApplicationStartup implements ApplicationListener<ApplicationReadyEvent> {
ConnectionFactory connectionFactory;
MessageListenerAdapter accountWorkerListenerAdapter;
MessageConverter gsonMessageConverter;
ApplicationEventPublisher applicationEventPublisher;
ConsumerCreator consumerCreator;
private static final Logger LOG = LoggerFactory.getLogger(ApplicationStartup.class);
@Override
public void onApplicationEvent(@NotNull final ApplicationReadyEvent event) {
LOG.info("Creating consumers for existing queues");
try {
final String rabbitMqUrl = String.format("http://%s:15672/api/exchanges/poc/alerting/bindings/source", "localhost");
//auth is kind of a kluge here. Apparently the HttpClient Fluent API doesn't support
//it except by explicitly setting the auth header.
final String json = Request.Get(rabbitMqUrl)
.connectTimeout(1000)
.socketTimeout(1000)
.addHeader("Authorization", "Basic " + getAuthToken())
.execute().returnContent().asString();
final JsonParser parser = new JsonParser();
final JsonArray array = parser.parse(json).getAsJsonArray();
StreamSupport.stream(array.spliterator(), false)
.map(jsonElement -> jsonElement.getAsJsonObject().get("destination").getAsString())
.forEach(queueName -> consumerCreator.createConsumer(queueName));
} catch (final Exception e) {
LOG.error("Error create consumers for existing queues", e);
}
}
private String getAuthToken() {
final String basicPlaintext = "poc" + ":" + "s!mpleP@ssw0rd";
final Base64.Encoder encoder = Base64.getEncoder();
return encoder.encodeToString(basicPlaintext.getBytes());
}
}
@@ -0,0 +1,11 @@
package com.poc.alerting.batch;
import org.apache.commons.logging.Log;
import org.springframework.amqp.support.ConditionalExceptionLogger;
public class ExclusiveConsumerExceptionLogger implements ConditionalExceptionLogger {
@Override
public void log(final Log logger, final String message, final Throwable t) {
//do not log exclusive consumer warnings
}
}
@@ -0,0 +1,65 @@
package com.poc.alerting.batch;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import com.poc.alerting.amqp.AmqpMessage;
import com.poc.alerting.amqp.AmqpResponse;
import com.poc.alerting.amqp.ExceptionType;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static com.poc.alerting.amqp.AmqpConfiguration.AMQP_NAME;
import static com.poc.alerting.batch.WorkerConfiguration.NEW_CONSUMER;
@Slf4j
@Component
@AllArgsConstructor
public class QueueCreator {
private final RabbitAdmin rabbitAdmin;
private final RabbitTemplate rabbitTemplate;
private final DirectExchange directExchange;
public <T> AmqpResponse<T> createQueue(final AmqpMessage amqpMessage) throws IOException, ClassNotFoundException {
final String routingKey = amqpMessage.getRoutingKey();
LOG.info("Attempting to create new queue {}", routingKey);
setupQueueForAccount(routingKey);
final AmqpResponse response = (AmqpResponse) rabbitTemplate.convertSendAndReceive(AMQP_NAME, routingKey, amqpMessage);
if (response != null && ExceptionType.INVALID_ACCOUNT_EXCEPTION.equals(response.getExceptionType())) {
LOG.info("Invalid account, removing queue {}", routingKey);
CompletableFuture.runAsync(() -> rabbitAdmin.deleteQueue(routingKey, false, true));
}
return response;
}
private void setupQueueForAccount(final String routingKey) {
final Properties properties = rabbitAdmin.getQueueProperties(routingKey);
if (properties == null) {
final Queue queue = QueueBuilder.nonDurable(routingKey)
.withArgument("x-expires", DateUtils.MILLIS_PER_DAY)
.build();
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(routingKey));
sendCreateConsumerMessage(routingKey);
} else if ((Integer) properties.get(RabbitAdmin.QUEUE_CONSUMER_COUNT) < 1) {
LOG.info("{} queue already exists. Adding consumer.", routingKey);
sendCreateConsumerMessage(routingKey);
}
}
private void sendCreateConsumerMessage(final String queueName) {
rabbitTemplate.convertAndSend(NEW_CONSUMER, "", queueName);
}
}
@@ -0,0 +1,101 @@
package com.poc.alerting.batch
import com.poc.alerting.amqp.AlertingAmqpMessage
import com.poc.alerting.amqp.AlertingAmqpMessage.Add
import com.poc.alerting.amqp.AlertingAmqpMessage.Delete
import com.poc.alerting.amqp.AlertingAmqpMessage.Pause
import com.poc.alerting.amqp.AlertingAmqpMessage.Resume
import com.poc.alerting.amqp.AlertingAmqpMessage.Update
import com.poc.alerting.batch.jobs.AlertQueryJob
import com.poc.alerting.batch.jobs.AlertQueryJob.Companion.ACCOUNT_ID
import com.poc.alerting.batch.jobs.AlertQueryJob.Companion.ALERT_ID
import com.poc.alerting.batch.jobs.AlertQueryJob.Companion.CRON
import com.poc.alerting.persistence.dto.Alert
import com.poc.alerting.persistence.repositories.AlertRepository
import org.quartz.CronScheduleBuilder
import org.quartz.JobBuilder
import org.quartz.JobDataMap
import org.quartz.JobDetail
import org.quartz.JobKey
import org.quartz.Scheduler
import org.quartz.Trigger
import org.quartz.TriggerBuilder
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.time.ZoneId
import java.util.TimeZone
@Service
open class AccountWorker @Autowired constructor(
private val alertRepository: AlertRepository,
private val scheduler: Scheduler
){
fun processMessage(message: AlertingAmqpMessage) {
when (message) {
is Add -> createJob(message.alertId, message.accountId, message.frequency)
is Update -> updateJob(message.alertId, message.accountId, message.frequency)
is Delete -> deleteJob(message.alertId, message.accountId)
is Pause -> pauseJob(message.alertId, message.accountId)
is Resume -> resumeJob(message.alertId, message.accountId)
}
}
private fun createJob(alertId: String, accountId: String, cron: String): Alert {
val jobDetail = buildJob(alertId, accountId, cron)
val trigger = createTrigger(alertId, jobDetail, cron)
with (scheduler) {
scheduleJob(jobDetail, trigger)
start()
}
return alertRepository.findByExtIdAndAccount_Id(alertId, accountId)
}
private fun updateJob(alertId: String, accountId: String, cron: String): Alert {
scheduler.deleteJob(JobKey.jobKey(alertId, accountId))
return createJob(alertId, accountId, cron)
}
private fun deleteJob(alertId: String, accountId: String): Alert {
val alert = alertRepository.findByExtIdAndAccount_Id(alertId, accountId)
scheduler.deleteJob(JobKey.jobKey(alertId, accountId))
alertRepository.delete(alert)
return alert
}
private fun pauseJob(alertId: String, accountId: String): Alert {
scheduler.pauseJob(JobKey.jobKey(alertId, accountId))
return alertRepository.findByExtIdAndAccount_Id(alertId, accountId)
}
private fun resumeJob(alertId: String, accountId: String): Alert {
scheduler.resumeJob(JobKey.jobKey(alertId, accountId))
return alertRepository.findByExtIdAndAccount_Id(alertId, accountId)
}
private fun buildJob(alertId: String, accountId: String, cron: String): JobDetail {
val jobDataMap = JobDataMap()
jobDataMap[ALERT_ID] = alertId
jobDataMap[ACCOUNT_ID] = accountId
jobDataMap[CRON] = cron
return JobBuilder.newJob().ofType(AlertQueryJob::class.java)
.storeDurably()
.withIdentity(alertId, accountId)
.usingJobData(jobDataMap)
.build()
}
private fun createTrigger(alertId: String, jobDetail: JobDetail, cron: String): Trigger {
return TriggerBuilder.newTrigger()
.forJob(jobDetail)
.withIdentity("${alertId}_trigger")
.withSchedule(
CronScheduleBuilder.cronSchedule(cron)
.withMisfireHandlingInstructionFireAndProceed()
.inTimeZone(TimeZone.getTimeZone(ZoneId.systemDefault()))
)
.usingJobData("cron", cron)
.build()
}
}
@@ -0,0 +1,37 @@
package com.poc.alerting.batch
import org.quartz.Job
import org.quartz.SchedulerContext
import org.quartz.spi.TriggerFiredBundle
import org.springframework.beans.MutablePropertyValues
import org.springframework.beans.PropertyAccessorFactory
import org.springframework.context.ApplicationContext
import org.springframework.context.ApplicationContextAware
import org.springframework.scheduling.quartz.SpringBeanJobFactory
class AutowiringSpringBeanJobFactory : SpringBeanJobFactory(), ApplicationContextAware {
private var ctx: ApplicationContext? = null
private var schedulerContext: SchedulerContext? = null
override fun setApplicationContext(context: ApplicationContext) {
ctx = context
}
override fun createJobInstance(bundle: TriggerFiredBundle): Any {
val job: Job = ctx!!.getBean(bundle.jobDetail.jobClass)
val bw = PropertyAccessorFactory.forBeanPropertyAccess(job)
val pvs = MutablePropertyValues()
pvs.addPropertyValues(bundle.jobDetail.jobDataMap)
pvs.addPropertyValues(bundle.trigger.jobDataMap)
if (this.schedulerContext != null) {
pvs.addPropertyValues(this.schedulerContext)
}
bw.setPropertyValues(pvs, true)
return job
}
override fun setSchedulerContext(schedulerContext: SchedulerContext) {
this.schedulerContext = schedulerContext
super.setSchedulerContext(schedulerContext)
}
}
@@ -0,0 +1,16 @@
package com.poc.alerting.batch
import org.springframework.boot.Banner
import org.springframework.boot.WebApplicationType
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.builder.SpringApplicationBuilder
@SpringBootApplication(scanBasePackages = ["com.poc.alerting"])
open class BatchWorker
fun main(args: Array<String>) {
SpringApplicationBuilder().sources(BatchWorker::class.java)
.bannerMode(Banner.Mode.OFF)
.web(WebApplicationType.NONE)
.run(*args)
}
@@ -0,0 +1,31 @@
package com.poc.alerting.batch
import org.apache.commons.lang3.time.DateUtils
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component
@Component
open class ConsumerCreator @Autowired constructor(
private val connectionFactory: ConnectionFactory,
private val accountWorkerListenerAdapter: MessageListenerAdapter,
private val applicationEventPublisher: ApplicationEventPublisher,
private val accountConsumerManager: AccountConsumerManager
){
fun createConsumer(queueName: String) {
val consumer = SimpleMessageListenerContainer(connectionFactory)
consumer.setExclusive(true)
consumer.setExclusiveConsumerExceptionLogger(ExclusiveConsumerExceptionLogger())
consumer.setQueueNames(queueName)
consumer.setMessageListener(accountWorkerListenerAdapter)
consumer.setIdleEventInterval(DateUtils.MILLIS_PER_HOUR)
consumer.setApplicationEventPublisher(applicationEventPublisher)
consumer.setDefaultRequeueRejected(false)
consumer.setAutoDeclare(false)
consumer.setShutdownTimeout(DateUtils.MILLIS_PER_SECOND * 10)
accountConsumerManager.registerAndStart(queueName, consumer)
}
}
@@ -0,0 +1,97 @@
package com.poc.alerting.batch
import com.poc.alerting.amqp.AmqpConfiguration.Companion.NEW_ACCOUNT
import com.poc.alerting.amqp.GsonMessageConverter
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.FanoutExchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.AsyncConfigurer
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.net.InetAddress
import java.util.concurrent.Executor
@Configuration
@EnableAsync
open class WorkerConfiguration: AsyncConfigurer {
companion object {
const val NEW_CONSUMER = "new_consumer"
}
@Bean("asyncExecutor")
override fun getAsyncExecutor(): Executor {
return ThreadPoolTaskExecutor()
}
@Bean
open fun rabbitAdmin(connectionFactory: ConnectionFactory): RabbitAdmin {
return RabbitAdmin(connectionFactory)
}
@Bean
open fun queueCreatorListenerAdapter(queueCreator: QueueCreator, gsonMessageConverter: GsonMessageConverter): MessageListenerAdapter {
return MessageListenerAdapter(queueCreator, "createQueue").apply {
setMessageConverter(gsonMessageConverter)
}
}
@Bean
open fun queueCreatorContainer(connectionFactory: ConnectionFactory, queueCreatorListenerAdapter: MessageListenerAdapter,
gsonMessageConverter: GsonMessageConverter): SimpleMessageListenerContainer {
return SimpleMessageListenerContainer(connectionFactory).apply {
setExclusive(true)
setExclusiveConsumerExceptionLogger(ExclusiveConsumerExceptionLogger())
setQueueNames(NEW_ACCOUNT)
setMessageListener(queueCreatorListenerAdapter)
setDefaultRequeueRejected(false)
}
}
@Bean
open fun newConsumerExchange(): FanoutExchange {
return FanoutExchange(NEW_CONSUMER)
}
@Bean
open fun newConsumerQueue(): Queue {
return Queue("${NEW_CONSUMER}_${InetAddress.getLocalHost().hostName}", true)
}
@Bean
open fun newConsumerBinding(newConsumerQueue: Queue, newConsumerExchange: FanoutExchange): Binding {
return BindingBuilder.bind(newConsumerQueue).to(newConsumerExchange)
}
@Bean
open fun consumerCreatorListenerAdapter(consumerCreator: ConsumerCreator, gsonMessageConverter: GsonMessageConverter): MessageListenerAdapter {
return MessageListenerAdapter(consumerCreator, "createConsumer").apply {
setMessageConverter(gsonMessageConverter)
}
}
@Bean
open fun consumerCreatorContainer(connectionFactory: ConnectionFactory, consumerCreatorListenerAdapter: MessageListenerAdapter,
newConsumerQueue: Queue): SimpleMessageListenerContainer {
return SimpleMessageListenerContainer(connectionFactory).apply {
setExclusive(true)
setExclusiveConsumerExceptionLogger(ExclusiveConsumerExceptionLogger())
setQueues(newConsumerQueue)
setMessageListener(consumerCreatorListenerAdapter)
setDefaultRequeueRejected(false)
}
}
@Bean
open fun accountWorkerListenerAdapter(accountWorker: AccountWorker, gsonMessageConverter: GsonMessageConverter): MessageListenerAdapter {
return MessageListenerAdapter(accountWorker, "processMessage").apply {
setMessageConverter(gsonMessageConverter)
}
}
}
@@ -0,0 +1,50 @@
package com.poc.alerting.batch.jobs
import com.poc.alerting.persistence.repositories.AlertRepository
import org.quartz.Job
import org.quartz.JobExecutionContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import java.time.Duration
import java.util.Date
@Component
open class AlertQueryJob @Autowired constructor(
private val alertRepository: AlertRepository
): Job {
companion object {
const val ALERT_ID = "alertId"
const val CRON = "cron"
const val ACCOUNT_ID = "accountId"
}
override fun execute(context: JobExecutionContext) {
val data = context.jobDetail.jobDataMap
val alertId = data.getString(ALERT_ID)
val cron = data.getString(CRON)
val accountId = data.getString(ACCOUNT_ID)
val alert = alertRepository.findByExtIdAndAccount_Id(alertId, accountId)
with(alert) {
val queryResult = type.query()
println("PERFORMING QUERY for $alertId-$accountId. Running with the following CRON expression: $cron")
if (queryResult >= threshold.toInt()) {
val currentTime = Date()
if (!isTriggered) {
isTriggered = true
}
notificationSentTimestamp.let {
if (Duration.between(notificationSentTimestamp.toInstant(), currentTime.toInstant()).toSeconds() >= 15) {
println("Alert Triggered!!!!!!!!!!!!!")
notificationSentTimestamp = currentTime
}
}
lastTriggerTimestamp = currentTime
alertRepository.save(this)
}
}
}
}
@@ -0,0 +1,33 @@
package com.poc.alerting.batch.jobs
import com.poc.alerting.batch.AutowiringSpringBeanJobFactory
import org.quartz.spi.JobFactory
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.quartz.SchedulerFactoryBean
import javax.sql.DataSource
@Configuration
@ComponentScan
@EnableAutoConfiguration
open class ScheduleAlertQueryConfiguration {
@Bean
open fun jobFactory(applicationContext: ApplicationContext): JobFactory {
return AutowiringSpringBeanJobFactory().apply {
setApplicationContext(applicationContext)
}
}
@Bean
open fun schedulerFactory(applicationContext: ApplicationContext, dataSource: DataSource, jobFactory: JobFactory): SchedulerFactoryBean {
return SchedulerFactoryBean().apply {
setOverwriteExistingJobs(true)
isAutoStartup = true
setDataSource(dataSource)
setJobFactory(jobFactory)
}
}
}
+19
View File
@@ -0,0 +1,19 @@
server:
port: 0
servlet:
encoding:
charset: UTF-8
enabled: true
spring:
profiles:
active: "batch"
datasource:
url: "jdbc:h2:tcp://localhost:9091/mem:alerting;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE;"
username: "defaultUser"
password: "secret"
application:
name: BatchWorker
jackson:
time-zone: UTC
main:
allow-bean-definition-overriding: true